diff --git a/erigon-lib/common/dbg/leak_detector.go b/erigon-lib/common/dbg/leak_detector.go index d4369f2be9e..433063b578e 100644 --- a/erigon-lib/common/dbg/leak_detector.go +++ b/erigon-lib/common/dbg/leak_detector.go @@ -37,21 +37,19 @@ func NewLeakDetector(name string, slowThreshold time.Duration) *LeakDetector { d := &LeakDetector{list: map[uint64]LeakDetectorItem{}} d.SetSlowThreshold(slowThreshold) - if enabled { - go func() { - logEvery := time.NewTicker(60 * time.Second) - defer logEvery.Stop() + go func() { + logEvery := time.NewTicker(60 * time.Second) + defer logEvery.Stop() - for { - select { - case <-logEvery.C: - if list := d.slowList(); len(list) > 0 { - log.Info(fmt.Sprintf("[dbg.%s] long living resources", name), "list", strings.Join(d.slowList(), ", ")) - } + for { + select { + case <-logEvery.C: + if list := d.slowList(); len(list) > 0 { + log.Info(fmt.Sprintf("[dbg.%s] long living resources", name), "list", strings.Join(d.slowList(), ", ")) } } - }() - } + } + }() return d } diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index b5be1bd3c79..4aef00db17b 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -16,7 +16,6 @@ import ( "github.com/c2h5oh/datasize" "github.com/erigontech/mdbx-go/mdbx" "github.com/ledgerwatch/erigon-lib/config3" - "github.com/ledgerwatch/erigon/consensus/aura" "github.com/ledgerwatch/log/v3" "golang.org/x/sync/errgroup" @@ -156,11 +155,7 @@ func ExecV3(ctx context.Context, blocksFreezeCfg := cfg.blockReader.FreezingCfg() if initialCycle { - if _, ok := engine.(*aura.AuRa); ok { //gnosis collate eating too much RAM, will add ETL later - agg.SetCollateAndBuildWorkers(1) - } else { - agg.SetCollateAndBuildWorkers(min(2, estimate.StateV3Collate.Workers())) - } + agg.SetCollateAndBuildWorkers(min(2, estimate.StateV3Collate.Workers())) agg.SetCompressWorkers(estimate.CompressSnapshot.Workers()) defer agg.DiscardHistory(kv.CommitmentDomain).EnableHistory(kv.CommitmentDomain) } else { diff --git a/turbo/execution/eth1/forkchoice.go b/turbo/execution/eth1/forkchoice.go index 7c22e845057..d79c3695940 100644 --- a/turbo/execution/eth1/forkchoice.go +++ b/turbo/execution/eth1/forkchoice.go @@ -124,6 +124,8 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original return } + defer e.forkValidator.ClearWithUnwind(e.accumulator, e.stateChangeConsumer) + var validationError string type canonicalEntry struct { hash common.Hash @@ -134,13 +136,7 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original sendForkchoiceErrorWithoutWaiting(outcomeCh, err) return } - defer func() { - if tx != nil { - tx.Rollback() - } - }() - - defer e.forkValidator.ClearWithUnwind(e.accumulator, e.stateChangeConsumer) + defer tx.Rollback() blockHash := originalBlockHash @@ -154,7 +150,6 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original sendForkchoiceErrorWithoutWaiting(outcomeCh, err) return } - isSynced := finishProgressBefore > 0 && finishProgressBefore > e.blockReader.FrozenBlocks() && finishProgressBefore == headersProgressBefore // Step one, find reconnection point, and mark all of those headers as canonical. fcuHeader, err := e.blockReader.HeaderByHash(ctx, tx, originalBlockHash) @@ -167,10 +162,11 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original return } - tooBigJump := e.syncCfg.LoopBlockLimit > 0 && finishProgressBefore > 0 && fcuHeader.Number.Uint64()-finishProgressBefore > uint64(e.syncCfg.LoopBlockLimit) - - if tooBigJump { + limitedBigJump := e.syncCfg.LoopBlockLimit > 0 && finishProgressBefore > 0 && fcuHeader.Number.Uint64()-finishProgressBefore > uint64(e.syncCfg.LoopBlockLimit-2) + isSynced := finishProgressBefore > 0 && finishProgressBefore > e.blockReader.FrozenBlocks() && finishProgressBefore == headersProgressBefore + if limitedBigJump { isSynced = false + log.Info("[sync] limited big jump", "from", finishProgressBefore, "amount", uint64(e.syncCfg.LoopBlockLimit)) } canonicalHash, err := e.blockReader.CanonicalHash(ctx, tx, fcuHeader.Number.Uint64()) @@ -272,7 +268,6 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original } if err := rawdbv3.TxNums.Truncate(tx, currentParentNumber+1); err != nil { - //if err := rawdbv3.TxNums.Truncate(tx, fcuHeader.Number.Uint64()); err != nil { sendForkchoiceErrorWithoutWaiting(outcomeCh, err) return } @@ -316,43 +311,6 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original } } -TooBigJumpStep: - if tx == nil { - tx, err = e.db.BeginRwNosync(ctx) - if err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - defer func() { - if tx != nil { - tx.Rollback() - } - }() - } - finishProgressBefore, err = stages.GetStageProgress(tx, stages.Finish) - if err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - tooBigJump = e.syncCfg.LoopBlockLimit > 0 && finishProgressBefore > 0 && fcuHeader.Number.Uint64() > finishProgressBefore && fcuHeader.Number.Uint64()-finishProgressBefore > uint64(e.syncCfg.LoopBlockLimit) - if tooBigJump { //jump forward by 1K blocks - log.Info("[sync] jump by 1K blocks", "currentJumpTo", finishProgressBefore+uint64(e.syncCfg.LoopBlockLimit), "bigJumpTo", fcuHeader.Number.Uint64()) - blockHash, err = e.blockReader.CanonicalHash(ctx, tx, finishProgressBefore+uint64(e.syncCfg.LoopBlockLimit)) - if err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - fcuHeader, err = e.blockReader.HeaderByHash(ctx, tx, blockHash) - if err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - if fcuHeader == nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, fmt.Errorf("forkchoice: block %x not found or was marked invalid", blockHash)) - return - } - } - // Set Progress for headers and bodies accordingly. if err := stages.SaveStageProgress(tx, stages.Headers, fcuHeader.Number.Uint64()); err != nil { sendForkchoiceErrorWithoutWaiting(outcomeCh, err) @@ -378,7 +336,7 @@ TooBigJumpStep: } } // Run the forkchoice - initialCycle := tooBigJump + initialCycle := limitedBigJump if _, err := e.executionPipeline.Run(e.db, wrap.TxContainer{Tx: tx}, initialCycle); err != nil { err = fmt.Errorf("updateForkChoice: %w", err) sendForkchoiceErrorWithoutWaiting(outcomeCh, err) @@ -400,30 +358,27 @@ TooBigJumpStep: e.logger.Warn("bad forkchoice", "head", headHash, "hash", blockHash) } } else { - if !tooBigJump { - valid, err := e.verifyForkchoiceHashes(ctx, tx, blockHash, finalizedHash, safeHash) - if err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - if !valid { - sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ - Status: execution.ExecutionStatus_InvalidForkchoice, - LatestValidHash: gointerfaces.ConvertHashToH256(common.Hash{}), - }) - return - } - if err := rawdb.TruncateCanonicalChain(ctx, tx, *headNumber+1); err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } + valid, err := e.verifyForkchoiceHashes(ctx, tx, blockHash, finalizedHash, safeHash) + if err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + if !valid { + sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ + Status: execution.ExecutionStatus_InvalidForkchoice, + LatestValidHash: gointerfaces.ConvertHashToH256(common.Hash{}), + }) + return + } + if err := rawdb.TruncateCanonicalChain(ctx, tx, *headNumber+1); err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return } if err := tx.Commit(); err != nil { sendForkchoiceErrorWithoutWaiting(outcomeCh, err) return } - tx = nil if e.hook != nil { if err := e.db.View(ctx, func(tx kv.Tx) error { @@ -457,9 +412,6 @@ TooBigJumpStep: timings = append(timings, "commit", time.Since(commitStart), "alloc", common.ByteCount(m.Alloc), "sys", common.ByteCount(m.Sys)) e.logger.Info("Timings (slower than 50ms)", timings...) } - if tooBigJump { - goto TooBigJumpStep - } sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ LatestValidHash: gointerfaces.ConvertHashToH256(headHash), diff --git a/turbo/execution/eth1/inserters.go b/turbo/execution/eth1/inserters.go index 378c629ef8d..1dfa5e036a3 100644 --- a/turbo/execution/eth1/inserters.go +++ b/turbo/execution/eth1/inserters.go @@ -42,13 +42,14 @@ func (e *EthereumExecutionModule) InsertBlocks(ctx context.Context, req *executi }, nil } defer e.semaphore.Release(1) + e.forkValidator.ClearWithUnwind(e.accumulator, e.stateChangeConsumer) + frozenBlocks := e.blockReader.FrozenBlocks() + tx, err := e.db.BeginRw(ctx) if err != nil { return nil, fmt.Errorf("ethereumExecutionModule.InsertBlocks: could not begin transaction: %s", err) } defer tx.Rollback() - e.forkValidator.ClearWithUnwind(e.accumulator, e.stateChangeConsumer) - frozenBlocks := e.blockReader.FrozenBlocks() for _, block := range req.Blocks { // Skip frozen blocks. @@ -94,5 +95,5 @@ func (e *EthereumExecutionModule) InsertBlocks(ctx context.Context, req *executi return &execution.InsertionResult{ Result: execution.ExecutionStatus_Success, - }, tx.Commit() + }, nil }