From 98ef43a9f7041e73ec5e1323ac9afb01ed9bb575 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 24 Oct 2024 13:54:14 +0200 Subject: [PATCH] [FIXED] Catchup must not extend past requested sequence range Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 34 ++++-------------------------- server/jetstream_cluster_2_test.go | 20 ++++++++++++++---- 2 files changed, 20 insertions(+), 34 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index b473a9fb505..b7b908360a0 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -8964,17 +8964,6 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { // mset.store never changes after being set, don't need lock. mset.store.FastState(&state) - // Reset notion of first if this request wants sequences before our starting sequence - // and we would have nothing to send. If we have partial messages still need to send skips for those. - // We will keep sreq's first sequence to not create sequence mismatches on the follower, but we extend the last to our current state. - if sreq.FirstSeq < state.FirstSeq && state.FirstSeq > sreq.LastSeq { - s.Debugf("Catchup for stream '%s > %s' resetting request first sequence from %d to %d", - mset.account(), mset.name(), sreq.FirstSeq, state.FirstSeq) - if state.LastSeq > sreq.LastSeq { - sreq.LastSeq = state.LastSeq - } - } - // Setup sequences to walk through. seq, last := sreq.FirstSeq, sreq.LastSeq mset.setCatchupPeer(sreq.Peer, last-seq) @@ -9138,25 +9127,10 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { if drOk && dr.First > 0 { sendDR() } - // Check for a condition where our state's first is now past the last that we could have sent. - // If so reset last and continue sending. - var state StreamState - mset.mu.RLock() - mset.store.FastState(&state) - mset.mu.RUnlock() - if last < state.FirstSeq { - last = state.LastSeq - } - // Recheck our exit condition. - if seq == last { - if drOk && dr.First > 0 { - sendDR() - } - s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name()) - // EOF - s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil) - return false - } + s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name()) + // EOF + s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil) + return false } select { case <-remoteQuitCh: diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 42c9eee6689..338e58e9f5a 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -6720,12 +6720,24 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) { return nil }) - // Make sure we only sent 1002 sync catchup msgs. - // This is for the new messages, the delete range, and the EOF. + // Make sure we only sent 2 sync catchup msgs. + // This is for the delete range, and the EOF. nmsgs, _, _ := sub.Pending() - if nmsgs != 1002 { - t.Fatalf("Expected only 1002 sync catchup msgs to be sent signaling eof, but got %d", nmsgs) + if nmsgs != 2 { + t.Fatalf("Expected only 2 sync catchup msgs to be sent signaling eof, but got %d", nmsgs) } + + msg, err := sub.NextMsg(0) + require_NoError(t, err) + mbuf := msg.Data[1:] + dr, err := decodeDeleteRange(mbuf) + require_NoError(t, err) + require_Equal(t, dr.First, 1001) + require_Equal(t, dr.Num, 1000) + + msg, err = sub.NextMsg(0) + require_NoError(t, err) + require_Equal(t, len(msg.Data), 0) } func TestJetStreamClusterStreamResetWithLargeFirstSeq(t *testing.T) {