Skip to content

Commit

Permalink
add separate config setting for validator activity history length
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Nov 30, 2024
1 parent 2d524c2 commit db66236
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 11 deletions.
1 change: 1 addition & 0 deletions .hack/devnet/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ $(for node in $EXECUTION_NODES; do
done)
indexer:
inMemoryEpochs: 8
activityHistoryLength: 6
cachePersistenceDelay: 8
disableIndexWriter: false
syncEpochCooldown: 1
Expand Down
3 changes: 3 additions & 0 deletions config/default.config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ indexer:
# max number of epochs to keep in memory
inMemoryEpochs: 3

# number of epochs to keep validator activity history for (high memory usage for large validator sets)
activityHistoryLength: 6

# disable synchronizing historic data
disableSynchronizer: false

Expand Down
8 changes: 5 additions & 3 deletions handlers/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ func buildValidatorPageData(validatorIndex uint64, tabView string) (*models.Vali
if pageData.TabView == "attestations" {
currentEpoch := uint64(chainState.CurrentEpoch())
cutOffEpoch := uint64(0)
if currentEpoch > uint64(services.GlobalBeaconService.GetBeaconIndexer().GetInMemoryEpochs()) {
cutOffEpoch = currentEpoch - uint64(services.GlobalBeaconService.GetBeaconIndexer().GetInMemoryEpochs())
if currentEpoch > uint64(services.GlobalBeaconService.GetBeaconIndexer().GetActivityHistoryLength()) {
cutOffEpoch = currentEpoch - uint64(services.GlobalBeaconService.GetBeaconIndexer().GetActivityHistoryLength())
} else {
cutOffEpoch = 0
}
Expand Down Expand Up @@ -248,7 +248,9 @@ func buildValidatorPageData(validatorIndex uint64, tabView string) (*models.Vali
pageData.RecentAttestations = append(pageData.RecentAttestations, attestation)
validatorActivityIdx++
}
if !found {

validatorStatus := v1.ValidatorToState(validator.Validator, &validator.Balance, epoch, beacon.FarFutureEpoch)
if !found && strings.HasPrefix(validatorStatus.String(), "active_") {
attestation := &models.ValidatorPageDataAttestation{
Epoch: uint64(epoch),
Status: 0,
Expand Down
10 changes: 7 additions & 3 deletions indexer/beacon/epochvotes.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,12 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain
}

var epochStatsValues *EpochStatsValues
votesWithPrecalc := false
votesWithValues := false
if epochStats != nil {
epochStatsValues = epochStats.GetOrLoadValues(indexer, true, false)
votesWithPrecalc = epochStats.precalcValues != nil
votesWithValues = epochStats.ready
}
specs := chainState.GetSpecs()

Expand Down Expand Up @@ -150,7 +154,9 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain
voteAmount := phase0.Gwei(0)
slotIndex := chainState.SlotToSlotIndex(attData.Slot)
updateActivity := func(validatorIndex phase0.ValidatorIndex) {
indexer.validatorCache.updateValidatorActivity(validatorIndex, epoch, attData.Slot, block)
if votesWithValues {
indexer.validatorCache.updateValidatorActivity(validatorIndex, epoch, attData.Slot, block)
}
}

if attVersioned.Version >= spec.DataVersionElectra {
Expand Down Expand Up @@ -223,8 +229,6 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain
votes.TotalVotePercent = float64(votes.CurrentEpoch.TotalVoteAmount+votes.NextEpoch.TotalVoteAmount) * 100 / float64(epochStatsValues.EffectiveBalance)
}

votesWithValues := epochStats != nil && epochStats.ready
votesWithPrecalc := epochStats != nil && epochStats.precalcValues != nil
votesKey := getEpochVotesKey(epoch, targetRoot, blocks[len(blocks)-1].Root, uint8(len(blocks)), votesWithValues, votesWithPrecalc)

indexer.logger.Debugf("aggregated epoch %v votes in %v (blocks: %v) [0x%x]", epoch, time.Since(t1), len(blocks), votesKey[:])
Expand Down
10 changes: 8 additions & 2 deletions indexer/beacon/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Indexer struct {
disableSync bool
blockCompression bool
inMemoryEpochs uint16
activityHistoryLength uint16
maxParallelStateCalls uint16

// caches
Expand Down Expand Up @@ -71,6 +72,10 @@ func NewIndexer(logger logrus.FieldLogger, consensusPool *consensus.Pool) *Index
if inMemoryEpochs < 2 {
inMemoryEpochs = 2
}
activityHistoryLength := utils.Config.Indexer.ActivityHistoryLength
if activityHistoryLength == 0 {
activityHistoryLength = 6
}
maxParallelStateCalls := uint16(utils.Config.Indexer.MaxParallelValidatorSetRequests)
if maxParallelStateCalls < 2 {
maxParallelStateCalls = 2
Expand All @@ -88,6 +93,7 @@ func NewIndexer(logger logrus.FieldLogger, consensusPool *consensus.Pool) *Index
disableSync: utils.Config.Indexer.DisableSynchronizer,
blockCompression: blockCompression,
inMemoryEpochs: inMemoryEpochs,
activityHistoryLength: activityHistoryLength,
maxParallelStateCalls: maxParallelStateCalls,

clients: make([]*Client, 0),
Expand All @@ -103,8 +109,8 @@ func NewIndexer(logger logrus.FieldLogger, consensusPool *consensus.Pool) *Index
return indexer
}

func (indexer *Indexer) GetInMemoryEpochs() uint16 {
return indexer.inMemoryEpochs
func (indexer *Indexer) GetActivityHistoryLength() uint16 {
return indexer.activityHistoryLength
}

func (indexer *Indexer) getMinInMemoryEpoch() phase0.Epoch {
Expand Down
6 changes: 3 additions & 3 deletions indexer/beacon/validatorcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ func (cache *validatorCache) updateValidatorActivity(validatorIndex phase0.Valid
chainState := cache.indexer.consensusPool.GetChainState()
currentEpoch := chainState.CurrentEpoch()
cutOffEpoch := phase0.Epoch(0)
if currentEpoch > phase0.Epoch(cache.indexer.inMemoryEpochs) {
cutOffEpoch = currentEpoch - phase0.Epoch(cache.indexer.inMemoryEpochs)
if currentEpoch > phase0.Epoch(cache.indexer.activityHistoryLength) {
cutOffEpoch = currentEpoch - phase0.Epoch(cache.indexer.activityHistoryLength)
}

if epoch < cutOffEpoch {
Expand Down Expand Up @@ -227,7 +227,7 @@ func (cache *validatorCache) updateValidatorActivity(validatorIndex phase0.Valid
defer cache.activityMutex.Unlock()

if cachedValidator.recentActivity == nil {
cachedValidator.recentActivity = make([]ValidatorActivity, 0, cache.indexer.inMemoryEpochs)
cachedValidator.recentActivity = make([]ValidatorActivity, 0, cache.indexer.activityHistoryLength)
}

replaceIndex := -1
Expand Down
3 changes: 3 additions & 0 deletions test-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ indexer:
# max number of epochs to keep in memory
inMemoryEpochs: 3

# number of epochs to keep validator activity history for (high memory usage for large validator sets)
activityHistoryLength: 6

# disable synchronizing historic data
disableSynchronizer: false

Expand Down
1 change: 1 addition & 0 deletions types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type Config struct {
ResyncForceUpdate bool `yaml:"resyncForceUpdate" envconfig:"INDEXER_RESYNC_FORCE_UPDATE"`

InMemoryEpochs uint16 `yaml:"inMemoryEpochs" envconfig:"INDEXER_IN_MEMORY_EPOCHS"`
ActivityHistoryLength uint16 `yaml:"activityHistoryLength" envconfig:"INDEXER_ACTIVITY_HISTORY_LENGTH"`
DisableSynchronizer bool `yaml:"disableSynchronizer" envconfig:"INDEXER_DISABLE_SYNCHRONIZER"`
SyncEpochCooldown uint `yaml:"syncEpochCooldown" envconfig:"INDEXER_SYNC_EPOCH_COOLDOWN"`
MaxParallelValidatorSetRequests uint `yaml:"maxParallelValidatorSetRequests" envconfig:"INDEXER_MAX_PARALLEL_VALIDATOR_SET_REQUESTS"`
Expand Down

0 comments on commit db66236

Please sign in to comment.