diff --git a/server/raft.go b/server/raft.go index b78d2161e4..9fbb76fefb 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3391,8 +3391,15 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { var success bool if n.commit > 0 && ae.pindex <= n.commit { - // If we have already committed this entry, just mark success. - success = true + // Check if only our terms do not match here. + if ae.pindex == n.pindex { + // Make sure pterms match and we take on the leader's. + // This prevents constant spinning. + n.truncateWAL(ae.pterm, ae.pindex) + } else { + // If we have already committed this entry, just mark success. + success = true + } } else if eae, _ := n.loadEntry(ae.pindex); eae == nil { // If terms are equal, and we are not catching up, we have simply already processed this message. // So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots. @@ -3412,8 +3419,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // Truncate will reset our pterm and pindex. Only do so if we have an entry. n.truncateWAL(eae.pterm, eae.pindex) } - // Cancel regardless. - n.cancelCatchup() + // Only cancel catchup if we were unsuccessful. + if !success { + n.cancelCatchup() + } // Create response. 
ar = newAppendEntryResponse(ae.pterm, ae.pindex, n.id, success) diff --git a/server/raft_test.go b/server/raft_test.go index ae6c5c5dff..c0e209c3ad 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1273,7 +1273,7 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T) aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries}) - aeHeartbeat2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: entries}) + aeHeartbeat2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil}) // Initial case is simple, just store the entry. n.processAppendEntry(aeMsg1, n.aesub) @@ -1308,3 +1308,148 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T) n.processAppendEntry(aeHeartbeat2, n.aesub) require_Equal(t, n.commit, 3) } + +func TestNRGNewLeaderWithIncorrectPtermDoesNotTruncateIfCommitted(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + nats1 := "yrzKKRBu" // "nats-1" + + // Timeline, first leader. + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + + // Timeline, leader changed, but pterm got set to term. 
+ aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 1, entries: nil}) + + // Initial case is simple, just store the entry. + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Heartbeat, makes sure commit moves up. + n.processAppendEntry(aeHeartbeat1, n.aesub) + require_Equal(t, n.commit, 1) + require_Equal(t, n.pterm, 1) + + // Heartbeat from another leader with pterm set to its term, make sure we only update our pterm. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.commit, 1) + require_Equal(t, n.pterm, 2) +} + +func TestNRGCatchupFromNewLeaderWithIncorrectPterm(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline. + aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 0, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + + // Heartbeat, triggers catchup. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 0) // Commit should not change, as we missed an item. + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 0) // n.pterm + require_Equal(t, n.catchup.pindex, 0) // n.pindex + + // First catchup message has the incorrect pterm, stop catchup and re-trigger later with the correct pterm. + n.processAppendEntry(aeMsg, n.catchup.sub) + require_True(t, n.catchup == nil) + require_Equal(t, n.pterm, 1) + require_Equal(t, n.pindex, 0) + + // Heartbeat, triggers catchup. 
+ n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 0) // Commit should not change, as we missed an item. + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 1) // n.pterm + require_Equal(t, n.catchup.pindex, 0) // n.pindex + + // Now we get the message again and can continue to store it. + n.processAppendEntry(aeMsg, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Now heartbeat is able to commit the entry. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 1) +} + +func TestNRGCatchupFromNewLeaderWithIncorrectPtermDoesNotTruncateIfCommitted(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + nats1 := "yrzKKRBu" // "nats-1" + + // Timeline, first leader. + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + + // Timeline, leader changed, but pterm got set to term. + aeMsg2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 1, entries: entries}) + aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 2, entries: nil}) + + // Initial case is simple, just store the entry. + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Heartbeat, makes sure commit moves up. 
+ n.processAppendEntry(aeHeartbeat1, n.aesub) + require_Equal(t, n.commit, 1) + require_Equal(t, n.pterm, 1) + + // Heartbeat from another leader, we missed a message so we need catchup. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item. + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 1) // n.pterm + require_Equal(t, n.catchup.pindex, 1) // n.pindex + + // We get a message with an incorrect pterm, we can only correct our pterm and need catchup to be re-triggered. + n.processAppendEntry(aeMsg2, n.catchup.sub) + require_True(t, n.catchup == nil) + require_Equal(t, n.pterm, 2) + require_Equal(t, n.pindex, 1) + + // Heartbeat re-triggers catchup. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item. + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 2) // n.pterm + require_Equal(t, n.catchup.pindex, 1) // n.pindex + + // Now we get the message again and can continue to store it. + n.processAppendEntry(aeMsg2, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 2) + require_True(t, n.catchup != nil) + + // Heartbeat can now cancel catchup and move up our commit. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.commit, 2) + require_True(t, n.catchup == nil) + +}