Skip to content

Commit

Permalink
Merge pull request #6640 from multiversx/feat/mempool
Browse files Browse the repository at this point in the history
Mempool improvements
  • Loading branch information
andreibancioiu authored Dec 5, 2024
2 parents 5656a3e + 11fc7c6 commit 7c4667b
Show file tree
Hide file tree
Showing 71 changed files with 1,619 additions and 579 deletions.
4 changes: 2 additions & 2 deletions cmd/node/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,9 @@
[TxDataPool]
Name = "TxDataPool"
Capacity = 600000
SizePerSender = 20000
SizePerSender = 5001
SizeInBytes = 419430400 #400MB
SizeInBytesPerSender = 12288000
SizeInBytesPerSender = 12288000 #12MB
Type = "TxCache"
Shards = 16

Expand Down
1 change: 1 addition & 0 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ func attachFileLogger(log logger.Logger, flagsConfig *config.ContextFlagsConfig)
logger.ToggleCorrelation(flagsConfig.EnableLogCorrelation)
logger.ToggleLoggerName(flagsConfig.EnableLogName)
logLevelFlagValue := flagsConfig.LogLevel

err = logger.SetLogLevel(logLevelFlagValue)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions consensus/broadcast/delayedBroadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ func (dbb *delayedBlockBroadcaster) interceptedHeader(_ string, headerHash []byt
)

alarmsToCancel := make([]string, 0)
dbb.mutDataForBroadcast.RLock()
dbb.mutDataForBroadcast.Lock()
for i, broadcastData := range dbb.valHeaderBroadcastData {
samePrevRandSeed := bytes.Equal(broadcastData.header.GetPrevRandSeed(), headerHandler.GetPrevRandSeed())
sameRound := broadcastData.header.GetRound() == headerHandler.GetRound()
Expand All @@ -663,7 +663,7 @@ func (dbb *delayedBlockBroadcaster) interceptedHeader(_ string, headerHash []byt
}
}

dbb.mutDataForBroadcast.RUnlock()
dbb.mutDataForBroadcast.Unlock()

for _, alarmID := range alarmsToCancel {
dbb.alarm.Cancel(alarmID)
Expand Down
3 changes: 0 additions & 3 deletions dataRetriever/constants.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package dataRetriever

// TxPoolNumSendersToPreemptivelyEvict instructs tx pool eviction algorithm to remove this many senders when eviction takes place
const TxPoolNumSendersToPreemptivelyEvict = uint32(100)

// UnsignedTxPoolName defines the name of the unsigned transactions pool
const UnsignedTxPoolName = "uTxPool"

Expand Down
3 changes: 0 additions & 3 deletions dataRetriever/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,6 @@ var ErrCacheConfigInvalidSize = errors.New("cache parameter [size] is not valid,
// ErrCacheConfigInvalidShards signals that the cache parameter "shards" is invalid
var ErrCacheConfigInvalidShards = errors.New("cache parameter [shards] is not valid, it must be a positive number")

// ErrCacheConfigInvalidEconomics signals that an economics parameter required by the cache is invalid
var ErrCacheConfigInvalidEconomics = errors.New("cache-economics parameter is not valid")

// ErrCacheConfigInvalidSharding signals that a sharding parameter required by the cache is invalid
var ErrCacheConfigInvalidSharding = errors.New("cache-sharding parameter is not valid")

Expand Down
6 changes: 5 additions & 1 deletion dataRetriever/factory/dataPoolFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func NewDataPoolFromConfig(args ArgsDataPool) (dataRetriever.PoolsHolder, error)
if check.IfNil(args.ShardCoordinator) {
return nil, dataRetriever.ErrNilShardCoordinator
}
if check.IfNil(args.Marshalizer) {
return nil, dataRetriever.ErrNilMarshalizer
}
if check.IfNil(args.PathManager) {
return nil, dataRetriever.ErrNilPathManager
}
Expand All @@ -61,9 +64,10 @@ func NewDataPoolFromConfig(args ArgsDataPool) (dataRetriever.PoolsHolder, error)

txPool, err := txpool.NewShardedTxPool(txpool.ArgShardedTxPool{
Config: factory.GetCacherFromConfig(mainConfig.TxDataPool),
TxGasHandler: args.EconomicsData,
Marshalizer: args.Marshalizer,
NumberOfShards: args.ShardCoordinator.NumberOfShards(),
SelfShardID: args.ShardCoordinator.SelfId(),
TxGasHandler: args.EconomicsData,
})
if err != nil {
return nil, fmt.Errorf("%w while creating the cache for the transactions", err)
Expand Down
19 changes: 6 additions & 13 deletions dataRetriever/factory/dataPoolFactory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ func TestNewDataPoolFromConfig_MissingDependencyShouldErr(t *testing.T) {
require.Nil(t, holder)
require.Equal(t, dataRetriever.ErrNilShardCoordinator, err)

args = getGoodArgs()
args.Marshalizer = nil
holder, err = NewDataPoolFromConfig(args)
require.Nil(t, holder)
require.Equal(t, dataRetriever.ErrNilMarshalizer, err)

args = getGoodArgs()
args.PathManager = nil
holder, err = NewDataPoolFromConfig(args)
Expand Down Expand Up @@ -76,43 +82,30 @@ func TestNewDataPoolFromConfig_BadConfigShouldErr(t *testing.T) {
args.Config.HeadersPoolConfig.MaxHeadersPerShard = 0
holder, err = NewDataPoolFromConfig(args)
require.Nil(t, holder)
fmt.Println(err)
require.True(t, errors.Is(err, headersCache.ErrInvalidHeadersCacheParameter))
require.True(t, strings.Contains(err.Error(), "the cache for the headers"))

args = getGoodArgs()
args.Config.TxBlockBodyDataPool.Capacity = 0
holder, err = NewDataPoolFromConfig(args)
require.Nil(t, holder)
fmt.Println(err)
require.NotNil(t, err)
require.True(t, strings.Contains(err.Error(), "must provide a positive size while creating the cache for the miniblocks"))

args = getGoodArgs()
args.Config.PeerBlockBodyDataPool.Capacity = 0
holder, err = NewDataPoolFromConfig(args)
require.Nil(t, holder)
fmt.Println(err)
require.NotNil(t, err)
require.True(t, strings.Contains(err.Error(), "must provide a positive size while creating the cache for the peer mini block body"))

args = getGoodArgs()
args.Config.TrieSyncStorage.Capacity = 0
holder, err = NewDataPoolFromConfig(args)
require.Nil(t, holder)
fmt.Println(err)
require.True(t, errors.Is(err, storage.ErrCacheSizeInvalid))
require.True(t, strings.Contains(err.Error(), "the cache for the trie nodes"))

args = getGoodArgs()
args.Config.TrieSyncStorage.EnableDB = true
args.Config.TrieSyncStorage.DB.Type = "invalid DB type"
holder, err = NewDataPoolFromConfig(args)
require.Nil(t, holder)
fmt.Println(err)
require.True(t, errors.Is(err, storage.ErrNotSupportedDBType))
require.True(t, strings.Contains(err.Error(), "the db for the trie nodes"))

args = getGoodArgs()
args.Config.TrieNodesChunksDataPool.Type = "invalid cache type"
holder, err = NewDataPoolFromConfig(args)
Expand Down
32 changes: 19 additions & 13 deletions dataRetriever/requestHandlers/requestHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/multiversx/mx-chain-go/dataRetriever"
"github.com/multiversx/mx-chain-go/epochStart"
"github.com/multiversx/mx-chain-go/process/factory"
"github.com/multiversx/mx-chain-logger-go"
logger "github.com/multiversx/mx-chain-logger-go"
)

var _ epochStart.RequestHandler = (*resolverRequestHandler)(nil)
Expand Down Expand Up @@ -571,10 +571,12 @@ func (rrh *resolverRequestHandler) RequestValidatorInfo(hash []byte) {
return
}

epoch := rrh.getEpoch()

log.Debug("requesting validator info messages from network",
"topic", common.ValidatorInfoTopic,
"hash", hash,
"epoch", rrh.epoch,
"epoch", epoch,
)

requester, err := rrh.requestersFinder.MetaChainRequester(common.ValidatorInfoTopic)
Expand All @@ -583,20 +585,20 @@ func (rrh *resolverRequestHandler) RequestValidatorInfo(hash []byte) {
"error", err.Error(),
"topic", common.ValidatorInfoTopic,
"hash", hash,
"epoch", rrh.epoch,
"epoch", epoch,
)
return
}

rrh.whiteList.Add([][]byte{hash})

err = requester.RequestDataFromHash(hash, rrh.epoch)
err = requester.RequestDataFromHash(hash, epoch)
if err != nil {
log.Debug("RequestValidatorInfo.RequestDataFromHash",
"error", err.Error(),
"topic", common.ValidatorInfoTopic,
"hash", hash,
"epoch", rrh.epoch,
"epoch", epoch,
)
return
}
Expand All @@ -611,10 +613,12 @@ func (rrh *resolverRequestHandler) RequestValidatorsInfo(hashes [][]byte) {
return
}

epoch := rrh.getEpoch()

log.Debug("requesting validator info messages from network",
"topic", common.ValidatorInfoTopic,
"num hashes", len(unrequestedHashes),
"epoch", rrh.epoch,
"epoch", epoch,
)

requester, err := rrh.requestersFinder.MetaChainRequester(common.ValidatorInfoTopic)
Expand All @@ -623,7 +627,7 @@ func (rrh *resolverRequestHandler) RequestValidatorsInfo(hashes [][]byte) {
"error", err.Error(),
"topic", common.ValidatorInfoTopic,
"num hashes", len(unrequestedHashes),
"epoch", rrh.epoch,
"epoch", epoch,
)
return
}
Expand All @@ -636,13 +640,13 @@ func (rrh *resolverRequestHandler) RequestValidatorsInfo(hashes [][]byte) {

rrh.whiteList.Add(unrequestedHashes)

err = validatorInfoRequester.RequestDataFromHashArray(unrequestedHashes, rrh.epoch)
err = validatorInfoRequester.RequestDataFromHashArray(unrequestedHashes, epoch)
if err != nil {
log.Debug("RequestValidatorInfo.RequestDataFromHash",
"error", err.Error(),
"topic", common.ValidatorInfoTopic,
"num hashes", len(unrequestedHashes),
"epoch", rrh.epoch,
"epoch", epoch,
)
return
}
Expand Down Expand Up @@ -827,11 +831,13 @@ func (rrh *resolverRequestHandler) GetNumPeersToQuery(key string) (int, int, err

// RequestPeerAuthenticationsByHashes asks for peer authentication messages from specific peers hashes
func (rrh *resolverRequestHandler) RequestPeerAuthenticationsByHashes(destShardID uint32, hashes [][]byte) {
epoch := rrh.getEpoch()

log.Debug("requesting peer authentication messages from network",
"topic", common.PeerAuthenticationTopic,
"shard", destShardID,
"num hashes", len(hashes),
"epoch", rrh.epoch,
"epoch", epoch,
)

requester, err := rrh.requestersFinder.MetaChainRequester(common.PeerAuthenticationTopic)
Expand All @@ -840,7 +846,7 @@ func (rrh *resolverRequestHandler) RequestPeerAuthenticationsByHashes(destShardI
"error", err.Error(),
"topic", common.PeerAuthenticationTopic,
"shard", destShardID,
"epoch", rrh.epoch,
"epoch", epoch,
)
return
}
Expand All @@ -851,13 +857,13 @@ func (rrh *resolverRequestHandler) RequestPeerAuthenticationsByHashes(destShardI
return
}

err = peerAuthRequester.RequestDataFromHashArray(hashes, rrh.epoch)
err = peerAuthRequester.RequestDataFromHashArray(hashes, epoch)
if err != nil {
log.Debug("RequestPeerAuthenticationsByHashes.RequestDataFromHashArray",
"error", err.Error(),
"topic", common.PeerAuthenticationTopic,
"shard", destShardID,
"epoch", rrh.epoch,
"epoch", epoch,
)
}
}
16 changes: 14 additions & 2 deletions dataRetriever/shardedData/shardedData.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"sync"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/counting"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/dataRetriever"
Expand Down Expand Up @@ -52,7 +53,7 @@ func NewShardedData(name string, config storageunit.CacheConfig) (*shardedData,
NumChunks: config.Shards,
MaxNumItems: config.Capacity,
MaxNumBytes: uint32(config.SizeInBytes),
NumItemsToPreemptivelyEvict: storage.TxPoolNumTxsToPreemptivelyEvict,
NumItemsToPreemptivelyEvict: storage.ShardedDataNumItemsToPreemptivelyEvict,
}

err := configPrototype.Verify()
Expand Down Expand Up @@ -161,14 +162,25 @@ func (sd *shardedData) RemoveSetOfDataFromPool(keys [][]byte, cacheID string) {
return
}

stopWatch := core.NewStopWatch()
stopWatch.Start("removal")

numRemoved := 0
for _, key := range keys {
if store.cache.RemoveWithResult(key) {
numRemoved++
}
}

log.Trace("shardedData.removeTxBulk()", "name", sd.name, "cacheID", cacheID, "numToRemove", len(keys), "numRemoved", numRemoved)
stopWatch.Stop("removal")

log.Debug("shardedData.removeTxBulk",
"name", sd.name,
"cacheID", cacheID,
"numToRemove", len(keys),
"numRemoved", numRemoved,
"duration", stopWatch.GetMeasurement("removal"),
)
}

// ImmunizeSetOfDataAgainstEviction marks the items as non-evictable
Expand Down
20 changes: 5 additions & 15 deletions dataRetriever/txpool/argShardedTxPool.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package txpool

import (
"encoding/json"
"fmt"

"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/dataRetriever"
"github.com/multiversx/mx-chain-go/storage/storageunit"
"github.com/multiversx/mx-chain-go/storage/txcache"
)

// ArgShardedTxPool is the argument for ShardedTxPool's constructor
type ArgShardedTxPool struct {
Config storageunit.CacheConfig
TxGasHandler txcache.TxGasHandler
TxGasHandler txGasHandler
Marshalizer marshal.Marshalizer
NumberOfShards uint32
SelfShardID uint32
}
Expand All @@ -40,22 +40,12 @@ func (args *ArgShardedTxPool) verify() error {
if check.IfNil(args.TxGasHandler) {
return fmt.Errorf("%w: TxGasHandler is not valid", dataRetriever.ErrNilTxGasHandler)
}
if args.TxGasHandler.MinGasPrice() == 0 {
return fmt.Errorf("%w: MinGasPrice is not valid", dataRetriever.ErrCacheConfigInvalidEconomics)
if check.IfNil(args.Marshalizer) {
return fmt.Errorf("%w: Marshalizer is not valid", dataRetriever.ErrNilMarshalizer)
}
if args.NumberOfShards == 0 {
return fmt.Errorf("%w: NumberOfShards is not valid", dataRetriever.ErrCacheConfigInvalidSharding)
}

return nil
}

// String returns a readable representation of the object
func (args *ArgShardedTxPool) String() string {
bytes, err := json.Marshal(args)
if err != nil {
log.Error("ArgShardedTxPool.String()", "err", err)
}

return string(bytes)
}
8 changes: 8 additions & 0 deletions dataRetriever/txpool/interface.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package txpool

import (
"math/big"

"github.com/multiversx/mx-chain-core-go/data"
"github.com/multiversx/mx-chain-go/storage"
"github.com/multiversx/mx-chain-go/storage/txcache"
)
Expand All @@ -17,3 +20,8 @@ type txCache interface {
Diagnose(deep bool)
GetTransactionsPoolForSender(sender string) []*txcache.WrappedTransaction
}

type txGasHandler interface {
ComputeTxFee(tx data.TransactionWithFeeHandler) *big.Int
IsInterfaceNil() bool
}
Loading

0 comments on commit 7c4667b

Please sign in to comment.