Skip to content

Commit

Permalink
De-flake TestJetStreamClusterConsumerAckSyncReporting (#6269)
Browse files Browse the repository at this point in the history
De-flakes the test by using `checkFor`.

`AckSync` only guarantees the ack was received, had quorum and is
applied on the leader. It does not provided immediate consistency on all
followers, so it might need some retries. Previously
`c.waitOnAllCurrent()` was added but that only checks meta is current,
not streams or consumers, so it would only wait for 100 milliseconds per
server.

Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
derekcollison authored Dec 16, 2024
2 parents 98444c3 + 0a44b14 commit e425a9d
Showing 1 changed file with 31 additions and 28 deletions.
59 changes: 31 additions & 28 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6064,42 +6064,45 @@ func TestJetStreamClusterConsumerAckSyncReporting(t *testing.T) {
last = m
continue
}
m.AckSync()
err = m.AckSync()
require_NoError(t, err)
}
require_NotNil(t, skipped)
require_NotNil(t, last)

checkAckFloor := func(consumer, stream uint64) {
opts := &JSzOptions{Accounts: true, Streams: true, Consumer: true}
checkFor(t, 3*time.Second, 200*time.Millisecond, func() error {
for _, s := range c.servers {
jsz, err := s.Jsz(opts)
if err != nil {
return err
}
ci := jsz.AccountDetails[0].Streams[0].Consumer[0]
if ci.AckFloor.Consumer != consumer {
return fmt.Errorf("AckFloor.Consumer is not %d: %v", consumer, ci.AckFloor.Consumer)
}
if ci.AckFloor.Stream != stream {
return fmt.Errorf("AckFloor.Stream is not %d: %v", stream, ci.AckFloor.Stream)
}
}
return nil
})
}

// Now we want to make sure that jsz reporting will show the same
// state for ack floor.
c.waitOnAllCurrent()
opts := &JSzOptions{Accounts: true, Streams: true, Consumer: true}
for _, s := range c.servers {
jsz, err := s.Jsz(opts)
require_NoError(t, err)
ci := jsz.AccountDetails[0].Streams[0].Consumer[0]
require_Equal(t, ci.AckFloor.Consumer, dontAck-1)
require_Equal(t, ci.AckFloor.Stream, dontAck-1)
}
checkAckFloor(dontAck-1, dontAck-1)

// Now ack the skipped message
skipped.AckSync()
c.waitOnAllCurrent()
for _, s := range c.servers {
jsz, err := s.Jsz(opts)
require_NoError(t, err)
ci := jsz.AccountDetails[0].Streams[0].Consumer[0]
require_Equal(t, ci.AckFloor.Consumer, 9)
require_Equal(t, ci.AckFloor.Stream, 9)
}
err = skipped.AckSync()
require_NoError(t, err)
checkAckFloor(9, 9)

// Now ack the last message
last.AckSync()
c.waitOnAllCurrent()
for _, s := range c.servers {
jsz, err := s.Jsz(opts)
require_NoError(t, err)
ci := jsz.AccountDetails[0].Streams[0].Consumer[0]
require_Equal(t, ci.AckFloor.Consumer, 20)
require_Equal(t, ci.AckFloor.Stream, 10)
}
err = last.AckSync()
require_NoError(t, err)
checkAckFloor(20, 10)
}

func TestJetStreamClusterConsumerDeleteInterestPolicyMultipleConsumers(t *testing.T) {
Expand Down

0 comments on commit e425a9d

Please sign in to comment.