Skip to content

Commit

Permalink
feat(indexer/postgres)!: add basic support for header, txs and events (
Browse files Browse the repository at this point in the history
…#22695)

(cherry picked from commit 332d0b1)

# Conflicts:
#	go.mod
#	go.sum
#	indexer/postgres/base_sql.go
#	indexer/postgres/listener.go
#	runtime/v2/app.go
#	runtime/v2/go.mod
#	runtime/v2/go.sum
#	schema/appdata/data.go
#	schema/decoding/middleware.go
#	server/v2/cometbft/go.mod
#	server/v2/cometbft/go.sum
#	server/v2/go.mod
#	server/v2/go.sum
#	server/v2/stf/go.mod
#	server/v2/stf/go.sum
#	server/v2/stf/stf.go
#	simapp/go.mod
#	simapp/go.sum
#	simapp/v2/go.mod
#	simapp/v2/go.sum
#	tests/go.sum
#	x/accounts/go.mod
#	x/accounts/go.sum
#	x/distribution/go.mod
#	x/distribution/go.sum
  • Loading branch information
facundomedica authored and mergify[bot] committed Dec 9, 2024
1 parent 10efb9c commit 88374d3
Show file tree
Hide file tree
Showing 34 changed files with 2,515 additions and 22 deletions.
4 changes: 4 additions & 0 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,10 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) {
app.logger.Error("Commit listening hook failed", "height", blockHeight, "err", err)
if app.streamingManager.StopNodeOnErr {
err = fmt.Errorf("Commit listening hook failed: %w", err)
if blockHeight == 1 {
// can't rollback to height 0, so just return the error
return nil, fmt.Errorf("failed to commit block 1, can't automatically rollback: %w", err)
}
rollbackErr := app.cms.RollbackToVersion(blockHeight - 1)
if rollbackErr != nil {
return nil, errors.Join(err, rollbackErr)
Expand Down
39 changes: 29 additions & 10 deletions baseapp/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package baseapp

import (
"context"
"encoding/json"
"fmt"
"sort"
"strconv"
Expand All @@ -20,6 +21,7 @@ import (

"github.com/cosmos/cosmos-sdk/client/flags"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)

const (
Expand Down Expand Up @@ -48,7 +50,7 @@ func (app *BaseApp) EnableIndexer(indexerOpts interface{}, keys map[string]*stor
app.cms.AddListeners(exposedKeys)

app.streamingManager = storetypes.StreamingManager{
ABCIListeners: []storetypes.ABCIListener{listenerWrapper{listener.Listener}},
ABCIListeners: []storetypes.ABCIListener{listenerWrapper{listener.Listener, app.txDecoder}},
StopNodeOnErr: true,
}

Expand Down Expand Up @@ -144,9 +146,10 @@ func exposeStoreKeysSorted(keysStr []string, keys map[string]*storetypes.KVStore
return exposeStoreKeys
}

func eventToAppDataEvent(event abci.Event) (appdata.Event, error) {
func eventToAppDataEvent(event abci.Event, height int64) (appdata.Event, error) {
appdataEvent := appdata.Event{
Type: event.Type,
BlockNumber: uint64(height),
Type: event.Type,
Attributes: func() ([]appdata.EventAttribute, error) {
attrs := make([]appdata.EventAttribute, len(event.Attributes))
for j, attr := range event.Attributes {
Expand Down Expand Up @@ -197,7 +200,8 @@ func eventToAppDataEvent(event abci.Event) (appdata.Event, error) {
}

type listenerWrapper struct {
listener appdata.Listener
listener appdata.Listener
txDecoder sdk.TxDecoder
}

// NewListenerWrapper creates a new listenerWrapper.
Expand All @@ -208,20 +212,35 @@ func NewListenerWrapper(listener appdata.Listener) listenerWrapper {

func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.FinalizeBlockRequest, res abci.FinalizeBlockResponse) error {
if p.listener.StartBlock != nil {
// clean up redundant data
reqWithoutTxs := req
reqWithoutTxs.Txs = nil

if err := p.listener.StartBlock(appdata.StartBlockData{
Height: uint64(req.Height),
HeaderBytes: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
HeaderJSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
HeaderJSON: func() (json.RawMessage, error) {
return json.Marshal(reqWithoutTxs)
},
}); err != nil {
return err
}
}
if p.listener.OnTx != nil {
for i, tx := range req.Txs {
if err := p.listener.OnTx(appdata.TxData{
TxIndex: int32(i),
Bytes: func() ([]byte, error) { return tx, nil },
JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
BlockNumber: uint64(req.Height),
TxIndex: int32(i),
Bytes: func() ([]byte, error) { return tx, nil },
JSON: func() (json.RawMessage, error) {
sdkTx, err := p.txDecoder(tx)
if err != nil {
// if the transaction cannot be decoded, return the error as JSON
// as there are some txs that might not be decodeable by the txDecoder
return json.Marshal(err)
}
return json.Marshal(sdkTx)
},
}); err != nil {
return err
}
Expand All @@ -231,14 +250,14 @@ func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.Finaliz
events := make([]appdata.Event, len(res.Events))
var err error
for i, event := range res.Events {
events[i], err = eventToAppDataEvent(event)
events[i], err = eventToAppDataEvent(event, req.Height)
if err != nil {
return err
}
}
for _, txResult := range res.TxResults {
for _, event := range txResult.Events {
appdataEvent, err := eventToAppDataEvent(event)
appdataEvent, err := eventToAppDataEvent(event, req.Height)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions codec/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ func protoCol(f protoreflect.FieldDescriptor) schema.Field {
col.Kind = schema.StringKind
case protoreflect.BytesKind:
col.Kind = schema.BytesKind
col.Nullable = true
case protoreflect.EnumKind:
// TODO: support enums
col.Kind = schema.EnumKind
Expand Down
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,16 @@ require (
// )
// TODO remove after all modules have their own go.mods
replace (
<<<<<<< HEAD
// pseudo version lower than the latest tag
cosmossdk.io/api => cosmossdk.io/api v0.7.3-0.20241127063259-f296a5005ce8 // main
// pseudo version lower than the latest tag
cosmossdk.io/store => cosmossdk.io/store v1.0.0-rc.0.0.20241204123127-eb3bf8b0469d // main
=======
cosmossdk.io/api => ./api
cosmossdk.io/schema => ./schema
cosmossdk.io/store => ./store
>>>>>>> 332d0b106 (feat(indexer/postgres)!: add basic support for header, txs and events (#22695))
cosmossdk.io/x/bank => ./x/bank
cosmossdk.io/x/staking => ./x/staking
)
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ cosmossdk.io/log v1.5.0 h1:dVdzPJW9kMrnAYyMf1duqacoidB9uZIl+7c6z0mnq0g=
cosmossdk.io/log v1.5.0/go.mod h1:Tr46PUJjiUthlwQ+hxYtUtPn4D/oCZXAkYevBeh5+FI=
cosmossdk.io/math v1.4.0 h1:XbgExXFnXmF/CccPPEto40gOO7FpWu9yWNAZPN3nkNQ=
cosmossdk.io/math v1.4.0/go.mod h1:O5PkD4apz2jZs4zqFdTr16e1dcaQCc5z6lkEnrrppuk=
<<<<<<< HEAD
cosmossdk.io/schema v0.3.1-0.20241128094659-bd76b47e1d8b h1:svpFdulZRrYz+RTHu2u9CeKkMKrIHx5354vjiHerovo=
cosmossdk.io/schema v0.3.1-0.20241128094659-bd76b47e1d8b/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
cosmossdk.io/store v1.0.0-rc.0.0.20241204123127-eb3bf8b0469d h1:KQM4Q6kjwlM4HuDZRV8/ZDXX3whjfStndYNTsRrbboQ=
cosmossdk.io/store v1.0.0-rc.0.0.20241204123127-eb3bf8b0469d/go.mod h1:oZBBY4BrkYnghr6MFL0MP5mGqpkPedHcWkXwXddd6tU=
cosmossdk.io/x/tx v1.0.0-alpha.2 h1:UW80FMm7B0fiAMsrfe5+HabSJ3XBg+tQa6/GK9prqWk=
cosmossdk.io/x/tx v1.0.0-alpha.2/go.mod h1:r4yTKSJ7ZCCR95YbBfY3nfvbgNw6m9F6f25efWYYQWo=
=======
>>>>>>> 332d0b106 (feat(indexer/postgres)!: add basic support for header, txs and events (#22695))
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs=
Expand Down
35 changes: 35 additions & 0 deletions indexer/postgres/base_sql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package postgres

// baseSQL is the base SQL that is always included in the schema.
const baseSQL = `
CREATE OR REPLACE FUNCTION nanos_to_timestamptz(nanos bigint) RETURNS timestamptz AS $$
SELECT to_timestamp(nanos / 1000000000) + (nanos / 1000000000) * INTERVAL '1 microsecond'
$$ LANGUAGE SQL IMMUTABLE;
CREATE TABLE IF NOT EXISTS block
(
number BIGINT NOT NULL PRIMARY KEY,
header JSONB NULL
);
CREATE TABLE IF NOT EXISTS tx
(
id BIGSERIAL PRIMARY KEY,
block_number BIGINT NOT NULL REFERENCES block (number),
index_in_block BIGINT NOT NULL,
data JSONB NULL,
bytes BYTEA NULL
);
CREATE TABLE IF NOT EXISTS event
(
id BIGSERIAL PRIMARY KEY,
block_number BIGINT NOT NULL REFERENCES block (number),
block_stage INTEGER NOT NULL,
tx_index BIGINT NOT NULL,
msg_index BIGINT NOT NULL,
event_index BIGINT NOT NULL,
type TEXT NULL,
data JSONB NULL
);
`
153 changes: 153 additions & 0 deletions indexer/postgres/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package postgres

import (
"encoding/json"
"fmt"

"cosmossdk.io/schema/appdata"
)

func (i *indexerImpl) listener() appdata.Listener {
return appdata.Listener{
InitializeModuleData: func(data appdata.ModuleInitializationData) error {
moduleName := data.ModuleName
modSchema := data.Schema
_, ok := i.modules[moduleName]
if ok {
return fmt.Errorf("module %s already initialized", moduleName)
}

mm := newModuleIndexer(moduleName, modSchema, i.opts)
i.modules[moduleName] = mm

return mm.initializeSchema(i.ctx, i.tx)
},
StartBlock: func(data appdata.StartBlockData) error {
var (
headerBz []byte
err error
)

if data.HeaderJSON != nil {
headerBz, err = data.HeaderJSON()
if err != nil {
return err
}
} else if data.HeaderBytes != nil {
headerBz, err = data.HeaderBytes()
if err != nil {
return err
}
}

// TODO: verify the format of headerBz, otherwise we'll get `ERROR: invalid input syntax for type json (SQLSTATE 22P02)`
_, err = i.tx.Exec("INSERT INTO block (number, header) VALUES ($1, $2)", data.Height, headerBz)

return err
},
OnObjectUpdate: func(data appdata.ObjectUpdateData) error {
module := data.ModuleName
mod, ok := i.modules[module]
if !ok {
return fmt.Errorf("module %s not initialized", module)
}

for _, update := range data.Updates {
if i.logger != nil {
i.logger.Debug("OnObjectUpdate", "module", module, "type", update.TypeName, "key", update.Key, "delete", update.Delete, "value", update.Value)
}
tm, ok := mod.tables[update.TypeName]
if !ok {
return fmt.Errorf("object type %s not found in schema for module %s", update.TypeName, module)
}

var err error
if update.Delete {
err = tm.delete(i.ctx, i.tx, update.Key)
} else {
err = tm.insertUpdate(i.ctx, i.tx, update.Key, update.Value)
}
if err != nil {
return err
}
}
return nil
},
Commit: func(data appdata.CommitData) (func() error, error) {
err := i.tx.Commit()
if err != nil {
return nil, err
}

i.tx, err = i.db.BeginTx(i.ctx, nil)
return nil, err
},
OnTx: txListener(i),
OnEvent: eventListener(i),
}
}

func txListener(i *indexerImpl) func(data appdata.TxData) error {
return func(td appdata.TxData) error {
var bz []byte
if td.Bytes != nil {
var err error
bz, err = td.Bytes()
if err != nil {
return err
}
}

var jsonData json.RawMessage
if td.JSON != nil {
var err error
jsonData, err = td.JSON()
if err != nil {
return err
}
}

_, err := i.tx.Exec("INSERT INTO tx (block_number, index_in_block, data, bytes) VALUES ($1, $2, $3, $4)",
td.BlockNumber, td.TxIndex, jsonData, bz)

return err
}
}

func eventListener(i *indexerImpl) func(data appdata.EventData) error {
return func(data appdata.EventData) error {
for _, e := range data.Events {
var jsonData json.RawMessage

if e.Data != nil {
var err error
jsonData, err = e.Data()
if err != nil {
return fmt.Errorf("failed to get event data: %w", err)
}
} else if e.Attributes != nil {
attrs, err := e.Attributes()
if err != nil {
return fmt.Errorf("failed to get event attributes: %w", err)
}

attrsMap := map[string]interface{}{}
for _, attr := range attrs {
attrsMap[attr.Key] = attr.Value
}

jsonData, err = json.Marshal(attrsMap)
if err != nil {
return fmt.Errorf("failed to marshal event attributes: %w", err)
}
}

_, err := i.tx.Exec("INSERT INTO event (block_number, block_stage, tx_index, msg_index, event_index, type, data) VALUES ($1, $2, $3, $4, $5, $6, $7)",
e.BlockNumber, e.BlockStage, e.TxIndex, e.MsgIndex, e.EventIndex, e.Type, jsonData)
if err != nil {
return fmt.Errorf("failed to index event: %w", err)
}
}
return nil
}
}
Loading

0 comments on commit 88374d3

Please sign in to comment.