Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change database schema to avoid expensive joins on search views #57

Merged
merged 10 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions db/blobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package db

import (
"fmt"
"strings"

"github.com/ethpandaops/dora/dbtypes"
"github.com/jmoiron/sqlx"
)

func InsertBlob(blob *dbtypes.Blob, tx *sqlx.Tx) error {
_, err := tx.Exec(EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: `
INSERT INTO blobs (
commitment, proof, size, blob
) VALUES ($1, $2, $3, $4)
ON CONFLICT (commitment) DO UPDATE SET
size = excluded.size,
blob = excluded.blob`,
dbtypes.DBEngineSqlite: `
INSERT OR REPLACE INTO blobs (
commitment, proof, size, blob
) VALUES ($1, $2, $3, $4)`,
}),
blob.Commitment, blob.Proof, blob.Size, blob.Blob)
if err != nil {
return err
}
return nil
}

func InsertBlobAssignment(blobAssignment *dbtypes.BlobAssignment, tx *sqlx.Tx) error {
_, err := tx.Exec(EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: `
INSERT INTO blob_assignments (
root, commitment, slot
) VALUES ($1, $2, $3)
ON CONFLICT (root, commitment) DO NOTHING`,
dbtypes.DBEngineSqlite: `
INSERT OR REPLACE INTO blob_assignments (
root, commitment, slot
) VALUES ($1, $2, $3)`,
}),
blobAssignment.Root, blobAssignment.Commitment, blobAssignment.Slot)
if err != nil {
return err
}
return nil
}

func GetBlob(commitment []byte, withData bool) *dbtypes.Blob {
blob := dbtypes.Blob{}
var sql strings.Builder
fmt.Fprintf(&sql, `SELECT commitment, proof, size`)
if withData {
fmt.Fprintf(&sql, `, blob`)
}
fmt.Fprintf(&sql, ` FROM blobs WHERE commitment = $1`)
err := ReaderDb.Get(&blob, sql.String(), commitment)
if err != nil {
return nil
}
return &blob
}

func GetLatestBlobAssignment(commitment []byte) *dbtypes.BlobAssignment {
blobAssignment := dbtypes.BlobAssignment{}
err := ReaderDb.Get(&blobAssignment, "SELECT root, commitment, slot FROM blob_assignments WHERE commitment = $1 ORDER BY slot DESC LIMIT 1", commitment)
if err != nil {
return nil
}
return &blobAssignment
}
200 changes: 200 additions & 0 deletions db/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package db

import (
"embed"
"fmt"
"time"

"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
"github.com/pressly/goose/v3"
"github.com/sirupsen/logrus"

"github.com/ethpandaops/dora/dbtypes"
"github.com/ethpandaops/dora/types"
"github.com/ethpandaops/dora/utils"

"github.com/jackc/pgx/v4/pgxpool"
_ "github.com/jackc/pgx/v4/stdlib"
)

//go:embed schema/pgsql/*.sql
var EmbedPgsqlSchema embed.FS

//go:embed schema/sqlite/*.sql
var EmbedSqliteSchema embed.FS

var DBPGX *pgxpool.Conn

// DB is a pointer to the explorer-database
var DbEngine dbtypes.DBEngineType
var WriterDb *sqlx.DB
var ReaderDb *sqlx.DB

var logger = logrus.StandardLogger().WithField("module", "db")

func checkDbConn(dbConn *sqlx.DB, dataBaseName string) {
// The golang sql driver does not properly implement PingContext
// therefore we use a timer to catch db connection timeouts
dbConnectionTimeout := time.NewTimer(15 * time.Second)

go func() {
<-dbConnectionTimeout.C
logger.Fatalf("timeout while connecting to %s", dataBaseName)
}()

err := dbConn.Ping()
if err != nil {
logger.Fatalf("unable to Ping %s: %s", dataBaseName, err)
}

dbConnectionTimeout.Stop()
}

func mustInitSqlite(config *types.SqliteDatabaseConfig) (*sqlx.DB, *sqlx.DB) {
if config.MaxOpenConns == 0 {
config.MaxOpenConns = 50
}
if config.MaxIdleConns == 0 {
config.MaxIdleConns = 10
}
if config.MaxOpenConns < config.MaxIdleConns {
config.MaxIdleConns = config.MaxOpenConns
}

logger.Infof("initializing sqlite connection to %v with %v/%v conn limit", config.File, config.MaxIdleConns, config.MaxOpenConns)
dbConn, err := sqlx.Open("sqlite3", fmt.Sprintf("%s?cache=shared", config.File))
if err != nil {
utils.LogFatal(err, "error opening sqlite database", 0)
}

checkDbConn(dbConn, "database")
dbConn.SetConnMaxIdleTime(0)
dbConn.SetConnMaxLifetime(0)
dbConn.SetMaxOpenConns(config.MaxOpenConns)
dbConn.SetMaxIdleConns(config.MaxIdleConns)

dbConn.MustExec("PRAGMA journal_mode = WAL")

return dbConn, dbConn
}

func mustInitPgsql(writer *types.PgsqlDatabaseConfig, reader *types.PgsqlDatabaseConfig) (*sqlx.DB, *sqlx.DB) {
if writer.MaxOpenConns == 0 {
writer.MaxOpenConns = 50
}
if writer.MaxIdleConns == 0 {
writer.MaxIdleConns = 10
}
if writer.MaxOpenConns < writer.MaxIdleConns {
writer.MaxIdleConns = writer.MaxOpenConns
}

if reader.MaxOpenConns == 0 {
reader.MaxOpenConns = 50
}
if reader.MaxIdleConns == 0 {
reader.MaxIdleConns = 10
}
if reader.MaxOpenConns < reader.MaxIdleConns {
reader.MaxIdleConns = reader.MaxOpenConns
}

logger.Infof("initializing pgsql writer connection to %v with %v/%v conn limit", writer.Host, writer.MaxIdleConns, writer.MaxOpenConns)
dbConnWriter, err := sqlx.Open("pgx", fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", writer.Username, writer.Password, writer.Host, writer.Port, writer.Name))
if err != nil {
utils.LogFatal(err, "error getting pgsql writer database", 0)
}

checkDbConn(dbConnWriter, "database")
dbConnWriter.SetConnMaxIdleTime(time.Second * 30)
dbConnWriter.SetConnMaxLifetime(time.Second * 60)
dbConnWriter.SetMaxOpenConns(writer.MaxOpenConns)
dbConnWriter.SetMaxIdleConns(writer.MaxIdleConns)

logger.Infof("initializing pgsql reader connection to %v with %v/%v conn limit", writer.Host, reader.MaxIdleConns, reader.MaxOpenConns)
dbConnReader, err := sqlx.Open("pgx", fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", reader.Username, reader.Password, reader.Host, reader.Port, reader.Name))
if err != nil {
utils.LogFatal(err, "error getting pgsql reader database", 0)
}

checkDbConn(dbConnReader, "read replica database")
dbConnReader.SetConnMaxIdleTime(time.Second * 30)
dbConnReader.SetConnMaxLifetime(time.Second * 60)
dbConnReader.SetMaxOpenConns(reader.MaxOpenConns)
dbConnReader.SetMaxIdleConns(reader.MaxIdleConns)
return dbConnWriter, dbConnReader
}

func MustInitDB() {
if utils.Config.Database.Engine == "sqlite" {
sqliteConfig := (*types.SqliteDatabaseConfig)(&utils.Config.Database.Sqlite)
DbEngine = dbtypes.DBEngineSqlite
WriterDb, ReaderDb = mustInitSqlite(sqliteConfig)
} else if utils.Config.Database.Engine == "pgsql" {
readerConfig := (*types.PgsqlDatabaseConfig)(&utils.Config.Database.Pgsql)
writerConfig := (*types.PgsqlDatabaseConfig)(&utils.Config.Database.PgsqlWriter)
if writerConfig.Host == "" {
writerConfig = readerConfig
}
DbEngine = dbtypes.DBEnginePgsql
WriterDb, ReaderDb = mustInitPgsql(writerConfig, readerConfig)
} else {
logger.Fatalf("unknown database engine type: %s", utils.Config.Database.Engine)
}
}

func MustCloseDB() {
err := WriterDb.Close()
if err != nil {
logger.Errorf("Error closing writer db connection: %v", err)
}
err = ReaderDb.Close()
if err != nil {
logger.Errorf("Error closing reader db connection: %v", err)
}
}

func ApplyEmbeddedDbSchema(version int64) error {
var engineDialect string
var schemaDirectory string
switch DbEngine {
case dbtypes.DBEnginePgsql:
goose.SetBaseFS(EmbedPgsqlSchema)
engineDialect = "postgres"
schemaDirectory = "schema/pgsql"
case dbtypes.DBEngineSqlite:
goose.SetBaseFS(EmbedSqliteSchema)
engineDialect = "sqlite3"
schemaDirectory = "schema/sqlite"
default:
logger.Fatalf("unknown database engine")
}
if err := goose.SetDialect(engineDialect); err != nil {
return err
}

if version == -2 {
if err := goose.Up(WriterDb.DB, schemaDirectory); err != nil {
return err
}
} else if version == -1 {
if err := goose.UpByOne(WriterDb.DB, schemaDirectory); err != nil {
return err
}
} else {
if err := goose.UpTo(WriterDb.DB, schemaDirectory, version); err != nil {
return err
}
}

return nil
}

func EngineQuery(queryMap map[dbtypes.DBEngineType]string) string {
if queryMap[DbEngine] != "" {
return queryMap[DbEngine]
}
return queryMap[dbtypes.DBEngineAny]
}
Loading
Loading