Skip to content

Commit

Permalink
Fix: mempool deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Oct 15, 2021
1 parent 393647c commit fdd044a
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 128 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"version": "0.2.0",
"configurations": [
{
"name": "Launch Indexer",
"name": "Launch Mempool",
"type": "go",
"request": "launch",
"mode": "debug",
Expand Down
21 changes: 21 additions & 0 deletions cmd/mempool/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
)

func (indexer *Indexer) handleBlock(block tzkt.BlockMessage) error {
if err := indexer.processOldOperations(); err != nil {
return err
}

switch block.Type {
case events.MessageTypeState:
if indexer.state.Level < block.Level {
Expand Down Expand Up @@ -48,6 +52,23 @@ func (indexer *Indexer) handleBlock(block tzkt.BlockMessage) error {
return indexer.branches.Add(block)
}

func (indexer *Indexer) processOldOperations() error {
return indexer.db.Transaction(func(tx *gorm.DB) error {
if err := models.DeleteOldOperations(tx, indexer.keepInChain, models.StatusInChain, indexer.filters.Kinds...); err != nil {
return errors.Wrap(err, "DeleteOldOperations in_chain")
}
if err := models.DeleteOldOperations(tx, indexer.keepOperations, "", indexer.filters.Kinds...); err != nil {
return errors.Wrap(err, "DeleteOldOperations")
}
if indexer.hasManager {
if err := models.DeleteOldGasStats(tx, indexer.gasStatsLifetime); err != nil {
return errors.Wrap(err, "DeleteOldGasStats")
}
}
return nil
})
}

func (indexer *Indexer) handleInChain(operations tzkt.OperationMessage) error {
return indexer.db.Transaction(func(tx *gorm.DB) error {
operations.Hash.Range(func(_, operation interface{}) bool {
Expand Down
61 changes: 30 additions & 31 deletions cmd/mempool/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,23 @@ import (

// Indexer -
type Indexer struct {
db *gorm.DB
tzkt *tzkt.TzKT
mempool *receiver.Receiver
manager *Manager
branches *BlockQueue
cache *ccache.Cache
delegates *CachedDelegates
state state.State
filters config.Filters
network string
indexName string
chainID string
stop chan struct{}
threadsCount int
hasManager bool
db *gorm.DB
tzkt *tzkt.TzKT
mempool *receiver.Receiver
branches *BlockQueue
cache *ccache.Cache
delegates *CachedDelegates
state state.State
filters config.Filters
network string
indexName string
chainID string
keepInChain uint64
keepOperations uint64
gasStatsLifetime uint64
stop chan struct{}
threadsCount int
hasManager bool

wg sync.WaitGroup
}
Expand Down Expand Up @@ -80,15 +82,18 @@ func NewIndexer(network string, indexerCfg config.Indexer, database generalConfi
}

indexer := &Indexer{
db: db,
network: network,
chainID: head.ChainID,
indexName: models.MempoolIndexName(network),
filters: indexerCfg.Filters,
tzkt: tzkt.NewTzKT(indexerCfg.DataSource.Tzkt, indexerCfg.Filters.Accounts, indexerCfg.Filters.Kinds),
mempool: memInd,
cache: ccache.New(ccache.Configure().MaxSize(2 ^ 13)),
threadsCount: 1,
db: db,
network: network,
chainID: head.ChainID,
indexName: models.MempoolIndexName(network),
filters: indexerCfg.Filters,
tzkt: tzkt.NewTzKT(indexerCfg.DataSource.Tzkt, indexerCfg.Filters.Accounts, indexerCfg.Filters.Kinds),
mempool: memInd,
cache: ccache.New(ccache.Configure().MaxSize(2 ^ 13)),
threadsCount: 1,
keepInChain: uint64(constants.TimeBetweenBlocks[0]) * settings.KeepInChainBlocks,
keepOperations: uint64(constants.TimeBetweenBlocks[0]) * settings.ExpiredAfter,
gasStatsLifetime: settings.GasStatsLifetime,
}

indexer.state = state.State{
Expand All @@ -103,7 +108,6 @@ func NewIndexer(network string, indexerCfg config.Indexer, database generalConfi
break
}
}
indexer.manager = NewManager(db, settings, uint64(constants.TimeBetweenBlocks[0]), indexer.hasManager, indexerCfg.Filters.Kinds...)
indexer.branches = newBlockQueue(expiredAfter, indexer.onPopBlockQueue, indexer.onRollbackBlockQueue)

for _, kind := range indexer.filters.Kinds {
Expand Down Expand Up @@ -146,8 +150,7 @@ func (indexer *Indexer) Start() error {
return err
}

go indexer.mempool.Start()
indexer.manager.Start()
indexer.mempool.Start()

return nil
}
Expand Down Expand Up @@ -184,10 +187,6 @@ func (indexer *Indexer) Close() error {
}
indexer.wg.Wait()

if err := indexer.manager.Close(); err != nil {
return err
}

if err := indexer.tzkt.Close(); err != nil {
return err
}
Expand Down
93 changes: 0 additions & 93 deletions cmd/mempool/manager.go

This file was deleted.

2 changes: 2 additions & 0 deletions cmd/mempool/tzkt/tzkt.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,8 @@ func (tzkt *TzKT) getTableData(table *tableState, indexerState, headLevel uint64
return getOperations(table, filters, tzkt.api.GetReveals)
case api.KindTransaction:
return getOperations(table, filters, tzkt.api.GetTransactions)
case api.KindRegisterGlobalConstant:
return getOperations(table, filters, tzkt.api.GetRegisterConstants)
default:
return errors.Wrap(ErrUnknownOperationKind, table.Table)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.15

require (
github.com/btcsuite/btcutil v1.0.2
github.com/dipdup-net/go-lib v0.1.31
github.com/dipdup-net/go-lib v0.1.32
github.com/json-iterator/go v1.1.12 // indirect
github.com/karlseguin/ccache v2.0.3+incompatible
github.com/karlseguin/expect v1.0.8 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/denisenkom/go-mssqldb v0.10.0 h1:QykgLZBorFE95+gO3u9esLd0BmbvpWp0/waNNZfHBM8=
github.com/denisenkom/go-mssqldb v0.10.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dipdup-net/go-lib v0.1.31 h1:+m2W74TfC5SYIN13obXvm9CUx6aQ4ZDl3QBheHrgXU4=
github.com/dipdup-net/go-lib v0.1.31/go.mod h1:i3p/qLTxQ43aiHxlg30FTLeysryjPlk5sT8fY3PxgUY=
github.com/dipdup-net/go-lib v0.1.32 h1:BQZK0sECtpL8wz7xiuunrNQaB2OEzJ/jA11O/QC7UmE=
github.com/dipdup-net/go-lib v0.1.32/go.mod h1:i3p/qLTxQ43aiHxlg30FTLeysryjPlk5sT8fY3PxgUY=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
Expand Down

0 comments on commit fdd044a

Please sign in to comment.