Skip to content

Commit

Permalink
eth/protocols/eth: add JustifiedNumber into StatusPacket
Browse files Browse the repository at this point in the history
  • Loading branch information
buddh0 committed Sep 12, 2024
1 parent 282aee5 commit 3c5badd
Show file tree
Hide file tree
Showing 14 changed files with 108 additions and 61 deletions.
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruni
func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer }
func (s *Ethereum) Merger() *consensus.Merger { return s.merger }
func (s *Ethereum) SyncMode() downloader.SyncMode {
mode, _ := s.handler.chainSync.modeAndLocalHead()
mode, _, _ := s.handler.chainSync.modeAndLocalHead()
return mode
}

Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty
mode := d.getMode()

// Request the advertised remote head block and wait for the response
latest, _ := p.peer.Head()
latest, _, _ := p.peer.Head()
fetch := 1
if mode == SnapSync {
fetch = 2 // head + pivot headers
Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type peerConnection struct {

// Peer encapsulates the methods required to synchronise with a remote full peer.
type Peer interface {
Head() (common.Hash, *big.Int)
Head() (common.Hash, *uint64, *big.Int)
MarkLagging()
RequestHeadersByHash(common.Hash, int, int, bool, chan *eth.Response) (*eth.Request, error)
RequestHeadersByNumber(uint64, int, int, bool, chan *eth.Response) (*eth.Request, error)
Expand Down
15 changes: 8 additions & 7 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,14 +436,15 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {

// Execute the Ethereum handshake
var (
genesis = h.chain.Genesis()
head = h.chain.CurrentHeader()
hash = head.Hash()
number = head.Number.Uint64()
td = h.chain.GetTd(hash, number)
genesis = h.chain.Genesis()
head = h.chain.CurrentHeader()
justifiedNumber = h.chain.GetJustifiedNumber(head)
hash = head.Hash()
number = head.Number.Uint64()
td = h.chain.GetTd(hash, number)
)
forkID := forkid.NewID(h.chain.Config(), genesis, number, head.Time)
if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter, &eth.UpgradeStatusExtension{DisablePeerTxBroadcast: h.disablePeerTxBroadcast}); err != nil {
if err := peer.Handshake(h.networkID, justifiedNumber, td, hash, genesis.Hash(), forkID, h.forkFilter, &eth.UpgradeStatusExtension{DisablePeerTxBroadcast: h.disablePeerTxBroadcast}); err != nil {
peer.Log().Debug("Ethereum handshake failed", "err", err)
return err
}
Expand Down Expand Up @@ -920,7 +921,7 @@ func (h *handler) BroadcastVote(vote *types.VoteEnvelope) {
headBlock := h.chain.CurrentBlock()
currentTD := h.chain.GetTd(headBlock.Hash(), headBlock.Number.Uint64())
for _, peer := range peers {
_, peerTD := peer.Head()
_, _, peerTD := peer.Head()
deltaTD := new(big.Int).Abs(new(big.Int).Sub(currentTD, peerTD))
if deltaTD.Cmp(big.NewInt(deltaTdThreshold)) < 1 && peer.bscExt != nil {
voteMap[peer] = vote
Expand Down
4 changes: 2 additions & 2 deletions eth/handler_bsc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func testSendVotes(t *testing.T, protocol uint) {
td = handler.chain.GetTd(head.Hash(), head.Number.Uint64())
)
time.Sleep(200 * time.Millisecond)
if err := remoteEth.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
if err := remoteEth.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake: %d", err)
}
// After the handshake completes, the source handler should stream the sink
Expand Down Expand Up @@ -227,7 +227,7 @@ func testRecvVotes(t *testing.T, protocol uint) {
td = handler.chain.GetTd(head.Hash(), head.Number.Uint64())
)
time.Sleep(200 * time.Millisecond)
if err := remoteEth.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
if err := remoteEth.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake: %d", err)
}

Expand Down
24 changes: 20 additions & 4 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
)

Expand Down Expand Up @@ -138,10 +139,25 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, packet *eth.NewBlockPa
trueHead = block.ParentHash()
trueTD = new(big.Int).Sub(td, block.Difficulty())
)
// Update the peer's total difficulty if better than the previous
if _, td := peer.Head(); trueTD.Cmp(td) > 0 {
peer.SetHead(trueHead, trueTD)
h.chainSync.handlePeerEvent()
// Update the peer's justifiedNumber and total difficulty if better than the previous
trueJustifiedNumber := h.chain.GetJustifiedNumber(block.Header())
if trueJustifiedNumber == 0 {
log.Error("Fail to GetJustifiedNumber when handleBlockBroadcast")
}
_, justifiedNumber, td := peer.Head()
if trueJustifiedNumber != 0 && justifiedNumber != nil {
if trueJustifiedNumber > *justifiedNumber ||
(trueJustifiedNumber == *justifiedNumber && trueTD.Cmp(td) > 0) {
peer.SetHead(trueHead, trueJustifiedNumber, trueTD)
h.chainSync.handlePeerEvent()
}
} else {
// back to behavior without fast finality
if trueTD.Cmp(td) > 0 {
peer.SetHead(trueHead, trueJustifiedNumber, trueTD)
h.chainSync.handlePeerEvent()
}
}

return nil
}
10 changes: 5 additions & 5 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.Number.Uint64())
)
if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
if err := src.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// Send the transaction to the sink and verify that it's added to the tx pool
Expand Down Expand Up @@ -419,7 +419,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.Number.Uint64())
)
if err := sink.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
if err := sink.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// After the handshake completes, the source handler should stream the sink
Expand Down Expand Up @@ -636,7 +636,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
// Wait a bit for the above handlers to start
time.Sleep(100 * time.Millisecond)

if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
if err := sinkPeer.Handshake(1, 0, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
go eth.Handle(sink, sinkPeer)
Expand Down Expand Up @@ -709,7 +709,7 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
genesis = source.chain.Genesis()
td = source.chain.GetTd(genesis.Hash(), genesis.NumberU64())
)
if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
if err := sink.Handshake(1, 0, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// After the handshake completes, the source handler should stream the sink
Expand Down Expand Up @@ -811,7 +811,7 @@ func TestOptionMaxPeersPerIP(t *testing.T) {
t.Errorf("current num is %d, maxPeersPerIP is %d, but failed:%s", num, maxPeersPerIP, err)
}(tryNum)

if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
if err := src.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// make sure runEthPeer execute one by one.
Expand Down
38 changes: 30 additions & 8 deletions eth/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,24 +515,46 @@ func (ps *peerSet) snapLen() int {
return ps.snapPeers
}

// peerWithHighestTD retrieves the known peer with the currently highest total
// difficulty, but below the given PoS switchover threshold.
func (ps *peerSet) peerWithHighestTD() *eth.Peer {
// peerWithHighestHead retrieves the known peer with the currently highest head
func (ps *peerSet) peerWithHighestHead() *eth.Peer {
ps.lock.RLock()
defer ps.lock.RUnlock()

var (
bestPeer *eth.Peer
bestTd *big.Int
)
var knowJustifiedPeers, notKnowJustifiedPeers []*ethPeer
for _, p := range ps.peers {
if p.Lagging() {
continue
}
if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
if _, justifiedNumber, _ := p.Head(); justifiedNumber != nil {
knowJustifiedPeers = append(knowJustifiedPeers, p)
} else {
notKnowJustifiedPeers = append(notKnowJustifiedPeers, p)
}
}

var (
bestPeer *eth.Peer
bestJustified *uint64
bestTd *big.Int
)
for _, p := range knowJustifiedPeers {
_, justifiedNumber, td := p.Head()
if bestPeer == nil {
bestPeer, bestTd = p.Peer, td
} else if *justifiedNumber > *bestJustified ||
(*justifiedNumber == *bestJustified && td.Cmp(bestTd) > 0) {
bestPeer = p.Peer
if td.Cmp(bestTd) > 0 {
bestTd = td
}
}
}
for _, p := range notKnowJustifiedPeers {
if _, _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
bestPeer, bestTd = p.Peer, td
}
}

return bestPeer
}

Expand Down
4 changes: 3 additions & 1 deletion eth/protocols/eth/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (

// Handshake executes the eth protocol handshake, negotiating version number,
// network IDs, difficulties, head and genesis blocks.
func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter, extension *UpgradeStatusExtension) error {
func (p *Peer) Handshake(network uint64, justifiedNumber uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter, extension *UpgradeStatusExtension) error {
// Send out own handshake in a new thread
errc := make(chan error, 2)

Expand All @@ -51,6 +51,8 @@ func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis
Head: head,
Genesis: genesis,
ForkID: forkID,
// after another hardfork, all nodes can pase new `StatusPacket`, then can uncomment the following line
// JustifiedNumber: &justifiedNumber,
})
})
gopool.Submit(func() {
Expand Down
10 changes: 5 additions & 5 deletions eth/protocols/eth/handshake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,19 @@ func testHandshake(t *testing.T, protocol uint) {
want: errNoStatusMsg,
},
{
code: StatusMsg, data: StatusPacket{10, 1, td, head.Hash(), genesis.Hash(), forkID},
code: StatusMsg, data: StatusPacket{10, 1, td, head.Hash(), genesis.Hash(), forkID, nil},
want: errProtocolVersionMismatch,
},
{
code: StatusMsg, data: StatusPacket{uint32(protocol), 999, td, head.Hash(), genesis.Hash(), forkID},
code: StatusMsg, data: StatusPacket{uint32(protocol), 999, td, head.Hash(), genesis.Hash(), forkID, nil},
want: errNetworkIDMismatch,
},
{
code: StatusMsg, data: StatusPacket{uint32(protocol), 1, td, head.Hash(), common.Hash{3}, forkID},
code: StatusMsg, data: StatusPacket{uint32(protocol), 1, td, head.Hash(), common.Hash{3}, forkID, nil},
want: errGenesisMismatch,
},
{
code: StatusMsg, data: StatusPacket{uint32(protocol), 1, td, head.Hash(), genesis.Hash(), forkid.ID{Hash: [4]byte{0x00, 0x01, 0x02, 0x03}}},
code: StatusMsg, data: StatusPacket{uint32(protocol), 1, td, head.Hash(), genesis.Hash(), forkid.ID{Hash: [4]byte{0x00, 0x01, 0x02, 0x03}}, nil},
want: errForkIDRejected,
},
}
Expand All @@ -80,7 +80,7 @@ func testHandshake(t *testing.T, protocol uint) {
// Send the junk test with one peer, check the handshake failure
go p2p.Send(app, test.code, test.data)

err := peer.Handshake(1, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain), nil)
err := peer.Handshake(1, 0, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain), nil)
if err == nil {
t.Errorf("test %d: protocol returned nil error, want %q", i, test.want)
} else if !errors.Is(err, test.want) {
Expand Down
18 changes: 10 additions & 8 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ type Peer struct {
version uint // Protocol version negotiated
statusExtension *UpgradeStatusExtension

lagging bool // lagging peer is still connected, but won't be used to sync.
head common.Hash // Latest advertised head block hash
td *big.Int // Latest advertised head block total difficulty
lagging bool // lagging peer is still connected, but won't be used to sync.
head common.Hash // Latest advertised head block hash
justifiedNumber *uint64 // Latest advertised justified block number
td *big.Int // Latest advertised head block total difficulty

knownBlocks *knownCache // Set of block hashes known to be known by this peer
queuedBlocks chan *blockPropagation // Queue of blocks to broadcast to the peer
Expand Down Expand Up @@ -160,21 +161,22 @@ func (p *Peer) MarkLagging() {
p.lagging = true
}

// Head retrieves the current head hash and total difficulty of the peer.
func (p *Peer) Head() (hash common.Hash, td *big.Int) {
// Head retrieves the current head hash, justifiedNumber and total difficulty of the peer.
func (p *Peer) Head() (hash common.Hash, justifiedNumber *uint64, td *big.Int) {
p.lock.RLock()
defer p.lock.RUnlock()

copy(hash[:], p.head[:])
return hash, new(big.Int).Set(p.td)
return hash, p.justifiedNumber, new(big.Int).Set(p.td)
}

// SetHead updates the head hash and total difficulty of the peer.
func (p *Peer) SetHead(hash common.Hash, td *big.Int) {
// SetHead updates the head hash, justifiedNumber and total difficulty of the peer.
func (p *Peer) SetHead(hash common.Hash, justifiedNumber uint64, td *big.Int) {
p.lock.Lock()
defer p.lock.Unlock()
p.lagging = false
copy(p.head[:], hash[:])
p.justifiedNumber = &justifiedNumber
p.td.Set(td)
}

Expand Down
1 change: 1 addition & 0 deletions eth/protocols/eth/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type StatusPacket struct {
Head common.Hash
Genesis common.Hash
ForkID forkid.ID
JustifiedNumber *uint64 `rlp:"optional"`
}

type UpgradeStatusExtension struct {
Expand Down
35 changes: 19 additions & 16 deletions eth/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ type chainSyncer struct {

// chainSyncOp is a scheduled sync operation.
type chainSyncOp struct {
mode downloader.SyncMode
peer *eth.Peer
td *big.Int
head common.Hash
mode downloader.SyncMode
peer *eth.Peer
justifiedNumber *uint64
td *big.Int
head common.Hash
}

// newChainSyncer creates a chainSyncer.
Expand Down Expand Up @@ -163,16 +164,17 @@ func (cs *chainSyncer) nextSyncOp() *chainSyncOp {
if cs.handler.peers.len() < minPeers {
return nil
}
// We have enough peers, pick the one with the highest TD, but avoid going
// over the terminal total difficulty. Above that we expect the consensus
// We have enough peers, pick the one with the highest Head. Above that we expect the consensus
// clients to direct the chain head to sync to.
peer := cs.handler.peers.peerWithHighestTD()
peer := cs.handler.peers.peerWithHighestHead()
if peer == nil {
return nil
}
mode, ourTD := cs.modeAndLocalHead()
mode, justifiedNumber, ourTD := cs.modeAndLocalHead()
op := peerToSyncOp(mode, peer)
if op.td.Cmp(ourTD) <= 0 {
// TODO: op.td.Cmp(ourTD) <= 0 --> op.td.Cmp(ourTD) < 0?
if (op.justifiedNumber == nil && op.td.Cmp(ourTD) <= 0) ||
op.justifiedNumber != nil && (*op.justifiedNumber < *justifiedNumber || (*op.justifiedNumber == *justifiedNumber && op.td.Cmp(ourTD) <= 0)) {
// We seem to be in sync according to the legacy rules. In the merge
// world, it can also mean we're stuck on the merge block, waiting for
// a beacon client. In the latter case, notify the user.
Expand All @@ -186,16 +188,16 @@ func (cs *chainSyncer) nextSyncOp() *chainSyncOp {
}

func peerToSyncOp(mode downloader.SyncMode, p *eth.Peer) *chainSyncOp {
peerHead, peerTD := p.Head()
return &chainSyncOp{mode: mode, peer: p, td: peerTD, head: peerHead}
peerHead, peerJustifiedNumber, peerTD := p.Head()
return &chainSyncOp{mode: mode, peer: p, justifiedNumber: peerJustifiedNumber, td: peerTD, head: peerHead}
}

func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) {
func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *uint64, *big.Int) {
// If we're in snap sync mode, return that directly
if cs.handler.snapSync.Load() {
block := cs.handler.chain.CurrentSnapBlock()
td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64())
return downloader.SnapSync, td
return downloader.SnapSync, nil, td
}
// We are probably in full sync, but we might have rewound to before the
// snap sync pivot, check if we should re-enable snap sync.
Expand All @@ -207,7 +209,7 @@ func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) {
}
block := cs.handler.chain.CurrentSnapBlock()
td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64())
return downloader.SnapSync, td
return downloader.SnapSync, nil, td
}
}
// We are in a full sync, but the associated head state is missing. To complete
Expand All @@ -217,11 +219,12 @@ func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) {
block := cs.handler.chain.CurrentSnapBlock()
td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64())
log.Info("Reenabled snap sync as chain is stateless")
return downloader.SnapSync, td
return downloader.SnapSync, nil, td
}
// Nope, we're really full syncing
justifiedNumber := cs.handler.chain.GetJustifiedNumber(head)
td := cs.handler.chain.GetTd(head.Hash(), head.Number.Uint64())
return downloader.FullSync, td
return downloader.FullSync, &justifiedNumber, td
}

// startSync launches doSync in a new goroutine.
Expand Down
Loading

0 comments on commit 3c5badd

Please sign in to comment.