From 1ee8922047c53cf0c3c78e105cbb2be152e984e6 Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Wed, 9 Oct 2024 14:54:48 -0700 Subject: [PATCH 1/2] [changelog] Add feature to compact events from changelog client This introduces a configuration which will compact down data returned from poll to only contain the latest records for a given key. This also maintains the order of results returned. --- .../consumer/ChangelogClientConfig.java | 14 +++- .../consumer/VeniceChangelogConsumerImpl.java | 16 ++++ .../VeniceChangelogConsumerImplTest.java | 75 +++++++++++++++++++ 3 files changed, 104 insertions(+), 1 deletion(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java index ca2b86c701..8a1d54feb0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java @@ -14,6 +14,8 @@ public class ChangelogClientConfig { private String viewName; private String consumerName = ""; + + private boolean compactMessages = false; private ClientConfig innerClientConfig; private D2ControllerClient d2ControllerClient; @@ -79,6 +81,11 @@ public ChangelogClientConfig setConsumerName(String consumerName) { return this; } + public ChangelogClientConfig setShouldCompactMessages(boolean compactMessages) { + this.compactMessages = compactMessages; + return this; + } + public String getViewName() { return viewName; } @@ -87,6 +94,10 @@ public String getConsumerName() { return consumerName; } + public boolean shouldCompactMessages() { + return compactMessages; + } + public ChangelogClientConfig setControllerD2ServiceName(String controllerD2ServiceName) { this.controllerD2ServiceName = controllerD2ServiceName; return this; @@ -207,7 +218,8 @@ public static ChangelogClientConfig cloneConfig(Ch .setVersionSwapDetectionIntervalTimeInMs(config.getVersionSwapDetectionIntervalTimeInMs()) .setRocksDBBlockCacheSizeInBytes(config.getRocksDBBlockCacheSizeInBytes()) .setConsumerName(config.consumerName) - .setDatabaseSyncBytesInterval(config.getDatabaseSyncBytesInterval()); + .setDatabaseSyncBytesInterval(config.getDatabaseSyncBytesInterval()) + .setShouldCompactMessages(config.shouldCompactMessages()); return newConfig; } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java index 09bdf507f0..ec5ccb6fd7 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java @@ -61,6 +61,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -598,6 +599,21 @@ protected Collection, VeniceChangeCoordinate>> i if (changeCaptureStats != null) { changeCaptureStats.recordRecordsConsumed(pubSubMessages.size()); } + if (changelogClientConfig.shouldCompactMessages()) { + Map, VeniceChangeCoordinate>> tempMap = new LinkedHashMap<>(); + // The behavior of LinkedHashMap is such that it maintains the order of insertion, but for values which are + // replaced, + // it's put in at the position of the first insertion. This isn't quite what we want, we want to keep only + // a single key (just as a map would), but we want to keep the position of the last insertion as well. So in order + // to do that, we remove the entry before inserting it. + for (PubSubMessage, VeniceChangeCoordinate> message: pubSubMessages) { + if (tempMap.containsKey(message.getKey())) { + tempMap.remove(message.getKey()); + } + tempMap.put(message.getKey(), message); + } + return new ArrayList<>(tempMap.values()); + } return pubSubMessages; } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java index 042878fdbb..0e7b0cf034 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java @@ -331,6 +331,81 @@ public void testConsumeAfterImage() throws ExecutionException, InterruptedExcept verify(mockPubSubConsumer).close(); } + @Test + public void testConsumeAfterImageWithCompaction() throws ExecutionException, InterruptedException { + D2ControllerClient d2ControllerClient = mock(D2ControllerClient.class); + StoreResponse storeResponse = mock(StoreResponse.class); + StoreInfo storeInfo = mock(StoreInfo.class); + doReturn(1).when(storeInfo).getCurrentVersion(); + doReturn(2).when(storeInfo).getPartitionCount(); + doReturn(storeInfo).when(storeResponse).getStore(); + doReturn(storeResponse).when(d2ControllerClient).getStore(storeName); + MultiSchemaResponse multiRMDSchemaResponse = mock(MultiSchemaResponse.class); + MultiSchemaResponse.Schema rmdSchemaFromMultiSchemaResponse = mock(MultiSchemaResponse.Schema.class); + doReturn(rmdSchema.toString()).when(rmdSchemaFromMultiSchemaResponse).getSchemaStr(); + doReturn(new MultiSchemaResponse.Schema[] { rmdSchemaFromMultiSchemaResponse }).when(multiRMDSchemaResponse) + .getSchemas(); + doReturn(multiRMDSchemaResponse).when(d2ControllerClient).getAllReplicationMetadataSchemas(storeName); + + PubSubConsumerAdapter mockPubSubConsumer = mock(PubSubConsumerAdapter.class); + PubSubTopic oldVersionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, 1)); + PubSubTopic oldChangeCaptureTopic = + pubSubTopicRepository.getTopic(oldVersionTopic + ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX); + + prepareVersionTopicRecordsToBePolled(0L, 5L, mockPubSubConsumer, oldVersionTopic, 0, true); + ChangelogClientConfig changelogClientConfig = + new ChangelogClientConfig<>().setD2ControllerClient(d2ControllerClient) + .setSchemaReader(schemaReader) + .setStoreName(storeName) + .setShouldCompactMessages(true) + .setViewName(""); + VeniceChangelogConsumerImpl veniceChangelogConsumer = + new VeniceAfterImageConsumerImpl<>(changelogClientConfig, mockPubSubConsumer); + Assert.assertEquals(veniceChangelogConsumer.getPartitionCount(), 2); + + ThinClientMetaStoreBasedRepository mockRepository = mock(ThinClientMetaStoreBasedRepository.class); + Store store = mock(Store.class); + Version mockVersion = new VersionImpl(storeName, 1, "foo"); + Mockito.when(store.getCurrentVersion()).thenReturn(1); + Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP); + Mockito.when(mockRepository.getStore(anyString())).thenReturn(store); + Mockito.when(mockRepository.getValueSchema(storeName, 1)).thenReturn(new SchemaEntry(1, valueSchema)); + Mockito.when(store.getVersionOrThrow(Mockito.anyInt())).thenReturn(mockVersion); + veniceChangelogConsumer.setStoreRepository(mockRepository); + veniceChangelogConsumer.subscribe(new HashSet<>(Arrays.asList(0))).get(); + verify(mockPubSubConsumer).subscribe(new PubSubTopicPartitionImpl(oldVersionTopic, 0), OffsetRecord.LOWEST_OFFSET); + + List, VeniceChangeCoordinate>> pubSubMessages = + (List, VeniceChangeCoordinate>>) veniceChangelogConsumer.poll(100); + for (int i = 0; i < 5; i++) { + PubSubMessage, VeniceChangeCoordinate> pubSubMessage = pubSubMessages.get(i); + Utf8 messageStr = pubSubMessage.getValue().getCurrentValue(); + Assert.assertEquals(messageStr.toString(), "newValue" + i); + } + prepareChangeCaptureRecordsToBePolled( + 0L, + 10L, + mockPubSubConsumer, + oldChangeCaptureTopic, + 0, + oldVersionTopic, + null, + false); + pubSubMessages = + (List, VeniceChangeCoordinate>>) veniceChangelogConsumer.poll(100); + Assert.assertFalse(pubSubMessages.isEmpty()); + Assert.assertEquals(pubSubMessages.size(), 10); + for (int i = 0; i < 10; i++) { + PubSubMessage, VeniceChangeCoordinate> pubSubMessage = pubSubMessages.get(i); + Utf8 pubSubMessageValue = pubSubMessage.getValue().getCurrentValue(); + Assert.assertEquals(pubSubMessageValue.toString(), "newValue" + i); + } + + veniceChangelogConsumer.close(); + verify(mockPubSubConsumer, times(2)).batchUnsubscribe(any()); + verify(mockPubSubConsumer).close(); + } + @Test public void testMetricReportingThread() throws InterruptedException { D2ControllerClient d2ControllerClient = mock(D2ControllerClient.class); From 8e6c670bbb3ba4209734a68629f9b28b2e98fcf6 Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Wed, 9 Oct 2024 15:06:00 -0700 Subject: [PATCH 2/2] added testing --- .../consumer/VeniceChangelogConsumerImpl.java | 2 +- .../VeniceChangelogConsumerImplTest.java | 37 ++++++++++++++++--- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java index ec5ccb6fd7..7369082f4d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java @@ -612,7 +612,7 @@ protected Collection, VeniceChangeCoordinate>> i } tempMap.put(message.getKey(), message); } - return new ArrayList<>(tempMap.values()); + return tempMap.values(); } return pubSubMessages; } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java index 0e7b0cf034..73ad657847 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java @@ -123,6 +123,7 @@ public void testConsumeBeforeAndAfterImage() throws ExecutionException, Interrup partition, oldVersionTopic, newVersionTopic, + false, false); ChangelogClientConfig changelogClientConfig = new ChangelogClientConfig<>().setD2ControllerClient(d2ControllerClient) @@ -203,7 +204,16 @@ public void testAfterImageConsumerSeek() throws ExecutionException, InterruptedE VeniceChangelogConsumerImpl mockInternalSeekConsumer = Mockito.mock(VeniceChangelogConsumerImpl.class); Mockito.when(mockInternalSeekConsumer.subscribe(any())).thenReturn(CompletableFuture.completedFuture(null)); Mockito.when(mockInternalSeekConsumer.getPubSubConsumer()).thenReturn(mockPubSubConsumer); - prepareChangeCaptureRecordsToBePolled(0L, 10L, mockPubSubConsumer, oldVersionTopic, 0, oldVersionTopic, null, true); + prepareChangeCaptureRecordsToBePolled( + 0L, + 10L, + mockPubSubConsumer, + oldVersionTopic, + 0, + oldVersionTopic, + null, + true, + false); VeniceAfterImageConsumerImpl veniceChangelogConsumer = new VeniceAfterImageConsumerImpl<>( changelogClientConfig, mockPubSubConsumer, @@ -315,6 +325,7 @@ public void testConsumeAfterImage() throws ExecutionException, InterruptedExcept 0, oldVersionTopic, null, + false, false); pubSubMessages = (List, VeniceChangeCoordinate>>) veniceChangelogConsumer.poll(100); @@ -376,7 +387,7 @@ public void testConsumeAfterImageWithCompaction() throws ExecutionException, Int verify(mockPubSubConsumer).subscribe(new PubSubTopicPartitionImpl(oldVersionTopic, 0), OffsetRecord.LOWEST_OFFSET); List, VeniceChangeCoordinate>> pubSubMessages = - (List, VeniceChangeCoordinate>>) veniceChangelogConsumer.poll(100); + new ArrayList<>(veniceChangelogConsumer.poll(100)); for (int i = 0; i < 5; i++) { PubSubMessage, VeniceChangeCoordinate> pubSubMessage = pubSubMessages.get(i); Utf8 messageStr = pubSubMessage.getValue().getCurrentValue(); @@ -390,9 +401,9 @@ public void testConsumeAfterImageWithCompaction() throws ExecutionException, Int 0, oldVersionTopic, null, - false); - pubSubMessages = - (List, VeniceChangeCoordinate>>) veniceChangelogConsumer.poll(100); + false, + true); + pubSubMessages = new ArrayList<>(veniceChangelogConsumer.poll(100)); Assert.assertFalse(pubSubMessages.isEmpty()); Assert.assertEquals(pubSubMessages.size(), 10); for (int i = 0; i < 10; i++) { @@ -459,7 +470,8 @@ private void prepareChangeCaptureRecordsToBePolled( int partition, PubSubTopic oldVersionTopic, PubSubTopic newVersionTopic, - boolean addEndOfPushMessage) { + boolean addEndOfPushMessage, + boolean repeatMessages) { List> pubSubMessageList = new ArrayList<>(); // Add a start of push message @@ -478,6 +490,19 @@ private void prepareChangeCaptureRecordsToBePolled( pubSubMessageList.add(pubSubMessage); } + if (repeatMessages) { + for (long i = startIdx; i < endIdx; i++) { + PubSubMessage pubSubMessage = constructChangeCaptureConsumerRecord( + changeCaptureTopic, + partition, + "oldValue" + i, + "newValue" + i, + "key" + i, + Arrays.asList(i, i)); + pubSubMessageList.add(pubSubMessage); + } + } + if (addEndOfPushMessage) { pubSubMessageList.add(constructEndOfPushMessage(changeCaptureTopic, partition, endIdx + 1)); }