From 470ade5c0f9968449ca3fd7e7f85455289b858da Mon Sep 17 00:00:00 2001 From: "mark.lin" Date: Fri, 25 Aug 2017 14:50:54 +0800 Subject: [PATCH] consensus/istanbul: implement gossip network --- consensus/istanbul/backend.go | 5 +++- consensus/istanbul/backend/backend.go | 28 ++++++++++++++++++++- consensus/istanbul/backend/engine.go | 2 ++ consensus/istanbul/backend/handler.go | 20 +++++++++++++++ consensus/istanbul/core/handler.go | 14 ++++++++--- consensus/istanbul/core/testbackend_test.go | 5 ++++ 6 files changed, 69 insertions(+), 5 deletions(-) diff --git a/consensus/istanbul/backend.go b/consensus/istanbul/backend.go index 2a4a93e700..2f24a65b32 100644 --- a/consensus/istanbul/backend.go +++ b/consensus/istanbul/backend.go @@ -34,9 +34,12 @@ type Backend interface { // EventMux returns the event mux in backend EventMux() *event.TypeMux - // Broadcast sends a message to all validators + // Broadcast sends a message to all validators (include self) Broadcast(valSet ValidatorSet, payload []byte) error + // Gossip sends a message to all validators (exclude self) + Gossip(valSet ValidatorSet, payload []byte) error + // Commit delivers an approved proposal to backend. // The delivered proposal will be put into blockchain. Commit(proposal Proposal, seals [][]byte) error diff --git a/consensus/istanbul/backend/backend.go b/consensus/istanbul/backend/backend.go index c6c892e40d..b9345b211c 100644 --- a/consensus/istanbul/backend/backend.go +++ b/consensus/istanbul/backend/backend.go @@ -39,6 +39,8 @@ import ( func New(config *istanbul.Config, privateKey *ecdsa.PrivateKey, db ethdb.Database) consensus.Istanbul { // Allocate the snapshot caches and create the engine recents, _ := lru.NewARC(inmemorySnapshots) + recentMessages, _ := lru.NewARC(inmemoryPeers) + knownMessages, _ := lru.NewARC(inmemoryMessages) backend := &backend{ config: config, istanbulEventMux: new(event.TypeMux), @@ -50,6 +52,8 @@ func New(config *istanbul.Config, privateKey *ecdsa.PrivateKey, db ethdb.Databas recents: recents, candidates: make(map[common.Address]bool), coreStarted: false, + recentMessages: recentMessages, + knownMessages: knownMessages, } backend.core = istanbulCore.New(backend, backend.config) return backend @@ -84,6 +88,9 @@ type backend struct { // event subscription for ChainHeadEvent event broadcaster consensus.Broadcaster + + recentMessages *lru.ARCCache // the cache of peer's messages + knownMessages *lru.ARCCache // the cache of self messages } // Address implements istanbul.Backend.Address @@ -108,7 +115,11 @@ func (sb *backend) Broadcast(valSet istanbul.ValidatorSet, payload []byte) error return nil } +// Broadcast implements istanbul.Backend.Gossip func (sb *backend) Gossip(valSet istanbul.ValidatorSet, payload []byte) error { + hash := istanbul.RLPHash(payload) + sb.knownMessages.Add(hash, true) + targets := make(map[common.Address]bool) for _, val := range valSet.List() { if val.Address() != sb.Address() { @@ -118,7 +129,22 @@ func (sb *backend) Gossip(valSet istanbul.ValidatorSet, payload []byte) error { if sb.broadcaster != nil && len(targets) > 0 { ps := sb.broadcaster.FindPeers(targets) - for _, p := range ps { + for addr, p := range ps { + ms, ok := sb.recentMessages.Get(addr) + var m *lru.ARCCache + if ok { + m, _ = ms.(*lru.ARCCache) + if _, k := m.Get(hash); k { + // This peer had this event, skip it + continue + } + } else { + m, _ = lru.NewARC(inmemoryMessages) + } + + m.Add(hash, true) + sb.recentMessages.Add(addr, m) + go p.Send(istanbulMsg, payload) } } diff --git a/consensus/istanbul/backend/engine.go b/consensus/istanbul/backend/engine.go index 1a87652d31..0ace594e37 100644 --- a/consensus/istanbul/backend/engine.go +++ b/consensus/istanbul/backend/engine.go @@ -41,6 +41,8 @@ import ( const ( checkpointInterval = 1024 // Number of blocks after which to save the vote snapshot to the database inmemorySnapshots = 128 // Number of recent vote snapshots to keep in memory + inmemoryPeers = 40 + inmemoryMessages = 1024 ) var ( diff --git a/consensus/istanbul/backend/handler.go b/consensus/istanbul/backend/handler.go index 3769250cb7..c5111c5ef1 100644 --- a/consensus/istanbul/backend/handler.go +++ b/consensus/istanbul/backend/handler.go @@ -24,6 +24,7 @@ import ( "github.com/ethereum/go-ethereum/consensus/istanbul" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/p2p" + lru "github.com/hashicorp/golang-lru" ) const ( @@ -59,6 +60,25 @@ func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) { return true, errDecodeFailed } + hash := istanbul.RLPHash(data) + + // Mark peer's message + ms, ok := sb.recentMessages.Get(addr) + var m *lru.ARCCache + if ok { + m, _ = ms.(*lru.ARCCache) + } else { + m, _ = lru.NewARC(inmemoryMessages) + sb.recentMessages.Add(addr, m) + } + m.Add(hash, true) + + // Mark self known message + if _, ok := sb.knownMessages.Get(hash); ok { + return true, nil + } + sb.knownMessages.Add(hash, true) + go sb.istanbulEventMux.Post(istanbul.MessageEvent{ Payload: data, }) diff --git a/consensus/istanbul/core/handler.go b/consensus/istanbul/core/handler.go index 3c4cb1a3d3..b9556b7432 100644 --- a/consensus/istanbul/core/handler.go +++ b/consensus/istanbul/core/handler.go @@ -95,11 +95,19 @@ func (c *core) handleEvents() { c.storeRequestMsg(r) } case istanbul.MessageEvent: - c.handleMsg(ev.Payload) + if err := c.handleMsg(ev.Payload); err == nil { + c.backend.Gossip(c.valSet, ev.Payload) + } case backlogEvent: // No need to check signature for internal messages - c.handleCheckedMsg(ev.msg, ev.src) - + if err := c.handleCheckedMsg(ev.msg, ev.src); err == nil { + p, err := ev.msg.Payload() + if err != nil { + c.logger.Warn("Get message payload failed", "err", err) + continue + } + c.backend.Gossip(c.valSet, p) + } } case _, ok := <-c.timeoutSub.Chan(): if !ok { diff --git a/consensus/istanbul/core/testbackend_test.go b/consensus/istanbul/core/testbackend_test.go index 8d9e5f162e..69c6e46d2e 100644 --- a/consensus/istanbul/core/testbackend_test.go +++ b/consensus/istanbul/core/testbackend_test.go @@ -87,6 +87,11 @@ func (self *testSystemBackend) Broadcast(valSet istanbul.ValidatorSet, message [ return nil } +func (self *testSystemBackend) Gossip(valSet istanbul.ValidatorSet, message []byte) error { + testLogger.Warn("not sign any data") + return nil +} + func (self *testSystemBackend) Commit(proposal istanbul.Proposal, seals [][]byte) error { testLogger.Info("commit message", "address", self.Address()) self.committedMsgs = append(self.committedMsgs, testCommittedMsgs{