KAFKA-18058: Share group state record pruning impl. #18014

Merged: 38 commits, Dec 12, 2024
Changes from all commits
Commits:
546eda7 - Prune records initial commit. (smjn, Dec 3, 2024)
dd575d4 - added offsets manager class. (smjn, Dec 3, 2024)
3dc1798 - replace config with boolean flag arg. (smjn, Dec 3, 2024)
9d19c0b - add deleteRecords method to partition writer. (smjn, Dec 3, 2024)
87f5a1d - add plumbing in share coordinator for purging. (smjn, Dec 3, 2024)
d566e22 - review comments (smjn, Dec 3, 2024)
6ae8337 - minor bug fix. (smjn, Dec 4, 2024)
c49b7db - shard unit tests. (smjn, Dec 4, 2024)
f616f85 - partition writer unit tests. (smjn, Dec 4, 2024)
bea9566 - added share coord service unit tests. (smjn, Dec 4, 2024)
b0d4cca - final touches, incorporated comments. (smjn, Dec 4, 2024)
9a342c2 - Fixed documentation. (smjn, Dec 4, 2024)
e77c16d - added more unit tests, constraint to exception logging. (smjn, Dec 4, 2024)
6d889bc - added runtime method to fetch active tps. (smjn, Dec 4, 2024)
b30df09 - logging improvements. (smjn, Dec 4, 2024)
f56477b - incorporated comments. (smjn, Dec 5, 2024)
055ef21 - revert to using timeline hm in offsets manager. (smjn, Dec 5, 2024)
7e62122 - Merge remote-tracking branch 'apache-kafka/trunk' into KAFKA-18058 (smjn, Dec 5, 2024)
18a44e1 - fixed javadocs. (smjn, Dec 6, 2024)
74dcd45 - fixed tests. (smjn, Dec 6, 2024)
f254399 - incorporated comments. (smjn, Dec 6, 2024)
a9a986e - Merge remote-tracking branch 'apache-kafka/trunk' into KAFKA-18058 (smjn, Dec 8, 2024)
b68b014 - add last red off method (smjn, Dec 8, 2024)
1cc890a - some debug logs. (smjn, Dec 8, 2024)
e1a38ab - change repeated return values for last red offset. (smjn, Dec 9, 2024)
ae6333a - incorporated comments. (smjn, Dec 9, 2024)
8244bdc - revert changes (smjn, Dec 9, 2024)
c798254 - reschedule even on errors. (smjn, Dec 9, 2024)
75eb4d2 - inc review comments. (smjn, Dec 9, 2024)
be0310f - fixed javadoc (smjn, Dec 9, 2024)
a5aa93f - replaced timeline object with atomic boolean (smjn, Dec 10, 2024)
78d8c49 - updated comments (smjn, Dec 10, 2024)
9714ce6 - Revert "updated comments" (smjn, Dec 10, 2024)
7e420cb - updated docs (smjn, Dec 10, 2024)
e3db1dc - incorporated comments. (smjn, Dec 10, 2024)
1d60249 - incorporated comments. (smjn, Dec 11, 2024)
502b5e7 - remove part index, instead of clearing memo. (smjn, Dec 11, 2024)
d55abdd - incorporated comments. (smjn, Dec 11, 2024)
@@ -52,6 +52,7 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -2460,4 +2461,23 @@ public void close() throws Exception {
Utils.closeQuietly(runtimeMetrics, "runtime metrics");
log.info("Coordinator runtime closed.");
}

/**
* Util method which returns all the topic partitions for which
* the state machine is in active state.
* <p>
* This could be useful if the caller does not have a specific
* target internal topic partition.
* @return List of {@link TopicPartition} whose coordinators are active
*/
public List<TopicPartition> activeTopicPartitions() {
if (coordinators == null || coordinators.isEmpty()) {
return Collections.emptyList();
}

return coordinators.entrySet().stream()
.filter(entry -> entry.getValue().state.equals(CoordinatorState.ACTIVE))
.map(Map.Entry::getKey)
.toList();
}
}
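A note on intended use, with a hedged sketch: the javadoc above suggests the caller is a periodic job that does not target one specific internal topic partition. The snippet below shows how such a job might fan out over the active partitions; the pruning callback and its wiring are assumptions for illustration, not code from this PR.

```java
import java.util.List;
import java.util.function.Consumer;

import org.apache.kafka.common.TopicPartition;

// Illustrative only: run a per-partition pruning action over the partitions returned
// by CoordinatorRuntime#activeTopicPartitions(). Partitions whose coordinators are not
// ACTIVE are never returned, so no extra state check is needed here.
final class ActivePartitionPruner {

    static void pruneActive(List<TopicPartition> activePartitions, Consumer<TopicPartition> pruneAction) {
        for (TopicPartition tp : activePartitions) {
            pruneAction.accept(tp);
        }
    }
}
```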
@@ -107,4 +107,15 @@ CompletableFuture<VerificationGuard> maybeStartTransactionVerification(
short producerEpoch,
short apiVersion
) throws KafkaException;

/**
* Delete records from a topic partition until specified offset
* @param tp The partition to delete records from
* @param deleteBeforeOffset Offset to delete until, starting from the beginning
* @throws KafkaException Any KafkaException caught during the operation.
*/
CompletableFuture<Void> deleteRecords(
TopicPartition tp,
long deleteBeforeOffset
) throws KafkaException;
}
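Because deleteRecords returns a CompletableFuture, callers can treat pruning failures as retriable rather than fatal. A minimal sketch of that pattern follows, under the assumption of a hypothetical caller named StatePruner; it is not the ShareCoordinatorService code from this PR.

```java
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.coordinator.common.runtime.PartitionWriter;

// Sketch: invoke PartitionWriter#deleteRecords and log failures without propagating them,
// so a periodic pruning job can simply reschedule and retry on the next interval.
final class StatePruner {

    static CompletableFuture<Void> prune(PartitionWriter writer, TopicPartition tp, long deleteBeforeOffset) {
        return writer.deleteRecords(tp, deleteBeforeOffset)
            .whenComplete((ignored, error) -> {
                if (error != null) {
                    // Non-fatal: the next pruning cycle will attempt the delete again.
                    System.err.println("Failed to prune " + tp + " up to offset " + deleteBeforeOffset + ": " + error);
                }
            });
    }
}
```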
@@ -115,6 +115,14 @@ public long append(
}
}

@Override
public CompletableFuture<Void> deleteRecords(
TopicPartition tp,
long deleteBeforeOffset
) throws KafkaException {
throw new RuntimeException("method not implemented");
}

@Override
public CompletableFuture<VerificationGuard> maybeStartTransactionVerification(
TopicPartition tp,
@@ -165,4 +165,25 @@ class CoordinatorPartitionWriter(
// Required offset.
partitionResult.lastOffset + 1
}

override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long): CompletableFuture[Void] = {
val responseFuture: CompletableFuture[Void] = new CompletableFuture[Void]()

replicaManager.deleteRecords(
timeout = 30000L, // 30 seconds.
offsetPerPartition = Map(tp -> deleteBeforeOffset),
responseCallback = results => {
val result = results.get(tp)
if (result.isEmpty) {
responseFuture.completeExceptionally(new IllegalStateException(s"Delete status $result should have partition $tp."))
} else if (result.get.errorCode != Errors.NONE.code) {
responseFuture.completeExceptionally(Errors.forCode(result.get.errorCode).exception)
} else {
responseFuture.complete(null)
}
},
allowInternalTopicDeletion = true
)
responseFuture
}
}
11 changes: 6 additions & 5 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1172,11 +1172,11 @@ class ReplicaManager(val config: KafkaConfig,
* Delete records on leader replicas of the partition, and wait for delete records operation be propagated to other replicas;
* the callback function will be triggered either when timeout or logStartOffset of all live replicas have reached the specified offset
*/
private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long]): Map[TopicPartition, LogDeleteRecordsResult] = {
private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long], allowInternalTopicDeletion: Boolean): Map[TopicPartition, LogDeleteRecordsResult] = {
trace("Delete records on local logs to offsets [%s]".format(offsetPerPartition))
offsetPerPartition.map { case (topicPartition, requestedOffset) =>
// reject delete records operation on internal topics
if (Topic.isInternal(topicPartition.topic)) {
// reject delete records operation for internal topics unless allowInternalTopicDeletion is true
if (Topic.isInternal(topicPartition.topic) && !allowInternalTopicDeletion) {
(topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}"))))
} else {
try {
@@ -1369,9 +1369,10 @@ class ReplicaManager(val config: KafkaConfig,

def deleteRecords(timeout: Long,
Review comment (Member): Let's add a unit test for this new flag in ReplicaManagerTest.

offsetPerPartition: Map[TopicPartition, Long],
responseCallback: Map[TopicPartition, DeleteRecordsPartitionResult] => Unit): Unit = {
responseCallback: Map[TopicPartition, DeleteRecordsPartitionResult] => Unit,
allowInternalTopicDeletion: Boolean = false): Unit = {
val timeBeforeLocalDeleteRecords = time.milliseconds
val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition)
val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition, allowInternalTopicDeletion)
debug("Delete records on local log in %d ms".format(time.milliseconds - timeBeforeLocalDeleteRecords))

val deleteRecordsStatus = localDeleteRecordsResults.map { case (topicPartition, result) =>
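For readers skimming past the Scala, the behavioral change in deleteRecordsOnLocalLog is just a widened guard: internal topics are still rejected by default, and only an explicit opt-in skips the check. A hedged Java restatement of that guard, for illustration only:

```java
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.internals.Topic;

// Illustration of the new flag's semantics: return the rejection to report, or null
// when the delete-records request may proceed for this topic.
final class InternalTopicGuard {

    static InvalidTopicException check(String topic, boolean allowInternalTopicDeletion) {
        if (Topic.isInternal(topic) && !allowInternalTopicDeletion) {
            return new InvalidTopicException("Cannot delete records of internal topic " + topic);
        }
        return null;
    }
}
```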
@@ -20,13 +20,14 @@ import kafka.server.ReplicaManager
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.coordinator.common.runtime.PartitionWriter
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
@@ -238,4 +239,83 @@ class CoordinatorPartitionWriterTest {
batch
))
}

@Test
def testDeleteRecordsResponseContainsError(): Unit = {
val replicaManager = mock(classOf[ReplicaManager])
val partitionRecordWriter = new CoordinatorPartitionWriter(
replicaManager
)

val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])

// Response contains error.
when(replicaManager.deleteRecords(
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(),
callbackCapture.capture(),
ArgumentMatchers.eq(true)
)).thenAnswer { _ =>
callbackCapture.getValue.apply(Map(
new TopicPartition("random-topic", 0) -> new DeleteRecordsPartitionResult()
.setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code
)))
}

partitionRecordWriter.deleteRecords(
new TopicPartition("random-topic", 0),
10L
).whenComplete { (_, exp) =>
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception, exp)
}

// Empty response
when(replicaManager.deleteRecords(
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(),
callbackCapture.capture(),
ArgumentMatchers.eq(true)
)).thenAnswer { _ =>
callbackCapture.getValue.apply(Map[TopicPartition, DeleteRecordsPartitionResult]())
}

partitionRecordWriter.deleteRecords(
new TopicPartition("random-topic", 0),
10L
).whenComplete { (_, exp) =>
assertTrue(exp.isInstanceOf[IllegalStateException])
Review comment (Contributor): If a topic doesn't exist, we should get an unknown topic exception.

}
}

@Test
def testDeleteRecordsSuccess(): Unit = {
val replicaManager = mock(classOf[ReplicaManager])
val partitionRecordWriter = new CoordinatorPartitionWriter(
replicaManager
)

val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])

// response contains error
when(replicaManager.deleteRecords(
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(),
callbackCapture.capture(),
ArgumentMatchers.eq(true)
)).thenAnswer { _ =>
callbackCapture.getValue.apply(Map(
new TopicPartition("random-topic", 0) -> new DeleteRecordsPartitionResult()
.setErrorCode(Errors.NONE.code)
))
}

partitionRecordWriter.deleteRecords(
Review comment (Member): ditto.

new TopicPartition("random-topic", 0),
10L
).whenComplete { (_, exp) =>
assertNull(exp)
}
}
}
10 changes: 5 additions & 5 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -77,7 +77,7 @@ import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAn
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG}
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorConfigTest}
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorTestConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
@@ -11699,7 +11699,7 @@ class KafkaApisTest extends Logging {

val response = getReadShareGroupResponse(
readRequestData,
config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = true,
null,
readStateResultData
@@ -11754,7 +11754,7 @@

val response = getReadShareGroupResponse(
readRequestData,
config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = false,
authorizer,
readStateResultData
@@ -11809,7 +11809,7 @@

val response = getWriteShareGroupResponse(
writeRequestData,
config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = true,
null,
writeStateResultData
@@ -11864,7 +11864,7 @@

val response = getWriteShareGroupResponse(
writeRequestData,
config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = false,
authorizer,
writeStateResultData
58 changes: 57 additions & 1 deletion core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -34,7 +34,8 @@ import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartit
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.{InvalidPidMappingException, KafkaStorageException}
import org.apache.kafka.common.message.LeaderAndIsrRequestData
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.{DeleteRecordsResponseData, LeaderAndIsrRequestData}
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState
@@ -6660,6 +6661,61 @@ class ReplicaManagerTest {
}
}

@Test
def testDeleteRecordsInternalTopicDeleteDisallowed(): Unit = {
Review comment (Contributor): There are lots of existing usages of rm.becomeLeaderOrFollower. It would be useful to clean them up in a follow-up jira.
Reply (Contributor Author, @smjn, Dec 11, 2024): perhaps

val localId = 1
val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, Topic.GROUP_METADATA_TOPIC_NAME)
val directoryEventHandler = mock(classOf[DirectoryEventHandler])

val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler)
val directoryIds = rm.logManager.directoryIdsSet.toList
assertEquals(directoryIds.size, 2)
val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, isStartIdLeader = true, directoryIds = directoryIds)
val (partition: Partition, _) = rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get
partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)),
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava),
None)

def callback(responseStatus: Map[TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = {
assert(responseStatus.values.head.errorCode == Errors.INVALID_TOPIC_EXCEPTION.code)
}

// default internal topics delete disabled
rm.deleteRecords(
timeout = 0L,
Map[TopicPartition, Long](topicPartition0.topicPartition() -> 10L),
responseCallback = callback
)
}

@Test
def testDeleteRecordsInternalTopicDeleteAllowed(): Unit = {
val localId = 1
val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, Topic.GROUP_METADATA_TOPIC_NAME)
val directoryEventHandler = mock(classOf[DirectoryEventHandler])

val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler)
val directoryIds = rm.logManager.directoryIdsSet.toList
assertEquals(directoryIds.size, 2)
val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, isStartIdLeader = true, directoryIds = directoryIds)
val (partition: Partition, _) = rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get
partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)),
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava),
None)

def callback(responseStatus: Map[TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = {
assert(responseStatus.values.head.errorCode == Errors.NONE.code)
}

// internal topics delete allowed
rm.deleteRecords(
timeout = 0L,
Map[TopicPartition, Long](topicPartition0.topicPartition() -> 0L),
responseCallback = callback,
allowInternalTopicDeletion = true
)
}

private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): Seq[(TopicIdPartition, LogReadResult)] = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true)
try {
@@ -24,6 +24,7 @@
import java.util.Optional;

import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
Expand Down Expand Up @@ -71,6 +72,10 @@ public class ShareCoordinatorConfig {
public static final int APPEND_LINGER_MS_DEFAULT = 10;
public static final String APPEND_LINGER_MS_DOC = "The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk.";

public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG = "share.coordinator.state.topic.prune.interval.ms";
public static final int STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes
public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_DOC = "The duration in milliseconds that the share coordinator will wait between pruning eligible records in share-group state topic.";

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(STATE_TOPIC_NUM_PARTITIONS_CONFIG, INT, STATE_TOPIC_NUM_PARTITIONS_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_NUM_PARTITIONS_DOC)
.define(STATE_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, STATE_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_REPLICATION_FACTOR_DOC)
@@ -81,7 +86,8 @@ public class ShareCoordinatorConfig {
.define(LOAD_BUFFER_SIZE_CONFIG, INT, LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, LOAD_BUFFER_SIZE_DOC)
.define(STATE_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) STATE_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, STATE_TOPIC_COMPRESSION_CODEC_DOC)
.define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, APPEND_LINGER_MS_DOC)
.define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC);
.define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC)
.defineInternal(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, INT, STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT, atLeast(1), LOW, STATE_TOPIC_PRUNE_INTERVAL_MS_DOC);

private final int stateTopicNumPartitions;
private final short stateTopicReplicationFactor;
@@ -93,6 +99,7 @@ public class ShareCoordinatorConfig {
private final int loadBufferSize;
private final CompressionType compressionType;
private final int appendLingerMs;
private final int pruneIntervalMs;


public ShareCoordinatorConfig(AbstractConfig config) {
@@ -108,6 +115,7 @@ public ShareCoordinatorConfig(AbstractConfig config) {
.map(CompressionType::forId)
.orElse(null);
appendLingerMs = config.getInt(APPEND_LINGER_MS_CONFIG);
pruneIntervalMs = config.getInt(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG);
validate();
}

@@ -151,6 +159,10 @@ public CompressionType shareCoordinatorStateTopicCompressionType() {
return compressionType;
}

public int shareCoordinatorTopicPruneIntervalMs() {
return pruneIntervalMs;
}

private void validate() {
Utils.require(snapshotUpdateRecordsPerSnapshot >= 0 && snapshotUpdateRecordsPerSnapshot <= 500,
String.format("%s must be between [0, 500]", SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG));
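One practical note on the new setting: share.coordinator.state.topic.prune.interval.ms is registered via defineInternal, so it is not part of the documented public configuration, but it can still be overridden, which is mainly useful in tests. A hedged sketch of such an override follows; the one-minute value is an arbitrary example, not a recommendation.

```java
import java.util.Properties;

// Sketch: tighten the share-group state topic pruning interval for a test broker.
// The default introduced in this PR is 5 * 60 * 1000 ms (5 minutes).
final class PruneIntervalOverride {

    static Properties brokerOverrides() {
        Properties props = new Properties();
        props.setProperty("share.coordinator.state.topic.prune.interval.ms", "60000"); // 1 minute
        return props;
    }
}
```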