diff --git a/gossip/store_migration.go b/gossip/store_migration.go index 3d94ea824..5c46b0e88 100644 --- a/gossip/store_migration.go +++ b/gossip/store_migration.go @@ -3,18 +3,11 @@ package gossip import ( "errors" "fmt" - "sync" - "sync/atomic" - "time" "github.com/Fantom-foundation/lachesis-base/hash" - "github.com/Fantom-foundation/lachesis-base/inter/idx" "github.com/Fantom-foundation/lachesis-base/kvdb" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/rlp" - "github.com/Fantom-foundation/go-opera/gossip/evmstore" "github.com/Fantom-foundation/go-opera/inter" "github.com/Fantom-foundation/go-opera/inter/iblockproc" "github.com/Fantom-foundation/go-opera/utils/migration" @@ -35,9 +28,6 @@ func (s *Store) migrateData() error { } err := s.migrations().Exec(versions, s.flushDBs) - if err != nil { - panic(err) - } return err } @@ -53,8 +43,7 @@ func (s *Store) migrations() *migration.Migration { Next("erase gossip-async db", s.eraseGossipAsyncDB). Next("erase SFC API table", s.eraseSfcApiTable). Next("erase legacy genesis DB", s.eraseGenesisDB). - Next("calculate upgrade heights", s.calculateUpgradeHeights). - Next("EVM TxPosition.BlockOffset fix", s.fixTxPositionBlockOffset) + Next("calculate upgrade heights", s.calculateUpgradeHeights) } func unsupportedMigration() error { @@ -152,117 +141,3 @@ func (s *Store) calculateUpgradeHeights() error { } return nil } - -func (s *Store) fixTxPositionBlockOffset() (err error) { - const ( - parallels = 10 - ) - receiptsTable, _ := s.dbs.OpenDB("evm/r") - txPositionsTable, _ := s.dbs.OpenDB("evm/x") - - // for each block's receipts - var ( - wg sync.WaitGroup - items = new(uint32) - ) - processBlockReceipts := func(input <-chan []*types.ReceiptForStorage) { - defer wg.Done() - pos := new(evmstore.TxPosition) - for rr := range input { - for _, r := range rr { - key := r.TxHash.Bytes() - got := s.rlp.Get(txPositionsTable, key, pos) - if got == nil { - continue - } - pos.BlockOffset = uint32(r.TransactionIndex) - s.rlp.Set(txPositionsTable, key, pos) - - atomic.AddUint32(items, 1) - } - } - } - - // for each block - var ( - blocks = new(uint32) - ) - processBlocksRange := func(from, to idx.Block) { - defer wg.Done() - wg.Add(parallels) - threads := make([]chan []*types.ReceiptForStorage, parallels) - for i := range threads { - threads[i] = make(chan []*types.ReceiptForStorage, 10) - go processBlockReceipts(threads[i]) - } - - it := receiptsTable.NewIterator(nil, from.Bytes()) - defer it.Release() - for n := 0; it.Next(); n++ { - if idx.BytesToBlock(it.Key()) > to { - break - } - atomic.AddUint32(blocks, 1) - - var receiptsStorage []*types.ReceiptForStorage - err := rlp.DecodeBytes(it.Value(), &receiptsStorage) - if err != nil { - s.Log.Crit("Failed to decode rlp", "err", err, "size", len(it.Value())) - } - threads[n%parallels] <- receiptsStorage - } - for i := range threads { - close(threads[i]) - } - } - - // status log - var ( - done = make(chan struct{}) - ) - defer close(done) - go func() { - var ( - start = time.Now() - prevFlushTime = time.Now() - ) - for again := true; again; { - select { - case <-time.After(s.cfg.MaxNonFlushedPeriod / 5): - again = true - case <-done: - again = false - } - s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "blocks", atomic.LoadUint32(blocks), "items", atomic.LoadUint32(items)) - if s.dbs.NotFlushedSizeEst() > s.cfg.MaxNonFlushedSize/2 || time.Since(prevFlushTime) > s.cfg.MaxNonFlushedPeriod { - prevFlushTime = time.Now() - err = s.flushDBs() - if err != nil { - break - } - } - } - }() - - // params - firstBlock := func() idx.Block { - it := receiptsTable.NewIterator(nil, nil) - defer it.Release() - if it.Next() { - return idx.BytesToBlock(it.Key()) - } - return 0 - }() - lastBlock := s.GetBlockState().LastBlock.Idx - - // main start - s.Log.Debug("processBlocksRange", "from", firstBlock, "to", lastBlock) - step := (lastBlock - firstBlock) / parallels - wg.Add(parallels + 1) - for i := idx.Block(0); i <= parallels; i++ { - go processBlocksRange(firstBlock+i*step, firstBlock+(i+1)*step-1) - } - wg.Wait() - - return -}