-
Notifications
You must be signed in to change notification settings - Fork 0
/
logs.go
110 lines (93 loc) · 2.33 KB
/
logs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package raftgo
import (
"sync"
)
type LogEntry struct {
LogIndex int
LogTerm int
Command string
}
type Logs struct {
store Storage
stateMachine StateMachine
commitIndex int
lastApplied int
mu sync.RWMutex
}
func NewLogs(store Storage, stateMachine StateMachine) *Logs {
return &Logs{
store: store,
stateMachine: stateMachine,
}
}
func (l *Logs) GetCommitIndex() int {
l.mu.RLock()
defer l.mu.RUnlock()
return l.commitIndex
}
func (l *Logs) SetCommitIndex(index int) {
l.mu.Lock()
defer l.mu.Unlock()
l.commitIndex = index
for l.commitIndex > l.lastApplied {
l.lastApplied++
command := []byte(l.Entry(l.lastApplied).Command)
l.stateMachine.Apply(command)
}
}
func (l *Logs) Entry(logIndex int) *LogEntry {
return l.store.Entry(logIndex)
}
func (l *Logs) LastIndex() int {
return l.store.LastIndex()
}
func (l *Logs) Last() *LogEntry {
return l.store.Entry(l.store.LastIndex())
}
func (l *Logs) Append(command string, term int) int {
logIndex := l.LastIndex() + 1
l.store.TruncateAndAppend([]LogEntry{{LogIndex: logIndex, LogTerm: term, Command: command}})
return logIndex
}
func (l *Logs) AppendEntries(prevLogIndex int, prevLogTerm int, entries []LogEntry, leaderCommit int) bool {
entry := l.Entry(prevLogIndex)
if entry == nil || entry.LogTerm != prevLogTerm {
return false
}
if len(entries) > 0 {
entries = entries[l.findConflict(entries)-entries[0].LogIndex:]
l.store.TruncateAndAppend(entries)
}
if leaderCommit > l.GetCommitIndex() {
l.SetCommitIndex(min(leaderCommit, prevLogIndex+len(entries)))
}
return true
}
func (l *Logs) BatchEntries(startLogIndex int, maxLen int) (prevLogIndex int, prevLogTerm int, entries []LogEntry) {
if maxLen == -1 {
maxLen = 100
}
entries = l.store.BatchEntries(startLogIndex-1, maxLen+1)
prevLog := entries[0]
entries = entries[1:]
return prevLog.LogIndex, prevLog.LogTerm, entries
}
func (l *Logs) IsUpToDate(logIndex int, logTerm int) bool {
last := l.Last()
return logTerm > last.LogTerm || (last.LogTerm == logTerm && logIndex >= last.LogIndex)
}
func (l *Logs) findConflict(entries []LogEntry) int {
for _, e := range entries {
entry := l.Entry(e.LogIndex)
if entry == nil || entry.LogTerm != e.LogTerm {
return e.LogIndex
}
}
return entries[0].LogIndex
}
func min(a, b int) int {
if a < b {
return a
}
return b
}