From 46cba92ff6a1d83520b9479293de9d3a5280cc31 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 23 Jan 2025 11:29:00 +0800 Subject: [PATCH] KAFKA-16717 initial commit fix typo KAFKA-16717 support alter share group offsets KAFKA-16717 support alter share group offsets --- .../clients/admin/AddRaftVoterResult.java | 2 +- .../org/apache/kafka/clients/admin/Admin.java | 27 +++++++++ .../admin/AlterShareGroupOffsetsOptions.java | 32 +++++++++++ .../admin/AlterShareGroupOffsetsResult.java | 55 +++++++++++++++++++ .../kafka/clients/admin/ForwardingAdmin.java | 5 ++ .../kafka/clients/admin/KafkaAdminClient.java | 6 ++ .../apache/kafka/common/protocol/ApiKeys.java | 3 +- .../AlterShareGroupOffsetsRequest.java | 31 +++++++++++ .../AlterShareGroupOffsetsRequest.json | 39 +++++++++++++ .../AlterShareGroupOffsetsResponse.json | 37 +++++++++++++ .../kafka/clients/admin/MockAdminClient.java | 10 ++++ .../main/scala/kafka/server/KafkaApis.scala | 6 ++ 12 files changed, 251 insertions(+), 2 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsOptions.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java create mode 100644 clients/src/main/resources/common/message/AlterShareGroupOffsetsRequest.json create mode 100644 clients/src/main/resources/common/message/AlterShareGroupOffsetsResponse.json diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterResult.java index d42204c5e4e79..8e71bea08595f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterResult.java @@ -39,4 +39,4 @@ public KafkaFuture all() { return result; } -} +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 6c743dec048a8..e852e63c88921 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -1780,6 +1780,33 @@ RemoveRaftVoterResult removeRaftVoter( DescribeShareGroupsResult describeShareGroups(Collection groupIds, DescribeShareGroupsOptions options); + /** + * Alters offsets for the specified group. In order to succeed, the group must be empty. + * + *

This operation is not transactional, so it may succeed for some partitions while fail for others. + * + * @param groupId The group for which to alter offsets. + * @param offsets A map of offsets by partition. Partitions not specified in the map are ignored. + * @param options The options to use when altering the offsets. + * @return The AlterShareGroupOffsetsResult. + */ + AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map offsets, AlterShareGroupOffsetsOptions options); + + + /** + * Alters offsets for the specified group. In order to succeed, the group must be empty. + * + *

This is a convenience method for {@link #alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} with default options. + * See the overload for more details. + * + * @param groupId The group for which to alter offsets. + * @param offsets A map of offsets by partition. + * @return The AlterShareGroupOffsetsResult. + */ + default AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map offsets) { + return alterShareGroupOffsets(groupId, offsets, new AlterShareGroupOffsetsOptions()); + } + /** * Describe some share groups in the cluster, with the default options. *

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsOptions.java new file mode 100644 index 0000000000000..0aac20ebd9554 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsOptions.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; + +/** + * Options for the {@link Admin#alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} call. + *

+ * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class AlterShareGroupOffsetsOptions extends AbstractOptions { + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java new file mode 100644 index 0000000000000..ebfbbac3e54fd --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.protocol.Errors; + +import java.util.Map; + +/** + * The result of the {@link Admin#alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} call. + *

+ * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class AlterShareGroupOffsetsResult { + + private final KafkaFuture> future; + + AlterShareGroupOffsetsResult(KafkaFuture> future) { + this.future = future; + } + + /** + * Return a future which can be used to check the result for a given partition. + */ + public KafkaFuture partitionResult(final TopicPartition partition) { + return null; + } + + /** + * Return a future which succeeds if all the alter offsets succeed. + */ + public KafkaFuture all() { + return null; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java index 45ed560dffc5f..b52688e7d3bfa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java @@ -298,6 +298,11 @@ public DescribeShareGroupsResult describeShareGroups(Collection groupIds return delegate.describeShareGroups(groupIds, options); } + @Override + public AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map offsets, AlterShareGroupOffsetsOptions options) { + return delegate.alterShareGroupOffsets(groupId, offsets, options); + } + @Override public ListShareGroupOffsetsResult listShareGroupOffsets(Map groupSpecs, ListShareGroupOffsetsOptions options) { return delegate.listShareGroupOffsets(groupSpecs, options); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 3f88cd5aa8070..c6d8269851ba9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -3796,6 +3796,12 @@ public DescribeShareGroupsResult describeShareGroups(final Collection gr .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); } + @Override + public AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map offsets, AlterShareGroupOffsetsOptions options) { + // TODO support alter share group offsets + throw new InvalidRequestException("The method is not yet implemented"); + } + // To do in a follow-up PR @Override public ListShareGroupOffsetsResult listShareGroupOffsets(final Map groupSpecs, diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index e33dd2589ae99..0b355112f4835 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -133,7 +133,8 @@ public enum ApiKeys { READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true), STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT), STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE), - DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS); + DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS), + ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS); private static final Map> APIS_BY_LISTENER = diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java new file mode 100644 index 0000000000000..30e8d5a799aad --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java @@ -0,0 +1,31 @@ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; + +import java.nio.ByteBuffer; + +public class AlterShareGroupOffsetsRequest extends AbstractRequest { + + private final AlterShareGroupOffsetsRequestData data; + + public AlterShareGroupOffsetsRequest(AlterShareGroupOffsetsRequestData data, short apiVersion) { + super(ApiKeys.ALTER_PARTITION, apiVersion); + this.data = data; + } + + @Override + public AlterShareGroupOffsetsRequestData data() { + return data; + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + return null; + } + + public static AlterShareGroupOffsetsRequest parse(ByteBuffer buffer, short version) { + return new AlterShareGroupOffsetsRequest(new AlterShareGroupOffsetsRequestData(new ByteBufferAccessor(buffer), version), version); + } +} diff --git a/clients/src/main/resources/common/message/AlterShareGroupOffsetsRequest.json b/clients/src/main/resources/common/message/AlterShareGroupOffsetsRequest.json new file mode 100644 index 0000000000000..0fc7b8f308afa --- /dev/null +++ b/clients/src/main/resources/common/message/AlterShareGroupOffsetsRequest.json @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 91, + "type": "request", + "listeners": ["broker"], + "name": "AlterShareGroupOffsetsRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", + "about": "The group identifier." }, + { "name": "Topics", "type": "[]AlterShareGroupOffsetsRequestTopic", "versions": "0+", + "about": "The topics to alter offsets for.", "fields": [ + { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true, + "about": "The topic name." }, + { "name": "Partitions", "type": "[]AlterShareGroupOffsetsRequestPartition", "versions": "0+", + "about": "Each partition to alter offsets for.", "fields": [ + { "name": "PartitionIndex", "type": "int32", "versions": "0+", + "about": "The partition index." }, + { "name": "StartOffset", "type": "int64", "versions": "0+", + "about": "The share-partition start offset." } + ]} + ]} + ] +} diff --git a/clients/src/main/resources/common/message/AlterShareGroupOffsetsResponse.json b/clients/src/main/resources/common/message/AlterShareGroupOffsetsResponse.json new file mode 100644 index 0000000000000..728d9d7e30354 --- /dev/null +++ b/clients/src/main/resources/common/message/AlterShareGroupOffsetsResponse.json @@ -0,0 +1,37 @@ + +{ + "apiKey": 91, + "type": "response", + "name": "AlterShareGroupOffsetsResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - GROUP_ID_NOT_FOUND (version 0+) + // - GROUP_NOT_EMPTY (version 0+) + // - KAFKA_STORAGE_ERROR (version 0+) + // - INVALID_REQUEST (version 0+) + // - UNKNOWN_SERVER_ERROR (version 0+) + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "Responses", "type": "[]AlterShareGroupOffsetsResponseTopic", "versions": "0+", + "about": "The results for each topic.", "fields": [ + { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", + "about": "The topic name." }, + { "name": "TopicId", "type": "uuid", "versions": "0+", + "about": "The unique topic ID." }, + { "name": "Partitions", "type": "[]AlterShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [ + { "name": "PartitionIndex", "type": "int32", "versions": "0+", + "about": "The partition index." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if there was no error." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The error message, or null if there was no error." } + ]} + ]} + ] +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index ed2d7c61f08f9..e88c7df6f3382 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -1389,6 +1389,16 @@ public synchronized DescribeShareGroupsResult describeShareGroups(Collection offsets, AlterShareGroupOffsetsOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map offsets) { + throw new UnsupportedOperationException("Not implemented yet"); + } + @Override public synchronized ListShareGroupOffsetsResult listShareGroupOffsets(Map groupSpecs, ListShareGroupOffsetsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6e25db6b05886..746ff0a5fa47a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -238,6 +238,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.DELETE_SHARE_GROUP_STATE => handleDeleteShareGroupStateRequest(request) case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => handleReadShareGroupStateSummaryRequest(request) case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS => handleDescribeShareGroupOffsetsRequest(request) + case ApiKeys.ALTER_SHARE_GROUP_OFFSETS => handleAlterShareGroupOffsetsRequest(request) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { @@ -3190,6 +3191,11 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } + def handleAlterShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = { + val handleAlterGroupOffsetsRequest = request.body[AlterShareGroupOffsetsRequest] + // TODO: Implement the AlterShareGroupOffsetsRequest handling + } + // Visible for Testing def getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest: ShareAcknowledgeRequest, topicIdNames: util.Map[Uuid, String],