diff --git a/go.mod b/go.mod index db0e122a1d37..288892e7479a 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/prometheus/tsdb v0.7.1 github.com/rjeczalik/notify v0.9.1 github.com/rs/cors v1.7.0 - github.com/scroll-tech/da-codec v0.1.3-0.20241218102542-9852fa4e1be5 + github.com/scroll-tech/da-codec v0.1.3-0.20250122041800-4ef7bfc6b634 github.com/scroll-tech/zktrie v0.8.4 github.com/shirou/gopsutil v3.21.11+incompatible github.com/sourcegraph/conc v0.3.0 diff --git a/go.sum b/go.sum index 47f631beb92e..0f05327e769b 100644 --- a/go.sum +++ b/go.sum @@ -398,6 +398,12 @@ github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/scroll-tech/da-codec v0.1.3-0.20241218102542-9852fa4e1be5 h1:vZ75srkZCStjDWq/kqZGLoucf7Y7qXC13nKjQVZ0zp8= github.com/scroll-tech/da-codec v0.1.3-0.20241218102542-9852fa4e1be5/go.mod h1:XfQhUl3msmE6dpZEbR/LIwiMxywPQcUQsch9URgXDzs= +github.com/scroll-tech/da-codec v0.1.3-0.20250121050419-8c2a5ccc1b2e h1:Sp1RjVsK9PLVW5zMlwMUNegsDpYmVN8noT/C4Bjro0U= +github.com/scroll-tech/da-codec v0.1.3-0.20250121050419-8c2a5ccc1b2e/go.mod h1:XfQhUl3msmE6dpZEbR/LIwiMxywPQcUQsch9URgXDzs= +github.com/scroll-tech/da-codec v0.1.3-0.20250122003441-91171709155b h1:DWiVtzXK/3lXK3+/aaAeorurjj88ITO17hhLECIS/0g= +github.com/scroll-tech/da-codec v0.1.3-0.20250122003441-91171709155b/go.mod h1:XfQhUl3msmE6dpZEbR/LIwiMxywPQcUQsch9URgXDzs= +github.com/scroll-tech/da-codec v0.1.3-0.20250122041800-4ef7bfc6b634 h1:YtD7XjP1F7GzL9nxj1lq88m1/bwSroVSGVR050i05yY= +github.com/scroll-tech/da-codec v0.1.3-0.20250122041800-4ef7bfc6b634/go.mod h1:XfQhUl3msmE6dpZEbR/LIwiMxywPQcUQsch9URgXDzs= github.com/scroll-tech/zktrie v0.8.4 h1:UagmnZ4Z3ITCk+aUq9NQZJNAwnWl4gSxsLb2Nl7IgRE= github.com/scroll-tech/zktrie v0.8.4/go.mod h1:XvNo7vAk8yxNyTjBDj5WIiFzYW4bx/gJ78+NK6Zn6Uk= github.com/segmentio/kafka-go v0.1.0/go.mod 
h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= diff --git a/rollup/da_syncer/da/calldata_blob_source.go b/rollup/da_syncer/da/calldata_blob_source.go index bf4a2a24ef2c..2978c8c1bd79 100644 --- a/rollup/da_syncer/da/calldata_blob_source.go +++ b/rollup/da_syncer/da/calldata_blob_source.go @@ -8,6 +8,7 @@ import ( "github.com/scroll-tech/da-codec/encoding" "github.com/scroll-tech/go-ethereum/accounts/abi" + "github.com/scroll-tech/go-ethereum/common" "github.com/scroll-tech/go-ethereum/ethdb" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors" @@ -95,6 +96,28 @@ func (ds *CalldataBlobSource) L1Finalized() uint64 { func (ds *CalldataBlobSource) processRollupEventsToDA(rollupEvents l1.RollupEvents) (Entries, error) { var entries Entries + var emptyHash common.Hash + // we keep track of the last commit transaction hash, so we can process all events created in the same tx together. + // if we have a different commit transaction, we need to create a new commit batch DA. + var lastCommitTransactionHash common.Hash + // we keep track of the commit events created in the same tx, so we can process them together. + var lastCommitEvents []*l1.CommitBatchEvent + + // getAndAppendCommitBatchDA is a helper function that gets the commit batch DA for the last commit events and appends it to the entries list. + // It also resets the last commit events and last commit transaction hash. + getAndAppendCommitBatchDA := func() error { + commitBatchDAEntries, err := ds.getCommitBatchDA(lastCommitEvents) + if err != nil { + return fmt.Errorf("failed to get commit batch da: %v, err: %w", lastCommitEvents[0].BatchIndex().Uint64(), err) + } + + entries = append(entries, commitBatchDAEntries...) 
+ lastCommitEvents = nil + lastCommitTransactionHash = emptyHash + + return nil + } + var entry Entry var err error for _, rollupEvent := range rollupEvents { @@ -105,11 +128,25 @@ func (ds *CalldataBlobSource) processRollupEventsToDA(rollupEvents l1.RollupEven if !ok { return nil, fmt.Errorf("unexpected type of rollup event: %T", rollupEvent) } - if entry, err = ds.getCommitBatchDA(commitEvent); err != nil { - return nil, fmt.Errorf("failed to get commit batch da: %v, err: %w", rollupEvent.BatchIndex().Uint64(), err) + + // if this is a different commit transaction, we need to create a new DA + if lastCommitTransactionHash != commitEvent.TxHash() && len(lastCommitEvents) > 0 { + if err = getAndAppendCommitBatchDA(); err != nil { + return nil, fmt.Errorf("failed to get and append commit batch DA: %w", err) + } } + // add commit event to the list of previous commit events, so we can process events created in the same tx together + lastCommitTransactionHash = commitEvent.TxHash() + lastCommitEvents = append(lastCommitEvents, commitEvent) case l1.RevertEventType: + // if we have any previous commit events, we need to create a new DA before processing the revert event + if len(lastCommitEvents) > 0 { + if err = getAndAppendCommitBatchDA(); err != nil { + return nil, fmt.Errorf("failed to get and append commit batch DA: %w", err) + } + } + revertEvent, ok := rollupEvent.(*l1.RevertBatchEvent) // this should never happen because we just check event type if !ok { @@ -117,8 +154,15 @@ func (ds *CalldataBlobSource) processRollupEventsToDA(rollupEvents l1.RollupEven } entry = NewRevertBatch(revertEvent) - + entries = append(entries, entry) case l1.FinalizeEventType: + // if we have any previous commit events, we need to create a new DA before processing the finalized event + if len(lastCommitEvents) > 0 { + if err = getAndAppendCommitBatchDA(); err != nil { + return nil, fmt.Errorf("failed to get and append commit batch DA: %w", err) + } + } + finalizeEvent, ok := 
rollupEvent.(*l1.FinalizeBatchEvent) // this should never happen because we just check event type if !ok { @@ -126,37 +170,97 @@ func (ds *CalldataBlobSource) processRollupEventsToDA(rollupEvents l1.RollupEven } entry = NewFinalizeBatch(finalizeEvent) - + entries = append(entries, entry) default: return nil, fmt.Errorf("unknown rollup event, type: %v", rollupEvent.Type()) } + } - entries = append(entries, entry) + // if we have any previous commit events, we need to process them before returning + if len(lastCommitEvents) > 0 { + if err = getAndAppendCommitBatchDA(); err != nil { + return nil, fmt.Errorf("failed to get and append commit batch DA: %w", err) + } } + return entries, nil } -func (ds *CalldataBlobSource) getCommitBatchDA(commitEvent *l1.CommitBatchEvent) (Entry, error) { - if commitEvent.BatchIndex().Uint64() == 0 { - return NewCommitBatchDAV0Empty(), nil +func (ds *CalldataBlobSource) getCommitBatchDA(commitEvents []*l1.CommitBatchEvent) (Entries, error) { + if len(commitEvents) == 0 { + return nil, fmt.Errorf("commit events are empty") + } + + if commitEvents[0].BatchIndex().Uint64() == 0 { + return Entries{NewCommitBatchDAV0Empty()}, nil } - args, err := ds.l1Reader.FetchCommitTxData(commitEvent) + firstCommitEvent := commitEvents[0] + args, err := ds.l1Reader.FetchCommitTxData(firstCommitEvent) if err != nil { - return nil, fmt.Errorf("failed to fetch commit tx data of batch %d, tx hash: %v, err: %w", commitEvent.BatchIndex().Uint64(), commitEvent.TxHash().Hex(), err) + return nil, fmt.Errorf("failed to fetch commit tx data of batch %d, tx hash: %v, err: %w", firstCommitEvent.BatchIndex().Uint64(), firstCommitEvent.TxHash().Hex(), err) + } + + blockHeader, err := ds.l1Reader.FetchBlockHeaderByNumber(firstCommitEvent.BlockNumber()) + if err != nil { + return nil, fmt.Errorf("failed to get header by number, err: %w", err) } codec, err := encoding.CodecFromVersion(encoding.CodecVersion(args.Version)) if err != nil { - return nil, 
fmt.Errorf("unsupported codec version: %v, batch index: %v, err: %w", args.Version, commitEvent.BatchIndex().Uint64(), err) + return nil, fmt.Errorf("unsupported codec version: %v, batch index: %v, err: %w", args.Version, firstCommitEvent.BatchIndex().Uint64(), err) } - switch codec.Version() { - case 0: - return NewCommitBatchDAV0(ds.db, codec, commitEvent, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap) - case 1, 2, 3, 4: - return NewCommitBatchDAWithBlob(ds.ctx, ds.db, ds.l1Reader, ds.blobClient, codec, commitEvent, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap) - default: - return nil, fmt.Errorf("failed to decode DA, codec version is unknown: codec version: %d", args.Version) + var entries Entries + var entry Entry + var previousEvent *l1.CommitBatchEvent + for i, commitEvent := range commitEvents { + // sanity check commit events from batches submitted in the same L1 transaction + if commitEvent.TxHash() != firstCommitEvent.TxHash() { + return nil, fmt.Errorf("commit events have different tx hashes, batch index: %d, tx: %s - batch index: %d, tx: %s", firstCommitEvent.BatchIndex().Uint64(), firstCommitEvent.TxHash().Hex(), commitEvent.BatchIndex().Uint64(), commitEvent.TxHash().Hex()) + } + if commitEvent.BlockNumber() != firstCommitEvent.BlockNumber() { + return nil, fmt.Errorf("commit events have different block numbers, batch index: %d, block number: %d - batch index: %d, block number: %d", firstCommitEvent.BatchIndex().Uint64(), firstCommitEvent.BlockNumber(), commitEvent.BatchIndex().Uint64(), commitEvent.BlockNumber()) + } + if commitEvent.BlockHash() != firstCommitEvent.BlockHash() { + return nil, fmt.Errorf("commit events have different block hashes, batch index: %d, hash: %s - batch index: %d, hash: %s", firstCommitEvent.BatchIndex().Uint64(), firstCommitEvent.BlockHash().Hex(), commitEvent.BatchIndex().Uint64(), commitEvent.BlockHash().Hex()) + } + if previousEvent != nil && commitEvent.BatchIndex().Uint64() != 
previousEvent.BatchIndex().Uint64()+1 { + return nil, fmt.Errorf("commit events are not in sequence, batch index: %d, hash: %s - previous batch index: %d, hash: %s", commitEvent.BatchIndex().Uint64(), commitEvent.BatchHash().Hex(), previousEvent.BatchIndex().Uint64(), previousEvent.BatchHash().Hex()) + } + + switch codec.Version() { + case 0: + if entry, err = NewCommitBatchDAV0(ds.db, codec, commitEvent, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap); err != nil { + return nil, fmt.Errorf("failed to decode DA, batch index: %d, err: %w", commitEvent.BatchIndex().Uint64(), err) + } + case 1, 2, 3, 4, 5, 6: + if entry, err = NewCommitBatchDAV1(ds.ctx, ds.db, ds.blobClient, codec, commitEvent, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap, args.BlobHashes, blockHeader.Time); err != nil { + return nil, fmt.Errorf("failed to decode DA, batch index: %d, err: %w", commitEvent.BatchIndex().Uint64(), err) + } + case 7: + if i >= len(args.BlobHashes) { + return nil, fmt.Errorf("not enough blob hashes for commit transaction: %s, index in tx: %d, batch index: %d, hash: %s", firstCommitEvent.TxHash(), i, commitEvent.BatchIndex().Uint64(), commitEvent.BatchHash().Hex()) + } + blobHash := args.BlobHashes[i] + + var parentBatchHash common.Hash + if previousEvent == nil { + parentBatchHash = common.BytesToHash(args.ParentBatchHeader) + } else { + parentBatchHash = previousEvent.BatchHash() + } + + if entry, err = NewCommitBatchDAV7(ds.ctx, ds.db, ds.blobClient, codec, commitEvent, blobHash, parentBatchHash, blockHeader.Time); err != nil { + return nil, fmt.Errorf("failed to decode DA, batch index: %d, err: %w", commitEvent.BatchIndex().Uint64(), err) + } + default: + return nil, fmt.Errorf("failed to decode DA, codec version is unknown: codec version: %d", args.Version) + } + + previousEvent = commitEvent + entries = append(entries, entry) } + + return entries, nil } diff --git a/rollup/da_syncer/da/commitV1.go b/rollup/da_syncer/da/commitV1.go 
index 6fdcf45b6d14..c0ef1292055d 100644 --- a/rollup/da_syncer/da/commitV1.go +++ b/rollup/da_syncer/da/commitV1.go @@ -21,36 +21,28 @@ type CommitBatchDAV1 struct { versionedHashes []common.Hash } -func NewCommitBatchDAWithBlob(ctx context.Context, db ethdb.Database, - l1Reader *l1.Reader, +func NewCommitBatchDAV1(ctx context.Context, db ethdb.Database, blobClient blob_client.BlobClient, codec encoding.Codec, commitEvent *l1.CommitBatchEvent, parentBatchHeader []byte, chunks [][]byte, skippedL1MessageBitmap []byte, + versionedHashes []common.Hash, + l1BlockTime uint64, ) (*CommitBatchDAV1, error) { decodedChunks, err := codec.DecodeDAChunksRawTx(chunks) if err != nil { return nil, fmt.Errorf("failed to unpack chunks: %v, err: %w", commitEvent.BatchIndex().Uint64(), err) } - versionedHashes, err := l1Reader.FetchTxBlobHashes(commitEvent.TxHash(), commitEvent.BlockHash()) - if err != nil { - return nil, fmt.Errorf("failed to fetch blob hash, err: %w", err) - } - // with CommitBatchDAV1 we expect only one versioned hash as we commit only one blob per batch submission if len(versionedHashes) != 1 { return nil, fmt.Errorf("unexpected number of versioned hashes: %d", len(versionedHashes)) } versionedHash := versionedHashes[0] - header, err := l1Reader.FetchBlockHeaderByNumber(commitEvent.BlockNumber()) - if err != nil { - return nil, fmt.Errorf("failed to get header by number, err: %w", err) - } - blob, err := blobClient.GetBlobByVersionedHashAndBlockTime(ctx, versionedHash, header.Time) + blob, err := blobClient.GetBlobByVersionedHashAndBlockTime(ctx, versionedHash, l1BlockTime) if err != nil { return nil, fmt.Errorf("failed to fetch blob from blob client, err: %w", err) } diff --git a/rollup/da_syncer/da/commitV7.go b/rollup/da_syncer/da/commitV7.go new file mode 100644 index 000000000000..01aa5249e749 --- /dev/null +++ b/rollup/da_syncer/da/commitV7.go @@ -0,0 +1,189 @@ +package da + +import ( + "context" + "crypto/sha256" + "fmt" + + 
"github.com/scroll-tech/da-codec/encoding" + + "github.com/scroll-tech/go-ethereum/core/rawdb" + "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors" + "github.com/scroll-tech/go-ethereum/rollup/l1" + + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/crypto/kzg4844" + "github.com/scroll-tech/go-ethereum/ethdb" +) + +type CommitBatchDAV7 struct { + version encoding.CodecVersion + batchIndex uint64 + initialL1MessageIndex uint64 + blocks []encoding.DABlock + transactions []types.Transactions + l1Txs []types.Transactions + versionedHashes []common.Hash + + event *l1.CommitBatchEvent +} + +func NewCommitBatchDAV7(ctx context.Context, db ethdb.Database, + blobClient blob_client.BlobClient, + codec encoding.Codec, + commitEvent *l1.CommitBatchEvent, + blobHash common.Hash, + parentBatchHash common.Hash, + l1BlockTime uint64, +) (*CommitBatchDAV7, error) { + calculatedBatch, err := codec.NewDABatchFromParams(commitEvent.BatchIndex().Uint64(), blobHash, parentBatchHash) + if err != nil { + return nil, fmt.Errorf("failed to create new DA batch from params, batch index: %d, err: %w", commitEvent.BatchIndex().Uint64(), err) + } + + if calculatedBatch.Hash() != commitEvent.BatchHash() { + return nil, fmt.Errorf("calculated batch hash is not equal to the one from commit event: %s, calculated hash: %s", commitEvent.BatchHash().Hex(), calculatedBatch.Hash().Hex()) + } + + blob, err := blobClient.GetBlobByVersionedHashAndBlockTime(ctx, blobHash, l1BlockTime) + if err != nil { + return nil, fmt.Errorf("failed to fetch blob from blob client, err: %w", err) + } + if blob == nil { + return nil, fmt.Errorf("unexpected, blob == nil and err == nil, batch index: %d, versionedHash: %s, blobClient: %T", commitEvent.BatchIndex().Uint64(), blobHash.Hex(), blobClient) + } + + // compute blob versioned hash and compare with one from tx 
+ c, err := kzg4844.BlobToCommitment(blob) + if err != nil { + return nil, fmt.Errorf("failed to create blob commitment: %w", err) + } + blobVersionedHash := common.Hash(kzg4844.CalcBlobHashV1(sha256.New(), &c)) + if blobVersionedHash != blobHash { + return nil, fmt.Errorf("blobVersionedHash from blob source is not equal to versionedHash from tx, correct versioned hash: %s, fetched blob hash: %s", blobHash.Hex(), blobVersionedHash.String()) + } + + blobPayload, err := codec.DecodeBlob(blob) + if err != nil { + return nil, fmt.Errorf("failed to decode blob: %w", err) + } + + l1Txs, err := getL1MessagesV7(db, blobPayload.Blocks(), blobPayload.InitialL1MessageIndex()) + if err != nil { + return nil, fmt.Errorf("failed to get L1 messages for v7 batch %d: %w", commitEvent.BatchIndex().Uint64(), err) + } + + return &CommitBatchDAV7{ + version: codec.Version(), + batchIndex: commitEvent.BatchIndex().Uint64(), + initialL1MessageIndex: blobPayload.InitialL1MessageIndex(), + blocks: blobPayload.Blocks(), + transactions: blobPayload.Transactions(), + l1Txs: l1Txs, + versionedHashes: []common.Hash{blobVersionedHash}, + event: commitEvent, + }, nil +} + +func (c *CommitBatchDAV7) Type() Type { + return CommitBatchWithBlobType +} + +func (c *CommitBatchDAV7) BlobVersionedHashes() []common.Hash { + return c.versionedHashes +} + +func (c *CommitBatchDAV7) BatchIndex() uint64 { + return c.batchIndex +} + +func (c *CommitBatchDAV7) L1BlockNumber() uint64 { + return c.event.BlockNumber() +} + +func (c *CommitBatchDAV7) CompareTo(other Entry) int { + if c.BatchIndex() < other.BatchIndex() { + return -1 + } else if c.BatchIndex() > other.BatchIndex() { + return 1 + } + return 0 +} + +func (c *CommitBatchDAV7) Event() l1.RollupEvent { + return c.event +} + +func (c *CommitBatchDAV7) Blocks() []*PartialBlock { + var blocks []*PartialBlock + + for i, daBlock := range c.blocks { + // create txs + txs := make(types.Transactions, 0, daBlock.NumTransactions()) + + // insert L1 messages + txs 
= append(txs, c.l1Txs[i]...) + + // insert L2 txs + txs = append(txs, c.transactions[i]...) + + block := NewPartialBlock( + &PartialHeader{ + Number: daBlock.Number(), + Time: daBlock.Timestamp(), + BaseFee: daBlock.BaseFee(), + GasLimit: daBlock.GasLimit(), + Difficulty: 1, // difficulty is enforced to be 1 + ExtraData: []byte{}, // extra data is enforced to be empty or at least excluded from the block hash + }, + txs) + blocks = append(blocks, block) + } + + return blocks +} + +func (c *CommitBatchDAV7) Version() encoding.CodecVersion { + return c.version +} + +func (c *CommitBatchDAV7) Chunks() []*encoding.DAChunkRawTx { + return []*encoding.DAChunkRawTx{ + { + Blocks: c.blocks, + Transactions: c.transactions, + }, + } +} + +func getL1MessagesV7(db ethdb.Database, blocks []encoding.DABlock, initialL1MessageIndex uint64) ([]types.Transactions, error) { + allTxs := make([]types.Transactions, 0, len(blocks)) + + messageIndex := initialL1MessageIndex + totalL1Messages := 0 + for _, block := range blocks { + var txsPerBlock types.Transactions + for i := messageIndex; i < messageIndex+uint64(block.NumL1Messages()); i++ { + l1Tx := rawdb.ReadL1Message(db, i) + if l1Tx == nil { + // message not yet available + // we return serrors.EOFError as this will be handled in the syncing pipeline with a backoff and retry + return nil, serrors.EOFError + } + + txsPerBlock = append(txsPerBlock, types.NewTx(l1Tx)) + } + + totalL1Messages += int(block.NumL1Messages()) + messageIndex += uint64(block.NumL1Messages()) + allTxs = append(allTxs, txsPerBlock) + } + + if messageIndex != initialL1MessageIndex+uint64(totalL1Messages) { + return nil, fmt.Errorf("unexpected message index: %d, expected: %d", messageIndex, initialL1MessageIndex+uint64(totalL1Messages)) + } + + return allTxs, nil +} diff --git a/rollup/l1/abi.go b/rollup/l1/abi.go index dcf09f25fd13..de232045365d 100644 --- a/rollup/l1/abi.go +++ b/rollup/l1/abi.go @@ -234,6 +234,7 @@ type CommitBatchArgs struct { 
ParentBatchHeader []byte Chunks [][]byte SkippedL1MessageBitmap []byte + BlobHashes []common.Hash } func newCommitBatchArgs(method *abi.Method, values []interface{}) (*CommitBatchArgs, error) { diff --git a/rollup/l1/reader.go b/rollup/l1/reader.go index 2902b48caefa..82905f9511ec 100644 --- a/rollup/l1/reader.go +++ b/rollup/l1/reader.go @@ -379,5 +379,7 @@ func (r *Reader) FetchCommitTxData(commitEvent *CommitBatchEvent) (*CommitBatchA return nil, fmt.Errorf("unknown method name for commit transaction: %s", method.Name) } + args.BlobHashes = tx.BlobHashes() + return args, nil }