[dvc][doc] Create MVP for DaVinciRecordTransformer #1087

Open: kvargha wants to merge 69 commits into linkedin:main from kvargha:dvrt-mvp.

Commits (69, all by kvargha)
8dc58f0  Create MVP for record transformer (Jul 29, 2024)
88b62fe  Fix docs and imports (Jul 29, 2024)
fc856b7  Don't make serializers and deserializers global (Jul 29, 2024)
03f0453  Dont check for boolean val (Jul 29, 2024)
45ba862  Delete classHash if it exists (Jul 29, 2024)
e8fa547  Assert file deletion is true (Jul 29, 2024)
b3f3e0a  Improve code coverage (Jul 30, 2024)
f3cacff  Add tests for blocking record transformer (Jul 30, 2024)
d2532b3  Created AvroGenericDaVinciClient for record transformer test (Jul 30, 2024)
4d3ee2b  Make sure getRecordTransformer is valid (Jul 30, 2024)
189127c  Make sure getRecordTransformer isn't null (Jul 30, 2024)
9995758  Merge branch 'main' into dvrt-mvp (Jul 30, 2024)
2c421a6  Reorganize testRecordTransformerClient and fix key schema (Jul 30, 2024)
68d2be8  Fix TestStringRecordTransformer to work with Avro objects and update doc (Jul 31, 2024)
9ffbf41  Merge pull request #1 from kvargha/dvrt-mvp (Jul 31, 2024)
fc0b188  Refactor onRecovery and add javadoc for DaVinciRecordTransformer's co… (Jul 31, 2024)
05c22b4  Reset offset if we need to bootstrap from VT (Aug 1, 2024)
1ef7c20  Merge branch 'main' into dvrt-mvp (Aug 2, 2024)
6b56ff8  Delete classHash after running tests (Aug 2, 2024)
2cbd14f  Throw an error if a user tries to use blob transfer with record trans… (Aug 2, 2024)
1ed816b  Make previous public methods private, remove subscribe call inside on… (Aug 13, 2024)
6162604  Correctly pass DVRT functional interface to initBackend, add todo to … (Aug 13, 2024)
968d357  Fix spotbugs (Aug 13, 2024)
e78340e  Init DVRT inside SIT, and move DVRT recovery to SIT (Aug 14, 2024)
42485d9  Modify checkout action (Aug 15, 2024)
22b57e9  Undo (Aug 15, 2024)
5f4da22  Merge branch 'linkedin:main' into dvrt-mvp (Aug 15, 2024)
e57caeb  Merge branch 'main' into dvrt-mvp (Aug 15, 2024)
3c7ec6c  Fix compilation (Aug 15, 2024)
36926d5  Cache deserializer/serializer (Aug 16, 2024)
ef6ac83  Create utility class for record transformer (Aug 19, 2024)
14fef31  Create AbstractStorageIterator and a RocksDB implementation (Aug 23, 2024)
80dd551  Delete classHash file. Compare value classes and don't override value… (Sep 4, 2024)
2dd00a8  Merge branch 'linkedin:main' into dvrt-mvp (Sep 4, 2024)
a4227a5  Remove compareCacheConfig mock (Sep 4, 2024)
91ba185  Wrap access modifier with doPrivileged (Sep 4, 2024)
cf7169e  Merge branch 'main' into dvrt-mvp (Sep 12, 2024)
4d958a2  Fix spotless error (Sep 12, 2024)
4017da6  Merge branch 'main' into dvrt-mvp (Sep 17, 2024)
d15f437  Remove unused variables (Sep 17, 2024)
7e49576  Added a ToDo to make chunking with record transformer lazy, and make … (Sep 18, 2024)
f175b10  Created a config for record transformer that's passed into the DaVinc… (Sep 19, 2024)
849ebce  Add message envelope for DVRT (Oct 19, 2024)
3f44da3  Merge branch 'main' into dvrt-mvp (Oct 19, 2024)
ccf1940  Fix spotless issue (Oct 19, 2024)
54ba9ee  Fix test (Oct 19, 2024)
12b012c  update docs (Oct 19, 2024)
e747f70  Dvrt mvp (#4) (Oct 19, 2024)
33690b7  Add another test, and update docs (Oct 21, 2024)
1bb889b  Cleanup code (Oct 21, 2024)
9462bc7  Cleanup docs (Oct 21, 2024)
ac165e6  Merge branch 'main' into dvrt-mvp (Oct 21, 2024)
edbd3ca  Cleanup (Oct 21, 2024)
0b74a7a  Cleanup (Oct 22, 2024)
39201bc  Add integration test for onrecovery. And fix iterator deserializer issue (Oct 23, 2024)
4a8e44a  Throw error if II and DVRT are enabled together (Oct 23, 2024)
fdd0ef0  Cleanup (Oct 23, 2024)
b09a3c9  Compress transformed value (Oct 24, 2024)
feb73cf  Add test for chunking (Oct 25, 2024)
71ffec7  Cleanup (Oct 25, 2024)
6b8bab1  Cleanup (Oct 25, 2024)
a94b196  cleanup (Oct 25, 2024)
be07be4  cleanup (Oct 25, 2024)
609c06d  Add versionIngestion to onStart/onEnd function names (Oct 25, 2024)
4215967  Cleanup (Oct 25, 2024)
beb821f  Cleanup (Oct 25, 2024)
bb25b73  Address review comments (Oct 28, 2024)
83a88bd  Reorganize (Oct 28, 2024)
bf85c1e  Add overrides (Oct 28, 2024)
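Taken together, these commits replace the original put/delete transformer hooks with a
transform/processPut/processDelete surface, rename the lifecycle hooks to
onStartVersionIngestion/onEndVersionIngestion, and move configuration into
DaVinciRecordTransformerConfig. A minimal sketch of what a user-defined transformer might look
like under the new API follows; the method signatures mirror the delegate calls visible in
BlockingDaVinciRecordTransformer below, while the class name, the DaVinciRecordTransformerResult
constructor, and the Lazy import path are assumptions, not code from this PR.

import com.linkedin.davinci.client.DaVinciRecordTransformer;
import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
import com.linkedin.venice.utils.lazy.Lazy;
import org.apache.avro.Schema;

// Hypothetical user transformer that upper-cases string values.
public class UppercaseStringTransformer extends DaVinciRecordTransformer<String, String, String> {
  public UppercaseStringTransformer(Integer storeVersion, boolean storeRecordsInDaVinci) {
    super(storeVersion, storeRecordsInDaVinci);
  }

  @Override
  public Schema getKeySchema() {
    return Schema.create(Schema.Type.STRING);
  }

  @Override
  public Schema getOutputValueSchema() {
    return Schema.create(Schema.Type.STRING);
  }

  @Override
  public DaVinciRecordTransformerResult<String> transform(Lazy<String> key, Lazy<String> value) {
    // Wrap the transformed value; this result-type constructor is an assumption.
    return new DaVinciRecordTransformerResult<>(
        DaVinciRecordTransformerResult.Result.TRANSFORMED, value.get().toUpperCase());
  }

  @Override
  public void processPut(Lazy<String> key, Lazy<String> value) {
    // Invoked after transform(); a stateless transformer can leave this empty.
  }
}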
Files changed
build.gradle (1 addition, 0 deletions)
@@ -269,6 +269,7 @@ subprojects {
     implementation libraries.grpcProtobuf
     implementation libraries.grpcServices
     implementation libraries.grpcStub
+    implementation 'org.ow2.asm:asm:9.7'
     compileOnly libraries.tomcatAnnotations
   }

DaVinciBackend.java
@@ -9,7 +9,7 @@

 import com.linkedin.davinci.blobtransfer.BlobTransferManager;
 import com.linkedin.davinci.blobtransfer.BlobTransferUtil;
-import com.linkedin.davinci.client.DaVinciRecordTransformer;
+import com.linkedin.davinci.client.DaVinciRecordTransformerFunctionalInterface;
 import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
 import com.linkedin.davinci.config.StoreBackendConfig;
 import com.linkedin.davinci.config.VeniceConfigLoader;
@@ -119,7 +119,7 @@ public DaVinciBackend(
       Optional<Set<String>> managedClients,
       ICProvider icProvider,
       Optional<ObjectCacheConfig> cacheConfig,
-      Function<Integer, DaVinciRecordTransformer> getRecordTransformer) {
+      DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) {
     LOGGER.info("Creating Da Vinci backend with managed clients: {}", managedClients);
     try {
       VeniceServerConfig backendConfig = configLoader.getVeniceServerConfig();
@@ -268,7 +268,7 @@ public DaVinciBackend(
         false,
         compressorFactory,
         cacheBackend,
-        getRecordTransformer,
+        recordTransformerFunction,
         true,
         // TODO: consider how/if a repair task would be valid for Davinci users?
         null,
@@ -290,6 +290,10 @@
     }

     if (backendConfig.isBlobTransferManagerEnabled()) {
+      if (recordTransformerFunction != null) {
+        throw new VeniceException("DaVinciRecordTransformer doesn't support blob transfer.");
+      }
+
       blobTransferManager = BlobTransferUtil.getP2PBlobTransferManagerForDVCAndStart(
           configLoader.getVeniceServerConfig().getDvcP2pBlobTransferServerPort(),
           configLoader.getVeniceServerConfig().getDvcP2pBlobTransferClientPort(),
@@ -462,7 +466,8 @@ private synchronized void bootstrap() {
       List<Integer> partitions = storeNameToPartitionListMap.get(storeName);
       String versionTopic = version.kafkaTopicName();
       LOGGER.info("Bootstrapping partitions {} for {}", partitions, versionTopic);
-      aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageService.getStorageEngine(versionTopic));
+      AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic);
+      aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine);
       StoreBackend storeBackend = getStoreOrThrow(storeName);
       storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version));
     });
AvroGenericDaVinciClient.java
@@ -78,7 +78,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -138,6 +137,8 @@ protected boolean removeEldestEntry(Map.Entry<Integer, GenericRecord> eldest) {
   private final AbstractAvroChunkingAdapter<V> chunkingAdapter;
   private final Executor readChunkExecutorForLargeRequest;

+  private final DaVinciRecordTransformerConfig recordTransformerConfig;
+
   public AvroGenericDaVinciClient(
       DaVinciConfig daVinciConfig,
       ClientConfig clientConfig,
@@ -180,8 +181,17 @@ protected AvroGenericDaVinciClient(
     this.managedClients = managedClients;
     this.icProvider = icProvider;
     this.chunkingAdapter = chunkingAdapter;
+    this.recordTransformerConfig = daVinciConfig.getRecordTransformerConfig();
     this.readChunkExecutorForLargeRequest =
         readChunkExecutorForLargeRequest != null ? readChunkExecutorForLargeRequest : READ_CHUNK_EXECUTOR;
+
+    if (daVinciConfig.isIsolated() && recordTransformerConfig != null) {
+      // When both are enabled, this causes the storage engine to be deleted every time the client starts,
+      // since the record transformer config is never persisted to disk. Additionally, this will spawn multiple
+      // transformers per version, and if the user's transformer is stateful this could cause issues.
+      throw new VeniceClientException("Ingestion Isolation is not supported with DaVinciRecordTransformer");
+    }
+
     preValidation.run();
   }
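For context, a sketch of the client configuration this new guard rejects. setIsolated is assumed
from the isIsolated() accessor above, and transformerConfig stands in for any
DaVinciRecordTransformerConfig instance.

// Hypothetical setup that now fails fast in the constructor with
// "Ingestion Isolation is not supported with DaVinciRecordTransformer".
DaVinciConfig daVinciConfig = new DaVinciConfig()
    .setIsolated(true) // ingestion isolation on
    .setRecordTransformerConfig(transformerConfig); // record transformer on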

@@ -657,7 +667,8 @@ protected GenericRecordChunkingAdapter getGenericRecordChunkingAdapter() {
     return GenericRecordChunkingAdapter.INSTANCE;
   }

-  private D2ServiceDiscoveryResponse discoverService() {
+  // Visible for testing
+  protected D2ServiceDiscoveryResponse discoverService() {
     try (TransportClient client = getTransportClient(clientConfig)) {
       if (!(client instanceof D2TransportClient)) {
         throw new VeniceClientException(
@@ -688,6 +699,7 @@ private VeniceConfigLoader buildVeniceConfig() {
     if (kafkaBootstrapServers == null) {
       kafkaBootstrapServers = backendConfig.getString(KAFKA_BOOTSTRAP_SERVERS);
     }
+
     VeniceProperties config = new PropertyBuilder().put(KAFKA_ADMIN_CLASS, ApacheKafkaAdminAdapter.class.getName())
         .put(ROCKSDB_LEVEL0_FILE_NUM_COMPACTION_TRIGGER, 4) // RocksDB default config
         .put(ROCKSDB_LEVEL0_SLOWDOWN_WRITES_TRIGGER, 20) // RocksDB default config
@@ -703,8 +715,7 @@ private VeniceConfigLoader buildVeniceConfig() {
         .put(
             RECORD_TRANSFORMER_VALUE_SCHEMA,
             daVinciConfig.isRecordTransformerEnabled()
-                // We're creating a new record transformer here just to get the schema
-                ? daVinciConfig.getRecordTransformer(0).getValueOutputSchema().toString()
+                ? recordTransformerConfig.getOutputValueSchema().toString()
                 : "null")
         .put(INGESTION_ISOLATION_CONFIG_PREFIX + "." + INGESTION_MEMORY_LIMIT, -1) // Explicitly disable memory limiter
                                                                                    // in Isolated Process
@@ -714,13 +725,14 @@ private VeniceConfigLoader buildVeniceConfig() {
     return new VeniceConfigLoader(config, config);
   }

-  private void initBackend(
+  // Visible for testing
+  protected void initBackend(
       ClientConfig clientConfig,
       VeniceConfigLoader configLoader,
       Optional<Set<String>> managedClients,
       ICProvider icProvider,
       Optional<ObjectCacheConfig> cacheConfig,
-      Function<Integer, DaVinciRecordTransformer> getRecordTransformer) {
+      DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) {
     synchronized (AvroGenericDaVinciClient.class) {
       if (daVinciBackend == null) {
         logger
@@ -732,7 +744,7 @@ private void initBackend(
                 managedClients,
                 icProvider,
                 cacheConfig,
-                getRecordTransformer),
+                recordTransformerFunction),
             backend -> {
               // Ensure that existing backend is fully closed before a new one can be created.
               synchronized (AvroGenericDaVinciClient.class) {
@@ -776,7 +788,7 @@ public synchronized void start() {
         managedClients,
         icProvider,
         cacheConfig,
-        daVinciConfig::getRecordTransformer);
+        daVinciConfig.getRecordTransformerFunction());

     try {
       getBackend().verifyCacheConfigEquality(daVinciConfig.getCacheConfig(), getStoreName());
@@ -795,12 +807,28 @@ public synchronized void start() {
     this.keyDeserializer = FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(keySchema, keySchema);
     this.genericRecordStoreDeserializerCache =
         new AvroStoreDeserializerCache(daVinciBackend.get().getSchemaRepository(), getStoreName(), true);
-    this.storeDeserializerCache = clientConfig.isSpecificClient()
-        ? new AvroSpecificStoreDeserializerCache<>(
-            daVinciBackend.get().getSchemaRepository(),
-            getStoreName(),
-            clientConfig.getSpecificValueClass())
-        : (AvroStoreDeserializerCache<V>) this.genericRecordStoreDeserializerCache;
+
+    if (clientConfig.isSpecificClient()) {
+      if (daVinciConfig.isRecordTransformerEnabled()) {
+        if (recordTransformerConfig.getOutputValueClass() != clientConfig.getSpecificValueClass()) {
+          throw new VeniceClientException(
+              "Specific value class mismatch between ClientConfig and DaVinciRecordTransformer, expected="
+                  + clientConfig.getSpecificValueClass() + ", actual="
+                  + recordTransformerConfig.getOutputValueClass());
+        }
+
+        this.storeDeserializerCache = new AvroSpecificStoreDeserializerCache<>(
+            recordTransformerConfig.getOutputValueSchema(),
+            clientConfig.getSpecificValueClass());
+      } else {
+        this.storeDeserializerCache = new AvroSpecificStoreDeserializerCache<>(
+            daVinciBackend.get().getSchemaRepository(),
+            getStoreName(),
+            clientConfig.getSpecificValueClass());
+      }
+    } else {
+      this.storeDeserializerCache = (AvroStoreDeserializerCache<V>) this.genericRecordStoreDeserializerCache;
+    }

     ready.set(true);
     logger.info("Client is started successfully, storeName=" + getStoreName());

Review thread on the value-class check above:

  Reviewer (Contributor): Let's do a test case through these; looking back at the integration test cases, I think the transformed type is always the same.

  kvargha (Author): I have a couple of unit tests that go over this. Unless you mean changing the types? Ex: Int -> String?
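The reviewer above asks for a test case over this branch; a rough sketch of one, in Mockito style.
The record classes, the client instance, and the harness that drives start() to this point are
assumptions, not code from this PR.

// Hypothetical unit test for the specific value class mismatch guard.
DaVinciRecordTransformerConfig transformerConfig = mock(DaVinciRecordTransformerConfig.class);
doReturn(TestRecordA.class).when(transformerConfig).getOutputValueClass();

ClientConfig clientConfig = mock(ClientConfig.class);
doReturn(true).when(clientConfig).isSpecificClient();
doReturn(TestRecordB.class).when(clientConfig).getSpecificValueClass();

try {
  client.start(); // backed by enough mocks to reach the deserializer-cache setup
  fail("Expected VeniceClientException");
} catch (VeniceClientException e) {
  assertTrue(e.getMessage().contains("Specific value class mismatch"));
}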
BlockingDaVinciRecordTransformer.java
@@ -19,41 +19,44 @@ public class BlockingDaVinciRecordTransformer<K, V, O> extends DaVinciRecordTransformer
   private final DaVinciRecordTransformer recordTransformer;
   private final CountDownLatch startLatch = new CountDownLatch(1);

-  public BlockingDaVinciRecordTransformer(DaVinciRecordTransformer recordTransformer) {
-    super(recordTransformer.getStoreVersion());
+  public BlockingDaVinciRecordTransformer(DaVinciRecordTransformer recordTransformer, boolean storeRecordsInDaVinci) {
+    super(recordTransformer.getStoreVersion(), storeRecordsInDaVinci);
     this.recordTransformer = recordTransformer;
   }

-  public Schema getKeyOutputSchema() {
-    return this.recordTransformer.getKeyOutputSchema();
+  public Schema getKeySchema() {
+    return this.recordTransformer.getKeySchema();
   }

-  public Schema getValueOutputSchema() {
-    return this.recordTransformer.getValueOutputSchema();
+  public Schema getOutputValueSchema() {
+    return this.recordTransformer.getOutputValueSchema();
   }

-  public O put(Lazy<K> key, Lazy<V> value) {
+  public DaVinciRecordTransformerResult<O> transform(Lazy<K> key, Lazy<V> value) {
+    return this.recordTransformer.transform(key, value);
+  }
+
+  public void processPut(Lazy<K> key, Lazy<O> value) {
     try {
       // Waiting for onStartIngestionTask to complete before proceeding
       startLatch.await();
-      return (O) this.recordTransformer.put(key, value);
+      this.recordTransformer.processPut(key, value);
     } catch (InterruptedException e) {
       // Restore the interrupt status
       Thread.currentThread().interrupt();
-      return null;
     }
   }

-  public O delete(Lazy<K> key) {
-    return (O) this.recordTransformer.delete(key);
+  public void processDelete(Lazy<K> key) {
+    this.recordTransformer.processDelete(key);
   }

-  public void onStartIngestionTask() {
-    this.recordTransformer.onStartIngestionTask();
+  public void onStartVersionIngestion() {
+    this.recordTransformer.onStartVersionIngestion();
     startLatch.countDown();
   }

-  public void onEndIngestionTask() {
-    this.recordTransformer.onEndIngestionTask();
+  public void onEndVersionIngestion() {
+    this.recordTransformer.onEndVersionIngestion();
   }
 }
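The wrapper's only behavioral addition is the latch: processPut parks until
onStartVersionIngestion has run, so no record reaches the user's transformer before its startup
hook completes. A small sketch of that ordering; NoOpTransformer is an assumed stub, and Lazy.of
is assumed to wrap a supplier.

// Hypothetical demonstration of the latch ordering (run inside a method that
// declares throws InterruptedException).
DaVinciRecordTransformer delegate = new NoOpTransformer(1 /* storeVersion */, true);
BlockingDaVinciRecordTransformer<String, String, String> blocking =
    new BlockingDaVinciRecordTransformer<>(delegate, true /* storeRecordsInDaVinci */);

Thread writer = new Thread(() -> blocking.processPut(Lazy.of(() -> "k"), Lazy.of(() -> "v")));
writer.start(); // parks on startLatch.await()

blocking.onStartVersionIngestion(); // runs the delegate's hook, then opens the latch
writer.join(); // the queued processPut now completes against the delegate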
DaVinciConfig.java
@@ -1,7 +1,6 @@
 package com.linkedin.davinci.client;

 import com.linkedin.davinci.store.cache.backend.ObjectCacheConfig;
-import java.util.function.Function;


 public class DaVinciConfig {
@@ -32,7 +31,7 @@ public class DaVinciConfig {
   /**
    * Record transformer reference
    */
-  private Function<Integer, DaVinciRecordTransformer> recordTransformerFunction;
+  private DaVinciRecordTransformerConfig recordTransformerConfig;

   /**
    * Whether to enable read-path metrics.
@@ -107,7 +106,7 @@ public boolean isCacheEnabled() {
   }

   public boolean isRecordTransformerEnabled() {
-    return recordTransformerFunction != null;
+    return recordTransformerConfig != null;
   }

   public ObjectCacheConfig getCacheConfig() {
@@ -119,17 +118,27 @@ public DaVinciConfig setCacheConfig(ObjectCacheConfig cacheConfig) {
     return this;
   }

+  public DaVinciConfig setRecordTransformerConfig(DaVinciRecordTransformerConfig recordTransformerConfig) {
+    this.recordTransformerConfig = recordTransformerConfig;
+    return this;
+  }
+
+  public DaVinciRecordTransformerConfig getRecordTransformerConfig() {
+    return recordTransformerConfig;
+  }
+
   public DaVinciRecordTransformer getRecordTransformer(Integer storeVersion) {
-    if (recordTransformerFunction != null) {
-      return recordTransformerFunction.apply(storeVersion);
+    if (recordTransformerConfig == null) {
+      return null;
     }
-    return null;
+    return recordTransformerConfig.getRecordTransformer(storeVersion);
   }

-  public DaVinciConfig setRecordTransformerFunction(
-      Function<Integer, DaVinciRecordTransformer> recordTransformerFunction) {
-    this.recordTransformerFunction = recordTransformerFunction;
-    return this;
+  public DaVinciRecordTransformerFunctionalInterface getRecordTransformerFunction() {
+    if (recordTransformerConfig == null) {
+      return null;
+    }
+    return recordTransformerConfig.getRecordTransformerFunction();
   }

   public boolean isReadMetricsEnabled() {
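End to end, client wiring under the new config object would look roughly like the following.
Only the setters and getters shown in this diff are known to exist; the
DaVinciRecordTransformerConfig constructor is not part of this excerpt, so its argument list is
an assumption, as is the shape of the functional interface (taken to map a store version to a
transformer, like the Function it replaces).

// Hypothetical wiring of a record transformer into a Da Vinci client.
DaVinciRecordTransformerConfig transformerConfig = new DaVinciRecordTransformerConfig(
    storeVersion -> new UppercaseStringTransformer(storeVersion, true), // transformer function
    String.class, // output value class, checked against ClientConfig's specific value class
    Schema.create(Schema.Type.STRING)); // output value schema, fed into RECORD_TRANSFORMER_VALUE_SCHEMA

DaVinciConfig daVinciConfig = new DaVinciConfig().setRecordTransformerConfig(transformerConfig);
// isRecordTransformerEnabled() now returns true, and getRecordTransformerFunction()
// is what AvroGenericDaVinciClient passes through initBackend() to the DaVinciBackend constructor.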