Skip to content

Commit

Permalink
check all versions of a store from all colos and block rt topic delet…
Browse files Browse the repository at this point in the history
…ion if any version is still hybrid
  • Loading branch information
arjun4084346 committed Oct 24, 2024
1 parent a698f3e commit ddf8586
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,9 @@ static boolean isPushIdRePush(String pushId) {

static boolean containsHybridVersion(List<Version> versions) {
for (Version version: versions) {
if (version.getHybridStoreConfig() != null) {
VersionStatus versionStatus = version.getStatus();
if (version.getHybridStoreConfig() != null && VersionStatus.isVersionErrored(versionStatus)
&& VersionStatus.isVersionKilled(versionStatus)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,20 @@
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.BackupStrategy;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.manager.TopicManager;
import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -93,6 +96,97 @@ public void cleanUp() {
Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper);
}

@Test
public void testRTTopicDeletionWithHybridAndIncrementalVersions() {
String storeName = Utils.getUniqueString("testRTTopicDeletion");
String clusterName = CLUSTER_NAMES[0];
String rtTopicName = Version.composeRealTimeTopic(storeName);
PubSubTopic rtPubSubTopic = new PubSubTopicRepository().getTopic(rtTopicName);
String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();
ControllerClient parentControllerClient =
ControllerClient.constructClusterControllerClient(clusterName, parentControllerURLs);
ControllerClient[] controllerClients = new ControllerClient[childDatacenters.size() + 1];
controllerClients[0] = parentControllerClient;
for (int i = 0; i < childDatacenters.size(); i++) {
controllerClients[i + 1] =
new ControllerClient(clusterName, childDatacenters.get(i).getControllerConnectString());
}

List<TopicManager> topicManagers = new ArrayList<>(2);
topicManagers
.add(childDatacenters.get(0).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());
topicManagers
.add(childDatacenters.get(1).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());

NewStoreResponse newStoreResponse =
parentControllerClient.retryableRequest(5, c -> c.createNewStore(storeName, "", "\"string\"", "\"string\""));
Assert.assertFalse(
newStoreResponse.isError(),
"The NewStoreResponse returned an error: " + newStoreResponse.getError());

UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams();
updateStoreParams.setIncrementalPushEnabled(true)
.setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS)
.setNumVersionsToPreserve(1)
.setHybridRewindSeconds(1000)
.setHybridOffsetLagThreshold(1000);
TestWriteUtils.updateStore(storeName, parentControllerClient, updateStoreParams);

// create new version by doing an empty push
parentControllerClient
.sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);
for (TopicManager topicManager: topicManagers) {
Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic));
}

// create new version by doing an empty push
parentControllerClient
.sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);
for (TopicManager topicManager: topicManagers) {
Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic));
}

// change store from hybrid to batch-only
UpdateStoreQueryParams params = new UpdateStoreQueryParams();
params.setHybridRewindSeconds(-1).setHybridTimeLagThreshold(-1).setHybridOffsetLagThreshold(-1);
TestWriteUtils.updateStore(storeName, parentControllerClient, params);

// create new version by doing an empty push
parentControllerClient
.sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);
// at this point, the current version should be batch-only, but the older version should be hybrid, so rt topic
// should not get deleted
for (TopicManager topicManager: topicManagers) {
Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic));
}

// create new version by doing an empty push
parentControllerClient
.sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);

// now both the versions should be batch-only, so rt topic should get deleted by TopicCleanupService
for (TopicManager topicManager: topicManagers) {
TestUtils.waitForNonDeterministicAssertion(
30,
TimeUnit.SECONDS,
true,
true,
() -> Assert.assertFalse(topicManager.containsTopic(rtPubSubTopic)));
}

/*
todo - RT topic is not deleted in parent colo yet.
we need to probably modify TopicCleanupServiceForParentController and check if all the child colos have deleted
RT topic, then it can also be deleted from parent colo. I believe we can extend it to non RT topics as well and
then can get rid of `storeToCountdownForDeletion`
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, true, () -> {
Assert.assertFalse(parentTopicManager.containsTopic(rtPubSubTopic));
}
);
*/
}

@Test(timeOut = TEST_TIMEOUT)
public void testUpdateStore() {
String clusterName = CLUSTER_NAMES[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,8 @@ String getNativeReplicationSourceFabric(

TopicManager getTopicManager(String pubSubServerAddress);

boolean canDeleteRTTopic(String clusterName, String storeName);

/**
* Check if this controller itself is the leader controller for a given cluster or not. Note that the controller can be
* either a parent controller or a child controller since a cluster must have a leader child controller and a leader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3507,8 +3507,7 @@ private void deleteOneStoreVersion(String clusterName, String storeName, int ver
}
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName));
if (!store.isHybrid() && getTopicManager().containsTopic(rtTopic)) {
store = resources.getStoreMetadataRepository().getStore(storeName);
safeDeleteRTTopic(clusterName, storeName, store);
safeDeleteRTTopic(clusterName, storeName);
}
}
}
Expand All @@ -3523,17 +3522,26 @@ private boolean hasFatalDataValidationError(PushMonitor pushMonitor, String topi
}
}

private void safeDeleteRTTopic(String clusterName, String storeName, Store store) {
private void safeDeleteRTTopic(String clusterName, String storeName) {
Store store = getHelixVeniceClusterResources(clusterName).getStoreMetadataRepository().getStore(storeName);
if (canDeleteRTTopic(clusterName, storeName, store)) {
deleteRTTopic(clusterName, storeName);
} else {
LOGGER.warn("Topic deletion for topic: {} is delayed.", Version.composeRealTimeTopic(storeName));
}
}

public boolean canDeleteRTTopic(String clusterName, String storeName, Store store) {
// Perform RT cleanup checks for batch only store that used to be hybrid. Check the local versions first
// to see if any version is still using RT and then also check other fabrics before deleting the RT. Since
// we perform this check everytime when a store version is deleted we can afford to do best effort
// approach if some fabrics are unavailable or out of sync (temporarily).
boolean canDeleteRT = !Version.containsHybridVersion(store.getVersions());
return !Version.containsHybridVersion(store.getVersions()) && canDeleteRTTopic(clusterName, storeName);
}

public boolean canDeleteRTTopic(String clusterName, String storeName) {
Map<String, ControllerClient> controllerClientMap = getControllerClientMap(clusterName);
for (Map.Entry<String, ControllerClient> controllerClientEntry: controllerClientMap.entrySet()) {
if (!canDeleteRT) {
return;
}
StoreResponse storeResponse = controllerClientEntry.getValue().getStore(storeName);
if (storeResponse.isError()) {
LOGGER.warn(
Expand All @@ -3542,24 +3550,29 @@ private void safeDeleteRTTopic(String clusterName, String storeName, Store store
clusterName,
controllerClientEntry.getKey(),
storeResponse.getError());
return;
return false;
}
if (Version.containsHybridVersion(storeResponse.getStore().getVersions())) {
return false;
}
canDeleteRT = !Version.containsHybridVersion(storeResponse.getStore().getVersions());
}
if (canDeleteRT) {
String rtTopicToDelete = Version.composeRealTimeTopic(storeName);
truncateKafkaTopic(rtTopicToDelete);
return true;
}

private void deleteRTTopic(String clusterName, String storeName) {
Map<String, ControllerClient> controllerClientMap = getControllerClientMap(clusterName);
String rtTopicToDelete = Version.composeRealTimeTopic(storeName);
truncateKafkaTopic(rtTopicToDelete);
for (ControllerClient controllerClient: controllerClientMap.values()) {
controllerClient.deleteKafkaTopic(rtTopicToDelete);
}
// Check if there is incremental push topic exist. If yes, delete it and send out to let other controller to
// delete it.
String incrementalPushRTTopicToDelete = Version.composeSeparateRealTimeTopic(storeName);
if (getTopicManager().containsTopic(pubSubTopicRepository.getTopic(incrementalPushRTTopicToDelete))) {
truncateKafkaTopic(incrementalPushRTTopicToDelete);
for (ControllerClient controllerClient: controllerClientMap.values()) {
controllerClient.deleteKafkaTopic(rtTopicToDelete);
}
// Check if there is incremental push topic exist. If yes, delete it and send out to let other controller to
// delete it.
String incrementalPushRTTopicToDelete = Version.composeSeparateRealTimeTopic(storeName);
if (getTopicManager().containsTopic(pubSubTopicRepository.getTopic(incrementalPushRTTopicToDelete))) {
truncateKafkaTopic(incrementalPushRTTopicToDelete);
for (ControllerClient controllerClient: controllerClientMap.values()) {
controllerClient.deleteKafkaTopic(incrementalPushRTTopicToDelete);
}
controllerClient.deleteKafkaTopic(incrementalPushRTTopicToDelete);
}
}
}
Expand Down Expand Up @@ -7366,7 +7379,7 @@ public void configureActiveActiveReplication(
if (storeName.isPresent()) {
/**
* Legacy stores venice_system_store_davinci_push_status_store_<cluster_name> still exist.
* But {@link com.linkedin.venice.helix.HelixReadOnlyStoreRepositoryAdapter#getStore(String)} cannot find
* But {@link HelixReadOnlyStoreRepositoryAdapter#getStore(String)} cannot find
* them by store names. Skip davinci push status stores until legacy znodes are cleaned up.
*/
VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3827,6 +3827,11 @@ public TopicManager getTopicManager(String pubSubServerAddress) {
return getVeniceHelixAdmin().getTopicManager(pubSubServerAddress);
}

@Override
public boolean canDeleteRTTopic(String clusterName, String storeName) {
return false;
}

/**
* @see VeniceHelixAdmin#isLeaderControllerFor(String)
*/
Expand Down
Loading

0 comments on commit ddf8586

Please sign in to comment.