diff --git a/async_producer.go b/async_producer.go
index 5f257524b..b0be145ff 100644
--- a/async_producer.go
+++ b/async_producer.go
@@ -16,9 +16,16 @@ import (
 // ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and OOM prevention needs to be applied.
 var ErrProducerRetryBufferOverflow = errors.New("retry buffer full: message discarded to prevent buffer overflow")
 
-// minFunctionalRetryBufferLength is the lower limit of Producer.Retry.MaxBufferLength for it to function.
-// Any non-zero maxBufferLength but less than this lower limit is pushed to the lower limit.
-const minFunctionalRetryBufferLength = 4 * 1024
+const (
+	// minFunctionalRetryBufferLength defines the minimum number of messages the retry buffer must support.
+	// If Producer.Retry.MaxBufferLength is set to a non-zero value below this limit, it will be adjusted to this value.
+	// This ensures the retry buffer remains functional under typical workloads.
+	minFunctionalRetryBufferLength = 4 * 1024
+	// minFunctionalRetryBufferBytes defines the minimum total byte size the retry buffer must support.
+	// If Producer.Retry.MaxBufferBytes is set to a non-zero value below this limit, it will be adjusted to this value.
+	// A 32 MB lower limit ensures sufficient capacity for retrying larger messages without exhausting resources.
+	minFunctionalRetryBufferBytes = 32 * 1024 * 1024
+)
 
 // AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
 // to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
@@ -1214,11 +1221,22 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
 // effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
 // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
 func (p *asyncProducer) retryHandler() {
-	maxBufferSize := p.conf.Producer.Retry.MaxBufferLength
-	if 0 < maxBufferSize && maxBufferSize < minFunctionalRetryBufferLength {
-		maxBufferSize = minFunctionalRetryBufferLength
+	maxBufferLength := p.conf.Producer.Retry.MaxBufferLength
+	if 0 < maxBufferLength && maxBufferLength < minFunctionalRetryBufferLength {
+		maxBufferLength = minFunctionalRetryBufferLength
+	}
+
+	maxBufferBytes := p.conf.Producer.Retry.MaxBufferBytes
+	if 0 < maxBufferBytes && maxBufferBytes < minFunctionalRetryBufferBytes {
+		maxBufferBytes = minFunctionalRetryBufferBytes
 	}
 
+	version := 1
+	if p.conf.Version.IsAtLeast(V0_11_0_0) {
+		version = 2
+	}
+
+	var currentByteSize int64
 	var msg *ProducerMessage
 	buf := queue.New()
 
@@ -1229,7 +1247,8 @@
 			select {
 			case msg = <-p.retries:
 			case p.input <- buf.Peek().(*ProducerMessage):
-				buf.Remove()
+				msgToRemove := buf.Remove().(*ProducerMessage)
+				currentByteSize -= int64(msgToRemove.ByteSize(version))
 				continue
 			}
 		}
@@ -1239,15 +1258,18 @@
 		}
 
 		buf.Add(msg)
+		currentByteSize += int64(msg.ByteSize(version))
 
-		if maxBufferSize > 0 && buf.Length() >= maxBufferSize {
+		if (maxBufferLength > 0 && buf.Length() >= maxBufferLength) || (maxBufferBytes > 0 && currentByteSize >= maxBufferBytes) {
 			msgToHandle := buf.Peek().(*ProducerMessage)
 			if msgToHandle.flags == 0 {
 				select {
 				case p.input <- msgToHandle:
 					buf.Remove()
+					currentByteSize -= int64(msgToHandle.ByteSize(version))
 				default:
 					buf.Remove()
+					currentByteSize -= int64(msgToHandle.ByteSize(version))
 					p.returnError(msgToHandle, ErrProducerRetryBufferOverflow)
 				}
 			}
diff --git a/async_producer_test.go b/async_producer_test.go
index 8e395c755..2cef52477 100644
--- a/async_producer_test.go
+++ b/async_producer_test.go
@@ -1,5 +1,3 @@
-//go:build !functional
-
 package sarama
 
 import (
@@ -9,6 +7,7 @@
 	"os"
 	"os/signal"
 	"strconv"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -2176,7 +2175,7 @@ func TestTxnCanAbort(t *testing.T) {
 	require.NoError(t, err)
 }
 
-func TestPreventRetryBufferOverflow(t *testing.T) {
+func TestProducerRetryBufferLimits(t *testing.T) {
 	broker := NewMockBroker(t, 1)
 	defer broker.Close()
 	topic := "test-topic"
@@ -2199,57 +2198,86 @@
 		"MetadataRequest": metadataRequestHandlerFunc,
 	})
 
-	config := NewTestConfig()
-	config.Producer.Flush.MaxMessages = 1
-	config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength
-	config.Producer.Return.Successes = true
-
-	producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
-	if err != nil {
-		t.Fatal(err)
+	tests := []struct {
+		name            string
+		configureBuffer func(*Config)
+		messageSize     int
+		numMessages     int
+	}{
+		{
+			name: "MaxBufferLength",
+			configureBuffer: func(config *Config) {
+				config.Producer.Flush.MaxMessages = 1
+				config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength
+			},
+			messageSize: 1, // Small message size
+			numMessages: 10000,
+		},
+		{
+			name: "MaxBufferBytes",
+			configureBuffer: func(config *Config) {
+				config.Producer.Flush.MaxMessages = 1
+				config.Producer.Retry.MaxBufferBytes = minFunctionalRetryBufferBytes
+			},
+			messageSize: 950 * 1024, // 950 KB
+			numMessages: 1000,
+		},
 	}
 
-	var (
-		wg                        sync.WaitGroup
-		successes, producerErrors int
-		errorFound                bool
-	)
-
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		for range producer.Successes() {
-			successes++
-		}
-	}()
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			config := NewTestConfig()
+			config.Producer.Return.Successes = true
+			tt.configureBuffer(config)
 
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		for errMsg := range producer.Errors() {
-			if errors.Is(errMsg.Err, ErrProducerRetryBufferOverflow) {
-				errorFound = true
+			producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
+			if err != nil {
+				t.Fatal(err)
 			}
-			producerErrors++
-		}
-	}()
 
-	numMessages := 100000
-	for i := 0; i < numMessages; i++ {
-		kv := StringEncoder(strconv.Itoa(i))
-		producer.Input() <- &ProducerMessage{
-			Topic:    topic,
-			Key:      kv,
-			Value:    kv,
-			Metadata: i,
-		}
-	}
+			var (
+				wg                        sync.WaitGroup
+				successes, producerErrors int
+				errorFound                bool
+			)
+
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for range producer.Successes() {
+					successes++
+				}
+			}()
+
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for errMsg := range producer.Errors() {
+					if errors.Is(errMsg.Err, ErrProducerRetryBufferOverflow) {
+						errorFound = true
+					}
+					producerErrors++
+				}
+			}()
 
-	producer.AsyncClose()
-	wg.Wait()
+			longString := strings.Repeat("a", tt.messageSize)
+			val := StringEncoder(longString)
 
-	require.Equal(t, successes+producerErrors, numMessages, "Expected all messages to be processed")
-	require.True(t, errorFound, "Expected at least one error matching ErrProducerRetryBufferOverflow")
+			for i := 0; i < tt.numMessages; i++ {
+				msg := &ProducerMessage{
+					Topic: topic,
+					Value: val,
+				}
+				producer.Input() <- msg
+			}
+
+			producer.AsyncClose()
+			wg.Wait()
+
+			require.Equal(t, successes+producerErrors, tt.numMessages, "Expected all messages to be processed")
+			require.True(t, errorFound, "Expected at least one error matching ErrProducerRetryBufferOverflow")
+		})
+	}
 }
 
 // This example shows how to use the producer while simultaneously
diff --git a/config.go b/config.go
index 8c7c4c985..6a198dc89 100644
--- a/config.go
+++ b/config.go
@@ -276,6 +276,13 @@ type Config struct {
 			// Any value between 0 and 4096 is pushed to 4096.
 			// A zero or negative value indicates unlimited.
 			MaxBufferLength int
+			// The maximum total byte size of messages in the bridging buffer between `input`
+			// and `retries` channels in AsyncProducer#retryHandler.
+			// This limit prevents the buffer from consuming excessive memory.
+			// Defaults to 0 for unlimited.
+			// Any value between 0 and 32 MB is pushed to 32 MB.
+			// A zero or negative value indicates unlimited.
+			MaxBufferBytes int64
 		}
 
 		// Interceptors to be called when the producer dispatcher reads the
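
For reviewers, a minimal sketch of how an application could opt into the new byte-based cap alongside the existing length cap once this lands. The module path, broker address, topic, and the 10 000 / 64 MB values are illustrative placeholders, not defaults introduced by this change; only `Producer.Retry.MaxBufferLength`, `Producer.Retry.MaxBufferBytes`, and `ErrProducerRetryBufferOverflow` come from this diff.

```go
package main

import (
	"errors"
	"log"

	"github.com/IBM/sarama" // assumed module path
)

func main() {
	config := sarama.NewConfig()
	// Cap the retry bridge both by message count and by total byte size.
	// Non-zero values below the functional minimums (4096 messages, 32 MB)
	// are raised to those minimums by the producer.
	config.Producer.Retry.MaxBufferLength = 10_000
	config.Producer.Retry.MaxBufferBytes = 64 * 1024 * 1024 // 64 MB

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}

	producer.Input() <- &sarama.ProducerMessage{
		Topic: "events",
		Value: sarama.StringEncoder("hello"),
	}
	producer.AsyncClose()

	// With the default config only Errors() is enabled; a message dropped from
	// a full retry buffer surfaces here as ErrProducerRetryBufferOverflow.
	for msg := range producer.Errors() {
		if errors.Is(msg.Err, sarama.ErrProducerRetryBufferOverflow) {
			log.Printf("retry buffer overflow, message dropped: %v", msg.Err)
		}
	}
}
```

Note that when either limit is hit the handler drops the oldest un-flagged message rather than blocking, so callers that care about every record should watch Errors() for this sentinel.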