Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caplin: Reuse same state for reorg + alloc reduction in forkchoice #12742

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cl/cltypes/solid/hash_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (h *hashVector) DecodeSSZ(buf []byte, version int) error {
if len(buf) < h.Length()*length.Hash {
return ssz.ErrBadDynamicLength
}
h.u.MerkleTree = nil
copy(h.u.u, buf)
return nil
}
Expand Down
1 change: 1 addition & 0 deletions cl/cltypes/solid/validator_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func (v *ValidatorSet) DecodeSSZ(buf []byte, _ int) error {
}
v.expandBuffer(len(buf) / validatorSize)
copy(v.buffer, buf)
v.MerkleTree = nil
v.l = len(buf) / validatorSize
v.phase0Data = make([]Phase0Data, v.l)
v.attesterBits = make([]byte, v.l)
Expand Down
1 change: 1 addition & 0 deletions cl/phase1/core/state/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func (b *CachingBeaconState) InitBeaconState() error {
if b.Version() >= clparams.Phase0Version {
return b._initializeValidatorsPhase0()
}

return nil
}

Expand Down
5 changes: 4 additions & 1 deletion cl/phase1/core/state/raw/ssz.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ func (b *BeaconState) getSchema() []interface{} {

func (b *BeaconState) DecodeSSZ(buf []byte, version int) error {
b.version = clparams.StateVersion(version)
if len(buf) < b.EncodingSizeSSZ() {
if len(buf) < int(b.baseOffsetSSZ()) {
return fmt.Errorf("[BeaconState] err: %s", ssz.ErrLowBufferSize)
}
if version >= int(clparams.BellatrixVersion) {
b.latestExecutionPayloadHeader = &cltypes.Eth1Header{}
}
if err := ssz2.UnmarshalSSZ(buf, version, b.getSchema()...); err != nil {
return err
}
Expand Down
4 changes: 1 addition & 3 deletions cl/phase1/core/state/raw/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,7 @@ func New(cfg *clparams.BeaconChainConfig) *BeaconState {
}

func (b *BeaconState) init() error {
if b.touchedLeaves == nil {
b.touchedLeaves = make([]atomic.Uint32, StateLeafSize)
}
b.touchedLeaves = make([]atomic.Uint32, StateLeafSize)
return nil
}

Expand Down
34 changes: 19 additions & 15 deletions cl/phase1/forkchoice/fork_graph/fork_graph_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,8 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock,
f.badBlocks.Store(libcommon.Hash(blockRoot), struct{}{})
return nil, BelowAnchor, nil
}
// Check if block being process right now was marked as invalid.
if _, ok := f.badBlocks.Load(libcommon.Hash(blockRoot)); ok {
log.Debug("block has invalid parent", "slot", block.Slot, "hash", libcommon.Hash(blockRoot))
return nil, InvalidBlock, nil
}

newState, err := f.GetState(block.ParentRoot, false)
newState, err := f.GetState(block.ParentRoot, f.currentState, true)
if err != nil {
return nil, LogisticError, fmt.Errorf("AddChainSegment: %w, parentRoot; %x", err, block.ParentRoot)
}
Expand Down Expand Up @@ -324,8 +319,8 @@ func (f *forkGraphDisk) getBlock(blockRoot libcommon.Hash) (*cltypes.SignedBeaco
return obj.(*cltypes.SignedBeaconBlock), true
}

func (f *forkGraphDisk) GetState(blockRoot libcommon.Hash, alwaysCopy bool) (*state.CachingBeaconState, error) {
if f.currentState != nil && !alwaysCopy {
func (f *forkGraphDisk) GetState(blockRoot libcommon.Hash, out *state.CachingBeaconState, fallBackTopState bool) (*state.CachingBeaconState, error) {
if fallBackTopState && f.currentState != nil {
currentStateBlockRoot, err := f.currentState.BlockRoot()
if err != nil {
return nil, err
Expand All @@ -334,6 +329,15 @@ func (f *forkGraphDisk) GetState(blockRoot libcommon.Hash, alwaysCopy bool) (*st
return f.currentState, nil
}
}
if out != nil {
currentStateBlockRoot, err := out.BlockRoot()
if err != nil {
return nil, err
}
if currentStateBlockRoot == blockRoot {
return out, nil
}
}

// collect all blocks beetwen greatest extending node path and block.
blocksInTheWay := []*cltypes.SignedBeaconBlock{}
Expand All @@ -348,7 +352,7 @@ func (f *forkGraphDisk) GetState(blockRoot libcommon.Hash, alwaysCopy bool) (*st
// check if it is in the header
bHeader, ok := f.GetHeader(currentIteratorRoot)
if ok && bHeader.Slot%dumpSlotFrequency == 0 {
copyReferencedState, err = f.readBeaconStateFromDisk(currentIteratorRoot)
copyReferencedState, err = f.readBeaconStateFromDisk(currentIteratorRoot, out)
if err != nil {
log.Trace("Could not retrieve state: Missing header", "missing", currentIteratorRoot, "err", err)
copyReferencedState = nil
Expand All @@ -359,7 +363,7 @@ func (f *forkGraphDisk) GetState(blockRoot libcommon.Hash, alwaysCopy bool) (*st
return nil, nil
}
if block.Block.Slot%dumpSlotFrequency == 0 {
copyReferencedState, err = f.readBeaconStateFromDisk(currentIteratorRoot)
copyReferencedState, err = f.readBeaconStateFromDisk(currentIteratorRoot, out)
if err != nil {
log.Trace("Could not retrieve state: Missing header", "missing", currentIteratorRoot, "err", err)
}
Expand All @@ -368,13 +372,13 @@ func (f *forkGraphDisk) GetState(blockRoot libcommon.Hash, alwaysCopy bool) (*st
}
}
blocksInTheWay = append(blocksInTheWay, block)

currentIteratorRoot = block.Block.ParentRoot
}

// Traverse the blocks from top to bottom.
for i := len(blocksInTheWay) - 1; i >= 0; i-- {
if err := transition.TransitionState(copyReferencedState, blocksInTheWay[i], nil, false); err != nil {
return nil, err
return nil, fmt.Errorf("GetState: %w, blockRoot; %x", err, blockRoot)
}
}
return copyReferencedState, nil
Expand Down Expand Up @@ -489,7 +493,7 @@ func (f *forkGraphDisk) GetLightClientUpdate(period uint64) (*cltypes.LightClien
}

func (f *forkGraphDisk) GetBalances(blockRoot libcommon.Hash) (solid.Uint64ListSSZ, error) {
st, err := f.GetState(blockRoot, true)
st, err := f.GetState(blockRoot, nil, false)
if err != nil {
return nil, err
}
Expand All @@ -500,7 +504,7 @@ func (f *forkGraphDisk) GetBalances(blockRoot libcommon.Hash) (solid.Uint64ListS
}

func (f *forkGraphDisk) GetInactivitiesScores(blockRoot libcommon.Hash) (solid.Uint64ListSSZ, error) {
st, err := f.GetState(blockRoot, true)
st, err := f.GetState(blockRoot, nil, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -535,7 +539,7 @@ func (f *forkGraphDisk) GetCurrentParticipationIndicies(blockRoot libcommon.Hash
}

func (f *forkGraphDisk) GetValidatorSet(blockRoot libcommon.Hash) (*solid.ValidatorSet, error) {
st, err := f.GetState(blockRoot, true)
st, err := f.GetState(blockRoot, nil, false)
if err != nil {
return nil, err
}
Expand Down
10 changes: 7 additions & 3 deletions cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func getBeaconStateCacheFilename(blockRoot libcommon.Hash) string {
return fmt.Sprintf("%x.cache", blockRoot)
}

func (f *forkGraphDisk) readBeaconStateFromDisk(blockRoot libcommon.Hash) (bs *state.CachingBeaconState, err error) {
func (f *forkGraphDisk) readBeaconStateFromDisk(blockRoot libcommon.Hash, out *state.CachingBeaconState) (bs *state.CachingBeaconState, err error) {
var file afero.File
file, err = f.fs.Open(getBeaconStateFilename(blockRoot))
if err != nil {
Expand Down Expand Up @@ -72,9 +72,13 @@ func (f *forkGraphDisk) readBeaconStateFromDisk(blockRoot libcommon.Hash) (bs *s
return nil, fmt.Errorf("failed to read snappy buffer: %w, root: %x", err, blockRoot)
}
f.sszBuffer = f.sszBuffer[:n]

bs = state.New(f.beaconCfg)
if out == nil {
bs = state.New(f.beaconCfg)
} else {
bs = out
}
if err = bs.DecodeSSZ(f.sszBuffer, int(v[0])); err != nil {
fmt.Println(err)
return nil, fmt.Errorf("failed to decode beacon state: %w, root: %x, len: %d, decLen: %d, bs: %+v", err, blockRoot, n, len(f.sszBuffer), bs)
}
// decode the cache file
Expand Down
2 changes: 1 addition & 1 deletion cl/phase1/forkchoice/fork_graph/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
type ForkGraph interface {
AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, fullValidation bool) (*state.CachingBeaconState, ChainSegmentInsertionResult, error)
GetHeader(blockRoot libcommon.Hash) (*cltypes.BeaconBlockHeader, bool)
GetState(blockRoot libcommon.Hash, alwaysCopy bool) (*state.CachingBeaconState, error)
GetState(blockRoot libcommon.Hash, out *state.CachingBeaconState, fallbackTopState bool) (*state.CachingBeaconState, error)
GetCurrentJustifiedCheckpoint(blockRoot libcommon.Hash) (solid.Checkpoint, bool)
GetFinalizedCheckpoint(blockRoot libcommon.Hash) (solid.Checkpoint, bool)
GetSyncCommittees(period uint64) (*solid.SyncCommittee, *solid.SyncCommittee, bool)
Expand Down
2 changes: 1 addition & 1 deletion cl/phase1/forkchoice/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (f *ForkChoiceStore) GetStateAtBlockRoot(blockRoot libcommon.Hash, alwaysCo
f.mu.RLock()
defer f.mu.RUnlock()
}
return f.forkGraph.GetState(blockRoot, alwaysCopy)
return f.forkGraph.GetState(blockRoot, nil, !alwaysCopy)
}

func (f *ForkChoiceStore) PreverifiedValidator(blockRoot libcommon.Hash) uint64 {
Expand Down
2 changes: 1 addition & 1 deletion cl/phase1/forkchoice/on_attester_slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (f *ForkChoiceStore) OnAttesterSlashing(attesterSlashing *cltypes.AttesterS
defer f.mu.Unlock()

if f.syncedDataManager.Syncing() {
s, err := f.forkGraph.GetState(f.justifiedCheckpoint.Load().(solid.Checkpoint).Root, false)
s, err := f.forkGraph.GetState(f.justifiedCheckpoint.Load().(solid.Checkpoint).Root, nil, true)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cl/phase1/forkchoice/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (f *ForkChoiceStore) getCheckpointState(checkpoint solid.Checkpoint) (*chec
}

// If it is not in cache compute it and then put in cache.
baseState, err := f.forkGraph.GetState(checkpoint.Root, true)
baseState, err := f.forkGraph.GetState(checkpoint.Root, nil, false)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cl/phase1/network/services/aggregate_and_proof_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (a *aggregateAndProofServiceImpl) ProcessMessage(
committeeIndex := aggregateAndProof.SignedAggregateAndProof.Message.Aggregate.Data.CommitteeIndex

if aggregateData.Slot > a.syncedDataManager.HeadSlot() {
a.scheduleAggregateForLaterProcessing(aggregateAndProof)
//a.scheduleAggregateForLaterProcessing(aggregateAndProof)
return ErrIgnore
}

Expand Down
4 changes: 3 additions & 1 deletion cl/phase1/network/services/batch_signature_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (b *BatchSignatureVerifier) processSignatureVerification(aggregateVerificat

// we could locate failing signature with binary search but for now let's choose simplicity over optimisation.
func (b *BatchSignatureVerifier) handleIncorrectSignatures(aggregateVerificationData []*AggregateVerificationData) {
alreadyBanned := false
for _, v := range aggregateVerificationData {
valid, err := blsVerifyMultipleSignatures(v.Signatures, v.SignRoots, v.Pks)
if err != nil {
Expand All @@ -173,12 +174,13 @@ func (b *BatchSignatureVerifier) handleIncorrectSignatures(aggregateVerification
}

if !valid {
if v.GossipData == nil {
if v.GossipData == nil || alreadyBanned {
continue
}
log.Debug("[BatchVerifier] received invalid signature on the gossip", "topic", v.GossipData.Name)
if b.sentinel != nil && v.GossipData != nil && v.GossipData.Peer != nil {
b.sentinel.BanPeer(b.ctx, v.GossipData.Peer)
alreadyBanned = true
}
continue
}
Expand Down
31 changes: 19 additions & 12 deletions cl/phase1/network/services/sync_committee_messages_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type seenSyncCommitteeMessage struct {
}

type syncCommitteeMessagesService struct {
seenSyncCommitteeMessages map[seenSyncCommitteeMessage]struct{}
seenSyncCommitteeMessages sync.Map
syncedDataManager *synced_data.SyncedDataManager
beaconChainCfg *clparams.BeaconChainConfig
syncContributionPool sync_contribution_pool.SyncContributionPool
Expand All @@ -60,13 +60,12 @@ func NewSyncCommitteeMessagesService(
test bool,
) SyncCommitteeMessagesService {
return &syncCommitteeMessagesService{
seenSyncCommitteeMessages: make(map[seenSyncCommitteeMessage]struct{}),
ethClock: ethClock,
syncedDataManager: syncedDataManager,
beaconChainCfg: beaconChainCfg,
syncContributionPool: syncContributionPool,
batchSignatureVerifier: batchSignatureVerifier,
test: test,
ethClock: ethClock,
syncedDataManager: syncedDataManager,
beaconChainCfg: beaconChainCfg,
syncContributionPool: syncContributionPool,
batchSignatureVerifier: batchSignatureVerifier,
test: test,
}
}

Expand Down Expand Up @@ -96,7 +95,8 @@ func (s *syncCommitteeMessagesService) ProcessMessage(ctx context.Context, subne
return fmt.Errorf("validator is not into any subnet %d", *subnet)
}
// [IGNORE] There has been no other valid sync committee message for the declared slot for the validator referenced by sync_committee_message.validator_index.
if _, ok := s.seenSyncCommitteeMessages[seenSyncCommitteeMessageIdentifier]; ok {

if _, ok := s.seenSyncCommitteeMessages.Load(seenSyncCommitteeMessageIdentifier); ok {
return ErrIgnore
}
// [REJECT] The signature is valid for the message beacon_block_root for the validator referenced by validator_index
Expand All @@ -110,7 +110,7 @@ func (s *syncCommitteeMessagesService) ProcessMessage(ctx context.Context, subne
Pks: [][]byte{pubKey},
GossipData: msg.GossipData,
F: func() {
s.seenSyncCommitteeMessages[seenSyncCommitteeMessageIdentifier] = struct{}{}
s.seenSyncCommitteeMessages.Store(seenSyncCommitteeMessageIdentifier, struct{}{})
s.cleanupOldSyncCommitteeMessages() // cleanup old messages
// Aggregate the message
s.syncContributionPool.AddSyncCommitteeMessage(headState, *subnet, msg.SyncCommitteeMessage)
Expand All @@ -135,10 +135,17 @@ func (s *syncCommitteeMessagesService) ProcessMessage(ctx context.Context, subne
// cleanupOldSyncCommitteeMessages removes old sync committee messages from the cache
func (s *syncCommitteeMessagesService) cleanupOldSyncCommitteeMessages() {
headSlot := s.syncedDataManager.HeadSlot()
for k := range s.seenSyncCommitteeMessages {

entriesToRemove := []seenSyncCommitteeMessage{}
s.seenSyncCommitteeMessages.Range(func(key, value interface{}) bool {
k := key.(seenSyncCommitteeMessage)
if headSlot > k.slot+1 {
delete(s.seenSyncCommitteeMessages, k)
entriesToRemove = append(entriesToRemove, k)
}
return true
})
for _, k := range entriesToRemove {
s.seenSyncCommitteeMessages.Delete(k)
}
}

Expand Down
2 changes: 1 addition & 1 deletion cl/spectest/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ clean:
rm -rf tests

mainnet:
CGO_CFLAGS=-D__BLST_PORTABLE__ go test -tags=spectest -run=/mainnet/altair/ -v --timeout 30m
CGO_CFLAGS=-D__BLST_PORTABLE__ go test -tags=spectest -run=/mainnet/ -v --timeout 30m
Loading