Dynamic raft membership support (#130)

bts authored and joelburget committed Oct 17, 2017
1 parent 57e7e5f commit a218568
Showing 19 changed files with 1,026 additions and 282 deletions.
11 changes: 10 additions & 1 deletion cmd/geth/config.go
@@ -220,6 +220,8 @@ func RegisterRaftService(stack *node.Node, ctx *cli.Context, cfg gethConfig, eth
datadir := ctx.GlobalString(utils.DataDirFlag.Name)
joinExistingId := ctx.GlobalInt(utils.RaftJoinExistingFlag.Name)

raftPort := uint16(ctx.GlobalInt(utils.RaftPortFlag.Name))

if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
privkey := cfg.Node.NodeKey()
strId := discover.PubkeyID(&privkey.PublicKey).String()
@@ -232,9 +234,16 @@ func RegisterRaftService(stack *node.Node, ctx *cli.Context, cfg gethConfig, eth
if joinExistingId > 0 {
myId = uint16(joinExistingId)
joinExisting = true
} else if len(peers) == 0 {
utils.Fatalf("Raft-based consensus requires either (1) an initial peers list (in static-nodes.json) including this enode hash (%v), or (2) the flag --raftjoinexisting RAFT_ID, where RAFT_ID has been issued by an existing cluster member calling `raft.addPeer(ENODE_ID)` with an enode ID containing this node's enode hash.", strId)
} else {
peerIds := make([]string, len(peers))

for peerIdx, peer := range peers {
if !peer.HasRaftPort() {
utils.Fatalf("raftport querystring parameter not specified in static-node enode ID: %v. please check your static-nodes.json file.", peer.String())
}

peerId := peer.ID.String()
peerIds[peerIdx] = peerId
if peerId == strId {
@@ -249,7 +258,7 @@ func RegisterRaftService(stack *node.Node, ctx *cli.Context, cfg gethConfig, eth

ethereum := <-ethChan

- return raft.New(ctx, ethereum.ChainConfig(), myId, joinExisting, blockTimeNanos, ethereum, peers, datadir)
+ return raft.New(ctx, ethereum.ChainConfig(), myId, raftPort, joinExisting, blockTimeNanos, ethereum, peers, datadir)
}); err != nil {
utils.Fatalf("Failed to register the Raft service: %v", err)
}
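For context on the HasRaftPort check above: each enode URI in static-nodes.json is now expected to carry a raftport querystring parameter alongside the usual p2p port. A minimal hypothetical entry (the node ID, IP, and port values are placeholders, not taken from this commit) might look like:

[
  "enode://<128-hex-char-node-id>@127.0.0.1:21000?discport=0&raftport=50401"
]

A node absent from that list is instead started with --raftjoinexisting RAFT_ID, where the raft ID was issued beforehand by an existing member calling raft.addPeer, as the Fatalf message above spells out.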
1 change: 1 addition & 0 deletions cmd/geth/main.go
@@ -116,6 +116,7 @@ var (
utils.RaftModeFlag,
utils.RaftBlockTimeFlag,
utils.RaftJoinExistingFlag,
utils.RaftPortFlag,
utils.EmitCheckpointsFlag,
}

9 changes: 9 additions & 0 deletions cmd/geth/usage.go
@@ -122,6 +122,15 @@ var AppHelpFlagGroups = []flagGroup{
utils.RaftJoinExistingFlag,
},
},
{
Name: "RAFT",
Flags: []cli.Flag{
utils.RaftModeFlag,
utils.RaftBlockTimeFlag,
utils.RaftJoinExistingFlag,
utils.RaftPortFlag,
},
},
{
Name: "ACCOUNT",
Flags: []cli.Flag{
5 changes: 5 additions & 0 deletions cmd/utils/flags.go
@@ -505,6 +505,11 @@ var (
Name: "emitcheckpoints",
Usage: "If enabled, emit specially formatted logging checkpoints",
}
RaftPortFlag = cli.IntFlag{
Name: "raftport",
Usage: "The port to bind for the raft transport",
Value: 50400,
}

// Quorum
EnableNodePermissionFlag = cli.BoolFlag{
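The new --raftport flag sets the port this node binds for its own raft transport, whereas the raftport querystring parameter in static-nodes.json (checked by HasRaftPort above) declares the ports its peers listen on. As a sketch, a node overriding the 50400 default might be launched as follows (assuming RaftModeFlag is named "raft"; other required flags omitted):

geth --raft --raftport 50401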
219 changes: 217 additions & 2 deletions eth/downloader/downloader.go
@@ -394,6 +394,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
if p == nil {
return errUnknownPeer
}
if d.mode == BoundedFullSync {
err := d.syncWithPeerUntil(p, hash, td)
if err == nil {
d.processFullSyncContent()
}
return err
}
return d.syncWithPeer(p, hash, td)
}

@@ -1287,7 +1294,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
}
}
// Unless we're doing light chains, schedule the headers for associated content retrieval
- if d.mode == FullSync || d.mode == FastSync {
+ if d.mode == FullSync || d.mode == FastSync || d.mode == BoundedFullSync {
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
select {
@@ -1392,9 +1399,9 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
stateSync.Cancel()
if err := d.commitPivotBlock(P); err != nil {
return err

}
}

if err := d.importBlockResults(afterP); err != nil {
return err
}
@@ -1584,3 +1591,211 @@ func (d *Downloader) requestTTL() time.Duration {
}
return ttl
}

// Extra downloader functionality for non-proof-of-work consensus

// Synchronizes with a peer, but only up to the provided Hash
func (d *Downloader) syncWithPeerUntil(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
d.mux.Post(StartEvent{})
defer func() {
// reset on error
if err != nil {
d.mux.Post(FailedEvent{err})
} else {
d.mux.Post(DoneEvent{})
}
}()
if p.version < 62 {
return errTooOld
}

log.Info("Synchronising with the network", "id", p.id, "version", p.version)
defer func(start time.Time) {
log.Info("Synchronisation terminated", "duration", time.Since(start))
}(time.Now())

remoteHeader, err := d.fetchHeader(p, hash)
if err != nil {
return err
}

remoteHeight := remoteHeader.Number.Uint64()
localHeight := d.blockchain.CurrentBlock().NumberU64()

d.syncStatsLock.Lock()
if d.syncStatsChainHeight <= localHeight || d.syncStatsChainOrigin > localHeight {
d.syncStatsChainOrigin = localHeight
}
d.syncStatsChainHeight = remoteHeight
d.syncStatsLock.Unlock()

d.queue.Prepare(localHeight+1, d.mode, uint64(0), remoteHeader)
if d.syncInitHook != nil {
d.syncInitHook(localHeight, remoteHeight)
}
fetchers := []func() error{
func() error { return d.fetchBoundedHeaders(p, localHeight+1, remoteHeight) },
func() error { return d.fetchBodies(localHeight + 1) },
func() error { return d.fetchReceipts(localHeight + 1) }, // Receipts are only retrieved during fast sync
func() error { return d.processHeaders(localHeight+1, td) },
}
return d.spawnSync(fetchers)
}

// Fetches a single header from a peer
func (d *Downloader) fetchHeader(p *peerConnection, hash common.Hash) (*types.Header, error) {
log.Info("retrieving remote chain height", "peer", p)

go p.peer.RequestHeadersByHash(hash, 1, 0, false)

timeout := time.After(d.requestTTL())
for {
select {
case <-d.cancelCh:
return nil, errCancelBlockFetch

case packet := <-d.headerCh:
// Discard anything not from the origin peer
if packet.PeerId() != p.id {
log.Info("Received headers from incorrect peer", "peer id", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
headers := packet.(*headerPack).headers
if len(headers) != 1 {
log.Info("invalid number of head headers (!= 1)", "peer", p, "len(headers)", len(headers))
return nil, errBadPeer
}
return headers[0], nil

case <-timeout:
log.Info("head header timeout", "peer", p)
return nil, errTimeout

case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
}
}

// Not defined in go's stdlib:
func minInt(a, b int) int {
if a < b {
return a
}
return b
}

// Fetches headers between `from` and `to`, inclusive.
// Assumes invariant: from <= to.
func (d *Downloader) fetchBoundedHeaders(p *peerConnection, from uint64, to uint64) error {
log.Info("directing header downloads", "peer", p, "from", from, "to", to)
defer log.Info("header download terminated", "peer", p)

// Create a timeout timer, and the associated header fetcher
skeleton := true // Skeleton assembly phase or finishing up
request := time.Now() // time of the last skeleton fetch request
timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
<-timeout.C // timeout channel should be initially empty
defer timeout.Stop()

getHeaders := func(from uint64) {
request = time.Now()
timeout.Reset(d.requestTTL())

skeletonStart := from + uint64(MaxHeaderFetch) - 1

if skeleton {
if skeletonStart > to {
skeleton = false
}
}

if skeleton {
numSkeletonHeaders := minInt(MaxSkeletonSize, (int(to-from)+1)/MaxHeaderFetch)
log.Trace("fetching skeleton headers", "peer", p, "num skeleton headers", numSkeletonHeaders, "from", from)
go p.peer.RequestHeadersByNumber(skeletonStart, numSkeletonHeaders, MaxHeaderFetch-1, false)
} else {
// There are not enough headers remaining to warrant a skeleton fetch.
// Grab all of the remaining headers.

numHeaders := int(to-from) + 1
log.Trace("fetching full headers", "peer", p, "num headers", numHeaders, "from", from)
go p.peer.RequestHeadersByNumber(from, numHeaders, 0, false)
}
}
// Start pulling the header chain skeleton until all is done
getHeaders(from)

for {
select {
case <-d.cancelCh:
return errCancelHeaderFetch

case packet := <-d.headerCh:
// Make sure the active peer is giving us the skeleton headers
if packet.PeerId() != p.id {
log.Info("Received headers from incorrect peer", "peer id", packet.PeerId())
break
}
headerReqTimer.UpdateSince(request)
timeout.Stop()

headers := packet.(*headerPack).headers

// If we received a skeleton batch, resolve internals concurrently
if skeleton {
filled, proced, err := d.fillHeaderSkeleton(from, headers)
if err != nil {
log.Debug("skeleton chain invalid", "peer", p, "err", err)
return errInvalidChain
}
headers = filled[proced:]
from += uint64(proced)
}
// Insert all the new headers and fetch the next batch
if len(headers) > 0 {
log.Trace("schedule headers", "peer", p, "num headers", len(headers), "from", from)
select {
case d.headerProcCh <- headers:
case <-d.cancelCh:
return errCancelHeaderFetch
}
from += uint64(len(headers))
}

if from <= to {
getHeaders(from)
} else {
// Notify the content fetchers that no more headers are inbound and return.
select {
case d.headerProcCh <- nil:
return nil
case <-d.cancelCh:
return errCancelHeaderFetch
}
}

case <-timeout.C:
// Header retrieval timed out, consider the peer bad and drop
log.Info("header request timed out", "peer", p)
headerTimeoutMeter.Mark(1)
d.dropPeer(p.id)

// Finish the sync gracefully instead of dumping the gathered data though
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
}
}
select {
case d.headerProcCh <- nil:
case <-d.cancelCh:
}
return errBadPeer
}
}
}
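To make the skeleton arithmetic in getHeaders concrete, here is a worked example, assuming the upstream go-ethereum constants MaxHeaderFetch = 192 and MaxSkeletonSize = 128 (values from the surrounding downloader package, not set by this commit):

// Hypothetical bounded fetch with from = 1, to = 100000:
//   skeletonStart      = 1 + 192 - 1 = 192
//   numSkeletonHeaders = minInt(128, (100000-1+1)/192) = minInt(128, 520) = 128
//   => p.peer.RequestHeadersByNumber(192, 128, 191, false)
// retrieves one header per 192-block chunk (numbers 192, 384, ..., 24576),
// and fillHeaderSkeleton downloads the gaps concurrently. Once fewer than
// MaxHeaderFetch headers remain (skeletonStart > to), the tail is fetched
// in a single direct request with skip = 0.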
2 changes: 2 additions & 0 deletions eth/downloader/modes.go
@@ -25,6 +25,8 @@ const (
FullSync SyncMode = iota // Synchronise the entire blockchain history from full blocks
FastSync // Quickly download the headers, full sync only at the chain head
LightSync // Download only the headers and terminate afterwards
// Used by raft:
BoundedFullSync SyncMode = 100 // Perform a full sync until the requested hash, and no further
)

func (mode SyncMode) IsValid() bool {
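The new mode feeds into the synchronise dispatch added to downloader.go above. A minimal sketch of how a raft-mode caller might request a bounded sync (variable names here are illustrative, not from this commit):

// Sync with peerId up to exactly blockHash, then stop, rather than
// chasing the peer's advertised head as FullSync would.
if err := downloader.Synchronise(peerId, blockHash, td, downloader.BoundedFullSync); err != nil {
	log.Warn("bounded sync failed", "err", err)
}

The explicit value 100 also keeps BoundedFullSync outside the FullSync..LightSync range accepted by IsValid, presumably so it can never be chosen as a user-facing sync mode.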
17 changes: 16 additions & 1 deletion eth/handler.go
@@ -332,6 +332,17 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
defer msg.Discard()

if pm.raftMode {
if msg.Code != TxMsg &&
msg.Code != GetBlockHeadersMsg && msg.Code != BlockHeadersMsg &&
msg.Code != GetBlockBodiesMsg && msg.Code != BlockBodiesMsg {

log.Info("raft: ignoring message", "code", msg.Code)

return nil
}
}

// Handle the message depending on its contents
switch {
case msg.Code == StatusMsg:
@@ -723,7 +734,11 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {
// Broadcast transaction to a batch of peers not knowing about it
peers := pm.peers.PeersWithoutTx(hash)
- //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
+ // NOTE: Raft-based consensus currently assumes that geth broadcasts
+ // transactions to all peers in the network. A previous comment here
+ // indicated that this logic might change in the future to only send to a
+ // subset of peers. If this change occurs upstream, a merge conflict should
+ // arise here, and we should add logic to send to *all* peers in raft mode.
for _, peer := range peers {
peer.SendTransactions(types.Transactions{tx})
}
10 changes: 7 additions & 3 deletions eth/sync.go
@@ -148,11 +148,15 @@ func (pm *ProtocolManager) syncer() {
if pm.peers.Len() < minDesiredPeerCount {
break
}
- go pm.synchronise(pm.peers.BestPeer())
+ if !pm.raftMode {
+     go pm.synchronise(pm.peers.BestPeer())
+ }

case <-forceSync.C:
- // Force a sync even if not enough peers are present
- go pm.synchronise(pm.peers.BestPeer())
+ if !pm.raftMode {
+     // Force a sync even if not enough peers are present
+     go pm.synchronise(pm.peers.BestPeer())
+ }

case <-pm.noMorePeers:
return
10 changes: 10 additions & 0 deletions internal/web3ext/web3ext.go
@@ -713,6 +713,16 @@ web3._extend({
new web3._extend.Property({
name: 'role',
getter: 'raft_role'
}),
new web3._extend.Method({
name: 'addPeer',
call: 'raft_addPeer',
params: 1
}),
new web3._extend.Method({
name: 'removePeer',
call: 'raft_removePeer',
params: 1
})
]
})
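These extensions surface the new RPC methods in the geth JavaScript console. A hypothetical session on an existing cluster member (the enode value is a placeholder, and the returned raft ID is an assumption based on the Fatalf guidance in cmd/geth/config.go above):

> raft.addPeer("enode://<128-hex-char-node-id>@127.0.0.1:21003?discport=0&raftport=50404")
4
> raft.removePeer(4)

The newly added node would then start with --raftjoinexisting 4 to join under the raft ID it was issued.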