Node v2 StoreChunks and GetChunks endpoints #893

Merged · 1 commit · Nov 15, 2024
31 changes: 31 additions & 0 deletions core/mock/v2/validator.go
@@ -0,0 +1,31 @@
package v2

import (
"context"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/stretchr/testify/mock"
)

// MockShardValidator is a mock implementation of ShardValidator
type MockShardValidator struct {
mock.Mock
}

var _ corev2.ShardValidator = (*MockShardValidator)(nil)

func NewMockShardValidator() *MockShardValidator {
return &MockShardValidator{}
}

func (v *MockShardValidator) ValidateBatchHeader(ctx context.Context, header *corev2.BatchHeader, blobCerts []*corev2.BlobCertificate) error {
args := v.Called()
return args.Error(0)
}

func (v *MockShardValidator) ValidateBlobs(ctx context.Context, blobs []*corev2.BlobShard, pool common.WorkerPool, state *core.OperatorState) error {
args := v.Called()
return args.Error(0)
}
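
For context, a minimal sketch of how this mock could be exercised with testify's expectation API. The test below is hypothetical and not part of this PR; it only illustrates that node-level tests can force validation outcomes without running real verification.

package v2

import (
	"errors"
	"testing"
)

// Hypothetical usage of MockShardValidator (illustration only, not in this diff).
func TestMockShardValidatorUsage(t *testing.T) {
	val := NewMockShardValidator()

	// Program the mock: header validation succeeds, blob validation fails.
	val.On("ValidateBatchHeader").Return(nil)
	val.On("ValidateBlobs").Return(errors.New("chunk verification failed"))

	if err := val.ValidateBatchHeader(nil, nil, nil); err != nil {
		t.Fatalf("expected nil error, got %v", err)
	}
	if err := val.ValidateBlobs(nil, nil, nil, nil); err == nil {
		t.Fatal("expected ValidateBlobs to return the programmed error")
	}

	// Verify both programmed expectations were actually called.
	val.AssertExpectations(t)
}
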
2 changes: 1 addition & 1 deletion core/v2/core_test.go
@@ -216,7 +216,7 @@ func checkBatchByUniversalVerifier(

for id := range state.IndexedOperators {

val := corev2.NewShardValidator(v, cst, id)
val := corev2.NewShardValidator(v, id)

blobs := packagedBlobs[id]

20 changes: 20 additions & 0 deletions core/v2/serialization.go
@@ -7,6 +7,8 @@ import (
"math/big"

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/wealdtech/go-merkletree/v2"
"github.com/wealdtech/go-merkletree/v2/keccak256"
"golang.org/x/crypto/sha3"
)

@@ -292,6 +294,24 @@ func DeserializeBatchHeader(data []byte) (*BatchHeader, error) {
return &h, nil
}

func BuildMerkleTree(certs []*BlobCertificate) (*merkletree.MerkleTree, error) {
leafs := make([][]byte, len(certs))
for i, cert := range certs {
leaf, err := cert.Hash()
if err != nil {
return nil, fmt.Errorf("failed to compute blob header hash: %w", err)
}
leafs[i] = leaf[:]
}

tree, err := merkletree.NewTree(merkletree.WithData(leafs), merkletree.WithHashType(keccak256.New()))
if err != nil {
return nil, err
}

return tree, nil
}

func encode(obj any) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
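
Since BuildMerkleTree now lives in core/v2, both the dispatcher and the node validator can share it. A minimal sketch (not part of this PR) of how a caller would recompute the batch root from the certificates and compare it to a signed BatchHeader, which is the same check ValidateBatchHeader performs in core/v2/validator.go below:

package v2

import (
	"bytes"
	"fmt"
)

// batchRootMatches is an illustrative helper: rebuild the Merkle tree over the
// blob certificate hashes and compare its root to the header's BatchRoot.
func batchRootMatches(header *BatchHeader, certs []*BlobCertificate) (bool, error) {
	tree, err := BuildMerkleTree(certs)
	if err != nil {
		return false, fmt.Errorf("failed to build merkle tree: %w", err)
	}
	return bytes.Equal(tree.Root(), header.BatchRoot[:]), nil
}
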
47 changes: 37 additions & 10 deletions core/v2/validator.go
@@ -1,6 +1,7 @@
package v2

import (
"bytes"
"context"
"errors"
"fmt"
@@ -15,27 +16,32 @@ var (
ErrBlobQuorumSkip = errors.New("blob skipped for a quorum before verification")
)

type ShardValidator interface {
ValidateBatchHeader(ctx context.Context, header *BatchHeader, blobCerts []*BlobCertificate) error
ValidateBlobs(ctx context.Context, blobs []*BlobShard, pool common.WorkerPool, state *core.OperatorState) error
}

type BlobShard struct {
*BlobCertificate
Bundles core.Bundles
}

// shardValidator implements the validation logic that a DA node should apply to its received data
type ShardValidator struct {
type shardValidator struct {
verifier encoding.Verifier
chainState core.ChainState
operatorID core.OperatorID
}

func NewShardValidator(v encoding.Verifier, cst core.ChainState, operatorID core.OperatorID) *ShardValidator {
return &ShardValidator{
var _ ShardValidator = (*shardValidator)(nil)

func NewShardValidator(v encoding.Verifier, operatorID core.OperatorID) *shardValidator {
return &shardValidator{
verifier: v,
chainState: cst,
operatorID: operatorID,
}
}

func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShard, operatorState *core.OperatorState) ([]*encoding.Frame, *Assignment, error) {
func (v *shardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShard, operatorState *core.OperatorState) ([]*encoding.Frame, *Assignment, error) {

// Check if the operator is a member of the quorum
if _, ok := operatorState.Operators[quorum]; !ok {
@@ -72,7 +78,28 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar
return chunks, &assignment, nil
}

func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard, pool common.WorkerPool, state *core.OperatorState) error {
func (v *shardValidator) ValidateBatchHeader(ctx context.Context, header *BatchHeader, blobCerts []*BlobCertificate) error {
if header == nil {
return fmt.Errorf("batch header is nil")
}

if len(blobCerts) == 0 {
return fmt.Errorf("no blob certificates")
}

tree, err := BuildMerkleTree(blobCerts)
if err != nil {
return fmt.Errorf("failed to build merkle tree: %v", err)
}

if !bytes.Equal(tree.Root(), header.BatchRoot[:]) {
return fmt.Errorf("batch root does not match")
}

return nil
}

func (v *shardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard, pool common.WorkerPool, state *core.OperatorState) error {
var err error
subBatchMap := make(map[encoding.EncodingParams]*encoding.SubBatch)
blobCommitmentList := make([]encoding.BlobCommitments, len(blobs))
@@ -152,7 +179,7 @@ func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard,
for _, blobCommitments := range blobCommitmentList {
blobCommitments := blobCommitments
pool.Submit(func() {
v.VerifyBlobLengthWorker(blobCommitments, out)
v.verifyBlobLengthWorker(blobCommitments, out)
})
}
// check if commitments are equivalent
@@ -171,7 +198,7 @@ func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard,
return nil
}

func (v *ShardValidator) universalVerifyWorker(params encoding.EncodingParams, subBatch *encoding.SubBatch, out chan error) {
func (v *shardValidator) universalVerifyWorker(params encoding.EncodingParams, subBatch *encoding.SubBatch, out chan error) {

err := v.verifier.UniversalVerifySubBatch(params, subBatch.Samples, subBatch.NumBlobs)
if err != nil {
@@ -182,7 +209,7 @@ func (v *ShardValidator) universalVerifyWorker(params encoding.EncodingParams, s
out <- nil
}

func (v *ShardValidator) VerifyBlobLengthWorker(blobCommitments encoding.BlobCommitments, out chan error) {
func (v *shardValidator) verifyBlobLengthWorker(blobCommitments encoding.BlobCommitments, out chan error) {
err := v.verifier.VerifyBlobLength(blobCommitments)
if err != nil {
out <- err
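
The rename from the exported ShardValidator struct to the unexported shardValidator behind the new ShardValidator interface means callers depend on the interface, so MockShardValidator can be substituted in tests. A hypothetical wiring sketch, assuming encoding.Verifier comes from github.com/Layr-Labs/eigenda/encoding (the wrapper type below is illustrative only):

package example

import (
	"github.com/Layr-Labs/eigenda/core"
	corev2 "github.com/Layr-Labs/eigenda/core/v2"
	"github.com/Layr-Labs/eigenda/encoding"
)

// batchValidator depends on the corev2.ShardValidator interface, not the
// concrete type, so a mock can be injected in tests.
type batchValidator struct {
	validator corev2.ShardValidator
}

// newBatchValidator wires in the real validator for production use; note the
// constructor no longer takes a core.ChainState.
func newBatchValidator(v encoding.Verifier, id core.OperatorID) *batchValidator {
	return &batchValidator{validator: corev2.NewShardValidator(v, id)}
}
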
22 changes: 1 addition & 21 deletions disperser/controller/dispatcher.go
@@ -15,8 +15,6 @@ import (
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigensdk-go/logging"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/wealdtech/go-merkletree/v2"
"github.com/wealdtech/go-merkletree/v2/keccak256"
)

var errNoBlobsToDispatch = errors.New("no blobs to dispatch")
@@ -292,7 +290,7 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
ReferenceBlockNumber: referenceBlockNumber,
}

tree, err := BuildMerkleTree(certs)
tree, err := corev2.BuildMerkleTree(certs)
if err != nil {
return nil, fmt.Errorf("failed to build merkle tree: %w", err)
}
@@ -397,21 +395,3 @@ func (d *Dispatcher) updateBatchStatus(ctx context.Context, keys []corev2.BlobKe
}
return nil
}

func BuildMerkleTree(certs []*corev2.BlobCertificate) (*merkletree.MerkleTree, error) {
leafs := make([][]byte, len(certs))
for i, cert := range certs {
leaf, err := cert.Hash()
if err != nil {
return nil, fmt.Errorf("failed to compute blob header hash: %w", err)
}
leafs[i] = leaf[:]
}

tree, err := merkletree.NewTree(merkletree.WithData(leafs), merkletree.WithHashType(keccak256.New()))
if err != nil {
return nil, err
}

return tree, nil
}
4 changes: 2 additions & 2 deletions disperser/controller/dispatcher_test.go
@@ -57,7 +57,7 @@ func TestDispatcherHandleBatch(t *testing.T) {
ctx := context.Background()

// Get batch header hash to mock signatures
merkleTree, err := controller.BuildMerkleTree(objs.blobCerts)
merkleTree, err := corev2.BuildMerkleTree(objs.blobCerts)
require.NoError(t, err)
require.NotNil(t, merkleTree)
require.NotNil(t, merkleTree.Root())
@@ -200,7 +200,7 @@ func TestDispatcherBuildMerkleTree(t *testing.T) {
RelayKeys: []corev2.RelayKey{0, 1, 2},
},
}
merkleTree, err := controller.BuildMerkleTree(certs)
merkleTree, err := corev2.BuildMerkleTree(certs)
require.NoError(t, err)
require.NotNil(t, merkleTree)
require.NotNil(t, merkleTree.Root())
101 changes: 100 additions & 1 deletion node/grpc/server_v2.go
@@ -2,11 +2,16 @@ package grpc

import (
"context"
"encoding/hex"
"fmt"
"runtime"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/kvstore"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/node"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/shirou/gopsutil/mem"
@@ -53,5 +58,99 @@ func (s *ServerV2) NodeInfo(ctx context.Context, in *pb.NodeInfoRequest) (*pb.No
}

func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*pb.StoreChunksReply, error) {
return &pb.StoreChunksReply{}, api.NewErrorUnimplemented()
batch, err := s.validateStoreChunksRequest(in)
if err != nil {
return nil, err
}

batchHeaderHash, err := batch.BatchHeader.Hash()
if err != nil {
return nil, api.NewErrorInternal(fmt.Sprintf("invalid batch header: %v", err))
}

operatorState, err := s.node.ChainState.GetOperatorStateByOperator(ctx, uint(batch.BatchHeader.ReferenceBlockNumber), s.node.Config.ID)
if err != nil {
return nil, err
}

blobShards, rawBundles, err := s.node.DownloadBundles(ctx, batch, operatorState)
if err != nil {
return nil, api.NewErrorInternal(fmt.Sprintf("failed to download batch: %v", err))
}

type storeResult struct {
keys []kvstore.Key
err error
}
storeChan := make(chan storeResult)
go func() {
keys, err := s.node.StoreV2.StoreBatch(batch, rawBundles)
if err != nil {
storeChan <- storeResult{
keys: nil,
err: fmt.Errorf("failed to store batch: %v", err),
}
return
}

storeChan <- storeResult{
keys: keys,
err: nil,
}
}()

err = s.node.ValidateBatchV2(ctx, batch, blobShards, operatorState)
if err != nil {
res := <-storeChan
if len(res.keys) > 0 {
if deleteErr := s.node.StoreV2.DeleteKeys(res.keys); deleteErr != nil {
s.logger.Error("failed to delete keys", "err", deleteErr, "batchHeaderHash", hex.EncodeToString(batchHeaderHash[:]))
}
}
return nil, api.NewErrorInternal(fmt.Sprintf("failed to validate batch: %v", err))
}

res := <-storeChan
if res.err != nil {
return nil, api.NewErrorInternal(fmt.Sprintf("failed to store batch: %v", res.err))
}

sig := s.node.KeyPair.SignMessage(batchHeaderHash).Bytes()
return &pb.StoreChunksReply{
Signature: sig[:],
}, nil
}

// validateStoreChunksRequest validates the StoreChunksRequest and returns the deserialized batch contained in the request
func (s *ServerV2) validateStoreChunksRequest(req *pb.StoreChunksRequest) (*corev2.Batch, error) {
if req.GetBatch() == nil {
return nil, api.NewErrorInvalidArg("missing batch in request")
}

batch, err := corev2.BatchFromProtobuf(req.GetBatch())
if err != nil {
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to deserialize batch: %v", err))
}

return batch, nil
}

func (s *ServerV2) GetChunks(ctx context.Context, in *pb.GetChunksRequest) (*pb.GetChunksReply, error) {
blobKey, err := corev2.BytesToBlobKey(in.GetBlobKey())
if err != nil {
return nil, api.NewErrorInvalidArg(fmt.Sprintf("invalid blob key: %v", err))
}

if corev2.MaxQuorumID < in.GetQuorumId() {
return nil, api.NewErrorInvalidArg("invalid quorum ID")
}
quorumID := core.QuorumID(in.GetQuorumId())
chunks, err := s.node.StoreV2.GetChunks(blobKey, quorumID)
if err != nil {
return nil, api.NewErrorInternal(fmt.Sprintf("failed to get chunks: %v", err))
}

return &pb.GetChunksReply{
Chunks: chunks,
}, nil
}
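
Rounding out the endpoint changes, here is a rough client-side sketch of calling the new GetChunks RPC. The dial target, the insecure credentials, and the generated constructor name pb.NewRetrievalClient are assumptions for illustration and are not taken from this PR; only the request and reply field names mirror the server code above.

package main

import (
	"context"
	"fmt"
	"log"

	pb "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Illustrative address and credentials; a real deployment would use the
	// node's configured retrieval port and TLS settings.
	conn, err := grpc.Dial("localhost:32004", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("failed to dial node: %v", err)
	}
	defer conn.Close()

	// Assumption: GetChunks is served by a Retrieval service in the v2 node
	// proto, so the generated constructor is NewRetrievalClient.
	client := pb.NewRetrievalClient(conn)

	var blobKey [32]byte // placeholder; use the blob key returned by the disperser
	reply, err := client.GetChunks(context.Background(), &pb.GetChunksRequest{
		BlobKey:  blobKey[:],
		QuorumId: 0,
	})
	if err != nil {
		log.Fatalf("GetChunks failed: %v", err)
	}
	fmt.Printf("received %d chunks\n", len(reply.GetChunks()))
}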