From 663608ee88e402711f287472be9975d7e249ad8d Mon Sep 17 00:00:00 2001 From: emailtovamos Date: Mon, 10 Jul 2023 15:08:22 +0100 Subject: [PATCH] core: tx_pool changes --- core/tx_pool.go | 209 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 161 insertions(+), 48 deletions(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index b5f1d3a2fd..9380a60663 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -17,7 +17,6 @@ package core import ( - "container/heap" "errors" "math" "math/big" @@ -55,6 +54,10 @@ const ( // txReannoMaxNum is the maximum number of transactions a reannounce action can include. txReannoMaxNum = 1024 + + // txWrapDataMax is the maximum size of an ssz encoded BlobTxWrapData (3 4-byte offsets + the + // raw data for each blob, each commitment, and each proof) + txWrapDataMax = 4 + 4 + 4 + params.MaxBlobsPerBlock*(params.FieldElementsPerBlob*32+48+48) ) var ( @@ -90,6 +93,12 @@ var ( // making the transaction invalid, rather a DOS protection. ErrOversizedData = errors.New("oversized data") + // ErrBadWrapData is returned if wrap data is not valid for the transaction + ErrBadWrapData = errors.New("bad wrap data") + + // ErrMissingWrapData is returned if wrap data is missing for the transaction + ErrMissingWrapData = errors.New("missing wrap data") + // ErrFutureReplacePending is returned if a future transaction replaces a pending // transaction. Future transactions should only be able to replace other future transactions. ErrFutureReplacePending = errors.New("future transaction tries to replace pending") @@ -263,10 +272,12 @@ type TxPool struct { istanbul bool // Fork indicator whether we are in the istanbul stage. eip2718 bool // Fork indicator whether we are using EIP-2718 type transactions. eip1559 bool // Fork indicator whether we are using EIP-1559 type transactions. + eip4844 bool // Fork indicator whether we are using EIP-4844 type transactions. - currentState *state.StateDB // Current state in the blockchain head - pendingNonces *txNoncer // Pending state tracking virtual nonces - currentMaxGas uint64 // Current gas limit for transaction caps + currentState *state.StateDB // Current state in the blockchain head + pendingNonces *txNoncer // Pending state tracking virtual nonces + currentMaxGas uint64 // Current gas limit for transaction caps + currentExcessDataGas *big.Int // Current block excess_data_gas locals *accountSet // Set of local transaction to exempt from eviction rules journal *txJournal // Journal of local transaction to back up to disk @@ -302,22 +313,23 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block // Create the transaction pool with its initial settings pool := &TxPool{ - config: config, - chainconfig: chainconfig, - chain: chain, - signer: types.LatestSigner(chainconfig), - pending: make(map[common.Address]*txList), - queue: make(map[common.Address]*txList), - beats: make(map[common.Address]time.Time), - all: newTxLookup(), - chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), - reqResetCh: make(chan *txpoolResetRequest), - reqPromoteCh: make(chan *accountSet), - queueTxEventCh: make(chan *types.Transaction), - reorgDoneCh: make(chan chan struct{}), - reorgShutdownCh: make(chan struct{}), - initDoneCh: make(chan struct{}), - gasPrice: new(big.Int).SetUint64(config.PriceLimit), + config: config, + chainconfig: chainconfig, + chain: chain, + signer: types.LatestSigner(chainconfig), + pending: make(map[common.Address]*txList), + queue: make(map[common.Address]*txList), + beats: make(map[common.Address]time.Time), + all: newTxLookup(), + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), + reqResetCh: make(chan *txpoolResetRequest), + reqPromoteCh: make(chan *accountSet), + queueTxEventCh: make(chan *types.Transaction), + reorgDoneCh: make(chan chan struct{}), + reorgShutdownCh: make(chan struct{}), + initDoneCh: make(chan struct{}), + gasPrice: new(big.Int).SetUint64(config.PriceLimit), + currentExcessDataGas: new(big.Int), } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { @@ -638,6 +650,9 @@ func (pool *TxPool) local() map[common.Address]types.Transactions { // validateTx checks whether a transaction is valid according to the consensus // rules and adheres to some heuristic limits of the local node (price and size). +// +// This does NOT validate wrap-data of the transaction, other than ensuring the +// tx completeness and limited size checks. func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { // Accept only legacy transactions until EIP-2718/2930 activates. if !pool.eip2718 && tx.Type() != types.LegacyTxType { @@ -647,10 +662,18 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { if !pool.eip1559 && tx.Type() == types.DynamicFeeTxType { return ErrTxTypeNotSupported } + // Reject data blob transactions until data blob EIP activates. + if !pool.eip4844 && tx.Type() == types.BlobTxType { + return ErrTxTypeNotSupported + } // Reject transactions over defined size to prevent DOS attacks if uint64(tx.Size()) > txMaxSize { return ErrOversizedData } + // Reject transactions that have too much wrap data to prevent DOS attacks. + if uint64(tx.WrapDataSize()) > txWrapDataMax { + return ErrOversizedData + } // Transactions can't be negative. This may never happen using RLP decoded // transactions but may occur if you create a transaction using the RPC. if tx.Value().Sign() < 0 { @@ -713,31 +736,35 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { if tx.Gas() < intrGas { return ErrIntrinsicGas } + // TODO: Handle & Check DataGas limits + if tx.IsIncomplete() { + return ErrMissingWrapData + } return nil } -// add validates a transaction and inserts it into the non-executable queue for later -// pending promotion and execution. If the transaction is a replacement for an already -// pending or queued one, it overwrites the previous transaction if its price is higher. -// -// If a newly added transaction is marked as local, its sending account will be -// be added to the allowlist, preventing any associated transaction from being dropped -// out of the pool due to pricing constraints. +// deprecated: use addTxsLocked to add multiple txs at once, batching is encouraged to improve performance. func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) { - // If the transaction is already known, discard it - hash := tx.Hash() - if pool.all.Get(hash) != nil { - //log.Trace("Discarding already known transaction", "hash", hash) - knownTxMeter.Mark(1) - return false, ErrAlreadyKnown + // reuse the addTxsLocked staged validation system + txs := []*types.Transaction{tx} + errs := []error{nil} + pool.filterKnownTxsLocked(txs, errs) + pool.filterInvalidTxsLocked(txs, errs, local) + pool.filterInvalidBlobTxsLocked(txs, errs) + if errs[0] == nil { + return pool.addValidTx(tx, local) } - // Make the local flag. If it's from local source or it's from the network but - // the sender is marked as local previously, treat it as the local transaction. - isLocal := local || pool.locals.containsTx(tx) + return false, errs[0] +} + +func (pool *TxPool) addValidTx(tx *types.Transaction, local bool) (replaced bool, err error) { + inLocalPool := pool.locals.containsTx(tx) + isLocal := local || inLocalPool + hash := tx.Hash() // If the transaction fails basic validation, discard it if err := pool.validateTx(tx, isLocal); err != nil { - //log.Trace("Discarding invalid transaction", "hash", hash, "err", err) + log.Trace("Discarding invalid transaction", "hash", hash, "err", err) invalidTxMeter.Mark(1) return false, err } @@ -749,7 +776,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue { // If the new transaction is underpriced, don't accept it if !isLocal && pool.priced.Underpriced(tx) { - //log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) + log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) underpricedTxMeter.Mark(1) return false, ErrUnderpriced } @@ -770,7 +797,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e // Special case, we still can't make the room for the new remote one. if !isLocal && !success { - //log.Trace("Discarding overflown transaction", "hash", hash) + log.Trace("Discarding overflown transaction", "hash", hash) overflowedTxMeter.Mark(1) return false, ErrTxPoolOverflow } @@ -788,7 +815,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e // Add all transactions back to the priced queue if replacesPending { for _, dropTx := range drop { - heap.Push(&pool.priced.urgent, dropTx) + pool.priced.Put(dropTx, false) } log.Trace("Discarding future transaction replacing pending tx", "hash", hash) return false, ErrFutureReplacePending @@ -797,7 +824,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e // Kick out the underpriced remote transactions. for _, tx := range drop { - //log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) + log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) underpricedTxMeter.Mark(1) dropped := pool.removeTx(tx.Hash(), false) pool.changesSinceReorg += dropped @@ -822,7 +849,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e pool.priced.Put(tx, isLocal) pool.journalTx(from, tx) pool.queueTxEvent(tx) - //log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) + log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) // Successful promotion, bump the heartbeat pool.beats[from] = time.Now() @@ -834,8 +861,8 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e return false, err } // Mark local addresses and journal local transactions - if local && !pool.locals.contains(from) { - //log.Info("Setting new local account", "address", from) + if local && !inLocalPool { + log.Info("Setting new local account", "address", from) pool.locals.add(from) pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time. } @@ -844,7 +871,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e } pool.journalTx(from, tx) - //log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) + log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) return replaced, nil } @@ -1058,18 +1085,88 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { // addTxsLocked attempts to queue a batch of transactions if they are valid. // The transaction pool lock must be held. +// +// It first validates the txs in stages, and then inserts the valid txs into +// the non-executable queue for later pending promotion and execution. +// +// If a transaction is a replacement for an already pending or queued one, +// it overwrites the previous transaction if its price is higher. +// +// If a newly added transaction is marked as local, its sending account will +// be added to the allowlist, preventing any associated transaction from +// being dropped out of the pool due to pricing constraints. func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) { - dirty := newAccountSet(pool.signer) + // note: the transaction validation and adding happens in stages, so expensive work can be batched. errs := make([]error, len(txs)) + pool.filterKnownTxsLocked(txs, errs) + pool.filterInvalidTxsLocked(txs, errs, local) + pool.filterInvalidBlobTxsLocked(txs, errs) + dirty := pool.addValidTxsLocked(txs, errs, local) + return errs, dirty +} + +// filterKnownTxsLocked marks all known transactions with ErrAlreadyKnown +func (pool *TxPool) filterKnownTxsLocked(txs []*types.Transaction, errs []error) { + for i, tx := range txs { + if pool.Has(tx.Hash()) { + log.Trace("Discarding already known transaction", "hash", tx.Hash()) + knownTxMeter.Mark(1) + errs[i] = ErrAlreadyKnown + } + } +} + +// filterInvalidTxsLocked marks all invalid txs with respective error, this excludes blob validation +func (pool *TxPool) filterInvalidTxsLocked(txs []*types.Transaction, errs []error, local bool) { + for i, tx := range txs { + if errs[i] != nil { + continue + } + + // Make the local flag. If it's from local source or it's from the network but + // the sender is marked as local previously, treat it as the local transaction. + inLocalPool := pool.locals.containsTx(tx) + isLocal := local || inLocalPool + err := pool.validateTx(tx, isLocal) + if err != nil { + log.Trace("Discarding invalid transaction", "hash", tx.Hash(), "err", err) + invalidTxMeter.Mark(1) + errs[i] = err + } + } +} + +// filterInvalidBlobTxsLocked marks all blob txs (if any) with an error if the blobs or kzg commitments are invalid +func (pool *TxPool) filterInvalidBlobTxsLocked(txs []*types.Transaction, errs []error) { for i, tx := range txs { - replaced, err := pool.add(tx, local) + if errs[i] != nil { + continue + } + // all blobs within the tx can still be batched together + err := tx.VerifyBlobs() + if err != nil { + log.Trace("Discarding blob transaction", "hash", tx.Hash(), "err", err) + invalidTxMeter.Mark(1) + errs[i] = err + } + } +} + +// addValidTxsLocked adds all transactions to the pool that are not marked with an error. +func (pool *TxPool) addValidTxsLocked(txs []*types.Transaction, errs []error, local bool) *accountSet { + dirty := newAccountSet(pool.signer) + for i, tx := range txs { + if errs[i] != nil { + continue + } + replaced, err := pool.addValidTx(tx, local) errs[i] = err if err == nil && !replaced { dirty.addTx(tx) } } validTxMeter.Mark(int64(len(dirty.accounts))) - return errs, dirty + return dirty } // Status returns the status (unknown/pending/queued) of a batch of transactions @@ -1400,6 +1497,17 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) return } + // todo 4844 double check this + // transactions that contained blobs might not have the wrapData anymore + // if not retrieved from block cache since wrap data is not persisted. Discard those. + j := 0 + for _, tx := range discarded { + if !tx.IsIncomplete() { + discarded[j] = tx + j++ + } + } + discarded = discarded[:j] included = append(included, add.Transactions()...) if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) @@ -1422,6 +1530,9 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { pool.currentState = statedb pool.pendingNonces = newTxNoncer(statedb) pool.currentMaxGas = newHead.GasLimit + if newHead.ExcessDataGas != nil { + pool.currentExcessDataGas.Set(newHead.ExcessDataGas) + } // Inject any transactions discarded due to reorgs log.Debug("Reinjecting stale transactions", "count", len(reinject)) @@ -1433,6 +1544,8 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { pool.istanbul = pool.chainconfig.IsIstanbul(next) pool.eip2718 = pool.chainconfig.IsBerlin(next) pool.eip1559 = pool.chainconfig.IsLondon(next) + now := uint64(time.Now().Unix()) + pool.eip4844 = pool.chainconfig.IsCancun(now) } // promoteExecutables moves transactions that have become processable from the