fix(sync): do not allow to expand checkpointed tipsets #6435

Merged
merged 4 commits on Dec 11, 2024
5 changes: 1 addition & 4 deletions app/submodule/chain/chain_submodule.go
@@ -21,7 +21,6 @@ import (
"github.com/filecoin-project/venus/pkg/vmsupport"
v0api "github.com/filecoin-project/venus/venus-shared/api/chain/v0"
v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1"
"github.com/filecoin-project/venus/venus-shared/types"
)

// ChainSubmodule enhances the `Node` with chain capabilities.
@@ -33,8 +32,7 @@ type ChainSubmodule struct { //nolint
SystemCall vm.SyscallsImpl
CirculatingSupplyCalculator *chain.CirculatingSupplyCalculator

CheckPoint types.TipSetKey
Drand beacon.Schedule
Drand beacon.Schedule

config chainConfig

@@ -92,7 +90,6 @@ func NewChainSubmodule(ctx context.Context,
Drand: drand,
config: config,
Waiter: waiter,
CheckPoint: chainStore.GetCheckPoint(),
}
err = store.ChainReader.Load(context.TODO())
if err != nil {
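This file drops the cached CheckPoint field from ChainSubmodule (and its initialization from chainStore.GetCheckPoint()), leaving the chain store as the single source of truth. A hypothetical accessor sketch, not part of the PR, showing how a consumer obtains the value after the change:

// readCheckpoint replaces reads of the removed ChainSubmodule.CheckPoint
// field: the live value is fetched from the chain store on demand, and after
// this PR it is a resolved *types.TipSet rather than a types.TipSetKey.
func readCheckpoint(chainStore *chain.Store) *types.TipSet {
	return chainStore.GetCheckPoint()
}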
30 changes: 20 additions & 10 deletions pkg/chain/store.go
@@ -100,7 +100,7 @@ type Store struct {
// head is the tipset at the head of the best known chain.
head *types.TipSet

checkPoint types.TipSetKey
checkPoint *types.TipSet
// Protects head and genesisCid.
mu sync.RWMutex

@@ -143,7 +143,6 @@ func NewStore(chainDs repo.Datastore,
bsstore: bsstore,
headEvents: pubsub.New(64),

checkPoint: types.EmptyTSK,
genesis: genesisCid,
reorgNotifeeCh: make(chan ReorgNotifee),
tsCache: tsCache,
@@ -156,11 +155,22 @@

val, err := store.ds.Get(context.TODO(), CheckPoint)
if err != nil {
store.checkPoint = types.NewTipSetKey(genesisCid)
store.checkPoint, err = store.GetTipSet(context.TODO(), types.NewTipSetKey(genesisCid))
if err != nil {
panic(fmt.Errorf("cannot get genesis tipset: %w", err))
}
} else {
_ = store.checkPoint.UnmarshalCBOR(bytes.NewReader(val)) //nolint:staticcheck
var checkPointTSK types.TipSetKey
err := checkPointTSK.UnmarshalCBOR(bytes.NewReader(val))
if err != nil {
panic(fmt.Errorf("cannot unmarshal checkpoint %s: %w", string(val), err))
}
store.checkPoint, err = store.GetTipSet(context.TODO(), checkPointTSK)
if err != nil {
panic(fmt.Errorf("cannot get checkpoint tipset: %w", err))
}
}
log.Infof("check point value: %v", store.checkPoint)
log.Infof("load check point height: %d, key: %v", store.checkPoint.Height(), store.checkPoint.Key())

store.reorgCh = store.reorgWorker(context.TODO())
return store
@@ -1112,8 +1122,8 @@ func (store *Store) SetCheckpoint(ctx context.Context, ts *types.TipSet) error {
return err
}

store.mu.RLock()
defer store.mu.RUnlock()
store.mu.Lock()
defer store.mu.Unlock()

finality := store.head.Height() - policy.ChainFinality
targetChain, currentChain := ts, store.head
@@ -1167,7 +1177,7 @@ func (store *Store) SetCheckpoint(ctx context.Context, ts *types.TipSet) error {
if err := store.ds.Put(ctx, CheckPoint, buf.Bytes()); err != nil {
return fmt.Errorf("checkpoint failed: failed to record checkpoint in the datastore: %w", err)
}
store.checkPoint = ts.Key()
store.checkPoint = ts

return nil
}
@@ -1187,7 +1197,7 @@ func (store *Store) IsAncestorOf(ctx context.Context, a, b *types.TipSet) (bool,
}

// GetCheckPoint get the check point from store or disk.
func (store *Store) GetCheckPoint() types.TipSetKey {
func (store *Store) GetCheckPoint() *types.TipSet {
store.mu.RLock()
defer store.mu.RUnlock()

@@ -1722,7 +1732,7 @@ func (store *Store) exceedsForkLength(ctx context.Context, synced, external *typ
}

// Now check to see if we've walked back to the checkpoint.
if synced.Key().Equals(store.checkPoint) {
if synced.Key().Equals(store.checkPoint.Key()) {
return true, nil
}

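For orientation, a minimal sketch (not code from this diff) of the checkpoint round trip after the change: SetCheckpoint still validates the target against chain finality and persists the CBOR-encoded TipSetKey under the CheckPoint datastore key, but GetCheckPoint now returns the resolved *types.TipSet, so callers read the height and key directly. The helper name and error wrapping below are illustrative only, assuming the usual imports (context, fmt, pkg/chain, venus-shared/types).

// checkpointAndReport is a hypothetical helper, not part of the PR.
func checkpointAndReport(ctx context.Context, s *chain.Store, ts *types.TipSet) error {
	// SetCheckpoint checks the tipset against finality and records its key
	// in the datastore under the CheckPoint key.
	if err := s.SetCheckpoint(ctx, ts); err != nil {
		return fmt.Errorf("set checkpoint: %w", err)
	}
	// GetCheckPoint now hands back the full tipset rather than a TipSetKey.
	chkpt := s.GetCheckPoint()
	fmt.Printf("checkpoint at height %d, key %v\n", chkpt.Height(), chkpt.Key())
	return nil
}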
2 changes: 2 additions & 0 deletions pkg/chain/store_test.go
@@ -43,6 +43,8 @@ func (cbor *CborBlockStore) PutBlocks(ctx context.Context, blocks []*types.Block
func newChainStore(r repo.Repo, genTS *types.TipSet) *CborBlockStore {
tempBlock := r.Datastore()
cborStore := cbor.NewCborStore(tempBlock)
blkBytes, _ := genTS.Blocks()[0].ToStorageBlock()
_ = tempBlock.Put(context.Background(), blkBytes)
return &CborBlockStore{
Store: chain.NewStore(r.ChainDatastore(), tempBlock, genTS.At(0).Cid(), chainselector.Weight),
cborStore: cborStore,
110 changes: 69 additions & 41 deletions pkg/chainsync/syncer/syncer.go
@@ -60,6 +60,7 @@ var (
ErrNewChainTooLong = errors.New("input chain forked from best chain past finality limit")
// ErrUnexpectedStoreState indicates that the syncer's chain bsstore is violating expected invariants.
ErrUnexpectedStoreState = errors.New("the chain bsstore is in an unexpected state")
ErrForkCheckpoint = fmt.Errorf("fork would require us to diverge from checkpointed block")

logSyncer = logging.Logger("chainsync.syncer")
)
@@ -137,8 +138,7 @@ type Syncer struct {

clock clock.Clock

bsstore blockstoreutil.Blockstore
checkPoint types.TipSetKey
bsstore blockstoreutil.Blockstore

fork fork.IFork

@@ -199,45 +199,41 @@ func (syncer *Syncer) syncOne(ctx context.Context, parent, next *types.TipSet) e
stopwatch := syncOneTimer.Start()
defer stopwatch(ctx)

var err error

if !parent.Key().Equals(syncer.checkPoint) {
var wg errgroup.Group
for i := 0; i < next.Len(); i++ {
blk := next.At(i)
wg.Go(func() error {
// Fetch the URL.
err := syncer.blockValidator.ValidateFullBlock(ctx, blk)
if err == nil {
if err := syncer.chainStore.AddToTipSetTracker(ctx, blk); err != nil {
return fmt.Errorf("failed to add validated header to tipset tracker: %w", err)
}
var wg errgroup.Group
for i := 0; i < next.Len(); i++ {
blk := next.At(i)
wg.Go(func() error {
// Fetch the URL.
err := syncer.blockValidator.ValidateFullBlock(ctx, blk)
if err == nil {
if err := syncer.chainStore.AddToTipSetTracker(ctx, blk); err != nil {
return fmt.Errorf("failed to add validated header to tipset tracker: %w", err)
}
return err
})
}
err = wg.Wait()
if err != nil {
var rootNotMatch bool // nolint

if merr, isok := err.(*multierror.Error); isok {
for _, e := range merr.Errors {
if isRootNotMatch(e) {
rootNotMatch = true
break
}
}
} else {
rootNotMatch = isRootNotMatch(err) // nolint
}
return err
})
}
err := wg.Wait()
if err != nil {
var rootNotMatch bool // nolint

if rootNotMatch { // nolint
// todo: should here rollback, and re-compute?
_ = syncer.stmgr.Rollback(ctx, parent, next)
if merr, isok := err.(*multierror.Error); isok {
for _, e := range merr.Errors {
if isRootNotMatch(e) {
rootNotMatch = true
break
}
}
} else {
rootNotMatch = isRootNotMatch(err) // nolint
}

return fmt.Errorf("validate mining failed %w", err)
if rootNotMatch { // nolint
// todo: should here rollback, and re-compute?
_ = syncer.stmgr.Rollback(ctx, parent, next)
}

return fmt.Errorf("validate mining failed %w", err)
}

syncer.chainStore.PersistTipSetKey(ctx, next.Key())
@@ -297,8 +293,25 @@ func (syncer *Syncer) HandleNewTipSet(ctx context.Context, target *syncTypes.Tar
return errors.New("do not sync to a target has synced before")
}

if target.Head.Height() == head.Height() {
// check if maybeHead is fully contained in headTipSet
// meaning we already synced all the blocks that are a part of maybeHead
// if that is the case, there is nothing for us to do
// we need to exit out early, otherwise checkpoint-fork logic might wrongly reject it
fullyContained := true
for _, c := range target.Head.Cids() {
if !head.Contains(c) {
fullyContained = false
break
}
}
if fullyContained {
return nil
}
}

syncer.exchangeClient.AddPeer(target.Sender)
tipsets, err := syncer.fetchChainBlocks(ctx, head, target.Head)
tipsets, err := syncer.fetchChainBlocks(ctx, head, target.Head, false)
if err != nil {
return errors.Wrapf(err, "failure fetching or validating headers")
}
@@ -346,7 +359,7 @@ func (syncer *Syncer) syncSegement(ctx context.Context, target *syncTypes.Target
errProcessChan <- processErr
return
}
if !parent.Key().Equals(syncer.checkPoint) {
if !parent.Key().Equals(syncer.chainStore.GetCheckPoint().Key()) {
logSyncer.Debugf("set chain head, height:%d, blocks:%d", parent.Height(), parent.Len())
if err := syncer.chainStore.RefreshHeaviestTipSet(ctx, parent.Height()); err != nil {
errProcessChan <- err
@@ -374,7 +387,7 @@
// if local db not exist, get block from network(libp2p),
// if there is a fork, get the common root tipset of knowntip and targettip, and return the block data from root tipset to targettip
// local(···->A->B) + incoming(C->D->E) => ···->A->B->C->D->E
func (syncer *Syncer) fetchChainBlocks(ctx context.Context, knownTip *types.TipSet, targetTip *types.TipSet) ([]*types.TipSet, error) {
func (syncer *Syncer) fetchChainBlocks(ctx context.Context, knownTip *types.TipSet, targetTip *types.TipSet, ignoreCheckpoint bool) ([]*types.TipSet, error) {
chainTipsets := []*types.TipSet{targetTip}
flushDB := func(saveTips []*types.TipSet) error {
bs := blockstoreutil.NewTemporary()
@@ -448,6 +461,13 @@
if err != nil {
return nil, fmt.Errorf("failed to load next local tipset: %w", err)
}

if !ignoreCheckpoint {
if chkpt := syncer.chainStore.GetCheckPoint(); chkpt != nil && base.Height() <= chkpt.Height() {
return nil, fmt.Errorf("merge point affecting the checkpoint: %w", ErrForkCheckpoint)
}
}

if base.IsChildOf(knownParent) {
// common case: receiving a block thats potentially part of the same tipset as our best block
chain.Reverse(chainTipsets)
@@ -456,7 +476,7 @@

logSyncer.Warnf("(fork detected) synced header chain, base: %v(%d), knownTip: %v(%d)", base.Key(), base.Height(),
knownTip.Key(), knownTip.Height())
fork, err := syncer.syncFork(ctx, base, knownTip)
fork, err := syncer.syncFork(ctx, base, knownTip, ignoreCheckpoint)
if err != nil {
if errors.Is(err, ErrForkTooLong) {
// TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish?
@@ -486,7 +506,15 @@
// D->E-F(targetTip)
// A => D->E>F
// B-C(knownTip)
func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) {
func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet, ignoreCheckpoint bool) ([]*types.TipSet, error) {
var chkpt *types.TipSet
if !ignoreCheckpoint {
chkpt = syncer.chainStore.GetCheckPoint()
if known.Equals(chkpt) {
return nil, ErrForkCheckpoint
}
}

incomingParentsTsk := incoming.Parents()
commonParent := false
for _, incomingParent := range incomingParentsTsk.Cids() {
@@ -701,7 +729,7 @@ func (syncer *Syncer) SyncCheckpoint(ctx context.Context, tsk types.TipSetKey) e
if anc, err := syncer.chainStore.IsAncestorOf(ctx, ts, head); err != nil {
return fmt.Errorf("failed to walk the chain when checkpointing: %w", err)
} else if !anc {
tipsets, err := syncer.fetchChainBlocks(ctx, head, target.Head)
tipsets, err := syncer.fetchChainBlocks(ctx, head, target.Head, true)
if err != nil {
return errors.Wrapf(err, "failure fetching or validating headers")
}
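To make the new guard concrete, here is an illustrative sketch (not code from the PR) of the check that fetchChainBlocks and syncFork now apply while walking an incoming chain back toward the local head: once the walk reaches a tipset at or below the checkpointed height, the sync is rejected with ErrForkCheckpoint, unless the caller explicitly ignores the checkpoint, as SyncCheckpoint does by passing ignoreCheckpoint=true. The standalone function name is hypothetical.

// forkAllowed mirrors the guard added in this PR: a fork may not reach back
// to, or beyond, the checkpointed tipset unless the checkpoint is ignored.
func forkAllowed(base, chkpt *types.TipSet, ignoreCheckpoint bool) error {
	if ignoreCheckpoint || chkpt == nil {
		return nil
	}
	if base.Height() <= chkpt.Height() {
		return fmt.Errorf("merge point affecting the checkpoint: %w", ErrForkCheckpoint)
	}
	return nil
}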