KAFKA-18058: Share group state record pruning impl. #18014
Conversation
// reject delete records operation on internal topics except for allow listed ones
if (Topic.isInternal(topicPartition.topic) && !config.internalTopicsRecordDeleteAllowList.contains(topicPartition.topic)) {
In my opinion, this is the wrong approach because it will allow users of the cluster to delete too. When I suggested it, I thought that we would add a boolean to the method, e.g. allowInternalTopics, and use it from the component doing the deletion.
Understood, will rectify
Thanks for the PR. I have done a partial review and left some comments.
@@ -71,6 +72,10 @@ public class ShareCoordinatorConfig {
public static final int APPEND_LINGER_MS_DEFAULT = 10;
public static final String APPEND_LINGER_MS_DOC = "The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk.";

public static final String STATE_TOPIC_PRUNE_INTERVAL = "share.coordinator.state.topic.prune.interval.ms";
This should be STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, I think. All three of these should have _MS_ as part of the name.
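For instance, the renamed trio could look roughly like this (the doc string below is illustrative; the 5-minute default is the one mentioned in the PR description):

public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG = "share.coordinator.state.topic.prune.interval.ms";
public static final int STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes
public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_DOC = "The duration in milliseconds between share-group state topic prune job runs.";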
trace("Delete records on local logs to offsets [%s]".format(offsetPerPartition)) | ||
offsetPerPartition.map { case (topicPartition, requestedOffset) => | ||
// reject delete records operation on internal topics | ||
if (Topic.isInternal(topicPartition.topic)) { | ||
// reject delete records operation for internal topics if allowInternalTopicDeletion is false |
probably "unless allowInternalTopicDeletion is true" is clearer.
/**
 * Delete records from a topic partition until specified offset
 * @param tp The partition to delete records from
 * @param deleteUntilOffset Offset to delete until, starting from the beginning
I suggest deleteBeforeOffset rather than deleteUntilOffset. The former is clearly non-inclusive, while the latter is a bit more ambiguous. I think the effect we want here is that if I provide offset 10, then offsets up to and including 9 may be deleted, but not 10.
void deleteRecords(
    TopicPartition tp,
    long deleteUntilOffset,
    boolean allowInternalTopicDeletion
This parameter is missing from the javadoc.
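Folding in the earlier naming suggestion as well, the javadoc and signature might read roughly as follows (wording is a sketch, not the final PR text):

/**
 * Delete records from a topic partition up to, but not including, the given offset.
 *
 * @param tp                         The partition to delete records from.
 * @param deleteBeforeOffset         Offsets strictly smaller than this value may be deleted.
 * @param allowInternalTopicDeletion Whether deletion is permitted when tp is an internal topic.
 */
void deleteRecords(
    TopicPartition tp,
    long deleteBeforeOffset,
    boolean allowInternalTopicDeletion
);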
timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
    @Override
    public void run() {
        for (int i = 0; i < numPartitions; i++) {
Would it not be the case that each shard should set up the pruning for its owned partitions? Shouldn't the pruning stop when a shard loses leadership of a partition?
No - the shard (state machine) does not maintain that mapping. The information is maintained by the runtime which calls the appropriate shards, based on the context. This information is not exposed outside. We need access to https://github.com/apache/kafka/blob/trunk/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java#L1877 to expose this information. Even then the shard cannot do this.
The runtime is encapsulated in the ShareCoordinatorService and only it can issue calls to the runtime. The Shard only serves to provide data related to partitions.
Using the loop approach - for a specific internal topic-partition only the correct Shard will honour the request and the others will fail silently due to NOT_COORDINATOR.
Flow: ShareCoordinatorService --(add task with correct shard)--> Runtime --> QUEUE --> EventProcessor --(obtain shard from TP in task)--> ShareCoordinatorShard.callback
Thanks for the explanation.
I wonder if we should expose a method to get the list of active state machines/shards from the runtime. This would allow you to just iterate on it instead of having to list all the possibilities.
Yes, I've been thinking about this. I'm not entirely comfortable with every broker starting a timer for every partition. I know it's harmless, but it's not exactly elegant.
@dajac we don't need the coordinators, but the list of topic partitions whose coordinators are ACTIVE at the point of execution of the timer job.
In CoordinatorRuntime:
public List<TopicPartition> activeTopicPartitions() {
    return coordinators.entrySet().stream()
        .filter(entry -> entry.getValue().state.equals(CoordinatorState.ACTIVE))
        .map(Map.Entry::getKey)
        .toList();
}
Caller:
activeTopicPartitions.forEach(tp -> runtime.schedule(tp, ...))
@AndrewJSchofield @dajac removed the loop in favor of active tps
if (exception != null) {
    log.error("Last redundant offset lookup threw an error.", exception);
    return;
}
Here, I suggest handling the known errors such as NOT_COORDINATOR, COORDINATOR_LOADING, etc. As you optimistically send the write operations to all the possible shards, you will get many "unknown" ones.
I will remove the logging for the above errors - they will happen in every case. Since the timer job is periodic, we do not need any special handling anyway.
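For what it's worth, a minimal sketch of filtering out the expected errors rather than logging them (the exact set of "expected" errors here is an assumption):

if (exception != null) {
    Errors error = Errors.forException(exception);
    // NOT_COORDINATOR and COORDINATOR_LOAD_IN_PROGRESS are expected whenever this
    // broker is not (or not yet) the coordinator for the partition; skip logging those.
    if (error != Errors.NOT_COORDINATOR && error != Errors.COORDINATOR_LOAD_IN_PROGRESS) {
        log.error("Last redundant offset lookup threw an error.", exception);
    }
    return;
}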
timeout = 0L,
offsetPerPartition = Map(tp -> deleteBeforeOffset),
responseCallback = results => deleteResults = results,
allowInternalTopicDeletion
I wonder if we could just set it to true here as this class is only used by the coordinator.
assertEquals(1, manager.curState().size());
verify(manager, times(5)).purge();
}
}
nit: We usually have an empty line at the end.
This reverts commit 78d8c49.
@smjn : Thanks for the updated PR. Made a pass of all files. A few more comments.
new TopicPartition("random-topic", 0), | ||
10L | ||
).whenComplete { (_, exp) => | ||
assertTrue(exp.isInstanceOf[IllegalStateException]) |
If a topic doesn't exist, we should get an unknown topic exception.
partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
  new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)

rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
becomeLeaderOrFollower is only used by the ZK-based controller, which won't be supported in 4.0. We need to use the code path for the KRaft-based controller.
List.of(
    ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.empty()),
    ShareOffsetTestHolder.TestTuple.instance(KEY4, 11L, Optional.empty()),
    ShareOffsetTestHolder.TestTuple.instance(KEY2, 13L, Optional.empty())
This test result is counterintuitive. I'd expect lastRedundantOffset for all three to be 10L since we should be able to truncate the log at offset 10.
Small efficiency gain here as the offsets will be applied in increasing order (partition records auto-increment the offset) to the updateState method. If 10L is the smallest one, it means there are no offsets smaller than that present in the topic partition, hence returning 10L will be of no consequence and will save an extra deleteRecords call.
In the algorithm we have chosen to get at least 2 offsets for any key before exposing the redundant offset.
Also, since we are exposing the offset only once and then setting the boolean flag, the 2nd and 3rd line should be empty.
),

new ShareOffsetTestHolder(
    "redundant state cold partition",
What does cold partition mean?
infrequently written
@@ -274,6 +290,7 @@ public void testReadStateSuccess() throws ExecutionException, InterruptedExcepti
)))
);

extra new line
@Test
public void testRecordPruningTaskPeriodicityWithAllSuccess() throws Exception {
    CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
    org.apache.kafka.server.util.MockTime time = new org.apache.kafka.server.util.MockTime();
Could we import MockTime?
 * After returning the value once, the redundant offset is reset.
 * @return Optional of type Long representing the offset or empty for invalid offset values
 */
public Optional<Long> lastRedundantOffset() {
This method is ok, but it is very customized for its current only caller. If there is another caller, it can unexpectedly break the existing one. A better API is probably to always expose the lastRedundantOffset and let the caller handle the case when the same value is returned.
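A sketch of that shape, assuming the backing field is the TimelineLong discussed below (names are illustrative):

public Optional<Long> lastRedundantOffset() {
    // Always expose the current value; callers de-duplicate repeated values themselves.
    long value = lastRedundantOffset.get();
    if (value <= 0 || value == Long.MAX_VALUE) {
        return Optional.empty();
    }
    return Optional.of(value);
}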
@smjn : Thanks for the updated PR. A few more comments.
public ShareCoordinatorOffsetsManager(SnapshotRegistry snapshotRegistry) {
    Objects.requireNonNull(snapshotRegistry);
    offsets = new TimelineHashMap<>(snapshotRegistry, 0);
    minOffset = new TimelineLong(snapshotRegistry);
minOffset => lastRedundantOffset ?
minOffset.set(Math.min(minOffset.get(), offset));
offsets.put(key, offset);

Optional<Long> deleteTillOffset = findRedundantOffset();
deleteTillOffset => redundantOffset ?
if (result.isPresent()) {
    Long off = result.get();
    // Guard and optimization.
    if (off == Long.MAX_VALUE || off <= 0) {
This test seems redundant since ShareCoordinatorOffsetsManager.lastRedundantOffset does that already.
@@ -240,9 +251,96 @@ public void startup(

log.info("Starting up.");
numPartitions = shareGroupTopicPartitionCount.getAsInt();
Map<TopicPartition, Long> offsets = new ConcurrentHashMap<>();
- offsets => lastPrunedOffsets?
- Would it be better to make that an instance val so that we don't have to pass it around?
- Should we remove entries when onResignation is called?
@@ -6660,6 +6661,61 @@ class ReplicaManagerTest {
}
}

@Test
def testDeleteRecordsInternalTopicDeleteDisallowed(): Unit = {
There are lots of existing usages of rm.becomeLeaderOrFollower. It would be useful to clean them up in a follow-up jira.
perhaps
private CompletableFuture<Void> performRecordPruning(TopicPartition tp, Map<TopicPartition, Long> offsets) {
    // This future will always be completed normally, exception or not.
    CompletableFuture<Void> fut = new CompletableFuture<>();
    runtime.scheduleWriteOperation(
This call doesn't do any writes in runtime. Should we use scheduleReadOperation? Similarly, I also don't understand why readState calls runtime.scheduleWriteOperation.
No, the write operation used here is for the write consistency offered by the method.
The ShareCoordinatorShard.replay method calls offsetsManager.updateState with various last written offset values. The replay method itself is called when other write RPCs produce records. However, that does not mean the offset set in replay has been committed.
Now, the coordinator enqueues the write operations in a queue and guarantees that when scheduleWriteOperation completes, the records it generated have been replicated, even those which were written before it.
The framework, however, gives no consistency guarantees between write and read operations. Consider a write op writing an offset into the offsets manager. We only know that this offset is written, not that it is replicated. A subsequent read could give us back the same offset, but there is still no guarantee that this offset has been replicated. It is only when the next write operation completes that we have a guarantee that the previous offset has been committed.
This was extensively discussed with the coordinator framework owner @dajac and we arrived at this solution.
fut.complete(null);
// Update offsets map as we do not want to
// issue repeated deletes.
offsets.put(tp, off);
Since this is called from the purgatory thread, it's possible when this call occurs, the partition leader has already resigned and we need to handle that accordingly.
I don't foresee any consistency issues with that - at most a repeated delete call might be made, which is acceptable since the frequency of these calls is very low (in minutes).
Whoever the leader is, the partition offsets will not change.
@smjn : Thanks for the updated PR. Just one more comment.
@@ -543,6 +629,7 @@ public void onElection(int partitionIndex, int partitionLeaderEpoch) {
@Override
public void onResignation(int partitionIndex, OptionalInt partitionLeaderEpoch) {
    throwIfNotActive();
    lastPrunedOffsets.clear();
We should only clear the entry for partitionIndex, right?
Yes, makes sense
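Something along these lines (assuming lastPrunedOffsets is keyed by TopicPartition and the internal topic name constant is Topic.SHARE_GROUP_STATE_TOPIC_NAME):

@Override
public void onResignation(int partitionIndex, OptionalInt partitionLeaderEpoch) {
    throwIfNotActive();
    // Clear only the entry for the resigned partition, not the whole map.
    lastPrunedOffsets.remove(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionIndex));
    // ... rest of the existing resignation handling unchanged ...
}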
@smjn : Thanks for the updated PR. LGTM. @AndrewJSchofield and @dajac, any other comments from you?
@junrao No. All good to me. Thanks!
@junrao Thanks for the review. I'll take another look this evening and merge once I'm happy.
A handful of final review comments. I've been running with the pruning enabled and it's happily deleting records as intended.
if (result.isPresent()) {
    Long off = result.get();

    if (lastPrunedOffsets.containsKey(tp) && Objects.equals(lastPrunedOffsets.get(tp), off)) {
I suggest lastPrunedOffsets.get(tp).longValue() == off. Using the generic object equality method seems odd for just a pair of longs.
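i.e. roughly (assuming off is the Long unboxed above, and with an explicit null check in place of containsKey):

Long lastPruned = lastPrunedOffsets.get(tp);
if (lastPruned != null && lastPruned.longValue() == off) {
    log.debug("{} already pruned at offset {}", tp, off);
}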
Time time) {
Time time,
Timer timer,
PartitionWriter writer) {
nit: New line please so the arguments and the method body do not run into each other.
log.info("Startup complete."); | ||
} | ||
|
||
private void setupRecordPruning() { | ||
log.info("Scheduling share state topic prune job."); |
share-group state topic is what we use in most places.
CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
    .whenComplete((res, exp) -> {
        if (exp != null) {
            log.error("Received error in share state topic prune.", exp);
share-group state topic
Long off = result.get();

if (lastPrunedOffsets.containsKey(tp) && Objects.equals(lastPrunedOffsets.get(tp), off)) {
    log.debug("{} already pruned at offset {}", tp, off);
You've used till in most places, so I'd replace the at here too.
@@ -50,6 +50,7 @@ private static Map<String, String> testConfigMapRaw() {
configs.put(ShareCoordinatorConfig.LOAD_BUFFER_SIZE_CONFIG, "555");
configs.put(ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG, "10");
configs.put(ShareCoordinatorConfig.STATE_TOPIC_COMPRESSION_CODEC_CONFIG, String.valueOf(CompressionType.NONE.id));
configs.put(ShareCoordinatorConfig.STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, "30000"); // 30 seconds
This class doesn't contain any tests, so calling it XYZTest is peculiar. Please rename to ShareCoordinatorTestUtils or similar.
@AndrewJSchofield Thanks for the review, incorporated comments.
}
fut.complete(null);
// Best effort prevention of issuing duplicate delete calls.
lastPrunedOffsets.put(tp, off);
This approach is ok, but it breaks the pattern in CoordinatorRuntime that all internal states are updated by CoordinatorRuntime threads, since lastPrunedOffsets is now updated by the request I/O threads. We probably could follow the approach of how CoordinatorRuntime waits for a record to be replicated. It appends the record to the local log and registers a PartitionListener for onHighWatermarkUpdated(). Once a new HWM is received, CoordinatorRuntime.onHighWatermarkUpdated enqueues a CoordinatorInternalEvent, the processing of which will trigger the update of the internal state. We could follow a similar approach for LowWaterMark. This can be done in a follow-up jira.
While my initial approach was to add functionality in CoordinatorRuntime, the requirement wasn't general enough to modify the runtime.
I'm sure this is not the final state of this code, but it's definitely a fine starting point.
In this PR, we've added a class ShareCoordinatorOffsetsManager, which tracks the last redundant offset for each share group state topic partition. We have also added a periodic timer job in ShareCoordinatorService which queries for the redundant offset at regular intervals and, if a valid value is found, issues the deleteRecords call to the ReplicaManager via the PartitionWriter. In this way the size of the partitions is kept manageable. Reviewers: Jun Rao <[email protected]>, David Jacot <[email protected]>, Andrew Schofield <[email protected]>
What
- In this PR, we've added a class ShareCoordinatorOffsetsManager, which tracks the last redundant offset for each share group state topic partition.
- We have also added a periodic timer job in ShareCoordinatorService which queries for the redundant offset at regular intervals and, if a valid value is found, issues the deleteRecords call to the ReplicaManager via the PartitionWriter. In this way the size of the partitions is kept manageable.
- The periodicity of the job is configurable via share.coordinator.state.topic.prune.interval.ms (default 5 mins).
Why
- SharePartition invokes the DefaultStatePersister to write data into the internal share group topic __share_group_state. This topic is not eligible for compaction.
- __share_group_state will be populated with a gigantic amount of records.
- Loading these records into the ShareCoordinatorShard results in extensive latency during startup.
Testing
- Unit tests for CoordinatorPartitionWriter, ShareCoordinatorOffsetsManager, ShareCoordinatorService and ShareCoordinatorShard.
Sample o/p
- Broker logs
- Records before prune
- Records after prune