diff --git a/server/raft.go b/server/raft.go index 6c77cb71fe9..6acbb6eee1e 100644 --- a/server/raft.go +++ b/server/raft.go @@ -184,10 +184,8 @@ type raft struct { sq *sendq // Send queue for outbound RPC messages aesub *subscription // Subscription for handleAppendEntry callbacks - wtv []byte // Term and vote to be written - wps []byte // Peer state to be written - wtvch chan struct{} // Signals when a term vote was just written, to kick file writer - wpsch chan struct{} // Signals when a peer state was just written, to kick file writer + wtv []byte // Term and vote to be written + wps []byte // Peer state to be written catchup *catchupState // For when we need to catch up as a follower. progress map[string]*ipQueue[uint64] // For leader or server catching up a follower. @@ -379,8 +377,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe js: s.getJetStream(), sq: sq, quit: make(chan struct{}), - wtvch: make(chan struct{}, 1), - wpsch: make(chan struct{}, 1), reqs: newIPQueue[*voteRequest](s, qpfx+"vreq"), votes: newIPQueue[*voteResponse](s, qpfx+"vresp"), prop: newIPQueue[*Entry](s, qpfx+"entry"), @@ -516,8 +512,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe // Start the run goroutine for the Raft state machine. s.startGoRoutine(n.run, labels) - // Start the filewriter. - s.startGoRoutine(n.fileWriter) return n, nil } @@ -3681,11 +3675,11 @@ func (n *raft) writePeerState(ps *peerState) { if bytes.Equal(n.wps, pse) { return } - // Stamp latest and kick writer. + // Stamp latest and write the peer state file. n.wps = pse - select { - case n.wpsch <- struct{}{}: - default: + if err := writePeerState(n.sd, ps); err != nil && !n.isClosed() { + n.setWriteErr(err) + n.warn("Error writing peer state file for %q: %v", n.group, err) } } @@ -3700,10 +3694,7 @@ func writePeerState(sd string, ps *peerState) error { err := os.WriteFile(psf, encodePeerState(ps), defaultFilePerms) dios <- struct{}{} - if err != nil { - return err - } - return nil + return err } func readPeerState(sd string) (ps *peerState, err error) { @@ -3720,6 +3711,20 @@ func readPeerState(sd string) (ps *peerState, err error) { const termVoteFile = "tav.idx" const termVoteLen = idLen + 8 +// Writes out our term & vote outside of a specific raft context. +func writeTermVote(sd string, wtv []byte) error { + psf := filepath.Join(sd, termVoteFile) + if _, err := os.Stat(psf); err != nil && !os.IsNotExist(err) { + return err + } + + <-dios + err := os.WriteFile(psf, wtv, defaultFilePerms) + dios <- struct{}{} + + return err +} + // readTermVote will read the largest term and who we voted from to stable storage. // Lock should be held. func (n *raft) readTermVote() (term uint64, voted string, err error) { @@ -3786,48 +3791,6 @@ func (n *raft) setWriteErr(err error) { n.setWriteErrLocked(err) } -func (n *raft) fileWriter() { - s := n.s - defer s.grWG.Done() - - n.RLock() - tvf := filepath.Join(n.sd, termVoteFile) - psf := filepath.Join(n.sd, peerStateFile) - n.RUnlock() - - for s.isRunning() { - select { - case <-n.quit: - return - case <-n.wtvch: - // We've been asked to write out the term-and-vote file. - var buf [termVoteLen]byte - n.RLock() - copy(buf[0:], n.wtv) - n.RUnlock() - <-dios - err := os.WriteFile(tvf, buf[:], defaultFilePerms) - dios <- struct{}{} - if err != nil && !n.isClosed() { - n.setWriteErr(err) - n.warn("Error writing term and vote file for %q: %v", n.group, err) - } - case <-n.wpsch: - // We've been asked to write out the peer state file. - n.RLock() - buf := copyBytes(n.wps) - n.RUnlock() - <-dios - err := os.WriteFile(psf, buf, defaultFilePerms) - dios <- struct{}{} - if err != nil && !n.isClosed() { - n.setWriteErr(err) - n.warn("Error writing peer state file for %q: %v", n.group, err) - } - } - } -} - // writeTermVote will record the largest term and who we voted for to stable storage. // Lock should be held. func (n *raft) writeTermVote() { @@ -3841,11 +3804,11 @@ func (n *raft) writeTermVote() { if bytes.Equal(n.wtv, b) { return } - // Stamp latest and kick writer. + // Stamp latest and write the term & vote file. n.wtv = b - select { - case n.wtvch <- struct{}{}: - default: + if err := writeTermVote(n.sd, n.wtv); err != nil && !n.isClosed() { + n.setWriteErr(err) + n.warn("Error writing term and vote file for %q: %v", n.group, err) } }