Skip to content

Commit

Permalink
check all versions of a store from all colos and block RT topic deletion if any version is still hybrid
Browse files Browse the repository at this point in the history
  • Loading branch information
arjun4084346 committed Oct 24, 2024
1 parent 1ebd6fa commit 9ec9fce
Show file tree
Hide file tree
Showing 13 changed files with 388 additions and 405 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.linkedin.venice.meta;

import static java.lang.Character.isDigit;
import static java.lang.Character.*;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
Expand Down Expand Up @@ -422,7 +422,9 @@ static boolean isPushIdRePush(String pushId) {

/**
 * Returns {@code true} if any version in the given list is a still-live hybrid version.
 * A hybrid version blocks deletion of the store's real-time (RT) topic, but versions
 * that have been errored or killed no longer consume from the RT topic and therefore
 * must not block deletion.
 *
 * @param versions versions of a store to inspect (must be non-null; may be empty)
 * @return whether at least one non-errored, non-killed version has a hybrid store config
 */
static boolean containsHybridVersion(List<Version> versions) {
  for (Version version: versions) {
    VersionStatus versionStatus = version.getStatus();
    // BUG FIX: the original condition required the version to be BOTH errored AND
    // killed (mutually exclusive statuses), which is always false and made this
    // method unconditionally return false. The intent is to count a version as
    // hybrid only when it is NOT errored and NOT killed.
    if (version.getHybridStoreConfig() != null && !VersionStatus.isVersionErrored(versionStatus)
        && !VersionStatus.isVersionKilled(versionStatus)) {
      return true;
    }
  }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,7 @@
package com.linkedin.venice.pubsub.manager;

import static com.linkedin.venice.pubsub.PubSubConstants.CREATE_TOPIC_RETRIABLE_EXCEPTIONS;
import static com.linkedin.venice.pubsub.PubSubConstants.DEFAULT_TOPIC_RETENTION_POLICY_MS;
import static com.linkedin.venice.pubsub.PubSubConstants.ETERNAL_TOPIC_RETENTION_POLICY_MS;
import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_FAST_OPERATION_TIMEOUT_MS;
import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_TOPIC_DELETE_RETRY_TIMES;
import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_TOPIC_UNKNOWN_RETENTION;
import static com.linkedin.venice.pubsub.manager.TopicManagerStats.SENSOR_TYPE.CONTAINS_TOPIC_WITH_RETRY;
import static com.linkedin.venice.pubsub.manager.TopicManagerStats.SENSOR_TYPE.CREATE_TOPIC;
import static com.linkedin.venice.pubsub.manager.TopicManagerStats.SENSOR_TYPE.DELETE_TOPIC;
import static com.linkedin.venice.pubsub.manager.TopicManagerStats.SENSOR_TYPE.GET_ALL_TOPIC_RETENTIONS;
import static com.linkedin.venice.pubsub.manager.TopicManagerStats.SENSOR_TYPE.GET_SOME_TOPIC_CONFIGS;
import static com.linkedin.venice.pubsub.manager.TopicManagerStats.SENSOR_TYPE.GET_TOPIC_CONFIG;
import static com.linkedin.venice.pubsub.manager.TopicManagerStats.SENSOR_TYPE.GET_TOPIC_CONFIG_WITH_RETRY;
import static com.linkedin.venice.pubsub.manager.TopicManagerStats.SENSOR_TYPE.LIST_ALL_TOPICS;
import static com.linkedin.venice.pubsub.manager.TopicManagerStats.SENSOR_TYPE.SET_TOPIC_CONFIG;
import static com.linkedin.venice.pubsub.PubSubConstants.*;
import static com.linkedin.venice.pubsub.manager.TopicManagerStats.SENSOR_TYPE.*;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.utils.IntegrationTestPushUtils.runVPJ;
import static com.linkedin.venice.utils.TestUtils.assertCommand;
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static java.util.Objects.requireNonNull;
import static org.testng.Assert.assertEquals;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.*;
import static com.linkedin.venice.utils.TestUtils.*;
import static com.linkedin.venice.utils.TestWriteUtils.*;
import static java.util.Objects.*;
import static org.testng.Assert.*;

import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.StoreResponse;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.ConfigKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS;
import static com.linkedin.venice.ConfigKeys.DEFAULT_NUMBER_OF_PARTITION_FOR_HYBRID;
import static com.linkedin.venice.ConfigKeys.DEFAULT_PARTITION_SIZE;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob;
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFER_VERSION_SWAP;
import static com.linkedin.venice.ConfigKeys.*;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.*;
import static com.linkedin.venice.utils.TestWriteUtils.*;
import static com.linkedin.venice.vpj.VenicePushJobConstants.*;
import static org.testng.Assert.assertEquals;
import static org.testng.AssertJUnit.fail;

Expand All @@ -20,17 +18,20 @@
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.BackupStrategy;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.manager.TopicManager;
import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -93,6 +94,97 @@ public void cleanUp() {
Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper);
}

/**
 * Integration test: a store's real-time (RT) topic must be retained in every child colo
 * while at least one version of the store is hybrid, and must be cleaned up (by the
 * topic cleanup service) only once every remaining version is batch-only.
 *
 * Flow: create a hybrid store, push two versions (RT topic must exist), flip the store
 * to batch-only, push once more (older hybrid version still blocks deletion), then push
 * again so all retained versions are batch-only and the RT topic is deleted.
 */
@Test
public void testRTTopicDeletionWithHybridAndIncrementalVersions() {
String storeName = Utils.getUniqueString("testRTTopicDeletion");
String clusterName = CLUSTER_NAMES[0];
String rtTopicName = Version.composeRealTimeTopic(storeName);
PubSubTopic rtPubSubTopic = new PubSubTopicRepository().getTopic(rtTopicName);
String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();
ControllerClient parentControllerClient =
ControllerClient.constructClusterControllerClient(clusterName, parentControllerURLs);
// NOTE(review): this array is populated but never used afterwards, and the clients
// are never closed — either use it or remove it (ControllerClient is closeable).
ControllerClient[] controllerClients = new ControllerClient[childDatacenters.size() + 1];
controllerClients[0] = parentControllerClient;
for (int i = 0; i < childDatacenters.size(); i++) {
controllerClients[i + 1] =
new ControllerClient(clusterName, childDatacenters.get(i).getControllerConnectString());
}

// One TopicManager per child colo so RT-topic existence can be asserted in both regions.
// NOTE(review): assumes exactly two child datacenters — TODO confirm against the test setup.
List<TopicManager> topicManagers = new ArrayList<>(2);
topicManagers
.add(childDatacenters.get(0).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());
topicManagers
.add(childDatacenters.get(1).getControllers().values().iterator().next().getVeniceAdmin().getTopicManager());

NewStoreResponse newStoreResponse =
parentControllerClient.retryableRequest(5, c -> c.createNewStore(storeName, "", "\"string\"", "\"string\""));
Assert.assertFalse(
newStoreResponse.isError(),
"The NewStoreResponse returned an error: " + newStoreResponse.getError());

// Make the store hybrid (rewind + offset-lag thresholds) with incremental push enabled,
// keeping only one preserved backup version so old versions roll off quickly.
UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams();
updateStoreParams.setIncrementalPushEnabled(true)
.setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS)
.setNumVersionsToPreserve(1)
.setHybridRewindSeconds(1000)
.setHybridOffsetLagThreshold(1000);
TestWriteUtils.updateStore(storeName, parentControllerClient, updateStoreParams);

// create new version by doing an empty push
parentControllerClient
.sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);
// Store is hybrid, so the RT topic must exist in every child colo.
for (TopicManager topicManager: topicManagers) {
Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic));
}

// create new version by doing an empty push
parentControllerClient
.sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);
for (TopicManager topicManager: topicManagers) {
Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic));
}

// change store from hybrid to batch-only
// (negative values clear the hybrid thresholds, disabling hybrid mode for new versions)
UpdateStoreQueryParams params = new UpdateStoreQueryParams();
params.setHybridRewindSeconds(-1).setHybridTimeLagThreshold(-1).setHybridOffsetLagThreshold(-1);
TestWriteUtils.updateStore(storeName, parentControllerClient, params);

// create new version by doing an empty push
parentControllerClient
.sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);
// at this point, the current version should be batch-only, but the older version should be hybrid, so rt topic
// should not get deleted
for (TopicManager topicManager: topicManagers) {
Assert.assertTrue(topicManager.containsTopic(rtPubSubTopic));
}

// create new version by doing an empty push
parentControllerClient
.sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);

// now both the versions should be batch-only, so rt topic should get deleted by TopicCleanupService
// (deletion is asynchronous, hence the non-deterministic wait)
for (TopicManager topicManager: topicManagers) {
TestUtils.waitForNonDeterministicAssertion(
30,
TimeUnit.SECONDS,
true,
true,
() -> Assert.assertFalse(topicManager.containsTopic(rtPubSubTopic)));
}

/*
todo - RT topic is not deleted in parent colo yet.
we need to probably modify TopicCleanupServiceForParentController and check if all the child colos have deleted
RT topic, then it can also be deleted from parent colo. I believe we can extend it to non RT topics as well and
then can get rid of `storeToCountdownForDeletion`
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, true, () -> {
Assert.assertFalse(parentTopicManager.containsTopic(rtPubSubTopic));
}
);
*/
}

@Test(timeOut = TEST_TIMEOUT)
public void testUpdateStore() {
String clusterName = CLUSTER_NAMES[0];
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,12 @@
package com.linkedin.venice.endToEnd;

import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE;
import static com.linkedin.venice.ConfigKeys.*;
import static com.linkedin.venice.pubsub.PubSubConstants.*;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.*;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.defaultVPJProps;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.runVPJ;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.sendStreamingRecord;
import static com.linkedin.venice.utils.TestWriteUtils.STRING_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.USER_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static com.linkedin.venice.utils.TestWriteUtils.writeEmptyAvroFile;
import static com.linkedin.venice.vpj.VenicePushJobConstants.COMPRESSION_METRIC_COLLECTION_ENABLED;
import static com.linkedin.venice.vpj.VenicePushJobConstants.SEND_CONTROL_MESSAGES_DIRECTLY;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static com.linkedin.venice.utils.TestWriteUtils.*;
import static com.linkedin.venice.vpj.VenicePushJobConstants.*;
import static org.testng.Assert.*;

import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
Expand Down Expand Up @@ -111,7 +102,7 @@ public void testEmptyPushByChangingCompressionStrategyForHybridStore() throws IO
.getTopicManagerRepo(
PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE,
100,
0l,
0L,
venice.getPubSubBrokerWrapper(),
venice.getPubSubTopicRepository())
.getTopicManager(venice.getPubSubBrokerWrapper().getAddress())) {
Expand Down Expand Up @@ -153,7 +144,7 @@ public void testEmptyPushByChangingCompressionStrategyForHybridStore() throws IO
Runnable dataValidator = () -> {
try (AvroGenericStoreClient client = ClientFactory.getAndStartGenericAvroClient(
ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(venice.getRandomRouterURL()))) {
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> {
try {
for (int i = 1; i <= 1000; ++i) {
String key = keyPrefix + i;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,16 @@
package com.linkedin.venice.endToEnd;

import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
import static com.linkedin.venice.ConfigKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS;
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE;
import static com.linkedin.venice.ConfigKeys.SERVER_CONSUMER_POOL_ALLOCATION_STRATEGY;
import static com.linkedin.venice.ConfigKeys.SERVER_CONSUMER_POOL_SIZE_PER_KAFKA_CLUSTER;
import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE;
import static com.linkedin.venice.ConfigKeys.SERVER_DEDICATED_DRAINER_FOR_SORTED_INPUT_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_SHARED_CONSUMER_ASSIGNMENT_STRATEGY;
import static com.linkedin.venice.ConfigKeys.SSL_TO_KAFKA_LEGACY;
import static com.linkedin.venice.integration.utils.VeniceClusterWrapper.DEFAULT_KEY_SCHEMA;
import static com.linkedin.venice.integration.utils.VeniceClusterWrapper.DEFAULT_VALUE_SCHEMA;
import static com.linkedin.venice.meta.BufferReplayPolicy.REWIND_FROM_EOP;
import static com.linkedin.venice.meta.BufferReplayPolicy.REWIND_FROM_SOP;
import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE;
import static com.linkedin.venice.router.api.VenicePathParser.TYPE_STORAGE;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob;
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.*;
import static com.linkedin.venice.ConfigKeys.*;
import static com.linkedin.venice.integration.utils.VeniceClusterWrapper.*;
import static com.linkedin.venice.meta.BufferReplayPolicy.*;
import static com.linkedin.venice.pubsub.PubSubConstants.*;
import static com.linkedin.venice.router.api.VenicePathParser.*;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.*;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.defaultVPJProps;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.getSamzaProducer;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.getSamzaProducerConfig;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.runVPJ;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.sendCustomSizeStreamingRecord;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.sendStreamingRecord;
import static com.linkedin.venice.utils.TestWriteUtils.STRING_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFER_VERSION_SWAP;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static com.linkedin.venice.utils.TestWriteUtils.*;
import static com.linkedin.venice.vpj.VenicePushJobConstants.*;
import static org.testng.Assert.*;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.luben.zstd.Zstd;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,8 @@ String getNativeReplicationSourceFabric(

TopicManager getTopicManager(String pubSubServerAddress);

boolean canDeleteRTTopic(String clusterName, String storeName);

/**
* Check if this controller itself is the leader controller for a given cluster or not. Note that the controller can be
* either a parent controller or a child controller since a cluster must have a leader child controller and a leader
Expand Down
Loading

0 comments on commit 9ec9fce

Please sign in to comment.