From 0a44b14e809fe04ebb9f9deef44ee233dda29f02 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 16 Dec 2024 21:19:13 +0100 Subject: [PATCH] De-flake TestJetStreamClusterConsumerAckSyncReporting Signed-off-by: Maurice van Veen --- server/jetstream_cluster_1_test.go | 59 ++++++++++++++++-------------- 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 1676d508f0..cd1c15642e 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6064,42 +6064,45 @@ func TestJetStreamClusterConsumerAckSyncReporting(t *testing.T) { last = m continue } - m.AckSync() + err = m.AckSync() + require_NoError(t, err) + } + require_NotNil(t, skipped) + require_NotNil(t, last) + + checkAckFloor := func(consumer, stream uint64) { + opts := &JSzOptions{Accounts: true, Streams: true, Consumer: true} + checkFor(t, 3*time.Second, 200*time.Millisecond, func() error { + for _, s := range c.servers { + jsz, err := s.Jsz(opts) + if err != nil { + return err + } + ci := jsz.AccountDetails[0].Streams[0].Consumer[0] + if ci.AckFloor.Consumer != consumer { + return fmt.Errorf("AckFloor.Consumer is not %d: %v", consumer, ci.AckFloor.Consumer) + } + if ci.AckFloor.Stream != stream { + return fmt.Errorf("AckFloor.Stream is not %d: %v", stream, ci.AckFloor.Stream) + } + } + return nil + }) } // Now we want to make sure that jsz reporting will show the same // state for ack floor. - c.waitOnAllCurrent() - opts := &JSzOptions{Accounts: true, Streams: true, Consumer: true} - for _, s := range c.servers { - jsz, err := s.Jsz(opts) - require_NoError(t, err) - ci := jsz.AccountDetails[0].Streams[0].Consumer[0] - require_Equal(t, ci.AckFloor.Consumer, dontAck-1) - require_Equal(t, ci.AckFloor.Stream, dontAck-1) - } + checkAckFloor(dontAck-1, dontAck-1) // Now ack the skipped message - skipped.AckSync() - c.waitOnAllCurrent() - for _, s := range c.servers { - jsz, err := s.Jsz(opts) - require_NoError(t, err) - ci := jsz.AccountDetails[0].Streams[0].Consumer[0] - require_Equal(t, ci.AckFloor.Consumer, 9) - require_Equal(t, ci.AckFloor.Stream, 9) - } + err = skipped.AckSync() + require_NoError(t, err) + checkAckFloor(9, 9) // Now ack the last message - last.AckSync() - c.waitOnAllCurrent() - for _, s := range c.servers { - jsz, err := s.Jsz(opts) - require_NoError(t, err) - ci := jsz.AccountDetails[0].Streams[0].Consumer[0] - require_Equal(t, ci.AckFloor.Consumer, 20) - require_Equal(t, ci.AckFloor.Stream, 10) - } + err = last.AckSync() + require_NoError(t, err) + checkAckFloor(20, 10) } func TestJetStreamClusterConsumerDeleteInterestPolicyMultipleConsumers(t *testing.T) {