Skip to content

Commit

Permalink
add deposit related schemas & deposit indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed May 2, 2024
1 parent 617221a commit 47e22bb
Show file tree
Hide file tree
Showing 12 changed files with 476 additions and 42 deletions.
123 changes: 123 additions & 0 deletions db/deposits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package db

import (
"fmt"
"strings"

"github.com/ethpandaops/dora/dbtypes"
"github.com/jmoiron/sqlx"
)

func InsertDepositTxs(depositTxs []*dbtypes.DepositTx, tx *sqlx.Tx) error {
var sql strings.Builder
fmt.Fprint(&sql,
EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: "INSERT INTO deposit_txs ",
dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO deposit_txs ",
}),
"(deposit_index, block_number, block_root, publickey, withdrawalcredentials, amount, signature, valid_signature, orphaned, tx_hash, tx_sender, tx_origin, tx_target)",
" VALUES ",
)
argIdx := 0
fieldCount := 13

args := make([]any, len(depositTxs)*fieldCount)
for i, depositTx := range depositTxs {
if i > 0 {
fmt.Fprintf(&sql, ", ")
}
fmt.Fprintf(&sql, "(")
for f := 0; f < fieldCount; f++ {
if f > 0 {
fmt.Fprintf(&sql, ", ")
}
fmt.Fprintf(&sql, "$%v", argIdx+f+1)

}
fmt.Fprintf(&sql, ")")

args[argIdx+0] = depositTx.Index
args[argIdx+1] = depositTx.BlockNumber
args[argIdx+2] = depositTx.BlockRoot
args[argIdx+3] = depositTx.PublicKey
args[argIdx+4] = depositTx.WithdrawalCredentials
args[argIdx+5] = depositTx.Amount
args[argIdx+6] = depositTx.Signature
args[argIdx+7] = depositTx.ValidSignature
args[argIdx+8] = depositTx.Orphaned
args[argIdx+9] = depositTx.TxTash
args[argIdx+10] = depositTx.TxSender
args[argIdx+11] = depositTx.TxOrigin
args[argIdx+12] = depositTx.TxTarget
argIdx += fieldCount
}
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: " ON CONFLICT (deposit_index, block_root) DO UPDATE SET orphaned = excluded.orphaned",
dbtypes.DBEngineSqlite: "",
}))
_, err := tx.Exec(sql.String(), args...)
if err != nil {
return err
}
return nil
}

type Deposit struct {
Index uint64 `db:"deposit_index"`
SlotNumber uint64 `db:"slot_number"`
SlotIndex uint64 `db:"slot_index"`
SlotRoot []byte `db:"slot_root"`
Orphaned bool `db:"orphaned"`
PublicKey []byte `db:"publickey"`
WithdrawalCredentials []byte `db:"withdrawalcredentials"`
Amount uint64 `db:"amount"`
}

func InsertDeposits(deposits []*dbtypes.Deposit, tx *sqlx.Tx) error {
var sql strings.Builder
fmt.Fprint(&sql,
EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: "INSERT INTO deposits ",
dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO deposits ",
}),
"(deposit_index, slot_number, slot_index, slot_root, orphaned, publickey, withdrawalcredentials, amount)",
" VALUES ",
)
argIdx := 0
fieldCount := 8

args := make([]any, len(deposits)*fieldCount)
for i, deposit := range deposits {
if i > 0 {
fmt.Fprintf(&sql, ", ")
}
fmt.Fprintf(&sql, "(")
for f := 0; f < fieldCount; f++ {
if f > 0 {
fmt.Fprintf(&sql, ", ")
}
fmt.Fprintf(&sql, "$%v", argIdx+f+1)

}
fmt.Fprintf(&sql, ")")

args[argIdx+0] = deposit.Index
args[argIdx+1] = deposit.SlotNumber
args[argIdx+2] = deposit.SlotIndex
args[argIdx+3] = deposit.SlotRoot
args[argIdx+4] = deposit.Orphaned
args[argIdx+5] = deposit.PublicKey
args[argIdx+6] = deposit.WithdrawalCredentials
args[argIdx+7] = deposit.Amount
argIdx += fieldCount
}
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: " ON CONFLICT (slot_index, slot_root) DO UPDATE SET deposit_index = excluded.deposit_index, orphaned = excluded.orphaned",
dbtypes.DBEngineSqlite: "",
}))
_, err := tx.Exec(sql.String(), args...)
if err != nil {
return err
}
return nil
}
70 changes: 70 additions & 0 deletions db/schema/pgsql/20240429121225_deposit-index.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
-- +goose Up
-- +goose StatementBegin

CREATE TABLE IF NOT EXISTS deposit_txs (
deposit_index INT NOT NULL,
block_number INT NOT NULL,
block_root bytea NOT NULL,
publickey bytea NOT NULL,
withdrawalcredentials bytea NOT NULL,
amount BIGINT NOT NULL,
signature bytea NOT NULL,
valid_signature bool NOT NULL DEFAULT TRUE,
orphaned bool NOT NULL DEFAULT FALSE,
tx_hash bytea NOT NULL,
tx_sender bytea NOT NULL,
tx_origin bytea NOT NULL,
tx_target bytea NOT NULL,
CONSTRAINT deposit_txs_pkey PRIMARY KEY (deposit_index, block_root)
);

CREATE INDEX IF NOT EXISTS "deposit_txs_deposit_index_idx"
ON public."deposit_txs"
("deposit_index" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "deposit_txs_publickey_idx"
ON public."deposit_txs"
("publickey" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "deposit_txs_tx_sender_idx"
ON public."deposit_txs"
("tx_sender" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "deposit_txs_tx_origin_idx"
ON public."deposit_txs"
("tx_origin" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "deposit_txs_tx_target_idx"
ON public."deposit_txs"
("tx_target" ASC NULLS FIRST);

CREATE TABLE IF NOT EXISTS deposits (
deposit_index INT NULL,
slot_number INT NOT NULL,
slot_index INT NOT NULL,
slot_root bytea NOT NULL,
orphaned bool NOT NULL DEFAULT FALSE,
publickey bytea NOT NULL,
withdrawalcredentials bytea NOT NULL,
amount BIGINT NOT NULL,
CONSTRAINT deposit_pkey PRIMARY KEY (slot_index, slot_root)
);

CREATE INDEX IF NOT EXISTS "deposits_deposit_index_idx"
ON public."deposits"
("deposit_index" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "deposits_slot_number_idx"
ON public."deposits"
("slot_number" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "deposits_publickey_idx"
ON public."deposits"
("publickey" ASC NULLS FIRST);


-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
SELECT 'NOT SUPPORTED';
-- +goose StatementEnd
69 changes: 69 additions & 0 deletions db/schema/sqlite/20240429121225_deposit-index.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
-- +goose Up
-- +goose StatementBegin

CREATE TABLE IF NOT EXISTS deposit_txs (
deposit_index INT NOT NULL,
block_number INT NOT NULL,
block_root BLOB NOT NULL,
publickey BLOB NOT NULL,
withdrawalcredentials BLOB NOT NULL,
amount BIGINT NOT NULL,
signature BLOB NOT NULL,
valid_signature bool NOT NULL DEFAULT TRUE,
orphaned bool NOT NULL DEFAULT FALSE,
tx_hash BLOB NOT NULL,
tx_sender BLOB NOT NULL,
tx_origin BLOB NOT NULL,
tx_target BLOB NOT NULL,
CONSTRAINT deposit_txs_pkey PRIMARY KEY (deposit_index, block_root)
);

CREATE INDEX IF NOT EXISTS "deposit_txs_deposit_index_idx"
ON "deposit_txs"
("deposit_index" ASC);

CREATE INDEX IF NOT EXISTS "deposit_txs_publickey_idx"
ON "deposit_txs"
("publickey" ASC);

CREATE INDEX IF NOT EXISTS "deposit_txs_tx_sender_idx"
ON "deposit_txs"
("tx_sender" ASC);

CREATE INDEX IF NOT EXISTS "deposit_txs_tx_origin_idx"
ON "deposit_txs"
("tx_origin" ASC);

CREATE INDEX IF NOT EXISTS "deposit_txs_tx_target_idx"
ON "deposit_txs"
("tx_target" ASC);

CREATE TABLE IF NOT EXISTS deposits (
deposit_index INT NULL,
slot_number INT NOT NULL,
slot_index INT NOT NULL,
slot_root BLOB NOT NULL,
orphaned bool NOT NULL DEFAULT FALSE,
publickey BLOB NOT NULL,
withdrawalcredentials BLOB NOT NULL,
amount BIGINT NOT NULL,
CONSTRAINT deposit_pkey PRIMARY KEY (slot_index, slot_root)
);

CREATE INDEX IF NOT EXISTS "deposits_deposit_index_idx"
ON "deposits"
("deposit_index" ASC);

CREATE INDEX IF NOT EXISTS "deposits_slot_number_idx"
ON "deposits"
("slot_number" ASC);

CREATE INDEX IF NOT EXISTS "deposits_publickey_idx"
ON "deposits"
("publickey" ASC);

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
SELECT 'NOT SUPPORTED';
-- +goose StatementEnd
27 changes: 27 additions & 0 deletions dbtypes/dbtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,30 @@ type TxPendingFunctionSignature struct {
Bytes []byte `db:"bytes"`
QueueTime uint64 `db:"queuetime"`
}

type DepositTx struct {
Index uint64 `db:"deposit_index"`
BlockNumber uint64 `db:"block_number"`
BlockRoot []byte `db:"block_root"`
PublicKey []byte `db:"publickey"`
WithdrawalCredentials []byte `db:"withdrawalcredentials"`
Amount uint64 `db:"amount"`
Signature []byte `db:"signature"`
ValidSignature bool `db:"valid_signature"`
Orphaned bool `db:"orphaned"`
TxTash []byte `db:"tx_hash"`
TxSender []byte `db:"tx_sender"`
TxOrigin []byte `db:"tx_origin"`
TxTarget []byte `db:"tx_target"`
}

type Deposit struct {
Index *uint64 `db:"deposit_index"`
SlotNumber uint64 `db:"slot_number"`
SlotIndex uint64 `db:"slot_index"`
SlotRoot []byte `db:"slot_root"`
Orphaned bool `db:"orphaned"`
PublicKey []byte `db:"publickey"`
WithdrawalCredentials []byte `db:"withdrawalcredentials"`
Amount uint64 `db:"amount"`
}
27 changes: 18 additions & 9 deletions indexer/cache_logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ func (cache *indexerCache) processFinalizedEpoch(epoch uint64) error {
// calculate votes
epochVotes := aggregateEpochVotes(canonicalMap, epoch, epochStats, epochTarget, false, true)

if epochStats.validatorStats != nil {
logger.Infof("epoch %v stats: %v validators (%v)", epoch, epochStats.validatorStats.ValidatorCount, epochStats.validatorStats.EligibleAmount)
if epochStats.stateStats != nil {
logger.Infof("epoch %v stats: %v validators (%v)", epoch, epochStats.stateStats.ValidatorCount, epochStats.stateStats.EligibleAmount)
}
logger.Infof("epoch %v votes: target %v + %v = %v", epoch, epochVotes.currentEpoch.targetVoteAmount, epochVotes.nextEpoch.targetVoteAmount, epochVotes.currentEpoch.targetVoteAmount+epochVotes.nextEpoch.targetVoteAmount)
logger.Infof("epoch %v votes: head %v + %v = %v", epoch, epochVotes.currentEpoch.headVoteAmount, epochVotes.nextEpoch.headVoteAmount, epochVotes.currentEpoch.headVoteAmount+epochVotes.nextEpoch.headVoteAmount)
Expand Down Expand Up @@ -246,8 +246,11 @@ func (cache *indexerCache) processFinalizedEpoch(epoch uint64) error {
if !block.IsReady() {
continue
}
dbBlock := buildDbBlock(block, nil)
db.InsertSlot(dbBlock, tx)

err := persistBlockData(block, nil, nil, false, tx)
if err != nil {
logger.Errorf("error while persisting slot: %v", err)
}
}

}
Expand Down Expand Up @@ -311,17 +314,16 @@ func (cache *indexerCache) processOrphanedBlocks(processedEpoch int64) error {
if !block.IsReady() {
continue
}
dbBlock := buildDbBlock(block, cache.getEpochStats(utils.EpochOfSlot(block.Slot), nil))
if block.IsCanonical(cache.indexer, cache.justifiedRoot) {
isCanonical := block.IsCanonical(cache.indexer, cache.justifiedRoot)
if isCanonical {
logger.Warnf("canonical block in orphaned block processing: %v [0x%x]", block.Slot, block.Root)
} else {
dbBlock.Status = dbtypes.Orphaned
db.InsertOrphanedBlock(block.buildOrphanedBlock(), tx)
}

err := db.InsertSlot(dbBlock, tx)
err := persistBlockData(block, cache.getEpochStats(utils.EpochOfSlot(block.Slot), nil), nil, !isCanonical, tx)
if err != nil {
logger.Errorf("failed inserting orphaned block: %v", err)
logger.Errorf("error while persisting orphaned slot: %v", err)
}
}

Expand Down Expand Up @@ -439,6 +441,13 @@ func (cache *indexerCache) processCachePersistence() error {
logger.Errorf("error inserting unfinalized block: %v", err)
return err
}

err = persistBlockDeposits(block, nil, true, tx)
if err != nil {
logger.Errorf("error persisting unfinalized deposits: %v", err)
return err
}

block.isInDb = true
}
}
Expand Down
Loading

0 comments on commit 47e22bb

Please sign in to comment.