Skip to content

Commit

Permalink
feat: hide group instance ID in describe cg if unsupported (<2.4.0)
Browse files Browse the repository at this point in the history
  • Loading branch information
theY4Kman committed Apr 11, 2024
1 parent 459a8a6 commit bec03bb
Showing 1 changed file with 26 additions and 7 deletions.
33 changes: 26 additions & 7 deletions internal/consumergroups/consumer-group-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type partitionOffset struct {
type consumerGroupMember struct {
ClientHost string `json:"clientHost" yaml:"clientHost"`
ClientID string `json:"clientId" yaml:"clientId"`
GroupInstanceID string `json:"groupInstanceId" yaml:"groupInstanceId"`
GroupInstanceID string `json:"groupInstanceId,omitempty" yaml:"groupInstanceId,omitempty"`
AssignedPartitions []topicPartition `json:"assignedPartitions" yaml:"assignedPartitions"`
}

Expand Down Expand Up @@ -74,11 +74,12 @@ type ConsumerGroupOperation struct {
func (operation *ConsumerGroupOperation) DescribeConsumerGroup(flags DescribeConsumerGroupFlags, group string) error {

var (
err error
ctx internal.ClientContext
client sarama.Client
admin sarama.ClusterAdmin
descriptions []*sarama.GroupDescription
err error
ctx internal.ClientContext
client sarama.Client
admin sarama.ClusterAdmin
descriptions []*sarama.GroupDescription
supportsGroupInstanceID bool
)

if ctx, err = internal.CreateClientContext(); err != nil {
Expand All @@ -89,6 +90,8 @@ func (operation *ConsumerGroupOperation) DescribeConsumerGroup(flags DescribeCon
return errors.Wrap(err, "failed to create client")
}

supportsGroupInstanceID = client.Config().Version.IsAtLeast(sarama.V2_4_0_0)

if admin, err = internal.CreateClusterAdmin(&ctx); err != nil {
return errors.Wrap(err, "failed to create cluster admin")
}
Expand Down Expand Up @@ -174,13 +177,29 @@ func (operation *ConsumerGroupOperation) DescribeConsumerGroup(flags DescribeCon
consumerGroupDescription.Members = nil
} else if flags.OutputFormat == "wide" || flags.OutputFormat == "" {
tableWriter := output.CreateTableWriter()
if err := tableWriter.WriteHeader("CLIENT_HOST", "CLIENT_ID", "GROUP_INSTANCE_ID", "TOPIC", "ASSIGNED_PARTITIONS"); err != nil {

columns := make([]string, 0, 5)
columns = append(columns, "CLIENT_HOST", "CLIENT_ID")
if supportsGroupInstanceID {
columns = append(columns, "GROUP_INSTANCE_ID")
}
columns = append(columns, "TOPIC", "ASSIGNED_PARTITIONS")

if err := tableWriter.WriteHeader(columns...); err != nil {
return err
}

for _, m := range consumerGroupDescription.Members {
for _, topic := range m.AssignedPartitions {
partitions := strings.Trim(strings.Join(strings.Fields(fmt.Sprint(topic.Partitions)), ","), "[]")

columns = columns[:0]
columns = append(columns, m.ClientHost, m.ClientID)
if supportsGroupInstanceID {
columns = append(columns, m.GroupInstanceID)
}
columns = append(columns, topic.Name, partitions)

if err := tableWriter.Write(m.ClientHost, m.ClientID, m.GroupInstanceID, topic.Name, partitions); err != nil {
return err
}
Expand Down

0 comments on commit bec03bb

Please sign in to comment.