diff --git a/server/raft.go b/server/raft.go index 6df4fcb63c..9fbb76fefb 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3151,10 +3151,10 @@ func (n *raft) catchupStalled() bool { if n.catchup == nil { return false } - if n.catchup.pindex == n.commit { + if n.catchup.pindex == n.pindex { return time.Since(n.catchup.active) > 2*time.Second } - n.catchup.pindex = n.commit + n.catchup.pindex = n.pindex n.catchup.active = time.Now() return false } @@ -3173,7 +3173,7 @@ func (n *raft) createCatchup(ae *appendEntry) string { cterm: ae.pterm, cindex: ae.pindex, pterm: n.pterm, - pindex: n.commit, + pindex: n.pindex, active: time.Now(), } inbox := n.newCatchupInbox() @@ -3343,7 +3343,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { if n.catchupStalled() { n.debug("Catchup may be stalled, will request again") inbox = n.createCatchup(ae) - ar = newAppendEntryResponse(n.pterm, n.commit, n.id, false) + ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, false) } n.Unlock() if ar != nil { @@ -3383,32 +3383,43 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.updateLeadChange(false) } - if (isNew && ae.pterm != n.pterm) || ae.pindex != n.pindex { - // Check if this is a lower index than what we were expecting. - if ae.pindex < n.pindex { - n.debug("AppendEntry detected pindex less than ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex) + if ae.pterm != n.pterm || ae.pindex != n.pindex { + // Check if this is a lower or equal index than what we were expecting. + if ae.pindex <= n.pindex { + n.debug("AppendEntry detected pindex less than/equal to ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex) var ar *appendEntryResponse - - // An AppendEntry is stored at seq=ae.pindex+1. This can be checked when eae != nil, eae.pindex==ae.pindex. - seq := ae.pindex + 1 var success bool - if eae, _ := n.loadEntry(seq); eae == nil { + + if n.commit > 0 && ae.pindex <= n.commit { + // Check if only our terms do not match here. + if ae.pindex == n.pindex { + // Make sure pterms match and we take on the leader's. + // This prevents constant spinning. + n.truncateWAL(ae.pterm, ae.pindex) + } else { + // If we have already committed this entry, just mark success. + success = true + } + } else if eae, _ := n.loadEntry(ae.pindex); eae == nil { // If terms are equal, and we are not catching up, we have simply already processed this message. // So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots. if ae.pterm == n.pterm && !catchingUp { success = true + } else if ae.pindex == n.pindex { + // Check if only our terms do not match here. + // Make sure pterms match and we take on the leader's. + // This prevents constant spinning. + n.truncateWAL(ae.pterm, ae.pindex) } else { n.resetWAL() } - } else if eae.term != ae.term { + } else { // If terms mismatched, delete that entry and all others past it. // Make sure to cancel any catchups in progress. // Truncate will reset our pterm and pindex. Only do so if we have an entry. n.truncateWAL(eae.pterm, eae.pindex) - } else { - success = true } - // Cancel regardless if truncated/unsuccessful. + // Cancel regardless if unsuccessful. if !success { n.cancelCatchup() } @@ -3434,16 +3445,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { return } - // Check if only our terms do not match here. - if ae.pindex == n.pindex { - // Make sure pterms match and we take on the leader's. - // This prevents constant spinning. - n.truncateWAL(ae.pterm, ae.pindex) - n.cancelCatchup() - n.Unlock() - return - } - if ps, err := decodePeerState(ae.entries[1].Data); err == nil { n.processPeerState(ps) // Also need to copy from client's buffer. @@ -3483,19 +3484,16 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.apply.push(newCommittedEntry(n.commit, ae.entries[:1])) n.Unlock() return - - } else { - n.debug("AppendEntry did not match %d %d with %d %d (commit %d)", ae.pterm, ae.pindex, n.pterm, n.pindex, n.commit) - if ae.pindex > n.commit { - // Setup our state for catching up. - inbox := n.createCatchup(ae) - ar := newAppendEntryResponse(n.pterm, n.commit, n.id, false) - n.Unlock() - n.sendRPC(ae.reply, inbox, ar.encode(arbuf)) - arPool.Put(ar) - return - } } + + // Setup our state for catching up. + n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex) + inbox := n.createCatchup(ae) + ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false) + n.Unlock() + n.sendRPC(ae.reply, inbox, ar.encode(arbuf)) + arPool.Put(ar) + return } // Save to our WAL if we have entries. diff --git a/server/raft_helpers_test.go b/server/raft_helpers_test.go index 127a3642c4..837b434a7c 100644 --- a/server/raft_helpers_test.go +++ b/server/raft_helpers_test.go @@ -317,3 +317,34 @@ func (rg smGroup) waitOnTotal(t *testing.T, expected int64) { func newStateAdder(s *Server, cfg *RaftConfig, n RaftNode) stateMachine { return &stateAdder{s: s, n: n, cfg: cfg, lch: make(chan bool, 1)} } + +func initSingleMemRaftNode(t *testing.T) (*raft, func()) { + t.Helper() + c := createJetStreamClusterExplicit(t, "R3S", 3) + s := c.servers[0] // RunBasicJetStreamServer not available + + ms, err := newMemStore(&StreamConfig{Name: "TEST", Storage: MemoryStorage}) + require_NoError(t, err) + cfg := &RaftConfig{Name: "TEST", Store: t.TempDir(), Log: ms} + + err = s.bootstrapRaftNode(cfg, nil, false) + require_NoError(t, err) + n, err := s.initRaftNode(globalAccountName, cfg, pprofLabels{}) + require_NoError(t, err) + + cleanup := func() { + c.shutdown() + } + return n, cleanup +} + +// Encode an AppendEntry. +// An AppendEntry is encoded into a buffer and that's stored into the WAL. +// This is a helper function to generate that buffer. +func encode(t *testing.T, ae *appendEntry) *appendEntry { + t.Helper() + buf, err := ae.encode(nil) + require_NoError(t, err) + ae.buf = buf + return ae +} diff --git a/server/raft_test.go b/server/raft_test.go index 1ec2193a14..c0e209c3ad 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1079,27 +1079,8 @@ func TestNRGTermNoDecreaseAfterWALReset(t *testing.T) { } func TestNRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3S", 3) - defer c.shutdown() - s := c.servers[0] // RunBasicJetStreamServer not available - - ms, err := newMemStore(&StreamConfig{Name: "TEST", Storage: MemoryStorage}) - require_NoError(t, err) - cfg := &RaftConfig{Name: "TEST", Store: t.TempDir(), Log: ms} - - err = s.bootstrapRaftNode(cfg, nil, false) - require_NoError(t, err) - n, err := s.initRaftNode(globalAccountName, cfg, pprofLabels{}) - require_NoError(t, err) - - // An AppendEntry is encoded into a buffer and that's stored into the WAL. - // This is a helper function to generate that buffer. - encode := func(ae *appendEntry) *appendEntry { - buf, err := ae.encode(nil) - require_NoError(t, err) - ae.buf = buf - return ae - } + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() // Create a sample entry, the content doesn't matter, just that it's stored. esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) @@ -1109,16 +1090,16 @@ func TestNRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum(t *testing.T) { nats1 := "yrzKKRBu" // "nats-1" // Timeline, for first leader - aeInitial := encode(&appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) - aeHeartbeat := encode(&appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) - aeUncommitted := encode(&appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) - aeNoQuorum := encode(&appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries}) + aeInitial := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + aeUncommitted := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeNoQuorum := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries}) // Timeline, after leader change - aeMissed := encode(&appendEntry{leader: nats1, term: 2, commit: 1, pterm: 1, pindex: 2, entries: entries}) - aeCatchupTrigger := encode(&appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 3, entries: entries}) - aeHeartbeat2 := encode(&appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 4, entries: nil}) - aeHeartbeat3 := encode(&appendEntry{leader: nats1, term: 2, commit: 4, pterm: 2, pindex: 4, entries: nil}) + aeMissed := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 1, pindex: 2, entries: entries}) + aeCatchupTrigger := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 3, entries: entries}) + aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 4, entries: nil}) + aeHeartbeat3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 4, pterm: 2, pindex: 4, entries: nil}) // Initial case is simple, just store the entry. n.processAppendEntry(aeInitial, n.aesub) @@ -1145,55 +1126,125 @@ func TestNRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum(t *testing.T) { require_NoError(t, err) require_Equal(t, entry.leader, nats0) - // We've just had a leader election, and we missed one message from the previous leader, we should catchup. + // We've just had a leader election, and we missed one message from the previous leader. + // We should truncate the last message. n.processAppendEntry(aeCatchupTrigger, n.aesub) - require_Equal(t, n.wal.State().Msgs, 3) + require_Equal(t, n.wal.State().Msgs, 2) + require_True(t, n.catchup == nil) + + // We get a heartbeat that prompts us to catchup. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.wal.State().Msgs, 2) + require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item. require_True(t, n.catchup != nil) require_Equal(t, n.catchup.pterm, 1) // n.pterm - require_Equal(t, n.catchup.pindex, 1) // n.commit + require_Equal(t, n.catchup.pindex, 2) // n.pindex - // Make sure our WAL was not truncated, and we're still catching up. - aeUncommitted.leader = nats1 - aeUncommitted = encode(aeUncommitted) - n.processAppendEntry(aeUncommitted, n.catchup.sub) + // We now notice the leader indicated a different entry at the (no quorum) index, should save that. + n.processAppendEntry(aeMissed, n.catchup.sub) require_Equal(t, n.wal.State().Msgs, 3) require_True(t, n.catchup != nil) - // Our entry should not be touched, so the 'leader' should've stayed the same. + + // We now get the entry that initially triggered us to catchup, it should be added. + n.processAppendEntry(aeCatchupTrigger, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 4) + require_True(t, n.catchup != nil) + entry, err = n.loadEntry(4) + require_NoError(t, err) + require_Equal(t, entry.leader, nats1) + + // Heartbeat, makes sure we commit (and reset catchup, as we're now up-to-date). + n.processAppendEntry(aeHeartbeat3, n.aesub) + require_Equal(t, n.commit, 4) + require_True(t, n.catchup == nil) +} + +func TestNRGCatchupCanTruncateMultipleEntriesWithoutQuorum(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + nats1 := "yrzKKRBu" // "nats-1" + + // Timeline, for first leader + aeInitial := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + aeNoQuorum1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeNoQuorum2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries}) + + // Timeline, after leader change + aeMissed1 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeMissed2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 2, entries: entries}) + aeCatchupTrigger := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 3, entries: entries}) + aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 4, entries: nil}) + aeHeartbeat3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 4, pterm: 2, pindex: 4, entries: nil}) + + // Initial case is simple, just store the entry. + n.processAppendEntry(aeInitial, n.aesub) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Heartbeat, makes sure commit moves up. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 1) + + // We get one entry that has NO quorum (but we don't know that yet). + n.processAppendEntry(aeNoQuorum1, n.aesub) + require_Equal(t, n.wal.State().Msgs, 2) entry, err = n.loadEntry(2) require_NoError(t, err) require_Equal(t, entry.leader, nats0) - // We now notice the leader indicated a different entry at the (no quorum) index, should truncate. - n.processAppendEntry(aeMissed, n.catchup.sub) + // We get another entry that has NO quorum (but we don't know that yet). + n.processAppendEntry(aeNoQuorum2, n.aesub) + require_Equal(t, n.wal.State().Msgs, 3) + entry, err = n.loadEntry(3) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // We've just had a leader election, and we missed messages from the previous leader. + // We should truncate the last message. + n.processAppendEntry(aeCatchupTrigger, n.aesub) require_Equal(t, n.wal.State().Msgs, 2) require_True(t, n.catchup == nil) - // We get a heartbeat that prompts us to catchup again. + // We get a heartbeat that prompts us to catchup. n.processAppendEntry(aeHeartbeat2, n.aesub) require_Equal(t, n.wal.State().Msgs, 2) require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item. require_True(t, n.catchup != nil) require_Equal(t, n.catchup.pterm, 1) // n.pterm - require_Equal(t, n.catchup.pindex, 1) // n.commit + require_Equal(t, n.catchup.pindex, 2) // n.pindex + + // We now notice the leader indicated a different entry at the (no quorum) index. We should truncate again. + n.processAppendEntry(aeMissed2, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 1) + require_True(t, n.catchup == nil) + + // We get a heartbeat that prompts us to catchup. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.wal.State().Msgs, 1) + require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item. + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 1) // n.pterm + require_Equal(t, n.catchup.pindex, 1) // n.pindex - // We get the uncommitted entry again, it should stay the same. - n.processAppendEntry(aeUncommitted, n.catchup.sub) + // We now get caught up with the missed messages. + n.processAppendEntry(aeMissed1, n.catchup.sub) require_Equal(t, n.wal.State().Msgs, 2) require_True(t, n.catchup != nil) - // Our entry should still stay the same. - entry, err = n.loadEntry(2) - require_NoError(t, err) - require_Equal(t, entry.leader, nats0) - // We now get the missed append entry, store it. - n.processAppendEntry(aeMissed, n.catchup.sub) + n.processAppendEntry(aeMissed2, n.catchup.sub) require_Equal(t, n.wal.State().Msgs, 3) require_True(t, n.catchup != nil) - entry, err = n.loadEntry(3) - require_NoError(t, err) - require_Equal(t, entry.leader, nats1) - // We now get the entry that initially triggered us to catchup again, it should be added. + // We now get the entry that initially triggered us to catchup, it should be added. n.processAppendEntry(aeCatchupTrigger, n.catchup.sub) require_Equal(t, n.wal.State().Msgs, 4) require_True(t, n.catchup != nil) @@ -1206,3 +1257,199 @@ func TestNRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum(t *testing.T) { require_Equal(t, n.commit, 4) require_True(t, n.catchup == nil) } + +func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline. + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries}) + aeHeartbeat2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil}) + + // Initial case is simple, just store the entry. + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Heartbeat, makes sure commit moves up. + n.processAppendEntry(aeHeartbeat1, n.aesub) + require_Equal(t, n.commit, 1) + + // Deliver a message. + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.wal.State().Msgs, 2) + entry, err = n.loadEntry(2) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Deliver another message. + n.processAppendEntry(aeMsg3, n.aesub) + require_Equal(t, n.wal.State().Msgs, 3) + entry, err = n.loadEntry(3) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Simulate receiving an old entry as a redelivery. We should not truncate as that lowers our commit. + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.commit, 1) + + // Heartbeat, makes sure we commit. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.commit, 3) +} + +func TestNRGNewLeaderWithIncorrectPtermDoesNotTruncateIfCommitted(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + nats1 := "yrzKKRBu" // "nats-1" + + // Timeline, first leader. + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + + // Timeline, leader changed, but pterm got set to term. + aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 1, entries: nil}) + + // Initial case is simple, just store the entry. + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Heartbeat, makes sure commit moves up. + n.processAppendEntry(aeHeartbeat1, n.aesub) + require_Equal(t, n.commit, 1) + require_Equal(t, n.pterm, 1) + + // Heartbeat from another leader, pterm got set to term, make sure to only up our pterm. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.commit, 1) + require_Equal(t, n.pterm, 2) +} + +func TestNRGCatchupFromNewLeaderWithIncorrectPterm(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline. + aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 0, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + + // Heartbeat, triggers catchup. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 0) // Commit should not change, as we missed an item. + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 0) // n.pterm + require_Equal(t, n.catchup.pindex, 0) // n.pindex + + // First catchup message has the incorrect pterm, stop catchup and re-trigger later with the correct pterm. + n.processAppendEntry(aeMsg, n.catchup.sub) + require_True(t, n.catchup == nil) + require_Equal(t, n.pterm, 1) + require_Equal(t, n.pindex, 0) + + // Heartbeat, triggers catchup. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 0) // Commit should not change, as we missed an item. + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 1) // n.pterm + require_Equal(t, n.catchup.pindex, 0) // n.pindex + + // Now we get the message again and can continue to store it. + n.processAppendEntry(aeMsg, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Now heartbeat is able to commit the entry. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 1) +} + +func TestNRGCatchupFromNewLeaderWithIncorrectPtermDoesNotTruncateIfCommitted(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + nats1 := "yrzKKRBu" // "nats-1" + + // Timeline, first leader. + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + + // Timeline, leader changed, but pterm got set to term. + aeMsg2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 1, entries: entries}) + aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 2, entries: nil}) + + // Initial case is simple, just store the entry. + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Heartbeat, makes sure commit moves up. + n.processAppendEntry(aeHeartbeat1, n.aesub) + require_Equal(t, n.commit, 1) + require_Equal(t, n.pterm, 1) + + // Heartbeat from another leader, we missed a message so we need catchup. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item. + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 1) // n.pterm + require_Equal(t, n.catchup.pindex, 1) // n.pindex + + // We get a message with an incorrect pterm, can only correct pterm and requires re-trigger of catchup. + n.processAppendEntry(aeMsg2, n.catchup.sub) + require_True(t, n.catchup == nil) + require_Equal(t, n.pterm, 2) + require_Equal(t, n.pindex, 1) + + // Heartbeat re-triggers catchup. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item. + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 2) // n.pterm + require_Equal(t, n.catchup.pindex, 1) // n.pindex + + // Now we get the message again and can continue to store it. + n.processAppendEntry(aeMsg2, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 2) + require_True(t, n.catchup != nil) + + // Heartbeat can now cancel catchup and move up our commit. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.commit, 2) + require_True(t, n.catchup == nil) + +}