Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve consumer pending count tracking during stream contention #6297

Merged
merged 1 commit into from
Dec 23, 2024

Conversation

MauriceVanVeen
Copy link
Member

The drifting tests would occasionally fail due to the consumer pending count drifting. This was due to a race condition described on checkNumPending:

// Does some sanity checks to see if we should re-calculate.
// Since there is a race when decrementing when there is contention at the beginning of the stream.
// The race is a getNextMsg skips a deleted msg, and then the decStreamPending call fires.
// This does some quick sanity checks to see if we should re-calculate num pending.
// Lock should be held.
func (o *consumer) checkNumPending() uint64 {

This PR doesn't fix this race condition, but improves the tracking which improves test reliability. If the race condition happens we can still check if the deleted message is between o.asflr and the o.sseq that's skipped ahead. In which case we can still decrement the pending count (o.npc) if the message is not pending/delivered. This improves the reliability of the pending count tracking as long as the ack floor hasn't moved up yet.

Signed-off-by: Maurice van Veen [email protected]

@MauriceVanVeen MauriceVanVeen requested a review from a team as a code owner December 23, 2024 14:08
Copy link
Member

@derekcollison derekcollison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@derekcollison derekcollison merged commit c543f53 into main Dec 23, 2024
5 checks passed
@derekcollison derekcollison deleted the maurice/improve-pending-count branch December 23, 2024 17:32
// Either we have not reached the message yet, or we've hit the race condition
// when there is contention at the beginning of the stream. In which case we can
// only decrement if the ack floor is still low enough to be able to detect it.
if o.isFilteredMatch(subj) && sseq > o.asflr && (sseq >= o.sseq || !wasPending) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: given that o.isFilteredMatch(subj) can be more expensive than the other checks, I would have put it last:

if sseq > o.asflr && (sseq >= o.sseq || !wasPending) && o.isFilteredMatch(subj) {

derekcollison added a commit that referenced this pull request Dec 24, 2024
Makes the change suggested in
#6297 (comment)

Signed-off-by: Maurice van Veen <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants