Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Catchup must not extend past requested sequence range #6038

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 4 additions & 30 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -8964,17 +8964,6 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
// mset.store never changes after being set, don't need lock.
mset.store.FastState(&state)

// Reset notion of first if this request wants sequences before our starting sequence
// and we would have nothing to send. If we have partial messages still need to send skips for those.
// We will keep sreq's first sequence to not create sequence mismatches on the follower, but we extend the last to our current state.
if sreq.FirstSeq < state.FirstSeq && state.FirstSeq > sreq.LastSeq {
s.Debugf("Catchup for stream '%s > %s' resetting request first sequence from %d to %d",
mset.account(), mset.name(), sreq.FirstSeq, state.FirstSeq)
if state.LastSeq > sreq.LastSeq {
sreq.LastSeq = state.LastSeq
}
}

// Setup sequences to walk through.
seq, last := sreq.FirstSeq, sreq.LastSeq
mset.setCatchupPeer(sreq.Peer, last-seq)
Expand Down Expand Up @@ -9138,25 +9127,10 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
if drOk && dr.First > 0 {
sendDR()
}
// Check for a condition where our state's first is now past the last that we could have sent.
// If so reset last and continue sending.
var state StreamState
mset.mu.RLock()
mset.store.FastState(&state)
mset.mu.RUnlock()
if last < state.FirstSeq {
last = state.LastSeq
}
// Recheck our exit condition.
if seq == last {
if drOk && dr.First > 0 {
sendDR()
}
s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
// EOF
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
return false
}
s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
// EOF
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
return false
}
select {
case <-remoteQuitCh:
Expand Down
20 changes: 16 additions & 4 deletions server/jetstream_cluster_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6720,12 +6720,24 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) {
return nil
})

// Make sure we only sent 1002 sync catchup msgs.
// This is for the new messages, the delete range, and the EOF.
// Make sure we only sent 2 sync catchup msgs.
// This is for the delete range, and the EOF.
nmsgs, _, _ := sub.Pending()
if nmsgs != 1002 {
t.Fatalf("Expected only 1002 sync catchup msgs to be sent signaling eof, but got %d", nmsgs)
if nmsgs != 2 {
t.Fatalf("Expected only 2 sync catchup msgs to be sent signaling eof, but got %d", nmsgs)
}

msg, err := sub.NextMsg(0)
require_NoError(t, err)
mbuf := msg.Data[1:]
dr, err := decodeDeleteRange(mbuf)
require_NoError(t, err)
require_Equal(t, dr.First, 1001)
require_Equal(t, dr.Num, 1000)

msg, err = sub.NextMsg(0)
require_NoError(t, err)
require_Equal(t, len(msg.Data), 0)
}

func TestJetStreamClusterStreamResetWithLargeFirstSeq(t *testing.T) {
Expand Down