From 80657f4569ed162417b09d64350181a922592f06 Mon Sep 17 00:00:00 2001 From: Adriel Velazquez Date: Sat, 30 Dec 2023 00:12:02 -0800 Subject: [PATCH] Adding a new delivery report struct for more detailed information from librdkafka --- kafka/message.go | 20 ++++++++++++++++++++ kafka/producer_test.go | 6 ++++++ 2 files changed, 26 insertions(+) diff --git a/kafka/message.go b/kafka/message.go index d473771a7..4c6a05f10 100644 --- a/kafka/message.go +++ b/kafka/message.go @@ -68,6 +68,11 @@ func (t TimestampType) String() string { } } +type DeliveryReport struct { + ProducerLatency int64 + BrokerID int32 +} + // Message represents a Kafka message type Message struct { TopicPartition TopicPartition @@ -76,6 +81,7 @@ type Message struct { Timestamp time.Time TimestampType TimestampType Opaque interface{} + DeliveryReport DeliveryReport Headers []Header LeaderEpoch *int32 // Deprecated: LeaderEpoch or nil if not available. Use m.TopicPartition.LeaderEpoch instead. } @@ -135,6 +141,20 @@ func (h *handle) newMessageFromGlueMsg(gMsg *C.glue_msg_t) (msg *Message) { return msg } +func generateDeliveryReport(msg *Message, cmsg *C.rd_kafka_message_t) { + msg.DeliveryReport = DeliveryReport{} + + // The latency for a produced message measured from the produce() call. + producerLatency := int64(C.rd_kafka_message_latency(cmsg)) + if producerLatency >= 0 { + msg.DeliveryReport.ProducerLatency = producerLatency + } + + // The latency for a produced message measured from the produce() call. + brokerId := int32(C.rd_kafka_message_broker_id(cmsg)) + msg.DeliveryReport.BrokerID = brokerId +} + // setupMessageFromC sets up a message object from a C rd_kafka_message_t func (h *handle) setupMessageFromC(msg *Message, cmsg *C.rd_kafka_message_t) { if cmsg.rkt != nil { diff --git a/kafka/producer_test.go b/kafka/producer_test.go index 634746202..b50853fb5 100644 --- a/kafka/producer_test.go +++ b/kafka/producer_test.go @@ -136,6 +136,12 @@ func TestProducerAPIs(t *testing.T) { switch e := ev.(type) { case *Message: msgCnt++ + if *&e.DeliveryReport.ProducerLatency <= 0 { + t.Errorf("Producer Latency should be included in delivery reports, instead got %d", *&e.DeliveryReport.ProducerLatency) + } + if *&e.DeliveryReport.BrokerID == -1 { + t.Errorf("Broker ID should be included in delivery reports, instead got %d", *&e.DeliveryReport.BrokerID) + } if (string)(e.Value) == "ProducerChannel" { s := e.Opaque.(*string) if s != &myOpq {