Skip to content

Commit

Permalink
feat: remove pipecommit (#2742)
Browse files Browse the repository at this point in the history
  • Loading branch information
buddh0 authored Oct 23, 2024
1 parent 675449a commit caa9e79
Show file tree
Hide file tree
Showing 22 changed files with 234 additions and 670 deletions.
1 change: 0 additions & 1 deletion cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ var (
utils.DirectBroadcastFlag,
utils.DisableSnapProtocolFlag,
utils.EnableTrustProtocolFlag,
utils.PipeCommitFlag,
utils.RangeLimitFlag,
utils.USBFlag,
utils.SmartCardDaemonPathFlag,
Expand Down
8 changes: 0 additions & 8 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,6 @@ var (
Usage: "Enable trust protocol",
Category: flags.FastNodeCategory,
}
PipeCommitFlag = &cli.BoolFlag{
Name: "pipecommit",
Usage: "Enable MPT pipeline commit, it will improve syncing performance. It is an experimental feature(default is false)",
Category: flags.DeprecatedCategory,
}
RangeLimitFlag = &cli.BoolFlag{
Name: "rangelimit",
Usage: "Enable 5000 blocks limit for range query",
Expand Down Expand Up @@ -1982,9 +1977,6 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.IsSet(EnableTrustProtocolFlag.Name) {
cfg.EnableTrustProtocol = ctx.IsSet(EnableTrustProtocolFlag.Name)
}
if ctx.IsSet(PipeCommitFlag.Name) {
log.Warn("The --pipecommit flag is deprecated and could be removed in the future!")
}
if ctx.IsSet(RangeLimitFlag.Name) {
cfg.RangeLimit = ctx.Bool(RangeLimitFlag.Name)
}
Expand Down
30 changes: 6 additions & 24 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,14 @@ package core
import (
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)

const badBlockCacheExpire = 30 * time.Second

type BlockValidatorOption func(*BlockValidator) *BlockValidator

func EnableRemoteVerifyManager(remoteValidator *remoteVerifyManager) BlockValidatorOption {
Expand Down Expand Up @@ -74,9 +70,6 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
if v.bc.HasBlockAndState(block.Hash(), block.NumberU64()) {
return ErrKnownBlock
}
if v.bc.isCachedBadBlock(block) {
return ErrKnownBadBlock
}
// Header validity is known at this point. Here we verify that uncles, transactions
// and withdrawals given in the block body match the header.
header := block.Header()
Expand Down Expand Up @@ -192,23 +185,12 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
return nil
},
}
if statedb.IsPipeCommit() {
validateFuns = append(validateFuns, func() error {
if err := statedb.WaitPipeVerification(); err != nil {
return err
}
statedb.CorrectAccountsRoot(common.Hash{})
statedb.Finalise(v.config.IsEIP158(header.Number))
return nil
})
} else {
validateFuns = append(validateFuns, func() error {
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error())
}
return nil
})
}
validateFuns = append(validateFuns, func() error {
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error())
}
return nil
})
validateRes := make(chan error, len(validateFuns))
for _, f := range validateFuns {
tmpFunc := f
Expand Down
78 changes: 5 additions & 73 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,10 @@ var (

blockRecvTimeDiffGauge = metrics.NewRegisteredGauge("chain/block/recvtimediff", nil)

errStateRootVerificationFailed = errors.New("state root verification failed")
errInsertionInterrupted = errors.New("insertion is interrupted")
errChainStopped = errors.New("blockchain is stopped")
errInvalidOldChain = errors.New("invalid old chain")
errInvalidNewChain = errors.New("invalid new chain")
errInsertionInterrupted = errors.New("insertion is interrupted")
errChainStopped = errors.New("blockchain is stopped")
errInvalidOldChain = errors.New("invalid old chain")
errInvalidNewChain = errors.New("invalid new chain")
)

const (
Expand All @@ -116,7 +115,6 @@ const (
receiptsCacheLimit = 10000
sidecarsCacheLimit = 1024
txLookupCacheLimit = 1024
maxBadBlockLimit = 16
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
TriesInMemory = 128
Expand All @@ -126,8 +124,6 @@ const (
diffLayerFreezerRecheckInterval = 3 * time.Second
maxDiffForkDist = 11 // Maximum allowed backward distance from the chain head

rewindBadBlockInterval = 1 * time.Second

// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
//
// Changelog:
Expand Down Expand Up @@ -294,8 +290,6 @@ type BlockChain struct {

// future blocks are blocks added for later processing
futureBlocks *lru.Cache[common.Hash, *types.Block]
// Cache for the blocks that failed to pass MPT root verification
badBlockCache *lru.Cache[common.Hash, time.Time]

// trusted diff layers
diffLayerCache *exlru.Cache // Cache for the diffLayers
Expand All @@ -316,7 +310,6 @@ type BlockChain struct {
processor Processor // Block transaction processor interface
forker *ForkChoice
vmConfig vm.Config
pipeCommit bool

// monitor
doubleSignMonitor *monitor.DoubleSignMonitor
Expand Down Expand Up @@ -378,7 +371,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
badBlockCache: lru.NewCache[common.Hash, time.Time](maxBadBlockLimit),
diffLayerCache: diffLayerCache,
diffLayerChanCache: diffLayerChanCache,
engine: engine,
Expand Down Expand Up @@ -559,11 +551,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.wg.Add(1)
go bc.trustedDiffLayerLoop()
}
if bc.pipeCommit {
// check current block and rewind invalid one
bc.wg.Add(1)
go bc.rewindInvalidHeaderBlockLoop()
}

if bc.doubleSignMonitor != nil {
bc.wg.Add(1)
Expand Down Expand Up @@ -817,26 +804,6 @@ func (bc *BlockChain) SetHeadWithTimestamp(timestamp uint64) error {
return nil
}

func (bc *BlockChain) tryRewindBadBlocks() {
if !bc.chainmu.TryLock() {
return
}
defer bc.chainmu.Unlock()
block := bc.CurrentBlock()
snaps := bc.snaps
// Verified and Result is false
if snaps != nil && snaps.Snapshot(block.Root) != nil &&
snaps.Snapshot(block.Root).Verified() && !snaps.Snapshot(block.Root).WaitAndGetVerifyRes() {
// Rewind by one block
log.Warn("current block verified failed, rewind to its parent", "height", block.Number.Uint64(), "hash", block.Hash())
bc.futureBlocks.Remove(block.Hash())
bc.badBlockCache.Add(block.Hash(), time.Now())
bc.diffLayerCache.Remove(block.Hash())
bc.reportBlock(bc.GetBlockByHash(block.Hash()), nil, errStateRootVerificationFailed)
bc.setHeadBeyondRoot(block.Number.Uint64()-1, 0, common.Hash{}, false)
}
}

// rewindHashHead implements the logic of rewindHead in the context of hash scheme.
func (bc *BlockChain) rewindHashHead(head *types.Header, root common.Hash) (*types.Header, uint64) {
var (
Expand Down Expand Up @@ -1893,7 +1860,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
return nil
}
// Commit all cached state changes into underlying memory database.
_, diffLayer, err := state.Commit(block.NumberU64(), bc.tryRewindBadBlocks, tryCommitTrieDB)
_, diffLayer, err := state.Commit(block.NumberU64(), tryCommitTrieDB)
if err != nil {
return err
}
Expand Down Expand Up @@ -2269,9 +2236,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
}

// Process block using the parent state as reference point
if bc.pipeCommit {
statedb.EnablePipeCommit()
}
statedb.SetExpectedStateRoot(block.Root())
pstart := time.Now()
statedb, receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
Expand Down Expand Up @@ -2889,22 +2853,6 @@ func (bc *BlockChain) updateFutureBlocks() {
}
}

func (bc *BlockChain) rewindInvalidHeaderBlockLoop() {
recheck := time.NewTicker(rewindBadBlockInterval)
defer func() {
recheck.Stop()
bc.wg.Done()
}()
for {
select {
case <-recheck.C:
bc.tryRewindBadBlocks()
case <-bc.quit:
return
}
}
}

func (bc *BlockChain) trustedDiffLayerLoop() {
recheck := time.NewTicker(diffLayerFreezerRecheckInterval)
defer func() {
Expand Down Expand Up @@ -3042,17 +2990,6 @@ func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool {
return false
}

func (bc *BlockChain) isCachedBadBlock(block *types.Block) bool {
if timeAt, exist := bc.badBlockCache.Get(block.Hash()); exist {
if time.Since(timeAt) >= badBlockCacheExpire {
bc.badBlockCache.Remove(block.Hash())
return false
}
return true
}
return false
}

// reportBlock logs a bad block error.
// bad block need not save receipts & sidecars.
func (bc *BlockChain) reportBlock(block *types.Block, receipts types.Receipts, err error) {
Expand Down Expand Up @@ -3114,11 +3051,6 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header) (int, error) {

func (bc *BlockChain) TriesInMemory() uint64 { return bc.triesInMemory }

func EnablePipelineCommit(bc *BlockChain) (*BlockChain, error) {
bc.pipeCommit = false
return bc, nil
}

func EnablePersistDiff(limit uint64) BlockChainOption {
return func(chain *BlockChain) (*BlockChain, error) {
chain.diffLayerFreezerBlockLimit = limit
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func TestFreezeDiffLayer(t *testing.T) {
// Wait for the buffer to be zero.
}
// Minus one empty block.
if fullBackend.chain.diffQueue.Size() > blockNum-1 && fullBackend.chain.diffQueue.Size() < blockNum-2 {
if fullBackend.chain.diffQueue.Size() != blockNum-1 {
t.Errorf("size of diff queue is wrong, expected: %d, get: %d", blockNum-1, fullBackend.chain.diffQueue.Size())
}

Expand Down
6 changes: 0 additions & 6 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,6 @@ func (bc *BlockChain) HasState(hash common.Hash) bool {
if bc.NoTries() {
return bc.snaps != nil && bc.snaps.Snapshot(hash) != nil
}
if bc.pipeCommit && bc.snaps != nil {
// If parent snap is pending on verification, treat it as state exist
if s := bc.snaps.Snapshot(hash); s != nil && !s.Verified() {
return true
}
}
_, err := bc.stateCache.OpenTrie(hash)
return err == nil
}
Expand Down
Loading

0 comments on commit caa9e79

Please sign in to comment.