From fdd044a50dfee36c2f96d18a123db2a97f4bd7eb Mon Sep 17 00:00:00 2001 From: Artem Date: Fri, 15 Oct 2021 13:24:31 +0300 Subject: [PATCH] Fix: mempool deadlock --- .vscode/launch.json | 2 +- cmd/mempool/handlers.go | 21 +++++++++ cmd/mempool/indexer.go | 61 +++++++++++++------------- cmd/mempool/manager.go | 93 ---------------------------------------- cmd/mempool/tzkt/tzkt.go | 2 + go.mod | 2 +- go.sum | 4 +- 7 files changed, 57 insertions(+), 128 deletions(-) delete mode 100644 cmd/mempool/manager.go diff --git a/.vscode/launch.json b/.vscode/launch.json index 766f8fe..42a6174 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -5,7 +5,7 @@ "version": "0.2.0", "configurations": [ { - "name": "Launch Indexer", + "name": "Launch Mempool", "type": "go", "request": "launch", "mode": "debug", diff --git a/cmd/mempool/handlers.go b/cmd/mempool/handlers.go index 1d8fdfa..f6cd2f4 100644 --- a/cmd/mempool/handlers.go +++ b/cmd/mempool/handlers.go @@ -18,6 +18,10 @@ import ( ) func (indexer *Indexer) handleBlock(block tzkt.BlockMessage) error { + if err := indexer.processOldOperations(); err != nil { + return err + } + switch block.Type { case events.MessageTypeState: if indexer.state.Level < block.Level { @@ -48,6 +52,23 @@ func (indexer *Indexer) handleBlock(block tzkt.BlockMessage) error { return indexer.branches.Add(block) } +func (indexer *Indexer) processOldOperations() error { + return indexer.db.Transaction(func(tx *gorm.DB) error { + if err := models.DeleteOldOperations(tx, indexer.keepInChain, models.StatusInChain, indexer.filters.Kinds...); err != nil { + return errors.Wrap(err, "DeleteOldOperations in_chain") + } + if err := models.DeleteOldOperations(tx, indexer.keepOperations, "", indexer.filters.Kinds...); err != nil { + return errors.Wrap(err, "DeleteOldOperations") + } + if indexer.hasManager { + if err := models.DeleteOldGasStats(tx, indexer.gasStatsLifetime); err != nil { + return errors.Wrap(err, "DeleteOldGasStats") + } + } + return nil + }) +} + func (indexer *Indexer) handleInChain(operations tzkt.OperationMessage) error { return indexer.db.Transaction(func(tx *gorm.DB) error { operations.Hash.Range(func(_, operation interface{}) bool { diff --git a/cmd/mempool/indexer.go b/cmd/mempool/indexer.go index 95ce73c..4fce294 100644 --- a/cmd/mempool/indexer.go +++ b/cmd/mempool/indexer.go @@ -21,21 +21,23 @@ import ( // Indexer - type Indexer struct { - db *gorm.DB - tzkt *tzkt.TzKT - mempool *receiver.Receiver - manager *Manager - branches *BlockQueue - cache *ccache.Cache - delegates *CachedDelegates - state state.State - filters config.Filters - network string - indexName string - chainID string - stop chan struct{} - threadsCount int - hasManager bool + db *gorm.DB + tzkt *tzkt.TzKT + mempool *receiver.Receiver + branches *BlockQueue + cache *ccache.Cache + delegates *CachedDelegates + state state.State + filters config.Filters + network string + indexName string + chainID string + keepInChain uint64 + keepOperations uint64 + gasStatsLifetime uint64 + stop chan struct{} + threadsCount int + hasManager bool wg sync.WaitGroup } @@ -80,15 +82,18 @@ func NewIndexer(network string, indexerCfg config.Indexer, database generalConfi } indexer := &Indexer{ - db: db, - network: network, - chainID: head.ChainID, - indexName: models.MempoolIndexName(network), - filters: indexerCfg.Filters, - tzkt: tzkt.NewTzKT(indexerCfg.DataSource.Tzkt, indexerCfg.Filters.Accounts, indexerCfg.Filters.Kinds), - mempool: memInd, - cache: ccache.New(ccache.Configure().MaxSize(2 ^ 13)), - threadsCount: 1, + db: db, + network: network, + chainID: head.ChainID, + indexName: models.MempoolIndexName(network), + filters: indexerCfg.Filters, + tzkt: tzkt.NewTzKT(indexerCfg.DataSource.Tzkt, indexerCfg.Filters.Accounts, indexerCfg.Filters.Kinds), + mempool: memInd, + cache: ccache.New(ccache.Configure().MaxSize(2 ^ 13)), + threadsCount: 1, + keepInChain: uint64(constants.TimeBetweenBlocks[0]) * settings.KeepInChainBlocks, + keepOperations: uint64(constants.TimeBetweenBlocks[0]) * settings.ExpiredAfter, + gasStatsLifetime: settings.GasStatsLifetime, } indexer.state = state.State{ @@ -103,7 +108,6 @@ func NewIndexer(network string, indexerCfg config.Indexer, database generalConfi break } } - indexer.manager = NewManager(db, settings, uint64(constants.TimeBetweenBlocks[0]), indexer.hasManager, indexerCfg.Filters.Kinds...) indexer.branches = newBlockQueue(expiredAfter, indexer.onPopBlockQueue, indexer.onRollbackBlockQueue) for _, kind := range indexer.filters.Kinds { @@ -146,8 +150,7 @@ func (indexer *Indexer) Start() error { return err } - go indexer.mempool.Start() - indexer.manager.Start() + indexer.mempool.Start() return nil } @@ -184,10 +187,6 @@ func (indexer *Indexer) Close() error { } indexer.wg.Wait() - if err := indexer.manager.Close(); err != nil { - return err - } - if err := indexer.tzkt.Close(); err != nil { return err } diff --git a/cmd/mempool/manager.go b/cmd/mempool/manager.go deleted file mode 100644 index bdc0da9..0000000 --- a/cmd/mempool/manager.go +++ /dev/null @@ -1,93 +0,0 @@ -package main - -import ( - "sync" - "time" - - "github.com/dipdup-net/mempool/cmd/mempool/config" - "github.com/dipdup-net/mempool/cmd/mempool/models" - "github.com/pkg/errors" - log "github.com/sirupsen/logrus" - "gorm.io/gorm" -) - -// Manager - -type Manager struct { - db *gorm.DB - - keepOperations uint64 - keepInChain uint64 - lostAfterSeconds uint64 - blockTime uint64 - gasStatsLifetime uint64 - kinds []string - stop chan struct{} - hasManager bool - - wg sync.WaitGroup -} - -// NewManager - -func NewManager(db *gorm.DB, settings config.Settings, blockTime uint64, hasManager bool, kinds ...string) *Manager { - return &Manager{ - db: db, - keepOperations: settings.KeepOperations, - gasStatsLifetime: settings.GasStatsLifetime, - keepInChain: blockTime * settings.KeepInChainBlocks, - lostAfterSeconds: blockTime * settings.ExpiredAfter, - blockTime: blockTime, - stop: make(chan struct{}, 1), - kinds: kinds, - hasManager: hasManager, - } -} - -// Start - -func (manager *Manager) Start() { - manager.wg.Add(1) - go manager.work() -} - -// Close - -func (manager *Manager) Close() error { - manager.stop <- struct{}{} - manager.wg.Wait() - - close(manager.stop) - return nil -} - -func (manager *Manager) work() { - defer manager.wg.Done() - - blockTicker := time.NewTicker(time.Second * time.Duration(manager.blockTime)) - defer blockTicker.Stop() - - for { - select { - case <-manager.stop: - return - case <-blockTicker.C: - err := manager.db.Transaction(func(tx *gorm.DB) error { - if err := models.DeleteOldOperations(tx, manager.keepInChain, models.StatusInChain, manager.kinds...); err != nil { - return errors.Wrap(err, "DeleteOldOperations in_chain") - } - if err := models.DeleteOldOperations(tx, manager.keepOperations, "", manager.kinds...); err != nil { - return errors.Wrap(err, "DeleteOldOperations") - } - if manager.hasManager { - if err := models.DeleteOldGasStats(tx, manager.gasStatsLifetime); err != nil { - return errors.Wrap(err, "DeleteOldGasStats") - } - } - return nil - }) - - if err != nil { - log.Error(err) - continue - } - - } - } -} diff --git a/cmd/mempool/tzkt/tzkt.go b/cmd/mempool/tzkt/tzkt.go index f646b75..e6d93f3 100644 --- a/cmd/mempool/tzkt/tzkt.go +++ b/cmd/mempool/tzkt/tzkt.go @@ -502,6 +502,8 @@ func (tzkt *TzKT) getTableData(table *tableState, indexerState, headLevel uint64 return getOperations(table, filters, tzkt.api.GetReveals) case api.KindTransaction: return getOperations(table, filters, tzkt.api.GetTransactions) + case api.KindRegisterGlobalConstant: + return getOperations(table, filters, tzkt.api.GetRegisterConstants) default: return errors.Wrap(ErrUnknownOperationKind, table.Table) } diff --git a/go.mod b/go.mod index f9544b0..e874301 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.15 require ( github.com/btcsuite/btcutil v1.0.2 - github.com/dipdup-net/go-lib v0.1.31 + github.com/dipdup-net/go-lib v0.1.32 github.com/json-iterator/go v1.1.12 // indirect github.com/karlseguin/ccache v2.0.3+incompatible github.com/karlseguin/expect v1.0.8 // indirect diff --git a/go.sum b/go.sum index 3160d81..002612f 100644 --- a/go.sum +++ b/go.sum @@ -61,8 +61,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/denisenkom/go-mssqldb v0.10.0 h1:QykgLZBorFE95+gO3u9esLd0BmbvpWp0/waNNZfHBM8= github.com/denisenkom/go-mssqldb v0.10.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= -github.com/dipdup-net/go-lib v0.1.31 h1:+m2W74TfC5SYIN13obXvm9CUx6aQ4ZDl3QBheHrgXU4= -github.com/dipdup-net/go-lib v0.1.31/go.mod h1:i3p/qLTxQ43aiHxlg30FTLeysryjPlk5sT8fY3PxgUY= +github.com/dipdup-net/go-lib v0.1.32 h1:BQZK0sECtpL8wz7xiuunrNQaB2OEzJ/jA11O/QC7UmE= +github.com/dipdup-net/go-lib v0.1.32/go.mod h1:i3p/qLTxQ43aiHxlg30FTLeysryjPlk5sT8fY3PxgUY= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=