diff --git a/peer.go b/peer.go index abc636f..ce6d6a3 100644 --- a/peer.go +++ b/peer.go @@ -1,6 +1,7 @@ package raft import ( + "math" "sync" "time" ) @@ -20,6 +21,8 @@ type Peer struct { mutex sync.RWMutex stopChan chan bool heartbeatInterval time.Duration + heartbeatTicker <-chan time.Time + failedHeartbeats float64 } //------------------------------------------------------------------------------ @@ -90,6 +93,24 @@ func (p *Peer) stopHeartbeat(flush bool) { p.stopChan <- flush } +// Resets the ticker to the default heartbeat interval. +func (p *Peer) resetHeartbeatInterval() { + p.failedHeartbeats = 0 + p.heartbeatTicker = time.Tick(p.heartbeatInterval) + debugln("peer.heartbeat.reset: ", p.Name, p.heartbeatInterval) +} + +// Increases failed heartbeat count and updates ticker with updated backoff interval +func (p *Peer) backoffHeartbeat() { + p.failedHeartbeats++ + debugln("peer.heartbeat.backoff: ", p.Name, p.failedHeartbeats, p.heartbeatDuration()) + p.heartbeatTicker = time.Tick(p.heartbeatDuration()) +} + +func (p *Peer) heartbeatDuration() time.Duration { + return time.Duration(p.heartbeatInterval * time.Duration(math.Pow(float64(2), p.failedHeartbeats))) +} + //-------------------------------------- // Copying //-------------------------------------- @@ -116,7 +137,7 @@ func (p *Peer) heartbeat(c chan bool) { c <- true - ticker := time.Tick(p.heartbeatInterval) + p.heartbeatTicker = time.Tick(p.heartbeatInterval) debugln("peer.heartbeat: ", p.Name, p.heartbeatInterval) @@ -134,7 +155,7 @@ func (p *Peer) heartbeat(c chan bool) { return } - case <-ticker: + case <-p.heartbeatTicker: start := time.Now() p.flush() duration := time.Now().Sub(start) @@ -168,10 +189,16 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) { resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req) if resp == nil { + p.backoffHeartbeat() p.server.DispatchEvent(newEvent(HeartbeatIntervalEventType, p, nil)) debugln("peer.append.timeout: ", p.server.Name(), "->", p.Name) return } + + // Reset the failedHeartbeats if it isn't already 0. + if p.failedHeartbeats != 0 { + p.resetHeartbeatInterval() + } traceln("peer.append.resp: ", p.server.Name(), "<-", p.Name) // If successful then update the previous log index. diff --git a/peer_test.go b/peer_test.go new file mode 100644 index 0000000..4950ea1 --- /dev/null +++ b/peer_test.go @@ -0,0 +1,40 @@ +package raft + +import ( + "testing" + "time" +) + +func TestHeartbeatDuration(t *testing.T) { + heartbeatInterval := time.Duration(20) + p := newPeer(nil, "testpeer", "testpeer", heartbeatInterval) + if p.failedHeartbeats != 0 { + t.Error("Failed heartbeat counter's default is not 0.") + } + + if p.heartbeatDuration() != heartbeatInterval { + t.Errorf("Unexpected heartbeat duration. Expected %d, got %d", heartbeatInterval, p.heartbeatDuration()) + } + + p.backoffHeartbeat() + + if p.failedHeartbeats != 1 { + t.Errorf("Failed heartbeat counter did not increment. Expected %d, got %d", 1, p.failedHeartbeats) + } + + firstBackoff := time.Duration(heartbeatInterval * 2) + if p.heartbeatDuration() != firstBackoff { + t.Errorf("Unexpected heartbeat duration. Expected %d, got %d", firstBackoff, p.heartbeatDuration()) + } + + p.backoffHeartbeat() + secondBackoff := time.Duration(heartbeatInterval * 4) + if p.heartbeatDuration() != secondBackoff { + t.Errorf("Unexpected heartbeat duration. Expected %d, got %d", secondBackoff, p.heartbeatDuration()) + } + + p.resetHeartbeatInterval() + if p.heartbeatDuration() != heartbeatInterval { + t.Errorf("Unexpected heartbeat interval. Expected %d, got %d", heartbeatInterval, p.heartbeatDuration()) + } +}