Skip to content

Commit

Permalink
core: tx_pool changes
Browse files Browse the repository at this point in the history
  • Loading branch information
emailtovamos committed Jul 10, 2023
1 parent ed0ed98 commit 663608e
Showing 1 changed file with 161 additions and 48 deletions.
209 changes: 161 additions & 48 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package core

import (
"container/heap"
"errors"
"math"
"math/big"
Expand Down Expand Up @@ -55,6 +54,10 @@ const (

// txReannoMaxNum is the maximum number of transactions a reannounce action can include.
txReannoMaxNum = 1024

// txWrapDataMax is the maximum size of an ssz encoded BlobTxWrapData (3 4-byte offsets + the
// raw data for each blob, each commitment, and each proof)
txWrapDataMax = 4 + 4 + 4 + params.MaxBlobsPerBlock*(params.FieldElementsPerBlob*32+48+48)
)

var (
Expand Down Expand Up @@ -90,6 +93,12 @@ var (
// making the transaction invalid, rather a DOS protection.
ErrOversizedData = errors.New("oversized data")

// ErrBadWrapData is returned if wrap data is not valid for the transaction
ErrBadWrapData = errors.New("bad wrap data")

// ErrMissingWrapData is returned if wrap data is missing for the transaction
ErrMissingWrapData = errors.New("missing wrap data")

// ErrFutureReplacePending is returned if a future transaction replaces a pending
// transaction. Future transactions should only be able to replace other future transactions.
ErrFutureReplacePending = errors.New("future transaction tries to replace pending")
Expand Down Expand Up @@ -263,10 +272,12 @@ type TxPool struct {
istanbul bool // Fork indicator whether we are in the istanbul stage.
eip2718 bool // Fork indicator whether we are using EIP-2718 type transactions.
eip1559 bool // Fork indicator whether we are using EIP-1559 type transactions.
eip4844 bool // Fork indicator whether we are using EIP-4844 type transactions.

currentState *state.StateDB // Current state in the blockchain head
pendingNonces *txNoncer // Pending state tracking virtual nonces
currentMaxGas uint64 // Current gas limit for transaction caps
currentState *state.StateDB // Current state in the blockchain head
pendingNonces *txNoncer // Pending state tracking virtual nonces
currentMaxGas uint64 // Current gas limit for transaction caps
currentExcessDataGas *big.Int // Current block excess_data_gas

locals *accountSet // Set of local transaction to exempt from eviction rules
journal *txJournal // Journal of local transaction to back up to disk
Expand Down Expand Up @@ -302,22 +313,23 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block

// Create the transaction pool with its initial settings
pool := &TxPool{
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.LatestSigner(chainconfig),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.LatestSigner(chainconfig),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
currentExcessDataGas: new(big.Int),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
Expand Down Expand Up @@ -638,6 +650,9 @@ func (pool *TxPool) local() map[common.Address]types.Transactions {

// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
//
// This does NOT validate wrap-data of the transaction, other than ensuring the
// tx completeness and limited size checks.
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
// Accept only legacy transactions until EIP-2718/2930 activates.
if !pool.eip2718 && tx.Type() != types.LegacyTxType {
Expand All @@ -647,10 +662,18 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
if !pool.eip1559 && tx.Type() == types.DynamicFeeTxType {
return ErrTxTypeNotSupported
}
// Reject data blob transactions until data blob EIP activates.
if !pool.eip4844 && tx.Type() == types.BlobTxType {
return ErrTxTypeNotSupported
}
// Reject transactions over defined size to prevent DOS attacks
if uint64(tx.Size()) > txMaxSize {
return ErrOversizedData
}
// Reject transactions that have too much wrap data to prevent DOS attacks.
if uint64(tx.WrapDataSize()) > txWrapDataMax {
return ErrOversizedData
}
// Transactions can't be negative. This may never happen using RLP decoded
// transactions but may occur if you create a transaction using the RPC.
if tx.Value().Sign() < 0 {
Expand Down Expand Up @@ -713,31 +736,35 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
if tx.Gas() < intrGas {
return ErrIntrinsicGas
}
// TODO: Handle & Check DataGas limits
if tx.IsIncomplete() {
return ErrMissingWrapData
}
return nil
}

// add validates a transaction and inserts it into the non-executable queue for later
// pending promotion and execution. If the transaction is a replacement for an already
// pending or queued one, it overwrites the previous transaction if its price is higher.
//
// If a newly added transaction is marked as local, its sending account will be
// be added to the allowlist, preventing any associated transaction from being dropped
// out of the pool due to pricing constraints.
// deprecated: use addTxsLocked to add multiple txs at once, batching is encouraged to improve performance.
func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
// If the transaction is already known, discard it
hash := tx.Hash()
if pool.all.Get(hash) != nil {
//log.Trace("Discarding already known transaction", "hash", hash)
knownTxMeter.Mark(1)
return false, ErrAlreadyKnown
// reuse the addTxsLocked staged validation system
txs := []*types.Transaction{tx}
errs := []error{nil}
pool.filterKnownTxsLocked(txs, errs)
pool.filterInvalidTxsLocked(txs, errs, local)
pool.filterInvalidBlobTxsLocked(txs, errs)
if errs[0] == nil {
return pool.addValidTx(tx, local)
}
// Make the local flag. If it's from local source or it's from the network but
// the sender is marked as local previously, treat it as the local transaction.
isLocal := local || pool.locals.containsTx(tx)
return false, errs[0]
}

func (pool *TxPool) addValidTx(tx *types.Transaction, local bool) (replaced bool, err error) {
inLocalPool := pool.locals.containsTx(tx)
isLocal := local || inLocalPool
hash := tx.Hash()

// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, isLocal); err != nil {
//log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
invalidTxMeter.Mark(1)
return false, err
}
Expand All @@ -749,7 +776,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
if !isLocal && pool.priced.Underpriced(tx) {
//log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1)
return false, ErrUnderpriced
}
Expand All @@ -770,7 +797,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e

// Special case, we still can't make the room for the new remote one.
if !isLocal && !success {
//log.Trace("Discarding overflown transaction", "hash", hash)
log.Trace("Discarding overflown transaction", "hash", hash)
overflowedTxMeter.Mark(1)
return false, ErrTxPoolOverflow
}
Expand All @@ -788,7 +815,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
// Add all transactions back to the priced queue
if replacesPending {
for _, dropTx := range drop {
heap.Push(&pool.priced.urgent, dropTx)
pool.priced.Put(dropTx, false)
}
log.Trace("Discarding future transaction replacing pending tx", "hash", hash)
return false, ErrFutureReplacePending
Expand All @@ -797,7 +824,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e

// Kick out the underpriced remote transactions.
for _, tx := range drop {
//log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1)
dropped := pool.removeTx(tx.Hash(), false)
pool.changesSinceReorg += dropped
Expand All @@ -822,7 +849,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
pool.priced.Put(tx, isLocal)
pool.journalTx(from, tx)
pool.queueTxEvent(tx)
//log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())

// Successful promotion, bump the heartbeat
pool.beats[from] = time.Now()
Expand All @@ -834,8 +861,8 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
return false, err
}
// Mark local addresses and journal local transactions
if local && !pool.locals.contains(from) {
//log.Info("Setting new local account", "address", from)
if local && !inLocalPool {
log.Info("Setting new local account", "address", from)
pool.locals.add(from)
pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time.
}
Expand All @@ -844,7 +871,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
}
pool.journalTx(from, tx)

//log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
return replaced, nil
}

Expand Down Expand Up @@ -1058,18 +1085,88 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {

// addTxsLocked attempts to queue a batch of transactions if they are valid.
// The transaction pool lock must be held.
//
// It first validates the txs in stages, and then inserts the valid txs into
// the non-executable queue for later pending promotion and execution.
//
// If a transaction is a replacement for an already pending or queued one,
// it overwrites the previous transaction if its price is higher.
//
// If a newly added transaction is marked as local, its sending account will
// be added to the allowlist, preventing any associated transaction from
// being dropped out of the pool due to pricing constraints.
func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) {
dirty := newAccountSet(pool.signer)
// note: the transaction validation and adding happens in stages, so expensive work can be batched.
errs := make([]error, len(txs))
pool.filterKnownTxsLocked(txs, errs)
pool.filterInvalidTxsLocked(txs, errs, local)
pool.filterInvalidBlobTxsLocked(txs, errs)
dirty := pool.addValidTxsLocked(txs, errs, local)
return errs, dirty
}

// filterKnownTxsLocked marks all known transactions with ErrAlreadyKnown
func (pool *TxPool) filterKnownTxsLocked(txs []*types.Transaction, errs []error) {
for i, tx := range txs {
if pool.Has(tx.Hash()) {
log.Trace("Discarding already known transaction", "hash", tx.Hash())
knownTxMeter.Mark(1)
errs[i] = ErrAlreadyKnown
}
}
}

// filterInvalidTxsLocked marks all invalid txs with respective error, this excludes blob validation
func (pool *TxPool) filterInvalidTxsLocked(txs []*types.Transaction, errs []error, local bool) {
for i, tx := range txs {
if errs[i] != nil {
continue
}

// Make the local flag. If it's from local source or it's from the network but
// the sender is marked as local previously, treat it as the local transaction.
inLocalPool := pool.locals.containsTx(tx)
isLocal := local || inLocalPool
err := pool.validateTx(tx, isLocal)
if err != nil {
log.Trace("Discarding invalid transaction", "hash", tx.Hash(), "err", err)
invalidTxMeter.Mark(1)
errs[i] = err
}
}
}

// filterInvalidBlobTxsLocked marks all blob txs (if any) with an error if the blobs or kzg commitments are invalid
func (pool *TxPool) filterInvalidBlobTxsLocked(txs []*types.Transaction, errs []error) {
for i, tx := range txs {
replaced, err := pool.add(tx, local)
if errs[i] != nil {
continue
}
// all blobs within the tx can still be batched together
err := tx.VerifyBlobs()
if err != nil {
log.Trace("Discarding blob transaction", "hash", tx.Hash(), "err", err)
invalidTxMeter.Mark(1)
errs[i] = err
}
}
}

// addValidTxsLocked adds all transactions to the pool that are not marked with an error.
func (pool *TxPool) addValidTxsLocked(txs []*types.Transaction, errs []error, local bool) *accountSet {
dirty := newAccountSet(pool.signer)
for i, tx := range txs {
if errs[i] != nil {
continue
}
replaced, err := pool.addValidTx(tx, local)
errs[i] = err
if err == nil && !replaced {
dirty.addTx(tx)
}
}
validTxMeter.Mark(int64(len(dirty.accounts)))
return errs, dirty
return dirty
}

// Status returns the status (unknown/pending/queued) of a batch of transactions
Expand Down Expand Up @@ -1400,6 +1497,17 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
return
}
// todo 4844 double check this
// transactions that contained blobs might not have the wrapData anymore
// if not retrieved from block cache since wrap data is not persisted. Discard those.
j := 0
for _, tx := range discarded {
if !tx.IsIncomplete() {
discarded[j] = tx
j++
}
}
discarded = discarded[:j]
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
Expand All @@ -1422,6 +1530,9 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
pool.currentState = statedb
pool.pendingNonces = newTxNoncer(statedb)
pool.currentMaxGas = newHead.GasLimit
if newHead.ExcessDataGas != nil {
pool.currentExcessDataGas.Set(newHead.ExcessDataGas)
}

// Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject))
Expand All @@ -1433,6 +1544,8 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
pool.istanbul = pool.chainconfig.IsIstanbul(next)
pool.eip2718 = pool.chainconfig.IsBerlin(next)
pool.eip1559 = pool.chainconfig.IsLondon(next)
now := uint64(time.Now().Unix())
pool.eip4844 = pool.chainconfig.IsCancun(now)
}

// promoteExecutables moves transactions that have become processable from the
Expand Down

0 comments on commit 663608e

Please sign in to comment.