Skip to content

Commit

Permalink
snssqs: add concurrencyLimit param
Browse files Browse the repository at this point in the history
The `ConcurrencyLimit` param controls the number of concurrent process
that handles messages.
  • Loading branch information
qustavo committed Jul 6, 2024
1 parent 3248974 commit e846809
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 1 deletion.
6 changes: 6 additions & 0 deletions pubsub/aws/snssqs/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type snsSqsMetadata struct {
AccountID string `mapstructure:"accountID"`
// processing concurrency mode
ConcurrencyMode pubsub.ConcurrencyMode `mapstructure:"concurrencyMode"`
// limits the number of concurrent goroutines
ConcurrencyLimit int `mapstructure:"concurrencyLimit"`
}

func maskLeft(s string) string {
Expand Down Expand Up @@ -130,6 +132,10 @@ func (s *snsSqs) getSnsSqsMetatdata(meta pubsub.Metadata) (*snsSqsMetadata, erro
return nil, err
}

if md.ConcurrencyLimit < 0 {
return nil, errors.New("concurrencyLimit must be greater than or equal to 0")
}

s.logger.Debug(md.hideDebugPrintedCredentials())

return md, nil
Expand Down
11 changes: 10 additions & 1 deletion pubsub/aws/snssqs/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,18 @@ metadata:
default: '"parallel"'
example: '"single", "parallel"'
type: string
- name: concurrencyLimit
required: false
description: |
Defines the maximum number of concurrent workers handling messages.
This value is ignored when concurrentMode is set to “single“.
To avoid limiting the number of concurrent workers set this to “0“.
type: number
default: '0'
example: '100'
- name: accountId
required: false
description: |
The AWS account ID. Resolved automatically if not provided.
example: '""'
type: string
type: string
14 changes: 14 additions & 0 deletions pubsub/aws/snssqs/snssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,13 @@ func (s *snsSqs) consumeSubscription(ctx context.Context, queueInfo, deadLetters
WaitTimeSeconds: aws.Int64(s.metadata.MessageWaitTimeSeconds),
}

// sem is a semaphore used to control the concurrencyLimit.
// It is set only when we are in parallel mode and limit is > 0.
var sem chan (struct{}) = nil
if (s.metadata.ConcurrencyMode == pubsub.Parallel) && s.metadata.ConcurrencyLimit > 0 {
sem = make(chan struct{}, s.metadata.ConcurrencyLimit)
}

for {
// If the context is canceled, stop requesting messages
if ctx.Err() != nil {
Expand Down Expand Up @@ -641,6 +648,13 @@ func (s *snsSqs) consumeSubscription(ctx context.Context, queueInfo, deadLetters
f(message)
case pubsub.Parallel:
go func(message *sqs.Message) {
// This is the back pressure mechanism.
// It will block until another goroutine frees a slot.
if sem != nil {
sem <- struct{}{}
defer func() { <-sem }()
}

f(message)
}(message)
}
Expand Down
17 changes: 17 additions & 0 deletions pubsub/aws/snssqs/snssqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func Test_getSnsSqsMetatdata_AllConfiguration(t *testing.T) {
"consumerID": "consumer",
"Endpoint": "endpoint",
"concurrencyMode": string(pubsub.Single),
"concurrencyLimit": "42",
"accessKey": "a",
"secretKey": "s",
"sessionToken": "t",
Expand All @@ -68,6 +69,7 @@ func Test_getSnsSqsMetatdata_AllConfiguration(t *testing.T) {
r.Equal("consumer", md.SqsQueueName)
r.Equal("endpoint", md.Endpoint)
r.Equal(pubsub.Single, md.ConcurrencyMode)
r.Equal(42, md.ConcurrencyLimit)
r.Equal("a", md.AccessKey)
r.Equal("s", md.SecretKey)
r.Equal("t", md.SessionToken)
Expand Down Expand Up @@ -105,6 +107,7 @@ func Test_getSnsSqsMetatdata_defaults(t *testing.T) {
r.Equal("", md.SessionToken)
r.Equal("r", md.Region)
r.Equal(pubsub.Parallel, md.ConcurrencyMode)
r.Equal(0, md.ConcurrencyLimit)
r.Equal(int64(10), md.MessageVisibilityTimeout)
r.Equal(int64(10), md.MessageRetryLimit)
r.Equal(int64(2), md.MessageWaitTimeSeconds)
Expand Down Expand Up @@ -273,6 +276,20 @@ func Test_getSnsSqsMetatdata_invalidMetadataSetup(t *testing.T) {
}}},
name: "invalid message concurrencyMode",
},
// invalid concurrencyLimit
{
metadata: pubsub.Metadata{Base: metadata.Base{Properties: map[string]string{
"consumerID": "consumer",
"Endpoint": "endpoint",
"AccessKey": "acctId",
"SecretKey": "secret",
"awsToken": "token",
"Region": "region",
"messageRetryLimit": "10",
"concurrencyLimit": "-1",
}}},
name: "invalid message concurrencyLimit",
},
}

l := logger.NewLogger("SnsSqs unit test")
Expand Down

0 comments on commit e846809

Please sign in to comment.