KAFKA-18058: Share group state record pruning impl. #18014

Merged: 38 commits, Dec 12, 2024

Commits
546eda7
Prune records initial commit.
smjn Dec 3, 2024
dd575d4
added offsets manager class.
smjn Dec 3, 2024
3dc1798
replace config with boolean flag arg.
smjn Dec 3, 2024
9d19c0b
add deleteRecords method to partition writer.
smjn Dec 3, 2024
87f5a1d
add plumbing in share coordinator for purging.
smjn Dec 3, 2024
d566e22
review comments
smjn Dec 3, 2024
6ae8337
minor bug fix.
smjn Dec 4, 2024
c49b7db
shard unit tests.
smjn Dec 4, 2024
f616f85
partition writer unit tests.
smjn Dec 4, 2024
bea9566
added share coord service unit tests.
smjn Dec 4, 2024
b0d4cca
final touches, incorporated comments.
smjn Dec 4, 2024
9a342c2
Fixed documentation.
smjn Dec 4, 2024
e77c16d
added more unit tests, constraint to exception logging.
smjn Dec 4, 2024
6d889bc
added runtime method to fetch active tps.
smjn Dec 4, 2024
b30df09
logging improvements.
smjn Dec 4, 2024
f56477b
incorporated comments.
smjn Dec 5, 2024
055ef21
revert to using timeline hm in offsets manager.
smjn Dec 5, 2024
7e62122
Merge remote-tracking branch 'apache-kafka/trunk' into KAFKA-18058
smjn Dec 5, 2024
18a44e1
fixed javadocs.
smjn Dec 6, 2024
74dcd45
fixed tests.
smjn Dec 6, 2024
f254399
incorporated comments.
smjn Dec 6, 2024
a9a986e
Merge remote-tracking branch 'apache-kafka/trunk' into KAFKA-18058
smjn Dec 8, 2024
b68b014
add last red off method
smjn Dec 8, 2024
1cc890a
some debug logs.
smjn Dec 8, 2024
e1a38ab
change repeated return values for last red offset.
smjn Dec 9, 2024
ae6333a
incorporated comments.
smjn Dec 9, 2024
8244bdc
revert changes
smjn Dec 9, 2024
c798254
reschedule even on errors.
smjn Dec 9, 2024
75eb4d2
inc review comments.
smjn Dec 9, 2024
be0310f
fixed javadoc
smjn Dec 9, 2024
a5aa93f
replaced timeline object with atomic boolean
smjn Dec 10, 2024
78d8c49
updated comments
smjn Dec 10, 2024
9714ce6
Revert "updated comments"
smjn Dec 10, 2024
7e420cb
updated docs
smjn Dec 10, 2024
e3db1dc
incorporated comments.
smjn Dec 10, 2024
1d60249
incorporated comments.
smjn Dec 11, 2024
502b5e7
remove part index, instead of clearing memo.
smjn Dec 11, 2024
d55abdd
incorporated comments.
smjn Dec 11, 2024
7 changes: 7 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -383,6 +383,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val connectionSetupTimeoutMs = getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG)
val connectionSetupTimeoutMaxMs = getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG)

def internalTopicsRecordDeleteAllowList: Set[String] = {
getString(ServerConfigs.INTERNAL_TOPICS_DELETE_RECORDS_ALLOW_LIST)
.split(",")
.map(item => item.trim)
.toSet
}

def getNumReplicaAlterLogDirsThreads: Int = {
val numThreads: Integer = Option(getInt(ServerConfigs.NUM_REPLICA_ALTER_LOG_DIRS_THREADS_CONFIG)).getOrElse(logDirs.size)
numThreads
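The new `internalTopicsRecordDeleteAllowList` helper splits the configured string on commas and trims each entry. A minimal Java sketch of the same parsing (class and method names here are illustrative, not Kafka's API):

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class AllowListParser {
    // Mirrors the Scala helper: split on commas, trim whitespace, collect to a set.
    public static Set<String> parse(String raw) {
        return Arrays.stream(raw.split(","))
                     .map(String::trim)
                     .collect(Collectors.toSet());
    }
}
```

So a value like `"__share_group_state , __consumer_offsets"` yields a two-element set with the surrounding spaces removed, which is exactly what the `KafkaConfigTest` change below asserts.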
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1175,8 +1175,8 @@ class ReplicaManager(val config: KafkaConfig,
private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long]): Map[TopicPartition, LogDeleteRecordsResult] = {
trace("Delete records on local logs to offsets [%s]".format(offsetPerPartition))
offsetPerPartition.map { case (topicPartition, requestedOffset) =>
// reject delete records operation on internal topics
if (Topic.isInternal(topicPartition.topic)) {
// reject delete records operation on internal topics except for allow listed ones
if (Topic.isInternal(topicPartition.topic) && !config.internalTopicsRecordDeleteAllowList.contains(topicPartition.topic)) {
Member:
In my opinion, this is the wrong approach because it will allow users of the cluster to delete too. When I suggested it, I thought that we would add a boolean to the method, e.g. allowInternalTopics, and use it from the component doing the deletion.

Contributor Author:
Understood, will rectify

(topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}"))))
} else {
try {
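The reviewer's preferred design (a boolean passed in by the component doing the deletion, rather than a cluster-wide config) could be sketched as follows; the `allowInternalTopics` parameter name comes from the comment above, while everything else is hypothetical:

```java
public class DeleteRecordsGuard {
    // Hypothetical sketch: user-facing DeleteRecords paths would pass
    // allowInternalTopics = false, while the share coordinator's internal
    // pruning path passes true, so cluster users still cannot delete
    // records from internal topics.
    public static boolean mayDeleteRecords(boolean isInternalTopic, boolean allowInternalTopics) {
        return !isInternalTopic || allowInternalTopics;
    }
}
```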
16 changes: 15 additions & 1 deletion core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -25,7 +25,7 @@ import kafka.utils.TestUtils.assertBadConfigContainingMessage
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.{FatalExitError, Topic}
import org.apache.kafka.common.utils.Exit
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
@@ -243,6 +243,20 @@ class KafkaConfigTest {
assertEquals(expected, config.valuesWithPrefixOverride("sasl_ssl.oauthbearer.").get(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG).asInstanceOf[Long])
}

@Test
def testInternalTopicsDeleteRecordsAllowList(): Unit = {
val propertiesFile = prepareDefaultConfig()
var expected = Set[String](Topic.SHARE_GROUP_STATE_TOPIC_NAME)
var config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
assertEquals(expected, config.internalTopicsRecordDeleteAllowList)

// trims extraneous space
expected = Set[String](Topic.SHARE_GROUP_STATE_TOPIC_NAME, Topic.GROUP_METADATA_TOPIC_NAME)
val arg = Topic.SHARE_GROUP_STATE_TOPIC_NAME + " , " + Topic.GROUP_METADATA_TOPIC_NAME;
config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", s"delete.records.internal.topics.allow.list=$arg")))
assertEquals(expected, config.internalTopicsRecordDeleteAllowList)
}

def prepareDefaultConfig(): String = {
prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere"))
}
@@ -18,7 +18,9 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.record.BrokerCompressionType;
@@ -93,6 +95,10 @@ public class ServerConfigs {
public static final String COMPRESSION_ZSTD_LEVEL_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG);
public static final String COMPRESSION_ZSTD_LEVEL_DOC = "The compression level to use if " + COMPRESSION_TYPE_CONFIG + " is set to 'zstd'.";

public static final String INTERNAL_TOPICS_DELETE_RECORDS_ALLOW_LIST = "delete.records.internal.topics.allow.list";
public static final String INTERNAL_TOPICS_DELETE_RECORDS_ALLOW_LIST_DEFAULT = Topic.SHARE_GROUP_STATE_TOPIC_NAME;
public static final String INTERNAL_TOPICS_DELETE_RECORDS_ALLOW_LIST_DOC = "Comma separated list of internal topics where record deletion is allowed.";

/***************** rack configuration *************/
public static final String BROKER_RACK_CONFIG = "broker.rack";
public static final String BROKER_RACK_DOC = "Rack of the broker. This will be used in rack aware replication assignment for fault tolerance. Examples: <code>RACK1</code>, <code>us-east-1d</code>";
@@ -173,5 +179,16 @@ public class ServerConfigs {
// This indicates whether unreleased APIs should be advertised by this node.
.defineInternal(UNSTABLE_API_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH)
// This indicates whether unreleased MetadataVersions should be enabled on this node.
.defineInternal(UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH);
.defineInternal(UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH)
// List of internal topics from which record deletion is allowed
.defineInternal(INTERNAL_TOPICS_DELETE_RECORDS_ALLOW_LIST, STRING, INTERNAL_TOPICS_DELETE_RECORDS_ALLOW_LIST_DEFAULT, (name, value) -> {
if (!(value instanceof String)) {
throw new ConfigException("Invalid value for internal topics delete value allow list: " + value);
}
for (String topic : value.toString().split(",")) {
if (!Topic.isInternal(topic.trim())) {
throw new ConfigException("Topic " + topic + " is not a valid internal topic: " + value);
}
}
}, LOW, INTERNAL_TOPICS_DELETE_RECORDS_ALLOW_LIST_DOC);
}
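The validator above rejects any allow-list entry that is not an internal topic. A self-contained sketch of that check, with a stand-in for `Topic.isInternal` (the real method consults Kafka's known internal-topic names; the set below is only illustrative):

```java
import java.util.Set;

public class AllowListValidator {
    // Stand-in for Topic.isInternal; the real list lives in
    // org.apache.kafka.common.internals.Topic.
    private static final Set<String> INTERNAL_TOPICS =
            Set.of("__share_group_state", "__consumer_offsets", "__transaction_state");

    // Throws if any comma-separated entry is not an internal topic,
    // mirroring the ConfigDef validator in the diff above.
    public static void validate(String value) {
        for (String topic : value.split(",")) {
            if (!INTERNAL_TOPICS.contains(topic.trim())) {
                throw new IllegalArgumentException(
                        "Topic " + topic.trim() + " is not a valid internal topic");
            }
        }
    }
}
```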
@@ -24,6 +24,7 @@
import java.util.Optional;

import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
@@ -71,6 +72,10 @@ public class ShareCoordinatorConfig {
public static final int APPEND_LINGER_MS_DEFAULT = 10;
public static final String APPEND_LINGER_MS_DOC = "The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk.";

public static final String STATE_TOPIC_PRUNE_INTERVAL = "share.coordinator.state.topic.prune.interval.ms";
Member:
This should be STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG I think. All three of these should have _MS_ as part of the name.

public static final int STATE_TOPIC_PRUNE_INTERVAL_DEFAULT = 5 * 60 * 1000; // 5 minutes
public static final String STATE_TOPIC_PRUNE_INTERVAL_DOC = "The duration in milliseconds that the share coordinator will wait between pruning eligible records in share-group state topic.";

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(STATE_TOPIC_NUM_PARTITIONS_CONFIG, INT, STATE_TOPIC_NUM_PARTITIONS_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_NUM_PARTITIONS_DOC)
.define(STATE_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, STATE_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_REPLICATION_FACTOR_DOC)
@@ -81,7 +86,8 @@ public class ShareCoordinatorConfig {
.define(LOAD_BUFFER_SIZE_CONFIG, INT, LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, LOAD_BUFFER_SIZE_DOC)
.define(STATE_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) STATE_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, STATE_TOPIC_COMPRESSION_CODEC_DOC)
.define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, APPEND_LINGER_MS_DOC)
.define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC);
.define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC)
.defineInternal(STATE_TOPIC_PRUNE_INTERVAL, INT, STATE_TOPIC_PRUNE_INTERVAL_DEFAULT, atLeast(1), LOW, STATE_TOPIC_PRUNE_INTERVAL_DOC);

private final int stateTopicNumPartitions;
private final short stateTopicReplicationFactor;
@@ -93,6 +99,7 @@ public class ShareCoordinatorConfig {
private final int loadBufferSize;
private final CompressionType compressionType;
private final int appendLingerMs;
private final int pruneIntervalMs;


public ShareCoordinatorConfig(AbstractConfig config) {
@@ -108,6 +115,7 @@ public ShareCoordinatorConfig(AbstractConfig config) {
.map(CompressionType::forId)
.orElse(null);
appendLingerMs = config.getInt(APPEND_LINGER_MS_CONFIG);
pruneIntervalMs = config.getInt(STATE_TOPIC_PRUNE_INTERVAL);
validate();
}

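The prune interval drives a periodic job, and per the "reschedule even on errors" commit in this PR, a failed run must not cancel future runs. A sketch of that scheduling pattern (the executor-based shape is an assumption for illustration, not the coordinator's actual timer implementation):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PruneLoop {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    // Run pruneTask every intervalMs; a throwing run is swallowed (it would
    // be logged in real code) and the next run is still armed in finally.
    public void schedule(Runnable pruneTask, long intervalMs) {
        executor.schedule(() -> {
            try {
                pruneTask.run();
            } catch (Exception e) {
                // pruning failure must not stop the loop
            } finally {
                schedule(pruneTask, intervalMs);
            }
        }, intervalMs, TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        executor.shutdownNow();
    }
}
```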
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.coordinator.share;

import org.apache.kafka.server.share.SharePartitionKey;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

public class ShareCoordinatorOffsetsManager {
private enum State {
ACTIVE,
INACTIVE
}

static class Entry {
private final SharePartitionKey key;
private final long offset;
private State state;

private Entry(SharePartitionKey key, long offset) {
this.key = key;
this.offset = offset;
this.state = State.ACTIVE;
}

public SharePartitionKey key() {
return key;
}

public long offset() {
return offset;
}

public State state() {
return state;
}

public void setState(State state) {
this.state = state;
}

public static Entry instance(SharePartitionKey key, long offset) {
return new Entry(key, offset);
}
}

private final Map<SharePartitionKey, Entry> entries = new HashMap<>();
private final Queue<Entry> curState;
private long minOffset = Long.MAX_VALUE;
private final AtomicLong lastRedundantOffset = new AtomicLong(0);

public ShareCoordinatorOffsetsManager() {
curState = new PriorityQueue<>(new Comparator<Entry>() {
@Override
public int compare(Entry o1, Entry o2) {
return Long.compare(o1.offset, o2.offset);
}
});
}

public Optional<Long> updateState(SharePartitionKey key, long offset) {
minOffset = Math.min(minOffset, offset);
if (entries.containsKey(key)) {
entries.get(key).setState(State.INACTIVE);
}
Entry newEntry = Entry.instance(key, offset);
curState.add(newEntry);
entries.put(key, newEntry);

purge();

Optional<Long> deleteTillOpt = findLastRedundantOffset();
deleteTillOpt.ifPresent(off -> minOffset = off);
return deleteTillOpt;
}

private Optional<Long> findLastRedundantOffset() {
if (curState.isEmpty()) {
return Optional.empty();
}

Entry candidate = curState.peek();
if (candidate == null) {
return Optional.empty();
}

if (candidate.offset() <= 0 || candidate.offset() == minOffset) {
return Optional.empty();
}
lastRedundantOffset.set(candidate.offset());
return Optional.of(candidate.offset());
}

public Optional<Long> lastRedundantOffset() {
Contributor:
This method is ok, but is very customized for the usage in the current only caller. If there is another caller, it can unexpectedly break the existing caller. A better api is probably to always expose the lastRedundantOffset and let the caller handle the case when the same value is returned.

return Optional.of(lastRedundantOffset.get());
}

// test visibility
void purge() {
while (!curState.isEmpty() && curState.peek().state() == State.INACTIVE) {
curState.poll();
}
}

//test visibility
Queue<Entry> curState() {
return curState;
}

//test visibility
Map<SharePartitionKey, Entry> entries() {
return entries;
}
}
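The manager above keeps, per share partition, the offset of its latest state snapshot in a min-heap; when a partition is rewritten, the old entry is marked INACTIVE and lazily purged once it reaches the heap top. The surviving minimum is the earliest offset still needed, so everything before it is redundant and can be pruned from the state topic. A stripped-down, self-contained re-implementation of that bookkeeping (string keys stand in for `SharePartitionKey`, and the real class additionally suppresses non-positive and unchanged results):

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.PriorityQueue;

public class OffsetsTracker {
    private static final class Entry {
        final String key;
        final long offset;
        boolean active = true;
        Entry(String key, long offset) { this.key = key; this.offset = offset; }
    }

    private final Map<String, Entry> latest = new HashMap<>();
    private final PriorityQueue<Entry> heap =
            new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.offset));

    // Record that `key`'s state was rewritten at `offset`; return the earliest
    // offset still referenced, i.e. records before it are redundant.
    public OptionalLong update(String key, long offset) {
        Entry old = latest.get(key);
        if (old != null) {
            old.active = false; // lazy delete: dropped when it reaches the top
        }
        Entry e = new Entry(key, offset);
        heap.add(e);
        latest.put(key, e);
        while (!heap.isEmpty() && !heap.peek().active) {
            heap.poll(); // purge stale entries from the top
        }
        return heap.isEmpty() ? OptionalLong.empty()
                              : OptionalLong.of(heap.peek().offset);
    }
}
```

For example, with snapshots for `p0` at offset 0 and `p1` at offset 5, rewriting `p0` at offset 10 makes its old snapshot stale, so the earliest still-needed offset advances to 5 and everything before it is prunable.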