[compat][server][controller] Introduced options to speed up KafkaProducer in Server for nearline workload (#1258)


During benchmarking, we noticed that producing is slow: it can sometimes
take 10+ ms to produce a single medium-sized record (20-30 KB).
We discovered two issues during the benchmarking:
1. Kafka producer compression takes a lot of time.
   Even if we disable compression in the KafkaProducer, the Kafka broker
   will re-compress records according to the broker's target compression setting.
2. The Kafka producer is not scalable: when multiple threads invoke the
   same producer, there is heavy lock contention (see the sketch below).
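
A minimal, hypothetical sketch of the multi-producer idea (illustrative names,
not actual Venice classes): fan writes out across N KafkaProducer instances
round-robin, so concurrent writer threads rarely contend on a single
producer's lock and accumulator buffer.

import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class RoundRobinProducerPool implements AutoCloseable {
  private final List<KafkaProducer<byte[], byte[]>> producers;
  private final AtomicLong counter = new AtomicLong();

  // 'props' must contain the usual bootstrap.servers and byte[] serializers.
  public RoundRobinProducerPool(Properties props, int producerCount) {
    this.producers = IntStream.range(0, producerCount)
        .mapToObj(i -> new KafkaProducer<byte[], byte[]>(props))
        .collect(Collectors.toList());
  }

  public Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record) {
    // Each producer has its own buffer and sender thread, so spreading the
    // load lets concurrent callers proceed mostly without blocking each other.
    int idx = Math.floorMod(counter.getAndIncrement(), producers.size());
    return producers.get(idx).send(record);
  }

  @Override
  public void close() {
    producers.forEach(KafkaProducer::close);
  }
}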

These optimizations come with overheads:
1. Disabling compression increases the Kafka workload. Since the
   optimization only applies to nearline traffic, we expect the increase
   to be modest, and there is a store-level control to limit it.
2. More producers in the Venice Server mean more memory consumption
   (each producer has its own buffer; a KafkaProducer allocates 32 MB by
   default via buffer.memory) and more producer threads.

Ideally, we would like to enable these optimizations only for critical
stores with heavy AA/WC nearline workloads, to improve their E2E write
latency.

New Server Config:
server.nearline.workload.producer.throughput.optimization.enabled: default true

Setting it to false overrides all the store-level configs and disables
the optimization entirely.
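
For example, a hypothetical server properties snippet that flips the kill
switch off fleet-wide (the key is from this PR; the value is illustrative):

# Disable the nearline producer throughput optimization on this server,
# regardless of what individual stores have configured.
server.nearline.workload.producer.throughput.optimization.enabled=false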

Two new store-level configs:
NearlineProducerCountPerWriter: default 1
NearlineProducerCompressionEnabled: default true

We can use the admin-tool to update these store-level configs, for example:
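
A hypothetical invocation (the two flag names match the new Arg entries in
this PR; the jar name and the url/cluster/store values are placeholders):

java -jar venice-admin-tool.jar --update-store \
  --url <parent-controller-url> --cluster <cluster-name> --store <store-name> \
  --nearline-producer-compression-enabled false \
  --nearline-producer-count-per-writer 2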

This PR also changes the admin operation protocol, so child controllers
need to be deployed before the parent controller.
gaojieliu authored Oct 25, 2024
1 parent a698f3e commit 30999ea
Showing 35 changed files with 2,329 additions and 58 deletions.
----------------------------------------
@@ -109,6 +109,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_LOCAL_CONSUMER_CONFIG_PREFIX;
import static com.linkedin.venice.ConfigKeys.SERVER_MAX_REQUEST_SIZE;
import static com.linkedin.venice.ConfigKeys.SERVER_MAX_WAIT_FOR_VERSION_INFO_MS_CONFIG;
import static com.linkedin.venice.ConfigKeys.SERVER_NEARLINE_WORKLOAD_PRODUCER_THROUGHPUT_OPTIMIZATION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_NETTY_GRACEFUL_SHUTDOWN_PERIOD_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_NETTY_IDLE_TIME_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_NETTY_WORKER_THREADS;
@@ -552,6 +553,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final boolean aaWCWorkloadParallelProcessingEnabled;
private final int aaWCWorkloadParallelProcessingThreadPoolSize;
private final boolean isGlobalRtDivEnabled;
private final boolean nearlineWorkloadProducerThroughputOptimizationEnabled;

public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException {
this(serverProperties, Collections.emptyMap());
@@ -924,6 +926,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverProperties.getBoolean(SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_ENABLED, false);
aaWCWorkloadParallelProcessingThreadPoolSize =
serverProperties.getInt(SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_THREAD_POOL_SIZE, 8);
nearlineWorkloadProducerThroughputOptimizationEnabled =
serverProperties.getBoolean(SERVER_NEARLINE_WORKLOAD_PRODUCER_THROUGHPUT_OPTIMIZATION_ENABLED, true);
}

long extractIngestionMemoryLimit(
@@ -1681,4 +1685,8 @@ public int getAAWCWorkloadParallelProcessingThreadPoolSize() {
public boolean isGlobalRtDivEnabled() {
return isGlobalRtDivEnabled;
}

public boolean isNearlineWorkloadProducerThroughputOptimizationEnabled() {
return nearlineWorkloadProducerThroughputOptimizationEnabled;
}
}
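
A short sketch of the precedence the commit message implies. Only
isNearlineWorkloadProducerThroughputOptimizationEnabled() above is real; the
helper names and fallback values are assumptions:

// Hypothetical helpers: the server-level flag acts as a kill switch, and the
// store-level settings only take effect while it is enabled.
static int effectiveProducerCountPerWriter(VeniceServerConfig serverConfig, int storeLevelCount) {
  if (!serverConfig.isNearlineWorkloadProducerThroughputOptimizationEnabled()) {
    return 1; // optimization disabled globally: single producer per writer
  }
  return Math.max(1, storeLevelCount);
}

static boolean effectiveProducerCompressionEnabled(VeniceServerConfig serverConfig, boolean storeLevelEnabled) {
  // With the kill switch off, keep the default behavior (compression stays on).
  return !serverConfig.isNearlineWorkloadProducerThroughputOptimizationEnabled() || storeLevelEnabled;
}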
----------------------------------------
@@ -859,7 +859,8 @@ private void producePutOrDeleteToKafka(
deletePayload.replicationMetadataVersionId = rmdProtocolVersionId;
deletePayload.replicationMetadataPayload = mergeConflictResultWrapper.getUpdatedRmdBytes();
BiConsumer<ChunkAwareCallback, LeaderMetadataWrapper> produceToTopicFunction =
(callback, sourceTopicOffset) -> veniceWriter.get()
(callback, sourceTopicOffset) -> partitionConsumptionState.getVeniceWriterLazyRef()
.get()
.delete(
key,
callback,
@@ -888,6 +889,7 @@ private void producePutOrDeleteToKafka(
updatedPut.replicationMetadataPayload = updatedRmdBytes;

BiConsumer<ChunkAwareCallback, LeaderMetadataWrapper> produceToTopicFunction = getProduceToTopicFunction(
partitionConsumptionState,
key,
updatedValueBytes,
updatedRmdBytes,
@@ -1538,6 +1540,7 @@ int getRmdProtocolVersionId() {
}

protected BiConsumer<ChunkAwareCallback, LeaderMetadataWrapper> getProduceToTopicFunction(
PartitionConsumptionState partitionConsumptionState,
byte[] key,
ByteBuffer updatedValueBytes,
ByteBuffer updatedRmdBytes,
@@ -1555,7 +1558,7 @@ protected BiConsumer<ChunkAwareCallback, LeaderMetadataWrapper> getProduceToTopi
ByteUtils.getIntHeaderFromByteBuffer(updatedValueBytes),
true));
}
getVeniceWriter().get()
getVeniceWriter(partitionConsumptionState).get()
.put(
key,
ByteUtils.extractByteArray(updatedValueBytes),
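Note: the produce path above now resolves the VeniceWriter through the
PartitionConsumptionState's lazy reference instead of a task-level field,
which is presumably what allows the server to hand a differently configured
writer (e.g., multi-producer, compression disabled) to nearline leaders
without touching each call site.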

Large diffs are not rendered by default.

----------------------------------------
@@ -15,7 +15,9 @@
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.writer.LeaderCompleteState;
import com.linkedin.venice.writer.VeniceWriter;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
@@ -208,6 +210,8 @@ public class PartitionConsumptionState {

private List<String> pendingReportIncPushVersionList;

private Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterLazyRef;

public PartitionConsumptionState(String replicaId, int partition, OffsetRecord offsetRecord, boolean hybrid) {
this.replicaId = replicaId;
this.partition = partition;
@@ -454,6 +458,14 @@ public void finalizeExpectedChecksum() {
this.expectedSSTFileChecksum = null;
}

public Lazy<VeniceWriter<byte[], byte[], byte[]>> getVeniceWriterLazyRef() {
return veniceWriterLazyRef;
}

public void setVeniceWriterLazyRef(Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterLazyRef) {
this.veniceWriterLazyRef = veniceWriterLazyRef;
}

/**
* Keep updating the checksum for key/value pair received from kafka PUT message.
* If the checksum instance is not configured via {@link PartitionConsumptionState#initializeExpectedChecksum} then do nothing.
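
For context, a minimal sketch of how this lazy reference is presumably wired
(assuming Lazy.of memoizes its supplier on first get(); writerOptions and the
factory/pcs variables are illustrative):

Lazy<VeniceWriter<byte[], byte[], byte[]>> writerRef =
    Lazy.of(() -> veniceWriterFactory.createVeniceWriter(writerOptions));
pcs.setVeniceWriterLazyRef(writerRef);
// The VeniceWriter (and its Kafka producer(s)) is only instantiated the first
// time a leader replica actually needs to produce:
VeniceWriter<byte[], byte[], byte[]> writer = pcs.getVeniceWriterLazyRef().get();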
----------------------------------------
@@ -72,14 +72,11 @@
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.PartitionerConfig;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.partitioner.VenicePartitioner;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubMessage;
@@ -101,7 +98,6 @@
import com.linkedin.venice.utils.DiskUsage;
import com.linkedin.venice.utils.ExceptionUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.RedundantExceptionFilter;
import com.linkedin.venice.utils.RetryUtils;
import com.linkedin.venice.utils.SparseConcurrentList;
@@ -293,9 +289,6 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
*/
protected final int partitionCount;

// Used to construct VenicePartitioner
protected final VenicePartitioner venicePartitioner;

// Total number of partition for this store version
protected final int storeVersionPartitionCount;

@@ -443,11 +436,6 @@ public StoreIngestionTask(
}
this.bootstrapTimeoutInMs = pushTimeoutInMs;
this.isIsolatedIngestion = isIsolatedIngestion;

PartitionerConfig partitionerConfig = version.getPartitionerConfig();
this.venicePartitioner = partitionerConfig == null
? new DefaultVenicePartitioner()
: PartitionUtils.getVenicePartitioner(partitionerConfig);
this.partitionCount = storeVersionPartitionCount;
this.ingestionNotificationDispatcher =
new IngestionNotificationDispatcher(notifiers, kafkaVersionTopic, isCurrentVersion);
----------------------------------------
@@ -324,7 +324,7 @@ public void testLeaderCanSendValueChunksIntoDrainer() throws InterruptedExceptio
when(ingestionTask.getKafkaVersionTopic()).thenReturn(testTopic);
when(ingestionTask.createProducerCallback(any(), any(), any(), anyInt(), anyString(), anyLong()))
.thenCallRealMethod();
when(ingestionTask.getProduceToTopicFunction(any(), any(), any(), any(), any(), anyInt(), anyBoolean()))
when(ingestionTask.getProduceToTopicFunction(any(), any(), any(), any(), any(), any(), anyInt(), anyBoolean()))
.thenCallRealMethod();
when(ingestionTask.getRmdProtocolVersionId()).thenReturn(rmdProtocolVersionID);
doCallRealMethod().when(ingestionTask)
@@ -370,7 +370,7 @@ public void testLeaderCanSendValueChunksIntoDrainer() throws InterruptedExceptio
VeniceWriter<byte[], byte[], byte[]> writer =
new VeniceWriter(veniceWriterOptions, VeniceProperties.empty(), mockedProducer);
when(ingestionTask.isTransientRecordBufferUsed()).thenReturn(true);
when(ingestionTask.getVeniceWriter()).thenReturn(Lazy.of(() -> writer));
when(ingestionTask.getVeniceWriter(any())).thenReturn(Lazy.of(() -> writer));
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < 50000; i++) {
stringBuilder.append("abcdefghabcdefghabcdefghabcdefgh");
@@ -402,6 +402,7 @@ public void testLeaderCanSendValueChunksIntoDrainer() throws InterruptedExceptio
partitionConsumptionState,
leaderProducedRecordContext,
ingestionTask.getProduceToTopicFunction(
partitionConsumptionState,
updatedKeyBytes,
updatedValueBytes,
updatedRmdBytes,
----------------------------------------
@@ -0,0 +1,116 @@
package com.linkedin.davinci.kafka.consumer;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.writer.VeniceWriter;
import java.util.HashMap;
import java.util.Map;
import org.testng.annotations.Test;


public class LeaderFollowerStoreIngestionTaskTest {
@Test
public void testCheckWhetherToCloseUnusedVeniceWriter() {
VeniceWriter<byte[], byte[], byte[]> writer1 = mock(VeniceWriter.class);
VeniceWriter<byte[], byte[], byte[]> writer2 = mock(VeniceWriter.class);
PartitionConsumptionState pcsForLeaderBeforeEOP = mock(PartitionConsumptionState.class);
doReturn(LeaderFollowerStateType.LEADER).when(pcsForLeaderBeforeEOP).getLeaderFollowerState();
doReturn(false).when(pcsForLeaderBeforeEOP).isEndOfPushReceived();
PartitionConsumptionState pcsForLeaderAfterEOP = mock(PartitionConsumptionState.class);
doReturn(LeaderFollowerStateType.LEADER).when(pcsForLeaderAfterEOP).getLeaderFollowerState();
doReturn(true).when(pcsForLeaderAfterEOP).isEndOfPushReceived();
PartitionConsumptionState pcsForFollowerBeforeEOP = mock(PartitionConsumptionState.class);
doReturn(LeaderFollowerStateType.STANDBY).when(pcsForFollowerBeforeEOP).getLeaderFollowerState();
doReturn(false).when(pcsForFollowerBeforeEOP).isEndOfPushReceived();
PartitionConsumptionState pcsForFollowerAfterEOP = mock(PartitionConsumptionState.class);
doReturn(LeaderFollowerStateType.STANDBY).when(pcsForFollowerAfterEOP).getLeaderFollowerState();
doReturn(true).when(pcsForFollowerAfterEOP).isEndOfPushReceived();

String versionTopicName = "store_v1";
// Some writers are not available.
assertFalse(
LeaderFollowerStoreIngestionTask.checkWhetherToCloseUnusedVeniceWriter(
Lazy.of(() -> writer1),
Lazy.of(() -> writer1),
mock(Map.class),
() -> {},
versionTopicName));
Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterWithInitializedValue1 = Lazy.of(() -> writer1);
veniceWriterWithInitializedValue1.get();
assertFalse(
LeaderFollowerStoreIngestionTask.checkWhetherToCloseUnusedVeniceWriter(
veniceWriterWithInitializedValue1,
Lazy.of(() -> writer1),
mock(Map.class),
() -> {},
versionTopicName));
assertFalse(
LeaderFollowerStoreIngestionTask.checkWhetherToCloseUnusedVeniceWriter(
Lazy.of(() -> writer1),
veniceWriterWithInitializedValue1,
mock(Map.class),
() -> {},
versionTopicName));

// Same writers
assertFalse(
LeaderFollowerStoreIngestionTask.checkWhetherToCloseUnusedVeniceWriter(
veniceWriterWithInitializedValue1,
veniceWriterWithInitializedValue1,
mock(Map.class),
() -> {},
versionTopicName));

Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterWithInitializedValue2 = Lazy.of(() -> writer2);
veniceWriterWithInitializedValue2.get();
// No leader
Map<Integer, PartitionConsumptionState> noLeaderPCSMap = new HashMap<>();
noLeaderPCSMap.put(0, pcsForFollowerAfterEOP);
noLeaderPCSMap.put(1, pcsForFollowerBeforeEOP);
Runnable runnable = mock(Runnable.class);

assertTrue(
LeaderFollowerStoreIngestionTask.checkWhetherToCloseUnusedVeniceWriter(
veniceWriterWithInitializedValue1,
veniceWriterWithInitializedValue2,
noLeaderPCSMap,
runnable,
versionTopicName));
verify(runnable).run();

// One leader before EOP and some follower
Map<Integer, PartitionConsumptionState> oneLeaderBeforeEOPPCSMap = new HashMap<>();
oneLeaderBeforeEOPPCSMap.put(0, pcsForLeaderBeforeEOP);
oneLeaderBeforeEOPPCSMap.put(1, pcsForFollowerBeforeEOP);
runnable = mock(Runnable.class);
assertFalse(
LeaderFollowerStoreIngestionTask.checkWhetherToCloseUnusedVeniceWriter(
veniceWriterWithInitializedValue1,
veniceWriterWithInitializedValue2,
oneLeaderBeforeEOPPCSMap,
runnable,
versionTopicName));
verify(runnable, never()).run();

// One leader before EOP and one leader after EOP and some follower
Map<Integer, PartitionConsumptionState> oneLeaderBeforeEOPAndOneLeaderAfterEOPPCSMap = new HashMap<>();
oneLeaderBeforeEOPAndOneLeaderAfterEOPPCSMap.put(0, pcsForLeaderBeforeEOP);
oneLeaderBeforeEOPAndOneLeaderAfterEOPPCSMap.put(1, pcsForLeaderAfterEOP);
oneLeaderBeforeEOPAndOneLeaderAfterEOPPCSMap.put(2, pcsForFollowerAfterEOP);
runnable = mock(Runnable.class);
assertFalse(
LeaderFollowerStoreIngestionTask.checkWhetherToCloseUnusedVeniceWriter(
veniceWriterWithInitializedValue1,
veniceWriterWithInitializedValue2,
oneLeaderBeforeEOPAndOneLeaderAfterEOPPCSMap,
runnable,
versionTopicName));
verify(runnable, never()).run();
}
}
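
A hedged reconstruction of the semantics the test above pins down (the real
implementation lives in LeaderFollowerStoreIngestionTask; the body below is
inferred from the assertions, and Lazy#isPresent() is assumed to report
whether the supplier has run):

static boolean checkWhetherToCloseUnusedVeniceWriter(
    Lazy<VeniceWriter<byte[], byte[], byte[]>> vtWriterRef,
    Lazy<VeniceWriter<byte[], byte[], byte[]>> rtWriterRef,
    Map<Integer, PartitionConsumptionState> partitionConsumptionStateMap,
    Runnable closeUnusedWriterRunnable,
    String versionTopicName) { // versionTopicName presumably only used for logging
  if (!vtWriterRef.isPresent() || !rtWriterRef.isPresent()) {
    return false; // nothing to close if either writer was never materialized
  }
  if (vtWriterRef.get() == rtWriterRef.get()) {
    return false; // one shared writer: closing it would break the other path
  }
  for (PartitionConsumptionState pcs: partitionConsumptionStateMap.values()) {
    if (pcs.getLeaderFollowerState() == LeaderFollowerStateType.LEADER && !pcs.isEndOfPushReceived()) {
      return false; // a pre-EOP leader may still produce with the batch writer
    }
  }
  closeUnusedWriterRunnable.run(); // safe to close the now-unused writer
  return true;
}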
----------------------------------------
@@ -202,6 +202,7 @@
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import com.linkedin.venice.writer.LeaderCompleteState;
import com.linkedin.venice.writer.LeaderMetadataWrapper;
@@ -4300,6 +4301,8 @@ public void testMaybeSendIngestionHeartbeat(
doReturn(heartBeatFuture).when(veniceWriter).sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong());
doReturn(veniceWriter).when(veniceWriterFactory).createVeniceWriter(any());

doReturn(Lazy.of(() -> veniceWriter)).when(pcs).getVeniceWriterLazyRef();

StoreIngestionTaskFactory ingestionTaskFactory = TestUtils.getStoreIngestionTaskBuilder(storeName)
.setStorageMetadataService(mockStorageMetadataService)
.setMetadataRepository(mockReadOnlyStoreRepository)
@@ -4384,6 +4387,9 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws Inter
VeniceWriterFactory veniceWriterFactory = mock(VeniceWriterFactory.class);
doReturn(veniceWriter).when(veniceWriterFactory).createVeniceWriter(any());

doReturn(Lazy.of(() -> veniceWriter)).when(pcs0).getVeniceWriterLazyRef();
doReturn(Lazy.of(() -> veniceWriter)).when(pcs1).getVeniceWriterLazyRef();

StoreIngestionTaskFactory ingestionTaskFactory = TestUtils.getStoreIngestionTaskBuilder(storeName)
.setStorageMetadataService(mockStorageMetadataService)
.setMetadataRepository(mockReadOnlyStoreRepository)
----------------------------------------
@@ -1155,6 +1155,12 @@ static UpdateStoreQueryParams getUpdateStoreQueryParams(CommandLine cmd) {
integerParam(cmd, Arg.MAX_NEARLINE_RECORD_SIZE_BYTES, params::setMaxNearlineRecordSizeBytes, argSet);
booleanParam(cmd, Arg.UNUSED_SCHEMA_DELETION_ENABLED, p -> params.setUnusedSchemaDeletionEnabled(p), argSet);
booleanParam(cmd, Arg.BLOB_TRANSFER_ENABLED, p -> params.setBlobTransferEnabled(p), argSet);
booleanParam(
cmd,
Arg.NEARLINE_PRODUCER_COMPRESSION_ENABLED,
p -> params.setNearlineProducerCompressionEnabled(p),
argSet);
integerParam(cmd, Arg.NEARLINE_PRODUCER_COUNT_PER_WRITER, p -> params.setNearlineProducerCountPerWriter(p), argSet);

/**
* {@link Arg#REPLICATE_ALL_CONFIGS} doesn't require parameters; once specified, it means true.
----------------------------------------
@@ -275,7 +275,15 @@ public enum Arg {
), RECOVER_CLUSTER("recover-cluster", "rc", true, "Cluster to recover from"),
BACKUP_FOLDER("backup-folder", "bf", true, "Backup folder path"),
DEBUG("debug", "d", false, "Print debugging messages for execute-data-recovery"),
BLOB_TRANSFER_ENABLED("blob-transfer-enabled", "bt", true, "Flag to indicate if the blob transfer is allowed or not");
BLOB_TRANSFER_ENABLED("blob-transfer-enabled", "bt", true, "Flag to indicate if the blob transfer is allowed or not"),
NEARLINE_PRODUCER_COMPRESSION_ENABLED(
"nearline-producer-compression-enabled", "npce", true,
"Flag to control whether KafkaProducer will use compression or not for nearline workload"
),
NEARLINE_PRODUCER_COUNT_PER_WRITER(
"nearline-producer-count-per-writer", "npcpw", true,
"How many producers will be used to write nearline workload in Server"
);

private final String argName;
private final String first;
----------------------------------------
@@ -72,6 +72,8 @@
import static com.linkedin.venice.Arg.MIN_COMPACTION_LAG_SECONDS;
import static com.linkedin.venice.Arg.NATIVE_REPLICATION_ENABLED;
import static com.linkedin.venice.Arg.NATIVE_REPLICATION_SOURCE_FABRIC;
import static com.linkedin.venice.Arg.NEARLINE_PRODUCER_COMPRESSION_ENABLED;
import static com.linkedin.venice.Arg.NEARLINE_PRODUCER_COUNT_PER_WRITER;
import static com.linkedin.venice.Arg.NON_INTERACTIVE;
import static com.linkedin.venice.Arg.NUM_VERSIONS_TO_PRESERVE;
import static com.linkedin.venice.Arg.OFFSET;
@@ -268,7 +270,8 @@ public enum Command {
ACTIVE_ACTIVE_REPLICATION_ENABLED, REGIONS_FILTER, DISABLE_META_STORE, DISABLE_DAVINCI_PUSH_STATUS_STORE,
STORAGE_PERSONA, STORE_VIEW_CONFIGS, LATEST_SUPERSET_SCHEMA_ID, MIN_COMPACTION_LAG_SECONDS,
MAX_COMPACTION_LAG_SECONDS, MAX_RECORD_SIZE_BYTES, MAX_NEARLINE_RECORD_SIZE_BYTES,
UNUSED_SCHEMA_DELETION_ENABLED, BLOB_TRANSFER_ENABLED, SEPARATE_REALTIME_TOPIC_ENABLED }
UNUSED_SCHEMA_DELETION_ENABLED, BLOB_TRANSFER_ENABLED, SEPARATE_REALTIME_TOPIC_ENABLED,
NEARLINE_PRODUCER_COMPRESSION_ENABLED, NEARLINE_PRODUCER_COUNT_PER_WRITER }
),
UPDATE_CLUSTER_CONFIG(
"update-cluster-config", "Update live cluster configs", new Arg[] { URL, CLUSTER },
----------------------------------------
@@ -2305,4 +2305,14 @@ private ConfigKeys() {
public static final String SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_THREAD_POOL_SIZE =
"server.aa.wc.workload.parallel.processing.thread.pool.size";
public static final String SERVER_GLOBAL_RT_DIV_ENABLED = "server.global.rt.div.enabled";

/**
* Whether to enable producer throughput optimization for nearline workload or not.
* Two strategies:
* 1. Disable compression.
* 2. Utilize multiple producers per writer.
* Both options are controlled via store-level configs.
*/
public static final String SERVER_NEARLINE_WORKLOAD_PRODUCER_THROUGHPUT_OPTIMIZATION_ENABLED =
"server.nearline.workload.producer.throughput.optimization.enabled";
}
----------------------------------------
@@ -236,4 +236,7 @@ public class ControllerApiConstants {
public static final String BLOB_TRANSFER_ENABLED = "blob_transfer_enabled";

public static final String HEARTBEAT_TIMESTAMP = "heartbeat_timestamp";

public static final String NEARLINE_PRODUCER_COMPRESSION_ENABLED = "nearline_producer_compression_enabled";
public static final String NEARLINE_PRODUCER_COUNT_PER_WRITER = "nearline_producer_count_per_writer";
}