Skip to content

Commit

Permalink
feat: walk object concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
LinZexiao committed Dec 20, 2023
1 parent 6e956a7 commit dc9d2be
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 14 deletions.
2 changes: 1 addition & 1 deletion venus-shared/blockstore/splitstore/compose_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (cs *ComposeStore) sync(ctx context.Context, c cid.Cid, b blocks.Block) {
var err error
b, err = cs.secondary.Get(ctx, c)
if err != nil {
log.Warnf("get block(%s) from secondary: %s", c, err)
// it is ok to ignore the err
return
}
}
Expand Down
40 changes: 27 additions & 13 deletions venus-shared/blockstore/splitstore/walk.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"bytes"
"context"
"fmt"
"runtime"
"sync"
"time"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/venus/venus-shared/actors/builtin"
"github.com/filecoin-project/venus/venus-shared/blockstore"
"github.com/filecoin-project/venus/venus-shared/types"
cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -64,6 +67,12 @@ func NewSyncVisitor() Visitor {
}

func WalkChain(ctx context.Context, store blockstore.Blockstore, tipsetKey cid.Cid, v Visitor, depth abi.ChainEpoch) error {
start := time.Now()
defer func() {
log.Debugf("walk chain(%s) depth(%d) took %s", tipsetKey, depth, time.Since(start))
}()
log.Debugf("walk chain(%s) depth(%d)", tipsetKey, depth)

cst := cbor.NewCborStore(store)
var tsk types.TipSetKey
err := cst.Get(ctx, tipsetKey, &tsk)
Expand All @@ -78,8 +87,9 @@ func WalkChain(ctx context.Context, store blockstore.Blockstore, tipsetKey cid.C

deepLimit := b.Height - depth

blockToWalk := make(chan cid.Cid, 8)
objectToWalk := make(chan cid.Cid, 64)
// todo: be careful about the size of channel
blockToWalk := make(chan cid.Cid)
objectToWalk := make(chan cid.Cid)
for _, c := range tsk.Cids() {
blockToWalk <- c
}
Expand All @@ -89,16 +99,20 @@ func WalkChain(ctx context.Context, store blockstore.Blockstore, tipsetKey cid.C
}()

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for c := range objectToWalk {
err := walkObject(ctx, store, c, v)
if err != nil {
log.Warnf("walkObject(%s): %s", c, err)

walkObjectWorkerCount := runtime.NumCPU()/4 + 1
wg.Add(walkObjectWorkerCount)
for i := 0; i < walkObjectWorkerCount; i++ {
go func() {
defer wg.Done()
for c := range objectToWalk {
err := walkObject(ctx, store, c, v)
if err != nil {
log.Warnf("walkObject(%s): %s", c, err)
}
}
}
}()
}()
}

for bCid := range blockToWalk {
if !v.Visit(bCid) {
Expand All @@ -111,7 +125,7 @@ func WalkChain(ctx context.Context, store blockstore.Blockstore, tipsetKey cid.C
return err
}

if b.Height%500 == 0 {
if b.Height%builtin.EpochsInDay == 0 {
log.Debugf("walking block(%s, %d)", bCid, b.Height)
}

Expand Down Expand Up @@ -169,7 +183,7 @@ func walkObject(ctx context.Context, store blockstore.Blockstore, c cid.Cid, v V
err := walkObject(ctx, store, c, v)
if err != nil {
// try best to walk more object
log.Warnf("walkObject(%s): %s", c, err)
log.Debugf("walkObject(%s): %s", c, err)
}
}

Expand Down

0 comments on commit dc9d2be

Please sign in to comment.