Skip to content

Commit

Permalink
consensus/istanbul: implement gossip network
Browse files Browse the repository at this point in the history
  • Loading branch information
markya0616 authored and yutelin committed Sep 2, 2017
1 parent 78a616e commit 470ade5
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 5 deletions.
5 changes: 4 additions & 1 deletion consensus/istanbul/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ type Backend interface {
// EventMux returns the event mux in backend
EventMux() *event.TypeMux

// Broadcast sends a message to all validators
// Broadcast sends a message to all validators (include self)
Broadcast(valSet ValidatorSet, payload []byte) error

// Gossip sends a message to all validators (exclude self)
Gossip(valSet ValidatorSet, payload []byte) error

// Commit delivers an approved proposal to backend.
// The delivered proposal will be put into blockchain.
Commit(proposal Proposal, seals [][]byte) error
Expand Down
28 changes: 27 additions & 1 deletion consensus/istanbul/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
func New(config *istanbul.Config, privateKey *ecdsa.PrivateKey, db ethdb.Database) consensus.Istanbul {
// Allocate the snapshot caches and create the engine
recents, _ := lru.NewARC(inmemorySnapshots)
recentMessages, _ := lru.NewARC(inmemoryPeers)
knownMessages, _ := lru.NewARC(inmemoryMessages)
backend := &backend{
config: config,
istanbulEventMux: new(event.TypeMux),
Expand All @@ -50,6 +52,8 @@ func New(config *istanbul.Config, privateKey *ecdsa.PrivateKey, db ethdb.Databas
recents: recents,
candidates: make(map[common.Address]bool),
coreStarted: false,
recentMessages: recentMessages,
knownMessages: knownMessages,
}
backend.core = istanbulCore.New(backend, backend.config)
return backend
Expand Down Expand Up @@ -84,6 +88,9 @@ type backend struct {

// event subscription for ChainHeadEvent event
broadcaster consensus.Broadcaster

recentMessages *lru.ARCCache // the cache of peer's messages
knownMessages *lru.ARCCache // the cache of self messages
}

// Address implements istanbul.Backend.Address
Expand All @@ -108,7 +115,11 @@ func (sb *backend) Broadcast(valSet istanbul.ValidatorSet, payload []byte) error
return nil
}

// Broadcast implements istanbul.Backend.Gossip
func (sb *backend) Gossip(valSet istanbul.ValidatorSet, payload []byte) error {
hash := istanbul.RLPHash(payload)
sb.knownMessages.Add(hash, true)

targets := make(map[common.Address]bool)
for _, val := range valSet.List() {
if val.Address() != sb.Address() {
Expand All @@ -118,7 +129,22 @@ func (sb *backend) Gossip(valSet istanbul.ValidatorSet, payload []byte) error {

if sb.broadcaster != nil && len(targets) > 0 {
ps := sb.broadcaster.FindPeers(targets)
for _, p := range ps {
for addr, p := range ps {
ms, ok := sb.recentMessages.Get(addr)
var m *lru.ARCCache
if ok {
m, _ = ms.(*lru.ARCCache)
if _, k := m.Get(hash); k {
// This peer had this event, skip it
continue
}
} else {
m, _ = lru.NewARC(inmemoryMessages)
}

m.Add(hash, true)
sb.recentMessages.Add(addr, m)

go p.Send(istanbulMsg, payload)
}
}
Expand Down
2 changes: 2 additions & 0 deletions consensus/istanbul/backend/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
const (
checkpointInterval = 1024 // Number of blocks after which to save the vote snapshot to the database
inmemorySnapshots = 128 // Number of recent vote snapshots to keep in memory
inmemoryPeers = 40
inmemoryMessages = 1024
)

var (
Expand Down
20 changes: 20 additions & 0 deletions consensus/istanbul/backend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/p2p"
lru "github.com/hashicorp/golang-lru"
)

const (
Expand Down Expand Up @@ -59,6 +60,25 @@ func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) {
return true, errDecodeFailed
}

hash := istanbul.RLPHash(data)

// Mark peer's message
ms, ok := sb.recentMessages.Get(addr)
var m *lru.ARCCache
if ok {
m, _ = ms.(*lru.ARCCache)
} else {
m, _ = lru.NewARC(inmemoryMessages)
sb.recentMessages.Add(addr, m)
}
m.Add(hash, true)

// Mark self known message
if _, ok := sb.knownMessages.Get(hash); ok {
return true, nil
}
sb.knownMessages.Add(hash, true)

go sb.istanbulEventMux.Post(istanbul.MessageEvent{
Payload: data,
})
Expand Down
14 changes: 11 additions & 3 deletions consensus/istanbul/core/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,19 @@ func (c *core) handleEvents() {
c.storeRequestMsg(r)
}
case istanbul.MessageEvent:
c.handleMsg(ev.Payload)
if err := c.handleMsg(ev.Payload); err == nil {
c.backend.Gossip(c.valSet, ev.Payload)
}
case backlogEvent:
// No need to check signature for internal messages
c.handleCheckedMsg(ev.msg, ev.src)

if err := c.handleCheckedMsg(ev.msg, ev.src); err == nil {
p, err := ev.msg.Payload()
if err != nil {
c.logger.Warn("Get message payload failed", "err", err)
continue
}
c.backend.Gossip(c.valSet, p)
}
}
case _, ok := <-c.timeoutSub.Chan():
if !ok {
Expand Down
5 changes: 5 additions & 0 deletions consensus/istanbul/core/testbackend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ func (self *testSystemBackend) Broadcast(valSet istanbul.ValidatorSet, message [
return nil
}

func (self *testSystemBackend) Gossip(valSet istanbul.ValidatorSet, message []byte) error {
testLogger.Warn("not sign any data")
return nil
}

func (self *testSystemBackend) Commit(proposal istanbul.Proposal, seals [][]byte) error {
testLogger.Info("commit message", "address", self.Address())
self.committedMsgs = append(self.committedMsgs, testCommittedMsgs{
Expand Down

0 comments on commit 470ade5

Please sign in to comment.