From 75dcdef8c985a0a35040bd55115600f88efe0be0 Mon Sep 17 00:00:00 2001 From: Yaron Schneider Date: Thu, 5 Sep 2024 09:08:38 -0700 Subject: [PATCH] =?UTF-8?q?Resolving=20a=20weird=20edge=20case=20in=20case?= =?UTF-8?q?=20of=20a=20poison=20pill=20message=20being=20re=E2=80=A6=20(#3?= =?UTF-8?q?534)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Patrick Assuied Co-authored-by: Patrick Assuied --- common/component/kafka/consumer.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/common/component/kafka/consumer.go b/common/component/kafka/consumer.go index 0a1c2acfff..7cb923a455 100644 --- a/common/component/kafka/consumer.go +++ b/common/component/kafka/consumer.go @@ -68,6 +68,15 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai } else { for { select { + // Should return when `session.Context()` is done. + // If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when Kafka rebalances. See: + // https://github.com/IBM/sarama/issues/1192 + // The check for a done session context is meant to happen before the next message is processed (NOTE(review): Go's `select` picks pseudo-randomly among ready cases, so case order alone does not guarantee this; confirm whether an explicit `session.Context().Err()` check is needed after receiving a message). + // Otherwise, if the pod takes some time to shut down while a poison pill message is being retried, the `retry` gets interrupted (as expected), + // but the next message could then still be processed, + // which drops the poison pill message regardless of the resiliency policy. + case <-session.Context().Done(): + return nil case message, ok := <-claim.Messages(): if !ok { return nil @@ -89,11 +98,6 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai consumer.k.logger.Errorf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err) } } - // Should return when `session.Context()` is done.
- // If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see: - // https://github.com/IBM/sarama/issues/1192 - case <-session.Context().Done(): - return nil } } }