From 47e22bbe33582487d33a3986b4d4f2767c47d0e1 Mon Sep 17 00:00:00 2001 From: pk910 Date: Thu, 2 May 2024 12:37:02 +0200 Subject: [PATCH] add deposit related schemas & deposit indexer --- db/deposits.go | 123 ++++++++++++++++++ .../pgsql/20240429121225_deposit-index.sql | 70 ++++++++++ .../sqlite/20240429121225_deposit-index.sql | 69 ++++++++++ dbtypes/dbtypes.go | 27 ++++ indexer/cache_logic.go | 27 ++-- indexer/epoch_stats.go | 58 ++++++--- indexer/{block_ssz.go => ssz_helper.go} | 0 indexer/state_helper.go | 19 +++ indexer/synchronizer.go | 2 +- indexer/votes.go | 10 +- indexer/write_db.go | 97 ++++++++++++-- rpc/beaconapi.go | 16 +++ 12 files changed, 476 insertions(+), 42 deletions(-) create mode 100644 db/deposits.go create mode 100644 db/schema/pgsql/20240429121225_deposit-index.sql create mode 100644 db/schema/sqlite/20240429121225_deposit-index.sql rename indexer/{block_ssz.go => ssz_helper.go} (100%) create mode 100644 indexer/state_helper.go diff --git a/db/deposits.go b/db/deposits.go new file mode 100644 index 00000000..785d0e1c --- /dev/null +++ b/db/deposits.go @@ -0,0 +1,123 @@ +package db + +import ( + "fmt" + "strings" + + "github.com/ethpandaops/dora/dbtypes" + "github.com/jmoiron/sqlx" +) + +func InsertDepositTxs(depositTxs []*dbtypes.DepositTx, tx *sqlx.Tx) error { + var sql strings.Builder + fmt.Fprint(&sql, + EngineQuery(map[dbtypes.DBEngineType]string{ + dbtypes.DBEnginePgsql: "INSERT INTO deposit_txs ", + dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO deposit_txs ", + }), + "(deposit_index, block_number, block_root, publickey, withdrawalcredentials, amount, signature, valid_signature, orphaned, tx_hash, tx_sender, tx_origin, tx_target)", + " VALUES ", + ) + argIdx := 0 + fieldCount := 13 + + args := make([]any, len(depositTxs)*fieldCount) + for i, depositTx := range depositTxs { + if i > 0 { + fmt.Fprintf(&sql, ", ") + } + fmt.Fprintf(&sql, "(") + for f := 0; f < fieldCount; f++ { + if f > 0 { + fmt.Fprintf(&sql, ", ") + } + fmt.Fprintf(&sql, "$%v", argIdx+f+1) + + } + fmt.Fprintf(&sql, ")") + + args[argIdx+0] = depositTx.Index + args[argIdx+1] = depositTx.BlockNumber + args[argIdx+2] = depositTx.BlockRoot + args[argIdx+3] = depositTx.PublicKey + args[argIdx+4] = depositTx.WithdrawalCredentials + args[argIdx+5] = depositTx.Amount + args[argIdx+6] = depositTx.Signature + args[argIdx+7] = depositTx.ValidSignature + args[argIdx+8] = depositTx.Orphaned + args[argIdx+9] = depositTx.TxTash + args[argIdx+10] = depositTx.TxSender + args[argIdx+11] = depositTx.TxOrigin + args[argIdx+12] = depositTx.TxTarget + argIdx += fieldCount + } + fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{ + dbtypes.DBEnginePgsql: " ON CONFLICT (deposit_index, block_root) DO UPDATE SET orphaned = excluded.orphaned", + dbtypes.DBEngineSqlite: "", + })) + _, err := tx.Exec(sql.String(), args...) + if err != nil { + return err + } + return nil +} + +type Deposit struct { + Index uint64 `db:"deposit_index"` + SlotNumber uint64 `db:"slot_number"` + SlotIndex uint64 `db:"slot_index"` + SlotRoot []byte `db:"slot_root"` + Orphaned bool `db:"orphaned"` + PublicKey []byte `db:"publickey"` + WithdrawalCredentials []byte `db:"withdrawalcredentials"` + Amount uint64 `db:"amount"` +} + +func InsertDeposits(deposits []*dbtypes.Deposit, tx *sqlx.Tx) error { + var sql strings.Builder + fmt.Fprint(&sql, + EngineQuery(map[dbtypes.DBEngineType]string{ + dbtypes.DBEnginePgsql: "INSERT INTO deposits ", + dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO deposits ", + }), + "(deposit_index, slot_number, slot_index, slot_root, orphaned, publickey, withdrawalcredentials, amount)", + " VALUES ", + ) + argIdx := 0 + fieldCount := 8 + + args := make([]any, len(deposits)*fieldCount) + for i, deposit := range deposits { + if i > 0 { + fmt.Fprintf(&sql, ", ") + } + fmt.Fprintf(&sql, "(") + for f := 0; f < fieldCount; f++ { + if f > 0 { + fmt.Fprintf(&sql, ", ") + } + fmt.Fprintf(&sql, "$%v", argIdx+f+1) + + } + fmt.Fprintf(&sql, ")") + + args[argIdx+0] = deposit.Index + args[argIdx+1] = deposit.SlotNumber + args[argIdx+2] = deposit.SlotIndex + args[argIdx+3] = deposit.SlotRoot + args[argIdx+4] = deposit.Orphaned + args[argIdx+5] = deposit.PublicKey + args[argIdx+6] = deposit.WithdrawalCredentials + args[argIdx+7] = deposit.Amount + argIdx += fieldCount + } + fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{ + dbtypes.DBEnginePgsql: " ON CONFLICT (slot_index, slot_root) DO UPDATE SET deposit_index = excluded.deposit_index, orphaned = excluded.orphaned", + dbtypes.DBEngineSqlite: "", + })) + _, err := tx.Exec(sql.String(), args...) + if err != nil { + return err + } + return nil +} diff --git a/db/schema/pgsql/20240429121225_deposit-index.sql b/db/schema/pgsql/20240429121225_deposit-index.sql new file mode 100644 index 00000000..4e6acb1b --- /dev/null +++ b/db/schema/pgsql/20240429121225_deposit-index.sql @@ -0,0 +1,70 @@ +-- +goose Up +-- +goose StatementBegin + +CREATE TABLE IF NOT EXISTS deposit_txs ( + deposit_index INT NOT NULL, + block_number INT NOT NULL, + block_root bytea NOT NULL, + publickey bytea NOT NULL, + withdrawalcredentials bytea NOT NULL, + amount BIGINT NOT NULL, + signature bytea NOT NULL, + valid_signature bool NOT NULL DEFAULT TRUE, + orphaned bool NOT NULL DEFAULT FALSE, + tx_hash bytea NOT NULL, + tx_sender bytea NOT NULL, + tx_origin bytea NOT NULL, + tx_target bytea NOT NULL, + CONSTRAINT deposit_txs_pkey PRIMARY KEY (deposit_index, block_root) +); + +CREATE INDEX IF NOT EXISTS "deposit_txs_deposit_index_idx" + ON public."deposit_txs" + ("deposit_index" ASC NULLS FIRST); + +CREATE INDEX IF NOT EXISTS "deposit_txs_publickey_idx" + ON public."deposit_txs" + ("publickey" ASC NULLS FIRST); + +CREATE INDEX IF NOT EXISTS "deposit_txs_tx_sender_idx" + ON public."deposit_txs" + ("tx_sender" ASC NULLS FIRST); + +CREATE INDEX IF NOT EXISTS "deposit_txs_tx_origin_idx" + ON public."deposit_txs" + ("tx_origin" ASC NULLS FIRST); + +CREATE INDEX IF NOT EXISTS "deposit_txs_tx_target_idx" + ON public."deposit_txs" + ("tx_target" ASC NULLS FIRST); + +CREATE TABLE IF NOT EXISTS deposits ( + deposit_index INT NULL, + slot_number INT NOT NULL, + slot_index INT NOT NULL, + slot_root bytea NOT NULL, + orphaned bool NOT NULL DEFAULT FALSE, + publickey bytea NOT NULL, + withdrawalcredentials bytea NOT NULL, + amount BIGINT NOT NULL, + CONSTRAINT deposit_pkey PRIMARY KEY (slot_index, slot_root) +); + +CREATE INDEX IF NOT EXISTS "deposits_deposit_index_idx" + ON public."deposits" + ("deposit_index" ASC NULLS FIRST); + +CREATE INDEX IF NOT EXISTS "deposits_slot_number_idx" + ON public."deposits" + ("slot_number" ASC NULLS FIRST); + +CREATE INDEX IF NOT EXISTS "deposits_publickey_idx" + ON public."deposits" + ("publickey" ASC NULLS FIRST); + + +-- +goose StatementEnd +-- +goose Down +-- +goose StatementBegin +SELECT 'NOT SUPPORTED'; +-- +goose StatementEnd diff --git a/db/schema/sqlite/20240429121225_deposit-index.sql b/db/schema/sqlite/20240429121225_deposit-index.sql new file mode 100644 index 00000000..62d70bc2 --- /dev/null +++ b/db/schema/sqlite/20240429121225_deposit-index.sql @@ -0,0 +1,69 @@ +-- +goose Up +-- +goose StatementBegin + +CREATE TABLE IF NOT EXISTS deposit_txs ( + deposit_index INT NOT NULL, + block_number INT NOT NULL, + block_root BLOB NOT NULL, + publickey BLOB NOT NULL, + withdrawalcredentials BLOB NOT NULL, + amount BIGINT NOT NULL, + signature BLOB NOT NULL, + valid_signature bool NOT NULL DEFAULT TRUE, + orphaned bool NOT NULL DEFAULT FALSE, + tx_hash BLOB NOT NULL, + tx_sender BLOB NOT NULL, + tx_origin BLOB NOT NULL, + tx_target BLOB NOT NULL, + CONSTRAINT deposit_txs_pkey PRIMARY KEY (deposit_index, block_root) +); + +CREATE INDEX IF NOT EXISTS "deposit_txs_deposit_index_idx" + ON "deposit_txs" + ("deposit_index" ASC); + +CREATE INDEX IF NOT EXISTS "deposit_txs_publickey_idx" + ON "deposit_txs" + ("publickey" ASC); + +CREATE INDEX IF NOT EXISTS "deposit_txs_tx_sender_idx" + ON "deposit_txs" + ("tx_sender" ASC); + +CREATE INDEX IF NOT EXISTS "deposit_txs_tx_origin_idx" + ON "deposit_txs" + ("tx_origin" ASC); + +CREATE INDEX IF NOT EXISTS "deposit_txs_tx_target_idx" + ON "deposit_txs" + ("tx_target" ASC); + +CREATE TABLE IF NOT EXISTS deposits ( + deposit_index INT NULL, + slot_number INT NOT NULL, + slot_index INT NOT NULL, + slot_root BLOB NOT NULL, + orphaned bool NOT NULL DEFAULT FALSE, + publickey BLOB NOT NULL, + withdrawalcredentials BLOB NOT NULL, + amount BIGINT NOT NULL, + CONSTRAINT deposit_pkey PRIMARY KEY (slot_index, slot_root) +); + +CREATE INDEX IF NOT EXISTS "deposits_deposit_index_idx" + ON "deposits" + ("deposit_index" ASC); + +CREATE INDEX IF NOT EXISTS "deposits_slot_number_idx" + ON "deposits" + ("slot_number" ASC); + +CREATE INDEX IF NOT EXISTS "deposits_publickey_idx" + ON "deposits" + ("publickey" ASC); + +-- +goose StatementEnd +-- +goose Down +-- +goose StatementBegin +SELECT 'NOT SUPPORTED'; +-- +goose StatementEnd diff --git a/dbtypes/dbtypes.go b/dbtypes/dbtypes.go index b549c9c6..8e62d0e9 100644 --- a/dbtypes/dbtypes.go +++ b/dbtypes/dbtypes.go @@ -127,3 +127,30 @@ type TxPendingFunctionSignature struct { Bytes []byte `db:"bytes"` QueueTime uint64 `db:"queuetime"` } + +type DepositTx struct { + Index uint64 `db:"deposit_index"` + BlockNumber uint64 `db:"block_number"` + BlockRoot []byte `db:"block_root"` + PublicKey []byte `db:"publickey"` + WithdrawalCredentials []byte `db:"withdrawalcredentials"` + Amount uint64 `db:"amount"` + Signature []byte `db:"signature"` + ValidSignature bool `db:"valid_signature"` + Orphaned bool `db:"orphaned"` + TxTash []byte `db:"tx_hash"` + TxSender []byte `db:"tx_sender"` + TxOrigin []byte `db:"tx_origin"` + TxTarget []byte `db:"tx_target"` +} + +type Deposit struct { + Index *uint64 `db:"deposit_index"` + SlotNumber uint64 `db:"slot_number"` + SlotIndex uint64 `db:"slot_index"` + SlotRoot []byte `db:"slot_root"` + Orphaned bool `db:"orphaned"` + PublicKey []byte `db:"publickey"` + WithdrawalCredentials []byte `db:"withdrawalcredentials"` + Amount uint64 `db:"amount"` +} diff --git a/indexer/cache_logic.go b/indexer/cache_logic.go index 135be3e8..d1b34ccf 100644 --- a/indexer/cache_logic.go +++ b/indexer/cache_logic.go @@ -212,8 +212,8 @@ func (cache *indexerCache) processFinalizedEpoch(epoch uint64) error { // calculate votes epochVotes := aggregateEpochVotes(canonicalMap, epoch, epochStats, epochTarget, false, true) - if epochStats.validatorStats != nil { - logger.Infof("epoch %v stats: %v validators (%v)", epoch, epochStats.validatorStats.ValidatorCount, epochStats.validatorStats.EligibleAmount) + if epochStats.stateStats != nil { + logger.Infof("epoch %v stats: %v validators (%v)", epoch, epochStats.stateStats.ValidatorCount, epochStats.stateStats.EligibleAmount) } logger.Infof("epoch %v votes: target %v + %v = %v", epoch, epochVotes.currentEpoch.targetVoteAmount, epochVotes.nextEpoch.targetVoteAmount, epochVotes.currentEpoch.targetVoteAmount+epochVotes.nextEpoch.targetVoteAmount) logger.Infof("epoch %v votes: head %v + %v = %v", epoch, epochVotes.currentEpoch.headVoteAmount, epochVotes.nextEpoch.headVoteAmount, epochVotes.currentEpoch.headVoteAmount+epochVotes.nextEpoch.headVoteAmount) @@ -246,8 +246,11 @@ func (cache *indexerCache) processFinalizedEpoch(epoch uint64) error { if !block.IsReady() { continue } - dbBlock := buildDbBlock(block, nil) - db.InsertSlot(dbBlock, tx) + + err := persistBlockData(block, nil, nil, false, tx) + if err != nil { + logger.Errorf("error while persisting slot: %v", err) + } } } @@ -311,17 +314,16 @@ func (cache *indexerCache) processOrphanedBlocks(processedEpoch int64) error { if !block.IsReady() { continue } - dbBlock := buildDbBlock(block, cache.getEpochStats(utils.EpochOfSlot(block.Slot), nil)) - if block.IsCanonical(cache.indexer, cache.justifiedRoot) { + isCanonical := block.IsCanonical(cache.indexer, cache.justifiedRoot) + if isCanonical { logger.Warnf("canonical block in orphaned block processing: %v [0x%x]", block.Slot, block.Root) } else { - dbBlock.Status = dbtypes.Orphaned db.InsertOrphanedBlock(block.buildOrphanedBlock(), tx) } - err := db.InsertSlot(dbBlock, tx) + err := persistBlockData(block, cache.getEpochStats(utils.EpochOfSlot(block.Slot), nil), nil, !isCanonical, tx) if err != nil { - logger.Errorf("failed inserting orphaned block: %v", err) + logger.Errorf("error while persisting orphaned slot: %v", err) } } @@ -439,6 +441,13 @@ func (cache *indexerCache) processCachePersistence() error { logger.Errorf("error inserting unfinalized block: %v", err) return err } + + err = persistBlockDeposits(block, nil, true, tx) + if err != nil { + logger.Errorf("error persisting unfinalized deposits: %v", err) + return err + } + block.isInDb = true } } diff --git a/indexer/epoch_stats.go b/indexer/epoch_stats.go index 63d42983..2482c793 100644 --- a/indexer/epoch_stats.go +++ b/indexer/epoch_stats.go @@ -7,6 +7,7 @@ import ( "sync" v1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/ethpandaops/dora/db" @@ -22,20 +23,21 @@ type EpochStats struct { seenCount uint64 isInDb bool dutiesMutex sync.RWMutex - validatorsMutex sync.RWMutex proposerAssignments map[uint64]uint64 attestorAssignments map[string][]uint64 syncAssignments []uint64 - validatorStats *EpochValidatorStats + stateStatsMutex sync.RWMutex + stateStats *EpochStateStats dbEpochMutex sync.Mutex dbEpochCache *dbtypes.Epoch } -type EpochValidatorStats struct { +type EpochStateStats struct { ValidatorCount uint64 ValidatorBalance uint64 EligibleAmount uint64 ValidatorBalances map[uint64]uint64 + DepositIndex uint64 } func (cache *indexerCache) getEpochStats(epoch uint64, dependendRoot []byte) *EpochStats { @@ -106,11 +108,11 @@ func (epochStats *EpochStats) IsReady() bool { return true } -func (epochStats *EpochStats) IsValidatorsReady() bool { - if !epochStats.validatorsMutex.TryRLock() { +func (epochStats *EpochStats) IsStateStatsReady() bool { + if !epochStats.stateStatsMutex.TryRLock() { return false } - epochStats.validatorsMutex.RUnlock() + epochStats.stateStatsMutex.RUnlock() return true } @@ -282,7 +284,7 @@ func (epochStats *EpochStats) ensureEpochStatsLazy(client *ConsensusClient, prop } // load validators - if epochStats.validatorStats == nil { + if epochStats.stateStats == nil { go epochStats.ensureValidatorStatsLazy(client, epochStats.dependentStateRef) } @@ -341,33 +343,57 @@ func (epochStats *EpochStats) ensureValidatorStatsLazy(client *ConsensusClient, } func (epochStats *EpochStats) loadValidatorStats(client *ConsensusClient, stateRef string) { - epochStats.validatorsMutex.Lock() - defer epochStats.validatorsMutex.Unlock() - if epochStats.validatorStats != nil { + epochStats.stateStatsMutex.Lock() + defer epochStats.stateStatsMutex.Unlock() + if epochStats.stateStats != nil { return } // `lock` concurrency limit (limit concurrent get validators calls) client.indexerCache.validatorLoadingLimiter <- 1 - var epochValidators map[phase0.ValidatorIndex]*v1.Validator + var epochState *spec.VersionedBeaconState var err error if epochStats.Epoch == 0 { - epochValidators, err = client.rpcClient.GetStateValidators("genesis") + epochState, err = client.rpcClient.GetState("genesis") } else { - epochValidators, err = client.rpcClient.GetStateValidators(stateRef) + epochState, err = client.rpcClient.GetState(stateRef) } // `unlock` concurrency limit <-client.indexerCache.validatorLoadingLimiter if err != nil { - logger.Warnf("error fetching epoch %v validators: %v", epochStats.Epoch, err) + logger.Warnf("error fetching epoch %v state: %v", epochStats.Epoch, err) return } + + validatorList, err := epochState.Validators() + if err != nil { + logger.Warnf("error getting validators from epoch %v state: %v", epochStats.Epoch, err) + return + } + validatorBalances, err := epochState.ValidatorBalances() + if err != nil { + logger.Warnf("error getting validator balances from epoch %v state: %v", epochStats.Epoch, err) + return + } + + epochValidators := map[phase0.ValidatorIndex]*v1.Validator{} + for idx := range validatorList { + state := v1.ValidatorToState(validatorList[idx], &validatorBalances[idx], phase0.Epoch(epochStats.Epoch), 18446744073709551615) + epochValidators[phase0.ValidatorIndex(idx)] = &v1.Validator{ + Index: phase0.ValidatorIndex(idx), + Balance: validatorBalances[idx], + Validator: validatorList[idx], + Status: state, + } + } + client.indexerCache.setLastValidators(epochStats.Epoch, epochValidators) - validatorStats := &EpochValidatorStats{ + validatorStats := &EpochStateStats{ ValidatorBalances: make(map[uint64]uint64), + DepositIndex: getDepositIndexFromState(epochState), } for _, validator := range epochValidators { validatorStats.ValidatorBalances[uint64(validator.Index)] = uint64(validator.Validator.EffectiveBalance) @@ -377,5 +403,5 @@ func (epochStats *EpochStats) loadValidatorStats(client *ConsensusClient, stateR validatorStats.EligibleAmount += uint64(validator.Validator.EffectiveBalance) } } - epochStats.validatorStats = validatorStats + epochStats.stateStats = validatorStats } diff --git a/indexer/block_ssz.go b/indexer/ssz_helper.go similarity index 100% rename from indexer/block_ssz.go rename to indexer/ssz_helper.go diff --git a/indexer/state_helper.go b/indexer/state_helper.go new file mode 100644 index 00000000..d67ce870 --- /dev/null +++ b/indexer/state_helper.go @@ -0,0 +1,19 @@ +package indexer + +import "github.com/attestantio/go-eth2-client/spec" + +func getDepositIndexFromState(state *spec.VersionedBeaconState) uint64 { + switch state.Version { + case spec.DataVersionPhase0: + return state.Phase0.ETH1DepositIndex + case spec.DataVersionAltair: + return state.Altair.ETH1DepositIndex + case spec.DataVersionBellatrix: + return state.Bellatrix.ETH1DepositIndex + case spec.DataVersionCapella: + return state.Capella.ETH1DepositIndex + case spec.DataVersionDeneb: + return state.Deneb.ETH1DepositIndex + } + return 0 +} diff --git a/indexer/synchronizer.go b/indexer/synchronizer.go index 2737dda5..a0410275 100644 --- a/indexer/synchronizer.go +++ b/indexer/synchronizer.go @@ -231,7 +231,7 @@ func (sync *synchronizerState) syncEpoch(syncEpoch uint64, retryCount int, lastT } epochStats.loadValidatorStats(client, epochAssignments.DependendStateRef) - if epochStats.validatorStats == nil && !lastTry { + if epochStats.stateStats == nil && !lastTry { return false, client, fmt.Errorf("error fetching validator stats for epoch %v: %v", syncEpoch, err) } if sync.checkKillChan(0) { diff --git a/indexer/votes.go b/indexer/votes.go index bbeccd04..365e6798 100644 --- a/indexer/votes.go +++ b/indexer/votes.go @@ -37,13 +37,13 @@ func aggregateEpochVotes(blockMap map[uint64]*CacheBlock, epoch uint64, epochSta if awaitDutiesLoaded { epochStats.dutiesMutex.RLock() defer epochStats.dutiesMutex.RUnlock() - epochStats.validatorsMutex.RLock() - defer epochStats.validatorsMutex.RUnlock() + epochStats.stateStatsMutex.RLock() + defer epochStats.stateStatsMutex.RUnlock() } votes := EpochVotes{ ActivityMap: map[uint64]bool{}, - VoteCounts: epochStats.validatorStats == nil, + VoteCounts: epochStats.stateStats == nil, } for slot := firstSlot; slot <= lastSlot; slot++ { @@ -77,8 +77,8 @@ func aggregateEpochVotes(blockMap map[uint64]*CacheBlock, epoch uint64, epochSta if votes.ActivityMap[validatorIdx] { continue } - if epochStats.validatorStats != nil { - voteAmount += uint64(epochStats.validatorStats.ValidatorBalances[validatorIdx]) + if epochStats.stateStats != nil { + voteAmount += uint64(epochStats.stateStats.ValidatorBalances[validatorIdx]) } else { voteAmount += 1 } diff --git a/indexer/write_db.go b/indexer/write_db.go index 873ee18a..bc25009f 100644 --- a/indexer/write_db.go +++ b/indexer/write_db.go @@ -53,6 +53,27 @@ func persistMissedSlots(epoch uint64, blockMap map[uint64]*CacheBlock, epochStat return nil } +func persistBlockData(block *CacheBlock, epochStats *EpochStats, depositIndex *uint64, orphaned bool, tx *sqlx.Tx) error { + // insert block + dbBlock := buildDbBlock(block, epochStats) + if orphaned { + dbBlock.Status = dbtypes.Orphaned + } + + err := db.InsertSlot(dbBlock, tx) + if err != nil { + return fmt.Errorf("error inserting slot: %v", err) + } + + // insert deposits + err = persistBlockDeposits(block, depositIndex, orphaned, tx) + if err != nil { + return err + } + + return nil +} + func persistEpochData(epoch uint64, blockMap map[uint64]*CacheBlock, epochStats *EpochStats, epochVotes *EpochVotes, tx *sqlx.Tx) error { commitTx := false if tx == nil { @@ -66,12 +87,10 @@ func persistEpochData(epoch uint64, blockMap map[uint64]*CacheBlock, epochStats commitTx = true } - dbEpoch := buildDbEpoch(epoch, blockMap, epochStats, epochVotes, func(block *CacheBlock) { - // insert block - dbBlock := buildDbBlock(block, epochStats) - err := db.InsertSlot(dbBlock, tx) + dbEpoch := buildDbEpoch(epoch, blockMap, epochStats, epochVotes, func(block *CacheBlock, depositIndex *uint64) { + err := persistBlockData(block, epochStats, depositIndex, false, tx) if err != nil { - logger.Errorf("error inserting slot: %v", err) + logger.Errorf("error persisting slot: %v", err) } }) @@ -198,12 +217,13 @@ func buildDbBlock(block *CacheBlock, epochStats *EpochStats) *dbtypes.Slot { return &dbBlock } -func buildDbEpoch(epoch uint64, blockMap map[uint64]*CacheBlock, epochStats *EpochStats, epochVotes *EpochVotes, blockFn func(block *CacheBlock)) *dbtypes.Epoch { +func buildDbEpoch(epoch uint64, blockMap map[uint64]*CacheBlock, epochStats *EpochStats, epochVotes *EpochVotes, blockFn func(block *CacheBlock, depositIndex *uint64)) *dbtypes.Epoch { firstSlot := epoch * utils.Config.Chain.Config.SlotsPerEpoch lastSlot := firstSlot + (utils.Config.Chain.Config.SlotsPerEpoch) - 1 totalSyncAssigned := 0 totalSyncVoted := 0 + var depositIndex *uint64 dbEpoch := dbtypes.Epoch{ Epoch: epoch, } @@ -212,10 +232,12 @@ func buildDbEpoch(epoch uint64, blockMap map[uint64]*CacheBlock, epochStats *Epo dbEpoch.VotedHead = epochVotes.currentEpoch.headVoteAmount + epochVotes.nextEpoch.headVoteAmount dbEpoch.VotedTotal = epochVotes.currentEpoch.totalVoteAmount + epochVotes.nextEpoch.totalVoteAmount } - if epochStats != nil && epochStats.validatorStats != nil { - dbEpoch.ValidatorCount = epochStats.validatorStats.ValidatorCount - dbEpoch.ValidatorBalance = epochStats.validatorStats.ValidatorBalance - dbEpoch.Eligible = epochStats.validatorStats.EligibleAmount + if epochStats != nil && epochStats.stateStats != nil { + dbEpoch.ValidatorCount = epochStats.stateStats.ValidatorCount + dbEpoch.ValidatorBalance = epochStats.stateStats.ValidatorBalance + dbEpoch.Eligible = epochStats.stateStats.EligibleAmount + depositIndexField := epochStats.stateStats.DepositIndex + depositIndex = &depositIndexField } // aggregate blocks @@ -230,7 +252,7 @@ func buildDbEpoch(epoch uint64, blockMap map[uint64]*CacheBlock, epochStats *Epo continue } if blockFn != nil { - blockFn(block) + blockFn(block, depositIndex) } attestations, _ := blockBody.Attestations() @@ -276,3 +298,56 @@ func buildDbEpoch(epoch uint64, blockMap map[uint64]*CacheBlock, epochStats *Epo return &dbEpoch } + +func persistBlockDeposits(block *CacheBlock, depositIndex *uint64, orphaned bool, tx *sqlx.Tx) error { + // insert deposits + dbDeposits := buildDbDeposits(block, depositIndex) + if orphaned { + for idx := range dbDeposits { + dbDeposits[idx].Orphaned = true + } + } + + if len(dbDeposits) > 0 { + err := db.InsertDeposits(dbDeposits, tx) + if err != nil { + return fmt.Errorf("error inserting deposits: %v", err) + } + } + + return nil +} + +func buildDbDeposits(block *CacheBlock, depositIndex *uint64) []*dbtypes.Deposit { + blockBody := block.GetBlockBody() + if blockBody == nil { + return nil + } + + deposits, err := blockBody.Deposits() + if err != nil { + return nil + } + + dbDeposits := make([]*dbtypes.Deposit, len(deposits)) + for idx, deposit := range deposits { + dbDeposit := &dbtypes.Deposit{ + SlotNumber: block.Slot, + SlotIndex: uint64(idx), + SlotRoot: block.Root, + Orphaned: false, + PublicKey: deposit.Data.PublicKey[:], + WithdrawalCredentials: deposit.Data.WithdrawalCredentials, + Amount: uint64(deposit.Data.Amount), + } + if depositIndex != nil { + cDepIdx := *depositIndex + dbDeposit.Index = &cDepIdx + *depositIndex++ + } + + dbDeposits[idx] = dbDeposit + } + + return nil +} diff --git a/rpc/beaconapi.go b/rpc/beaconapi.go index 5b25aa7d..214ffc26 100644 --- a/rpc/beaconapi.go +++ b/rpc/beaconapi.go @@ -387,6 +387,22 @@ func (bc *BeaconClient) GetSyncCommitteeDuties(stateRef string, epoch uint64) (* return result.Data, nil } +func (bc *BeaconClient) GetState(stateRef string) (*spec.VersionedBeaconState, error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + provider, isProvider := bc.clientSvc.(eth2client.BeaconStateProvider) + if !isProvider { + return nil, fmt.Errorf("get validators not supported") + } + result, err := provider.BeaconState(ctx, &api.BeaconStateOpts{ + State: stateRef, + }) + if err != nil { + return nil, err + } + return result.Data, nil +} + func (bc *BeaconClient) GetStateValidators(stateRef string) (map[phase0.ValidatorIndex]*v1.Validator, error) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()