Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caplin: Introduced public keys registry abstraction #12732

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cl/persistence/state/state_accessors.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ func IncrementHistoricalSummariesTable(tx kv.RwTx, state *state.CachingBeaconSta
return nil
}

func ReadPublicKeyByIndexNoCopy(tx kv.Tx, index uint64) ([]byte, error) {
var pks []byte
var err error
key := base_encoding.Encode64ToBytes4(index)
if pks, err = tx.GetOne(kv.ValidatorPublicKeys, key); err != nil {
return nil, err
}
return pks, err
}

func ReadPublicKeyByIndex(tx kv.Tx, index uint64) (libcommon.Bytes48, error) {
var pks []byte
var err error
Expand Down
44 changes: 13 additions & 31 deletions cl/phase1/forkchoice/checkpoint_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ import (
"github.com/erigontech/erigon/cl/monitor"
"github.com/erigontech/erigon/cl/monitor/shuffling_metrics"
"github.com/erigontech/erigon/cl/phase1/core/state/shuffling"
"github.com/erigontech/erigon/cl/phase1/forkchoice/public_keys_registry"

"github.com/Giulio2002/bls"
libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/common/length"

"github.com/erigontech/erigon/cl/clparams"
"github.com/erigontech/erigon/cl/cltypes"
Expand All @@ -44,17 +43,18 @@ type checkpointState struct {
shuffledSet []uint64 // shuffled set of active validators
// validator data
balances []uint64
// These are flattened to save memory and anchor public keys are static and shared.
anchorPublicKeys []byte // flattened base public keys
publicKeys []byte // flattened public keys
actives []byte
slasheds []byte
// bitlists of active indexes and slashed indexes
actives []byte
slasheds []byte

publicKeysRegistry public_keys_registry.PublicKeyRegistry

validatorSetSize int
// fork data
genesisValidatorsRoot libcommon.Hash
fork *cltypes.Fork
activeBalance, epoch uint64 // current active balance and epoch
checkpoint solid.Checkpoint
}

func writeToBitset(bitset []byte, i int, value bool) {
Expand All @@ -73,9 +73,8 @@ func readFromBitset(bitset []byte, i int) bool {
return (bitset[sliceIndex] & (1 << uint(bitIndex))) > 0
}

func newCheckpointState(beaconConfig *clparams.BeaconChainConfig, anchorPublicKeys []byte, validatorSet []solid.Validator, randaoMixes solid.HashVectorSSZ,
genesisValidatorsRoot libcommon.Hash, fork *cltypes.Fork, activeBalance, epoch uint64) *checkpointState {
publicKeys := make([]byte, (len(validatorSet)-(len(anchorPublicKeys)/length.Bytes48))*length.Bytes48)
func newCheckpointState(beaconConfig *clparams.BeaconChainConfig, publicKeysRegistry public_keys_registry.PublicKeyRegistry, validatorSet []solid.Validator, randaoMixes solid.HashVectorSSZ,
genesisValidatorsRoot libcommon.Hash, fork *cltypes.Fork, activeBalance, epoch uint64, checkpoint solid.Checkpoint) *checkpointState {
balances := make([]uint64, len(validatorSet))

bitsetSize := (len(validatorSet) + 7) / 8
Expand All @@ -86,11 +85,6 @@ func newCheckpointState(beaconConfig *clparams.BeaconChainConfig, anchorPublicKe
writeToBitset(actives, i, validatorSet[i].Active(epoch))
writeToBitset(slasheds, i, validatorSet[i].Slashed())
}
// Add the post-anchor public keys as surplus
for i := len(anchorPublicKeys) / length.Bytes48; i < len(validatorSet); i++ {
pos := i - len(anchorPublicKeys)/length.Bytes48
copy(publicKeys[pos*length.Bytes48:(pos+1)*length.Bytes48], validatorSet[i].PublicKeyBytes())
}

mixes := solid.NewHashVector(randaoMixesLength)
randaoMixes.CopyTo(mixes)
Expand All @@ -100,16 +94,15 @@ func newCheckpointState(beaconConfig *clparams.BeaconChainConfig, anchorPublicKe
beaconConfig: beaconConfig,
randaoMixes: mixes,
balances: balances,
anchorPublicKeys: anchorPublicKeys,
publicKeys: publicKeys,
genesisValidatorsRoot: genesisValidatorsRoot,
fork: fork,
activeBalance: activeBalance,
slasheds: slasheds,
actives: actives,
validatorSetSize: len(validatorSet),

epoch: epoch,
checkpoint: checkpoint,
epoch: epoch,
publicKeysRegistry: publicKeysRegistry,
}
mixPosition := (epoch + beaconConfig.EpochsPerHistoricalVector - beaconConfig.MinSeedLookahead - 1) %
beaconConfig.EpochsPerHistoricalVector
Expand Down Expand Up @@ -196,17 +189,6 @@ func (c *checkpointState) isValidIndexedAttestation(att *cltypes.IndexedAttestat
return false, errors.New("isValidIndexedAttestation: attesting indices are not sorted or are null")
}

pks := [][]byte{}
inds.Range(func(_ int, v uint64, _ int) bool {
if v < uint64(len(c.anchorPublicKeys))/length.Bytes48 {
pks = append(pks, c.anchorPublicKeys[v*length.Bytes48:(v+1)*length.Bytes48])
} else {
offset := uint64(len(c.anchorPublicKeys) / length.Bytes48)
pks = append(pks, c.publicKeys[(v-offset)*length.Bytes48:(v-offset+1)*length.Bytes48])
}
return true
})

domain, err := c.getDomain(c.beaconConfig.DomainBeaconAttester, att.Data.Target.Epoch)
if err != nil {
return false, fmt.Errorf("unable to get the domain: %v", err)
Expand All @@ -217,7 +199,7 @@ func (c *checkpointState) isValidIndexedAttestation(att *cltypes.IndexedAttestat
return false, fmt.Errorf("unable to get signing root: %v", err)
}

valid, err := bls.VerifyAggregate(att.Signature[:], signingRoot[:], pks)
valid, err := c.publicKeysRegistry.VerifyAggregateSignature(c.checkpoint, inds, signingRoot[:], att.Signature)
if err != nil {
return false, fmt.Errorf("error while validating signature: %v", err)
}
Expand Down
5 changes: 3 additions & 2 deletions cl/phase1/forkchoice/fork_choice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/erigontech/erigon/cl/phase1/core/state"
"github.com/erigontech/erigon/cl/phase1/forkchoice"
"github.com/erigontech/erigon/cl/phase1/forkchoice/fork_graph"
"github.com/erigontech/erigon/cl/phase1/forkchoice/public_keys_registry"
"github.com/erigontech/erigon/cl/pool"
"github.com/erigontech/erigon/cl/transition"

Expand Down Expand Up @@ -84,7 +85,7 @@ func TestForkChoiceBasic(t *testing.T) {
pool := pool.NewOperationsPool(&clparams.MainnetBeaconConfig)
emitters := beaconevents.NewEventEmitter()
validatorMonitor := monitor.NewValidatorMonitor(false, nil, nil, nil)
store, err := forkchoice.NewForkChoiceStore(nil, anchorState, nil, pool, fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{}, emitters), emitters, sd, nil, validatorMonitor, false)
store, err := forkchoice.NewForkChoiceStore(nil, anchorState, nil, pool, fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{}, emitters), emitters, sd, nil, validatorMonitor, public_keys_registry.NewInMemoryPublicKeysRegistry(), false)
require.NoError(t, err)
// first steps
store.OnTick(0)
Expand Down Expand Up @@ -150,7 +151,7 @@ func TestForkChoiceChainBellatrix(t *testing.T) {
sd := synced_data.NewSyncedDataManager(&clparams.MainnetBeaconConfig, true, 0)
store, err := forkchoice.NewForkChoiceStore(nil, anchorState, nil, pool, fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{
Beacon: true,
}, emitters), emitters, sd, nil, nil, false)
}, emitters), emitters, sd, nil, nil, public_keys_registry.NewInMemoryPublicKeysRegistry(), false)
store.OnTick(2000)
require.NoError(t, err)
for _, block := range blocks {
Expand Down
9 changes: 6 additions & 3 deletions cl/phase1/forkchoice/fork_graph/fork_graph_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package fork_graph

import (
"bytes"
"errors"
"fmt"
"sync"
"sync/atomic"

"github.com/golang/snappy"
"github.com/spf13/afero"

libcommon "github.com/erigontech/erigon-lib/common"
Expand Down Expand Up @@ -125,8 +125,9 @@ type forkGraphDisk struct {
lightClientUpdates sync.Map // period -> lightclientupdate

// reusable buffers
sszBuffer bytes.Buffer
sszSnappyBuffer bytes.Buffer
sszBuffer []byte
sszSnappyWriter *snappy.Writer
sszSnappyReader *snappy.Reader

rcfg beacon_router_configuration.RouterConfiguration
emitter *beaconevents.EventEmitter
Expand Down Expand Up @@ -161,6 +162,8 @@ func NewForkGraphDisk(anchorState *state.CachingBeaconState, aferoFs afero.Fs, r
f.headers.Store(libcommon.Hash(anchorRoot), &anchorHeader)

f.DumpBeaconStateOnDisk(anchorRoot, anchorState, true)
// preallocate buffer
f.sszBuffer = make([]byte, 0, (anchorState.EncodingSizeSSZ()*3)/2)
return f
}

Expand Down
82 changes: 34 additions & 48 deletions cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package fork_graph

import (
"encoding/binary"
"errors"
"fmt"
"io"
"os"
Expand All @@ -41,50 +40,42 @@ func getBeaconStateCacheFilename(blockRoot libcommon.Hash) string {
func (f *forkGraphDisk) readBeaconStateFromDisk(blockRoot libcommon.Hash) (bs *state.CachingBeaconState, err error) {
var file afero.File
file, err = f.fs.Open(getBeaconStateFilename(blockRoot))

if err != nil {
return
}
defer file.Close()

if f.sszSnappyReader == nil {
f.sszSnappyReader = snappy.NewReader(file)
} else {
f.sszSnappyReader.Reset(file)
}
// Read the version
v := []byte{0}
if _, err := file.Read(v); err != nil {
if _, err := f.sszSnappyReader.Read(v); err != nil {
return nil, fmt.Errorf("failed to read hard fork version: %w, root: %x", err, blockRoot)
}
// Read the length
lengthBytes := make([]byte, 8)
var n int
n, err = io.ReadFull(file, lengthBytes)
n, err = io.ReadFull(f.sszSnappyReader, lengthBytes)
if err != nil {
return nil, fmt.Errorf("failed to read length: %w, root: %x", err, blockRoot)
}
if n != 8 {
return nil, fmt.Errorf("failed to read length: %d, want 8, root: %x", n, blockRoot)
}
// Grow the snappy buffer
f.sszSnappyBuffer.Grow(int(binary.BigEndian.Uint64(lengthBytes)))
// Read the snappy buffer
sszSnappyBuffer := f.sszSnappyBuffer.Bytes()
sszSnappyBuffer = sszSnappyBuffer[:cap(sszSnappyBuffer)]
n, err = io.ReadFull(file, sszSnappyBuffer)
if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) {
return nil, fmt.Errorf("failed to read snappy buffer: %w, root: %x", err, blockRoot)
}

decLen, err := snappy.DecodedLen(sszSnappyBuffer[:n])
if err != nil {
return nil, fmt.Errorf("failed to get decoded length: %w, root: %x, len: %d", err, blockRoot, n)
}
// Grow the plain ssz buffer
f.sszBuffer.Grow(decLen)
sszBuffer := f.sszBuffer.Bytes()
sszBuffer, err = snappy.Decode(sszBuffer, sszSnappyBuffer[:n])
f.sszBuffer = f.sszBuffer[:binary.BigEndian.Uint64(lengthBytes)]
n, err = io.ReadFull(f.sszSnappyReader, f.sszBuffer)
if err != nil {
return nil, fmt.Errorf("failed to decode snappy buffer: %w, root: %x, len: %d, decLen: %d", err, blockRoot, n, decLen)
return nil, fmt.Errorf("failed to read snappy buffer: %w, root: %x", err, blockRoot)
}
f.sszBuffer = f.sszBuffer[:n]

bs = state.New(f.beaconCfg)
if err = bs.DecodeSSZ(sszBuffer, int(v[0])); err != nil {
return nil, fmt.Errorf("failed to decode beacon state: %w, root: %x, len: %d, decLen: %d, bs: %+v", err, blockRoot, n, decLen, bs)
if err = bs.DecodeSSZ(f.sszBuffer, int(v[0])); err != nil {
return nil, fmt.Errorf("failed to decode beacon state: %w, root: %x, len: %d, decLen: %d, bs: %+v", err, blockRoot, n, len(f.sszBuffer), bs)
}
// decode the cache file
cacheFile, err := f.fs.Open(getBeaconStateCacheFilename(blockRoot))
Expand All @@ -106,47 +97,42 @@ func (f *forkGraphDisk) DumpBeaconStateOnDisk(blockRoot libcommon.Hash, bs *stat
return
}
// Truncate and then grow the buffer to the size of the state.
encodingSizeSSZ := bs.EncodingSizeSSZ()
f.sszBuffer.Grow(encodingSizeSSZ)
f.sszBuffer.Reset()

sszBuffer := f.sszBuffer.Bytes()
sszBuffer, err = bs.EncodeSSZ(sszBuffer)
f.sszBuffer, err = bs.EncodeSSZ(f.sszBuffer[:0])
if err != nil {
return
}
// Grow the snappy buffer
f.sszSnappyBuffer.Grow(snappy.MaxEncodedLen(len(sszBuffer)))
// Compress the ssz buffer
sszSnappyBuffer := f.sszSnappyBuffer.Bytes()
sszSnappyBuffer = sszSnappyBuffer[:cap(sszSnappyBuffer)]
sszSnappyBuffer = snappy.Encode(sszSnappyBuffer, sszBuffer)

var dumpedFile afero.File
dumpedFile, err = f.fs.OpenFile(getBeaconStateFilename(blockRoot), os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0o755)
if err != nil {
return
}
defer dumpedFile.Close()

if f.sszSnappyWriter == nil {
f.sszSnappyWriter = snappy.NewBufferedWriter(dumpedFile)
} else {
f.sszSnappyWriter.Reset(dumpedFile)
}

// First write the hard fork version
_, err = dumpedFile.Write([]byte{byte(bs.Version())})
if err != nil {
return
if _, err := f.sszSnappyWriter.Write([]byte{byte(bs.Version())}); err != nil {
return err
}
// Second write the length
length := make([]byte, 8)
binary.BigEndian.PutUint64(length, uint64(len(sszSnappyBuffer)))
_, err = dumpedFile.Write(length)
if err != nil {
return
binary.BigEndian.PutUint64(length, uint64(len(f.sszBuffer)))
if _, err := f.sszSnappyWriter.Write(length); err != nil {
return err
}
// Lastly dump the state
_, err = dumpedFile.Write(sszSnappyBuffer)
if err != nil {
if _, err := f.sszSnappyWriter.Write(f.sszBuffer); err != nil {
return err
}
if err = f.sszSnappyWriter.Flush(); err != nil {
return
}

err = dumpedFile.Sync()
if err != nil {
if err = dumpedFile.Sync(); err != nil {
return
}

Expand Down
Loading
Loading