Skip to content

Commit

Permalink
parallel processBlocksRange() run
Browse files Browse the repository at this point in the history
  • Loading branch information
rus-alex committed Nov 1, 2023
1 parent f89d9bf commit 8221efa
Showing 1 changed file with 27 additions and 8 deletions.
35 changes: 27 additions & 8 deletions gossip/store_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/Fantom-foundation/lachesis-base/hash"
"github.com/Fantom-foundation/lachesis-base/inter/idx"
"github.com/Fantom-foundation/lachesis-base/kvdb"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -153,6 +154,9 @@ func (s *Store) calculateUpgradeHeights() error {
}

func (s *Store) fixTxPositionBlockOffset() (err error) {
const (
parallels = 10
)
receiptsTable, _ := s.dbs.OpenDB("evm/r")
txPositionsTable, _ := s.dbs.OpenDB("evm/x")

Expand Down Expand Up @@ -183,21 +187,21 @@ func (s *Store) fixTxPositionBlockOffset() (err error) {
var (
blocks = new(uint32)
)
processBlocksRange := func() {
processBlocksRange := func(from, to idx.Block) {
defer wg.Done()
const (
parallels = 10
)
wg.Add(parallels)
threads := make([]chan []*types.ReceiptForStorage, parallels)
for i := range threads {
threads[i] = make(chan []*types.ReceiptForStorage, 10)
go processBlockReceipts(threads[i])
}

it := receiptsTable.NewIterator(nil, nil)
it := receiptsTable.NewIterator(nil, from.Bytes())
defer it.Release()
for n := 0; it.Next(); n++ {
if idx.BytesToBlock(it.Key()) > to {
break
}
atomic.AddUint32(blocks, 1)

var receiptsStorage []*types.ReceiptForStorage
Expand All @@ -216,6 +220,7 @@ func (s *Store) fixTxPositionBlockOffset() (err error) {
var (
done = make(chan struct{})
)
defer close(done)
go func() {
var (
start = time.Now()
Expand All @@ -239,11 +244,25 @@ func (s *Store) fixTxPositionBlockOffset() (err error) {
}
}()

// params
firstBlock := func() idx.Block {
it := receiptsTable.NewIterator(nil, nil)
defer it.Release()
if it.Next() {
return idx.BytesToBlock(it.Key())
}
return 0
}()
lastBlock := s.GetBlockState().LastBlock.Idx

// main start
wg.Add(1)
go processBlocksRange()
s.Log.Debug("processBlocksRange", "from", firstBlock, "to", lastBlock)
step := (lastBlock - firstBlock) / parallels
wg.Add(parallels + 1)
for i := idx.Block(0); i <= parallels; i++ {
go processBlocksRange(firstBlock+i*step, firstBlock+(i+1)*step-1)
}
wg.Wait()
close(done)

return
}

0 comments on commit 8221efa

Please sign in to comment.