Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] do not delay RT topic deletion if the store is no longer hybrid #1234

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,20 @@
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.BackupStrategy;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.manager.TopicManager;
import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -93,6 +96,91 @@ public void cleanUp() {
Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper);
}

/**
 * Verifies that the real-time (RT) topic of a store that is converted from hybrid to batch-only is kept in the
 * child regions while any retained version is still hybrid, and is removed once every retained version is
 * batch-only.
 *
 * Flow: create a hybrid store with incremental push enabled, push two versions (RT topic must exist), turn the
 * hybrid config off, push one more version (RT topic must still exist because an older version is hybrid), then
 * push again and wait for the RT topic to disappear from both child regions.
 */
@Test
public void testRTTopicDeletionWithHybridAndIncrementalVersions() {
  String storeName = Utils.getUniqueString("testRTTopicDeletion");
  String clusterName = CLUSTER_NAMES[0];
  String rtTopicName = Version.composeRealTimeTopic(storeName);
  PubSubTopic rtPubSubTopic = new PubSubTopicRepository().getTopic(rtTopicName);
  String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();

  // One TopicManager per child region; the RT topic is checked in both of them.
  List<TopicManager> topicManagers = new ArrayList<>(2);
  topicManagers
      .add(childDatacenters.get(0).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());
  topicManagers
      .add(childDatacenters.get(1).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());

  // try-with-resources so the controller client is released even when an assertion fails.
  try (ControllerClient parentControllerClient =
      ControllerClient.constructClusterControllerClient(clusterName, parentControllerURLs)) {
    NewStoreResponse newStoreResponse =
        parentControllerClient.retryableRequest(5, c -> c.createNewStore(storeName, "", "\"string\"", "\"string\""));
    Assert.assertFalse(
        newStoreResponse.isError(),
        "The NewStoreResponse returned an error: " + newStoreResponse.getError());

    UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams();
    updateStoreParams.setIncrementalPushEnabled(true)
        .setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS)
        .setNumVersionsToPreserve(1)
        .setHybridRewindSeconds(1000)
        .setHybridOffsetLagThreshold(1000);
    TestWriteUtils.updateStore(storeName, parentControllerClient, updateStoreParams);

    // Two hybrid versions: the RT topic must exist after each push.
    emptyPushAndExpectSuccess(parentControllerClient, storeName);
    assertRTTopicExistsInAllRegions(topicManagers, rtPubSubTopic);

    emptyPushAndExpectSuccess(parentControllerClient, storeName);
    assertRTTopicExistsInAllRegions(topicManagers, rtPubSubTopic);

    // Change the store from hybrid to batch-only.
    UpdateStoreQueryParams params = new UpdateStoreQueryParams();
    params.setHybridRewindSeconds(-1).setHybridTimeLagThreshold(-1).setHybridOffsetLagThreshold(-1);
    TestWriteUtils.updateStore(storeName, parentControllerClient, params);

    // At this point the current version is batch-only, but the older version is still hybrid, so the RT topic
    // must not be deleted yet.
    emptyPushAndExpectSuccess(parentControllerClient, storeName);
    assertRTTopicExistsInAllRegions(topicManagers, rtPubSubTopic);

    // Now both retained versions are batch-only, so the RT topic should get deleted by TopicCleanupService.
    emptyPushAndExpectSuccess(parentControllerClient, storeName);
    for (TopicManager topicManager: topicManagers) {
      TestUtils.waitForNonDeterministicAssertion(
          30,
          TimeUnit.SECONDS,
          true,
          true,
          () -> Assert.assertFalse(topicManager.containsTopic(rtPubSubTopic)));
    }

    /*
     * TODO: the RT topic is not deleted in the parent colo yet.
     * We probably need to modify TopicCleanupServiceForParentController to check whether all the child colos have
     * deleted the RT topic, after which it can also be deleted from the parent colo. This could be extended to
     * non-RT topics as well, which would allow getting rid of `storeToCountdownForDeletion`.
     *
     * Once that is done, the following assertion should be enabled (using the parent region's TopicManager):
     *
     * TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, true, () -> {
     *   Assert.assertFalse(parentTopicManager.containsTopic(rtPubSubTopic));
     * });
     */
  }
}

/** Runs an empty push to create a new version of the store and fails the test if the push reports an error. */
private static void emptyPushAndExpectSuccess(ControllerClient controllerClient, String storeName) {
  Assert.assertFalse(
      controllerClient
          .sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND)
          .isError(),
      "Empty push failed for store: " + storeName);
}

/** Asserts that every given TopicManager still contains the RT topic. */
private static void assertRTTopicExistsInAllRegions(List<TopicManager> topicManagers, PubSubTopic rtTopic) {
  for (TopicManager topicManager: topicManagers) {
    Assert.assertTrue(topicManager.containsTopic(rtTopic), "RT topic is missing: " + rtTopic);
  }
}

@Test(timeOut = TEST_TIMEOUT)
public void testUpdateStore() {
String clusterName = CLUSTER_NAMES[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,8 @@ String getNativeReplicationSourceFabric(

TopicManager getTopicManager(String pubSubServerAddress);

/**
 * Reports whether the real-time (RT) topic of the given store may be deleted.
 *
 * @param clusterName name of the cluster that the store belongs to
 * @param storeName name of the store whose RT topic is being considered
 * @return {@code true} if the RT topic can be deleted, {@code false} otherwise
 */
boolean canDeleteRTTopic(String clusterName, String storeName);

/**
* Check if this controller itself is the leader controller for a given cluster or not. Note that the controller can be
* either a parent controller or a child controller since a cluster must have a leader child controller and a leader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3507,8 +3507,7 @@ private void deleteOneStoreVersion(String clusterName, String storeName, int ver
}
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName));
if (!store.isHybrid() && getTopicManager().containsTopic(rtTopic)) {
store = resources.getStoreMetadataRepository().getStore(storeName);
safeDeleteRTTopic(clusterName, storeName, store);
safeDeleteRTTopic(clusterName, storeName);
}
}
}
Expand All @@ -3523,17 +3522,26 @@ private boolean hasFatalDataValidationError(PushMonitor pushMonitor, String topi
}
}

private void safeDeleteRTTopic(String clusterName, String storeName, Store store) {
private void safeDeleteRTTopic(String clusterName, String storeName) {
Store store = getHelixVeniceClusterResources(clusterName).getStoreMetadataRepository().getStore(storeName);
if (canDeleteRTTopic(clusterName, storeName, store)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the conditions for RT topic deletion exist in multiple places, e.g. L3509 and canDeleteRTTopic. Is it possible to merge these conditions into a single function for simplicity?

deleteRTTopic(clusterName, storeName);
} else {
LOGGER.warn("Topic deletion for topic: {} is delayed.", Version.composeRealTimeTopic(storeName));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it easy to log the reason for the delayed deletion?

}
}

public boolean canDeleteRTTopic(String clusterName, String storeName, Store store) {
// Perform RT cleanup checks for batch only store that used to be hybrid. Check the local versions first
// to see if any version is still using RT and then also check other fabrics before deleting the RT. Since
// we perform this check everytime when a store version is deleted we can afford to do best effort
// approach if some fabrics are unavailable or out of sync (temporarily).
boolean canDeleteRT = !Version.containsHybridVersion(store.getVersions());
return !Version.containsHybridVersion(store.getVersions()) && canDeleteRTTopic(clusterName, storeName);
}

public boolean canDeleteRTTopic(String clusterName, String storeName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename the function somehow to reflect the fact that we are querying the child controllers for their opinions?

Map<String, ControllerClient> controllerClientMap = getControllerClientMap(clusterName);
for (Map.Entry<String, ControllerClient> controllerClientEntry: controllerClientMap.entrySet()) {
if (!canDeleteRT) {
return;
}
StoreResponse storeResponse = controllerClientEntry.getValue().getStore(storeName);
if (storeResponse.isError()) {
LOGGER.warn(
Expand All @@ -3542,24 +3550,29 @@ private void safeDeleteRTTopic(String clusterName, String storeName, Store store
clusterName,
controllerClientEntry.getKey(),
storeResponse.getError());
return;
return false;
}
if (Version.containsHybridVersion(storeResponse.getStore().getVersions())) {
return false;
}
canDeleteRT = !Version.containsHybridVersion(storeResponse.getStore().getVersions());
}
if (canDeleteRT) {
String rtTopicToDelete = Version.composeRealTimeTopic(storeName);
truncateKafkaTopic(rtTopicToDelete);
return true;
}

private void deleteRTTopic(String clusterName, String storeName) {
Map<String, ControllerClient> controllerClientMap = getControllerClientMap(clusterName);
String rtTopicToDelete = Version.composeRealTimeTopic(storeName);
truncateKafkaTopic(rtTopicToDelete);
for (ControllerClient controllerClient: controllerClientMap.values()) {
controllerClient.deleteKafkaTopic(rtTopicToDelete);
}
Comment on lines +3565 to +3568
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These lines are duplicated later. Can we extract them into a small helper function?

// Check if there is incremental push topic exist. If yes, delete it and send out to let other controller to
// delete it.
String incrementalPushRTTopicToDelete = Version.composeSeparateRealTimeTopic(storeName);
if (getTopicManager().containsTopic(pubSubTopicRepository.getTopic(incrementalPushRTTopicToDelete))) {
truncateKafkaTopic(incrementalPushRTTopicToDelete);
for (ControllerClient controllerClient: controllerClientMap.values()) {
controllerClient.deleteKafkaTopic(rtTopicToDelete);
}
// Check if there is incremental push topic exist. If yes, delete it and send out to let other controller to
// delete it.
String incrementalPushRTTopicToDelete = Version.composeSeparateRealTimeTopic(storeName);
if (getTopicManager().containsTopic(pubSubTopicRepository.getTopic(incrementalPushRTTopicToDelete))) {
truncateKafkaTopic(incrementalPushRTTopicToDelete);
for (ControllerClient controllerClient: controllerClientMap.values()) {
controllerClient.deleteKafkaTopic(incrementalPushRTTopicToDelete);
}
controllerClient.deleteKafkaTopic(incrementalPushRTTopicToDelete);
}
}
}
Expand Down Expand Up @@ -7366,7 +7379,7 @@ public void configureActiveActiveReplication(
if (storeName.isPresent()) {
/**
* Legacy stores venice_system_store_davinci_push_status_store_<cluster_name> still exist.
* But {@link com.linkedin.venice.helix.HelixReadOnlyStoreRepositoryAdapter#getStore(String)} cannot find
* But {@link HelixReadOnlyStoreRepositoryAdapter#getStore(String)} cannot find
* them by store names. Skip davinci push status stores until legacy znodes are cleaned up.
*/
VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3827,6 +3827,11 @@ public TopicManager getTopicManager(String pubSubServerAddress) {
return getVeniceHelixAdmin().getTopicManager(pubSubServerAddress);
}

/**
 * Always returns {@code false}: this implementation never reports a store's real-time topic as deletable,
 * regardless of the arguments.
 * NOTE(review): presumably the actual eligibility check lives in another Admin implementation - confirm.
 */
@Override
public boolean canDeleteRTTopic(String clusterName, String storeName) {
return false;
}

/**
* @see VeniceHelixAdmin#isLeaderControllerFor(String)
*/
Expand Down
Loading
Loading