Skip to content

Commit

Permalink
track validator votking activity
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Nov 28, 2024
1 parent cb3e0fa commit 874ce59
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 10 deletions.
13 changes: 9 additions & 4 deletions indexer/beacon/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ type CacheDebugStats struct {
ParentIdCacheMiss uint64
}
ValidatorCache struct {
Validators uint64
ValidatorDiffs uint64
ValidatorData uint64
PubkeyMap CacheDebugMapSize
Validators uint64
ValidatorDiffs uint64
ValidatorData uint64
ValidatorActivity uint64
PubkeyMap CacheDebugMapSize
}
}

Expand Down Expand Up @@ -161,6 +162,10 @@ func (indexer *Indexer) getValidatorCacheDebugStats(cacheStats *CacheDebugStats)
refs++
}
cacheStats.ValidatorCache.ValidatorDiffs += uint64(refs)

if validator.recentActivity != nil {
cacheStats.ValidatorCache.ValidatorActivity++
}
}

cacheStats.ValidatorCache.ValidatorData = uint64(len(validatorsMap))
Expand Down
10 changes: 7 additions & 3 deletions indexer/beacon/epochvotes.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain

voteAmount := phase0.Gwei(0)
slotIndex := chainState.SlotToSlotIndex(attData.Slot)
updateActivity := func(validatorIndex phase0.ValidatorIndex) {
indexer.validatorCache.updateValidatorActivity(validatorIndex, epoch, attData.Slot, slot, block.forkId)
}

if attVersioned.Version >= spec.DataVersionElectra {
// EIP-7549 changes the attestation aggregation
Expand All @@ -159,7 +162,7 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain
}

if epochStatsValues != nil {
voteAmt, committeeSize := votes.aggregateVotes(epochStatsValues, slotIndex, uint64(committee), attAggregationBits, aggregationBitsOffset, activity)
voteAmt, committeeSize := votes.aggregateVotes(epochStatsValues, slotIndex, uint64(committee), attAggregationBits, aggregationBitsOffset, activity, updateActivity)
voteAmount += voteAmt
aggregationBitsOffset += committeeSize
} else {
Expand All @@ -171,7 +174,7 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain
} else {
// pre electra attestation aggregation
if epochStatsValues != nil {
voteAmt, _ := votes.aggregateVotes(epochStatsValues, slotIndex, uint64(attData.Index), attAggregationBits, 0, activity)
voteAmt, _ := votes.aggregateVotes(epochStatsValues, slotIndex, uint64(attData.Index), attAggregationBits, 0, activity, updateActivity)
voteAmount += voteAmt
} else {
voteAmt := votes.aggregateVotesWithoutDuties(deduplicationMap, slotIndex, uint64(attData.Index), attAggregationBits, 1, 0)
Expand Down Expand Up @@ -217,7 +220,7 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain
}

// aggregateVotes aggregates the votes for a specific slot and committee based on the provided epoch statistics, aggregation bits, and offset.
func (votes *EpochVotes) aggregateVotes(epochStatsValues *EpochStatsValues, slotIndex phase0.Slot, committee uint64, aggregationBits bitfield.Bitfield, aggregationBitsOffset uint64, activity *EpochVoteActivity) (phase0.Gwei, uint64) {
func (votes *EpochVotes) aggregateVotes(epochStatsValues *EpochStatsValues, slotIndex phase0.Slot, committee uint64, aggregationBits bitfield.Bitfield, aggregationBitsOffset uint64, activity *EpochVoteActivity, updateActivity func(validatorIndex phase0.ValidatorIndex)) (phase0.Gwei, uint64) {
voteAmount := phase0.Gwei(0)

voteDuties := epochStatsValues.AttesterDuties[slotIndex][committee]
Expand All @@ -232,6 +235,7 @@ func (votes *EpochVotes) aggregateVotes(epochStatsValues *EpochStatsValues, slot
voteAmount += phase0.Gwei(effectiveBalance) * EtherGweiFactor

activity.ActivityBitfield.SetBitAt(uint64(validatorIndex), true)
updateActivity(phase0.ValidatorIndex(validatorIndex))
}
}
return voteAmount, uint64(len(voteDuties))
Expand Down
43 changes: 40 additions & 3 deletions indexer/beacon/validatorcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ type validatorEntry struct {
index phase0.ValidatorIndex
finalValidator *phase0.Validator
validatorDiffs map[validatorDiffKey]*validatorDiff
recentActivity []validatorActivity
}

type validatorActivity struct {
dutySlot phase0.Slot
voteSlot phase0.Slot
forkId ForkKey
}

type validatorDiff struct {
Expand Down Expand Up @@ -166,6 +173,39 @@ func (cache *validatorCache) updateValidatorSet(epoch phase0.Epoch, dependentRoo
}
}

// updateValidatorActivity updates the validator activity cache.
func (cache *validatorCache) updateValidatorActivity(validatorIndex phase0.ValidatorIndex, epoch phase0.Epoch, dutySlot phase0.Slot, voteSlot phase0.Slot, forkId ForkKey) {
currentEpoch := cache.indexer.consensusPool.GetChainState().CurrentEpoch()
if epoch+phase0.Epoch(cache.indexer.inMemoryEpochs) < currentEpoch {
// ignore old activity
return
}

cache.cacheMutex.RLock()
defer cache.cacheMutex.RUnlock()

if validatorIndex >= phase0.ValidatorIndex(len(cache.valsetCache)) {
return
}

cachedValidator := cache.valsetCache[validatorIndex]
if cachedValidator == nil {
return
}

if cachedValidator.recentActivity == nil {
cachedValidator.recentActivity = make([]validatorActivity, cache.indexer.inMemoryEpochs)
}

activityIndex := epoch % phase0.Epoch(cache.indexer.inMemoryEpochs)

cachedValidator.recentActivity[activityIndex] = validatorActivity{
dutySlot: dutySlot,
voteSlot: voteSlot,
forkId: forkId,
}
}

// setFinalizedEpoch sets the last finalized epoch.
func (cache *validatorCache) setFinalizedEpoch(epochStats *EpochStats) {
cache.cacheMutex.Lock()
Expand Down Expand Up @@ -251,9 +291,6 @@ func (cache *validatorCache) getValidatorSetForRoot(blockRoot phase0.Root) []*ph

// getValidatorByIndex returns the validator by index for a given forkId.
func (cache *validatorCache) getValidatorByIndex(index phase0.ValidatorIndex, overrideForkId *ForkKey) *phase0.Validator {
cache.cacheMutex.RLock()
defer cache.cacheMutex.RUnlock()

canonicalHead := cache.indexer.GetCanonicalHead(overrideForkId)
if canonicalHead == nil {
return nil
Expand Down

0 comments on commit 874ce59

Please sign in to comment.