KAFKA-18058: Share group state record pruning impl. #18014

Merged 38 commits on Dec 12, 2024. The diff below shows the changes from 9 of the 38 commits.

Commits:
546eda7  Prune records initial commit. (smjn, Dec 3, 2024)
dd575d4  added offsets manager class. (smjn, Dec 3, 2024)
3dc1798  replace config with boolean flag arg. (smjn, Dec 3, 2024)
9d19c0b  add deleteRecords method to partition writer. (smjn, Dec 3, 2024)
87f5a1d  add plumbing in share coordinator for purging. (smjn, Dec 3, 2024)
d566e22  review comments (smjn, Dec 3, 2024)
6ae8337  minor bug fix. (smjn, Dec 4, 2024)
c49b7db  shard unit tests. (smjn, Dec 4, 2024)
f616f85  partition writer unit tests. (smjn, Dec 4, 2024)
bea9566  added share coord service unit tests. (smjn, Dec 4, 2024)
b0d4cca  final touches, incorporated comments. (smjn, Dec 4, 2024)
9a342c2  Fixed documentation. (smjn, Dec 4, 2024)
e77c16d  added more unit tests, constraint to exception logging. (smjn, Dec 4, 2024)
6d889bc  added runtime method to fetch active tps. (smjn, Dec 4, 2024)
b30df09  logging improvements. (smjn, Dec 4, 2024)
f56477b  incorporated comments. (smjn, Dec 5, 2024)
055ef21  revert to using timeline hm in offsets manager. (smjn, Dec 5, 2024)
7e62122  Merge remote-tracking branch 'apache-kafka/trunk' into KAFKA-18058 (smjn, Dec 5, 2024)
18a44e1  fixed javadocs. (smjn, Dec 6, 2024)
74dcd45  fixed tests. (smjn, Dec 6, 2024)
f254399  incorporated comments. (smjn, Dec 6, 2024)
a9a986e  Merge remote-tracking branch 'apache-kafka/trunk' into KAFKA-18058 (smjn, Dec 8, 2024)
b68b014  add last red off method (smjn, Dec 8, 2024)
1cc890a  some debug logs. (smjn, Dec 8, 2024)
e1a38ab  change repeated return values for last red offset. (smjn, Dec 9, 2024)
ae6333a  incorporated comments. (smjn, Dec 9, 2024)
8244bdc  revert changes (smjn, Dec 9, 2024)
c798254  reschedule even on errors. (smjn, Dec 9, 2024)
75eb4d2  inc review comments. (smjn, Dec 9, 2024)
be0310f  fixed javadoc (smjn, Dec 9, 2024)
a5aa93f  replaced timeline object with atomic boolean (smjn, Dec 10, 2024)
78d8c49  updated comments (smjn, Dec 10, 2024)
9714ce6  Revert "updated comments" (smjn, Dec 10, 2024)
7e420cb  updated docs (smjn, Dec 10, 2024)
e3db1dc  incorporated comments. (smjn, Dec 10, 2024)
1d60249  incorporated comments. (smjn, Dec 11, 2024)
502b5e7  remove part index, instead of clearing memo. (smjn, Dec 11, 2024)
d55abdd  incorporated comments. (smjn, Dec 11, 2024)

Files changed:
@@ -107,4 +107,17 @@ CompletableFuture<VerificationGuard> maybeStartTransactionVerification(
short producerEpoch,
short apiVersion
) throws KafkaException;

/**
* Delete records from a topic partition until specified offset
* @param tp The partition to delete records from
* @param deleteBeforeOffset Offset to delete until, starting from the beginning
* @param allowInternalTopicDeletion Boolean indicating whether to allow delete operation on internal topics
* @throws KafkaException Any KafkaException caught during the operation.
*/
void deleteRecords(
TopicPartition tp,
long deleteBeforeOffset,
boolean allowInternalTopicDeletion

Member: This parameter is missing from the javadoc.

) throws KafkaException;
}
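
For context, a hedged usage sketch of this new interface method. The topic name, offset, and the `writer` variable are assumptions for illustration, not part of the PR:

    // Hedged sketch: `writer` is some PartitionWriter implementation, e.g. a
    // CoordinatorPartitionWriter; the topic name and offset are made up.
    val writer: PartitionWriter = ??? // obtained from the coordinator runtime
    try {
      writer.deleteRecords(
        new TopicPartition("__share_group_state", 0), // internal share-state topic (assumed name)
        42L,  // deleteBeforeOffset: records strictly below this offset become eligible
        true  // allowInternalTopicDeletion: required because the topic is internal
      )
    } catch {
      case e: KafkaException => println(s"delete failed: $e") // e.g. NOT_LEADER_OR_FOLLOWER
    }
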

@@ -115,6 +115,15 @@ public long append(
}
}

@Override
public void deleteRecords(
TopicPartition tp,
long deleteBeforeOffset,
boolean allowInternalTopicDeletion
) throws KafkaException {
throw new RuntimeException("method not implemented");
}

@Override
public CompletableFuture<VerificationGuard> maybeStartTransactionVerification(
TopicPartition tp,

@@ -19,6 +19,7 @@ package kafka.coordinator.group
import kafka.cluster.PartitionListener
import kafka.server.{ReplicaManager, defaultError, genericError}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -165,4 +166,21 @@ class CoordinatorPartitionWriter(
// Required offset.
partitionResult.lastOffset + 1
}

override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long, allowInternalTopicDeletion: Boolean): Unit = {
var deleteResults: Map[TopicPartition, DeleteRecordsPartitionResult] = Map.empty
replicaManager.deleteRecords(
timeout = 0L,

Member: I have doubts about using timeout = 0 here. My understanding is that it will delete records from the local logs, but it won't wait for the replicas to acknowledge the deletion. Hence, I suppose that it will always throw an error. I suppose that we don't really care about the replication in this case, but it may spam the logs with unnecessary errors.

Member: You cannot compare this with append records because they don't work the same way. In append records, the key is that we use requiredAcks = 1, which means that the operation does not go to the purgatory at all. In the delete records case, it goes to the purgatory in all cases.

With this timeout, I believe that the following code will actually throw an IllegalStateException, because the operation in the purgatory won't be completed when you reach it:

    val partitionResult = deleteResults.getOrElse(tp,
      throw new IllegalStateException(s"Delete status $deleteResults should have partition $tp."))

Could you please double check?

One option may be to make deleteRecords return a future and complete it when the callback returns. This would also allow you to not trigger the next purge while the current one is not completed. (A hedged sketch of this approach follows this file's diff.)

offsetPerPartition = Map(tp -> deleteBeforeOffset),
responseCallback = results => deleteResults = results,
allowInternalTopicDeletion

Member: I wonder if we could just set it to true here, as this class is only used by the coordinator.

)

val partitionResult = deleteResults.getOrElse(tp,
throw new IllegalStateException(s"Delete status $deleteResults should have partition $tp."))

if (partitionResult.errorCode() != Errors.NONE.code()) {
throw Errors.forCode(partitionResult.errorCode()).exception()
}
}
}
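
As an illustration of the reviewer's future-based suggestion above, here is a hedged sketch — not the committed code — of a deleteRecords that completes a future from the callback. The timeout value is an assumption; the real one would come from config:

    // Sketch only: complete a future from the deleteRecords callback instead of
    // relying on timeout = 0 and reading the result synchronously.
    override def deleteRecords(
      tp: TopicPartition,
      deleteBeforeOffset: Long,
      allowInternalTopicDeletion: Boolean
    ): CompletableFuture[Void] = {
      val future = new CompletableFuture[Void]()
      replicaManager.deleteRecords(
        timeout = 30000L, // assumed; would normally come from a write-timeout config
        offsetPerPartition = Map(tp -> deleteBeforeOffset),
        responseCallback = results => results.get(tp) match {
          case Some(result) if result.errorCode() == Errors.NONE.code() =>
            future.complete(null)
          case Some(result) =>
            future.completeExceptionally(Errors.forCode(result.errorCode()).exception())
          case None =>
            future.completeExceptionally(new IllegalStateException(
              s"Delete status $results should have partition $tp."))
        },
        allowInternalTopicDeletion = allowInternalTopicDeletion
      )
      future
    }

A caller could then chain the next prune on this future, so a purge never starts while the previous one is still in flight.
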
11 changes: 6 additions & 5 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1172,11 +1172,11 @@ class ReplicaManager(val config: KafkaConfig,
* Delete records on leader replicas of the partition, and wait for delete records operation be propagated to other replicas;
* the callback function will be triggered either when timeout or logStartOffset of all live replicas have reached the specified offset
*/
private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long]): Map[TopicPartition, LogDeleteRecordsResult] = {
private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long], allowInternalTopicDeletion: Boolean): Map[TopicPartition, LogDeleteRecordsResult] = {
trace("Delete records on local logs to offsets [%s]".format(offsetPerPartition))
offsetPerPartition.map { case (topicPartition, requestedOffset) =>
// reject delete records operation on internal topics
if (Topic.isInternal(topicPartition.topic)) {
// reject delete records operation for internal topics unless allowInternalTopicDeletion is true
if (Topic.isInternal(topicPartition.topic) && !allowInternalTopicDeletion) {
(topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}"))))
} else {
try {
@@ -1369,9 +1369,10 @@

def deleteRecords(timeout: Long,

Member: Let's add a unit test for this new flag in ReplicaManagerTest. (A hedged sketch of such a test follows this diff.)

offsetPerPartition: Map[TopicPartition, Long],
responseCallback: Map[TopicPartition, DeleteRecordsPartitionResult] => Unit): Unit = {
responseCallback: Map[TopicPartition, DeleteRecordsPartitionResult] => Unit,
allowInternalTopicDeletion: Boolean = false): Unit = {
val timeBeforeLocalDeleteRecords = time.milliseconds
val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition)
val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition, allowInternalTopicDeletion)
debug("Delete records on local log in %d ms".format(time.milliseconds - timeBeforeLocalDeleteRecords))

val deleteRecordsStatus = localDeleteRecordsResults.map { case (topicPartition, result) =>
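
In the spirit of that request, a minimal sketch of such a test. Helper names like setupReplicaManagerWithMockedPurgatories, MockTimer, and the test class's `time` field are assumed from the existing ReplicaManagerTest, and the sketch covers only the rejection path, which needs no partition setup because the internal-topic check happens before any log lookup:

    @Test
    def testDeleteRecordsInternalTopicDisallowedByDefault(): Unit = {
      val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
      try {
        var results: Map[TopicPartition, DeleteRecordsPartitionResult] = Map.empty
        val internalTp = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)

        // With the default allowInternalTopicDeletion = false, deleting from an
        // internal topic is rejected up front, so the callback fires immediately.
        rm.deleteRecords(
          timeout = 1000L,
          offsetPerPartition = Map(internalTp -> 10L),
          responseCallback = r => results = r
        )
        assertEquals(Errors.INVALID_TOPIC_EXCEPTION.code, results(internalTp).errorCode)
      } finally {
        rm.shutdown(checkpointHW = false)
      }
    }
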

@@ -20,13 +20,14 @@ import kafka.server.ReplicaManager
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.coordinator.common.runtime.PartitionWriter
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertThrows}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
@@ -238,4 +239,102 @@ class CoordinatorPartitionWriterTest {
batch
))
}

@Test
def testDeleteRecordsResponseMissingTopicPartition(): Unit = {
val replicaManager = mock(classOf[ReplicaManager])
val partitionRecordWriter = new CoordinatorPartitionWriter(
replicaManager
)

val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])

// response does not contain topic partition supplied
when(replicaManager.deleteRecords(
ArgumentMatchers.eq(0L),
ArgumentMatchers.any(),
callbackCapture.capture(),
ArgumentMatchers.eq(true)
)).thenAnswer(_ => {
callbackCapture.getValue.apply(Map(
new TopicPartition("other-topic", 0) -> new DeleteRecordsPartitionResult()
))
})

assertThrows(
classOf[IllegalStateException],
() => partitionRecordWriter.deleteRecords(
new TopicPartition("random-topic", 0),
10L,
allowInternalTopicDeletion = true
)
)
}

@Test
def testDeleteRecordsResponseContainsError(): Unit = {
val replicaManager = mock(classOf[ReplicaManager])
val partitionRecordWriter = new CoordinatorPartitionWriter(
replicaManager
)

val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])

// response contains error
when(replicaManager.deleteRecords(
ArgumentMatchers.eq(0L),
ArgumentMatchers.any(),
callbackCapture.capture(),
ArgumentMatchers.eq(true)
)).thenAnswer(_ => {

Member: ditto.

callbackCapture.getValue.apply(Map(
new TopicPartition("random-topic", 0) -> new DeleteRecordsPartitionResult()
.setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
))
})

assertThrows(
Errors.NOT_LEADER_OR_FOLLOWER.exception().getClass,
() => partitionRecordWriter.deleteRecords(
new TopicPartition("random-topic", 0),
10L,
allowInternalTopicDeletion = true
)
)
}

@Test
def testDeleteRecordsSuccess(): Unit = {
val replicaManager = mock(classOf[ReplicaManager])
val partitionRecordWriter = new CoordinatorPartitionWriter(
replicaManager
)

val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])

// response contains no error
when(replicaManager.deleteRecords(
ArgumentMatchers.eq(0L),
ArgumentMatchers.any(),
callbackCapture.capture(),
ArgumentMatchers.eq(true)
)).thenAnswer(_ => {

Member: ditto.

callbackCapture.getValue.apply(Map(
new TopicPartition("random-topic", 0) -> new DeleteRecordsPartitionResult()
.setErrorCode(Errors.NONE.code())

Member: ditto.

))
})

assertDoesNotThrow(() => {
partitionRecordWriter.deleteRecords(
new TopicPartition("random-topic", 0),
10L,
allowInternalTopicDeletion = true
)
true
})
}
}

@@ -24,6 +24,7 @@
import java.util.Optional;

import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
@@ -71,6 +72,10 @@ public class ShareCoordinatorConfig {
public static final int APPEND_LINGER_MS_DEFAULT = 10;
public static final String APPEND_LINGER_MS_DOC = "The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk.";

public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG = "share.coordinator.state.topic.prune.interval.ms";
public static final int STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes
public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_DOC = "The duration in milliseconds that the share coordinator will wait between pruning eligible records in share-group state topic.";

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(STATE_TOPIC_NUM_PARTITIONS_CONFIG, INT, STATE_TOPIC_NUM_PARTITIONS_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_NUM_PARTITIONS_DOC)
.define(STATE_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, STATE_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_REPLICATION_FACTOR_DOC)
@@ -81,7 +86,8 @@ public class ShareCoordinatorConfig {
.define(LOAD_BUFFER_SIZE_CONFIG, INT, LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, LOAD_BUFFER_SIZE_DOC)
.define(STATE_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) STATE_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, STATE_TOPIC_COMPRESSION_CODEC_DOC)
.define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, APPEND_LINGER_MS_DOC)
.define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC);
.define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC)
.defineInternal(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, INT, STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT, atLeast(1), LOW, STATE_TOPIC_PRUNE_INTERVAL_MS_DOC);

private final int stateTopicNumPartitions;
private final short stateTopicReplicationFactor;
@@ -93,6 +99,7 @@ public class ShareCoordinatorConfig {
private final int loadBufferSize;
private final CompressionType compressionType;
private final int appendLingerMs;
private final int pruneIntervalMs;


public ShareCoordinatorConfig(AbstractConfig config) {
@@ -108,6 +115,7 @@ public ShareCoordinatorConfig(AbstractConfig config) {
.map(CompressionType::forId)
.orElse(null);
appendLingerMs = config.getInt(APPEND_LINGER_MS_CONFIG);
pruneIntervalMs = config.getInt(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG);
validate();
}

@@ -151,6 +159,10 @@ public CompressionType shareCoordinatorStateTopicCompressionType() {
return compressionType;
}

public int shareCoordinatorTopicPruneIntervalMs() {
return pruneIntervalMs;
}

private void validate() {
Utils.require(snapshotUpdateRecordsPerSnapshot >= 0 && snapshotUpdateRecordsPerSnapshot <= 500,
String.format("%s must be between [0, 500]", SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG));
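
Tying the new config to the behaviour in the PR: the share coordinator prunes the share-group state topic on this interval and, per the "reschedule even on errors." commit, schedules the next run regardless of the outcome. A hedged sketch of that loop — the timer wiring and the `prune` callback are illustrative names, not the committed implementation:

    import java.util.{Timer, TimerTask}

    // Illustrative only: `prune` stands in for the real prune step, which deletes
    // records below the last redundant offset on each active state-topic partition.
    def schedulePrune(config: ShareCoordinatorConfig, timer: Timer)(prune: () => Unit): Unit = {
      timer.schedule(new TimerTask {
        override def run(): Unit = {
          try prune()
          catch {
            case e: Exception =>
              System.err.println(s"Share state topic prune failed; retrying on next interval: $e")
          } finally {
            // Reschedule even on errors so pruning never stops permanently.
            schedulePrune(config, timer)(prune)
          }
        }
      }, config.shareCoordinatorTopicPruneIntervalMs().toLong)
    }

Because the config is defined with defineInternal and LOW importance, it is tunable but not advertised in the public documentation, which fits a housekeeping knob like this.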