Skip to content

Commit

Permalink
snssqs: fix consumer starvation
Browse files Browse the repository at this point in the history
The current implementation uses separate goroutines to process each
message retrieved by `sqsClient.ReceiveMessageWithContext`, which is
executed when `concurrencyMode` is set to `parallel`.
The `consumeSubscription` function waits for all spawned goroutines to
complete before receiving the next batch. However, this design has a
critical flaw: it starves the consumer if new messages are published
during the processing of an existing batch. These newly published
messages won't be processed until all goroutines from the original batch
have finished their work, effectively blocking the consumption of the
new messages.

This PR changes that by removing the waiting mechanism.

Signed-off-by: Gustavo Chain <[email protected]>
  • Loading branch information
qustavo committed Jul 5, 2024
1 parent cfee998 commit 6e6601d
Showing 1 changed file with 0 additions and 7 deletions.
7 changes: 0 additions & 7 deletions pubsub/aws/snssqs/snssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -623,33 +622,27 @@ func (s *snsSqs) consumeSubscription(ctx context.Context, queueInfo, deadLetters
}
s.logger.Debugf("%v message(s) received on queue %s", len(messageResponse.Messages), queueInfo.arn)

var wg sync.WaitGroup
for _, message := range messageResponse.Messages {
if err := s.validateMessage(ctx, message, queueInfo, deadLettersQueueInfo); err != nil {
s.logger.Errorf("message is not valid for further processing by the handler. error is: %v", err)
continue
}

f := func(message *sqs.Message) {
defer wg.Done()
if err := s.callHandler(ctx, message, queueInfo); err != nil {
s.logger.Errorf("error while handling received message. error is: %v", err)
}
}

wg.Add(1)
switch s.metadata.ConcurrencyMode {
case pubsub.Single:
f(message)
case pubsub.Parallel:
wg.Add(1)
go func(message *sqs.Message) {
defer wg.Done()
f(message)
}(message)
}
}
wg.Wait()
}
}

Expand Down

0 comments on commit 6e6601d

Please sign in to comment.