Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[changelog] Add message compaction as an opt in feature to changelog consumer #1230

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public class ChangelogClientConfig<T extends SpecificRecord> {
private String viewName;

private String consumerName = "";

private boolean compactMessages = false;
private ClientConfig<T> innerClientConfig;
private D2ControllerClient d2ControllerClient;

Expand Down Expand Up @@ -79,6 +81,11 @@ public ChangelogClientConfig<T> setConsumerName(String consumerName) {
return this;
}

/**
 * Enables or disables opt-in message compaction for this changelog consumer. When enabled, each
 * polled batch is deduplicated by key so that only the most recent message per key is returned.
 *
 * @param shouldCompactMessages whether polled batches should be compacted by key
 * @return this config instance, for fluent chaining
 */
public ChangelogClientConfig<T> setShouldCompactMessages(boolean shouldCompactMessages) {
  this.compactMessages = shouldCompactMessages;
  return this;
}

/** Returns the name of the store view this changelog consumer is configured to read from. */
public String getViewName() {
  return viewName;
}
Expand All @@ -87,6 +94,10 @@ public String getConsumerName() {
return consumerName;
}

/**
 * Returns whether polled message batches should be compacted, i.e. deduplicated by key with only
 * the last occurrence of each key retained.
 */
public boolean shouldCompactMessages() {
  return compactMessages;
}

public ChangelogClientConfig<T> setControllerD2ServiceName(String controllerD2ServiceName) {
this.controllerD2ServiceName = controllerD2ServiceName;
return this;
Expand Down Expand Up @@ -207,7 +218,8 @@ public static <V extends SpecificRecord> ChangelogClientConfig<V> cloneConfig(Ch
.setVersionSwapDetectionIntervalTimeInMs(config.getVersionSwapDetectionIntervalTimeInMs())
.setRocksDBBlockCacheSizeInBytes(config.getRocksDBBlockCacheSizeInBytes())
.setConsumerName(config.consumerName)
.setDatabaseSyncBytesInterval(config.getDatabaseSyncBytesInterval());
.setDatabaseSyncBytesInterval(config.getDatabaseSyncBytesInterval())
.setShouldCompactMessages(config.shouldCompactMessages());
return newConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -598,6 +599,21 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
if (changeCaptureStats != null) {
changeCaptureStats.recordRecordsConsumed(pubSubMessages.size());
}
if (changelogClientConfig.shouldCompactMessages()) {
Map<K, PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> tempMap = new LinkedHashMap<>();
// A LinkedHashMap preserves insertion order, but re-inserting an existing key leaves the entry at
// the position of its FIRST insertion. We want one entry per key (like any map) while keeping the
// position of the LAST insertion, so we remove any existing entry before putting the new one,
// which moves the key to the tail of the iteration order.
for (PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate> message: pubSubMessages) {
if (tempMap.containsKey(message.getKey())) {
tempMap.remove(message.getKey());
}
tempMap.put(message.getKey(), message);
}
return tempMap.values();
}
return pubSubMessages;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public void testConsumeBeforeAndAfterImage() throws ExecutionException, Interrup
partition,
oldVersionTopic,
newVersionTopic,
false,
false);
ChangelogClientConfig changelogClientConfig =
getChangelogClientConfig(d2ControllerClient).setViewName("changeCaptureView");
Expand Down Expand Up @@ -197,7 +198,16 @@ public void testAfterImageConsumerSeek() throws ExecutionException, InterruptedE
VeniceChangelogConsumerImpl mockInternalSeekConsumer = Mockito.mock(VeniceChangelogConsumerImpl.class);
Mockito.when(mockInternalSeekConsumer.subscribe(any())).thenReturn(CompletableFuture.completedFuture(null));
Mockito.when(mockInternalSeekConsumer.getPubSubConsumer()).thenReturn(mockPubSubConsumer);
prepareChangeCaptureRecordsToBePolled(0L, 10L, mockPubSubConsumer, oldVersionTopic, 0, oldVersionTopic, null, true);
prepareChangeCaptureRecordsToBePolled(
0L,
10L,
mockPubSubConsumer,
oldVersionTopic,
0,
oldVersionTopic,
null,
true,
false);
VeniceAfterImageConsumerImpl<String, Utf8> veniceChangelogConsumer = new VeniceAfterImageConsumerImpl<>(
changelogClientConfig,
mockPubSubConsumer,
Expand Down Expand Up @@ -305,6 +315,7 @@ public void testConsumeAfterImage() throws ExecutionException, InterruptedExcept
0,
oldVersionTopic,
null,
false,
false);
pubSubMessages =
(List<PubSubMessage<String, ChangeEvent<Utf8>, VeniceChangeCoordinate>>) veniceChangelogConsumer.poll(100);
Expand All @@ -321,6 +332,81 @@ public void testConsumeAfterImage() throws ExecutionException, InterruptedExcept
verify(mockPubSubConsumer).close();
}

@Test
public void testConsumeAfterImageWithCompaction() throws ExecutionException, InterruptedException {
  // Mock the controller client so the consumer sees a two-partition store at current version 1.
  D2ControllerClient d2ControllerClient = mock(D2ControllerClient.class);
  StoreResponse storeResponse = mock(StoreResponse.class);
  StoreInfo storeInfo = mock(StoreInfo.class);
  doReturn(1).when(storeInfo).getCurrentVersion();
  doReturn(2).when(storeInfo).getPartitionCount();
  doReturn(storeInfo).when(storeResponse).getStore();
  doReturn(storeResponse).when(d2ControllerClient).getStore(storeName);
  // Serve the replication-metadata schema the consumer fetches on startup.
  MultiSchemaResponse multiRMDSchemaResponse = mock(MultiSchemaResponse.class);
  MultiSchemaResponse.Schema rmdSchemaFromMultiSchemaResponse = mock(MultiSchemaResponse.Schema.class);
  doReturn(rmdSchema.toString()).when(rmdSchemaFromMultiSchemaResponse).getSchemaStr();
  doReturn(new MultiSchemaResponse.Schema[] { rmdSchemaFromMultiSchemaResponse }).when(multiRMDSchemaResponse)
      .getSchemas();
  doReturn(multiRMDSchemaResponse).when(d2ControllerClient).getAllReplicationMetadataSchemas(storeName);

  PubSubConsumerAdapter mockPubSubConsumer = mock(PubSubConsumerAdapter.class);
  PubSubTopic oldVersionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, 1));
  // NOTE(review): string-concatenating a PubSubTopic relies on its toString() matching the topic
  // name — confirm, or use an explicit name accessor.
  PubSubTopic oldChangeCaptureTopic =
      pubSubTopicRepository.getTopic(oldVersionTopic + ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX);

  // First batch: 5 unique version-topic records (keys 0..4), ending with an end-of-push message.
  prepareVersionTopicRecordsToBePolled(0L, 5L, mockPubSubConsumer, oldVersionTopic, 0, true);
  // Compaction is the feature under test: opt in via setShouldCompactMessages(true).
  ChangelogClientConfig changelogClientConfig =
      new ChangelogClientConfig<>().setD2ControllerClient(d2ControllerClient)
          .setSchemaReader(schemaReader)
          .setStoreName(storeName)
          .setShouldCompactMessages(true)
          .setViewName("");
  VeniceChangelogConsumerImpl<String, Utf8> veniceChangelogConsumer =
      new VeniceAfterImageConsumerImpl<>(changelogClientConfig, mockPubSubConsumer);
  Assert.assertEquals(veniceChangelogConsumer.getPartitionCount(), 2);

  // Stub the metadata repository with a store/version/schema so subscribe() can resolve them.
  ThinClientMetaStoreBasedRepository mockRepository = mock(ThinClientMetaStoreBasedRepository.class);
  Store store = mock(Store.class);
  Version mockVersion = new VersionImpl(storeName, 1, "foo");
  Mockito.when(store.getCurrentVersion()).thenReturn(1);
  Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP);
  Mockito.when(mockRepository.getStore(anyString())).thenReturn(store);
  Mockito.when(mockRepository.getValueSchema(storeName, 1)).thenReturn(new SchemaEntry(1, valueSchema));
  Mockito.when(store.getVersionOrThrow(Mockito.anyInt())).thenReturn(mockVersion);
  veniceChangelogConsumer.setStoreRepository(mockRepository);
  veniceChangelogConsumer.subscribe(new HashSet<>(Arrays.asList(0))).get();
  verify(mockPubSubConsumer).subscribe(new PubSubTopicPartitionImpl(oldVersionTopic, 0), OffsetRecord.LOWEST_OFFSET);

  // Unique keys: compaction leaves the first batch untouched, in original order.
  List<PubSubMessage<String, ChangeEvent<Utf8>, VeniceChangeCoordinate>> pubSubMessages =
      new ArrayList<>(veniceChangelogConsumer.poll(100));
  for (int i = 0; i < 5; i++) {
    PubSubMessage<String, ChangeEvent<Utf8>, VeniceChangeCoordinate> pubSubMessage = pubSubMessages.get(i);
    Utf8 messageStr = pubSubMessage.getValue().getCurrentValue();
    Assert.assertEquals(messageStr.toString(), "newValue" + i);
  }
  // Second batch: repeatMessages=true makes the helper emit each of the 10 keys twice (20 records).
  prepareChangeCaptureRecordsToBePolled(
      0L,
      10L,
      mockPubSubConsumer,
      oldChangeCaptureTopic,
      0,
      oldVersionTopic,
      null,
      false,
      true);
  pubSubMessages = new ArrayList<>(veniceChangelogConsumer.poll(100));
  Assert.assertFalse(pubSubMessages.isEmpty());
  // Compaction deduplicates the 20 records down to one message per key.
  Assert.assertEquals(pubSubMessages.size(), 10);
  for (int i = 0; i < 10; i++) {
    PubSubMessage<String, ChangeEvent<Utf8>, VeniceChangeCoordinate> pubSubMessage = pubSubMessages.get(i);
    Utf8 pubSubMessageValue = pubSubMessage.getValue().getCurrentValue();
    Assert.assertEquals(pubSubMessageValue.toString(), "newValue" + i);
  }

  // Closing the consumer must unsubscribe both topics and close the underlying pub-sub consumer.
  veniceChangelogConsumer.close();
  verify(mockPubSubConsumer, times(2)).batchUnsubscribe(any());
  verify(mockPubSubConsumer).close();
}

@Test
public void testMetricReportingThread() throws InterruptedException {
D2ControllerClient d2ControllerClient = mock(D2ControllerClient.class);
Expand Down Expand Up @@ -370,7 +456,8 @@ private void prepareChangeCaptureRecordsToBePolled(
int partition,
PubSubTopic oldVersionTopic,
PubSubTopic newVersionTopic,
boolean addEndOfPushMessage) {
boolean addEndOfPushMessage,
boolean repeatMessages) {
List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> pubSubMessageList = new ArrayList<>();

// Add a start of push message
Expand All @@ -389,6 +476,19 @@ private void prepareChangeCaptureRecordsToBePolled(
pubSubMessageList.add(pubSubMessage);
}

if (repeatMessages) {
for (long i = startIdx; i < endIdx; i++) {
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage = constructChangeCaptureConsumerRecord(
changeCaptureTopic,
partition,
"oldValue" + i,
"newValue" + i,
"key" + i,
Arrays.asList(i, i));
pubSubMessageList.add(pubSubMessage);
}
}

if (addEndOfPushMessage) {
pubSubMessageList.add(constructEndOfPushMessage(changeCaptureTopic, partition, endIdx + 1));
}
Expand Down
Loading