Skip to content

Commit

Permalink
Merge pull request #193 from theY4Kman/feat/print-cg-group-instance-id
Browse files Browse the repository at this point in the history
Show group instance IDs in describe consumer-group output
  • Loading branch information
d-rk authored Aug 7, 2024
2 parents ace14dc + 1200743 commit db53027
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 13 deletions.
4 changes: 1 addition & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed
- Updated several dependencies and replaced abandoned avro schema registry lib with a new library

### Added
- [#207](https://github.com/deviceinsight/kafkactl/pull/207) Allow configuring TLS for avro schema registry
- [#193](https://github.com/deviceinsight/kafkactl/pull/193) Print group instance IDs in `describe consumer-group` command

## 5.0.6 - 2024-03-14

Expand Down
46 changes: 36 additions & 10 deletions internal/consumergroups/consumer-group-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type partitionOffset struct {
type consumerGroupMember struct {
ClientHost string `json:"clientHost" yaml:"clientHost"`
ClientID string `json:"clientId" yaml:"clientId"`
GroupInstanceID string `json:"groupInstanceId,omitempty" yaml:"groupInstanceId,omitempty"`
AssignedPartitions []topicPartition `json:"assignedPartitions" yaml:"assignedPartitions"`
}

Expand Down Expand Up @@ -73,11 +74,12 @@ type ConsumerGroupOperation struct {
func (operation *ConsumerGroupOperation) DescribeConsumerGroup(flags DescribeConsumerGroupFlags, group string) error {

var (
err error
ctx internal.ClientContext
client sarama.Client
admin sarama.ClusterAdmin
descriptions []*sarama.GroupDescription
err error
ctx internal.ClientContext
client sarama.Client
admin sarama.ClusterAdmin
descriptions []*sarama.GroupDescription
supportsGroupInstanceID bool
)

if ctx, err = internal.CreateClientContext(); err != nil {
Expand All @@ -88,6 +90,8 @@ func (operation *ConsumerGroupOperation) DescribeConsumerGroup(flags DescribeCon
return errors.Wrap(err, "failed to create client")
}

supportsGroupInstanceID = client.Config().Version.IsAtLeast(sarama.V2_4_0_0)

if admin, err = internal.CreateClusterAdmin(&ctx); err != nil {
return errors.Wrap(err, "failed to create cluster admin")
}
Expand Down Expand Up @@ -138,7 +142,7 @@ func (operation *ConsumerGroupOperation) DescribeConsumerGroup(flags DescribeCon

assignedPartitions := filterAssignedPartitions(memberAssignment.Topics, topicPartitions)

consumerGroupDescription.Members = addMember(consumerGroupDescription.Members, member.ClientHost, member.ClientId, assignedPartitions)
consumerGroupDescription.Members = addMember(consumerGroupDescription.Members, member.ClientHost, member.ClientId, member.GroupInstanceId, assignedPartitions)
}

sort.Slice(consumerGroupDescription.Members, func(i, j int) bool {
Expand Down Expand Up @@ -173,14 +177,30 @@ func (operation *ConsumerGroupOperation) DescribeConsumerGroup(flags DescribeCon
consumerGroupDescription.Members = nil
} else if flags.OutputFormat == "wide" || flags.OutputFormat == "" {
tableWriter := output.CreateTableWriter()
if err := tableWriter.WriteHeader("CLIENT_HOST", "CLIENT_ID", "TOPIC", "ASSIGNED_PARTITIONS"); err != nil {

columns := make([]string, 0, 5)
columns = append(columns, "CLIENT_HOST", "CLIENT_ID")
if supportsGroupInstanceID {
columns = append(columns, "GROUP_INSTANCE_ID")
}
columns = append(columns, "TOPIC", "ASSIGNED_PARTITIONS")

if err := tableWriter.WriteHeader(columns...); err != nil {
return err
}

for _, m := range consumerGroupDescription.Members {
for _, topic := range m.AssignedPartitions {
partitions := strings.Trim(strings.Join(strings.Fields(fmt.Sprint(topic.Partitions)), ","), "[]")
if err := tableWriter.Write(m.ClientHost, m.ClientID, topic.Name, partitions); err != nil {

columns = columns[:0]
columns = append(columns, m.ClientHost, m.ClientID)
if supportsGroupInstanceID {
columns = append(columns, m.GroupInstanceID)
}
columns = append(columns, topic.Name, partitions)

if err := tableWriter.Write(columns...); err != nil {
return err
}
}
Expand Down Expand Up @@ -226,7 +246,7 @@ func filterAssignedPartitions(assignedPartitions map[string][]int32, topicPartit
return result
}

func addMember(members []consumerGroupMember, clientHost string, clientID string, assignedPartitions map[string][]int32) []consumerGroupMember {
func addMember(members []consumerGroupMember, clientHost string, clientID string, groupInstanceID *string, assignedPartitions map[string][]int32) []consumerGroupMember {

topicPartitionList := make([]topicPartition, 0)

Expand All @@ -247,7 +267,13 @@ func addMember(members []consumerGroupMember, clientHost string, clientID string
if len(assignedPartitions) == 0 {
return members
}
member := consumerGroupMember{ClientHost: clientHost, ClientID: clientID, AssignedPartitions: topicPartitionList}

groupInstanceIDString := ""
if groupInstanceID != nil {
groupInstanceIDString = *groupInstanceID
}

member := consumerGroupMember{ClientHost: clientHost, ClientID: clientID, GroupInstanceID: groupInstanceIDString, AssignedPartitions: topicPartitionList}
return append(members, member)
}

Expand Down

0 comments on commit db53027

Please sign in to comment.