Skip to content
This repository has been archived by the owner on Jun 9, 2024. It is now read-only.

protecccc validators bls #1473

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cosmos/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,20 @@ func readConfigFromAppOptsParser(parser AppOptionsParser) (*Config, error) {
return nil, err
}

if conf.Polar.IsValidator, err = parser.GetBool(flags.IsValidator); err != nil {
return nil, err
}

if conf.Polar.ValidatorJSONRPCEndpoint, err =
parser.GetString(flags.ValidatorJSONRPCEndpoint); err != nil {
return nil, err
}

if conf.Polar.ForceForwardReCheckTxs, err = parser.GetBool(
flags.ForceForwardReCheckTxs); err != nil {
return nil, err
}

// Polar Miner settings
if conf.Polar.Miner.Etherbase, err =
parser.GetCommonAddress(flags.MinerEtherbase); err != nil {
Expand Down
9 changes: 6 additions & 3 deletions cosmos/config/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ const (
OptimisticExecution = "polaris.optimistic-execution"

// Polar Root.
RPCEvmTimeout = "polaris.polar.rpc-evm-timeout"
RPCTxFeeCap = "polaris.polar.rpc-tx-fee-cap"
RPCGasCap = "polaris.polar.rpc-gas-cap"
RPCEvmTimeout = "polaris.polar.rpc-evm-timeout"
RPCTxFeeCap = "polaris.polar.rpc-tx-fee-cap"
RPCGasCap = "polaris.polar.rpc-gas-cap"
IsValidator = "polaris.polar.is-validator"
ValidatorJSONRPCEndpoint = "polaris.polar.validator-jsonrpc-endpoint"
ForceForwardReCheckTxs = "polaris.polar.force-forward-recheck-txs"
itsdevbear marked this conversation as resolved.
Show resolved Hide resolved

// Miner.
MinerEtherbase = "polaris.polar.miner.etherbase"
Expand Down
9 changes: 9 additions & 0 deletions cosmos/config/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ rpc-evm-timeout = "{{ .Polaris.Polar.RPCEVMTimeout }}"
# Transaction fee cap for RPC requests
rpc-tx-fee-cap = "{{ .Polaris.Polar.RPCTxFeeCap }}"

# JSON-RPC endpoint for forwarding ethereum transactions directly to validators.
validator-jsonrpc-endpoint = "{{ .Polaris.Polar.ValidatorJSONRPCEndpoint }}"

# Whether the node is a validator
is-validator = {{ .Polaris.Polar.IsValidator }}

# If we want to force forwarding on ReCheckTxs
force-forward-recheck-txs = {{ .Polaris.Polar.ForceForwardReCheckTxs }}

# Chain config
[polaris.polar.chain]
chain-id = "{{ .Polaris.Polar.Chain.ChainID }}"
Expand Down
9 changes: 7 additions & 2 deletions cosmos/runtime/ante/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ import (
type Provider struct {
evmAnteHandler sdk.AnteHandler // Ante handler for EVM transactions
cosmosAnteHandler sdk.AnteHandler // Ante handler for Cosmos transactions
isValidator bool
}

// NewAnteHandler creates a new Provider with a mempool and Cosmos ante handler.
// It sets up the EVM ante handler with the necessary decorators.
func NewAnteHandler(
mempool *txpool.Mempool, cosmosAnteHandler sdk.AnteHandler,
mempool *txpool.Mempool, cosmosAnteHandler sdk.AnteHandler, isValidator bool,
) *Provider {
evmAnteDecorators := []sdk.AnteDecorator{
ante.NewSetUpContextDecorator(), // Set up the context decorator for the EVM ante handler
Expand All @@ -55,6 +56,7 @@ func NewAnteHandler(
return &Provider{
evmAnteHandler: sdk.ChainAnteDecorators(evmAnteDecorators...),
cosmosAnteHandler: cosmosAnteHandler,
isValidator: isValidator,
}
}

Expand All @@ -65,8 +67,11 @@ func (ah *Provider) AnteHandler() func(
) (sdk.Context, error) {
return func(ctx sdk.Context, tx sdk.Tx, simulate bool) (sdk.Context, error) {
// If the transaction contains a single EVM transaction, use the EVM ante handler
if len(tx.GetMsgs()) == 1 {
if len(tx.GetMsgs()) == 1 { //nolint:nestif // todo:fix.
if _, ok := tx.GetMsgs()[0].(*evmtypes.WrappedEthereumTransaction); ok {
if ah.isValidator {
return ctx, errors.New("validator cannot accept EVM from comet")
}
return ah.evmAnteHandler(ctx, tx, simulate)
} else if _, ok = tx.GetMsgs()[0].(*evmtypes.WrappedPayloadEnvelope); ok {
if ctx.ExecMode() != sdk.ExecModeCheck {
Expand Down
13 changes: 10 additions & 3 deletions cosmos/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ type Polaris struct {
// into the txpool are happening during this process. The mempool object then read locks for
// adding transactions into the txpool.
blockBuilderMu sync.RWMutex

cfg *eth.Config
}

// New creates a new Polaris runtime from the provided dependencies.
Expand All @@ -105,6 +107,7 @@ func New(
var err error
p := &Polaris{
logger: logger,
cfg: cfg,
}

ctx := sdk.Context{}.
Expand All @@ -124,11 +127,15 @@ func New(

priceLimit := big.NewInt(0).SetUint64(cfg.Polar.LegacyTxPool.PriceLimit)
p.WrappedTxPool = txpool.New(
p.logger,
p.ExecutionLayer.Backend().Blockchain(),
p.ExecutionLayer.Backend().TxPool(),
int64(cfg.Polar.LegacyTxPool.Lifetime),
cfg.Polar.LegacyTxPool.Lifetime,
&p.blockBuilderMu,
priceLimit,
p.cfg.Polar.IsValidator,
p.cfg.Polar.ForceForwardReCheckTxs,
p.cfg.Polar.ValidatorJSONRPCEndpoint,
)

return p
Expand Down Expand Up @@ -163,7 +170,7 @@ func (p *Polaris) Build(
}

app.SetAnteHandler(
antelib.NewAnteHandler(p.WrappedTxPool, cosmHandler).AnteHandler(),
antelib.NewAnteHandler(p.WrappedTxPool, cosmHandler, p.cfg.Polar.IsValidator).AnteHandler(),
)

return nil
Expand All @@ -177,7 +184,7 @@ func (p *Polaris) SetupServices(clientCtx client.Context) error {
clientCtx.TxConfig, evmtypes.WrapPayload))

// Initialize the txpool with a new transaction serializer.
p.WrappedTxPool.Init(p.logger, clientCtx, libtx.NewSerializer[*ethtypes.Transaction](
p.WrappedTxPool.Init(clientCtx, libtx.NewSerializer[*ethtypes.Transaction](
clientCtx.TxConfig, evmtypes.WrapTx))

// Register services with Polaris.
Expand Down
65 changes: 28 additions & 37 deletions cosmos/runtime/txpool/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,25 @@ import (
ethtypes "github.com/ethereum/go-ethereum/core/types"
)

// AnteHandle implements sdk.AnteHandler.
// It is used to determine whether transactions should be ejected
// from the comet mempool.
// AnteHandle implements sdk.AnteHandler. It is used to determine whether transactions should be
// ejected from the comet mempool.
func (m *Mempool) AnteHandle(
ctx sdk.Context, tx sdk.Tx, simulate bool, next sdk.AnteHandler,
) (sdk.Context, error) {
// The transaction put into this function by CheckTx
// is a transaction from CometBFT mempool.
// The transaction put into this function by CheckTx is a transaction from CometBFT mempool.
telemetry.IncrCounter(float32(1), MetricKeyCometPoolTxs)
msgs := tx.GetMsgs()

// TODO: Record the time it takes to build a payload.

// We only want to eject transactions from comet on recheck.
if ctx.ExecMode() == sdk.ExecModeCheck || ctx.ExecMode() == sdk.ExecModeReCheck {
if wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]); ok {
ethTx := wet.Unwrap()
if shouldEject := m.shouldEjectFromCometMempool(
ctx.BlockTime().Unix(), ethTx,
); shouldEject {
if shouldEject := m.shouldEjectFromCometMempool(ethTx); shouldEject {
telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs)
return ctx, errors.New("eject from comet mempool")
} else if ctx.ExecMode() == sdk.ExecModeReCheck && m.forceBroadcastOnRecheck {
// We optionally force a re-broadcast.
m.ForwardToValidator(ethTx)
}
}
}
Expand All @@ -63,51 +60,45 @@ func (m *Mempool) AnteHandle(

// shouldEject returns true if the transaction should be ejected from the CometBFT mempool.
func (m *Mempool) shouldEjectFromCometMempool(
currentTime int64, tx *ethtypes.Transaction,
tx *ethtypes.Transaction,
) bool {
defer telemetry.MeasureSince(time.Now(), MetricKeyTimeShouldEject)
if tx == nil {
return false
}

// First check things that are stateless.
if m.validateStateless(tx, currentTime) {
return true
}

// Then check for things that are stateful.
return m.validateStateful(tx)
return m.isInvalidLocal(tx) || m.includedCanonicalChain(tx)
}

// validateStateless returns whether the tx of the given hash is stateless.
func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64) bool {
txHash := tx.Hash()
// 1. If the transaction has been in the mempool for longer than the configured timeout.
// 2. If the transaction's gas params are less than or equal to the configured limit.
expired := currentTime-m.crc.TimeFirstSeen(txHash) > m.lifetime
// isInvalidLocal returns whether the tx is invalid for the local node's mempool settings.
func (m *Mempool) isInvalidLocal(tx *ethtypes.Transaction) bool {
// 1. If the transaction's gas params are less than or equal to the configured limit.
priceLeLimit := tx.GasPrice().Cmp(m.priceLimit) <= 0
if priceLeLimit {
telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectPriceLimit)
}

// If the transaction was not seen before, it is valid for now.
timeFirstSeen, notSeen := m.crc.TimeFirstSeen(tx.Hash())
if !notSeen {
return false
}

// 2. If the transaction has been in the mempool for longer than the configured lifetime.
expired := time.Since(timeFirstSeen) > m.lifetime
if expired {
telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectExpiredTx)
}
if priceLeLimit {
telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectPriceLimit)
}

return expired || priceLeLimit
return priceLeLimit || expired
}

// includedCanonicalChain returns whether the tx of the given hash is included in the canonical
// Eth chain.
func (m *Mempool) validateStateful(tx *ethtypes.Transaction) bool {
// // 1. If the transaction has been included in a block.
// signer := ethtypes.LatestSignerForChainID(m.chainConfig.ChainID)
// if _, err := ethtypes.Sender(signer, tx); err != nil {
// return true
// }

// tx.Nonce() <
func (m *Mempool) includedCanonicalChain(tx *ethtypes.Transaction) bool {
included := m.chain.GetTransactionLookup(tx.Hash()) != nil
telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectInclusion)
if included {
telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectInclusion)
}
return included
}
14 changes: 6 additions & 8 deletions cosmos/runtime/txpool/comet.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,17 @@ const (
type CometRemoteCache interface {
IsRemoteTx(txHash common.Hash) bool
MarkRemoteSeen(txHash common.Hash) bool
TimeFirstSeen(txHash common.Hash) int64 // Unix timestamp
TimeFirstSeen(txHash common.Hash) (time.Time, bool)
}

// Thread-safe implementation of CometRemoteCache.
type cometRemoteCache struct {
timeInserted *lru.Cache[common.Hash, int64]
timeInserted *lru.Cache[common.Hash, time.Time]
}

func newCometRemoteCache() *cometRemoteCache {
return &cometRemoteCache{

timeInserted: lru.NewCache[common.Hash, int64](defaultCacheSize),
timeInserted: lru.NewCache[common.Hash, time.Time](defaultCacheSize),
}
}

Expand All @@ -58,13 +57,12 @@ func (crc *cometRemoteCache) IsRemoteTx(txHash common.Hash) bool {
// Record the time the tx was inserted from Comet successfully.
func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) bool {
if !crc.timeInserted.Contains(txHash) {
crc.timeInserted.Add(txHash, time.Now().Unix())
crc.timeInserted.Add(txHash, time.Now())
return true
}
return false
}

func (crc *cometRemoteCache) TimeFirstSeen(txHash common.Hash) int64 {
i, _ := crc.timeInserted.Get(txHash)
return i
func (crc *cometRemoteCache) TimeFirstSeen(txHash common.Hash) (time.Time, bool) {
return crc.timeInserted.Get(txHash)
}
28 changes: 19 additions & 9 deletions cosmos/runtime/txpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,28 +96,35 @@ type handler struct {

// Queue for failed transactions
failedTxs chan *failedTx

isValidator bool
}

// newHandler creates a new handler.
func newHandler(
clientCtx TxBroadcaster, txPool TxSubProvider, serializer TxSerializer,
crc CometRemoteCache, logger log.Logger,
crc CometRemoteCache, logger log.Logger, isValidator bool,
) *handler {
h := &handler{
logger: logger,
clientCtx: clientCtx,
serializer: serializer,
crc: crc,
txPool: txPool,
txsCh: make(chan core.NewTxsEvent, txChanSize),
stopCh: make(chan struct{}),
failedTxs: make(chan *failedTx, txChanSize),
logger: logger,
clientCtx: clientCtx,
serializer: serializer,
crc: crc,
txPool: txPool,
txsCh: make(chan core.NewTxsEvent, txChanSize),
stopCh: make(chan struct{}),
failedTxs: make(chan *failedTx, txChanSize),
isValidator: isValidator,
}
return h
}

// Start starts the handler.
func (h *handler) Start() error {
if h.isValidator {
return nil
}

if h.running.Load() {
return errors.New("handler already started")
}
Expand All @@ -129,6 +136,9 @@ func (h *handler) Start() error {

// Stop stops the handler.
func (h *handler) Stop() error {
if h.isValidator {
return nil
}
if !h.Running() {
return errors.New("handler already stopped")
}
Expand Down
4 changes: 3 additions & 1 deletion cosmos/runtime/txpool/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ var _ = Describe("", func() {
subprovider = mocks.NewTxSubProvider(t)
subprovider.On("SubscribeTransactions", mock.Anything, mock.Anything).Return(subscription)
serializer = mocks.NewTxSerializer(t)
h = newHandler(broadcaster, subprovider, serializer, newCometRemoteCache(), log.NewTestLogger(t))
h = newHandler(
broadcaster, subprovider, serializer,
newCometRemoteCache(), log.NewTestLogger(t), false)
err := h.Start()
Expect(err).NotTo(HaveOccurred())
for !h.Running() {
Expand Down
Loading
Loading