diff --git a/server/norace_test.go b/server/norace_test.go index 81209dc127..eb534987cf 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -10729,27 +10729,42 @@ func TestNoRaceJetStreamClusterMemoryWorkQueueLastSequenceResetAfterRestart(t *t nc, js := jsClientConnect(t, c.randomServer()) defer nc.Close() - _, err := js.AddStream(&nats.StreamConfig{ - Name: fmt.Sprintf("TEST:%d", n), - Storage: nats.MemoryStorage, - Retention: nats.WorkQueuePolicy, - Subjects: []string{fmt.Sprintf("foo.%d.*", n)}, - Replicas: 3, - }, nats.MaxWait(30*time.Second)) - require_NoError(t, err) + checkFor(t, 5*time.Second, time.Second, func() error { + _, err := js.AddStream(&nats.StreamConfig{ + Name: fmt.Sprintf("TEST:%d", n), + Storage: nats.MemoryStorage, + Retention: nats.WorkQueuePolicy, + Subjects: []string{fmt.Sprintf("foo.%d.*", n)}, + Replicas: 3, + }, nats.MaxWait(time.Second)) + return err + }) + subj := fmt.Sprintf("foo.%d.bar", n) for i := 0; i < 22; i++ { - js.Publish(subj, nil) + checkFor(t, 5*time.Second, time.Second, func() error { + _, err := js.Publish(subj, nil) + return err + }) } - // Now consumer them all as well. - sub, err := js.PullSubscribe(subj, "wq") - require_NoError(t, err) - msgs, err := sub.Fetch(22, nats.MaxWait(20*time.Second)) - require_NoError(t, err) + // Now consume them all as well. + var err error + var sub *nats.Subscription + checkFor(t, 5*time.Second, time.Second, func() error { + sub, err = js.PullSubscribe(subj, "wq") + return err + }) + + var msgs []*nats.Msg + checkFor(t, 5*time.Second, time.Second, func() error { + msgs, err = sub.Fetch(22, nats.MaxWait(time.Second)) + return err + }) require_Equal(t, len(msgs), 22) for _, m := range msgs { - err := m.AckSync() - require_NoError(t, err) + checkFor(t, 5*time.Second, time.Second, func() error { + return m.AckSync() + }) } }(i) }