Skip to content
This repository has been archived by the owner on Jun 9, 2024. It is now read-only.

fix(mempool): Mark remote txs correctly #1471

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cosmos/runtime/ante/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,15 @@ func (ah *Provider) AnteHandler() func(
return func(ctx sdk.Context, tx sdk.Tx, simulate bool) (sdk.Context, error) {
// If the transaction contains a single EVM transaction, use the EVM ante handler
if len(tx.GetMsgs()) == 1 {
if _, ok := tx.GetMsgs()[0].(*evmtypes.WrappedEthereumTransaction); ok {
if ethTx, ok := tx.GetMsgs()[0].(*evmtypes.WrappedEthereumTransaction); ok {
ctx.Logger().Info("running evm ante handler for eth tx", "hash", ethTx.Unwrap().Hash())
return ah.evmAnteHandler(ctx, tx, simulate)
} else if _, ok = tx.GetMsgs()[0].(*evmtypes.WrappedPayloadEnvelope); ok {
if ctx.ExecMode() != sdk.ExecModeCheck {
ctx.Logger().Info("running evm ante handler for payload tx")
return ctx, nil
}
ctx.Logger().Error("running evm ante handler for payload tx in check tx")
return ctx, errors.New("payload envelope is not supported in CheckTx")
}
}
Expand Down
7 changes: 5 additions & 2 deletions cosmos/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,11 @@ func (p *Polaris) SetupServices(clientCtx client.Context) error {
clientCtx.TxConfig, evmtypes.WrapPayload))

// Initialize the txpool with a new transaction serializer.
p.WrappedTxPool.Init(p.logger, clientCtx, libtx.NewSerializer[*ethtypes.Transaction](
clientCtx.TxConfig, evmtypes.WrapTx))
p.WrappedTxPool.Init(
p.logger, clientCtx,
libtx.NewSerializer[*ethtypes.Transaction](clientCtx.TxConfig, evmtypes.WrapTx),
clientCtx.Client,
)

// Register services with Polaris.
p.RegisterLifecycles([]node.Lifecycle{
Expand Down
28 changes: 21 additions & 7 deletions cosmos/runtime/txpool/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,48 +44,60 @@ func (m *Mempool) AnteHandle(
telemetry.IncrCounter(float32(1), MetricKeyCometPoolTxs)
msgs := tx.GetMsgs()

ctx.Logger().Info("AnteHandle Polaris Mempool", "msgs", len(msgs), "simulate", simulate)

// TODO: Record the time it takes to build a payload.

// We only want to eject transactions from comet on recheck.
if ctx.ExecMode() == sdk.ExecModeCheck || ctx.ExecMode() == sdk.ExecModeReCheck {
ctx.Logger().Info("AnteHandle in Check/Recheck tx")
if wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]); ok {
ethTx := wet.Unwrap()
ctx.Logger().Info("AnteHandle for eth tx", "tx", ethTx.Hash(), "mode", ctx.ExecMode())
if shouldEject := m.shouldEjectFromCometMempool(
ctx.BlockTime().Unix(), ethTx,
ctx, ethTx,
); shouldEject {
ctx.Logger().Info("AnteHandle dropping tx from comet mempool", "tx", ethTx.Hash())
m.crc.DropRemoteTx(ethTx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs)
return ctx, errors.New("eject from comet mempool")
}
ctx.Logger().Info("AnteHandle NOT dropping comet mempool", "tx", ethTx.Hash())
}
}
return next(ctx, tx, simulate)
}

// shouldEject returns true if the transaction should be ejected from the CometBFT mempool.
func (m *Mempool) shouldEjectFromCometMempool(
currentTime int64, tx *ethtypes.Transaction,
ctx sdk.Context, tx *ethtypes.Transaction,
) bool {
defer telemetry.MeasureSince(time.Now(), MetricKeyTimeShouldEject)
if tx == nil {
ctx.Logger().Info("shouldEjectFromCometMempool: tx is nil")
return false
}

// First check things that are stateless.
if m.validateStateless(tx, currentTime) {
if m.validateStateless(ctx, tx) {
ctx.Logger().Info("shouldEjectFromCometMempool: stateless failed", "tx", tx.Hash())
return true
}

// Then check for things that are stateful.
return m.validateStateful(tx)
return m.validateStateful(ctx, tx)
}

// validateStateless returns whether the tx of the given hash is stateless.
func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64) bool {
func (m *Mempool) validateStateless(ctx sdk.Context, tx *ethtypes.Transaction) bool {
txHash := tx.Hash()
currentTime := ctx.BlockTime().Unix()
ctx.Logger().Info("validateStateless", "txHash", txHash, "currentTime", currentTime)

// 1. If the transaction has been in the mempool for longer than the configured timeout.
// 2. If the transaction's gas params are less than or equal to the configured limit.
expired := currentTime-m.crc.TimeFirstSeen(txHash) > m.lifetime
ctx.Logger().Info("validateStateless", "currentTime", currentTime, "timeFirstSeen", m.crc.TimeFirstSeen(txHash), "expired", expired)
priceLeLimit := tx.GasPrice().Cmp(m.priceLimit) <= 0

if expired {
Expand All @@ -95,20 +107,22 @@ func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64)
telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectPriceLimit)
}

ctx.Logger().Info("validateStateless", "expired", expired, "priceLeLimit", priceLeLimit)

return expired || priceLeLimit
}

// includedCanonicalChain returns whether the tx of the given hash is included in the canonical
// Eth chain.
func (m *Mempool) validateStateful(tx *ethtypes.Transaction) bool {
func (m *Mempool) validateStateful(ctx sdk.Context, tx *ethtypes.Transaction) bool {
// // 1. If the transaction has been included in a block.
// signer := ethtypes.LatestSignerForChainID(m.chainConfig.ChainID)
// if _, err := ethtypes.Sender(signer, tx); err != nil {
// return true
// }

// tx.Nonce() <
included := m.chain.GetTransactionLookup(tx.Hash()) != nil
telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectInclusion)
ctx.Logger().Info("validateStateful", "included", included)
return included
}
3 changes: 2 additions & 1 deletion cosmos/runtime/txpool/comet.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ func (crc *cometRemoteCache) IsRemoteTx(txHash common.Hash) bool {
return ok
}

// Record the time the tx was inserted from Comet successfully.
// Record the first time the tx was inserted from Comet successfully.
func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) {
crc.timeInsertedMu.Lock()
// TODO: only insert a new timestamp if not already seen.
crc.timeInserted[txHash] = time.Now().Unix()
crc.timeInsertedMu.Unlock()
}
Expand Down
33 changes: 27 additions & 6 deletions cosmos/runtime/txpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
package txpool

import (
"context"
"errors"
"sync/atomic"
"time"

coretypes "github.com/cometbft/cometbft/rpc/core/types"

"cosmossdk.io/log"

"github.com/cosmos/cosmos-sdk/telemetry"
Expand All @@ -40,7 +43,7 @@
// size of tx pool.
const (
txChanSize = 4096
maxRetries = 5
maxRetries = 0
retryDelay = 50 * time.Millisecond
statPeriod = 60 * time.Second
)
Expand All @@ -67,6 +70,16 @@
BroadcastTxSync(txBytes []byte) (res *sdk.TxResponse, err error)
}

type TxSearcher interface {
TxSearch(
ctx context.Context,
query string,
prove bool,
page, perPage *int,
orderBy string,
) (*coretypes.ResultTxSearch, error)
}

// Subscription represents a subscription to the txpool.
type Subscription interface {
event.Subscription
Expand All @@ -85,6 +98,7 @@
logger log.Logger
clientCtx TxBroadcaster
serializer TxSerializer
searcher TxSearcher
crc CometRemoteCache

// Ethereum
Expand All @@ -100,13 +114,14 @@

// newHandler creates a new handler.
func newHandler(
clientCtx TxBroadcaster, txPool TxSubProvider, serializer TxSerializer,
clientCtx TxBroadcaster, txSearcher TxSearcher, txPool TxSubProvider, serializer TxSerializer,
crc CometRemoteCache, logger log.Logger,
) *handler {
h := &handler{
logger: logger,
clientCtx: clientCtx,
serializer: serializer,
searcher: txSearcher,
crc: crc,
txPool: txPool,
txsCh: make(chan core.NewTxsEvent, txChanSize),
Expand All @@ -122,7 +137,7 @@
return errors.New("handler already started")
}
go h.mainLoop()
go h.failedLoop() // Start the retry policy
// go h.failedLoop() // Start the retry policy
go h.statLoop()
return nil
}
Expand Down Expand Up @@ -162,7 +177,7 @@
}

// failedLoop will periodically attempt to re-broadcast failed transactions.
func (h *handler) failedLoop() {

Check failure on line 180 in cosmos/runtime/txpool/handler.go

View workflow job for this annotation

GitHub Actions / ci (lint, polaris-linux-latest, 1.21.6)

func `(*handler).failedLoop` is unused (unused)
for {
select {
case <-h.stopCh:
Expand All @@ -172,6 +187,7 @@
h.logger.Error("failed to broadcast transaction after max retries", "tx", maxRetries)
continue
}
h.logger.Info("retrying failed tx", "tx", failed.tx.Hash(), "retries", failed.retries)
telemetry.IncrCounter(float32(1), MetricKeyBroadcastRetry)
h.broadcastTransaction(failed.tx, failed.retries-1)
}
Expand Down Expand Up @@ -225,11 +241,12 @@
numBroadcasted := 0
for _, signedEthTx := range txs {
if !h.crc.IsRemoteTx(signedEthTx.Hash()) {
h.logger.Info("broadcasting local eth tx", "hash", signedEthTx.Hash().Hex())
h.broadcastTransaction(signedEthTx, maxRetries)
numBroadcasted++
}
}
h.logger.Debug(
h.logger.Info(
"broadcasting transactions", "num_received", len(txs), "num_broadcasted", numBroadcasted,
)
}
Expand All @@ -242,6 +259,8 @@
return
}

h.logger.Info("broadcasting to comet", "ethTx", tx.Hash(), "sdkTx", txBytes)

// Send the transaction to the CometBFT mempool, which will gossip it to peers via
// CometBFT's p2p layer.
rsp, err := h.clientCtx.BroadcastTxSync(txBytes)
Expand All @@ -254,21 +273,23 @@
// If rsp == 1, likely the txn is already in a block, and thus the broadcast failing is actually
// the desired behaviour.
if rsp == nil || rsp.Code == 0 || rsp.Code == 1 {
h.logger.Info("broadcasting to comet", "hash", tx.Hash(), "rsp", rsp, "code", rsp.Code)
return
}

switch rsp.Code {
case sdkerrors.ErrMempoolIsFull.ABCICode():
h.logger.Error("failed to broadcast: comet-bft mempool is full", "tx_hash", tx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyMempoolFull)
case
sdkerrors.ErrTxInMempoolCache.ABCICode():
case sdkerrors.ErrTxInMempoolCache.ABCICode():
return
default:
h.logger.Error("failed to broadcast transaction",
"codespace", rsp.Codespace, "code", rsp.Code, "info", rsp.Info, "tx_hash", tx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyBroadcastFailure)
}

h.logger.Info("failed to broadcast transaction", "tx_hash", tx.Hash(), "retries", retries)

h.failedTxs <- &failedTx{tx: tx, retries: retries}
}
2 changes: 1 addition & 1 deletion cosmos/runtime/txpool/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ var _ = Describe("", func() {
subprovider = mocks.NewTxSubProvider(t)
subprovider.On("SubscribeTransactions", mock.Anything, mock.Anything).Return(subscription)
serializer = mocks.NewTxSerializer(t)
h = newHandler(broadcaster, subprovider, serializer, newCometRemoteCache(), log.NewTestLogger(t))
h = newHandler(broadcaster, nil, subprovider, serializer, newCometRemoteCache(), log.NewTestLogger(t))
err := h.Start()
Expect(err).NotTo(HaveOccurred())
for !h.Running() {
Expand Down
27 changes: 19 additions & 8 deletions cosmos/runtime/txpool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"errors"
"math/big"
"sync"
"time"

"cosmossdk.io/log"

Expand Down Expand Up @@ -92,8 +93,9 @@ func (m *Mempool) Init(
logger log.Logger,
txBroadcaster TxBroadcaster,
txSerializer TxSerializer,
txSearcher TxSearcher,
) {
m.handler = newHandler(txBroadcaster, m.TxPool, txSerializer, m.crc, logger)
m.handler = newHandler(txBroadcaster, txSearcher, m.TxPool, txSerializer, m.crc, logger)
}

// Start starts the Mempool TxHandler.
Expand All @@ -111,12 +113,14 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error {
sCtx := sdk.UnwrapSDKContext(ctx)
msgs := sdkTx.GetMsgs()
if len(msgs) != 1 {
sCtx.Logger().Error("mempool insert: only one message is supported")
return errors.New("only one message is supported")
}

wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0])
if !ok {
// We have to return nil for non-ethereum transactions as to not fail check-tx.
sCtx.Logger().Info("mempool insert: not an ethereum transaction")
return nil
}

Expand All @@ -127,17 +131,24 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error {
m.blockBuilderMu.RLock()
errs := m.TxPool.Add([]*ethtypes.Transaction{ethTx}, false, false)
m.blockBuilderMu.RUnlock()
if len(errs) > 0 {
// Handle case where a node broadcasts to itself, we don't want it to fail CheckTx.
if errors.Is(errs[0], ethtxpool.ErrAlreadyKnown) &&
(sCtx.ExecMode() == sdk.ExecModeCheck || sCtx.ExecMode() == sdk.ExecModeReCheck) {
telemetry.IncrCounter(float32(1), MetricKeyMempoolKnownTxs)
return nil
}

// Handle case where a node broadcasts to itself, we don't want it to fail CheckTx.
// Note: it's safe to check errs[0] because geth returns `errs` of length 1.
if errors.Is(errs[0], ethtxpool.ErrAlreadyKnown) &&
(sCtx.ExecMode() == sdk.ExecModeCheck || sCtx.ExecMode() == sdk.ExecModeReCheck) {
telemetry.IncrCounter(float32(1), MetricKeyMempoolKnownTxs)
sCtx.Logger().Info("mempool insert: tx already in mempool", "mode", sCtx.ExecMode())
return nil
}
if errs[0] != nil {
return errs[0]
}

// Add the eth tx to the remote cache.
sCtx.Logger().Info(
"mempool insert: marking remote seen", "tx", ethTx.Hash(), "time", time.Now().Unix(),
"is(already)RemoteTx", m.crc.IsRemoteTx(ethTx.Hash()),
)
m.crc.MarkRemoteSeen(ethTx.Hash())

return nil
Expand Down
Loading