From 900fd350e0302f7995e05c9f7084012ee4c921cc Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 13 Dec 2024 14:31:04 +0100 Subject: [PATCH] [FIXED] Respect consumer's starting seq, even if in the future Signed-off-by: Maurice van Veen --- server/consumer.go | 12 ++++++-- server/jetstream_cluster_1_test.go | 45 ++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 552f7f457b..b607deff00 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -5366,12 +5366,18 @@ func (o *consumer) selectStartingSeqNo() { o.sseq = o.cfg.OptStartSeq } - if state.FirstSeq == 0 { + if state.FirstSeq == 0 && (o.cfg.Direct || o.cfg.OptStartSeq == 0) { + // If the stream is empty, deliver only new. + // But only if mirroring/sourcing, or start seq is unset, otherwise need to respect provided value. o.sseq = 1 + } else if o.sseq > state.LastSeq && (o.cfg.Direct || o.cfg.OptStartSeq == 0) { + // If selected sequence is in the future, clamp back down. + // But only if mirroring/sourcing, or start seq is unset, otherwise need to respect provided value. + o.sseq = state.LastSeq + 1 } else if o.sseq < state.FirstSeq { + // If the first sequence is further ahead than the starting sequence, + // there are no messages there anymore, so move the sequence up. o.sseq = state.FirstSeq - } else if o.sseq > state.LastSeq { - o.sseq = state.LastSeq + 1 } } diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index cd1c15642e..d3f6a4a8c5 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6945,6 +6945,51 @@ func TestJetStreamClusterConsumerInfoAfterCreate(t *testing.T) { require_NoError(t, err) } +func TestJetStreamClusterRespectConsumerStartSeq(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // Create replicated stream. + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + // We could have published messages into the stream that have not yet been applied on the follower. + // If we create a consumer with a starting sequence in the future, we must respect it. + ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + DeliverPolicy: nats.DeliverByStartSequencePolicy, + OptStartSeq: 20, + }) + require_NoError(t, err) + require_Equal(t, ci.Delivered.Stream, 19) + + // Same thing if the first sequence is not 0. + err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 10}) + require_NoError(t, err) + + ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + DeliverPolicy: nats.DeliverByStartSequencePolicy, + OptStartSeq: 20, + }) + require_NoError(t, err) + require_Equal(t, ci.Delivered.Stream, 19) + + // Only if we're requested to start at a sequence that's not available anymore + // can we safely move it up. That data is gone already, so can't do anything else. + ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + DeliverPolicy: nats.DeliverByStartSequencePolicy, + OptStartSeq: 5, + }) + require_NoError(t, err) + require_Equal(t, ci.Delivered.Stream, 9) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value.