Skip to content

Commit

Permalink
NRG: Add tests for correcting pterm with committed entries
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
MauriceVanVeen committed Oct 22, 2024
1 parent b2bb911 commit f9d2cc4
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 5 deletions.
17 changes: 13 additions & 4 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3391,8 +3391,15 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
var success bool

if n.commit > 0 && ae.pindex <= n.commit {
// If we have already committed this entry, just mark success.
success = true
// Check if only our terms do not match here.
if ae.pindex == n.pindex {
// Make sure pterms match and we take on the leader's.
// This prevents constant spinning.
n.truncateWAL(ae.pterm, ae.pindex)
} else {
// If we have already committed this entry, just mark success.
success = true
}
} else if eae, _ := n.loadEntry(ae.pindex); eae == nil {
// If terms are equal, and we are not catching up, we have simply already processed this message.
// So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots.
Expand All @@ -3412,8 +3419,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// Truncate will reset our pterm and pindex. Only do so if we have an entry.
n.truncateWAL(eae.pterm, eae.pindex)
}
// Cancel regardless.
n.cancelCatchup()
// Cancel catchup only if we were unsuccessful.
if !success {
n.cancelCatchup()
}

// Create response.
ar = newAppendEntryResponse(ae.pterm, ae.pindex, n.id, success)
Expand Down
147 changes: 146 additions & 1 deletion server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1273,7 +1273,7 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T)
aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})
aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries})
aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries})
aeHeartbeat2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: entries})
aeHeartbeat2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil})

// Initial case is simple, just store the entry.
n.processAppendEntry(aeMsg1, n.aesub)
Expand Down Expand Up @@ -1308,3 +1308,148 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T)
n.processAppendEntry(aeHeartbeat2, n.aesub)
require_Equal(t, n.commit, 3)
}

// TestNRGNewLeaderWithIncorrectPtermDoesNotTruncateIfCommitted stores and
// commits a single entry from a first leader, then delivers a heartbeat from a
// second leader that references the same pindex but carries a different pterm.
// It asserts that the commit index stays at 1 and that the node's pterm ends up
// at the value carried by the second leader's heartbeat.
func TestNRGNewLeaderWithIncorrectPtermDoesNotTruncateIfCommitted(t *testing.T) {
	n, cleanup := initSingleMemRaftNode(t)
	defer cleanup()

	// The payload itself is irrelevant, we only need something that gets stored.
	payload := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
	entries := []*Entry{newEntry(EntryNormal, payload)}

	firstLeader := "S1Nunr6R"  // "nats-0"
	secondLeader := "yrzKKRBu" // "nats-1"

	var (
		// Messages sent by the first leader.
		storeMsg        = encode(t, &appendEntry{leader: firstLeader, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
		commitHeartbeat = encode(t, &appendEntry{leader: firstLeader, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})

		// Message sent after the leader change, where pterm was set to term.
		newLeaderHeartbeat = encode(t, &appendEntry{leader: secondLeader, term: 2, commit: 1, pterm: 2, pindex: 1, entries: nil})
	)

	// The first append is the simple case: the entry just gets stored.
	n.processAppendEntry(storeMsg, n.aesub)
	require_Equal(t, n.wal.State().Msgs, 1)
	stored, err := n.loadEntry(1)
	require_NoError(t, err)
	require_Equal(t, stored.leader, firstLeader)

	// The first leader's heartbeat moves the commit up.
	n.processAppendEntry(commitHeartbeat, n.aesub)
	require_Equal(t, n.commit, 1)
	require_Equal(t, n.pterm, 1)

	// The second leader's heartbeat has pterm set to term. Only our pterm
	// should be raised, the commit must be left untouched.
	n.processAppendEntry(newLeaderHeartbeat, n.aesub)
	require_Equal(t, n.commit, 1)
	require_Equal(t, n.pterm, 2)
}

// TestNRGCatchupFromNewLeaderWithIncorrectPterm starts from an empty node that
// receives a heartbeat for an entry it never saw, which puts it into catchup.
// The first catchup message carries a pterm that differs from the node's, so
// the test asserts catchup is dropped and the node's pterm is updated. A second
// heartbeat then restarts catchup, the same message is stored, and a final
// heartbeat commits it.
func TestNRGCatchupFromNewLeaderWithIncorrectPterm(t *testing.T) {
	n, cleanup := initSingleMemRaftNode(t)
	defer cleanup()

	// The payload itself is irrelevant, we only need something that gets stored.
	payload := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
	entries := []*Entry{newEntry(EntryNormal, payload)}

	leader := "S1Nunr6R" // "nats-0"

	var (
		// Messages used throughout the timeline below.
		storeMsg  = encode(t, &appendEntry{leader: leader, term: 1, commit: 0, pterm: 1, pindex: 0, entries: entries})
		heartbeat = encode(t, &appendEntry{leader: leader, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})
	)

	// A heartbeat for an entry we do not have triggers catchup.
	n.processAppendEntry(heartbeat, n.aesub)
	require_Equal(t, n.commit, 0) // We missed an item, so commit must not move.
	require_True(t, n.catchup != nil)
	require_Equal(t, n.catchup.pterm, 0)  // n.pterm
	require_Equal(t, n.catchup.pindex, 0) // n.pindex

	// The first catchup message has the incorrect pterm: catchup stops and has
	// to be re-triggered later, once we hold the correct pterm.
	n.processAppendEntry(storeMsg, n.catchup.sub)
	require_True(t, n.catchup == nil)
	require_Equal(t, n.pterm, 1)
	require_Equal(t, n.pindex, 0)

	// The next heartbeat triggers catchup again, now with the updated pterm.
	n.processAppendEntry(heartbeat, n.aesub)
	require_Equal(t, n.commit, 0) // We missed an item, so commit must not move.
	require_True(t, n.catchup != nil)
	require_Equal(t, n.catchup.pterm, 1)  // n.pterm
	require_Equal(t, n.catchup.pindex, 0) // n.pindex

	// Receiving the message a second time lets us store it.
	n.processAppendEntry(storeMsg, n.catchup.sub)
	require_Equal(t, n.wal.State().Msgs, 1)
	stored, err := n.loadEntry(1)
	require_NoError(t, err)
	require_Equal(t, stored.leader, leader)

	// With the entry stored, the heartbeat can commit it.
	n.processAppendEntry(heartbeat, n.aesub)
	require_Equal(t, n.commit, 1)
}

// TestNRGCatchupFromNewLeaderWithIncorrectPtermDoesNotTruncateIfCommitted
// stores and commits one entry from a first leader, then has a second leader
// trigger catchup for a missed entry. The first catchup message carries a pterm
// that differs from the node's for the already committed pindex; the test
// asserts that catchup is dropped, pterm is updated and pindex is kept. After
// catchup is re-triggered the message is stored as the second WAL entry, and a
// final heartbeat ends catchup and moves the commit to 2.
func TestNRGCatchupFromNewLeaderWithIncorrectPtermDoesNotTruncateIfCommitted(t *testing.T) {
	n, cleanup := initSingleMemRaftNode(t)
	defer cleanup()

	// The payload itself is irrelevant, we only need something that gets stored.
	payload := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
	entries := []*Entry{newEntry(EntryNormal, payload)}

	firstLeader := "S1Nunr6R"  // "nats-0"
	secondLeader := "yrzKKRBu" // "nats-1"

	var (
		// Messages sent by the first leader.
		firstStoreMsg  = encode(t, &appendEntry{leader: firstLeader, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
		firstHeartbeat = encode(t, &appendEntry{leader: firstLeader, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})

		// Messages sent after the leader change, where pterm was set to term.
		secondStoreMsg  = encode(t, &appendEntry{leader: secondLeader, term: 2, commit: 1, pterm: 2, pindex: 1, entries: entries})
		secondHeartbeat = encode(t, &appendEntry{leader: secondLeader, term: 2, commit: 2, pterm: 2, pindex: 2, entries: nil})
	)

	// The first append is the simple case: the entry just gets stored.
	n.processAppendEntry(firstStoreMsg, n.aesub)
	require_Equal(t, n.wal.State().Msgs, 1)
	stored, err := n.loadEntry(1)
	require_NoError(t, err)
	require_Equal(t, stored.leader, firstLeader)

	// The first leader's heartbeat moves the commit up.
	n.processAppendEntry(firstHeartbeat, n.aesub)
	require_Equal(t, n.commit, 1)
	require_Equal(t, n.pterm, 1)

	// The second leader's heartbeat refers to a message we missed, so we need
	// to catch up.
	n.processAppendEntry(secondHeartbeat, n.aesub)
	require_Equal(t, n.commit, 1) // We missed an item, so commit must not move.
	require_True(t, n.catchup != nil)
	require_Equal(t, n.catchup.pterm, 1)  // n.pterm
	require_Equal(t, n.catchup.pindex, 1) // n.pindex

	// The catchup message has an incorrect pterm. All we can do is correct our
	// pterm, which requires catchup to be re-triggered.
	n.processAppendEntry(secondStoreMsg, n.catchup.sub)
	require_True(t, n.catchup == nil)
	require_Equal(t, n.pterm, 2)
	require_Equal(t, n.pindex, 1)

	// The next heartbeat re-triggers catchup, now with the updated pterm.
	n.processAppendEntry(secondHeartbeat, n.aesub)
	require_Equal(t, n.commit, 1) // We missed an item, so commit must not move.
	require_True(t, n.catchup != nil)
	require_Equal(t, n.catchup.pterm, 2)  // n.pterm
	require_Equal(t, n.catchup.pindex, 1) // n.pindex

	// Receiving the message a second time lets us store it.
	n.processAppendEntry(secondStoreMsg, n.catchup.sub)
	require_Equal(t, n.wal.State().Msgs, 2)
	require_True(t, n.catchup != nil)

	// The heartbeat can now cancel catchup and move our commit up.
	n.processAppendEntry(secondHeartbeat, n.aesub)
	require_Equal(t, n.commit, 2)
	require_True(t, n.catchup == nil)
}

0 comments on commit f9d2cc4

Please sign in to comment.