Skip to content

Commit

Permalink
Merge branch 'main' of github.com:ledgerwatch/erigon into move-tx-poo…
Browse files Browse the repository at this point in the history
…l-types
  • Loading branch information
taratorio committed Nov 15, 2024
2 parents ed3986f + adfdcbd commit a3f7142
Show file tree
Hide file tree
Showing 30 changed files with 960 additions and 121 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-cd-main-branch-docker-images.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ run-name: "Commit id ${{ github.sha }}: CI-CD build and deploy docker images bas

env:
APPLICATION: "erigon"
BUILDER_IMAGE: "golang:1.23.1-alpine3.20"
BUILDER_IMAGE: "golang:1.23.3-alpine3.20"
TARGET_BASE_IMAGE: "alpine:3.20.3"
APP_REPO: "erigontech/erigon"
CHECKOUT_REF: "main"
Expand Down
2 changes: 1 addition & 1 deletion cl/aggregation/pool_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (p *aggregationPoolImpl) AddAttestation(inAtt *solid.Attestation) error {
return nil
}

if utils.IsOverlappingBitlist(att.AggregationBits.Bytes(), inAtt.AggregationBits.Bytes()) {
if utils.IsOverlappingSSZBitlist(att.AggregationBits.Bytes(), inAtt.AggregationBits.Bytes()) {
// the on bit is already set, so ignore
return ErrIsSuperset
}
Expand Down
6 changes: 6 additions & 0 deletions cl/antiquary/antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ func (a *Antiquary) Loop() error {
if err := a.sn.OpenFolder(); err != nil {
return err
}
if a.stateSn != nil {
if err := a.stateSn.OpenFolder(); err != nil {
return err
}
}

defer logInterval.Stop()
if from != a.sn.BlocksAvailable() && a.sn.BlocksAvailable() != 0 {
a.logger.Info("[Antiquary] Stopping Caplin to process historical indicies", "from", from, "to", a.sn.BlocksAvailable())
Expand Down
6 changes: 5 additions & 1 deletion cl/antiquary/state_antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,11 +488,15 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
if err != nil {
return err
}

log.Info("Historical states antiquated", "slot", s.currentState.Slot(), "root", libcommon.Hash(stateRoot), "latency", endTime)
if s.snapgen {
if s.stateSn != nil {
if err := s.stateSn.OpenFolder(); err != nil {
return err
}
}

if s.snapgen {

// Keep gnosis out for a bit
if s.currentState.BeaconConfig().ConfigName == "gnosis" {
Expand Down
2 changes: 1 addition & 1 deletion cl/beacon/handler/block_production.go
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ func (a *ApiHandler) findBestAttestationsForBlockProduction(
candidateAggregationBits := candidate.AggregationBits.Bytes()
for _, curAtt := range hashToAtts[dataRoot] {
currAggregationBitsBytes := curAtt.AggregationBits.Bytes()
if !utils.IsOverlappingBitlist(currAggregationBitsBytes, candidateAggregationBits) {
if !utils.IsOverlappingSSZBitlist(currAggregationBitsBytes, candidateAggregationBits) {
// merge signatures
candidateSig := candidate.Signature
curSig := curAtt.Signature
Expand Down
9 changes: 6 additions & 3 deletions cl/phase1/forkchoice/fork_graph/fork_graph_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package fork_graph

import (
"bytes"
"errors"
"fmt"
"sync"
"sync/atomic"

"github.com/golang/snappy"
"github.com/spf13/afero"

libcommon "github.com/erigontech/erigon-lib/common"
Expand Down Expand Up @@ -125,8 +125,9 @@ type forkGraphDisk struct {
lightClientUpdates sync.Map // period -> lightclientupdate

// reusable buffers
sszBuffer bytes.Buffer
sszSnappyBuffer bytes.Buffer
sszBuffer []byte
sszSnappyWriter *snappy.Writer
sszSnappyReader *snappy.Reader

rcfg beacon_router_configuration.RouterConfiguration
emitter *beaconevents.EventEmitter
Expand Down Expand Up @@ -161,6 +162,8 @@ func NewForkGraphDisk(anchorState *state.CachingBeaconState, aferoFs afero.Fs, r
f.headers.Store(libcommon.Hash(anchorRoot), &anchorHeader)

f.DumpBeaconStateOnDisk(anchorRoot, anchorState, true)
// preallocate buffer
f.sszBuffer = make([]byte, 0, (anchorState.EncodingSizeSSZ()*3)/2)
return f
}

Expand Down
82 changes: 34 additions & 48 deletions cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package fork_graph

import (
"encoding/binary"
"errors"
"fmt"
"io"
"os"
Expand All @@ -41,50 +40,42 @@ func getBeaconStateCacheFilename(blockRoot libcommon.Hash) string {
func (f *forkGraphDisk) readBeaconStateFromDisk(blockRoot libcommon.Hash) (bs *state.CachingBeaconState, err error) {
var file afero.File
file, err = f.fs.Open(getBeaconStateFilename(blockRoot))

if err != nil {
return
}
defer file.Close()

if f.sszSnappyReader == nil {
f.sszSnappyReader = snappy.NewReader(file)
} else {
f.sszSnappyReader.Reset(file)
}
// Read the version
v := []byte{0}
if _, err := file.Read(v); err != nil {
if _, err := f.sszSnappyReader.Read(v); err != nil {
return nil, fmt.Errorf("failed to read hard fork version: %w, root: %x", err, blockRoot)
}
// Read the length
lengthBytes := make([]byte, 8)
var n int
n, err = io.ReadFull(file, lengthBytes)
n, err = io.ReadFull(f.sszSnappyReader, lengthBytes)
if err != nil {
return nil, fmt.Errorf("failed to read length: %w, root: %x", err, blockRoot)
}
if n != 8 {
return nil, fmt.Errorf("failed to read length: %d, want 8, root: %x", n, blockRoot)
}
// Grow the snappy buffer
f.sszSnappyBuffer.Grow(int(binary.BigEndian.Uint64(lengthBytes)))
// Read the snappy buffer
sszSnappyBuffer := f.sszSnappyBuffer.Bytes()
sszSnappyBuffer = sszSnappyBuffer[:cap(sszSnappyBuffer)]
n, err = io.ReadFull(file, sszSnappyBuffer)
if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) {
return nil, fmt.Errorf("failed to read snappy buffer: %w, root: %x", err, blockRoot)
}

decLen, err := snappy.DecodedLen(sszSnappyBuffer[:n])
if err != nil {
return nil, fmt.Errorf("failed to get decoded length: %w, root: %x, len: %d", err, blockRoot, n)
}
// Grow the plain ssz buffer
f.sszBuffer.Grow(decLen)
sszBuffer := f.sszBuffer.Bytes()
sszBuffer, err = snappy.Decode(sszBuffer, sszSnappyBuffer[:n])
f.sszBuffer = f.sszBuffer[:binary.BigEndian.Uint64(lengthBytes)]
n, err = io.ReadFull(f.sszSnappyReader, f.sszBuffer)
if err != nil {
return nil, fmt.Errorf("failed to decode snappy buffer: %w, root: %x, len: %d, decLen: %d", err, blockRoot, n, decLen)
return nil, fmt.Errorf("failed to read snappy buffer: %w, root: %x", err, blockRoot)
}
f.sszBuffer = f.sszBuffer[:n]

bs = state.New(f.beaconCfg)
if err = bs.DecodeSSZ(sszBuffer, int(v[0])); err != nil {
return nil, fmt.Errorf("failed to decode beacon state: %w, root: %x, len: %d, decLen: %d, bs: %+v", err, blockRoot, n, decLen, bs)
if err = bs.DecodeSSZ(f.sszBuffer, int(v[0])); err != nil {
return nil, fmt.Errorf("failed to decode beacon state: %w, root: %x, len: %d, decLen: %d, bs: %+v", err, blockRoot, n, len(f.sszBuffer), bs)
}
// decode the cache file
cacheFile, err := f.fs.Open(getBeaconStateCacheFilename(blockRoot))
Expand All @@ -106,47 +97,42 @@ func (f *forkGraphDisk) DumpBeaconStateOnDisk(blockRoot libcommon.Hash, bs *stat
return
}
// Truncate and then grow the buffer to the size of the state.
encodingSizeSSZ := bs.EncodingSizeSSZ()
f.sszBuffer.Grow(encodingSizeSSZ)
f.sszBuffer.Reset()

sszBuffer := f.sszBuffer.Bytes()
sszBuffer, err = bs.EncodeSSZ(sszBuffer)
f.sszBuffer, err = bs.EncodeSSZ(f.sszBuffer[:0])
if err != nil {
return
}
// Grow the snappy buffer
f.sszSnappyBuffer.Grow(snappy.MaxEncodedLen(len(sszBuffer)))
// Compress the ssz buffer
sszSnappyBuffer := f.sszSnappyBuffer.Bytes()
sszSnappyBuffer = sszSnappyBuffer[:cap(sszSnappyBuffer)]
sszSnappyBuffer = snappy.Encode(sszSnappyBuffer, sszBuffer)

var dumpedFile afero.File
dumpedFile, err = f.fs.OpenFile(getBeaconStateFilename(blockRoot), os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0o755)
if err != nil {
return
}
defer dumpedFile.Close()

if f.sszSnappyWriter == nil {
f.sszSnappyWriter = snappy.NewBufferedWriter(dumpedFile)
} else {
f.sszSnappyWriter.Reset(dumpedFile)
}

// First write the hard fork version
_, err = dumpedFile.Write([]byte{byte(bs.Version())})
if err != nil {
return
if _, err := f.sszSnappyWriter.Write([]byte{byte(bs.Version())}); err != nil {
return err
}
// Second write the length
length := make([]byte, 8)
binary.BigEndian.PutUint64(length, uint64(len(sszSnappyBuffer)))
_, err = dumpedFile.Write(length)
if err != nil {
return
binary.BigEndian.PutUint64(length, uint64(len(f.sszBuffer)))
if _, err := f.sszSnappyWriter.Write(length); err != nil {
return err
}
// Lastly dump the state
_, err = dumpedFile.Write(sszSnappyBuffer)
if err != nil {
if _, err := f.sszSnappyWriter.Write(f.sszBuffer); err != nil {
return err
}
if err = f.sszSnappyWriter.Flush(); err != nil {
return
}

err = dumpedFile.Sync()
if err != nil {
if err = dumpedFile.Sync(); err != nil {
return
}

Expand Down
5 changes: 4 additions & 1 deletion cl/phase1/network/gossip_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,10 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss
// The background checks above are enough for now.
return g.blobService.ProcessMessage(ctx, data.SubnetId, blobSideCar)
case gossip.IsTopicSyncCommittee(data.Name):
msg := &cltypes.SyncCommitteeMessageWithGossipData{}
msg := &cltypes.SyncCommitteeMessageWithGossipData{
GossipData: copyOfSentinelData(data),
SyncCommitteeMessage: &cltypes.SyncCommitteeMessage{},
}
if err := msg.SyncCommitteeMessage.DecodeSSZ(common.CopyBytes(data.Data), int(version)); err != nil {
return err
}
Expand Down
3 changes: 1 addition & 2 deletions cl/phase1/network/services/batch_signature_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

const (
batchSignatureVerificationThreshold = 300
batchSignatureVerificationThreshold = 50
reservedSize = 512
)

Expand Down Expand Up @@ -154,7 +154,6 @@ func (b *BatchSignatureVerifier) processSignatureVerification(aggregateVerificat
if b.sentinel != nil && v.GossipData != nil {
if _, err := b.sentinel.PublishGossip(b.ctx, v.GossipData); err != nil {
log.Debug("failed to publish gossip", "err", err)
return err
}
}
}
Expand Down
29 changes: 14 additions & 15 deletions cl/phase1/network/services/blob_sidecar_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/Giulio2002/bls"
Expand All @@ -47,8 +46,8 @@ type blobSidecarService struct {
ethClock eth_clock.EthereumClock
emitters *beaconevents.EventEmitter

blobSidecarsScheduledForLaterExecution sync.Map
test bool
// blobSidecarsScheduledForLaterExecution sync.Map
test bool
}

type blobSidecarJob struct {
Expand Down Expand Up @@ -115,7 +114,7 @@ func (b *blobSidecarService) ProcessMessage(ctx context.Context, subnetId *uint6

parentHeader, has := b.forkchoiceStore.GetHeader(msg.SignedBlockHeader.Header.ParentRoot)
if !has {
b.scheduleBlobSidecarForLaterExecution(msg)
//b.scheduleBlobSidecarForLaterExecution(msg)
return ErrIgnore
}
if msg.SignedBlockHeader.Header.Slot <= parentHeader.Slot {
Expand Down Expand Up @@ -195,17 +194,17 @@ func (b *blobSidecarService) verifySidecarsSignature(header *cltypes.SignedBeaco
return nil
}

func (b *blobSidecarService) scheduleBlobSidecarForLaterExecution(blobSidecar *cltypes.BlobSidecar) {
blobSidecarJob := &blobSidecarJob{
blobSidecar: blobSidecar,
creationTime: time.Now(),
}
blobSidecarHash, err := blobSidecar.HashSSZ()
if err != nil {
return
}
b.blobSidecarsScheduledForLaterExecution.Store(blobSidecarHash, blobSidecarJob)
}
// func (b *blobSidecarService) scheduleBlobSidecarForLaterExecution(blobSidecar *cltypes.BlobSidecar) {
// blobSidecarJob := &blobSidecarJob{
// blobSidecar: blobSidecar,
// creationTime: time.Now(),
// }
// blobSidecarHash, err := blobSidecar.HashSSZ()
// if err != nil {
// return
// }
// b.blobSidecarsScheduledForLaterExecution.Store(blobSidecarHash, blobSidecarJob)
// }

// // loop is the main loop of the block service
// func (b *blobSidecarService) loop(ctx context.Context) {
Expand Down
30 changes: 28 additions & 2 deletions cl/utils/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,42 @@ func IsNonStrictSupersetBitlist(a, b []byte) bool {
return true
}

func IsOverlappingBitlist(a, b []byte) bool {
// IsOverlappingSSZBitlist checks if bitlist 'a' and bitlist 'b' have any overlapping bits
// However, it ignores the last bits in the last byte.
func IsOverlappingSSZBitlist(a, b []byte) bool {
length := min(len(a), len(b))
for i := range length {

if a[i]&b[i] != 0 {
return true
if i != length-1 {
return true
}
var foundOverlap bool
// check the overlap bit by bit
for j := 0; j < 8; j++ {
if (a[i]>>j)&(b[i]>>j)&1 == 1 {
if foundOverlap {
return true
}
foundOverlap = true
}
}
}
}
return false

}

// func IsOverlappingBitlist(a, b []byte) bool {
// length := min(len(a), len(b))
// for i := range length {
// if a[i]&b[i] != 0 {
// return true
// }
// }
// return false
// }

func BitsOnCount(b []byte) int {
count := 0
for _, v := range b {
Expand Down
2 changes: 0 additions & 2 deletions cl/validator/committee_subscription/committee_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func NewCommitteeSubscribeManagement(
netConfig *clparams.NetworkConfig,
ethClock eth_clock.EthereumClock,
sentinel sentinel.SentinelClient,
state *state.CachingBeaconState,
aggregationPool aggregation.AggregationPool,
syncedData *synced_data.SyncedDataManager,
) *CommitteeSubscribeMgmt {
Expand All @@ -79,7 +78,6 @@ func NewCommitteeSubscribeManagement(
netConfig: netConfig,
ethClock: ethClock,
sentinel: sentinel,
state: state,
aggregationPool: aggregationPool,
syncedData: syncedData,
validatorSubs: make(map[uint64]*validatorSub),
Expand Down
2 changes: 1 addition & 1 deletion cmd/caplin/caplin1/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func RunCaplinService(ctx context.Context, engine execution_client.ExecutionEngi
return err
}
beaconRpc := rpc.NewBeaconRpcP2P(ctx, sentinel, beaconConfig, ethClock)
committeeSub := committee_subscription.NewCommitteeSubscribeManagement(ctx, indexDB, beaconConfig, networkConfig, ethClock, sentinel, state, aggregationPool, syncedDataManager)
committeeSub := committee_subscription.NewCommitteeSubscribeManagement(ctx, indexDB, beaconConfig, networkConfig, ethClock, sentinel, aggregationPool, syncedDataManager)
batchSignatureVerifier := services.NewBatchSignatureVerifier(ctx, sentinel)
// Define gossip services
blockService := services.NewBlockService(ctx, indexDB, forkChoice, syncedDataManager, ethClock, beaconConfig, emitters)
Expand Down
Loading

0 comments on commit a3f7142

Please sign in to comment.