diff --git a/checkstyle/import-control-group-coordinator.xml b/checkstyle/import-control-group-coordinator.xml
index 19d9d5616135d..99ffeb8e70ef2 100644
--- a/checkstyle/import-control-group-coordinator.xml
+++ b/checkstyle/import-control-group-coordinator.xml
@@ -65,6 +65,7 @@
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 3f88cd5aa8070..e68451ae4e778 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -57,6 +57,7 @@
import org.apache.kafka.clients.admin.internals.FenceProducersHandler;
import org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.ListOffsetsHandler;
+import org.apache.kafka.clients.admin.internals.ListShareGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.ListTransactionsHandler;
import org.apache.kafka.clients.admin.internals.PartitionLeaderStrategy;
import org.apache.kafka.clients.admin.internals.RemoveMembersFromConsumerGroupHandler;
@@ -3796,12 +3797,13 @@ public DescribeShareGroupsResult describeShareGroups(final Collection gr
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}
- // To do in a follow-up PR
@Override
public ListShareGroupOffsetsResult listShareGroupOffsets(final Map groupSpecs,
final ListShareGroupOffsetsOptions options) {
- // To-do
- throw new InvalidRequestException("The method is not yet implemented");
+ SimpleAdminApiFuture> future = ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet());
+ ListShareGroupOffsetsHandler handler = new ListShareGroupOffsetsHandler(groupSpecs, logContext);
+ invokeDriver(handler, future, options.timeoutMs);
+ return new ListShareGroupOffsetsResult(future.all());
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
new file mode 100644
index 0000000000000..2b9882875c237
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
+import org.apache.kafka.common.requests.DescribeShareGroupOffsetsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class is the handler for {@link KafkaAdminClient#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call
+ */
+public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Long>> {
+
+    private final Map<String, ListShareGroupOffsetsSpec> groupSpecs;
+    private final Logger log;
+    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+    public ListShareGroupOffsetsHandler(
+        Map<String, ListShareGroupOffsetsSpec> groupSpecs,
+        LogContext logContext) {
+        this.groupSpecs = groupSpecs;
+        this.log = logContext.logger(ListShareGroupOffsetsHandler.class);
+        this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Long>> newFuture(Collection<String> groupIds) {
+        return AdminApiFuture.forKeys(coordinatorKeys(groupIds));
+    }
+
+    @Override
+    public String apiName() {
+        return "describeShareGroupOffsets";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    @Override
+    public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int coordinatorId, Set<CoordinatorKey> keys) {
+        List<String> groupIds = keys.stream().map(key -> {
+            if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
+                throw new IllegalArgumentException("Invalid group coordinator key " + key +
+                    " when building `DescribeShareGroupOffsets` request");
+            }
+            return key.idValue;
+        }).collect(Collectors.toList());
+        // The DescribeShareGroupOffsetsRequest only includes a single group ID at this point, which is likely a mistake to be fixed in a follow-on PR.
+        String groupId = groupIds.isEmpty() ? null : groupIds.get(0);
+        if (groupId == null) {
+            throw new IllegalArgumentException("Missing group id in request");
+        }
+        ListShareGroupOffsetsSpec spec = groupSpecs.get(groupId);
+        List<DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic> topics =
+            spec.topicPartitions().stream().map(
+                topicPartition -> new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                    .setTopicName(topicPartition.topic())
+                    .setPartitions(List.of(topicPartition.partition()))
+            ).collect(Collectors.toList());
+        DescribeShareGroupOffsetsRequestData data = new DescribeShareGroupOffsetsRequestData()
+            .setGroupId(groupId)
+            .setTopics(topics);
+        return new DescribeShareGroupOffsetsRequest.Builder(data, true);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, Map<TopicPartition, Long>> handleResponse(Node coordinator,
+                                                                               Set<CoordinatorKey> groupIds,
+                                                                               AbstractResponse abstractResponse) {
+        final DescribeShareGroupOffsetsResponse response = (DescribeShareGroupOffsetsResponse) abstractResponse;
+        final Map<CoordinatorKey, Map<TopicPartition, Long>> completed = new HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+
+        for (CoordinatorKey groupId : groupIds) {
+            Map<TopicPartition, Long> data = new HashMap<>();
+            // forEach instead of stream().map().collect(): only the put/log side effects matter here.
+            response.data().responses().forEach(
+                describedTopic ->
+                    describedTopic.partitions().forEach(
+                        partition -> {
+                            if (partition.errorCode() == Errors.NONE.code())
+                                data.put(new TopicPartition(describedTopic.topicName(), partition.partitionIndex()), partition.startOffset());
+                            else
+                                log.error("Skipping return offset for topic {} partition {} due to error {}.", describedTopic.topicName(), partition.partitionIndex(), Errors.forCode(partition.errorCode()));
+                        }
+                    )
+            );
+            completed.put(groupId, data);
+        }
+        return new ApiResult<>(completed, failed, Collections.emptyList());
+    }
+
+    private static Set<CoordinatorKey> coordinatorKeys(Collection<String> groupIds) {
+        return groupIds.stream()
+            .map(CoordinatorKey::byGroupId)
+            .collect(Collectors.toSet());
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsRequest.java
index 072b16e944362..147f3dab9e309 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsRequest.java
@@ -86,4 +86,24 @@ public static DescribeShareGroupOffsetsRequest parse(ByteBuffer buffer, short ve
version
);
}
+
+ public static List getErrorDescribeShareGroupOffsets(
+ List topics,
+ Errors error
+ ) {
+ return topics.stream()
+ .map(
+ requestTopic -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(requestTopic.topicName())
+ .setPartitions(
+ requestTopic.partitions().stream().map(
+ partition -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(error.code())
+ .setErrorMessage(error.message())
+ .setStartOffset(0)
+ ).collect(Collectors.toList())
+ )
+ ).collect(Collectors.toList());
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index 1a549a6fdec7c..061982e34d360 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -176,6 +176,15 @@ public static ListClientMetricsResourcesResult listClientMetricsResourcesResult(
return new ListClientMetricsResourcesResult(future);
}
+ public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map>> groupOffsets) {
+ Map>> coordinatorFutures = groupOffsets.entrySet().stream()
+ .collect(Collectors.toMap(
+ entry -> CoordinatorKey.byGroupId(entry.getKey()),
+ Map.Entry::getValue
+ ));
+ return new ListShareGroupOffsetsResult(coordinatorFutures);
+ }
+
/**
* Helper to create a KafkaAdminClient with a custom HostResolver accessible to tests outside this package.
*/
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 25cd12752c107..c3737e5780143 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -113,6 +113,8 @@
import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic;
import org.apache.kafka.common.message.DescribeProducersResponseData;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
@@ -194,6 +196,8 @@
import org.apache.kafka.common.requests.DescribeProducersResponse;
import org.apache.kafka.common.requests.DescribeQuorumRequest;
import org.apache.kafka.common.requests.DescribeQuorumResponse;
+import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
+import org.apache.kafka.common.requests.DescribeShareGroupOffsetsResponse;
import org.apache.kafka.common.requests.DescribeTopicPartitionsResponse;
import org.apache.kafka.common.requests.DescribeTransactionsRequest;
import org.apache.kafka.common.requests.DescribeTransactionsResponse;
@@ -306,6 +310,7 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
@@ -8970,4 +8975,123 @@ public void testRemoveRaftVoterRequest(boolean fail, boolean sendClusterId) thro
assertEquals(Uuid.fromString("YAfa4HClT3SIIW2klIUspg"), requestData.get().voterDirectoryId());
}
}
+
+ @Test
+ public void testListShareGroupOffsetsOptionsWithBatchedApi() throws Exception {
+ final Cluster cluster = mockCluster(3, 0);
+ final Time time = new MockTime();
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+ AdminClientConfig.RETRIES_CONFIG, "0")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+ final List partitions = Collections.singletonList(new TopicPartition("A", 0));
+ final ListShareGroupOffsetsOptions options = new ListShareGroupOffsetsOptions();
+
+ final ListShareGroupOffsetsSpec groupSpec = new ListShareGroupOffsetsSpec()
+ .topicPartitions(partitions);
+ Map groupSpecs = new HashMap<>();
+ groupSpecs.put(GROUP_ID, groupSpec);
+
+ env.adminClient().listShareGroupOffsets(groupSpecs, options);
+
+ final MockClient mockClient = env.kafkaClient();
+ waitForRequest(mockClient, ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
+
+ ClientRequest clientRequest = mockClient.requests().peek();
+ assertNotNull(clientRequest);
+ DescribeShareGroupOffsetsRequestData data = ((DescribeShareGroupOffsetsRequest.Builder) clientRequest.requestBuilder()).build().data();
+ assertEquals(GROUP_ID, data.groupId());
+ assertEquals(Collections.singletonList("A"),
+ data.topics().stream().map(DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic::topicName).collect(Collectors.toList()));
+ }
+ }
+
+ @Test
+ public void testListShareGroupOffsets() throws Exception {
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+ TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
+ TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
+ TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
+ TopicPartition myTopicPartition3 = new TopicPartition("my_topic", 3);
+ TopicPartition myTopicPartition4 = new TopicPartition("my_topic_1", 4);
+ TopicPartition myTopicPartition5 = new TopicPartition("my_topic_2", 6);
+
+
+ ListShareGroupOffsetsSpec groupSpec = new ListShareGroupOffsetsSpec().topicPartitions(
+ List.of(myTopicPartition0, myTopicPartition1, myTopicPartition2, myTopicPartition3, myTopicPartition4, myTopicPartition5)
+ );
+ Map groupSpecs = new HashMap<>();
+ groupSpecs.put(GROUP_ID, groupSpec);
+
+ DescribeShareGroupOffsetsResponseData data = new DescribeShareGroupOffsetsResponseData().setResponses(
+ List.of(
+ new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10))),
+ new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11))),
+ new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40))),
+ new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50))),
+ new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100))),
+ new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500)))
+ )
+ );
+ env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data));
+
+ final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs);
+ final Map partitionToOffsetAndMetadata = result.partitionsToOffset(GROUP_ID).get();
+
+ assertEquals(6, partitionToOffsetAndMetadata.size());
+ assertEquals(10, partitionToOffsetAndMetadata.get(myTopicPartition0));
+ assertEquals(11, partitionToOffsetAndMetadata.get(myTopicPartition1));
+ assertEquals(40, partitionToOffsetAndMetadata.get(myTopicPartition2));
+ assertEquals(50, partitionToOffsetAndMetadata.get(myTopicPartition3));
+ assertEquals(100, partitionToOffsetAndMetadata.get(myTopicPartition4));
+ assertEquals(500, partitionToOffsetAndMetadata.get(myTopicPartition5));
+ }
+ }
+
+ @Test
+ public void testListShareGroupOffsetsWithErrorInOnePartition() throws Exception {
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+ TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
+ TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
+ TopicPartition myTopicPartition2 = new TopicPartition("my_topic_1", 4);
+ TopicPartition myTopicPartition3 = new TopicPartition("my_topic_2", 6);
+
+
+ ListShareGroupOffsetsSpec groupSpec = new ListShareGroupOffsetsSpec().topicPartitions(
+ List.of(myTopicPartition0, myTopicPartition1, myTopicPartition2, myTopicPartition3)
+ );
+ Map groupSpecs = new HashMap<>();
+ groupSpecs.put(GROUP_ID, groupSpec);
+
+ DescribeShareGroupOffsetsResponseData data = new DescribeShareGroupOffsetsResponseData().setResponses(
+ List.of(
+ new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10))),
+ new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11))),
+ new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setErrorCode(Errors.NOT_COORDINATOR.code()).setErrorMessage("Not a Coordinator"))),
+ new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500)))
+ )
+ );
+ env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data));
+
+ final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs);
+ final Map partitionToOffsetAndMetadata = result.partitionsToOffset(GROUP_ID).get();
+
+            // For myTopicPartition2 the response carries an error, so that partition is omitted from the final result.
+ assertEquals(3, partitionToOffsetAndMetadata.size());
+ assertEquals(10, partitionToOffsetAndMetadata.get(myTopicPartition0));
+ assertEquals(11, partitionToOffsetAndMetadata.get(myTopicPartition1));
+ assertEquals(500, partitionToOffsetAndMetadata.get(myTopicPartition3));
+ }
+ }
}
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index bc775b5f38060..98a2651871497 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -18,7 +18,7 @@ package kafka.coordinator.group
import kafka.server.{KafkaConfig, ReplicaManager}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
+import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, DescribeShareGroupOffsetsRequestData, DescribeShareGroupOffsetsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch
@@ -86,6 +86,15 @@ private[group] class GroupCoordinatorAdapter(
))
}
+ override def describeShareGroupOffsets(
+ context: RequestContext,
+ request: DescribeShareGroupOffsetsRequestData
+ ): CompletableFuture[DescribeShareGroupOffsetsResponseData] = {
+ FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+ s"The old group coordinator does not support ${ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS.name} API."
+ ))
+ }
+
override def joinGroup(
context: RequestContext,
request: JoinGroupRequestData,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 73a838e6a6eec..50e7589ae1a10 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -638,6 +638,7 @@ class BrokerServer(
.withCoordinatorRuntimeMetrics(new GroupCoordinatorRuntimeMetrics(metrics))
.withGroupCoordinatorMetrics(new GroupCoordinatorMetrics(KafkaYammerMetrics.defaultRegistry, metrics))
.withGroupConfigManager(groupConfigManager)
+ .withPersister(persister)
.build()
} else {
GroupCoordinatorAdapter(
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 18795a7e0f38d..2d2c2f9ce9777 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3190,9 +3190,28 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = {
val describeShareGroupOffsetsRequest = request.body[DescribeShareGroupOffsetsRequest]
- // TODO: Implement the DescribeShareGroupOffsetsRequest handling
- requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
- CompletableFuture.completedFuture[Unit](())
+
+ if (!isShareGroupProtocolEnabled) {
+ requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+ CompletableFuture.completedFuture[Unit](())
+ } else if (!authHelper.authorize(request.context, READ, GROUP, describeShareGroupOffsetsRequest.data.groupId)) {
+ requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+ CompletableFuture.completedFuture[Unit](())
+ } else {
+ groupCoordinator.describeShareGroupOffsets(
+ request.context,
+ describeShareGroupOffsetsRequest.data,
+ ).handle[Unit] { (response, exception) =>
+ if (exception != null) {
+ requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(exception))
+ } else {
+ requestHelper.sendMaybeThrottle(
+ request,
+ new DescribeShareGroupOffsetsResponse(response)
+ )
+ }
+ }
+ }
}
// Visible for Testing
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index d950d5f417072..22e9857aa5207 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -19,7 +19,7 @@ package kafka.coordinator.group
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.errors.{InvalidGroupIdException, UnsupportedVersionException}
-import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
+import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, DescribeShareGroupOffsetsRequestData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
@@ -95,6 +95,22 @@ class GroupCoordinatorAdapterTest {
assertFutureThrows(future, classOf[UnsupportedVersionException])
}
+ @Test
+ def testDescribeShareGroupOffsets(): Unit = {
+ val groupCoordinator = mock(classOf[GroupCoordinator])
+ val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
+
+ val context = makeContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS, ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS.latestVersion)
+ val request = new DescribeShareGroupOffsetsRequestData()
+ .setGroupId("group")
+
+ val future = adapter.describeShareGroupOffsets(context, request)
+
+ assertTrue(future.isDone)
+ assertTrue(future.isCompletedExceptionally)
+ assertFutureThrows(future, classOf[UnsupportedVersionException])
+ }
+
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
def testJoinGroup(version: Short): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 11ebee134b896..32901d47ef6bd 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -43,6 +43,8 @@ import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.DescribedGroup
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.{DescribeShareGroupOffsetsResponsePartition, DescribeShareGroupOffsetsResponseTopic}
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource, AlterConfigsResourceCollection => IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig, AlterableConfigCollection => IAlterableConfigCollection}
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse => IAlterConfigsResourceResponse}
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
@@ -54,7 +56,7 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequ
import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection}
import org.apache.kafka.common.message.ShareFetchRequestData.{AcknowledgementBatch, ForgottenTopic}
import org.apache.kafka.common.message.ShareFetchResponseData.{AcquiredRecords, PartitionData, ShareFetchableTopicResponse}
-import org.apache.kafka.common.metadata.{TopicRecord, PartitionRecord, RegisterBrokerRecord}
+import org.apache.kafka.common.metadata.{PartitionRecord, RegisterBrokerRecord, TopicRecord}
import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, BrokerEndpointCollection}
import org.apache.kafka.common.protocol.ApiMessage
import org.apache.kafka.common.message._
@@ -10441,6 +10443,125 @@ class KafkaApisTest extends Logging {
})
}
+  @Test
+  def testDescribeShareGroupOffsetsReturnsUnsupportedVersion(): Unit = {
+    val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData().setGroupId("group").setTopics(
+      util.List.of(new DescribeShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1)))
+    )
+
+    val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build())
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    // Share groups are not enabled, so the handler must answer UNSUPPORTED_VERSION.
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+    // Guard against a vacuous pass: forEach over an empty responses list asserts nothing.
+    assertEquals(1, response.data.responses.size)
+    response.data.responses.forEach(topic => topic.partitions().forEach(partition => assertEquals(Errors.UNSUPPORTED_VERSION.code(), partition.errorCode())))
+  }
+
+  @Test
+  def testDescribeShareGroupOffsetsRequestsAuthorizationFailed(): Unit = {
+    val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData().setGroupId("group").setTopics(
+      util.List.of(new DescribeShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1)))
+    )
+
+    val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build())
+
+    // Deny every authorization check so the group ACL check fails.
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(util.List.of(AuthorizationResult.DENIED))
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      authorizer = Some(authorizer),
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+    // Guard against a vacuous pass: forEach over an empty responses list asserts nothing.
+    assertEquals(1, response.data.responses.size)
+    response.data.responses.forEach(
+      topic => topic.partitions().forEach(
+        partition => assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code(), partition.errorCode())
+      )
+    )
+  }
+
+ @Test
+ def testDescribeShareGroupOffsetsRequestSuccess(): Unit = {
+ // Two topics registered in the metadata cache; the group coordinator is mocked below.
+ val topicName1 = "topic-1"
+ val topicId1 = Uuid.randomUuid()
+ val topicName2 = "topic-2"
+ val topicId2 = Uuid.randomUuid()
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+ addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
+ addTopicToMetadataCache(topicName2, 1, topicId = topicId2)
+
+ val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData().setGroupId("group").setTopics(
+ util.List.of(
+ new DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName1).setPartitions(util.List.of(1, 2, 3)),
+ new DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName2).setPartitions(util.List.of(10, 20)),
+ )
+ )
+
+ val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build())
+
+ // Stub the coordinator with a future we complete only after the request is handled,
+ // so the response path is exercised asynchronously.
+ val future = new CompletableFuture[DescribeShareGroupOffsetsResponseData]()
+ when(groupCoordinator.describeShareGroupOffsets(
+ requestChannelRequest.context,
+ describeShareGroupOffsetsRequest
+ )).thenReturn(future)
+ kafkaApis = createKafkaApis(
+ overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ // Expected coordinator answer: all five requested partitions with offset 0 and no error.
+ val describeShareGroupOffsetsResponse = new DescribeShareGroupOffsetsResponseData()
+ .setResponses(util.List.of(
+ new DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName1)
+ .setTopicId(topicId1)
+ .setPartitions(util.List.of(
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(1)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0),
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(2)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0),
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(3)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0)
+ )),
+ new DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName2)
+ .setTopicId(topicId2)
+ .setPartitions(util.List.of(
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(10)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0),
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(20)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0)
+ ))
+ ))
+
+ // Completing the coordinator future lets KafkaApis send the response verbatim.
+ future.complete(describeShareGroupOffsetsResponse)
+ val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+ assertEquals(describeShareGroupOffsetsResponse, response.data)
+ }
+
@Test
def testWriteShareGroupStateSuccess(): Unit = {
val topicId = Uuid.randomUuid();
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index b57963aed9247..6f28a63c8b548 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -22,6 +22,8 @@
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -256,6 +258,18 @@ CompletableFuture fetchAllOffs
boolean requireStable
);
+    /**
+     * Fetch the offsets for the share partitions of a share group.
+     *
+     * @param context The request context.
+     * @param request The DescribeShareGroupOffsets request.
+     * @return A future yielding the DescribeShareGroupOffsets response.
+     */
+    CompletableFuture<DescribeShareGroupOffsetsResponseData> describeShareGroupOffsets(
+        RequestContext context,
+        DescribeShareGroupOffsetsRequestData request
+    );
+
/**
* Commit offsets for a given Group.
*
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 8b8c4bb0f9985..6ee0c938d46c5 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -26,6 +26,8 @@
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -40,6 +42,7 @@
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData.DescribedGroup;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
@@ -53,6 +56,7 @@
import org.apache.kafka.common.requests.ConsumerGroupDescribeRequest;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
+import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
@@ -75,6 +79,8 @@
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.share.persister.Persister;
+import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.Timer;
@@ -114,6 +120,7 @@ public static class Builder {
private CoordinatorRuntimeMetrics coordinatorRuntimeMetrics;
private GroupCoordinatorMetrics groupCoordinatorMetrics;
private GroupConfigManager groupConfigManager;
+ private Persister persister;
public Builder(
int nodeId,
@@ -158,23 +165,21 @@ public Builder withGroupConfigManager(GroupConfigManager groupConfigManager) {
return this;
}
+ public Builder withPersister(Persister persister) {
+ this.persister = persister;
+ return this;
+ }
+
public GroupCoordinatorService build() {
- if (config == null)
- throw new IllegalArgumentException("Config must be set.");
- if (writer == null)
- throw new IllegalArgumentException("Writer must be set.");
- if (loader == null)
- throw new IllegalArgumentException("Loader must be set.");
- if (time == null)
- throw new IllegalArgumentException("Time must be set.");
- if (timer == null)
- throw new IllegalArgumentException("Timer must be set.");
- if (coordinatorRuntimeMetrics == null)
- throw new IllegalArgumentException("CoordinatorRuntimeMetrics must be set.");
- if (groupCoordinatorMetrics == null)
- throw new IllegalArgumentException("GroupCoordinatorMetrics must be set.");
- if (groupConfigManager == null)
- throw new IllegalArgumentException("GroupConfigManager must be set.");
+ requireNonNull(config, new IllegalArgumentException("Config must be set."));
+ requireNonNull(writer, new IllegalArgumentException("Writer must be set."));
+ requireNonNull(loader, new IllegalArgumentException("Loader must be set."));
+ requireNonNull(time, new IllegalArgumentException("Time must be set."));
+ requireNonNull(timer, new IllegalArgumentException("Timer must be set."));
+ requireNonNull(coordinatorRuntimeMetrics, new IllegalArgumentException("CoordinatorRuntimeMetrics must be set."));
+ requireNonNull(groupCoordinatorMetrics, new IllegalArgumentException("GroupCoordinatorMetrics must be set."));
+ requireNonNull(groupConfigManager, new IllegalArgumentException("GroupConfigManager must be set."));
+ requireNonNull(persister, new IllegalArgumentException("Persister must be set."));
String logPrefix = String.format("GroupCoordinator id=%d", nodeId);
LogContext logContext = new LogContext(String.format("[%s] ", logPrefix));
@@ -214,7 +219,8 @@ public GroupCoordinatorService build() {
config,
runtime,
groupCoordinatorMetrics,
- groupConfigManager
+ groupConfigManager,
+ persister
);
}
}
@@ -244,6 +250,11 @@ public GroupCoordinatorService build() {
*/
private final GroupConfigManager groupConfigManager;
+ /**
+ * The Persister to persist the state of share partition state.
+ */
+ private final Persister persister;
+
/**
* Boolean indicating whether the coordinator is active or not.
*/
@@ -255,6 +266,12 @@ public GroupCoordinatorService build() {
*/
private volatile int numPartitions = -1;
+    /**
+     * The latest metadata image, used to map topic names to ids and back.
+     * Initialised when {@link GroupCoordinator#onNewMetadataImage(MetadataImage, MetadataDelta)} is called.
+     * Declared volatile because it is written by the metadata publisher thread and read
+     * concurrently by request handler threads in describeShareGroupOffsets; without it
+     * readers may see a stale (or forever-null) reference.
+     */
+    private volatile MetadataImage metadataImage = null;
+
/**
*
* @param logContext The log context.
@@ -262,19 +279,22 @@ public GroupCoordinatorService build() {
* @param runtime The runtime.
* @param groupCoordinatorMetrics The group coordinator metrics.
* @param groupConfigManager The group config manager.
+ * @param persister The persister
*/
GroupCoordinatorService(
LogContext logContext,
GroupCoordinatorConfig config,
CoordinatorRuntime runtime,
GroupCoordinatorMetrics groupCoordinatorMetrics,
- GroupConfigManager groupConfigManager
+ GroupConfigManager groupConfigManager,
+ Persister persister
) {
this.log = logContext.logger(GroupCoordinatorService.class);
this.config = config;
this.runtime = runtime;
this.groupCoordinatorMetrics = groupCoordinatorMetrics;
this.groupConfigManager = groupConfigManager;
+ this.persister = persister;
}
/**
@@ -935,6 +955,78 @@ public CompletableFuture fetch
}
}
+    /**
+     * See {@link GroupCoordinator#describeShareGroupOffsets(RequestContext, DescribeShareGroupOffsetsRequestData)}.
+     */
+    @Override
+    public CompletableFuture<DescribeShareGroupOffsetsResponseData> describeShareGroupOffsets(
+        RequestContext context,
+        DescribeShareGroupOffsetsRequestData requestData
+    ) {
+        // Fail fast when the coordinator is not active.
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(
+                new DescribeShareGroupOffsetsResponseData()
+                    .setResponses(DescribeShareGroupOffsetsRequest.getErrorDescribeShareGroupOffsets(
+                        requestData.topics(),
+                        Errors.COORDINATOR_NOT_AVAILABLE
+                    ))
+            );
+        }
+
+        // Without a metadata image we cannot resolve topic names to ids.
+        if (metadataImage == null) {
+            return CompletableFuture.completedFuture(
+                new DescribeShareGroupOffsetsResponseData()
+                    .setResponses(DescribeShareGroupOffsetsRequest.getErrorDescribeShareGroupOffsets(
+                        requestData.topics(),
+                        Errors.UNKNOWN_TOPIC_OR_PARTITION
+                    ))
+            );
+        }
+
+        // Translate the request into a persister read-summary request.
+        // NOTE(review): topicNameToIdView().get() returns null for a topic name absent from
+        // the image — confirm the persister tolerates a null topic id; otherwise such topics
+        // should be answered with UNKNOWN_TOPIC_OR_PARTITION before reaching the persister.
+        List<ReadShareGroupStateSummaryRequestData.ReadStateSummaryData> readStateSummaryData =
+            requestData.topics().stream().map(
+                topic -> new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                    .setTopicId(metadataImage.topics().topicNameToIdView().get(topic.topicName()))
+                    .setPartitions(
+                        topic.partitions().stream().map(
+                            partitionIndex -> new ReadShareGroupStateSummaryRequestData.PartitionData().setPartition(partitionIndex)
+                        ).toList()
+                    )
+            ).toList();
+        ReadShareGroupStateSummaryRequestData readSummaryRequestData = new ReadShareGroupStateSummaryRequestData()
+            .setGroupId(requestData.groupId())
+            .setTopics(readStateSummaryData);
+        CompletableFuture<DescribeShareGroupOffsetsResponseData> future = new CompletableFuture<>();
+        persister.readSummary(ReadShareGroupStateSummaryParameters.from(readSummaryRequestData))
+            .whenComplete((result, error) -> {
+                if (error != null) {
+                    // Log the cause as well; a bare message makes the failure undiagnosable.
+                    log.error("Failed to read summary of the share partition", error);
+                    future.completeExceptionally(error);
+                    return;
+                }
+                if (result == null || result.topicsData() == null) {
+                    log.error("Result is null for the read state summary");
+                    future.completeExceptionally(new IllegalStateException("Result is null for the read state summary"));
+                    return;
+                }
+                // Map the persister result back onto the DescribeShareGroupOffsets response schema.
+                List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList =
+                    result.topicsData().stream().map(
+                        topicData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                            .setTopicId(topicData.topicId())
+                            .setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId()))
+                            .setPartitions(topicData.partitions().stream().map(
+                                partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                                    .setPartitionIndex(partitionData.partition())
+                                    .setStartOffset(partitionData.startOffset())
+                                    .setErrorMessage(partitionData.errorMessage())
+                                    .setErrorCode(partitionData.errorCode())
+                            ).toList())
+                    ).toList();
+                future.complete(new DescribeShareGroupOffsetsResponseData().setResponses(describeShareGroupOffsetsResponseTopicList));
+            });
+        return future;
+    }
+
/**
* See {@link GroupCoordinator#commitOffsets(RequestContext, OffsetCommitRequestData, BufferSupplier)}.
*/
@@ -1161,6 +1253,7 @@ public void onNewMetadataImage(
MetadataDelta delta
) {
throwIfNotActive();
+ // Cache the image so describeShareGroupOffsets can translate topic names <-> ids.
+ metadataImage = newImage;
runtime.onNewMetadataImage(newImage, delta);
}
@@ -1277,4 +1370,10 @@ private OffsetFetchResponseData.OffsetFetchResponseGroup handleOffsetFetchExcept
);
}
}
+
+ /**
+  * Throws the supplied exception when {@code obj} is null.
+  * A local alternative to {@link java.util.Objects#requireNonNull} so that build()
+  * can report IllegalArgumentException rather than NullPointerException.
+  * NOTE(review): callers allocate the exception eagerly even on the non-null path;
+  * acceptable here because build() runs once per coordinator.
+  */
+ private static void requireNonNull(Object obj, RuntimeException throwable) {
+ if (obj == null) {
+ throw throwable;
+ }
+ }
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 1974a796d4a64..04181258f92b7 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -18,6 +18,7 @@
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
@@ -38,6 +39,8 @@
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -50,6 +53,8 @@
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
+import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
@@ -72,7 +77,15 @@
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.share.persister.DefaultStatePersister;
+import org.apache.kafka.server.share.persister.NoOpShareStatePersister;
+import org.apache.kafka.server.share.persister.PartitionFactory;
+import org.apache.kafka.server.share.persister.Persister;
+import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters;
+import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryResult;
import org.apache.kafka.server.util.FutureUtils;
import org.junit.jupiter.api.Test;
@@ -89,6 +102,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
@@ -115,9 +129,21 @@
public class GroupCoordinatorServiceTest {
- @FunctionalInterface
- interface TriFunction {
- R apply(A a, B b, C c);
+ private static final String TOPIC_NAME = "test-topic";
+ private static final Uuid TOPIC_ID = Uuid.randomUuid();
+
+ private static Stream testGroupHeartbeatWithExceptionSource() {
+ return Stream.of(
+ Arguments.arguments(new UnknownTopicOrPartitionException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+ Arguments.arguments(new NotEnoughReplicasException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+ Arguments.arguments(new org.apache.kafka.common.errors.TimeoutException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
+ Arguments.arguments(new NotLeaderOrFollowerException(), Errors.NOT_COORDINATOR.code(), null),
+ Arguments.arguments(new KafkaStorageException(), Errors.NOT_COORDINATOR.code(), null),
+ Arguments.arguments(new RecordTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null),
+ Arguments.arguments(new RecordBatchTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null),
+ Arguments.arguments(new InvalidFetchSizeException(""), Errors.UNKNOWN_SERVER_ERROR.code(), null),
+ Arguments.arguments(new InvalidRequestException("Invalid"), Errors.INVALID_REQUEST.code(), "Invalid")
+ );
}
@SuppressWarnings("unchecked")
@@ -132,15 +158,11 @@ private GroupCoordinatorConfig createConfig() {
@Test
public void testStartupShutdown() throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setRuntime(runtime)
+ .setConfig(createConfig())
+ .build(true);
- service.startup(() -> 1);
service.shutdown();
verify(runtime, times(1)).close();
@@ -149,13 +171,10 @@ public void testStartupShutdown() throws Exception {
@Test
public void testConsumerGroupHeartbeatWhenNotStarted() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData()
.setGroupId("foo");
@@ -175,19 +194,14 @@ public void testConsumerGroupHeartbeatWhenNotStarted() throws ExecutionException
@Test
public void testConsumerGroupHeartbeat() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setRuntime(runtime)
+ .setConfig(createConfig())
+ .build(true);
ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData()
.setGroupId("foo");
- service.startup(() -> 1);
-
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("consumer-group-heartbeat"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
@@ -205,20 +219,6 @@ public void testConsumerGroupHeartbeat() throws ExecutionException, InterruptedE
assertEquals(new ConsumerGroupHeartbeatResponseData(), future.get(5, TimeUnit.SECONDS));
}
- private static Stream testGroupHeartbeatWithExceptionSource() {
- return Stream.of(
- Arguments.arguments(new UnknownTopicOrPartitionException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
- Arguments.arguments(new NotEnoughReplicasException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
- Arguments.arguments(new org.apache.kafka.common.errors.TimeoutException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
- Arguments.arguments(new NotLeaderOrFollowerException(), Errors.NOT_COORDINATOR.code(), null),
- Arguments.arguments(new KafkaStorageException(), Errors.NOT_COORDINATOR.code(), null),
- Arguments.arguments(new RecordTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null),
- Arguments.arguments(new RecordBatchTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null),
- Arguments.arguments(new InvalidFetchSizeException(""), Errors.UNKNOWN_SERVER_ERROR.code(), null),
- Arguments.arguments(new InvalidRequestException("Invalid"), Errors.INVALID_REQUEST.code(), "Invalid")
- );
- }
-
@ParameterizedTest
@MethodSource("testGroupHeartbeatWithExceptionSource")
public void testConsumerGroupHeartbeatWithException(
@@ -227,19 +227,14 @@ public void testConsumerGroupHeartbeatWithException(
String expectedErrorMessage
) throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData()
.setGroupId("foo");
- service.startup(() -> 1);
-
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("consumer-group-heartbeat"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
@@ -263,13 +258,10 @@ public void testConsumerGroupHeartbeatWithException(
@Test
public void testPartitionFor() {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
assertThrows(CoordinatorNotAvailableException.class,
() -> service.partitionFor("foo"));
@@ -282,13 +274,10 @@ public void testPartitionFor() {
@Test
public void testGroupMetadataTopicConfigs() {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
Properties expectedProperties = new Properties();
expectedProperties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
@@ -301,13 +290,10 @@ public void testGroupMetadataTopicConfigs() {
@Test
public void testOnElection() {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
assertThrows(CoordinatorNotAvailableException.class,
() -> service.onElection(5, 10));
@@ -324,13 +310,10 @@ public void testOnElection() {
@Test
public void testOnResignation() {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
assertThrows(CoordinatorNotAvailableException.class,
() -> service.onResignation(5, OptionalInt.of(10)));
@@ -347,15 +330,11 @@ public void testOnResignation() {
@Test
public void testOnResignationWithEmptyLeaderEpoch() {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
- service.startup(() -> 1);
service.onResignation(5, OptionalInt.empty());
verify(runtime, times(1)).scheduleUnloadOperation(
@@ -367,20 +346,15 @@ public void testOnResignationWithEmptyLeaderEpoch() {
@Test
public void testJoinGroup() {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
JoinGroupRequestData request = new JoinGroupRequestData()
.setGroupId("foo")
.setSessionTimeoutMs(1000);
- service.startup(() -> 1);
-
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("classic-group-join"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
@@ -402,20 +376,15 @@ public void testJoinGroup() {
@Test
public void testJoinGroupWithException() throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
JoinGroupRequestData request = new JoinGroupRequestData()
.setGroupId("foo")
.setSessionTimeoutMs(1000);
- service.startup(() -> 1);
-
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("classic-group-join"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
@@ -439,15 +408,10 @@ public void testJoinGroupWithException() throws Exception {
@Test
public void testJoinGroupInvalidGroupId() throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
-
- service.startup(() -> 1);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
JoinGroupRequestData request = new JoinGroupRequestData()
.setGroupId(null)
@@ -486,13 +450,10 @@ public void testJoinGroupInvalidGroupId() throws Exception {
@Test
public void testJoinGroupWhenNotStarted() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
JoinGroupRequestData request = new JoinGroupRequestData()
.setGroupId("foo");
@@ -514,15 +475,10 @@ public void testJoinGroupWhenNotStarted() throws ExecutionException, Interrupted
@ValueSource(ints = {120 - 1, 10 * 5 * 1000 + 1})
public void testJoinGroupInvalidSessionTimeout(int sessionTimeoutMs) throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorConfig config = createConfig();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- config,
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
- service.startup(() -> 1);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
.withGroupId("group-id")
@@ -546,19 +502,14 @@ public void testJoinGroupInvalidSessionTimeout(int sessionTimeoutMs) throws Exce
@Test
public void testSyncGroup() {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
SyncGroupRequestData request = new SyncGroupRequestData()
.setGroupId("foo");
- service.startup(() -> 1);
-
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("classic-group-sync"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
@@ -580,19 +531,14 @@ public void testSyncGroup() {
@Test
public void testSyncGroupWithException() throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
SyncGroupRequestData request = new SyncGroupRequestData()
.setGroupId("foo");
- service.startup(() -> 1);
-
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("classic-group-sync"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
@@ -617,15 +563,10 @@ public void testSyncGroupWithException() throws Exception {
@Test
public void testSyncGroupInvalidGroupId() throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
-
- service.startup(() -> 1);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
SyncGroupRequestData request = new SyncGroupRequestData()
.setGroupId(null)
@@ -647,13 +588,10 @@ public void testSyncGroupInvalidGroupId() throws Exception {
@Test
public void testSyncGroupWhenNotStarted() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
SyncGroupRequestData request = new SyncGroupRequestData()
.setGroupId("foo");
@@ -674,19 +612,14 @@ public void testSyncGroupWhenNotStarted() throws ExecutionException, Interrupted
@Test
public void testHeartbeat() throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
HeartbeatRequestData request = new HeartbeatRequestData()
.setGroupId("foo");
- service.startup(() -> 1);
-
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("classic-group-heartbeat"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
@@ -708,19 +641,14 @@ public void testHeartbeat() throws Exception {
@Test
public void testHeartbeatCoordinatorNotAvailableException() throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
HeartbeatRequestData request = new HeartbeatRequestData()
.setGroupId("foo");
- service.startup(() -> 1);
-
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("classic-group-heartbeat"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
@@ -742,19 +670,14 @@ public void testHeartbeatCoordinatorNotAvailableException() throws Exception {
@Test
public void testHeartbeatCoordinatorException() throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
HeartbeatRequestData request = new HeartbeatRequestData()
.setGroupId("foo");
- service.startup(() -> 1);
-
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("classic-group-heartbeat"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
@@ -779,13 +702,10 @@ public void testHeartbeatCoordinatorException() throws Exception {
@Test
public void testHeartbeatWhenNotStarted() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
HeartbeatRequestData request = new HeartbeatRequestData()
.setGroupId("foo");
@@ -805,13 +725,10 @@ public void testHeartbeatWhenNotStarted() throws ExecutionException, Interrupted
@Test
public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
service.startup(() -> 3);
List expectedResults = Arrays.asList(
@@ -853,13 +770,10 @@ public void testListGroups() throws ExecutionException, InterruptedException, Ti
public void testListGroupsFailedWithNotCoordinatorException()
throws InterruptedException, ExecutionException, TimeoutException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
service.startup(() -> 3);
List expectedResults = Arrays.asList(
@@ -896,13 +810,10 @@ public void testListGroupsFailedWithNotCoordinatorException()
public void testListGroupsWithFailure()
throws InterruptedException, ExecutionException, TimeoutException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
service.startup(() -> 3);
when(runtime.scheduleReadAllOperation(
@@ -929,13 +840,10 @@ public void testListGroupsWithFailure()
@Test
public void testListGroupsWithEmptyTopicPartitions() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
int partitionCount = 0;
service.startup(() -> partitionCount);
@@ -955,13 +863,10 @@ public void testListGroupsWithEmptyTopicPartitions() throws ExecutionException,
@Test
public void testListGroupsWhenNotStarted() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
ListGroupsRequestData request = new ListGroupsRequestData();
@@ -980,13 +885,11 @@ public void testListGroupsWhenNotStarted() throws ExecutionException, Interrupte
@Test
public void testDescribeGroups() throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- mock(GroupCoordinatorMetrics.class),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .build();
int partitionCount = 2;
service.startup(() -> partitionCount);
@@ -1023,13 +926,11 @@ public void testDescribeGroups() throws Exception {
@Test
public void testDescribeGroupsInvalidGroupId() throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- mock(GroupCoordinatorMetrics.class),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .build();
int partitionCount = 1;
service.startup(() -> partitionCount);
@@ -1057,13 +958,11 @@ public void testDescribeGroupsInvalidGroupId() throws Exception {
@Test
public void testDescribeGroupCoordinatorLoadInProgress() throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- mock(GroupCoordinatorMetrics.class),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .build();
int partitionCount = 1;
service.startup(() -> partitionCount);
@@ -1090,13 +989,10 @@ public void testDescribeGroupCoordinatorLoadInProgress() throws Exception {
@Test
public void testDescribeGroupsWhenNotStarted() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
        CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> future = service.describeGroups(
requestContext(ApiKeys.DESCRIBE_GROUPS),
@@ -1124,15 +1020,10 @@ public void testFetchOffsets(
boolean requireStable
) throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
-
- service.startup(() -> 1);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
OffsetFetchRequestData.OffsetFetchRequestGroup request =
new OffsetFetchRequestData.OffsetFetchRequestGroup()
@@ -1191,13 +1082,10 @@ public void testFetchOffsetsWhenNotStarted(
boolean requireStable
) throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
OffsetFetchRequestData.OffsetFetchRequestGroup request =
new OffsetFetchRequestData.OffsetFetchRequestGroup()
@@ -1244,15 +1132,10 @@ public void testFetchOffsetsWithWrappedError(
Errors expectedError
) throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
-
- service.startup(() -> 1);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
OffsetFetchRequestData.OffsetFetchRequestGroup request =
new OffsetFetchRequestData.OffsetFetchRequestGroup()
@@ -1290,19 +1173,14 @@ public void testFetchOffsetsWithWrappedError(
@Test
public void testLeaveGroup() throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
LeaveGroupRequestData request = new LeaveGroupRequestData()
.setGroupId("foo");
- service.startup(() -> 1);
-
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("classic-group-leave"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
@@ -1324,13 +1202,10 @@ public void testLeaveGroup() throws Exception {
@Test
public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
LeaveGroupRequestData request = new LeaveGroupRequestData()
.setGroupId("foo")
@@ -1343,8 +1218,6 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
.setGroupInstanceId("instance-2")
));
- service.startup(() -> 1);
-
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("classic-group-leave"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
@@ -1379,13 +1252,10 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
@Test
public void testLeaveGroupWhenNotStarted() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
LeaveGroupRequestData request = new LeaveGroupRequestData()
.setGroupId("foo");
@@ -1405,13 +1275,10 @@ public void testLeaveGroupWhenNotStarted() throws ExecutionException, Interrupte
@Test
public void testConsumerGroupDescribe() throws InterruptedException, ExecutionException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
int partitionCount = 2;
service.startup(() -> partitionCount);
@@ -1448,13 +1315,10 @@ public void testConsumerGroupDescribe() throws InterruptedException, ExecutionEx
@Test
public void testConsumerGroupDescribeInvalidGroupId() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
int partitionCount = 1;
service.startup(() -> partitionCount);
@@ -1483,13 +1347,10 @@ public void testConsumerGroupDescribeInvalidGroupId() throws ExecutionException,
@Test
public void testConsumerGroupDescribeCoordinatorLoadInProgress() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
int partitionCount = 1;
service.startup(() -> partitionCount);
@@ -1516,13 +1377,10 @@ public void testConsumerGroupDescribeCoordinatorLoadInProgress() throws Executio
@Test
public void testConsumerGroupDescribeCoordinatorNotActive() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("consumer-group-describe"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
@@ -1546,19 +1404,16 @@ public void testConsumerGroupDescribeCoordinatorNotActive() throws ExecutionExce
@Test
public void testDeleteOffsets() throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- mock(GroupCoordinatorMetrics.class),
- createConfigManager()
- );
- service.startup(() -> 1);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .build(true);
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
- .setName("topic")
+ .setName(TOPIC_NAME)
.setPartitions(Collections.singletonList(
new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
))
@@ -1598,19 +1453,16 @@ public void testDeleteOffsets() throws Exception {
@Test
public void testDeleteOffsetsInvalidGroupId() throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- mock(GroupCoordinatorMetrics.class),
- createConfigManager()
- );
- service.startup(() -> 1);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .build(true);
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
- .setName("topic")
+ .setName(TOPIC_NAME)
.setPartitions(Collections.singletonList(
new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
))
@@ -1645,19 +1497,16 @@ public void testDeleteOffsetsWithException(
short expectedErrorCode
) throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- mock(GroupCoordinatorMetrics.class),
- createConfigManager()
- );
- service.startup(() -> 1);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .build(true);
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
- .setName("topic")
+ .setName(TOPIC_NAME)
.setPartitions(Collections.singletonList(
new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
))
@@ -1689,13 +1538,10 @@ public void testDeleteOffsetsWithException(
@Test
public void testDeleteOffsetsWhenNotStarted() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
OffsetDeleteRequestData request = new OffsetDeleteRequestData()
.setGroupId("foo");
@@ -1716,13 +1562,11 @@ public void testDeleteOffsetsWhenNotStarted() throws ExecutionException, Interru
@Test
public void testDeleteGroups() throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- mock(GroupCoordinatorMetrics.class),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .build();
service.startup(() -> 3);
DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection1 =
@@ -1790,14 +1634,11 @@ public void testDeleteGroupsWithException(
short expectedErrorCode
) throws Exception {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- mock(GroupCoordinatorMetrics.class),
- createConfigManager()
- );
- service.startup(() -> 1);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .build(true);
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("delete-groups"),
@@ -1826,13 +1667,11 @@ public void testDeleteGroupsWithException(
@Test
public void testDeleteGroupsWhenNotStarted() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- mock(GroupCoordinatorMetrics.class),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .build();
CompletableFuture future = service.deleteGroups(
requestContext(ApiKeys.DELETE_GROUPS),
@@ -1854,13 +1693,10 @@ public void testDeleteGroupsWhenNotStarted() throws ExecutionException, Interrup
@Test
public void testCommitTransactionalOffsetsWhenNotStarted() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
.setGroupId("foo")
@@ -1868,7 +1704,7 @@ public void testCommitTransactionalOffsetsWhenNotStarted() throws ExecutionExcep
.setMemberId("member-id")
.setGenerationId(10)
.setTopics(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
- .setName("topic")
+ .setName(TOPIC_NAME)
.setPartitions(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(100)))));
@@ -1882,7 +1718,7 @@ public void testCommitTransactionalOffsetsWhenNotStarted() throws ExecutionExcep
assertEquals(
new TxnOffsetCommitResponseData()
.setTopics(Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
- .setName("topic")
+ .setName(TOPIC_NAME)
.setPartitions(Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))))),
@@ -1895,14 +1731,10 @@ public void testCommitTransactionalOffsetsWhenNotStarted() throws ExecutionExcep
@ValueSource(strings = {""})
public void testCommitTransactionalOffsetsWithInvalidGroupId(String groupId) throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
- service.startup(() -> 1);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
.setGroupId(groupId)
@@ -1910,7 +1742,7 @@ public void testCommitTransactionalOffsetsWithInvalidGroupId(String groupId) thr
.setMemberId("member-id")
.setGenerationId(10)
.setTopics(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
- .setName("topic")
+ .setName(TOPIC_NAME)
.setPartitions(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(100)))));
@@ -1924,7 +1756,7 @@ public void testCommitTransactionalOffsetsWithInvalidGroupId(String groupId) thr
assertEquals(
new TxnOffsetCommitResponseData()
.setTopics(Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
- .setName("topic")
+ .setName(TOPIC_NAME)
.setPartitions(Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.INVALID_GROUP_ID.code()))))),
@@ -1936,14 +1768,10 @@ public void testCommitTransactionalOffsetsWithInvalidGroupId(String groupId) thr
@ValueSource(shorts = {4, 5})
public void testCommitTransactionalOffsets(Short txnOffsetCommitVersion) throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
- service.startup(() -> 1);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
.setGroupId("foo")
@@ -1953,14 +1781,14 @@ public void testCommitTransactionalOffsets(Short txnOffsetCommitVersion) throws
.setMemberId("member-id")
.setGenerationId(10)
.setTopics(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
- .setName("topic")
+ .setName(TOPIC_NAME)
.setPartitions(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(100)))));
TxnOffsetCommitResponseData response = new TxnOffsetCommitResponseData()
.setTopics(Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
- .setName("topic")
+ .setName(TOPIC_NAME)
.setPartitions(Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code())))));
@@ -1995,14 +1823,10 @@ public void testCommitTransactionalOffsetsWithWrappedError(
Errors expectedError
) throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
- service.startup(() -> 1);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
.setGroupId("foo")
@@ -2012,14 +1836,14 @@ public void testCommitTransactionalOffsetsWithWrappedError(
.setMemberId("member-id")
.setGenerationId(10)
.setTopics(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
- .setName("topic")
+ .setName(TOPIC_NAME)
.setPartitions(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(100)))));
TxnOffsetCommitResponseData response = new TxnOffsetCommitResponseData()
.setTopics(Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
- .setName("topic")
+ .setName(TOPIC_NAME)
.setPartitions(Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
.setPartitionIndex(0)
.setErrorCode(expectedError.code())))));
@@ -2047,14 +1871,10 @@ public void testCommitTransactionalOffsetsWithWrappedError(
@Test
public void testCompleteTransaction() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
- service.startup(() -> 1);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
when(runtime.scheduleTransactionCompletion(
ArgumentMatchers.eq("write-txn-marker"),
@@ -2081,13 +1901,10 @@ public void testCompleteTransaction() throws ExecutionException, InterruptedExce
@Test
public void testCompleteTransactionWhenNotCoordinatorServiceStarted() {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
CompletableFuture future = service.completeTransaction(
new TopicPartition("foo", 0),
@@ -2104,14 +1921,10 @@ public void testCompleteTransactionWhenNotCoordinatorServiceStarted() {
@Test
public void testCompleteTransactionWithUnexpectedPartition() {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
- service.startup(() -> 1);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
CompletableFuture future = service.completeTransaction(
new TopicPartition("foo", 0),
@@ -2128,13 +1941,10 @@ public void testCompleteTransactionWithUnexpectedPartition() {
@Test
public void testOnPartitionsDeleted() {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
service.startup(() -> 3);
when(runtime.scheduleWriteAllOperation(
@@ -2159,13 +1969,10 @@ public void testOnPartitionsDeleted() {
@Test
public void testOnPartitionsDeletedWhenServiceIsNotStarted() {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
assertThrows(CoordinatorNotAvailableException.class, () -> service.onPartitionsDeleted(
Collections.singletonList(new TopicPartition("foo", 0)),
@@ -2176,13 +1983,10 @@ public void testOnPartitionsDeletedWhenServiceIsNotStarted() {
@Test
public void testShareGroupHeartbeatWhenNotStarted() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
ShareGroupHeartbeatRequestData request = new ShareGroupHeartbeatRequestData()
.setGroupId("foo");
@@ -2198,19 +2002,14 @@ public void testShareGroupHeartbeatWhenNotStarted() throws ExecutionException, I
@Test
public void testShareGroupHeartbeat() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
ShareGroupHeartbeatRequestData request = new ShareGroupHeartbeatRequestData()
.setGroupId("foo");
- service.startup(() -> 1);
-
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("share-group-heartbeat"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
@@ -2236,19 +2035,14 @@ public void testShareGroupHeartbeatWithException(
String expectedErrorMessage
) throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
ShareGroupHeartbeatRequestData request = new ShareGroupHeartbeatRequestData()
.setGroupId("foo");
- service.startup(() -> 1);
-
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("share-group-heartbeat"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
@@ -2272,13 +2066,10 @@ public void testShareGroupHeartbeatWithException(
@Test
public void testShareGroupDescribe() throws InterruptedException, ExecutionException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
int partitionCount = 2;
service.startup(() -> partitionCount);
@@ -2315,13 +2106,10 @@ public void testShareGroupDescribe() throws InterruptedException, ExecutionExcep
@Test
public void testShareGroupDescribeInvalidGroupId() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
int partitionCount = 1;
service.startup(() -> partitionCount);
@@ -2350,13 +2138,10 @@ public void testShareGroupDescribeInvalidGroupId() throws ExecutionException, In
@Test
public void testShareGroupDescribeCoordinatorLoadInProgress() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
int partitionCount = 1;
service.startup(() -> partitionCount);
@@ -2383,13 +2168,10 @@ public void testShareGroupDescribeCoordinatorLoadInProgress() throws ExecutionEx
@Test
public void testShareGroupDescribeCoordinatorNotActive() throws ExecutionException, InterruptedException {
CoordinatorRuntime runtime = mockRuntime();
- GroupCoordinatorService service = new GroupCoordinatorService(
- new LogContext(),
- createConfig(),
- runtime,
- new GroupCoordinatorMetrics(),
- createConfigManager()
- );
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
@@ -2409,4 +2191,323 @@ public void testShareGroupDescribeCoordinatorNotActive() throws ExecutionExcepti
future.get()
);
}
+
+ @Test
+ public void testDescribeShareGroupOffsetsWithNoOpPersister() throws InterruptedException, ExecutionException {
+ CoordinatorRuntime runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DescribeShareGroupOffsetsResponseData responseData = new DescribeShareGroupOffsetsResponseData()
+ .setResponses(
+ List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
+ .setErrorCode(PartitionFactory.DEFAULT_ERROR_CODE)
+ .setErrorMessage(PartitionFactory.DEFAULT_ERR_MESSAGE)))
+ )
+ );
+
+ CompletableFuture future =
+ service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDescribeShareGroupOffsetsWithDefaultPersister() throws InterruptedException, ExecutionException {
+ CoordinatorRuntime runtime = mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ ReadShareGroupStateSummaryRequestData readShareGroupStateSummaryRequestData = new ReadShareGroupStateSummaryRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData()
+ .setPartition(partition)))));
+
+ DescribeShareGroupOffsetsResponseData responseData = new DescribeShareGroupOffsetsResponseData()
+ .setResponses(
+ List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setStartOffset(21)
+ .setErrorCode(Errors.NONE.code())
+ .setErrorMessage(Errors.NONE.message())))
+ )
+ );
+
+ ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData()
+ .setResults(
+ List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult()
+ .setPartition(partition)
+ .setStartOffset(21)
+ .setStateEpoch(1)
+ .setErrorCode(Errors.NONE.code())
+ .setErrorMessage(Errors.NONE.message())))
+ )
+ );
+
+ ReadShareGroupStateSummaryParameters readShareGroupStateSummaryParameters = ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+ ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+ when(persister.readSummary(
+ ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+ )).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+ CompletableFuture future =
+ service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDescribeShareGroupOffsetsWithDefaultPersisterThrowsError() {
+ CoordinatorRuntime runtime = mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ when(persister.readSummary(ArgumentMatchers.any()))
+ .thenReturn(CompletableFuture.failedFuture(new Exception("Unable to validate read state summary request")));
+
+ CompletableFuture future =
+ service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
+ assertFutureThrows(future, Exception.class, "Unable to validate read state summary request");
+ }
+
+ @Test
+ public void testDescribeShareGroupOffsetsWithDefaultPersisterNullResult() {
+ CoordinatorRuntime runtime = mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ when(persister.readSummary(ArgumentMatchers.any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+
+ CompletableFuture future =
+ service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
+ assertFutureThrows(future, IllegalStateException.class, "Result is null for the read state summary");
+ }
+
+ @Test
+ public void testDescribeShareGroupOffsetsWithDefaultPersisterNullTopicData() {
+ CoordinatorRuntime runtime = mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult =
+ new ReadShareGroupStateSummaryResult.Builder().setTopicsData(null).build();
+
+ when(persister.readSummary(ArgumentMatchers.any()))
+ .thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+ CompletableFuture future =
+ service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
+ assertFutureThrows(future, IllegalStateException.class, "Result is null for the read state summary");
+ }
+
+ @Test
+ public void testDescribeShareGroupOffsetsCoordinatorNotActive() throws ExecutionException, InterruptedException {
+ CoordinatorRuntime runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DescribeShareGroupOffsetsResponseData responseData = new DescribeShareGroupOffsetsResponseData()
+ .setResponses(
+ List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setStartOffset(0)
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message())))
+ )
+ );
+
+ CompletableFuture future =
+ service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDescribeShareGroupOffsetsMetadataImageNull() throws ExecutionException, InterruptedException {
+ CoordinatorRuntime runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
+
+        // Force a null metadata image so the request falls through to the UNKNOWN_TOPIC_OR_PARTITION path.
+ service.onNewMetadataImage(null, null);
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DescribeShareGroupOffsetsResponseData responseData = new DescribeShareGroupOffsetsResponseData()
+ .setResponses(
+ List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setStartOffset(0)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+ .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())))
+ )
+ );
+
+ CompletableFuture future =
+ service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @FunctionalInterface
+ private interface TriFunction {
+ R apply(A a, B b, C c);
+ }
+
+ private static class GroupCoordinatorServiceBuilder {
+ private final LogContext logContext = new LogContext();
+ private final GroupConfigManager configManager = createConfigManager();
+ private GroupCoordinatorConfig config;
+ private CoordinatorRuntime runtime;
+ private GroupCoordinatorMetrics metrics = new GroupCoordinatorMetrics();
+ private Persister persister = new NoOpShareStatePersister();
+ private MetadataImage metadataImage = null;
+
+ GroupCoordinatorService build() {
+ return build(false);
+ }
+
+ GroupCoordinatorService build(boolean serviceStartup) {
+ if (metadataImage == null) {
+ metadataImage = mock(MetadataImage.class);
+ }
+
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ logContext,
+ config,
+ runtime,
+ metrics,
+ configManager,
+ persister
+ );
+
+ if (serviceStartup) {
+ service.startup(() -> 1);
+ service.onNewMetadataImage(metadataImage, null);
+ }
+ when(metadataImage.topics()).thenReturn(mock(TopicsImage.class));
+ when(metadataImage.topics().topicIdToNameView()).thenReturn(Map.of(TOPIC_ID, TOPIC_NAME));
+ when(metadataImage.topics().topicNameToIdView()).thenReturn(Map.of(TOPIC_NAME, TOPIC_ID));
+
+ return service;
+ }
+
+ public GroupCoordinatorServiceBuilder setConfig(GroupCoordinatorConfig config) {
+ this.config = config;
+ return this;
+ }
+
+ public GroupCoordinatorServiceBuilder setRuntime(CoordinatorRuntime runtime) {
+ this.runtime = runtime;
+ return this;
+ }
+
+ public GroupCoordinatorServiceBuilder setPersister(Persister persister) {
+ this.persister = persister;
+ return this;
+ }
+
+ public GroupCoordinatorServiceBuilder setMetrics(GroupCoordinatorMetrics metrics) {
+ this.metrics = metrics;
+ return this;
+ }
+ }
}
diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryParameters.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryParameters.java
index cb0efd25e9efb..0173dd7e992a5 100644
--- a/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryParameters.java
+++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryParameters.java
@@ -19,6 +19,7 @@
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
+import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -58,4 +59,16 @@ public ReadShareGroupStateSummaryParameters build() {
return new ReadShareGroupStateSummaryParameters(groupTopicPartitionData);
}
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) return false;
+ ReadShareGroupStateSummaryParameters that = (ReadShareGroupStateSummaryParameters) o;
+ return Objects.equals(groupTopicPartitionData, that.groupTopicPartitionData);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(groupTopicPartitionData);
+ }
}
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 17bb993bc8196..967095040eec5 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -22,8 +22,7 @@
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
-import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.admin.ShareMemberAssignment;
import org.apache.kafka.clients.admin.ShareMemberDescription;
@@ -237,28 +236,21 @@ TreeMap earliest = new HashMap<>();
- Map latest = new HashMap<>();
+ ListShareGroupOffsetsSpec offsetsSpec = new ListShareGroupOffsetsSpec().topicPartitions(allTp);
+ Map groupSpecs = new HashMap<>();
+ groupSpecs.put(groupId, offsetsSpec);
- for (TopicPartition tp : allTp) {
- earliest.put(tp, OffsetSpec.earliest());
- latest.put(tp, OffsetSpec.latest());
- }
-
- // This call to obtain the earliest offsets will be replaced once adminClient.listShareGroupOffsets is implemented
try {
- Map earliestResult = adminClient.listOffsets(earliest).all().get();
- Map latestResult = adminClient.listOffsets(latest).all().get();
+ Map earliestResult = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
Set partitionOffsets = new HashSet<>();
- for (Entry tp : earliestResult.entrySet()) {
+ for (Entry tp : earliestResult.entrySet()) {
SharePartitionOffsetInformation partitionOffsetInfo = new SharePartitionOffsetInformation(
groupId,
tp.getKey().topic(),
tp.getKey().partition(),
- latestResult.get(tp.getKey()).offset() - earliestResult.get(tp.getKey()).offset()
+ earliestResult.get(tp.getKey())
);
partitionOffsets.add(partitionOffsetInfo);
}
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index 11fd3bc35a90b..bfb8c30f109f7 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -17,13 +17,14 @@
package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
import org.apache.kafka.clients.admin.DescribeShareGroupsResult;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.admin.ShareMemberAssignment;
@@ -159,15 +160,16 @@ public void testDescribeOffsetsOfExistingGroup() throws Exception {
), 0)),
GroupState.STABLE,
new Node(0, "host1", 9090), 0, 0);
- ListOffsetsResult resultOffsets = new ListOffsetsResult(
+ ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
- new TopicPartition("topic1", 0),
- KafkaFuture.completedFuture(new ListOffsetsResult.ListOffsetsResultInfo(0, 0, Optional.empty()))
- ));
+ firstGroup,
+ KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), 0L))
+ )
+ );
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, KafkaFuture.completedFuture(exp)));
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
- when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(resultOffsets);
+ when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(listShareGroupOffsetsResult);
try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
TestUtils.waitForCondition(() -> {
Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
@@ -212,17 +214,33 @@ public void testDescribeOffsetsOfAllExistingGroups() throws Exception {
), 0)),
GroupState.STABLE,
new Node(0, "host1", 9090), 0, 0);
- ListOffsetsResult resultOffsets = new ListOffsetsResult(
+ ListShareGroupOffsetsResult listShareGroupOffsetsResult1 = AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
- new TopicPartition("topic1", 0),
- KafkaFuture.completedFuture(new ListOffsetsResult.ListOffsetsResultInfo(0, 0, Optional.empty()))
- ));
+ firstGroup,
+ KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), 0L))
+ )
+ );
+ ListShareGroupOffsetsResult listShareGroupOffsetsResult2 = AdminClientTestUtils.createListShareGroupOffsetsResult(
+ Map.of(
+ secondGroup,
+ KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), 0L))
+ )
+ );
when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(firstGroupListing, secondGroupListing)));
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, KafkaFuture.completedFuture(exp1), secondGroup, KafkaFuture.completedFuture(exp2)));
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
- when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(resultOffsets);
+ when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap())).thenAnswer(
+ invocation -> {
+ Map argument = invocation.getArgument(0);
+ if (argument.containsKey(firstGroup)) {
+ return listShareGroupOffsetsResult1;
+ } else if (argument.containsKey(secondGroup)) {
+ return listShareGroupOffsetsResult2;
+ }
+ return null;
+ });
try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
TestUtils.waitForCondition(() -> {
Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));