Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NRG (2.11): Truncate entries without quorum from different pterm #6027

Merged
merged 3 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 36 additions & 38 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3151,10 +3151,10 @@ func (n *raft) catchupStalled() bool {
if n.catchup == nil {
return false
}
if n.catchup.pindex == n.commit {
if n.catchup.pindex == n.pindex {
return time.Since(n.catchup.active) > 2*time.Second
}
n.catchup.pindex = n.commit
n.catchup.pindex = n.pindex
n.catchup.active = time.Now()
return false
}
Expand All @@ -3173,7 +3173,7 @@ func (n *raft) createCatchup(ae *appendEntry) string {
cterm: ae.pterm,
cindex: ae.pindex,
pterm: n.pterm,
pindex: n.commit,
pindex: n.pindex,
active: time.Now(),
}
inbox := n.newCatchupInbox()
Expand Down Expand Up @@ -3343,7 +3343,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if n.catchupStalled() {
n.debug("Catchup may be stalled, will request again")
inbox = n.createCatchup(ae)
ar = newAppendEntryResponse(n.pterm, n.commit, n.id, false)
ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, false)
}
n.Unlock()
if ar != nil {
Expand Down Expand Up @@ -3383,32 +3383,43 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.updateLeadChange(false)
}

if (isNew && ae.pterm != n.pterm) || ae.pindex != n.pindex {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing isNew makes sure we can also detect differences if we're catching up, otherwise we could wrongly assume our own state is correct.

Specifically covered by the test TestNRGWALEntryWithoutQuorumMustTruncate/diverged

// Check if this is a lower index than what we were expecting.
if ae.pindex < n.pindex {
n.debug("AppendEntry detected pindex less than ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex)
if ae.pterm != n.pterm || ae.pindex != n.pindex {
// Check if this is a lower or equal index than what we were expecting.
if ae.pindex <= n.pindex {
n.debug("AppendEntry detected pindex less than/equal to ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex)
var ar *appendEntryResponse

// An AppendEntry is stored at seq=ae.pindex+1. This can be checked when eae != nil, eae.pindex==ae.pindex.
seq := ae.pindex + 1
var success bool
if eae, _ := n.loadEntry(seq); eae == nil {

if n.commit > 0 && ae.pindex <= n.commit {
// Check if only our terms do not match here.
if ae.pindex == n.pindex {
// Make sure pterms match and we take on the leader's.
// This prevents constant spinning.
n.truncateWAL(ae.pterm, ae.pindex)
} else {
// If we have already committed this entry, just mark success.
success = true
}
} else if eae, _ := n.loadEntry(ae.pindex); eae == nil {
// If terms are equal, and we are not catching up, we have simply already processed this message.
// So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots.
if ae.pterm == n.pterm && !catchingUp {
success = true
} else if ae.pindex == n.pindex {
mprimi marked this conversation as resolved.
Show resolved Hide resolved
// Check if only our terms do not match here.
// Make sure pterms match and we take on the leader's.
// This prevents constant spinning.
n.truncateWAL(ae.pterm, ae.pindex)
} else {
n.resetWAL()
}
} else if eae.term != ae.term {
} else {
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
// If terms mismatched, delete that entry and all others past it.
// Make sure to cancel any catchups in progress.
// Truncate will reset our pterm and pindex. Only do so if we have an entry.
n.truncateWAL(eae.pterm, eae.pindex)
} else {
success = true
}
// Cancel regardless if truncated/unsuccessful.
// Cancel regardless if unsuccessful.
if !success {
n.cancelCatchup()
}
Expand All @@ -3434,16 +3445,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
return
}

// Check if only our terms do not match here.
if ae.pindex == n.pindex {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition could never be hit before, due to the condition above:

// Check if this is a lower or equal index than what we were expecting.
if ae.pindex <= n.pindex {

Now moved up so the pterm can be corrected, otherwise we could spin.

Already covered by the test TestJetStreamClusterStreamCatchupNoState. Previously it used a different way to pass, likely truncating a bit too much and eventually converging. Now it only corrects the pterm as truncating is not needed.

// Make sure pterms match and we take on the leader's.
// This prevents constant spinning.
n.truncateWAL(ae.pterm, ae.pindex)
n.cancelCatchup()
n.Unlock()
return
}

if ps, err := decodePeerState(ae.entries[1].Data); err == nil {
n.processPeerState(ps)
// Also need to copy from client's buffer.
Expand Down Expand Up @@ -3483,19 +3484,16 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.apply.push(newCommittedEntry(n.commit, ae.entries[:1]))
n.Unlock()
return

} else {
n.debug("AppendEntry did not match %d %d with %d %d (commit %d)", ae.pterm, ae.pindex, n.pterm, n.pindex, n.commit)
if ae.pindex > n.commit {
// Setup our state for catching up.
inbox := n.createCatchup(ae)
ar := newAppendEntryResponse(n.pterm, n.commit, n.id, false)
n.Unlock()
n.sendRPC(ae.reply, inbox, ar.encode(arbuf))
arPool.Put(ar)
return
}
}

// Setup our state for catching up.
n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex)
inbox := n.createCatchup(ae)
ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false)
n.Unlock()
n.sendRPC(ae.reply, inbox, ar.encode(arbuf))
arPool.Put(ar)
return
}

// Save to our WAL if we have entries.
Expand Down
31 changes: 31 additions & 0 deletions server/raft_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,3 +317,34 @@ func (rg smGroup) waitOnTotal(t *testing.T, expected int64) {
func newStateAdder(s *Server, cfg *RaftConfig, n RaftNode) stateMachine {
return &stateAdder{s: s, n: n, cfg: cfg, lch: make(chan bool, 1)}
}

func initSingleMemRaftNode(t *testing.T) (*raft, func()) {
t.Helper()
c := createJetStreamClusterExplicit(t, "R3S", 3)
s := c.servers[0] // RunBasicJetStreamServer not available

ms, err := newMemStore(&StreamConfig{Name: "TEST", Storage: MemoryStorage})
require_NoError(t, err)
cfg := &RaftConfig{Name: "TEST", Store: t.TempDir(), Log: ms}

err = s.bootstrapRaftNode(cfg, nil, false)
require_NoError(t, err)
n, err := s.initRaftNode(globalAccountName, cfg, pprofLabels{})
require_NoError(t, err)

cleanup := func() {
c.shutdown()
}
return n, cleanup
}

// Encode an AppendEntry.
// An AppendEntry is encoded into a buffer and that's stored into the WAL.
mprimi marked this conversation as resolved.
Show resolved Hide resolved
// This is a helper function to generate that buffer.
func encode(t *testing.T, ae *appendEntry) *appendEntry {
t.Helper()
buf, err := ae.encode(nil)
require_NoError(t, err)
ae.buf = buf
return ae
}
Loading