diff --git a/CHANGELOG.md b/CHANGELOG.md index 0051fcad..b7da34d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,11 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -### Changed -- Updated several dependencies and replaced abandoned avro schema registry lib with a new library - ### Added - [#207](https://github.com/deviceinsight/kafkactl/pull/207) Allow configuring TLS for avro schema registry +- [#193](https://github.com/deviceinsight/kafkactl/pull/193) Print group instance IDs in `describe consumer-group` command ## 5.0.6 - 2024-03-14 diff --git a/internal/consumergroups/consumer-group-operation.go b/internal/consumergroups/consumer-group-operation.go index 61f99fb0..e54b8cd3 100644 --- a/internal/consumergroups/consumer-group-operation.go +++ b/internal/consumergroups/consumer-group-operation.go @@ -43,6 +43,7 @@ type partitionOffset struct { type consumerGroupMember struct { ClientHost string `json:"clientHost" yaml:"clientHost"` ClientID string `json:"clientId" yaml:"clientId"` + GroupInstanceID string `json:"groupInstanceId,omitempty" yaml:"groupInstanceId,omitempty"` AssignedPartitions []topicPartition `json:"assignedPartitions" yaml:"assignedPartitions"` } @@ -73,11 +74,12 @@ type ConsumerGroupOperation struct { func (operation *ConsumerGroupOperation) DescribeConsumerGroup(flags DescribeConsumerGroupFlags, group string) error { var ( - err error - ctx internal.ClientContext - client sarama.Client - admin sarama.ClusterAdmin - descriptions []*sarama.GroupDescription + err error + ctx internal.ClientContext + client sarama.Client + admin sarama.ClusterAdmin + descriptions []*sarama.GroupDescription + supportsGroupInstanceID bool ) if ctx, err = internal.CreateClientContext(); err != nil { @@ -88,6 +90,8 @@ func (operation *ConsumerGroupOperation) DescribeConsumerGroup(flags DescribeCon return errors.Wrap(err, "failed to create client") } + supportsGroupInstanceID = client.Config().Version.IsAtLeast(sarama.V2_4_0_0) + if admin, err = internal.CreateClusterAdmin(&ctx); err != nil { return errors.Wrap(err, "failed to create cluster admin") } @@ -138,7 +142,7 @@ func (operation *ConsumerGroupOperation) DescribeConsumerGroup(flags DescribeCon assignedPartitions := filterAssignedPartitions(memberAssignment.Topics, topicPartitions) - consumerGroupDescription.Members = addMember(consumerGroupDescription.Members, member.ClientHost, member.ClientId, assignedPartitions) + consumerGroupDescription.Members = addMember(consumerGroupDescription.Members, member.ClientHost, member.ClientId, member.GroupInstanceId, assignedPartitions) } sort.Slice(consumerGroupDescription.Members, func(i, j int) bool { @@ -173,14 +177,30 @@ func (operation *ConsumerGroupOperation) DescribeConsumerGroup(flags DescribeCon consumerGroupDescription.Members = nil } else if flags.OutputFormat == "wide" || flags.OutputFormat == "" { tableWriter := output.CreateTableWriter() - if err := tableWriter.WriteHeader("CLIENT_HOST", "CLIENT_ID", "TOPIC", "ASSIGNED_PARTITIONS"); err != nil { + + columns := make([]string, 0, 5) + columns = append(columns, "CLIENT_HOST", "CLIENT_ID") + if supportsGroupInstanceID { + columns = append(columns, "GROUP_INSTANCE_ID") + } + columns = append(columns, "TOPIC", "ASSIGNED_PARTITIONS") + + if err := tableWriter.WriteHeader(columns...); err != nil { return err } for _, m := range consumerGroupDescription.Members { for _, topic := range m.AssignedPartitions { partitions := strings.Trim(strings.Join(strings.Fields(fmt.Sprint(topic.Partitions)), ","), "[]") - if err := tableWriter.Write(m.ClientHost, m.ClientID, topic.Name, partitions); err != nil { + + columns = columns[:0] + columns = append(columns, m.ClientHost, m.ClientID) + if supportsGroupInstanceID { + columns = append(columns, m.GroupInstanceID) + } + columns = append(columns, topic.Name, partitions) + + if err := tableWriter.Write(columns...); err != nil { return err } } @@ -226,7 +246,7 @@ func filterAssignedPartitions(assignedPartitions map[string][]int32, topicPartit return result } -func addMember(members []consumerGroupMember, clientHost string, clientID string, assignedPartitions map[string][]int32) []consumerGroupMember { +func addMember(members []consumerGroupMember, clientHost string, clientID string, groupInstanceID *string, assignedPartitions map[string][]int32) []consumerGroupMember { topicPartitionList := make([]topicPartition, 0) @@ -247,7 +267,13 @@ func addMember(members []consumerGroupMember, clientHost string, clientID string if len(assignedPartitions) == 0 { return members } - member := consumerGroupMember{ClientHost: clientHost, ClientID: clientID, AssignedPartitions: topicPartitionList} + + groupInstanceIDString := "" + if groupInstanceID != nil { + groupInstanceIDString = *groupInstanceID + } + + member := consumerGroupMember{ClientHost: clientHost, ClientID: clientID, GroupInstanceID: groupInstanceIDString, AssignedPartitions: topicPartitionList} return append(members, member) }