-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathnode.go
93 lines (77 loc) · 2.4 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package rafting
import (
"context"
"encoding/json"
"fmt"
"time"
transport "github.com/Jille/raft-grpc-transport"
"github.com/danielgatis/go-discovery"
"github.com/danielgatis/go-keyval"
"github.com/hashicorp/raft"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)
type Node struct {
id string
port int
snapshotRetain int
dataDir string
addr string
marshalFn func(interface{}) ([]byte, error)
unmarshalFn func([]byte, interface{}) error
fsm raft.FSM
raft *raft.Raft
raftTransport *transport.Manager
discovery discovery.Discovery
discoveryLookupInterval time.Duration
logger logrus.FieldLogger
grpcServer *grpc.Server
}
func NewNode(ID string, fsm raft.FSM, port int, opts ...NodeOption) (*Node, error) {
var (
defaultSnapshotRetain = 5
defaultDataDir = "./data"
defaultLogger = logrus.StandardLogger()
defaultDiscovery = discovery.NewNullDiscovery()
defaultDiscoveryLookupInterval = time.Second
defaultMarshalFn = json.Marshal
defaultUnmarshalFn = json.Unmarshal
)
n := &Node{
id: ID,
port: port,
dataDir: defaultDataDir,
snapshotRetain: defaultSnapshotRetain,
logger: defaultLogger,
discovery: defaultDiscovery,
discoveryLookupInterval: defaultDiscoveryLookupInterval,
marshalFn: defaultMarshalFn,
unmarshalFn: defaultUnmarshalFn,
fsm: fsm,
}
for _, opt := range opts {
opt(n)
}
if err := initRaft(n); err != nil {
return nil, fmt.Errorf(`initRaft(...): %w`, err)
}
return n, nil
}
func (n *Node) Start(ctx context.Context) error {
g := new(errgroup.Group)
g.Go(func() error { return startRaft(ctx, n) })
g.Go(func() error { return startDiscoveryLookup(ctx, n) })
g.Go(func() error { return startDiscoveryRegister(ctx, n) })
return g.Wait()
}
func (n *Node) Apply(cmd string, timeout time.Duration, args ...interface{}) (interface{}, error) {
payload, err := n.marshalFn(&command{cmd, keyval.ToMap(args...)})
if err != nil {
return nil, fmt.Errorf(`n.marshalFn(...): %w`, err)
}
if err := n.raft.VerifyLeader().Error(); err != nil {
return applyOnLeader(n, payload, timeout)
}
return apply(n, payload, timeout)
}