From 708a5699dca9864e0dce07801bf92f129e0c409b Mon Sep 17 00:00:00 2001 From: pk910 Date: Wed, 28 Aug 2024 17:08:00 +0200 Subject: [PATCH 1/2] improve fork tracking --- indexer/beacon/client.go | 51 ++++++++++++++------------------- indexer/beacon/forkcache.go | 30 +++++++++++++------ indexer/beacon/forkdetection.go | 32 ++++++++++++++------- indexer/beacon/writedb.go | 24 +++++++++++++--- 4 files changed, 84 insertions(+), 53 deletions(-) diff --git a/indexer/beacon/client.go b/indexer/beacon/client.go index fd4b3c73..cf40241b 100644 --- a/indexer/beacon/client.go +++ b/indexer/beacon/client.go @@ -106,23 +106,30 @@ func (c *Client) startClientLoop() { } } -func (c *Client) formatProcessingTimes(processingTimes []time.Duration) string { - if len(processingTimes) == 0 { - return "" - } +func (c *Client) emitBlockLogEntry(slot phase0.Slot, root phase0.Root, source string, isNew bool, forkId ForkKey, processingTimes []time.Duration) { + chainState := c.client.GetPool().GetChainState() - str := strings.Builder{} - str.WriteString(" (") - for i, pt := range processingTimes { - if i > 0 { - str.WriteString(", ") + processingTimesStr := "" + if len(processingTimes) > 0 { + str := strings.Builder{} + str.WriteString(" (") + for i, pt := range processingTimes { + if i > 0 { + str.WriteString(", ") + } + + str.WriteString(fmt.Sprintf("%v ms", pt.Milliseconds())) } + str.WriteString(")") - str.WriteString(fmt.Sprintf("%v ms", pt.Milliseconds())) + processingTimesStr = str.String() } - str.WriteString(")") - return str.String() + if isNew { + c.logger.Infof("received block %v:%v [0x%x] %v %v fork: %v", chainState.EpochOfSlot(slot), slot, root[:], source, processingTimesStr, forkId) + } else { + c.logger.Debugf("received known block %v:%v [0x%x] %v %v fork: %v", chainState.EpochOfSlot(slot), slot, root[:], source, processingTimesStr, forkId) + } } // runClientLoop runs the client event processing subroutine. @@ -145,11 +152,7 @@ func (c *Client) runClientLoop() error { return fmt.Errorf("failed loading head block: %v", err) } - if isNew { - c.logger.Infof("received block %v:%v [0x%x] head %v", c.client.GetPool().GetChainState().EpochOfSlot(headSlot), headSlot, headRoot, c.formatProcessingTimes(processingTimes)) - } else { - c.logger.Debugf("received known block %v:%v [0x%x] head %v", c.client.GetPool().GetChainState().EpochOfSlot(headSlot), headSlot, headRoot, c.formatProcessingTimes(processingTimes)) - } + c.emitBlockLogEntry(headSlot, headRoot, "head", isNew, headBlock.forkId, processingTimes) // 2 - backfill old blocks up to the finalization checkpoint or known in cache err = c.indexer.withBackfillTracker(func() error { @@ -288,13 +291,7 @@ func (c *Client) processStreamBlock(slot phase0.Slot, root phase0.Root) (*Block, return nil, err } - chainState := c.client.GetPool().GetChainState() - - if isNew { - c.logger.Infof("received block %v:%v [0x%x] stream %v", chainState.EpochOfSlot(block.Slot), block.Slot, block.Root[:], c.formatProcessingTimes(processingTimes)) - } else { - c.logger.Debugf("received known block %v:%v [0x%x] stream %v", chainState.EpochOfSlot(block.Slot), block.Slot, block.Root[:], c.formatProcessingTimes(processingTimes)) - } + c.emitBlockLogEntry(slot, root, "stream", isNew, block.forkId, processingTimes) return block, nil } @@ -494,11 +491,7 @@ func (c *Client) backfillParentBlocks(headBlock *Block) error { } } - if isNewBlock { - c.logger.Infof("received block %v:%v [0x%x] backfill %v", chainState.EpochOfSlot(parentSlot), parentSlot, parentRoot, c.formatProcessingTimes(processingTimes)) - } else { - c.logger.Debugf("received known block %v:%v [0x%x] backfill %v", chainState.EpochOfSlot(parentSlot), parentSlot, parentRoot, c.formatProcessingTimes(processingTimes)) - } + c.emitBlockLogEntry(parentSlot, parentRoot, "backfill", isNewBlock, parentBlock.forkId, processingTimes) if parentSlot == 0 { c.logger.Debugf("backfill cache: reached gensis slot [0x%x]", parentRoot) diff --git a/indexer/beacon/forkcache.go b/indexer/beacon/forkcache.go index f0cebfe3..347aa68a 100644 --- a/indexer/beacon/forkcache.go +++ b/indexer/beacon/forkcache.go @@ -195,24 +195,36 @@ func (cache *forkCache) setFinalizedEpoch(finalizedSlot phase0.Slot, justifiedRo cache.cacheMutex.Lock() defer cache.cacheMutex.Unlock() - closestForkId := ForkKey(0) - closestDistance := uint64(0) - for _, fork := range cache.forkMap { if fork.leafSlot >= finalizedSlot { continue } - isInFork, distance := cache.indexer.blockCache.getCanonicalDistance(fork.leafRoot, justifiedRoot, 0) - if isInFork && (closestForkId == 0 || distance < closestDistance) { - closestForkId = fork.forkId - closestDistance = distance + delete(cache.forkMap, fork.forkId) + } + + latestFinalizedBlock := cache.indexer.blockCache.getBlockByRoot(justifiedRoot) + finalizedForkId := ForkKey(0) + for { + if latestFinalizedBlock == nil { + break } - delete(cache.forkMap, fork.forkId) + finalizedForkId = latestFinalizedBlock.forkId + + if latestFinalizedBlock.Slot <= finalizedSlot { + break + } + + parentRoot := latestFinalizedBlock.GetParentRoot() + if parentRoot == nil { + break + } + + latestFinalizedBlock = cache.indexer.blockCache.getBlockByRoot(*parentRoot) } - cache.finalizedForkId = closestForkId + cache.finalizedForkId = finalizedForkId err := db.RunDBTransaction(func(tx *sqlx.Tx) error { return cache.updateForkState(tx) diff --git a/indexer/beacon/forkdetection.go b/indexer/beacon/forkdetection.go index 12a6e344..d62ac73e 100644 --- a/indexer/beacon/forkdetection.go +++ b/indexer/beacon/forkdetection.go @@ -40,7 +40,14 @@ func (cache *forkCache) processBlock(block *Block) error { parentIsProcessed := false parentIsFinalized := false - if parentBlock := cache.indexer.blockCache.getBlockByRoot(*parentRoot); parentBlock == nil { + if block.Slot == 0 { + // genesis block + parentForkId = 0 + parentSlot = 0 + parentIsProcessed = false + parentIsFinalized = true + + } else if parentBlock := cache.indexer.blockCache.getBlockByRoot(*parentRoot); parentBlock == nil { // parent block might already be finalized, check if it's in the database blockHead := db.GetBlockHeadByRoot((*parentRoot)[:]) if blockHead != nil { @@ -98,7 +105,7 @@ func (cache *forkCache) processBlock(block *Block) error { } newForks = append(newForks, newFork) - fmt.Fprintf(&logbuf, ", head1: %v [%v, ? upd]", block.Slot, block.Root.String()) + fmt.Fprintf(&logbuf, ", head1(%v): %v [%v]", fork.forkId, block.Slot, block.Root.String()) } if !parentIsFinalized && len(otherChildren) == 1 { @@ -119,7 +126,7 @@ func (cache *forkCache) processBlock(block *Block) error { otherFork := newFork(cache.lastForkId, parentSlot, *parentRoot, otherChildren[0], parentForkId) cache.addFork(otherFork) - updatedRoots, updatedFork := cache.updateForkBlocks(otherChildren[0], otherFork.forkId, false) + updatedRoots, updatedFork, _ := cache.updateForkBlocks(otherChildren[0], otherFork.forkId, false) newFork := &newForkInfo{ fork: otherFork, updateRoots: updatedRoots, @@ -130,12 +137,12 @@ func (cache *forkCache) processBlock(block *Block) error { updateForks = append(updateForks, updatedFork) } - fmt.Fprintf(&logbuf, ", head2: %v [%v, %v upd]", newFork.fork.leafSlot, newFork.fork.leafRoot.String(), len(newFork.updateRoots)) + fmt.Fprintf(&logbuf, ", head2(%v): %v [%v, %v upd]", otherFork.forkId, newFork.fork.leafSlot, newFork.fork.leafRoot.String(), len(newFork.updateRoots)) } } if logbuf.Len() > 0 { - cache.indexer.logger.Infof("new fork leaf detected (base %v [%v]%v)", parentSlot, parentRoot.String(), logbuf.String()) + cache.indexer.logger.Infof("new fork leaf detected (base(%v) %v [%v]%v)", parentForkId, parentSlot, parentRoot.String(), logbuf.String()) } } } @@ -161,7 +168,8 @@ func (cache *forkCache) processBlock(block *Block) error { fork := newFork(cache.lastForkId, block.Slot, block.Root, child, currentForkId) cache.addFork(fork) - updatedRoots, updatedFork := cache.updateForkBlocks(child, fork.forkId, false) + updatedRoots, updatedFork, headBlock := cache.updateForkBlocks(child, fork.forkId, false) + fork.headBlock = headBlock newFork := &newForkInfo{ fork: fork, updateRoots: updatedRoots, @@ -182,7 +190,7 @@ func (cache *forkCache) processBlock(block *Block) error { } // update fork ids of all blocks building on top of the current block - updatedBlocks, updatedFork := cache.updateForkBlocks(block, currentForkId, true) + updatedBlocks, updatedFork, headBlock := cache.updateForkBlocks(block, currentForkId, true) if updatedFork != nil { updateForks = append(updateForks, updatedFork) } @@ -195,10 +203,10 @@ func (cache *forkCache) processBlock(block *Block) error { fork := cache.getForkById(currentForkId) if fork != nil { lastBlock := block - if len(updatedBlocks) > 0 { - lastBlock = cache.indexer.blockCache.getBlockByRoot(phase0.Root(updatedBlocks[len(updatedBlocks)-1])) + if headBlock != nil && headBlock.Slot > lastBlock.Slot { + lastBlock = headBlock } - if lastBlock != nil && (fork.headBlock == nil || lastBlock.Slot > fork.headBlock.Slot) { + if fork.headBlock == nil || lastBlock.Slot > fork.headBlock.Slot { fork.headBlock = lastBlock } } @@ -282,11 +290,12 @@ func (cache *forkCache) processBlock(block *Block) error { } // updateForkBlocks updates the blocks building on top of the given block in the fork and returns the updated block roots. -func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skipStartBlock bool) (blockRoots [][]byte, updatedFork *updateForkInfo) { +func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skipStartBlock bool) (blockRoots [][]byte, updatedFork *updateForkInfo, headBlock *Block) { blockRoots = [][]byte{} if !skipStartBlock { blockRoots = append(blockRoots, startBlock.Root[:]) + headBlock = startBlock } for { @@ -321,6 +330,7 @@ func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skip nextBlock.forkId = forkId blockRoots = append(blockRoots, nextBlock.Root[:]) + headBlock = nextBlock startBlock = nextBlock } diff --git a/indexer/beacon/writedb.go b/indexer/beacon/writedb.go index c9bf83a5..be393df2 100644 --- a/indexer/beacon/writedb.go +++ b/indexer/beacon/writedb.go @@ -5,6 +5,7 @@ import ( "math" "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/ethpandaops/dora/clients/consensus" "github.com/ethpandaops/dora/db" "github.com/ethpandaops/dora/dbtypes" "github.com/ethpandaops/dora/utils" @@ -195,11 +196,26 @@ func (dbw *dbWriter) persistSyncAssignments(tx *sqlx.Tx, epoch phase0.Epoch, epo func (dbw *dbWriter) buildDbBlock(block *Block, epochStats *EpochStats, overrideForkId *ForkKey) *dbtypes.Slot { if block.Slot == 0 { // genesis block + header := block.GetHeader() + if header == nil { + // some clients do not serve the genesis block header, so add a fallback here + header = &phase0.SignedBeaconBlockHeader{ + Message: &phase0.BeaconBlockHeader{ + Slot: 0, + ProposerIndex: math.MaxInt64, + ParentRoot: consensus.NullRoot, + StateRoot: consensus.NullRoot, + }, + } + } + return &dbtypes.Slot{ - Slot: 0, - Proposer: math.MaxInt64, - Status: dbtypes.Canonical, - Root: block.Root[:], + Slot: 0, + Proposer: math.MaxInt64, + Status: dbtypes.Canonical, + Root: block.Root[:], + ParentRoot: header.Message.ParentRoot[:], + StateRoot: header.Message.StateRoot[:], } } From 2fe2e26afa6bcee7bef22d698aa17fd3b8ef1969 Mon Sep 17 00:00:00 2001 From: pk910 Date: Wed, 28 Aug 2024 21:13:52 +0200 Subject: [PATCH 2/2] improve canonical chain selection --- .hack/devnet/run.sh | 2 +- indexer/beacon/canonical.go | 196 +++++++++++++++++++++----------- indexer/beacon/forkdetection.go | 4 +- indexer/beacon/indexer.go | 2 +- 4 files changed, 133 insertions(+), 71 deletions(-) diff --git a/.hack/devnet/run.sh b/.hack/devnet/run.sh index 18f860a3..ef087ff0 100755 --- a/.hack/devnet/run.sh +++ b/.hack/devnet/run.sh @@ -37,7 +37,7 @@ server: port: "8080" frontend: enabled: true - debug: false + debug: true pprof: true minimize: false siteName: "Dora the Explorer" diff --git a/indexer/beacon/canonical.go b/indexer/beacon/canonical.go index e72bbf30..065e5d27 100644 --- a/indexer/beacon/canonical.go +++ b/indexer/beacon/canonical.go @@ -2,9 +2,11 @@ package beacon import ( "bytes" + "fmt" "math" "slices" "sort" + "strings" "time" v1 "github.com/attestantio/go-eth2-client/api/v1" @@ -15,10 +17,9 @@ const FarFutureEpoch = phase0.Epoch(math.MaxUint64) // ChainHead represents a head block of the chain. type ChainHead struct { - HeadBlock *Block // The head block of the chain. - AggregatedHeadVotes phase0.Gwei // The aggregated votes of the last 2 epochs for the head block. - LastEpochVotingPercent float64 // The voting percentage in the last epoch. - ThisEpochVotingPercent float64 // The voting percentage in the current epoch. + HeadBlock *Block // The head block of the chain. + AggregatedHeadVotes phase0.Gwei // The aggregated votes of the last 2 epochs for the head block. + PerEpochVotingPercent []float64 // The voting percentage in the last epochs (ascendeing order). } // GetCanonicalHead returns the canonical head block of the chain. @@ -47,12 +48,21 @@ func (indexer *Indexer) GetCanonicalHead(overrideForkId *ForkKey) *Block { if len(chainHeadCandidates) > 0 { sort.Slice(chainHeadCandidates, func(i, j int) bool { - if chainHeadCandidates[i].LastEpochVotingPercent != chainHeadCandidates[j].LastEpochVotingPercent { - return chainHeadCandidates[i].LastEpochVotingPercent > chainHeadCandidates[j].LastEpochVotingPercent + percentagesI := float64(0) + percentagesJ := float64(0) + for k := range chainHeadCandidates[i].PerEpochVotingPercent { + factor := float64(1) + if k == len(chainHeadCandidates[i].PerEpochVotingPercent)-1 { + factor = 0.5 + } + percentagesI += chainHeadCandidates[i].PerEpochVotingPercent[k] * factor + percentagesJ += chainHeadCandidates[j].PerEpochVotingPercent[k] * factor } - if chainHeadCandidates[i].ThisEpochVotingPercent != chainHeadCandidates[j].ThisEpochVotingPercent { - return chainHeadCandidates[i].ThisEpochVotingPercent > chainHeadCandidates[j].ThisEpochVotingPercent + + if percentagesI != percentagesJ { + return percentagesI > percentagesJ } + return chainHeadCandidates[i].HeadBlock.Slot > chainHeadCandidates[j].HeadBlock.Slot }) @@ -70,12 +80,21 @@ func (indexer *Indexer) GetChainHeads() []*ChainHead { heads := make([]*ChainHead, len(indexer.cachedChainHeads)) copy(heads, indexer.cachedChainHeads) sort.Slice(heads, func(i, j int) bool { - if heads[i].LastEpochVotingPercent != heads[j].LastEpochVotingPercent { - return heads[i].LastEpochVotingPercent > heads[j].LastEpochVotingPercent + percentagesI := float64(0) + percentagesJ := float64(0) + for k := range heads[i].PerEpochVotingPercent { + factor := float64(1) + if k == len(heads[i].PerEpochVotingPercent)-1 { + factor = 0.5 + } + percentagesI += heads[i].PerEpochVotingPercent[k] * factor + percentagesJ += heads[j].PerEpochVotingPercent[k] * factor } - if heads[i].ThisEpochVotingPercent != heads[j].ThisEpochVotingPercent { - return heads[i].ThisEpochVotingPercent > heads[j].ThisEpochVotingPercent + + if percentagesI != percentagesJ { + return percentagesI > percentagesJ } + return heads[i].HeadBlock.Slot > heads[j].HeadBlock.Slot }) @@ -103,6 +122,14 @@ func (indexer *Indexer) computeCanonicalChain() bool { var headBlock *Block = nil var chainHeads []*ChainHead = nil + + chainState := indexer.consensusPool.GetChainState() + specs := chainState.GetSpecs() + aggregateEpochs := (32 / specs.SlotsPerEpoch) + 1 // aggregate votes of last 48 slots (2 epochs for mainnet, 5 epochs for minimal config) + if aggregateEpochs < 2 { + aggregateEpochs = 2 + } + t1 := time.Now() defer func() { @@ -124,22 +151,26 @@ func (indexer *Indexer) computeCanonicalChain() bool { if len(latestBlocks) > 0 { headBlock = latestBlocks[0] - forkVotes, thisEpochPercent, lastEpochPercent := indexer.aggregateForkVotes(headBlock.forkId) + forkVotes, epochParticipation := indexer.aggregateForkVotes(headBlock.forkId, aggregateEpochs) + participationStr := make([]string, len(epochParticipation)) + for i, p := range epochParticipation { + participationStr[i] = fmt.Sprintf("%.2f%%", p) + } + indexer.logger.Debugf( - "fork %v votes in last 2 epochs: %v ETH (%.2f%%, %.2f%%), head: %v (%v)", + "fork %v votes in last %v epochs: %v ETH (%v), head: %v (%v)", headBlock.forkId, + aggregateEpochs, forkVotes/EtherGweiFactor, - lastEpochPercent, - thisEpochPercent, + strings.Join(participationStr, ", "), headBlock.Slot, headBlock.Root.String(), ) chainHeads = []*ChainHead{{ - HeadBlock: headBlock, - AggregatedHeadVotes: forkVotes, - LastEpochVotingPercent: lastEpochPercent, - ThisEpochVotingPercent: thisEpochPercent, + HeadBlock: headBlock, + AggregatedHeadVotes: forkVotes, + PerEpochVotingPercent: epochParticipation, }} } } else { @@ -153,22 +184,25 @@ func (indexer *Indexer) computeCanonicalChain() bool { continue } - forkVotes, thisEpochPercent, lastEpochPercent := indexer.aggregateForkVotes(fork.ForkId) + forkVotes, epochParticipation := indexer.aggregateForkVotes(fork.ForkId, aggregateEpochs) headForkVotes[fork.ForkId] = forkVotes chainHeads = append(chainHeads, &ChainHead{ - HeadBlock: fork.Block, - AggregatedHeadVotes: forkVotes, - LastEpochVotingPercent: lastEpochPercent, - ThisEpochVotingPercent: thisEpochPercent, + HeadBlock: fork.Block, + AggregatedHeadVotes: forkVotes, + PerEpochVotingPercent: epochParticipation, }) if forkVotes > 0 { + participationStr := make([]string, len(epochParticipation)) + for i, p := range epochParticipation { + participationStr[i] = fmt.Sprintf("%.2f%%", p) + } + indexer.logger.Infof( - "fork %v votes in last 2 epochs: %v ETH (%.2f%%, %.2f%%), head: %v (%v)", + "fork %v: votes in last 2 epochs: %v ETH (%v), head: %v (%v)", fork.ForkId, forkVotes/EtherGweiFactor, - lastEpochPercent, - thisEpochPercent, + strings.Join(participationStr, ", "), fork.Block.Slot, fork.Block.Root.String(), ) @@ -187,13 +221,19 @@ func (indexer *Indexer) computeCanonicalChain() bool { } // aggregateForkVotes aggregates the votes for a given fork. -func (indexer *Indexer) aggregateForkVotes(forkId ForkKey) (totalVotes phase0.Gwei, thisEpochPercent float64, lastEpochPercent float64) { +func (indexer *Indexer) aggregateForkVotes(forkId ForkKey, epochLimit uint64) (totalVotes phase0.Gwei, epochPercent []float64) { chainState := indexer.consensusPool.GetChainState() specs := chainState.GetSpecs() currentEpoch := chainState.CurrentEpoch() + + epochPercent = make([]float64, 0, epochLimit) + if epochLimit == 0 { + return + } + minAggregateEpoch := currentEpoch - if minAggregateEpoch > 1 { - minAggregateEpoch -= 1 + if minAggregateEpoch > phase0.Epoch(epochLimit)-1 { + minAggregateEpoch -= phase0.Epoch(epochLimit) - 1 } else { minAggregateEpoch = 0 } @@ -206,12 +246,12 @@ func (indexer *Indexer) aggregateForkVotes(forkId ForkKey) (totalVotes phase0.Gw return } - // get all blocks for given fork (and its parents) from the last 2 epochs + // get all blocks for given fork (and its parents) from the last epochs lastBlocks := []*Block{} lastSlot := phase0.Slot(0) thisForkId := forkId for { - for _, block := range indexer.blockCache.getLatestBlocks(2*specs.SlotsPerEpoch, &thisForkId) { + for _, block := range indexer.blockCache.getLatestBlocks(epochLimit*specs.SlotsPerEpoch, &thisForkId) { lastSlot = block.Slot if block.Slot < minAggregateSlot { break @@ -236,52 +276,72 @@ func (indexer *Indexer) aggregateForkVotes(forkId ForkKey) (totalVotes phase0.Gw } // already sorted descending by getLatestBlocks, reverse to ascending for aggregation - lastBlock := lastBlocks[0] slices.Reverse(lastBlocks) - // aggregate votes for last & current epoch - if chainState.EpochOfSlot(lastBlock.Slot) == currentEpoch { - thisEpochDependent := indexer.blockCache.getDependentBlock(chainState, lastBlock, nil) - if thisEpochDependent == nil { - return - } - lastBlock = thisEpochDependent - - thisEpochStats := indexer.epochCache.getEpochStats(currentEpoch, thisEpochDependent.Root) - if thisEpochStats != nil { - thisBlocks := []*Block{} - for _, block := range lastBlocks { - if chainState.EpochOfSlot(block.Slot) == currentEpoch { - thisBlocks = append(thisBlocks, block) - } - } - - epochVotes := indexer.aggregateEpochVotes(currentEpoch, chainState, thisBlocks, thisEpochStats) - if epochVotes.AmountIsCount { - totalVotes += epochVotes.CurrentEpoch.TargetVoteAmount * 32 * EtherGweiFactor + // aggregate votes per epoch + lastBlockIdx := 0 + for epoch := minAggregateEpoch; epoch <= currentEpoch; epoch++ { + epochVotingBlocks := []*Block{} + nextBlockIdx := 0 + for lastBlockIdx < len(lastBlocks) { + if chainState.EpochOfSlot(lastBlocks[lastBlockIdx].Slot) == epoch { + epochVotingBlocks = append(epochVotingBlocks, lastBlocks[lastBlockIdx]) + lastBlockIdx++ + } else if lastBlockIdx+nextBlockIdx < len(lastBlocks) && chainState.EpochOfSlot(lastBlocks[lastBlockIdx+nextBlockIdx].Slot) == epoch+1 { + epochVotingBlocks = append(epochVotingBlocks, lastBlocks[lastBlockIdx+nextBlockIdx]) + nextBlockIdx++ } else { - totalVotes += epochVotes.CurrentEpoch.TargetVoteAmount + break } - thisEpochPercent = epochVotes.TargetVotePercent } - } - if chainState.EpochOfSlot(lastBlock.Slot)+1 == currentEpoch { - lastEpochDependent := indexer.blockCache.getDependentBlock(chainState, lastBlock, nil) - if lastEpochDependent == nil { - return + if len(epochVotingBlocks) == 0 { + epochPercent = append(epochPercent, 0) + continue + } + + dependentRoot := epochVotingBlocks[0].GetParentRoot() + if dependentRoot == nil { + epochPercent = append(epochPercent, 0) + continue + } + + epochStats := indexer.epochCache.getEpochStats(epoch, *dependentRoot) + if epochStats == nil { + epochPercent = append(epochPercent, 0) + continue } - lastEpochStats := indexer.epochCache.getEpochStats(currentEpoch-1, lastEpochDependent.Root) - if lastEpochStats != nil { - epochVotes := indexer.aggregateEpochVotes(currentEpoch-1, chainState, lastBlocks, lastEpochStats) - if epochVotes.AmountIsCount { - totalVotes += (epochVotes.CurrentEpoch.TargetVoteAmount + epochVotes.NextEpoch.TargetVoteAmount) * 32 * EtherGweiFactor + epochVotes := indexer.aggregateEpochVotes(epoch, chainState, epochVotingBlocks, epochStats) + if epochVotes.AmountIsCount { + totalVotes += (epochVotes.CurrentEpoch.TargetVoteAmount + epochVotes.NextEpoch.TargetVoteAmount) * 32 * EtherGweiFactor + } else { + totalVotes += epochVotes.CurrentEpoch.TargetVoteAmount + epochVotes.NextEpoch.TargetVoteAmount + } + + lastBlock := epochVotingBlocks[len(epochVotingBlocks)-1] + epochProgress := float64(100) + + if chainState.EpochOfSlot(lastBlock.Slot) == epoch { + lastBlockIndex := chainState.SlotToSlotIndex(lastBlock.Slot) + if lastBlockIndex > 0 { + epochProgress = float64(100*lastBlockIndex) / float64(chainState.GetSpecs().SlotsPerEpoch) } else { - totalVotes += epochVotes.CurrentEpoch.TargetVoteAmount + epochVotes.NextEpoch.TargetVoteAmount + epochProgress = 0 } - lastEpochPercent = epochVotes.TargetVotePercent } + + var participationExtrapolation float64 + if epochProgress == 0 { + participationExtrapolation = 0 + } else { + participationExtrapolation = 100 * epochVotes.TargetVotePercent / epochProgress + } + if participationExtrapolation > 100 { + participationExtrapolation = 100 + } + + epochPercent = append(epochPercent, participationExtrapolation) } return diff --git a/indexer/beacon/forkdetection.go b/indexer/beacon/forkdetection.go index d62ac73e..428c9c8c 100644 --- a/indexer/beacon/forkdetection.go +++ b/indexer/beacon/forkdetection.go @@ -126,7 +126,8 @@ func (cache *forkCache) processBlock(block *Block) error { otherFork := newFork(cache.lastForkId, parentSlot, *parentRoot, otherChildren[0], parentForkId) cache.addFork(otherFork) - updatedRoots, updatedFork, _ := cache.updateForkBlocks(otherChildren[0], otherFork.forkId, false) + updatedRoots, updatedFork, headBlock := cache.updateForkBlocks(otherChildren[0], otherFork.forkId, false) + otherFork.headBlock = headBlock newFork := &newForkInfo{ fork: otherFork, updateRoots: updatedRoots, @@ -295,6 +296,7 @@ func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skip if !skipStartBlock { blockRoots = append(blockRoots, startBlock.Root[:]) + startBlock.forkId = forkId headBlock = startBlock } diff --git a/indexer/beacon/indexer.go b/indexer/beacon/indexer.go index 5345fc16..dcdee71d 100644 --- a/indexer/beacon/indexer.go +++ b/indexer/beacon/indexer.go @@ -278,7 +278,7 @@ func (indexer *Indexer) StartIndexer() { err = db.StreamUnfinalizedEpochs(uint64(finalizedEpoch), func(unfinalizedEpoch *dbtypes.UnfinalizedEpoch) { epochStats := indexer.epochCache.getEpochStats(phase0.Epoch(unfinalizedEpoch.Epoch), phase0.Root(unfinalizedEpoch.DependentRoot)) if epochStats == nil { - indexer.logger.Warnf("failed restoring epoch aggregations for epoch %v [%x] from db: epoch stats not found", unfinalizedEpoch.Epoch, unfinalizedEpoch.DependentRoot) + indexer.logger.Debugf("failed restoring epoch aggregations for epoch %v [%x] from db: epoch stats not found", unfinalizedEpoch.Epoch, unfinalizedEpoch.DependentRoot) return }