KAFKA-18058: Share group state record pruning impl. (apache#18014)
In this PR, we've added a ShareCoordinatorOffsetsManager class that tracks the last redundant offset for each share-group state topic partition. We've also added a periodic timer job in ShareCoordinatorService that queries for the redundant offset at regular intervals and, if a valid value is found, issues a deleteRecords call to the ReplicaManager via the PartitionWriter. This keeps the size of the share-group state partitions manageable.
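At a high level, each tick of that timer reduces to: for every share-group state partition with an active coordinator, ask the offsets manager for its last redundant offset and, if one is present, delete all records before it. The following is a minimal sketch of that loop, not the actual implementation; the lastRedundantOffset accessor and the surrounding wiring are assumptions for illustration, while PartitionWriter#deleteRecords and CoordinatorRuntime#activeTopicPartitions are the APIs added in this change.

import java.util.List;
import java.util.Optional;
import java.util.function.Function;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.coordinator.common.runtime.PartitionWriter;

// Illustrative sketch only: one tick of the prune timer described above.
final class SharePruneSketch {
    static void pruneOnce(
        List<TopicPartition> activePartitions,                        // e.g. runtime.activeTopicPartitions()
        Function<TopicPartition, Optional<Long>> lastRedundantOffset, // stand-in for ShareCoordinatorOffsetsManager
        PartitionWriter writer
    ) {
        for (TopicPartition tp : activePartitions) {
            lastRedundantOffset.apply(tp).ifPresent(offset ->
                writer.deleteRecords(tp, offset).exceptionally(exp -> {
                    // Pruning is best effort; a failed delete is retried on the next tick.
                    return null;
                })
            );
        }
    }
}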

Reviewers: Jun Rao <[email protected]>, David Jacot <[email protected]>, Andrew Schofield <[email protected]>
smjn authored and peterxcli committed Dec 18, 2024
1 parent 5485475 commit d595eee
Showing 16 changed files with 1,175 additions and 49 deletions.
@@ -52,6 +52,7 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -2468,4 +2469,23 @@ public void close() throws Exception {
Utils.closeQuietly(runtimeMetrics, "runtime metrics");
log.info("Coordinator runtime closed.");
}

/**
* Utility method which returns all the topic partitions for which
* the state machine is in the active state.
* <p>
* This could be useful if the caller does not have a specific
* target internal topic partition.
* @return List of {@link TopicPartition} whose coordinators are active
*/
public List<TopicPartition> activeTopicPartitions() {
if (coordinators == null || coordinators.isEmpty()) {
return Collections.emptyList();
}

return coordinators.entrySet().stream()
.filter(entry -> entry.getValue().state.equals(CoordinatorState.ACTIVE))
.map(Map.Entry::getKey)
.toList();
}
}
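For context, a caller that wants to sweep every active partition rather than target a specific one could drive a loop like the hedged sketch below; scheduleRedundantOffsetCheck is a hypothetical helper, not part of this commit.

// Sketch: enumerate partitions with an active coordinator and queue a prune check for each.
for (TopicPartition tp : runtime.activeTopicPartitions()) {
    scheduleRedundantOffsetCheck(tp); // hypothetical helper
}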
@@ -107,4 +107,15 @@ CompletableFuture<VerificationGuard> maybeStartTransactionVerification(
short producerEpoch,
short apiVersion
) throws KafkaException;

/**
* Delete records from a topic partition up to (but not including) the specified offset.
* @param tp The partition to delete records from
* @param deleteBeforeOffset Offset before which records are deleted, starting from the beginning of the log
* @throws KafkaException Any KafkaException caught during the operation.
*/
CompletableFuture<Void> deleteRecords(
TopicPartition tp,
long deleteBeforeOffset
) throws KafkaException;
}
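A hedged usage sketch of the new contract follows; tp, redundantOffset, and log are placeholders.

// Sketch: prune a state partition up to a known redundant offset.
writer.deleteRecords(tp, redundantOffset)
    .whenComplete((ignored, exp) -> {
        if (exp != null) {
            // Failures surface as the KafkaException mapped from the partition-level error.
            log.warn("Failed to prune {} before offset {}", tp, redundantOffset, exp);
        }
    });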
@@ -115,6 +115,14 @@ public long append(
}
}

@Override
public CompletableFuture<Void> deleteRecords(
TopicPartition tp,
long deleteBeforeOffset
) throws KafkaException {
throw new RuntimeException("method not implemented");
}

@Override
public CompletableFuture<VerificationGuard> maybeStartTransactionVerification(
TopicPartition tp,
@@ -165,4 +165,25 @@ class CoordinatorPartitionWriter(
// Required offset.
partitionResult.lastOffset + 1
}

override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long): CompletableFuture[Void] = {
val responseFuture: CompletableFuture[Void] = new CompletableFuture[Void]()

replicaManager.deleteRecords(
timeout = 30000L, // 30 seconds.
offsetPerPartition = Map(tp -> deleteBeforeOffset),
responseCallback = results => {
val result = results.get(tp)
if (result.isEmpty) {
responseFuture.completeExceptionally(new IllegalStateException(s"Delete status $result should have partition $tp."))
} else if (result.get.errorCode != Errors.NONE.code) {
responseFuture.completeExceptionally(Errors.forCode(result.get.errorCode).exception)
} else {
responseFuture.complete(null)
}
},
allowInternalTopicDeletion = true
)
responseFuture
}
}
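Two points worth noting about the implementation above: it always passes allowInternalTopicDeletion = true, since the share-group state topic is internal and ReplicaManager would otherwise reject the delete (see the ReplicaManager change below), and it hard-codes a 30-second timeout for the delete-records operation. An empty response map is surfaced as an IllegalStateException, while a partition-level error code is translated back into the corresponding Errors exception.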
11 changes: 6 additions & 5 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1172,11 +1172,11 @@ class ReplicaManager(val config: KafkaConfig,
* Delete records on leader replicas of the partition, and wait for delete records operation be propagated to other replicas;
* the callback function will be triggered either when timeout or logStartOffset of all live replicas have reached the specified offset
*/
private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long]): Map[TopicPartition, LogDeleteRecordsResult] = {
private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long], allowInternalTopicDeletion: Boolean): Map[TopicPartition, LogDeleteRecordsResult] = {
trace("Delete records on local logs to offsets [%s]".format(offsetPerPartition))
offsetPerPartition.map { case (topicPartition, requestedOffset) =>
// reject delete records operation on internal topics
if (Topic.isInternal(topicPartition.topic)) {
// reject delete records operation for internal topics unless allowInternalTopicDeletion is true
if (Topic.isInternal(topicPartition.topic) && !allowInternalTopicDeletion) {
(topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}"))))
} else {
try {
@@ -1369,9 +1369,10 @@ class ReplicaManager(val config: KafkaConfig,

def deleteRecords(timeout: Long,
offsetPerPartition: Map[TopicPartition, Long],
responseCallback: Map[TopicPartition, DeleteRecordsPartitionResult] => Unit): Unit = {
responseCallback: Map[TopicPartition, DeleteRecordsPartitionResult] => Unit,
allowInternalTopicDeletion: Boolean = false): Unit = {
val timeBeforeLocalDeleteRecords = time.milliseconds
val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition)
val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition, allowInternalTopicDeletion)
debug("Delete records on local log in %d ms".format(time.milliseconds - timeBeforeLocalDeleteRecords))

val deleteRecordsStatus = localDeleteRecordsResults.map { case (topicPartition, result) =>
@@ -20,13 +20,14 @@ import kafka.server.ReplicaManager
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.coordinator.common.runtime.PartitionWriter
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
@@ -238,4 +239,83 @@ class CoordinatorPartitionWriterTest {
batch
))
}

@Test
def testDeleteRecordsResponseContainsError(): Unit = {
val replicaManager = mock(classOf[ReplicaManager])
val partitionRecordWriter = new CoordinatorPartitionWriter(
replicaManager
)

val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])

// Response contains error.
when(replicaManager.deleteRecords(
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(),
callbackCapture.capture(),
ArgumentMatchers.eq(true)
)).thenAnswer { _ =>
callbackCapture.getValue.apply(Map(
new TopicPartition("random-topic", 0) -> new DeleteRecordsPartitionResult()
.setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code
)))
}

partitionRecordWriter.deleteRecords(
new TopicPartition("random-topic", 0),
10L
).whenComplete { (_, exp) =>
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception, exp)
}

// Empty response
when(replicaManager.deleteRecords(
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(),
callbackCapture.capture(),
ArgumentMatchers.eq(true)
)).thenAnswer { _ =>
callbackCapture.getValue.apply(Map[TopicPartition, DeleteRecordsPartitionResult]())
}

partitionRecordWriter.deleteRecords(
new TopicPartition("random-topic", 0),
10L
).whenComplete { (_, exp) =>
assertTrue(exp.isInstanceOf[IllegalStateException])
}
}

@Test
def testDeleteRecordsSuccess(): Unit = {
val replicaManager = mock(classOf[ReplicaManager])
val partitionRecordWriter = new CoordinatorPartitionWriter(
replicaManager
)

val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])

// Response contains no error.
when(replicaManager.deleteRecords(
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(),
callbackCapture.capture(),
ArgumentMatchers.eq(true)
)).thenAnswer { _ =>
callbackCapture.getValue.apply(Map(
new TopicPartition("random-topic", 0) -> new DeleteRecordsPartitionResult()
.setErrorCode(Errors.NONE.code)
))
}

partitionRecordWriter.deleteRecords(
new TopicPartition("random-topic", 0),
10L
).whenComplete { (_, exp) =>
assertNull(exp)
}
}
}
10 changes: 5 additions & 5 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -77,7 +77,7 @@ import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAn
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG}
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorConfigTest}
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorTestConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
@@ -11702,7 +11702,7 @@ class KafkaApisTest extends Logging {

val response = getReadShareGroupResponse(
readRequestData,
config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = true,
null,
readStateResultData
@@ -11757,7 +11757,7 @@

val response = getReadShareGroupResponse(
readRequestData,
config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = false,
authorizer,
readStateResultData
@@ -11812,7 +11812,7 @@

val response = getWriteShareGroupResponse(
writeRequestData,
config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = true,
null,
writeStateResultData
@@ -11867,7 +11867,7 @@

val response = getWriteShareGroupResponse(
writeRequestData,
config ++ ShareCoordinatorConfigTest.testConfigMap().asScala,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = false,
authorizer,
writeStateResultData
58 changes: 57 additions & 1 deletion core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -34,7 +34,8 @@ import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartit
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.{InvalidPidMappingException, KafkaStorageException}
import org.apache.kafka.common.message.LeaderAndIsrRequestData
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.{DeleteRecordsResponseData, LeaderAndIsrRequestData}
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState
@@ -6660,6 +6661,61 @@
}
}

@Test
def testDeleteRecordsInternalTopicDeleteDisallowed(): Unit = {
val localId = 1
val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, Topic.GROUP_METADATA_TOPIC_NAME)
val directoryEventHandler = mock(classOf[DirectoryEventHandler])

val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler)
val directoryIds = rm.logManager.directoryIdsSet.toList
assertEquals(directoryIds.size, 2)
val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, isStartIdLeader = true, directoryIds = directoryIds)
val (partition: Partition, _) = rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get
partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)),
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava),
None)

def callback(responseStatus: Map[TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = {
assert(responseStatus.values.head.errorCode == Errors.INVALID_TOPIC_EXCEPTION.code)
}

// By default, deleting records from internal topics is disallowed.
rm.deleteRecords(
timeout = 0L,
Map[TopicPartition, Long](topicPartition0.topicPartition() -> 10L),
responseCallback = callback
)
}

@Test
def testDeleteRecordsInternalTopicDeleteAllowed(): Unit = {
val localId = 1
val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, Topic.GROUP_METADATA_TOPIC_NAME)
val directoryEventHandler = mock(classOf[DirectoryEventHandler])

val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler)
val directoryIds = rm.logManager.directoryIdsSet.toList
assertEquals(directoryIds.size, 2)
val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, isStartIdLeader = true, directoryIds = directoryIds)
val (partition: Partition, _) = rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get
partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)),
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava),
None)

def callback(responseStatus: Map[TopicPartition, DeleteRecordsResponseData.DeleteRecordsPartitionResult]): Unit = {
assert(responseStatus.values.head.errorCode == Errors.NONE.code)
}

// Deleting records from internal topics is allowed when allowInternalTopicDeletion is true.
rm.deleteRecords(
timeout = 0L,
Map[TopicPartition, Long](topicPartition0.topicPartition() -> 0L),
responseCallback = callback,
allowInternalTopicDeletion = true
)
}

private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): Seq[(TopicIdPartition, LogReadResult)] = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true)
try {
@@ -24,6 +24,7 @@
import java.util.Optional;

import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
@@ -71,6 +72,10 @@ public class ShareCoordinatorConfig {
public static final int APPEND_LINGER_MS_DEFAULT = 10;
public static final String APPEND_LINGER_MS_DOC = "The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk.";

public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG = "share.coordinator.state.topic.prune.interval.ms";
public static final int STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes
public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_DOC = "The duration in milliseconds that the share coordinator will wait between pruning eligible records in the share-group state topic.";

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(STATE_TOPIC_NUM_PARTITIONS_CONFIG, INT, STATE_TOPIC_NUM_PARTITIONS_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_NUM_PARTITIONS_DOC)
.define(STATE_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, STATE_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_REPLICATION_FACTOR_DOC)
@@ -81,7 +86,8 @@ public class ShareCoordinatorConfig {
.define(LOAD_BUFFER_SIZE_CONFIG, INT, LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, LOAD_BUFFER_SIZE_DOC)
.define(STATE_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) STATE_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, STATE_TOPIC_COMPRESSION_CODEC_DOC)
.define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, APPEND_LINGER_MS_DOC)
.define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC);
.define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC)
.defineInternal(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, INT, STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT, atLeast(1), LOW, STATE_TOPIC_PRUNE_INTERVAL_MS_DOC);

private final int stateTopicNumPartitions;
private final short stateTopicReplicationFactor;
@@ -93,6 +99,7 @@ public class ShareCoordinatorConfig {
private final int loadBufferSize;
private final CompressionType compressionType;
private final int appendLingerMs;
private final int pruneIntervalMs;


public ShareCoordinatorConfig(AbstractConfig config) {
@@ -108,6 +115,7 @@ public ShareCoordinatorConfig(AbstractConfig config) {
.map(CompressionType::forId)
.orElse(null);
appendLingerMs = config.getInt(APPEND_LINGER_MS_CONFIG);
pruneIntervalMs = config.getInt(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG);
validate();
}

@@ -151,6 +159,10 @@ public CompressionType shareCoordinatorStateTopicCompressionType() {
return compressionType;
}

public int shareCoordinatorTopicPruneIntervalMs() {
return pruneIntervalMs;
}

private void validate() {
Utils.require(snapshotUpdateRecordsPerSnapshot >= 0 && snapshotUpdateRecordsPerSnapshot <= 500,
String.format("%s must be between [0, 500]", SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG));
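To illustrate the new internal configuration, here is a hedged Java sketch of overriding the prune interval when constructing a ShareCoordinatorConfig; building an AbstractConfig directly from CONFIG_DEF is an assumption for illustration, and production code goes through the broker's normal config plumbing.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig;

final class PruneIntervalSketch {
    public static void main(String[] args) {
        // Sketch: prune the share-group state topic every minute instead of the 5-minute default.
        Map<String, Object> overrides = new HashMap<>();
        overrides.put(ShareCoordinatorConfig.STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, 60 * 1000);
        ShareCoordinatorConfig config = new ShareCoordinatorConfig(
            new AbstractConfig(ShareCoordinatorConfig.CONFIG_DEF, overrides));
        System.out.println(config.shareCoordinatorTopicPruneIntervalMs()); // 60000
    }
}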
