Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show group instance IDs in describe consumer-group output #193

Merged
merged 3 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed
- Updated several dependencies and replaced abandoned avro schema registry lib with a new library

### Added
- [#207](https://github.com/deviceinsight/kafkactl/pull/207) Allow configuring TLS for avro schema registry
- [#193](https://github.com/deviceinsight/kafkactl/pull/193) Print group instance IDs in `describe consumer-group` command

## 5.0.6 - 2024-03-14

Expand Down
46 changes: 36 additions & 10 deletions internal/consumergroups/consumer-group-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type partitionOffset struct {
type consumerGroupMember struct {
ClientHost string `json:"clientHost" yaml:"clientHost"`
ClientID string `json:"clientId" yaml:"clientId"`
GroupInstanceID string `json:"groupInstanceId,omitempty" yaml:"groupInstanceId,omitempty"`
AssignedPartitions []topicPartition `json:"assignedPartitions" yaml:"assignedPartitions"`
}

Expand Down Expand Up @@ -73,11 +74,12 @@ type ConsumerGroupOperation struct {
func (operation *ConsumerGroupOperation) DescribeConsumerGroup(flags DescribeConsumerGroupFlags, group string) error {

var (
err error
ctx internal.ClientContext
client sarama.Client
admin sarama.ClusterAdmin
descriptions []*sarama.GroupDescription
err error
ctx internal.ClientContext
client sarama.Client
admin sarama.ClusterAdmin
descriptions []*sarama.GroupDescription
supportsGroupInstanceID bool
)

if ctx, err = internal.CreateClientContext(); err != nil {
Expand All @@ -88,6 +90,8 @@ func (operation *ConsumerGroupOperation) DescribeConsumerGroup(flags DescribeCon
return errors.Wrap(err, "failed to create client")
}

supportsGroupInstanceID = client.Config().Version.IsAtLeast(sarama.V2_4_0_0)

if admin, err = internal.CreateClusterAdmin(&ctx); err != nil {
return errors.Wrap(err, "failed to create cluster admin")
}
Expand Down Expand Up @@ -138,7 +142,7 @@ func (operation *ConsumerGroupOperation) DescribeConsumerGroup(flags DescribeCon

assignedPartitions := filterAssignedPartitions(memberAssignment.Topics, topicPartitions)

consumerGroupDescription.Members = addMember(consumerGroupDescription.Members, member.ClientHost, member.ClientId, assignedPartitions)
consumerGroupDescription.Members = addMember(consumerGroupDescription.Members, member.ClientHost, member.ClientId, member.GroupInstanceId, assignedPartitions)
}

sort.Slice(consumerGroupDescription.Members, func(i, j int) bool {
Expand Down Expand Up @@ -173,14 +177,30 @@ func (operation *ConsumerGroupOperation) DescribeConsumerGroup(flags DescribeCon
consumerGroupDescription.Members = nil
} else if flags.OutputFormat == "wide" || flags.OutputFormat == "" {
tableWriter := output.CreateTableWriter()
if err := tableWriter.WriteHeader("CLIENT_HOST", "CLIENT_ID", "TOPIC", "ASSIGNED_PARTITIONS"); err != nil {

columns := make([]string, 0, 5)
columns = append(columns, "CLIENT_HOST", "CLIENT_ID")
if supportsGroupInstanceID {
columns = append(columns, "GROUP_INSTANCE_ID")
d-rk marked this conversation as resolved.
Show resolved Hide resolved
}
columns = append(columns, "TOPIC", "ASSIGNED_PARTITIONS")

if err := tableWriter.WriteHeader(columns...); err != nil {
return err
}

for _, m := range consumerGroupDescription.Members {
for _, topic := range m.AssignedPartitions {
partitions := strings.Trim(strings.Join(strings.Fields(fmt.Sprint(topic.Partitions)), ","), "[]")
if err := tableWriter.Write(m.ClientHost, m.ClientID, topic.Name, partitions); err != nil {

columns = columns[:0]
columns = append(columns, m.ClientHost, m.ClientID)
if supportsGroupInstanceID {
columns = append(columns, m.GroupInstanceID)
}
columns = append(columns, topic.Name, partitions)

if err := tableWriter.Write(columns...); err != nil {
return err
}
}
Expand Down Expand Up @@ -226,7 +246,7 @@ func filterAssignedPartitions(assignedPartitions map[string][]int32, topicPartit
return result
}

func addMember(members []consumerGroupMember, clientHost string, clientID string, assignedPartitions map[string][]int32) []consumerGroupMember {
func addMember(members []consumerGroupMember, clientHost string, clientID string, groupInstanceID *string, assignedPartitions map[string][]int32) []consumerGroupMember {

topicPartitionList := make([]topicPartition, 0)

Expand All @@ -247,7 +267,13 @@ func addMember(members []consumerGroupMember, clientHost string, clientID string
if len(assignedPartitions) == 0 {
return members
}
member := consumerGroupMember{ClientHost: clientHost, ClientID: clientID, AssignedPartitions: topicPartitionList}

groupInstanceIDString := ""
if groupInstanceID != nil {
groupInstanceIDString = *groupInstanceID
}

member := consumerGroupMember{ClientHost: clientHost, ClientID: clientID, GroupInstanceID: groupInstanceIDString, AssignedPartitions: topicPartitionList}
return append(members, member)
}

Expand Down
Loading