Skip to content

Commit

Permalink
forkChoice: rely on Caplin's limits support (#10343)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored May 15, 2024
1 parent cd37ee6 commit 2679b8a
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 92 deletions.
22 changes: 10 additions & 12 deletions erigon-lib/common/dbg/leak_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,19 @@ func NewLeakDetector(name string, slowThreshold time.Duration) *LeakDetector {
d := &LeakDetector{list: map[uint64]LeakDetectorItem{}}
d.SetSlowThreshold(slowThreshold)

if enabled {
go func() {
logEvery := time.NewTicker(60 * time.Second)
defer logEvery.Stop()
go func() {
logEvery := time.NewTicker(60 * time.Second)
defer logEvery.Stop()

for {
select {
case <-logEvery.C:
if list := d.slowList(); len(list) > 0 {
log.Info(fmt.Sprintf("[dbg.%s] long living resources", name), "list", strings.Join(d.slowList(), ", "))
}
for {
select {
case <-logEvery.C:
if list := d.slowList(); len(list) > 0 {
log.Info(fmt.Sprintf("[dbg.%s] long living resources", name), "list", strings.Join(d.slowList(), ", "))
}
}
}()
}
}
}()
return d
}

Expand Down
7 changes: 1 addition & 6 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/c2h5oh/datasize"
"github.com/erigontech/mdbx-go/mdbx"
"github.com/ledgerwatch/erigon-lib/config3"
"github.com/ledgerwatch/erigon/consensus/aura"
"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -156,11 +155,7 @@ func ExecV3(ctx context.Context,
blocksFreezeCfg := cfg.blockReader.FreezingCfg()

if initialCycle {
if _, ok := engine.(*aura.AuRa); ok { //gnosis collate eating too much RAM, will add ETL later
agg.SetCollateAndBuildWorkers(1)
} else {
agg.SetCollateAndBuildWorkers(min(2, estimate.StateV3Collate.Workers()))
}
agg.SetCollateAndBuildWorkers(min(2, estimate.StateV3Collate.Workers()))
agg.SetCompressWorkers(estimate.CompressSnapshot.Workers())
defer agg.DiscardHistory(kv.CommitmentDomain).EnableHistory(kv.CommitmentDomain)
} else {
Expand Down
94 changes: 23 additions & 71 deletions turbo/execution/eth1/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original
return
}

defer e.forkValidator.ClearWithUnwind(e.accumulator, e.stateChangeConsumer)

var validationError string
type canonicalEntry struct {
hash common.Hash
Expand All @@ -134,13 +136,7 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
defer func() {
if tx != nil {
tx.Rollback()
}
}()

defer e.forkValidator.ClearWithUnwind(e.accumulator, e.stateChangeConsumer)
defer tx.Rollback()

blockHash := originalBlockHash

Expand All @@ -154,7 +150,6 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
isSynced := finishProgressBefore > 0 && finishProgressBefore > e.blockReader.FrozenBlocks() && finishProgressBefore == headersProgressBefore

// Step one, find reconnection point, and mark all of those headers as canonical.
fcuHeader, err := e.blockReader.HeaderByHash(ctx, tx, originalBlockHash)
Expand All @@ -167,10 +162,11 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original
return
}

tooBigJump := e.syncCfg.LoopBlockLimit > 0 && finishProgressBefore > 0 && fcuHeader.Number.Uint64()-finishProgressBefore > uint64(e.syncCfg.LoopBlockLimit)

if tooBigJump {
limitedBigJump := e.syncCfg.LoopBlockLimit > 0 && finishProgressBefore > 0 && fcuHeader.Number.Uint64()-finishProgressBefore > uint64(e.syncCfg.LoopBlockLimit-2)
isSynced := finishProgressBefore > 0 && finishProgressBefore > e.blockReader.FrozenBlocks() && finishProgressBefore == headersProgressBefore
if limitedBigJump {
isSynced = false
log.Info("[sync] limited big jump", "from", finishProgressBefore, "amount", uint64(e.syncCfg.LoopBlockLimit))
}

canonicalHash, err := e.blockReader.CanonicalHash(ctx, tx, fcuHeader.Number.Uint64())
Expand Down Expand Up @@ -272,7 +268,6 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original
}

if err := rawdbv3.TxNums.Truncate(tx, currentParentNumber+1); err != nil {
//if err := rawdbv3.TxNums.Truncate(tx, fcuHeader.Number.Uint64()); err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
Expand Down Expand Up @@ -316,43 +311,6 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original
}
}

TooBigJumpStep:
if tx == nil {
tx, err = e.db.BeginRwNosync(ctx)
if err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
defer func() {
if tx != nil {
tx.Rollback()
}
}()
}
finishProgressBefore, err = stages.GetStageProgress(tx, stages.Finish)
if err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
tooBigJump = e.syncCfg.LoopBlockLimit > 0 && finishProgressBefore > 0 && fcuHeader.Number.Uint64() > finishProgressBefore && fcuHeader.Number.Uint64()-finishProgressBefore > uint64(e.syncCfg.LoopBlockLimit)
if tooBigJump { //jump forward by 1K blocks
log.Info("[sync] jump by 1K blocks", "currentJumpTo", finishProgressBefore+uint64(e.syncCfg.LoopBlockLimit), "bigJumpTo", fcuHeader.Number.Uint64())
blockHash, err = e.blockReader.CanonicalHash(ctx, tx, finishProgressBefore+uint64(e.syncCfg.LoopBlockLimit))
if err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
fcuHeader, err = e.blockReader.HeaderByHash(ctx, tx, blockHash)
if err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
if fcuHeader == nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, fmt.Errorf("forkchoice: block %x not found or was marked invalid", blockHash))
return
}
}

// Set Progress for headers and bodies accordingly.
if err := stages.SaveStageProgress(tx, stages.Headers, fcuHeader.Number.Uint64()); err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
Expand All @@ -378,7 +336,7 @@ TooBigJumpStep:
}
}
// Run the forkchoice
initialCycle := tooBigJump
initialCycle := limitedBigJump
if _, err := e.executionPipeline.Run(e.db, wrap.TxContainer{Tx: tx}, initialCycle); err != nil {
err = fmt.Errorf("updateForkChoice: %w", err)
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
Expand All @@ -400,30 +358,27 @@ TooBigJumpStep:
e.logger.Warn("bad forkchoice", "head", headHash, "hash", blockHash)
}
} else {
if !tooBigJump {
valid, err := e.verifyForkchoiceHashes(ctx, tx, blockHash, finalizedHash, safeHash)
if err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
if !valid {
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
Status: execution.ExecutionStatus_InvalidForkchoice,
LatestValidHash: gointerfaces.ConvertHashToH256(common.Hash{}),
})
return
}
if err := rawdb.TruncateCanonicalChain(ctx, tx, *headNumber+1); err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
valid, err := e.verifyForkchoiceHashes(ctx, tx, blockHash, finalizedHash, safeHash)
if err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
if !valid {
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
Status: execution.ExecutionStatus_InvalidForkchoice,
LatestValidHash: gointerfaces.ConvertHashToH256(common.Hash{}),
})
return
}
if err := rawdb.TruncateCanonicalChain(ctx, tx, *headNumber+1); err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}

if err := tx.Commit(); err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
tx = nil

if e.hook != nil {
if err := e.db.View(ctx, func(tx kv.Tx) error {
Expand Down Expand Up @@ -457,9 +412,6 @@ TooBigJumpStep:
timings = append(timings, "commit", time.Since(commitStart), "alloc", common.ByteCount(m.Alloc), "sys", common.ByteCount(m.Sys))
e.logger.Info("Timings (slower than 50ms)", timings...)
}
if tooBigJump {
goto TooBigJumpStep
}

sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(headHash),
Expand Down
7 changes: 4 additions & 3 deletions turbo/execution/eth1/inserters.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ func (e *EthereumExecutionModule) InsertBlocks(ctx context.Context, req *executi
}, nil
}
defer e.semaphore.Release(1)
e.forkValidator.ClearWithUnwind(e.accumulator, e.stateChangeConsumer)
frozenBlocks := e.blockReader.FrozenBlocks()

tx, err := e.db.BeginRw(ctx)
if err != nil {
return nil, fmt.Errorf("ethereumExecutionModule.InsertBlocks: could not begin transaction: %s", err)
}
defer tx.Rollback()
e.forkValidator.ClearWithUnwind(e.accumulator, e.stateChangeConsumer)
frozenBlocks := e.blockReader.FrozenBlocks()

for _, block := range req.Blocks {
// Skip frozen blocks.
Expand Down Expand Up @@ -94,5 +95,5 @@ func (e *EthereumExecutionModule) InsertBlocks(ctx context.Context, req *executi

return &execution.InsertionResult{
Result: execution.ExecutionStatus_Success,
}, tx.Commit()
}, nil
}

0 comments on commit 2679b8a

Please sign in to comment.