Skip to content

Commit

Permalink
NRG: Write peer state/term and vote files inline (#5151)
Browse files Browse the repository at this point in the history
This should avoid some potential race conditions caused by scheduling
persistence of the peer state and term-and-vote files asynchronously. It
also gets rid of another goroutine per Raft group, which is a nice side
effect.

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
derekcollison authored Mar 1, 2024
2 parents 8c6d0f9 + eb81c93 commit 812982c
Showing 1 changed file with 25 additions and 62 deletions.
87 changes: 25 additions & 62 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,8 @@ type raft struct {
sq *sendq // Send queue for outbound RPC messages
aesub *subscription // Subscription for handleAppendEntry callbacks

wtv []byte // Term and vote to be written
wps []byte // Peer state to be written
wtvch chan struct{} // Signals when a term vote was just written, to kick file writer
wpsch chan struct{} // Signals when a peer state was just written, to kick file writer
wtv []byte // Term and vote to be written
wps []byte // Peer state to be written

catchup *catchupState // For when we need to catch up as a follower.
progress map[string]*ipQueue[uint64] // For leader or server catching up a follower.
Expand Down Expand Up @@ -379,8 +377,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
js: s.getJetStream(),
sq: sq,
quit: make(chan struct{}),
wtvch: make(chan struct{}, 1),
wpsch: make(chan struct{}, 1),
reqs: newIPQueue[*voteRequest](s, qpfx+"vreq"),
votes: newIPQueue[*voteResponse](s, qpfx+"vresp"),
prop: newIPQueue[*Entry](s, qpfx+"entry"),
Expand Down Expand Up @@ -516,8 +512,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe

// Start the run goroutine for the Raft state machine.
s.startGoRoutine(n.run, labels)
// Start the filewriter.
s.startGoRoutine(n.fileWriter)

return n, nil
}
Expand Down Expand Up @@ -3681,11 +3675,11 @@ func (n *raft) writePeerState(ps *peerState) {
if bytes.Equal(n.wps, pse) {
return
}
// Stamp latest and kick writer.
// Stamp latest and write the peer state file.
n.wps = pse
select {
case n.wpsch <- struct{}{}:
default:
if err := writePeerState(n.sd, ps); err != nil && !n.isClosed() {
n.setWriteErr(err)
n.warn("Error writing peer state file for %q: %v", n.group, err)
}
}

Expand All @@ -3700,10 +3694,7 @@ func writePeerState(sd string, ps *peerState) error {
err := os.WriteFile(psf, encodePeerState(ps), defaultFilePerms)
dios <- struct{}{}

if err != nil {
return err
}
return nil
return err
}

func readPeerState(sd string) (ps *peerState, err error) {
Expand All @@ -3720,6 +3711,20 @@ func readPeerState(sd string) (ps *peerState, err error) {
const (
	// termVoteFile is the name of the file, inside a raft storage directory,
	// that holds the persisted term and vote.
	termVoteFile = "tav.idx"
	// termVoteLen is the size in bytes of the term and vote record: idLen
	// plus 8 (presumably the 8 bytes are the uint64 term — confirm against
	// the encoder).
	termVoteLen = idLen + 8
)

// writeTermVote persists the already-encoded term and vote bytes in wtv to the
// term and vote file inside the storage directory sd. It is a free function so
// it can be used outside of a specific raft context.
//
// It returns any stat error other than "file does not exist", or the error
// from writing the file.
func writeTermVote(sd string, wtv []byte) error {
	tvf := filepath.Join(sd, termVoteFile)

	// A missing file is fine (it is created below); any other stat failure
	// is reported to the caller without attempting the write.
	_, statErr := os.Stat(tvf)
	if statErr != nil && !os.IsNotExist(statErr) {
		return statErr
	}

	// Receive from dios before the write and send back afterwards
	// (presumably a semaphore bounding concurrent disk IO — confirm).
	<-dios
	writeErr := os.WriteFile(tvf, wtv, defaultFilePerms)
	dios <- struct{}{}

	return writeErr
}

// readTermVote will read the largest term and who we voted for from stable storage.
// Lock should be held.
func (n *raft) readTermVote() (term uint64, voted string, err error) {
Expand Down Expand Up @@ -3786,48 +3791,6 @@ func (n *raft) setWriteErr(err error) {
n.setWriteErrLocked(err)
}

// fileWriter is the body of a goroutine that writes the term and vote file and
// the peer state file whenever it is signalled on n.wtvch or n.wpsch.
// It returns when n.quit is signalled, or when the server is no longer running
// (checked each time a select case completes), and marks the server's grWG as
// done on exit.
func (n *raft) fileWriter() {
s := n.s
defer s.grWG.Done()

// Compute both file paths once, reading n.sd under the read lock.
n.RLock()
tvf := filepath.Join(n.sd, termVoteFile)
psf := filepath.Join(n.sd, peerStateFile)
n.RUnlock()

for s.isRunning() {
select {
case <-n.quit:
return
case <-n.wtvch:
// We've been asked to write out the term-and-vote file.
// Snapshot n.wtv into a fixed termVoteLen-sized buffer under the read
// lock so the file write itself happens without holding the lock.
var buf [termVoteLen]byte
n.RLock()
copy(buf[0:], n.wtv)
n.RUnlock()
// Receive from dios before the write and send back afterwards
// (presumably a semaphore bounding concurrent disk IO — confirm).
<-dios
err := os.WriteFile(tvf, buf[:], defaultFilePerms)
dios <- struct{}{}
// Record and warn about the write error unless the node is already closed.
if err != nil && !n.isClosed() {
n.setWriteErr(err)
n.warn("Error writing term and vote file for %q: %v", n.group, err)
}
case <-n.wpsch:
// We've been asked to write out the peer state file.
// Copy n.wps under the read lock so the write happens without the lock.
n.RLock()
buf := copyBytes(n.wps)
n.RUnlock()
// Same dios receive/send bracket around the write as above.
<-dios
err := os.WriteFile(psf, buf, defaultFilePerms)
dios <- struct{}{}
// Record and warn about the write error unless the node is already closed.
if err != nil && !n.isClosed() {
n.setWriteErr(err)
n.warn("Error writing peer state file for %q: %v", n.group, err)
}
}
}
}

// writeTermVote will record the largest term and who we voted for to stable storage.
// Lock should be held.
func (n *raft) writeTermVote() {
Expand All @@ -3841,11 +3804,11 @@ func (n *raft) writeTermVote() {
if bytes.Equal(n.wtv, b) {
return
}
// Stamp latest and kick writer.
// Stamp latest and write the term & vote file.
n.wtv = b
select {
case n.wtvch <- struct{}{}:
default:
if err := writeTermVote(n.sd, n.wtv); err != nil && !n.isClosed() {
n.setWriteErr(err)
n.warn("Error writing term and vote file for %q: %v", n.group, err)
}
}

Expand Down

0 comments on commit 812982c

Please sign in to comment.