diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java index 489200f156..04ecdf9711 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestParentControllerWithMultiDataCenter.java @@ -20,17 +20,20 @@ import com.linkedin.venice.integration.utils.ServiceFactory; import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper; import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; +import com.linkedin.venice.meta.BackupStrategy; import com.linkedin.venice.meta.BufferReplayPolicy; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreInfo; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.manager.TopicManager; import com.linkedin.venice.schema.rmd.RmdSchemaEntry; import com.linkedin.venice.schema.rmd.RmdSchemaGenerator; import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.TestWriteUtils; +import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import java.io.File; import java.io.IOException; @@ -93,6 +96,91 @@ public void cleanUp() { Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper); } + @Test + public void testRTTopicDeletionWithHybridAndIncrementalVersions() { + String storeName = Utils.getUniqueString("testRTTopicDeletion"); + String clusterName = CLUSTER_NAMES[0]; + String rtTopicName = Version.composeRealTimeTopic(storeName); + PubSubTopic rtPubSubTopic = 
new PubSubTopicRepository().getTopic(rtTopicName); + String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString(); + ControllerClient parentControllerClient = + ControllerClient.constructClusterControllerClient(clusterName, parentControllerURLs); + + List topicManagers = new ArrayList<>(2); + topicManagers + .add(childDatacenters.get(0).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager()); + topicManagers + .add(childDatacenters.get(1).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager()); + + NewStoreResponse newStoreResponse = + parentControllerClient.retryableRequest(5, c -> c.createNewStore(storeName, "", "\"string\"", "\"string\"")); + Assert.assertFalse( + newStoreResponse.isError(), + "The NewStoreResponse returned an error: " + newStoreResponse.getError()); + + UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams(); + updateStoreParams.setIncrementalPushEnabled(true) + .setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS) + .setNumVersionsToPreserve(1) + .setHybridRewindSeconds(1000) + .setHybridOffsetLagThreshold(1000); + TestWriteUtils.updateStore(storeName, parentControllerClient, updateStoreParams); + + // create new version by doing an empty push + parentControllerClient + .sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND); + for (TopicManager topicManager: topicManagers) { + Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic)); + } + + // create new version by doing an empty push + parentControllerClient + .sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND); + for (TopicManager topicManager: topicManagers) { + Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic)); + } + + // change store from hybrid to batch-only + UpdateStoreQueryParams params = new UpdateStoreQueryParams(); + 
params.setHybridRewindSeconds(-1).setHybridTimeLagThreshold(-1).setHybridOffsetLagThreshold(-1); + TestWriteUtils.updateStore(storeName, parentControllerClient, params); + + // create new version by doing an empty push + parentControllerClient + .sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND); + // at this point, the current version should be batch-only, but the older version should be hybrid, so rt topic + // should not get deleted + for (TopicManager topicManager: topicManagers) { + Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic)); + } + + // create new version by doing an empty push + parentControllerClient + .sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND); + + // now both the versions should be batch-only, so rt topic should get deleted by TopicCleanupService + for (TopicManager topicManager: topicManagers) { + TestUtils.waitForNonDeterministicAssertion( + 30, + TimeUnit.SECONDS, + true, + true, + () -> Assert.assertFalse(topicManager.containsTopic(rtPubSubTopic))); + } + + /* + todo - RT topic is not deleted in parent colo yet. + we need to probably modify TopicCleanupServiceForParentController and check if all the child colos have deleted + RT topic, then it can also be deleted from parent colo. 
I believe we can extend it to non RT topics as well and + then can get rid of `storeToCountdownForDeletion` + + TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, true, () -> { + Assert.assertFalse(parentTopicManager.containsTopic(rtPubSubTopic)); + } + ); + */ + } + @Test(timeOut = TEST_TIMEOUT) public void testUpdateStore() { String clusterName = CLUSTER_NAMES[0]; diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java index 7269235874..16dc66f3bd 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java @@ -532,6 +532,8 @@ String getNativeReplicationSourceFabric( TopicManager getTopicManager(String pubSubServerAddress); + boolean canDeleteRTTopic(String clusterName, String storeName); + /** * Check if this controller itself is the leader controller for a given cluster or not. 
Note that the controller can be * either a parent controller or a child controller since a cluster must have a leader child controller and a leader diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index db805e9600..7895e3ca39 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -3507,8 +3507,7 @@ private void deleteOneStoreVersion(String clusterName, String storeName, int ver } PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); if (!store.isHybrid() && getTopicManager().containsTopic(rtTopic)) { - store = resources.getStoreMetadataRepository().getStore(storeName); - safeDeleteRTTopic(clusterName, storeName, store); + safeDeleteRTTopic(clusterName, storeName); } } } @@ -3523,17 +3522,26 @@ private boolean hasFatalDataValidationError(PushMonitor pushMonitor, String topi } } - private void safeDeleteRTTopic(String clusterName, String storeName, Store store) { + private void safeDeleteRTTopic(String clusterName, String storeName) { + Store store = getHelixVeniceClusterResources(clusterName).getStoreMetadataRepository().getStore(storeName); + if (canDeleteRTTopic(clusterName, storeName, store)) { + deleteRTTopic(clusterName, storeName); + } else { + LOGGER.warn("Topic deletion for topic: {} is delayed.", Version.composeRealTimeTopic(storeName)); + } + } + + public boolean canDeleteRTTopic(String clusterName, String storeName, Store store) { // Perform RT cleanup checks for batch only store that used to be hybrid. Check the local versions first // to see if any version is still using RT and then also check other fabrics before deleting the RT. 
Since // we perform this check everytime when a store version is deleted we can afford to do best effort // approach if some fabrics are unavailable or out of sync (temporarily). - boolean canDeleteRT = !Version.containsHybridVersion(store.getVersions()); + return !Version.containsHybridVersion(store.getVersions()) && canDeleteRTTopic(clusterName, storeName); + } + + public boolean canDeleteRTTopic(String clusterName, String storeName) { Map controllerClientMap = getControllerClientMap(clusterName); for (Map.Entry controllerClientEntry: controllerClientMap.entrySet()) { - if (!canDeleteRT) { - return; - } StoreResponse storeResponse = controllerClientEntry.getValue().getStore(storeName); if (storeResponse.isError()) { LOGGER.warn( @@ -3542,24 +3550,29 @@ private void safeDeleteRTTopic(String clusterName, String storeName, Store store clusterName, controllerClientEntry.getKey(), storeResponse.getError()); - return; + return false; + } + if (Version.containsHybridVersion(storeResponse.getStore().getVersions())) { + return false; + } - canDeleteRT = !Version.containsHybridVersion(storeResponse.getStore().getVersions()); } - if (canDeleteRT) { - String rtTopicToDelete = Version.composeRealTimeTopic(storeName); - truncateKafkaTopic(rtTopicToDelete); + return true; + } + + private void deleteRTTopic(String clusterName, String storeName) { + Map controllerClientMap = getControllerClientMap(clusterName); + String rtTopicToDelete = Version.composeRealTimeTopic(storeName); + truncateKafkaTopic(rtTopicToDelete); + for (ControllerClient controllerClient: controllerClientMap.values()) { + controllerClient.deleteKafkaTopic(rtTopicToDelete); + } + // Check if an incremental push topic exists. If so, delete it and ask the other controllers to + // delete it as well. 
+ String incrementalPushRTTopicToDelete = Version.composeSeparateRealTimeTopic(storeName); + if (getTopicManager().containsTopic(pubSubTopicRepository.getTopic(incrementalPushRTTopicToDelete))) { + truncateKafkaTopic(incrementalPushRTTopicToDelete); for (ControllerClient controllerClient: controllerClientMap.values()) { - controllerClient.deleteKafkaTopic(rtTopicToDelete); - } - // Check if there is incremental push topic exist. If yes, delete it and send out to let other controller to - // delete it. - String incrementalPushRTTopicToDelete = Version.composeSeparateRealTimeTopic(storeName); - if (getTopicManager().containsTopic(pubSubTopicRepository.getTopic(incrementalPushRTTopicToDelete))) { - truncateKafkaTopic(incrementalPushRTTopicToDelete); - for (ControllerClient controllerClient: controllerClientMap.values()) { - controllerClient.deleteKafkaTopic(incrementalPushRTTopicToDelete); - } + controllerClient.deleteKafkaTopic(incrementalPushRTTopicToDelete); } } } @@ -7366,7 +7379,7 @@ public void configureActiveActiveReplication( if (storeName.isPresent()) { /** * Legacy stores venice_system_store_davinci_push_status_store_ still exist. - * But {@link com.linkedin.venice.helix.HelixReadOnlyStoreRepositoryAdapter#getStore(String)} cannot find + * But {@link HelixReadOnlyStoreRepositoryAdapter#getStore(String)} cannot find * them by store names. Skip davinci push status stores until legacy znodes are cleaned up. 
*/ VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName.get()); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index ab579a00eb..ccbd7d0b88 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -3827,6 +3827,11 @@ public TopicManager getTopicManager(String pubSubServerAddress) { return getVeniceHelixAdmin().getTopicManager(pubSubServerAddress); } + @Override + public boolean canDeleteRTTopic(String clusterName, String storeName) { + return false; + } + /** * @see VeniceHelixAdmin#isLeaderControllerFor(String) */ diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/TopicCleanupService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/TopicCleanupService.java index b48db2de4b..fdeecb1e03 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/TopicCleanupService.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/TopicCleanupService.java @@ -76,14 +76,10 @@ public class TopicCleanupService extends AbstractVeniceService { private final int minNumberOfUnusedKafkaTopicsToPreserve; private final AtomicBoolean stop = new AtomicBoolean(false); private final AtomicBoolean stopped = new AtomicBoolean(false); - private final Set childRegions; - private final Map> multiDataCenterStoreToVersionTopicCount; private final PubSubTopicRepository pubSubTopicRepository; private final TopicCleanupServiceStats topicCleanupServiceStats; - private String localDatacenter; private boolean isRTTopicDeletionBlocked = false; private boolean isLeaderControllerOfControllerCluster = 
false; - private long refreshQueueCycle = Time.MS_PER_MINUTE; protected final VeniceControllerMultiClusterConfig multiClusterConfigs; private PubSubAdminAdapter sourceOfTruthPubSubAdminAdapter; @@ -109,23 +105,14 @@ public TopicCleanupService( this.multiClusterConfigs = multiClusterConfigs; this.pubSubTopicRepository = pubSubTopicRepository; this.topicCleanupServiceStats = topicCleanupServiceStats; - this.childRegions = multiClusterConfigs.getCommonConfig().getChildDatacenters(); - if (!admin.isParent()) { - // Only perform cross fabric VT check for RT deletion in child fabrics. - this.multiDataCenterStoreToVersionTopicCount = new HashMap<>(childRegions.size()); - for (String datacenter: childRegions) { - multiDataCenterStoreToVersionTopicCount.put(datacenter, new HashMap<>()); - } - } else { - this.multiDataCenterStoreToVersionTopicCount = Collections.emptyMap(); - } + Set childRegions = multiClusterConfigs.getCommonConfig().getChildDatacenters(); this.danglingTopicCleanupIntervalMs = Time.MS_PER_SECOND * multiClusterConfigs.getDanglingTopicCleanupIntervalSeconds(); - PubSubAdminAdapterFactory sourceOfTruthAdminAdapterFactory = + PubSubAdminAdapterFactory sourceOfTruthAdminAdapterFactory = multiClusterConfigs.getSourceOfTruthAdminAdapterFactory(); - PubSubAdminAdapterFactory pubSubAdminAdapterFactory = pubSubClientsFactory.getAdminAdapterFactory(); + PubSubAdminAdapterFactory pubSubAdminAdapterFactory = pubSubClientsFactory.getAdminAdapterFactory(); this.danglingTopicOccurrenceCounter = new HashMap<>(); this.danglingTopicOccurrenceThresholdForCleanup = multiClusterConfigs.getDanglingTopicOccurrenceThresholdForCleanup(); @@ -135,10 +122,26 @@ public TopicCleanupService( } else { this.sourceOfTruthPubSubAdminAdapter = null; } + + if (!this.admin.isParent()) { + String localPubSubBootstrapServer = getTopicManager().getPubSubClusterAddress(); + String localDatacenter = childRegions.stream() + .filter( + childFabric -> localPubSubBootstrapServer + 
.equals(multiClusterConfigs.getChildDataCenterKafkaUrlMap().get(childFabric))) + .findFirst() + .orElse(null); + if (localDatacenter == null) { + this.isRTTopicDeletionBlocked = true; + LOGGER.error( + "Blocking RT topic deletion. Cannot find local datacenter in child datacenter list: {}", + childRegions); + } + } } private PubSubAdminAdapter constructSourceOfTruthPubSubAdminAdapter( - PubSubAdminAdapterFactory sourceOfTruthAdminAdapterFactory) { + PubSubAdminAdapterFactory sourceOfTruthAdminAdapterFactory) { VeniceProperties veniceProperties = admin.getPubSubSSLProperties(getTopicManager().getPubSubClusterAddress()); return sourceOfTruthAdminAdapterFactory.create(veniceProperties, pubSubTopicRepository); } @@ -180,7 +183,7 @@ boolean isStopped() { return this.stopped.get(); } - TopicManager getTopicManager() { + final TopicManager getTopicManager() { return admin.getTopicManager(); } @@ -235,49 +238,46 @@ Admin getAdmin() { * If version topic deletion takes more than certain time it refreshes the entire topic list and start deleting from RT topics again. */ void cleanupVeniceTopics() { + // todo - instead of adding all topics into the same queue we must simply use separate queues for RT and nonRT + // topics; this will save a lot of time wasted in sorting PriorityQueue allTopics = new PriorityQueue<>(topicPriorityComparator); populateDeprecatedTopicQueue(allTopics); topicCleanupServiceStats.recordDeletableTopicsCount(allTopics.size()); long refreshTime = System.currentTimeMillis(); + while (!allTopics.isEmpty()) { PubSubTopic topic = allTopics.poll(); + String storeName = topic.getStoreName(); try { - if (topic.isRealTime() && !multiDataCenterStoreToVersionTopicCount.isEmpty()) { - // Only delete realtime topic in child fabrics if all version topics are deleted in all child fabrics. 
- if (isRTTopicDeletionBlocked) { + if (topic.isRealTime() && (this.isRTTopicDeletionBlocked + || !admin.canDeleteRTTopic(admin.discoverCluster(storeName).getFirst(), storeName))) { + if (this.isRTTopicDeletionBlocked) { LOGGER.warn( "Topic deletion for topic: {} is blocked due to unable to fetch version topic info", topic.getName()); - topicCleanupServiceStats.recordTopicDeletionError(); - continue; - } - boolean canDelete = true; - for (Map.Entry> mapEntry: multiDataCenterStoreToVersionTopicCount.entrySet()) { - if (mapEntry.getValue().containsKey(topic.getStoreName())) { - canDelete = false; - LOGGER.info( - "Topic deletion for topic: {} is delayed due to {} version topics found in datacenter {}", - topic.getName(), - mapEntry.getValue().get(topic.getStoreName()), - mapEntry.getKey()); - break; - } - } - if (!canDelete) { - continue; + } else { + LOGGER.warn("Topic deletion for topic: {} is delayed.", topic.getName()); } + topicCleanupServiceStats.recordTopicDeletionError(); + continue; } - getTopicManager().ensureTopicIsDeletedAndBlockWithRetry(topic); - topicCleanupServiceStats.recordTopicDeleted(); + } catch (VeniceNoStoreException e) { + LOGGER.warn( + "Store {} not found. Exception when trying to delete topic: {} - {}", + storeName, + topic.getName(), + e.toString()); } catch (VeniceException e) { LOGGER.warn("Caught exception when trying to delete topic: {} - {}", topic.getName(), e.toString()); topicCleanupServiceStats.recordTopicDeletionError(); // No op, will try again in the next cleanup cycle. 
} - + topicCleanupServiceStats.recordTopicDeleted(); + getTopicManager().ensureTopicIsDeletedAndBlockWithRetry(topic); if (!topic.isRealTime()) { // If Version topic deletion took long time, skip further VT deletion and check if we have new RT topic to // delete + long refreshQueueCycle = Time.MS_PER_MINUTE; if (System.currentTimeMillis() - refreshTime > refreshQueueCycle) { allTopics.clear(); populateDeprecatedTopicQueue(allTopics); @@ -293,7 +293,6 @@ void cleanupVeniceTopics() { private void populateDeprecatedTopicQueue(PriorityQueue topics) { Map topicsWithRetention = getTopicManager().getAllTopicRetentions(); Map> allStoreTopics = getAllVeniceStoreTopicsRetentions(topicsWithRetention); - AtomicBoolean realTimeTopicDeletionNeeded = new AtomicBoolean(false); allStoreTopics.forEach((storeName, topicRetentions) -> { int minNumOfUnusedVersionTopicsOverride = minNumberOfUnusedKafkaTopicsToPreserve; PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); @@ -301,7 +300,6 @@ private void populateDeprecatedTopicQueue(PriorityQueue topics) { if (admin.isTopicTruncatedBasedOnRetention(topicRetentions.get(realTimeTopic))) { topics.offer(realTimeTopic); minNumOfUnusedVersionTopicsOverride = 0; - realTimeTopicDeletionNeeded.set(true); } topicRetentions.remove(realTimeTopic); } @@ -311,9 +309,6 @@ private void populateDeprecatedTopicQueue(PriorityQueue topics) { topics.addAll(oldTopicsToDelete); } }); - if (realTimeTopicDeletionNeeded.get() && !multiDataCenterStoreToVersionTopicCount.isEmpty()) { - refreshMultiDataCenterStoreToVersionTopicCountMap(topicsWithRetention.keySet()); - } // Check if there are dangling topics to be deleted. 
if (sourceOfTruthPubSubAdminAdapter != null @@ -327,60 +322,6 @@ private void populateDeprecatedTopicQueue(PriorityQueue topics) { } } - private void refreshMultiDataCenterStoreToVersionTopicCountMap(Set localTopics) { - if (localDatacenter == null) { - String localPubSubBootstrapServer = getTopicManager().getPubSubClusterAddress(); - for (String childFabric: childRegions) { - if (localPubSubBootstrapServer.equals(multiClusterConfigs.getChildDataCenterKafkaUrlMap().get(childFabric))) { - localDatacenter = childFabric; - break; - } - } - if (localDatacenter == null) { - String childFabrics = String.join(",", childRegions); - LOGGER.error( - "Blocking RT topic deletion. Cannot find local datacenter in child datacenter list: {}", - childFabrics); - isRTTopicDeletionBlocked = true; - return; - } - } - clearAndPopulateStoreToVersionTopicCountMap( - localTopics, - multiDataCenterStoreToVersionTopicCount.get(localDatacenter)); - if (childRegions.size() > 1) { - for (String childFabric: childRegions) { - try { - if (childFabric.equals(localDatacenter)) { - continue; - } - String pubSubBootstrapServer = multiClusterConfigs.getChildDataCenterKafkaUrlMap().get(childFabric); - Set remoteTopics = getTopicManager(pubSubBootstrapServer).listTopics(); - clearAndPopulateStoreToVersionTopicCountMap( - remoteTopics, - multiDataCenterStoreToVersionTopicCount.get(childFabric)); - } catch (Exception e) { - LOGGER.error("Failed to refresh store to version topic count map for fabric {}", childFabric, e); - isRTTopicDeletionBlocked = true; - return; - } - } - } - isRTTopicDeletionBlocked = false; - } - - private static void clearAndPopulateStoreToVersionTopicCountMap( - Set topics, - Map storeToVersionTopicCountMap) { - storeToVersionTopicCountMap.clear(); - for (PubSubTopic topic: topics) { - String storeName = topic.getStoreName(); - if (!storeName.isEmpty() && topic.isVersionTopic()) { - storeToVersionTopicCountMap.merge(storeName, 1, Integer::sum); - } - } - } - /** * @return a map 
object that maps from the store name to the Kafka topic name and its configured Kafka retention time. */ @@ -456,7 +397,7 @@ public static List extractVersionTopicsToCleanup( /** * Filter out resources, which haven't been fully removed in child fabrics yet. This is only performed in the * child fabric because parent fabric don't have storage node helix resources. - * + *

* The reason to filter out still-alive resource is to avoid triggering the non-existing topic issue * of Kafka consumer happening in Storage Node. */ diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupService.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupService.java index 52e3fe2020..c6e24fbd3c 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupService.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupService.java @@ -22,6 +22,7 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceNoStoreException; import com.linkedin.venice.helix.HelixReadOnlyStoreConfigRepository; +import com.linkedin.venice.meta.HybridStoreConfig; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreConfig; import com.linkedin.venice.meta.Version; @@ -43,7 +44,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -190,12 +190,14 @@ public void testExtractVeniceTopicsToCleanup() { } @Test - public void testCleanupVeniceTopics() throws ExecutionException { + public void testCleanupVeniceTopics() { String storeName1 = Utils.getUniqueString("store1"); String storeName2 = Utils.getUniqueString("store2"); String storeName3 = Utils.getUniqueString("store3"); String storeName4 = Utils.getUniqueString("store4"); String storeName5 = Utils.getUniqueString("store5"); + String storeName6 = Utils.getUniqueString("store6"); + Map storeTopics = new HashMap<>(); storeTopics.put(getPubSubTopic(storeName1, "_v1"), 1000L); storeTopics.put(getPubSubTopic(storeName1, "_v2"), 1000L); @@ -208,6 +210,9 @@ public void testCleanupVeniceTopics() throws 
ExecutionException { storeTopics.put(getPubSubTopic(storeName3, "_v100"), Long.MAX_VALUE); storeTopics.put(getPubSubTopic(storeName4, "_rt"), Long.MAX_VALUE); storeTopics.put(getPubSubTopic(storeName5, "_v1"), Long.MAX_VALUE); + storeTopics.put(getPubSubTopic(storeName6, "_rt"), 1000L); + storeTopics.put(getPubSubTopic(storeName6, "_v1"), Long.MAX_VALUE); + storeTopics.put(getPubSubTopic(storeName6, "_v2"), Long.MAX_VALUE); storeTopics.put(getPubSubTopic(PubSubTopicType.ADMIN_TOPIC_PREFIX, "_cluster"), Long.MAX_VALUE); Map storeTopics2 = new HashMap<>(); @@ -231,6 +236,8 @@ public void testCleanupVeniceTopics() throws ExecutionException { doReturn(true).when(admin).isTopicTruncatedBasedOnRetention(1000L); doReturn(false).when(admin).isTopicTruncatedBasedOnRetention(any(), eq(Long.MAX_VALUE)); doReturn(true).when(admin).isTopicTruncatedBasedOnRetention(any(), eq(1000L)); + doReturn(Pair.create("cluster0", null)).when(admin).discoverCluster(any()); + doReturn(true).when(admin).canDeleteRTTopic(anyString(), anyString()); doReturn(Optional.of(new StoreConfig(storeName1))).when(storeConfigRepository).getStoreConfig(storeName1); Set pubSubTopicSet = new HashSet<>(); @@ -247,22 +254,41 @@ public void testCleanupVeniceTopics() throws ExecutionException { topicCleanupService.setSourceOfTruthPubSubAdminAdapter(apacheKafkaAdminAdapter); String clusterName = "clusterName"; Pair pair = new Pair<>(clusterName, ""); - doReturn(pair).when(admin).discoverCluster(storeName3); - doReturn(pair).when(admin).discoverCluster(storeName4); + doReturn(pair).when(admin).discoverCluster(anyString()); doThrow(new VeniceNoStoreException(storeName5)).when(admin).discoverCluster(storeName5); + Store store2 = mock(Store.class); Store store3 = mock(Store.class); doReturn(false).when(store3).containsVersion(100); Store store4 = mock(Store.class); doReturn(false).when(store4).isHybrid(); - Version version = mock(Version.class); - doReturn(null).when(version).getHybridStoreConfig(); + Store store6 = 
mock(Store.class); + + Version batchVersion = mock(Version.class); + doReturn(null).when(batchVersion).getHybridStoreConfig(); + + doReturn(store2).when(admin).getStore(clusterName, storeName2); doReturn(store3).when(admin).getStore(clusterName, storeName3); doReturn(store4).when(admin).getStore(clusterName, storeName4); + doReturn(store6).when(admin).getStore(clusterName, storeName6); + doReturn(mock(Store.class)).when(admin).getStore(clusterName, storeName6); doReturn(pubSubTopicSet).when(apacheKafkaAdminAdapter).listAllTopics(); + Version hybridVersion = mock(Version.class); + doReturn(mock(HybridStoreConfig.class)).when(hybridVersion).getHybridStoreConfig(); + + doReturn(Collections.singletonList(hybridVersion)).when(store2).getVersions(); + doReturn(Collections.singletonList(hybridVersion)).when(store3).getVersions(); + doReturn(Collections.singletonList(batchVersion)).when(store6).getVersions(); + // simulating blocked delete + doReturn(false).when(admin).canDeleteRTTopic(anyString(), eq(storeName1)); + doReturn(false).when(admin).canDeleteRTTopic(anyString(), eq(storeName2)); + doReturn(false).when(admin).canDeleteRTTopic(anyString(), eq(storeName3)); + doReturn(true).when(admin).canDeleteRTTopic(anyString(), eq(storeName4)); + doReturn(true).when(admin).canDeleteRTTopic(anyString(), eq(storeName6)); + topicCleanupService.cleanupVeniceTopics(); verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName1, "_rt")); @@ -271,17 +297,21 @@ public void testCleanupVeniceTopics() throws ExecutionException { verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName1, "_v3")); verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName1, "_v4")); verify(topicManager, atLeastOnce()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName2, "_v1")); + verify(topicManager, atLeastOnce()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName6, "_rt")); // 
Delete should be blocked by local VT verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName2, "_rt")); // Delete should be blocked by remote VT verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName3, "_rt")); - verify(topicCleanupServiceStats, atLeastOnce()).recordDeletableTopicsCount(8); - verify(topicCleanupServiceStats, never()).recordTopicDeletionError(); + verify(topicCleanupServiceStats, atLeastOnce()).recordDeletableTopicsCount(9); + verify(topicCleanupServiceStats, atLeastOnce()).recordTopicDeletionError(); verify(topicCleanupServiceStats, atLeastOnce()).recordTopicDeleted(); verify(topicManager, atLeastOnce()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName3, "_v100")); verify(topicManager, atLeastOnce()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName4, "_rt")); verify(topicManager, atLeastOnce()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName5, "_v1")); + + doReturn(true).when(admin).canDeleteRTTopic(anyString(), eq(storeName2)); + doReturn(true).when(admin).canDeleteRTTopic(anyString(), eq(storeName3)); topicCleanupService.cleanupVeniceTopics(); verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName1, "_v3")); @@ -289,7 +319,7 @@ public void testCleanupVeniceTopics() throws ExecutionException { verify(topicManager, atLeastOnce()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName2, "_rt")); verify(topicManager, atLeastOnce()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName3, "_rt")); verify(topicCleanupServiceStats, atLeastOnce()).recordDeletableTopicsCount(2); - verify(topicCleanupServiceStats, never()).recordTopicDeletionError(); + verify(topicCleanupServiceStats, atLeastOnce()).recordTopicDeletionError(); } private PubSubTopic getPubSubTopic(String storeName, String suffix) { @@ -330,6 +360,7 @@ public void testRun() throws Exception { 
doReturn(false).when(admin).isTopicTruncatedBasedOnRetention(any(), eq(Long.MAX_VALUE)); doReturn(true).when(admin).isTopicTruncatedBasedOnRetention(any(), eq(1000L)); doReturn(true).when(admin).isLeaderControllerOfControllerCluster(); + doReturn(Pair.create("cluster0", null)).when(admin).discoverCluster(any()); // Resource is still alive doReturn(true).when(admin).isResourceStillAlive(storeName2 + "_v2"); @@ -482,16 +513,18 @@ public void testCleanVeniceTopicRTTopicDeletionWithErrorFetchingVT() { doReturn(false).when(admin).isTopicTruncatedBasedOnRetention(any(), eq(Long.MAX_VALUE)); doReturn(true).when(admin).isTopicTruncatedBasedOnRetention(any(), eq(1000L)); doReturn(storeTopics).when(topicManager).getAllTopicRetentions(); + doReturn(Pair.create("cluster0", null)).when(admin).discoverCluster(any()); + doReturn(false).when(admin).canDeleteRTTopic(anyString(), anyString()); doReturn(Optional.of(new StoreConfig(storeName))).when(storeConfigRepository).getStoreConfig(storeName); when(remoteTopicManager.listTopics()).thenThrow(new VeniceException("test")).thenReturn(storeTopics.keySet()); topicCleanupService.cleanupVeniceTopics(); verify(topicManager, never()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName, "_rt")); - verify(remoteTopicManager, atLeastOnce()).listTopics(); verify(topicCleanupServiceStats, atLeastOnce()).recordDeletableTopicsCount(1); verify(topicCleanupServiceStats, atLeastOnce()).recordTopicDeletionError(); + doReturn(true).when(admin).canDeleteRTTopic(anyString(), anyString()); topicCleanupService.cleanupVeniceTopics(); verify(topicManager, atLeastOnce()).ensureTopicIsDeletedAndBlockWithRetry(getPubSubTopic(storeName, "_rt")); @@ -511,6 +544,8 @@ public void testCleanVeniceTopicOnlyFetchVTOnRTTopicDeletion() { doReturn(true).when(admin).isTopicTruncatedBasedOnRetention(1000L); doReturn(false).when(admin).isTopicTruncatedBasedOnRetention(any(), eq(Long.MAX_VALUE)); doReturn(true).when(admin).isTopicTruncatedBasedOnRetention(any(), 
eq(1000L)); + doReturn(Pair.create("cluster0", null)).when(admin).discoverCluster(any()); + doReturn(true).when(admin).canDeleteRTTopic(anyString(), anyString()); when(topicManager.getAllTopicRetentions()).thenReturn(storeTopics1).thenReturn(storeTopics2); doReturn(storeTopics2).when(remoteTopicManager).getAllTopicRetentions(); @@ -519,8 +554,6 @@ public void testCleanVeniceTopicOnlyFetchVTOnRTTopicDeletion() { verify(remoteTopicManager, never()).getAllTopicRetentions(); topicCleanupService.cleanupVeniceTopics(); - - verify(remoteTopicManager, atLeastOnce()).listTopics(); } @Test diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForMultiKafkaClusters.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForMultiKafkaClusters.java index 60170dcc27..f4f3ce5ae3 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForMultiKafkaClusters.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForMultiKafkaClusters.java @@ -59,6 +59,7 @@ public void setUp() { doReturn(kafkaUrlMap).when(config).getChildDataCenterKafkaUrlMap(); admin = mock(Admin.class); + doReturn(true).when(admin).isParent(); topicManager1 = mock(TopicManager.class); doReturn(kafkaClusterServerUrl1).when(topicManager1).getPubSubClusterAddress(); doReturn(topicManager1).when(admin).getTopicManager(kafkaClusterServerUrl1); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForParentController.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForParentController.java index 0c4a987da5..4893b51587 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForParentController.java +++ 
b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/TestTopicCleanupServiceForParentController.java @@ -32,6 +32,7 @@ public class TestTopicCleanupServiceForParentController { @BeforeTest public void setUp() { admin = mock(Admin.class); + doReturn(true).when(admin).isParent(); topicManager = mock(TopicManager.class); doReturn(topicManager).when(admin).getTopicManager(); VeniceControllerMultiClusterConfig config = mock(VeniceControllerMultiClusterConfig.class);