snssqs: fix consumer starvation #3478
base: main
Conversation
Force-pushed from 6e6601d to 3248974
Thanks for this PR. I can see why the change is useful and agree the current implementation has some issues. However, with all waiters removed, my concern now is that Dapr could be "overloaded" if too many messages are received at once, as it would create new goroutines without any limit. Perhaps a better solution would be to limit the number of concurrent goroutines that are created? Some other components (like Azure Service Bus) have similar configuration options where you can set a maximum number of active messages. That way there's also some "natural backpressure" built in.
Backpressure is definitely valuable, and from what I can see, implementations like Redis already control the number of goroutines: https://github.com/dapr/components-contrib/blob/main/pubsub/redis/redis.go#L89. However, this sounds like a global concern that could be implemented at the pubsub level rather than as a driver feature. I think a global concurrency control would let operators switch between pubsub implementations while keeping the same backpressure settings.
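For a rough sense of the kind of bounded concurrency being discussed here, the sketch below uses golang.org/x/sync/errgroup's SetLimit to cap in-flight handlers; the message loop, handler, and limit value are invented for illustration and are not Dapr's or this component's actual code.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// handle simulates processing a single message.
func handle(ctx context.Context, msg string) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(100 * time.Millisecond): // simulate work
	}
	fmt.Println("processed", msg)
	return nil
}

func main() {
	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(4) // at most 4 handlers in flight; g.Go blocks once the limit is hit

	for i := 0; i < 20; i++ {
		msg := fmt.Sprintf("message-%d", i)
		g.Go(func() error {
			return handle(ctx, msg) // new work naturally waits for a free slot
		})
	}
	_ = g.Wait()
}
```

Because g.Go itself blocks when the limit is reached, the producer loop slows down automatically, which is the "natural backpressure" the comment above refers to.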
That does make sense, but it would require some major changes to a lot of components. Additionally, integrating that would not be straightforward, since Dapr currently delegates concurrency control for this scenario to individual components. If for now you could implement a concurrency limit option in this component, that would address the concern.
You are right. Is this something you think is worth working on?
Sure, I'm happy to add the feature. Initially I thought about a non-zero default for the limit.
I just looked at the Azure Service Bus component and it looks like the default there is actually 0, i.e. unlimited. I'm not a fan of unlimited, but for consistency it could make sense. However, I'll accept other suggestions too!
My gut initially said "no", but then I saw that other components do allow un-capping the limit (like Service Bus), so perhaps it does make sense (although I would personally not recommend controlling the limit on the caller, i.e. the publisher, side).
Ok, let's use that one then.
Right. In summary, I'd document this as follows:
Force-pushed from e846809 to 74537ab
pubsub/aws/snssqs/snssqs.go (Outdated)

```go
// This is the back pressure mechanism.
// It will block until another goroutine frees a slot.
if sem != nil {
	sem <- struct{}{}
```
I wonder if this line (that blocks) should be outside of the lambda. Currently, the goroutine is still spawned for each message, but then the goroutine is blocked. Perhaps it'd be wiser to not create the goroutine at all, and block the loop, since too many goroutines (even blocked) can "clog" the entire Dapr application (and cause a lot of pressure on the GC)?
Great point, I didn't see that. I've just fixed it.
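For context, here is a minimal sketch of the adjusted pattern being discussed: the slot is acquired in the receive loop itself, before a goroutine is spawned, so blocked goroutines never pile up. The names, the limit value, and the message slice are illustrative assumptions, not the exact code in this PR.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	messages := []string{"a", "b", "c", "d", "e", "f"}

	var sem chan struct{}
	if limit := 2; limit > 0 { // a limit of 0 would mean "unlimited" (sem stays nil)
		sem = make(chan struct{}, limit)
	}

	done := make(chan struct{}, len(messages))
	for _, msg := range messages {
		msg := msg
		if sem != nil {
			sem <- struct{}{} // blocks the receive loop itself, not a parked goroutine
		}
		go func() {
			defer func() {
				if sem != nil {
					<-sem // release the slot once processing ends
				}
				done <- struct{}{}
			}()
			time.Sleep(50 * time.Millisecond) // simulate handling
			fmt.Println("processed", msg)
		}()
	}
	for range messages {
		<-done
	}
}
```

Moving the acquire outside the goroutine keeps at most `limit` goroutines alive at once, instead of one (possibly blocked) goroutine per received message.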
Force-pushed from 8f559d0 to 9acedbf
@qustavo can you open an issue in this repo, explain the problem there, and then link to it in this PR? We don't usually accept PRs from non-maintainers/approvers without an issue being opened first (and I don't mean issues that have been opened at the same time as the PR). Please note that the 1.14 release is already underway and we have already cut the components-contrib release branch for 1.14, so this will most likely not make it into the 1.14 release. We can assess the need for this fix or improvement to make it into the 1.14 release within the issue (to be created).
Sure, will create an issue.
I'm ok with merging now, but in the absence of an issue this definitely cannot be considered for inclusion in 1.14.
/ok-to-test
Components certification test
Commit ref: 9acedbf
❌ Some certification tests failed. These tests failed:
Components conformance test
Commit ref: 9acedbf
❌ Some conformance tests failed. These tests failed:
Complete Build Matrix
The build status is currently not updated here. Please visit the action run below directly.
Commit ref: 9acedbf
Unfortunately the SNS integration tests have been broken. I think this needs to be fixed by @sicoyle @artursouza or someone else with more AWS integration test experience (and the credentials to manage the test infrastructure). While I'm sure nothing is inherently wrong with this PR, we just cannot be certain due to the broken tests (which were not broken by this PR).
@berndverst AFAICS the issue is with
/ok-to-test
Complete Build Matrix
The build status is currently not updated here. Please visit the action run below directly.
Commit ref: 882ac66
Components conformance test
Commit ref: 882ac66
❌ Some conformance tests failed. These tests failed:
Components certification test
Commit ref: 882ac66
❌ Some certification tests failed. These tests failed:
Any progress on this? It is currently blocking our SNS/SQS deployment; please let me know if there's anything I can do to get it merged.
Taking a look today
Any insight?
@qustavo the build is failing, please see the errors.
The current implementation uses separate goroutines to process each message retrieved by `sqsClient.ReceiveMessageWithContext`, which is executed when `concurrencyMode` is set to `parallel`. The `consumeSubscription` function waits for all spawned goroutines to complete before receiving the next batch. However, this design has a critical flaw: it starves the consumer if new messages are published during the processing of an existing batch. These newly published messages won't be processed until all goroutines from the original batch have finished their work, effectively blocking the consumption of the new messages. This PR changes that by removing the waiting mechanism. Signed-off-by: Gustavo Chain <[email protected]>
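As a rough illustration of the starvation described in this commit message (names, batch sizes, and timings are invented; this is not the component's actual code), a consume loop that waits for the whole batch before receiving again looks like this:

```go
package main

import (
	"fmt"
	"strings"
	"sync"
	"time"
)

// receiveBatch stands in for a call like sqsClient.ReceiveMessageWithContext.
func receiveBatch(round, n int) []string {
	batch := make([]string, n)
	for i := range batch {
		batch[i] = fmt.Sprintf("round-%d/msg-%d", round, i)
	}
	return batch
}

func main() {
	for round := 0; round < 2; round++ {
		batch := receiveBatch(round, 3)

		var wg sync.WaitGroup
		for _, msg := range batch {
			wg.Add(1)
			go func(msg string) {
				defer wg.Done()
				if strings.HasSuffix(msg, "/msg-0") {
					time.Sleep(300 * time.Millisecond) // one slow handler...
				}
				fmt.Println("processed", msg)
			}(msg)
		}
		// ...and this wait delays the next receiveBatch call, so messages
		// published in the meantime sit unprocessed: the starvation this PR removes.
		wg.Wait()
	}
}
```

Dropping the per-batch wait lets the loop keep receiving while earlier messages are still being handled, which is what motivates the concurrency limit discussed above.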
The `ConcurrencyLimit` param controls the number of concurrent processes that handle messages. Signed-off-by: Gustavo Chain <[email protected]>
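As a hypothetical sketch of how such a parameter could be wired up (the struct and function names here are assumptions, not necessarily what the PR ships), a value of 0 keeps the previous unbounded behaviour by leaving the semaphore nil:

```go
package main

import "fmt"

// snsSqsMetadata is a stand-in for the component's parsed metadata.
type snsSqsMetadata struct {
	ConcurrencyLimit int // 0 means unlimited, matching the Service Bus default discussed above
}

// buildSemaphore returns nil when no limit is set, so any `if sem != nil`
// checks skip backpressure entirely.
func buildSemaphore(md snsSqsMetadata) chan struct{} {
	if md.ConcurrencyLimit <= 0 {
		return nil
	}
	return make(chan struct{}, md.ConcurrencyLimit)
}

func main() {
	fmt.Println(cap(buildSemaphore(snsSqsMetadata{ConcurrencyLimit: 10}))) // 10
	fmt.Println(buildSemaphore(snsSqsMetadata{}) == nil)                   // true
}
```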
Signed-off-by: Alessandro (Ale) Segala <[email protected]>
Force-pushed from 98b880f to 0c8b4d5
Sorry, I assumed it was a certification error, but it seems like it was a merge issue. Will fix.
/ok-to-test
@ItalyPaleAle @yaron2 I'm trying to reproduce the CI error by running:

DAPR_TEST_MQTT_URL=tcp://localhost:1883 go test ./... -v

But locally all my tests pass, and I'm not really sure how to reproduce the failure. Any suggestions?
/ok-to-test
Components conformance test
Commit ref: 7827b51
❌ Some conformance tests failed. These tests failed:
Complete Build Matrix
The build status is currently not updated here. Please visit the action run below directly.
Commit ref: 7827b51
Components certification test
Commit ref: 7827b51
❌ Some certification tests failed. These tests failed:
@qustavo please open a docs PR to document the concurrency limit property for the component. Once you link the docs PR here, we can merge this.
Docs have been updated here: https://github.com/dapr/components-contrib/pull/3478/files#diff-337af9a04d4da7e737aeb7f517ae43a22a3125b4b65cebe94c6d7f39f3c3b06c. Where else would you document the properties? Also, the tests are still failing, but I don't see how this change is relevant to those failures. Any hint I can use to diagnose the problem?
Closes #3479