From dc9d2be4f106cb3f47660855826ef91f24631226 Mon Sep 17 00:00:00 2001 From: tanlang Date: Tue, 28 Nov 2023 11:36:31 +0800 Subject: [PATCH] feat: walk object concurrently --- .../blockstore/splitstore/compose_store.go | 2 +- venus-shared/blockstore/splitstore/walk.go | 40 +++++++++++++------ 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/venus-shared/blockstore/splitstore/compose_store.go b/venus-shared/blockstore/splitstore/compose_store.go index e2d9adf8e8..73bbe54918 100644 --- a/venus-shared/blockstore/splitstore/compose_store.go +++ b/venus-shared/blockstore/splitstore/compose_store.go @@ -198,7 +198,7 @@ func (cs *ComposeStore) sync(ctx context.Context, c cid.Cid, b blocks.Block) { var err error b, err = cs.secondary.Get(ctx, c) if err != nil { - log.Warnf("get block(%s) from secondary: %s", c, err) + // it is ok to ignore the err return } } diff --git a/venus-shared/blockstore/splitstore/walk.go b/venus-shared/blockstore/splitstore/walk.go index 570dfd5323..c366678ced 100644 --- a/venus-shared/blockstore/splitstore/walk.go +++ b/venus-shared/blockstore/splitstore/walk.go @@ -4,9 +4,12 @@ import ( "bytes" "context" "fmt" + "runtime" "sync" + "time" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/venus/venus-shared/actors/builtin" "github.com/filecoin-project/venus/venus-shared/blockstore" "github.com/filecoin-project/venus/venus-shared/types" cid "github.com/ipfs/go-cid" @@ -64,6 +67,12 @@ func NewSyncVisitor() Visitor { } func WalkChain(ctx context.Context, store blockstore.Blockstore, tipsetKey cid.Cid, v Visitor, depth abi.ChainEpoch) error { + start := time.Now() + defer func() { + log.Debugf("walk chain(%s) depth(%d) took %s", tipsetKey, depth, time.Since(start)) + }() + log.Debugf("walk chain(%s) depth(%d)", tipsetKey, depth) + cst := cbor.NewCborStore(store) var tsk types.TipSetKey err := cst.Get(ctx, tipsetKey, &tsk) @@ -78,8 +87,9 @@ func WalkChain(ctx context.Context, store blockstore.Blockstore, tipsetKey cid.C deepLimit := b.Height - depth - blockToWalk := make(chan cid.Cid, 8) - objectToWalk := make(chan cid.Cid, 64) + // todo: be careful about the size of channel + blockToWalk := make(chan cid.Cid) + objectToWalk := make(chan cid.Cid) for _, c := range tsk.Cids() { blockToWalk <- c } @@ -89,16 +99,20 @@ func WalkChain(ctx context.Context, store blockstore.Blockstore, tipsetKey cid.C }() wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - for c := range objectToWalk { - err := walkObject(ctx, store, c, v) - if err != nil { - log.Warnf("walkObject(%s): %s", c, err) + + walkObjectWorkerCount := runtime.NumCPU()/4 + 1 + wg.Add(walkObjectWorkerCount) + for i := 0; i < walkObjectWorkerCount; i++ { + go func() { + defer wg.Done() + for c := range objectToWalk { + err := walkObject(ctx, store, c, v) + if err != nil { + log.Warnf("walkObject(%s): %s", c, err) + } } - } - }() + }() + } for bCid := range blockToWalk { if !v.Visit(bCid) { @@ -111,7 +125,7 @@ func WalkChain(ctx context.Context, store blockstore.Blockstore, tipsetKey cid.C return err } - if b.Height%500 == 0 { + if b.Height%builtin.EpochsInDay == 0 { log.Debugf("walking block(%s, %d)", bCid, b.Height) } @@ -169,7 +183,7 @@ func walkObject(ctx context.Context, store blockstore.Blockstore, c cid.Cid, v V err := walkObject(ctx, store, c, v) if err != nil { // try best to walk more object - log.Warnf("walkObject(%s): %s", c, err) + log.Debugf("walkObject(%s): %s", c, err) } }