Skip to content

Commit

Permalink
Node v2 StoreChunks and GetChunks endpoints (#893)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Nov 15, 2024
1 parent 23c9718 commit 5c260a4
Show file tree
Hide file tree
Showing 15 changed files with 924 additions and 320 deletions.
31 changes: 31 additions & 0 deletions core/mock/v2/validator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package v2

import (
"context"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/stretchr/testify/mock"
)

// MockShardValidator is a mock implementation of ShardValidator
type MockShardValidator struct {
mock.Mock
}

var _ corev2.ShardValidator = (*MockShardValidator)(nil)

func NewMockShardValidator() *MockShardValidator {
return &MockShardValidator{}
}

func (v *MockShardValidator) ValidateBatchHeader(ctx context.Context, header *corev2.BatchHeader, blobCerts []*corev2.BlobCertificate) error {
args := v.Called()
return args.Error(0)
}

func (v *MockShardValidator) ValidateBlobs(ctx context.Context, blobs []*corev2.BlobShard, pool common.WorkerPool, state *core.OperatorState) error {
args := v.Called()
return args.Error(0)
}
2 changes: 1 addition & 1 deletion core/v2/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func checkBatchByUniversalVerifier(

for id := range state.IndexedOperators {

val := corev2.NewShardValidator(v, cst, id)
val := corev2.NewShardValidator(v, id)

blobs := packagedBlobs[id]

Expand Down
20 changes: 20 additions & 0 deletions core/v2/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"math/big"

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/wealdtech/go-merkletree/v2"
"github.com/wealdtech/go-merkletree/v2/keccak256"
"golang.org/x/crypto/sha3"
)

Expand Down Expand Up @@ -292,6 +294,24 @@ func DeserializeBatchHeader(data []byte) (*BatchHeader, error) {
return &h, nil
}

func BuildMerkleTree(certs []*BlobCertificate) (*merkletree.MerkleTree, error) {
leafs := make([][]byte, len(certs))
for i, cert := range certs {
leaf, err := cert.Hash()
if err != nil {
return nil, fmt.Errorf("failed to compute blob header hash: %w", err)
}
leafs[i] = leaf[:]
}

tree, err := merkletree.NewTree(merkletree.WithData(leafs), merkletree.WithHashType(keccak256.New()))
if err != nil {
return nil, err
}

return tree, nil
}

func encode(obj any) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
Expand Down
47 changes: 37 additions & 10 deletions core/v2/validator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v2

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -15,27 +16,32 @@ var (
ErrBlobQuorumSkip = errors.New("blob skipped for a quorum before verification")
)

type ShardValidator interface {
ValidateBatchHeader(ctx context.Context, header *BatchHeader, blobCerts []*BlobCertificate) error
ValidateBlobs(ctx context.Context, blobs []*BlobShard, pool common.WorkerPool, state *core.OperatorState) error
}

type BlobShard struct {
*BlobCertificate
Bundles core.Bundles
}

// shardValidator implements the validation logic that a DA node should apply to its received data
type ShardValidator struct {
type shardValidator struct {
verifier encoding.Verifier
chainState core.ChainState
operatorID core.OperatorID
}

func NewShardValidator(v encoding.Verifier, cst core.ChainState, operatorID core.OperatorID) *ShardValidator {
return &ShardValidator{
var _ ShardValidator = (*shardValidator)(nil)

func NewShardValidator(v encoding.Verifier, operatorID core.OperatorID) *shardValidator {
return &shardValidator{
verifier: v,
chainState: cst,
operatorID: operatorID,
}
}

func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShard, operatorState *core.OperatorState) ([]*encoding.Frame, *Assignment, error) {
func (v *shardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShard, operatorState *core.OperatorState) ([]*encoding.Frame, *Assignment, error) {

// Check if the operator is a member of the quorum
if _, ok := operatorState.Operators[quorum]; !ok {
Expand Down Expand Up @@ -72,7 +78,28 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar
return chunks, &assignment, nil
}

func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard, pool common.WorkerPool, state *core.OperatorState) error {
func (v *shardValidator) ValidateBatchHeader(ctx context.Context, header *BatchHeader, blobCerts []*BlobCertificate) error {
if header == nil {
return fmt.Errorf("batch header is nil")
}

if len(blobCerts) == 0 {
return fmt.Errorf("no blob certificates")
}

tree, err := BuildMerkleTree(blobCerts)
if err != nil {
return fmt.Errorf("failed to build merkle tree: %v", err)
}

if !bytes.Equal(tree.Root(), header.BatchRoot[:]) {
return fmt.Errorf("batch root does not match")
}

return nil
}

func (v *shardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard, pool common.WorkerPool, state *core.OperatorState) error {
var err error
subBatchMap := make(map[encoding.EncodingParams]*encoding.SubBatch)
blobCommitmentList := make([]encoding.BlobCommitments, len(blobs))
Expand Down Expand Up @@ -152,7 +179,7 @@ func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard,
for _, blobCommitments := range blobCommitmentList {
blobCommitments := blobCommitments
pool.Submit(func() {
v.VerifyBlobLengthWorker(blobCommitments, out)
v.verifyBlobLengthWorker(blobCommitments, out)
})
}
// check if commitments are equivalent
Expand All @@ -171,7 +198,7 @@ func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard,
return nil
}

func (v *ShardValidator) universalVerifyWorker(params encoding.EncodingParams, subBatch *encoding.SubBatch, out chan error) {
func (v *shardValidator) universalVerifyWorker(params encoding.EncodingParams, subBatch *encoding.SubBatch, out chan error) {

err := v.verifier.UniversalVerifySubBatch(params, subBatch.Samples, subBatch.NumBlobs)
if err != nil {
Expand All @@ -182,7 +209,7 @@ func (v *ShardValidator) universalVerifyWorker(params encoding.EncodingParams, s
out <- nil
}

func (v *ShardValidator) VerifyBlobLengthWorker(blobCommitments encoding.BlobCommitments, out chan error) {
func (v *shardValidator) verifyBlobLengthWorker(blobCommitments encoding.BlobCommitments, out chan error) {
err := v.verifier.VerifyBlobLength(blobCommitments)
if err != nil {
out <- err
Expand Down
22 changes: 1 addition & 21 deletions disperser/controller/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigensdk-go/logging"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/wealdtech/go-merkletree/v2"
"github.com/wealdtech/go-merkletree/v2/keccak256"
)

var errNoBlobsToDispatch = errors.New("no blobs to dispatch")
Expand Down Expand Up @@ -292,7 +290,7 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
ReferenceBlockNumber: referenceBlockNumber,
}

tree, err := BuildMerkleTree(certs)
tree, err := corev2.BuildMerkleTree(certs)
if err != nil {
return nil, fmt.Errorf("failed to build merkle tree: %w", err)
}
Expand Down Expand Up @@ -397,21 +395,3 @@ func (d *Dispatcher) updateBatchStatus(ctx context.Context, keys []corev2.BlobKe
}
return nil
}

func BuildMerkleTree(certs []*corev2.BlobCertificate) (*merkletree.MerkleTree, error) {
leafs := make([][]byte, len(certs))
for i, cert := range certs {
leaf, err := cert.Hash()
if err != nil {
return nil, fmt.Errorf("failed to compute blob header hash: %w", err)
}
leafs[i] = leaf[:]
}

tree, err := merkletree.NewTree(merkletree.WithData(leafs), merkletree.WithHashType(keccak256.New()))
if err != nil {
return nil, err
}

return tree, nil
}
4 changes: 2 additions & 2 deletions disperser/controller/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestDispatcherHandleBatch(t *testing.T) {
ctx := context.Background()

// Get batch header hash to mock signatures
merkleTree, err := controller.BuildMerkleTree(objs.blobCerts)
merkleTree, err := corev2.BuildMerkleTree(objs.blobCerts)
require.NoError(t, err)
require.NotNil(t, merkleTree)
require.NotNil(t, merkleTree.Root())
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestDispatcherBuildMerkleTree(t *testing.T) {
RelayKeys: []corev2.RelayKey{0, 1, 2},
},
}
merkleTree, err := controller.BuildMerkleTree(certs)
merkleTree, err := corev2.BuildMerkleTree(certs)
require.NoError(t, err)
require.NotNil(t, merkleTree)
require.NotNil(t, merkleTree.Root())
Expand Down
101 changes: 100 additions & 1 deletion node/grpc/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package grpc

import (
"context"
"encoding/hex"
"fmt"
"runtime"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/kvstore"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/node"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/shirou/gopsutil/mem"
Expand Down Expand Up @@ -53,5 +58,99 @@ func (s *ServerV2) NodeInfo(ctx context.Context, in *pb.NodeInfoRequest) (*pb.No
}

func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*pb.StoreChunksReply, error) {
return &pb.StoreChunksReply{}, api.NewErrorUnimplemented()
batch, err := s.validateStoreChunksRequest(in)
if err != nil {
return nil, err
}

batchHeaderHash, err := batch.BatchHeader.Hash()
if err != nil {
return nil, api.NewErrorInternal(fmt.Sprintf("invalid batch header: %v", err))
}

operatorState, err := s.node.ChainState.GetOperatorStateByOperator(ctx, uint(batch.BatchHeader.ReferenceBlockNumber), s.node.Config.ID)
if err != nil {
return nil, err
}

blobShards, rawBundles, err := s.node.DownloadBundles(ctx, batch, operatorState)
if err != nil {
return nil, api.NewErrorInternal(fmt.Sprintf("failed to download batch: %v", err))
}

type storeResult struct {
keys []kvstore.Key
err error
}
storeChan := make(chan storeResult)
go func() {
keys, err := s.node.StoreV2.StoreBatch(batch, rawBundles)
if err != nil {
storeChan <- storeResult{
keys: nil,
err: fmt.Errorf("failed to store batch: %v", err),
}
return
}

storeChan <- storeResult{
keys: keys,
err: nil,
}
}()

err = s.node.ValidateBatchV2(ctx, batch, blobShards, operatorState)
if err != nil {
res := <-storeChan
if len(res.keys) > 0 {
if deleteErr := s.node.StoreV2.DeleteKeys(res.keys); deleteErr != nil {
s.logger.Error("failed to delete keys", "err", deleteErr, "batchHeaderHash", hex.EncodeToString(batchHeaderHash[:]))
}
}
return nil, api.NewErrorInternal(fmt.Sprintf("failed to validate batch: %v", err))
}

res := <-storeChan
if res.err != nil {
return nil, api.NewErrorInternal(fmt.Sprintf("failed to store batch: %v", res.err))
}

sig := s.node.KeyPair.SignMessage(batchHeaderHash).Bytes()
return &pb.StoreChunksReply{
Signature: sig[:],
}, nil
}

// validateStoreChunksRequest validates the StoreChunksRequest and returns deserialized batch in the request
func (s *ServerV2) validateStoreChunksRequest(req *pb.StoreChunksRequest) (*corev2.Batch, error) {
if req.GetBatch() == nil {
return nil, api.NewErrorInvalidArg("missing batch in request")
}

batch, err := corev2.BatchFromProtobuf(req.GetBatch())
if err != nil {
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to deserialize batch: %v", err))
}

return batch, nil
}

func (s *ServerV2) GetChunks(ctx context.Context, in *pb.GetChunksRequest) (*pb.GetChunksReply, error) {
blobKey, err := corev2.BytesToBlobKey(in.GetBlobKey())
if err != nil {
return nil, api.NewErrorInvalidArg(fmt.Sprintf("invalid blob key: %v", err))
}

if corev2.MaxQuorumID < in.GetQuorumId() {
return nil, api.NewErrorInvalidArg("invalid quorum ID")
}
quorumID := core.QuorumID(in.GetQuorumId())
chunks, err := s.node.StoreV2.GetChunks(blobKey, quorumID)
if err != nil {
return nil, api.NewErrorInternal(fmt.Sprintf("failed to get chunks: %v", err))
}

return &pb.GetChunksReply{
Chunks: chunks,
}, nil
}
Loading

0 comments on commit 5c260a4

Please sign in to comment.