diff --git a/golangraft/consensus.go b/golangraft/consensus.go index 7e11d48..c24130f 100644 --- a/golangraft/consensus.go +++ b/golangraft/consensus.go @@ -13,12 +13,6 @@ const ( Leader ) -// Node represents a node in the Raft cluster. -type Node struct { - ID int - Addr string // Address of the node for communication -} - // LogEntry represents an entry in the Raft log. type LogEntry struct { Term int @@ -38,10 +32,10 @@ type Consensus struct { } // Assuming a list of all nodes in the cluster: -var nodes = []Node{ +var nodes = []*Node{ // TODO: Add nodes here - // Example: {ID: 1, Addr: "localhost:8001"}, - // {ID: 2, Addr: "localhost:8002"}, + // Example: &Node{ID: 1, Addr: "localhost:8001"}, + // &Node{ID: 2, Addr: "localhost:8002"}, } func NewConsensus() *Consensus { diff --git a/golangraft/node.go b/golangraft/node.go new file mode 100644 index 0000000..1d35b2c --- /dev/null +++ b/golangraft/node.go @@ -0,0 +1,81 @@ +package main + +import ( + "sync" + "time" +) + +const HeartbeatInterval = 100 * time.Millisecond + +type Node struct { + mu sync.Mutex + ID int + State int + Consensus + // Channel to signal stopping of heartbeat + stopHeartbeat chan bool +} + +func NewNode(id int) *Node { + return &Node{ + ID: id, + State: Follower, + Consensus: *NewConsensus(), + } +} + +func (n *Node) TransitionToCandidate() { + n.mu.Lock() + defer n.mu.Unlock() + + n.StartElection() + if n.State != Candidate { + n.State = Candidate + } +} + +func (n *Node) SendHeartbeat() { + // TODO: Implement sending of AppendEntries RPC to all followers. +} + +func (n *Node) StartHeartbeatLoop() { + ticker := time.NewTicker(HeartbeatInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + n.SendHeartbeat() + // Exit the loop when a signal is received on stopHeartbeat channel + case <-n.stopHeartbeat: + return + } + } +} + +// Send a signal to stop the heartbeat loop +func (n *Node) StopHeartbeat() { + n.stopHeartbeat <- true +} + +func (n *Node) TransitionToLeader() { + n.mu.Lock() + defer n.mu.Unlock() + + n.becomeLeader() + if n.State != Leader { + n.State = Leader + n.stopHeartbeat = make(chan bool) // Initialize the channel + go n.StartHeartbeatLoop() // Start sending heartbeats + } +} + +func (n *Node) TransitionToFollower() { + n.mu.Lock() + defer n.mu.Unlock() + + if n.State == Leader { + n.StopHeartbeat() // Stop sending heartbeats if the node was previously a leader + } + n.State = Follower +} diff --git a/golangraft/node_test.go b/golangraft/node_test.go new file mode 100644 index 0000000..c651574 --- /dev/null +++ b/golangraft/node_test.go @@ -0,0 +1,37 @@ +package main + +import ( + "testing" +) + +func TestNewNode(t *testing.T) { + node := NewNode(1) + if node.ID != 1 { + t.Errorf("Expected node ID to be 1, got %v", node.ID) + } + if node.State != Follower { + t.Errorf("Expected initial node state to be Follower, got %v", node.State) + } +} + +func TestNodeTransitions(t *testing.T) { + node := NewNode(1) + + // Test transition to Candidate + node.TransitionToCandidate() + if node.State != Candidate { + t.Errorf("Expected node state to be Candidate after transition, got %v", node.State) + } + + // Test transition back to Follower + node.TransitionToFollower() + if node.State != Follower { + t.Errorf("Expected node state to be Follower after transition, got %v", node.State) + } + + // Test transition to Leader + node.TransitionToLeader() + if node.State != Leader { + t.Errorf("Expected node state to be Leader after transition, got %v", node.State) + } +}