[KAFKA-16720] AdminClient Support for ListShareGroupOffsets (2/2) #18671

Open · wants to merge 3 commits into trunk
1 change: 1 addition & 0 deletions checkstyle/import-control-group-coordinator.xml
@@ -64,6 +64,7 @@
<allow pkg="org.apache.kafka.image"/>
<allow pkg="org.apache.kafka.server.common"/>
<allow pkg="org.apache.kafka.server.record"/>
<allow pkg="org.apache.kafka.server.share.persister"/>
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.storage.internals.log"/>
<allow pkg="org.apache.kafka.test" />
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -52,6 +52,7 @@
import org.apache.kafka.clients.admin.internals.DescribeClassicGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
import org.apache.kafka.clients.admin.internals.DescribeShareGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.DescribeShareGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler;
import org.apache.kafka.clients.admin.internals.FenceProducersHandler;
@@ -3796,12 +3797,14 @@ public DescribeShareGroupsResult describeShareGroups(final Collection<String> gr
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}

- // To do in a follow-up PR
@Override
public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, ListShareGroupOffsetsSpec> groupSpecs,
final ListShareGroupOffsetsOptions options) {
- // To-do
- throw new InvalidRequestException("The method is not yet implemented");
+ SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Long>> future =
+ DescribeShareGroupOffsetsHandler.newFuture(groupSpecs.keySet());
+ DescribeShareGroupOffsetsHandler handler = new DescribeShareGroupOffsetsHandler(groupSpecs, logContext);
+ invokeDriver(handler, future, options.timeoutMs);
+ return new ListShareGroupOffsetsResult(future.all());
}

@Override
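For orientation, here is a minimal sketch of how a caller might exercise the newly wired API. The bootstrap address, topic, and group id are illustrative, and the single-argument listShareGroupOffsets(Map) overload is assumed from its use in the tests below; partitionsToOffset is the result accessor those tests use.

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
import org.apache.kafka.common.TopicPartition;

public class ListShareGroupOffsetsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Illustrative bootstrap address.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Ask for the share group's start offsets for one partition.
            ListShareGroupOffsetsSpec spec = new ListShareGroupOffsetsSpec()
                .topicPartitions(List.of(new TopicPartition("my_topic", 0)));
            ListShareGroupOffsetsResult result =
                admin.listShareGroupOffsets(Map.of("my-share-group", spec));
            Map<TopicPartition, Long> offsets = result.partitionsToOffset("my-share-group").get();
            offsets.forEach((tp, offset) -> System.out.println(tp + " -> " + offset));
        }
    }
}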
clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
@@ -37,7 +37,7 @@ public class ListShareGroupOffsetsResult

private final Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures;

- ListShareGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, Long>>> futures) {
+ public ListShareGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, Long>>> futures) {
this.futures = futures.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().idValue, Map.Entry::getValue));
}
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupOffsetsHandler.java (new file)
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin.internals;

import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.DescribeShareGroupOffsetsResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.utils.LogContext;

import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;


public class DescribeShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Long>> {

private final Map<String, ListShareGroupOffsetsSpec> groupSpecs;
private final Logger log;
private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;

public DescribeShareGroupOffsetsHandler(
Map<String, ListShareGroupOffsetsSpec> groupSpecs,
LogContext logContext) {
this.groupSpecs = groupSpecs;
this.log = logContext.logger(DescribeShareGroupOffsetsHandler.class);
this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
}

public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Long>> newFuture(Collection<String> groupIds) {
return AdminApiFuture.forKeys(coordinatorKeys(groupIds));
}

@Override
public String apiName() {
return "describeShareGroupOffsets";
}

@Override
public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
return lookupStrategy;
}

@Override
public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int coordinatorId, Set<CoordinatorKey> keys) {
List<String> groupIds = keys.stream().map(key -> {
if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
throw new IllegalArgumentException("Invalid group coordinator key " + key +
" when building `DescribeShareGroupOffsets` request");
}
return key.idValue;
}).collect(Collectors.toList());
// The DescribeShareGroupOffsets RPC carries a single group id, so only the first
// key in this batch is used; the spec for that group drives the request.
String groupId = groupIds.isEmpty() ? null : groupIds.get(0);
if (groupId == null) {
throw new IllegalArgumentException("GroupId is null when building `DescribeShareGroupOffsets` request");
}
ListShareGroupOffsetsSpec spec = groupSpecs.get(groupId);
List<DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic> topics =
spec.topicPartitions().stream().map(
topicPartition -> new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
.setTopicName(topicPartition.topic())
.setPartitions(List.of(topicPartition.partition()))
).collect(Collectors.toList());
DescribeShareGroupOffsetsRequestData data = new DescribeShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(topics);
return new DescribeShareGroupOffsetsRequest.Builder(data, true);
}

@Override
public ApiResult<CoordinatorKey, Map<TopicPartition, Long>> handleResponse(
Node coordinator,
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse) {
final DescribeShareGroupOffsetsResponse response = (DescribeShareGroupOffsetsResponse) abstractResponse;
final Map<CoordinatorKey, Map<TopicPartition, Long>> completed = new HashMap<>();
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();

for (CoordinatorKey groupId : groupIds) {
Map<TopicPartition, Long> data = new HashMap<>();
// Flatten the per-topic response into a single partition-to-offset map for this group.
response.data().responses().forEach(describedTopic ->
describedTopic.partitions().forEach(partition ->
data.put(new TopicPartition(describedTopic.topicName(), partition.partitionIndex()), partition.startOffset())));
completed.put(groupId, data);
}
return new ApiResult<>(completed, failed, new ArrayList<>());
}

private static Set<CoordinatorKey> coordinatorKeys(Collection<String> groupIds) {
return groupIds.stream()
.map(CoordinatorKey::byGroupId)
.collect(Collectors.toSet());
}
}
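One observation on handleResponse above: it completes every group and never populates the failed map, so any error code the broker returns is silently treated as success. A hedged sketch of per-partition error mapping a follow-up could add inside the partition loop, assuming DescribeShareGroupOffsetsResponsePartition exposes errorCode() (not used by this PR) and org.apache.kafka.common.protocol.Errors is imported:

// Hypothetical follow-up, not part of this PR: route broker-side errors into `failed`.
// Assumes `partition.errorCode()` exists on the response partition type.
if (partition.errorCode() != Errors.NONE.code()) {
    failed.put(groupId, Errors.forCode(partition.errorCode()).exception());
} else {
    data.put(new TopicPartition(describedTopic.topicName(), partition.partitionIndex()),
        partition.startOffset());
}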
clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryRequest.java
@@ -87,4 +87,25 @@ public static ReadShareGroupStateSummaryRequest parse(ByteBuffer buffer, short v
version
);
}

public static List<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> getErrorReadShareGroupStateSummary(
List<ReadShareGroupStateSummaryRequestData.ReadStateSummaryData> topics,
Errors error
) {
return topics.stream()
.map(
requestTopic -> new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(requestTopic.topicId())
.setPartitions(
requestTopic.partitions().stream().map(
partition -> new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition.partition())
.setErrorCode(error.code())
.setErrorMessage(error.message())
.setStartOffset(0)
.setStateEpoch(0)
).collect(Collectors.toList())
)
).collect(Collectors.toList());
}
}
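The new helper echoes every requested topic-partition back with the same error code, mirroring the error-response builders on the other share-state RPCs. A hedged sketch of a call site, where requestData is a hypothetical ReadShareGroupStateSummaryRequestData and the setResults accessor is assumed from the response schema:

// Hypothetical call site: fail the whole read with a single error.
ReadShareGroupStateSummaryResponseData errorData = new ReadShareGroupStateSummaryResponseData()
    .setResults(ReadShareGroupStateSummaryRequest.getErrorReadShareGroupStateSummary(
        requestData.topics(), Errors.COORDINATOR_NOT_AVAILABLE));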
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -113,6 +113,8 @@
import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic;
import org.apache.kafka.common.message.DescribeProducersResponseData;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
@@ -194,6 +196,8 @@
import org.apache.kafka.common.requests.DescribeProducersResponse;
import org.apache.kafka.common.requests.DescribeQuorumRequest;
import org.apache.kafka.common.requests.DescribeQuorumResponse;
import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.DescribeShareGroupOffsetsResponse;
import org.apache.kafka.common.requests.DescribeTopicPartitionsResponse;
import org.apache.kafka.common.requests.DescribeTransactionsRequest;
import org.apache.kafka.common.requests.DescribeTransactionsResponse;
@@ -8969,4 +8973,85 @@ public void testRemoveRaftVoterRequest(boolean fail, boolean sendClusterId) thro
assertEquals(Uuid.fromString("YAfa4HClT3SIIW2klIUspg"), requestData.get().voterDirectoryId());
}
}

@Test
public void testListShareGroupOffsetsOptionsWithBatchedApi() throws Exception {
final Cluster cluster = mockCluster(3, 0);
final Time time = new MockTime();

try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
AdminClientConfig.RETRIES_CONFIG, "0")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

final List<TopicPartition> partitions = Collections.singletonList(new TopicPartition("A", 0));
final ListShareGroupOffsetsOptions options = new ListShareGroupOffsetsOptions();

final ListShareGroupOffsetsSpec groupSpec = new ListShareGroupOffsetsSpec()
.topicPartitions(partitions);
Map<String, ListShareGroupOffsetsSpec> groupSpecs = new HashMap<>();
groupSpecs.put(GROUP_ID, groupSpec);

env.adminClient().listShareGroupOffsets(groupSpecs, options);

final MockClient mockClient = env.kafkaClient();
waitForRequest(mockClient, ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);

ClientRequest clientRequest = mockClient.requests().peek();
assertNotNull(clientRequest);
DescribeShareGroupOffsetsRequestData data = ((DescribeShareGroupOffsetsRequest.Builder) clientRequest.requestBuilder()).build().data();
assertEquals(GROUP_ID, data.groupId());
assertEquals(Collections.singletonList("A"),
data.topics().stream().map(DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic::topicName).collect(Collectors.toList()));
}
}

@Test
public void testListShareGroupOffsets() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
TopicPartition myTopicPartition3 = new TopicPartition("my_topic", 3);
TopicPartition myTopicPartition4 = new TopicPartition("my_topic_1", 4);
TopicPartition myTopicPartition5 = new TopicPartition("my_topic_2", 6);


ListShareGroupOffsetsSpec groupSpec = new ListShareGroupOffsetsSpec().topicPartitions(
List.of(myTopicPartition0, myTopicPartition1, myTopicPartition2, myTopicPartition3, myTopicPartition4, myTopicPartition5)
);
Map<String, ListShareGroupOffsetsSpec> groupSpecs = new HashMap<>();
groupSpecs.put(GROUP_ID, groupSpec);

DescribeShareGroupOffsetsResponseData data = new DescribeShareGroupOffsetsResponseData().setResponses(
List.of(
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100))),
new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500)))
)
);
env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data));

final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs);
final Map<TopicPartition, Long> partitionToOffsetAndMetadata = result.partitionsToOffset(GROUP_ID).get();

assertEquals(6, partitionToOffsetAndMetadata.size());
assertEquals(10, partitionToOffsetAndMetadata.get(myTopicPartition0));
assertEquals(11, partitionToOffsetAndMetadata.get(myTopicPartition1));
assertEquals(40, partitionToOffsetAndMetadata.get(myTopicPartition2));
assertEquals(50, partitionToOffsetAndMetadata.get(myTopicPartition3));
assertEquals(100, partitionToOffsetAndMetadata.get(myTopicPartition4));
assertEquals(500, partitionToOffsetAndMetadata.get(myTopicPartition5));
}
}
}
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -18,7 +18,7 @@ package kafka.coordinator.group

import kafka.server.{KafkaConfig, ReplicaManager}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
+import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ReadShareGroupStateSummaryRequestData, ReadShareGroupStateSummaryResponseData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch
@@ -86,6 +86,15 @@ private[group] class GroupCoordinatorAdapter(
))
}

override def describeShareGroupOffsets(
context: RequestContext,
request: ReadShareGroupStateSummaryRequestData
): CompletableFuture[ReadShareGroupStateSummaryResponseData] = {
FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
s"The old group coordinator does not support ${ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS.name} API."
))
}

override def joinGroup(
context: RequestContext,
request: JoinGroupRequestData,
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -638,6 +638,7 @@
.withCoordinatorRuntimeMetrics(new GroupCoordinatorRuntimeMetrics(metrics))
.withGroupCoordinatorMetrics(new GroupCoordinatorMetrics(KafkaYammerMetrics.defaultRegistry, metrics))
.withGroupConfigManager(groupConfigManager)
.withPersister(persister)
.build()
} else {
GroupCoordinatorAdapter(