From e6948eeda8770e280001b079639fb80d59436778 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 12 Dec 2024 11:42:57 +0100 Subject: [PATCH] fix(stf,server/v2/cometbft): fix default events + improve codec handling (#22837) --- server/v2/cometbft/abci.go | 23 +++++++---------- server/v2/cometbft/abci_test.go | 16 ++++++------ server/v2/cometbft/grpc.go | 41 +++++++++++++++--------------- server/v2/cometbft/query.go | 2 +- server/v2/cometbft/server.go | 23 +++++++++++------ server/v2/stf/stf.go | 43 ++++++++++++++++++++++++++++++++ server/v2/stf/stf_test.go | 23 +++++++++++++---- simapp/v2/simdv2/cmd/commands.go | 9 ++++--- tests/systemtests/tx_test.go | 2 +- 9 files changed, 123 insertions(+), 59 deletions(-) diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 0114d933567..f97d6fa43b3 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -15,7 +15,6 @@ import ( "google.golang.org/protobuf/reflect/protoregistry" "cosmossdk.io/collections" - addresscodec "cosmossdk.io/core/address" appmodulev2 "cosmossdk.io/core/appmodule/v2" "cosmossdk.io/core/comet" corecontext "cosmossdk.io/core/context" @@ -35,8 +34,6 @@ import ( "cosmossdk.io/server/v2/streaming" "cosmossdk.io/store/v2/snapshots" consensustypes "cosmossdk.io/x/consensus/types" - - "github.com/cosmos/cosmos-sdk/codec" ) const ( @@ -52,13 +49,12 @@ type consensus[T transaction.Tx] struct { logger log.Logger appName, version string app appmanager.AppManager[T] - appCodec codec.Codec - txCodec transaction.Codec[T] store types.Store listener *appdata.Listener snapshotManager *snapshots.Manager streamingManager streaming.Manager mempool mempool.Mempool[T] + appCodecs AppCodecs[T] cfg Config chainID string @@ -84,16 +80,15 @@ type consensus[T transaction.Tx] struct { addrPeerFilter types.PeerFilter // filter peers by address and port idPeerFilter types.PeerFilter // filter peers by node ID - queryHandlersMap map[string]appmodulev2.Handler - getProtoRegistry func() (*protoregistry.Files, error) - consensusAddressCodec addresscodec.Codec - cfgMap server.ConfigMap + queryHandlersMap map[string]appmodulev2.Handler + getProtoRegistry func() (*protoregistry.Files, error) + cfgMap server.ConfigMap } // CheckTx implements types.Application. // It is called by cometbft to verify transaction validity func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxRequest) (*abciproto.CheckTxResponse, error) { - decodedTx, err := c.txCodec.Decode(req.Tx) + decodedTx, err := c.appCodecs.TxCodec.Decode(req.Tx) if err != nil { return nil, err } @@ -325,7 +320,7 @@ func (c *consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRe ctx, br, req.AppStateBytes, - c.txCodec) + c.appCodecs.TxCodec) if err != nil { return nil, fmt.Errorf("genesis state init failure: %w", err) } @@ -392,7 +387,7 @@ func (c *consensus[T]) PrepareProposal( LastCommit: toCoreExtendedCommitInfo(req.LocalLastCommit), }) - txs, err := c.prepareProposalHandler(ciCtx, c.app, c.txCodec, req) + txs, err := c.prepareProposalHandler(ciCtx, c.app, c.appCodecs.TxCodec, req) if err != nil { return nil, err } @@ -438,7 +433,7 @@ func (c *consensus[T]) ProcessProposal( LastCommit: toCoreCommitInfo(req.ProposedLastCommit), }) - err := c.processProposalHandler(ciCtx, c.app, c.txCodec, req) + err := c.processProposalHandler(ciCtx, c.app, c.appCodecs.TxCodec, req) if err != nil { c.logger.Error("failed to process proposal", "height", req.Height, "time", req.Time, "hash", fmt.Sprintf("%X", req.Hash), "err", err) return &abciproto.ProcessProposalResponse{ @@ -567,7 +562,7 @@ func (c *consensus[T]) internalFinalizeBlock( // TODO(tip): can we expect some txs to not decode? if so, what we do in this case? this does not seem to be the case, // considering that prepare and process always decode txs, assuming they're the ones providing txs we should never // have a tx that fails decoding. - decodedTxs, err := decodeTxs(c.logger, req.Txs, c.txCodec) + decodedTxs, err := decodeTxs(c.logger, req.Txs, c.appCodecs.TxCodec) if err != nil { return nil, nil, nil, err } diff --git a/server/v2/cometbft/abci_test.go b/server/v2/cometbft/abci_test.go index 17ff917c651..b35c71a7c24 100644 --- a/server/v2/cometbft/abci_test.go +++ b/server/v2/cometbft/abci_test.go @@ -886,13 +886,15 @@ func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock. } return &consensus[mock.Tx]{ - logger: log.NewNopLogger(), - appName: "testing-app", - app: am, - mempool: mempool, - store: mockStore, - cfg: Config{AppTomlConfig: DefaultAppTomlConfig()}, - txCodec: mock.TxCodec{}, + logger: log.NewNopLogger(), + appName: "testing-app", + app: am, + mempool: mempool, + store: mockStore, + cfg: Config{AppTomlConfig: DefaultAppTomlConfig()}, + appCodecs: AppCodecs[mock.Tx]{ + TxCodec: mock.TxCodec{}, + }, chainID: "test", getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry), queryHandlersMap: queryHandler, diff --git a/server/v2/cometbft/grpc.go b/server/v2/cometbft/grpc.go index cfc682e47f3..1f59a4771cf 100644 --- a/server/v2/cometbft/grpc.go +++ b/server/v2/cometbft/grpc.go @@ -27,7 +27,6 @@ import ( nodeservice "github.com/cosmos/cosmos-sdk/client/grpc/node" "github.com/cosmos/cosmos-sdk/codec" codectypes "github.com/cosmos/cosmos-sdk/codec/types" - "github.com/cosmos/cosmos-sdk/std" sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" "github.com/cosmos/cosmos-sdk/types/query" @@ -117,12 +116,13 @@ func (t txServer[T]) GetBlockWithTxs(ctx context.Context, req *txtypes.GetBlockW } decodeTxAt := func(i uint64) error { tx := blockTxs[i] - txb, err := t.clientCtx.TxConfig.TxDecoder()(tx) - fmt.Println("TxDecoder", txb, err) + txb, err := t.txCodec.Decode(tx) if err != nil { return err } - p, err := txb.(interface{ AsTx() (*txtypes.Tx, error) }).AsTx() + + // txServer works only with sdk.Tx + p, err := any(txb).(interface{ AsTx() (*txtypes.Tx, error) }).AsTx() if err != nil { return err } @@ -256,6 +256,11 @@ func (t txServer[T]) Simulate(ctx context.Context, req *txtypes.SimulateRequest) msgResponses = append(msgResponses, anyMsg) } + event, err := intoABCIEvents(txResult.Events, map[string]struct{}{}, false) + if err != nil { + return nil, status.Errorf(codes.Unknown, "failed to convert events: %v", err) + } + return &txtypes.SimulateResponse{ GasInfo: &sdk.GasInfo{ GasUsed: txResult.GasUsed, @@ -263,6 +268,7 @@ func (t txServer[T]) Simulate(ctx context.Context, req *txtypes.SimulateRequest) }, Result: &sdk.Result{ MsgResponses: msgResponses, + Events: event, }, }, nil } @@ -273,15 +279,17 @@ func (t txServer[T]) TxDecode(ctx context.Context, req *txtypes.TxDecodeRequest) return nil, status.Error(codes.InvalidArgument, "invalid empty tx bytes") } - txb, err := t.clientCtx.TxConfig.TxDecoder()(req.TxBytes) + txb, err := t.txCodec.Decode(req.TxBytes) if err != nil { return nil, err } - tx, err := txb.(interface{ AsTx() (*txtypes.Tx, error) }).AsTx() // TODO: maybe we can break the Tx interface to add this also + // txServer works only with sdk.Tx + tx, err := any(txb).(interface{ AsTx() (*txtypes.Tx, error) }).AsTx() if err != nil { return nil, err } + return &txtypes.TxDecodeResponse{ Tx: tx, }, nil @@ -350,7 +358,7 @@ func (t txServer[T]) TxEncodeAmino(_ context.Context, req *txtypes.TxEncodeAmino var stdTx legacytx.StdTx err := t.clientCtx.LegacyAmino.UnmarshalJSON([]byte(req.AminoJson), &stdTx) if err != nil { - return nil, err + return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("invalid request %s", err)) } encodedBytes, err := t.clientCtx.LegacyAmino.Marshal(stdTx) @@ -466,7 +474,7 @@ func (c *consensus[T]) maybeHandleExternalServices(ctx context.Context, req *abc if strings.HasPrefix(req.Path, "/cosmos.base.tendermint.v1beta1.Service") { rpcClient, _ := rpchttp.New(c.cfg.ConfigTomlConfig.RPC.ListenAddress) - cometQServer := cmtservice.NewQueryServer(rpcClient, c.Query, c.consensusAddressCodec) + cometQServer := cmtservice.NewQueryServer(rpcClient, c.Query, c.appCodecs.ConsensusAddressCodec) paths := strings.Split(req.Path, "/") if len(paths) <= 2 { return nil, fmt.Errorf("invalid request path: %s", req.Path) @@ -516,27 +524,18 @@ func (c *consensus[T]) maybeHandleExternalServices(ctx context.Context, req *abc // Handle tx service if strings.HasPrefix(req.Path, "/cosmos.tx.v1beta1.Service") { - // init simple client context - amino := codec.NewLegacyAmino() - std.RegisterLegacyAminoCodec(amino) - txConfig := authtx.NewTxConfig( - c.appCodec, - c.appCodec.InterfaceRegistry().SigningContext().AddressCodec(), - c.appCodec.InterfaceRegistry().SigningContext().ValidatorAddressCodec(), - authtx.DefaultSignModes, - ) rpcClient, _ := client.NewClientFromNode(c.cfg.AppTomlConfig.Address) + // init simple client context clientCtx := client.Context{}. - WithLegacyAmino(amino). - WithCodec(c.appCodec). - WithTxConfig(txConfig). + WithLegacyAmino(c.appCodecs.LegacyAmino.(*codec.LegacyAmino)). + WithCodec(c.appCodecs.AppCodec). WithNodeURI(c.cfg.AppTomlConfig.Address). WithClient(rpcClient) txService := txServer[T]{ clientCtx: clientCtx, - txCodec: c.txCodec, + txCodec: c.appCodecs.TxCodec, app: c.app, consensus: c, } diff --git a/server/v2/cometbft/query.go b/server/v2/cometbft/query.go index bf6bcfe02eb..0bdb9e40f06 100644 --- a/server/v2/cometbft/query.go +++ b/server/v2/cometbft/query.go @@ -51,7 +51,7 @@ func (c *consensus[T]) handleQueryApp(ctx context.Context, path []string, req *a switch path[1] { case "simulate": - tx, err := c.txCodec.Decode(req.Data) + tx, err := c.appCodecs.TxCodec.Decode(req.Data) if err != nil { return nil, errorsmod.Wrap(err, "failed to decode tx") } diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index 8789b7830f9..1182924a4f1 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -25,6 +25,7 @@ import ( addresscodec "cosmossdk.io/core/address" appmodulev2 "cosmossdk.io/core/appmodule/v2" + "cosmossdk.io/core/registry" "cosmossdk.io/core/server" "cosmossdk.io/core/transaction" "cosmossdk.io/log" @@ -66,14 +67,24 @@ type CometBFTServer[T transaction.Tx] struct { store types.Store } +// AppCodecs contains all codecs that the CometBFT server requires +// provided by the application. They are extracted in struct to not be API +// breaking once amino is completely deprecated or new codecs should be added. +type AppCodecs[T transaction.Tx] struct { + TxCodec transaction.Codec[T] + + // The following codecs are only required for the gRPC services + AppCodec codec.Codec + LegacyAmino registry.AminoRegistrar + ConsensusAddressCodec addresscodec.Codec +} + func New[T transaction.Tx]( logger log.Logger, appName string, store types.Store, app appmanager.AppManager[T], - appCodec codec.Codec, - txCodec transaction.Codec[T], - consensusAddressCodec addresscodec.Codec, + appCodecs AppCodecs[T], queryHandlers map[string]appmodulev2.Handler, decoderResolver decoding.DecoderResolver, serverOptions ServerOptions[T], @@ -84,7 +95,7 @@ func New[T transaction.Tx]( serverOptions: serverOptions, cfgOptions: cfgOptions, app: app, - txCodec: txCodec, + txCodec: appCodecs.TxCodec, store: store, } srv.logger = logger.With(log.ModuleKey, srv.Name()) @@ -172,8 +183,7 @@ func New[T transaction.Tx]( cfg: srv.config, store: store, logger: logger, - txCodec: txCodec, - appCodec: appCodec, + appCodecs: appCodecs, listener: listener, snapshotManager: snapshotManager, streamingManager: srv.serverOptions.StreamingManager, @@ -192,7 +202,6 @@ func New[T transaction.Tx]( addrPeerFilter: srv.serverOptions.AddrPeerFilter, idPeerFilter: srv.serverOptions.IdPeerFilter, cfgMap: cfg, - consensusAddressCodec: consensusAddressCodec, } c.optimisticExec = oe.NewOptimisticExecution( diff --git a/server/v2/stf/stf.go b/server/v2/stf/stf.go index 4411a0f5fe4..4b85c5ffe49 100644 --- a/server/v2/stf/stf.go +++ b/server/v2/stf/stf.go @@ -2,8 +2,10 @@ package stf import ( "context" + "encoding/json" "errors" "fmt" + "strings" appmodulev2 "cosmossdk.io/core/appmodule/v2" corecontext "cosmossdk.io/core/context" @@ -358,12 +360,53 @@ func (s STF[T]) runTxMsgs( e.EventIndex = int32(j + 1) events = append(events, e) } + + // add message event + events = append(events, createMessageEvent(msg, int32(i+1), int32(len(execCtx.events)+1))) } consumed := execCtx.meter.Limit() - execCtx.meter.Remaining() return msgResps, consumed, events, nil } +// Create a message event, with two kv: action, the type url of the message +// and module, the module of the message. +func createMessageEvent(msg transaction.Msg, msgIndex, eventIndex int32) event.Event { + // Assumes that module name is the second element of the msg type URL + // e.g. "cosmos.bank.v1beta1.MsgSend" => "bank" + // It returns an empty string if the input is not a valid type URL + getModuleNameFromTypeURL := func(input string) string { + moduleName := strings.Split(input, ".") + if len(moduleName) > 1 { + return moduleName[1] + } + + return "" + } + + return event.Event{ + MsgIndex: msgIndex, + EventIndex: eventIndex, + Type: "message", + Attributes: func() ([]appdata.EventAttribute, error) { + typeURL := msgTypeURL(msg) + return []appdata.EventAttribute{ + {Key: "action", Value: "/" + typeURL}, + {Key: "module", Value: getModuleNameFromTypeURL(typeURL)}, + }, nil + }, + Data: func() (json.RawMessage, error) { + typeURL := msgTypeURL(msg) + attrs := []appdata.EventAttribute{ + {Key: "action", Value: "/" + typeURL}, + {Key: "module", Value: getModuleNameFromTypeURL(typeURL)}, + } + + return json.Marshal(attrs) + }, + } +} + // preBlock executes the pre block logic. func (s STF[T]) preBlock( ctx *executionContext, diff --git a/server/v2/stf/stf_test.go b/server/v2/stf/stf_test.go index b4c84a62ff9..04ae8bf5859 100644 --- a/server/v2/stf/stf_test.go +++ b/server/v2/stf/stf_test.go @@ -225,8 +225,8 @@ func TestSTF(t *testing.T) { } // check TxEvents events := txResult.Events - if len(events) != 6 { - t.Fatalf("Expected 6 TxEvents, got %d", len(events)) + if len(events) != 7 { + t.Fatalf("Expected 7 TxEvents, got %d", len(events)) } for i, event := range events { if event.BlockStage != appdata.TxProcessingStage { @@ -235,7 +235,8 @@ func TestSTF(t *testing.T) { if event.TxIndex != 1 { t.Errorf("Expected TxIndex 1, got %d", event.TxIndex) } - if event.EventIndex != int32(i%2+1) { + if event.EventIndex != int32(i%2+1) && + (event.Type == "message" && event.EventIndex != 3) { // special case for message event type as it happens in the msg handling flow t.Errorf("Expected EventIndex %d, got %d", i%2+1, event.EventIndex) } @@ -247,7 +248,7 @@ func TestSTF(t *testing.T) { t.Errorf("Expected 1 or 2 attributes, got %d", len(attrs)) } - if len(attrs) == 2 { + if len(attrs) == 2 && event.Type != "message" { if attrs[1].Key != "index" || attrs[1].Value != "2" { t.Errorf("Expected attribute key 'index' and value '2', got key '%s' and value '%s'", attrs[1].Key, attrs[1].Value) } @@ -273,7 +274,19 @@ func TestSTF(t *testing.T) { if attrs[0].Key != "msg" || attrs[0].Value != "&BoolValue{Value:true,XXX_unrecognized:[],}" { t.Errorf("Expected msg attribute with value '&BoolValue{Value:true,XXX_unrecognized:[],}', got '%s'", attrs[0].Value) } - case 4, 5: + case 4: + if event.Type != "message" { + t.Errorf("Expected event type 'message', got %s", event.Type) + } + + if event.MsgIndex != 1 { + t.Errorf("Expected MsgIndex 1, got %d", event.MsgIndex) + } + + if attrs[0].Key != "action" || attrs[0].Value != "/google.protobuf.BoolValue" { + t.Errorf("Expected msg attribute with value '/google.protobuf.BoolValue', got '%s'", attrs[0].Value) + } + case 5, 6: if event.Type != "post-tx-exec" { t.Errorf("Expected event type 'post-tx-exec', got %s", event.Type) } diff --git a/simapp/v2/simdv2/cmd/commands.go b/simapp/v2/simdv2/cmd/commands.go index 4b8cb10ab0c..2a7634ffea9 100644 --- a/simapp/v2/simdv2/cmd/commands.go +++ b/simapp/v2/simdv2/cmd/commands.go @@ -115,9 +115,12 @@ func InitRootCmd[T transaction.Tx]( simApp.Name(), simApp.Store(), simApp.App.AppManager, - simApp.AppCodec(), - &client.DefaultTxDecoder[T]{TxConfig: deps.TxConfig}, - deps.ClientContext.ConsensusAddressCodec, + cometbft.AppCodecs[T]{ + AppCodec: simApp.AppCodec(), + TxCodec: &client.DefaultTxDecoder[T]{TxConfig: deps.TxConfig}, + LegacyAmino: deps.ClientContext.LegacyAmino, + ConsensusAddressCodec: deps.ClientContext.ConsensusAddressCodec, + }, simApp.App.QueryHandlers(), simApp.App.SchemaDecoderResolver(), initCometOptions[T](), diff --git a/tests/systemtests/tx_test.go b/tests/systemtests/tx_test.go index 84a0c9fc20c..a1c13201f1b 100644 --- a/tests/systemtests/tx_test.go +++ b/tests/systemtests/tx_test.go @@ -122,7 +122,7 @@ func TestSimulateTx_GRPC(t *testing.T) { require.NoError(t, err) // Check the result and gas used are correct. // - // The 12 events are: + // The 10 events are: // - Sending Fee to the pool: coin_spent, coin_received and transfer // - tx.* events: tx.fee, tx.acc_seq, tx.signature // - Sending Amount to recipient: coin_spent, coin_received and transfer