Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync rollup_sync_service fixes #860

Merged
merged 3 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions core/rawdb/accessors_rollup_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type FinalizedBatchMeta struct {
func WriteRollupEventSyncedL1BlockNumber(db ethdb.KeyValueWriter, l1BlockNumber uint64) {
value := big.NewInt(0).SetUint64(l1BlockNumber).Bytes()
if err := db.Put(rollupEventSyncedL1BlockNumberKey, value); err != nil {
log.Crit("failed to store rollup event synced L1 block number for rollup event", "err", err)
log.Crit("failed to store rollup event synced L1 block number for rollup event", "L1 block number", l1BlockNumber, "value", value, "err", err)
}
}

Expand All @@ -44,7 +44,7 @@ func ReadRollupEventSyncedL1BlockNumber(db ethdb.Reader) *uint64 {

number := new(big.Int).SetBytes(data)
if !number.IsUint64() {
log.Crit("unexpected rollup event synced L1 block number in database", "number", number)
log.Crit("unexpected rollup event synced L1 block number in database", "data", data, "number", number)
}

rollupEventSyncedL1BlockNumber := number.Uint64()
Expand All @@ -54,12 +54,12 @@ func ReadRollupEventSyncedL1BlockNumber(db ethdb.Reader) *uint64 {
// WriteBatchChunkRanges writes the block ranges for each chunk within a batch to the database.
// It serializes the chunk ranges using RLP and stores them under a key derived from the batch index.
func WriteBatchChunkRanges(db ethdb.KeyValueWriter, batchIndex uint64, chunkBlockRanges []*ChunkBlockRange) {
bytes, err := rlp.EncodeToBytes(chunkBlockRanges)
value, err := rlp.EncodeToBytes(chunkBlockRanges)
if err != nil {
log.Crit("failed to RLP encode batch chunk ranges", "batch index", batchIndex, "err", err)
}
if err := db.Put(batchChunkRangesKey(batchIndex), bytes); err != nil {
log.Crit("failed to store batch chunk ranges", "batch index", batchIndex, "err", err)
if err := db.Put(batchChunkRangesKey(batchIndex), value); err != nil {
log.Crit("failed to store batch chunk ranges", "batch index", batchIndex, "value", value, "err", err)
}
}

Expand Down Expand Up @@ -92,12 +92,12 @@ func ReadBatchChunkRanges(db ethdb.Reader, batchIndex uint64) []*ChunkBlockRange
// WriteFinalizedBatchMeta stores the metadata of a finalized batch in the database.
func WriteFinalizedBatchMeta(db ethdb.KeyValueWriter, batchIndex uint64, finalizedBatchMeta *FinalizedBatchMeta) {
var err error
bytes, err := rlp.EncodeToBytes(finalizedBatchMeta)
value, err := rlp.EncodeToBytes(finalizedBatchMeta)
if err != nil {
log.Crit("failed to RLP encode batch metadata", "batch index", batchIndex, "err", err)
log.Crit("failed to RLP encode batch metadata", "batch index", batchIndex, "finalized batch meta", finalizedBatchMeta, "err", err)
}
if err := db.Put(batchMetaKey(batchIndex), bytes); err != nil {
log.Crit("failed to store batch metadata", "batch index", batchIndex, "err", err)
if err := db.Put(batchMetaKey(batchIndex), value); err != nil {
log.Crit("failed to store batch metadata", "batch index", batchIndex, "value", value, "err", err)
}
}

Expand All @@ -108,7 +108,7 @@ func ReadFinalizedBatchMeta(db ethdb.Reader, batchIndex uint64) *FinalizedBatchM
return nil
}
if err != nil {
log.Crit("failed to read finalized batch metadata from database", "err", err)
log.Crit("failed to read finalized batch metadata from database", "batch index", batchIndex, "err", err)
}

fbm := new(FinalizedBatchMeta)
Expand All @@ -122,7 +122,7 @@ func ReadFinalizedBatchMeta(db ethdb.Reader, batchIndex uint64) *FinalizedBatchM
func WriteFinalizedL2BlockNumber(db ethdb.KeyValueWriter, l2BlockNumber uint64) {
value := big.NewInt(0).SetUint64(l2BlockNumber).Bytes()
if err := db.Put(finalizedL2BlockNumberKey, value); err != nil {
log.Crit("failed to store finalized L2 block number for rollup event", "err", err)
log.Crit("failed to store finalized L2 block number for rollup event", "L2 block number", l2BlockNumber, "value", value, "err", err)
}
}

Expand All @@ -133,12 +133,12 @@ func ReadFinalizedL2BlockNumber(db ethdb.Reader) *uint64 {
return nil
}
if err != nil {
log.Crit("failed to read finalized L2 block number from database", "err", err)
log.Crit("failed to read finalized L2 block number from database", "key", finalizedL2BlockNumberKey, "err", err)
}

number := new(big.Int).SetBytes(data)
if !number.IsUint64() {
log.Crit("unexpected finalized L2 block number in database", "number", number)
log.Crit("unexpected finalized L2 block number in database", "data", data, "number", number)
}

finalizedL2BlockNumber := number.Uint64()
Expand Down
6 changes: 6 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func (b *EthAPIBackend) HeaderByNumber(ctx context.Context, number rpc.BlockNumb
// return block, nil
// }
if number == rpc.FinalizedBlockNumber {
if !b.eth.config.EnableRollupVerify {
return nil, errors.New("sync L1 finalized batch feature not enabled, cannot query L2 finalized block height")
}
finalizedBlockHeightPtr := rawdb.ReadFinalizedL2BlockNumber(b.eth.ChainDb())
if finalizedBlockHeightPtr == nil {
return nil, errors.New("L2 finalized block height not found in database")
Expand Down Expand Up @@ -152,6 +155,9 @@ func (b *EthAPIBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumbe
// return b.eth.blockchain.GetBlock(header.Hash(), header.Number.Uint64()), nil
// }
if number == rpc.FinalizedBlockNumber {
if !b.eth.config.EnableRollupVerify {
return nil, errors.New("sync L1 finalized batch feature not enabled, cannot query L2 finalized block height")
}
finalizedBlockHeightPtr := rawdb.ReadFinalizedL2BlockNumber(b.eth.ChainDb())
if finalizedBlockHeightPtr == nil {
return nil, errors.New("L2 finalized block height not found in database")
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client sync_service.EthCl
eth.syncService.Start()
if config.EnableRollupVerify {
// initialize and start rollup event sync service
eth.rollupSyncService, err = rollup_sync_service.NewRollupSyncService(context.Background(), chainConfig, eth.chainDb, l1Client, eth.blockchain, stack.Config().L1DeploymentBlock)
eth.rollupSyncService, err = rollup_sync_service.NewRollupSyncService(context.Background(), chainConfig, eth.chainDb, l1Client, eth.blockchain, stack)
if err != nil {
return nil, fmt.Errorf("cannot initialize rollup event sync service: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion rollup/rollup_sync_service/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func DecodeChunkBlockRanges(chunks [][]byte) ([]*rawdb.ChunkBlockRange, error) {

numBlocks := int(chunk[0])
if len(chunk) < 1+numBlocks*blockContextByteSize {
return nil, fmt.Errorf("chunk size doesn't match with numBlocks")
return nil, fmt.Errorf("chunk size doesn't match with numBlocks, byte length of chunk: %v, expected length: %v", len(chunk), 1+numBlocks*blockContextByteSize)
}

blockContexts := make([]*BlockContext, numBlocks)
Expand Down
4 changes: 4 additions & 0 deletions rollup/rollup_sync_service/l1client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,7 @@ func (m *mockEthClient) TransactionByHash(ctx context.Context, txHash common.Has
}
return &tx, false, nil
}

func (m *mockEthClient) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) {
return nil, nil
}
73 changes: 47 additions & 26 deletions rollup/rollup_sync_service/rollup_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"os"
"reflect"
"syscall"
"time"

"github.com/ethereum/go-ethereum/accounts/abi"
Expand All @@ -16,8 +15,8 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"

"github.com/ethereum/go-ethereum/rollup/rcfg"
"github.com/ethereum/go-ethereum/rollup/sync_service"
"github.com/ethereum/go-ethereum/rollup/withdrawtrie"
Expand Down Expand Up @@ -54,9 +53,10 @@ type RollupSyncService struct {
l1RevertBatchEventSignature common.Hash
l1FinalizeBatchEventSignature common.Hash
bc *core.BlockChain
stack *node.Node
}

func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig, db ethdb.Database, l1Client sync_service.EthClient, bc *core.BlockChain, l1DeploymentBlock uint64) (*RollupSyncService, error) {
func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig, db ethdb.Database, l1Client sync_service.EthClient, bc *core.BlockChain, stack *node.Node) (*RollupSyncService, error) {
// terminate if the caller does not provide an L1 client (e.g. in tests)
if l1Client == nil || (reflect.ValueOf(l1Client).Kind() == reflect.Ptr && reflect.ValueOf(l1Client).IsNil()) {
log.Warn("No L1 client provided, L1 rollup sync service will not run")
Expand All @@ -80,8 +80,8 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig
// Initialize the latestProcessedBlock with the block just before the L1 deployment block.
// This serves as a default value when there's no L1 rollup events synced in the database.
var latestProcessedBlock uint64
if l1DeploymentBlock > 0 {
latestProcessedBlock = l1DeploymentBlock - 1
if stack.Config().L1DeploymentBlock > 0 {
latestProcessedBlock = stack.Config().L1DeploymentBlock - 1
}

block := rawdb.ReadRollupEventSyncedL1BlockNumber(db)
Expand All @@ -103,6 +103,7 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig
l1RevertBatchEventSignature: scrollChainABI.Events["RevertBatch"].ID,
l1FinalizeBatchEventSignature: scrollChainABI.Events["FinalizeBatch"].ID,
bc: bc,
stack: stack,
}

return &service, nil
Expand Down Expand Up @@ -196,7 +197,7 @@ func (s *RollupSyncService) parseAndUpdateRollupEventLogs(logs []types.Log, endB

chunkBlockRanges, err := s.getChunkRanges(batchIndex, &vLog)
if err != nil {
return fmt.Errorf("failed to get chunk ranges, err: %w", err)
return fmt.Errorf("failed to get chunk ranges, batch index: %v, err: %w", batchIndex, err)
}
rawdb.WriteBatchChunkRanges(s.db, batchIndex, chunkBlockRanges)

Expand All @@ -223,7 +224,7 @@ func (s *RollupSyncService) parseAndUpdateRollupEventLogs(logs []types.Log, endB
return fmt.Errorf("failed to get local node info, batch index: %v, err: %w", batchIndex, err)
}

endBlock, finalizedBatchMeta, err := validateBatch(event, parentBatchMeta, chunks)
endBlock, finalizedBatchMeta, err := validateBatch(event, parentBatchMeta, chunks, s.stack)
if err != nil {
return fmt.Errorf("fatal: validateBatch failed: finalize event: %v, err: %w", event, err)
}
Expand Down Expand Up @@ -311,9 +312,26 @@ func (s *RollupSyncService) getChunkRanges(batchIndex uint64, vLog *types.Log) (
return []*rawdb.ChunkBlockRange{{StartBlockNumber: 0, EndBlockNumber: 0}}, nil
}

tx, _, err := s.client.client.TransactionByHash(context.Background(), vLog.TxHash)
tx, _, err := s.client.client.TransactionByHash(s.ctx, vLog.TxHash)
if err != nil {
return nil, fmt.Errorf("failed to get transaction, err: %w", err)
log.Debug("failed to get transaction by hash, probably an unindexed transaction, fetching the whole block to get the transaction",
"tx hash", vLog.TxHash.Hex(), "block number", vLog.BlockNumber, "block hash", vLog.BlockHash.Hex(), "err", err)
block, err := s.client.client.BlockByHash(s.ctx, vLog.BlockHash)
if err != nil {
return nil, fmt.Errorf("failed to get block by hash, block number: %v, block hash: %v, err: %w", vLog.BlockNumber, vLog.BlockHash.Hex(), err)
}

found := false
for _, txInBlock := range block.Transactions() {
if txInBlock.Hash() == vLog.TxHash {
tx = txInBlock
found = true
break
}
}
if !found {
return nil, fmt.Errorf("transaction not found in the block, tx hash: %v, block number: %v, block hash: %v", vLog.TxHash.Hex(), vLog.BlockNumber, vLog.BlockHash.Hex())
}
}

return s.decodeChunkBlockRanges(tx.Data())
Expand All @@ -323,17 +341,17 @@ func (s *RollupSyncService) getChunkRanges(batchIndex uint64, vLog *types.Log) (
func (s *RollupSyncService) decodeChunkBlockRanges(txData []byte) ([]*rawdb.ChunkBlockRange, error) {
const methodIDLength = 4
if len(txData) < methodIDLength {
return nil, fmt.Errorf("transaction data is too short")
return nil, fmt.Errorf("transaction data is too short, length of tx data: %v, minimum length required: %v", len(txData), methodIDLength)
}

method, err := s.scrollChainABI.MethodById(txData[:methodIDLength])
if err != nil {
return nil, fmt.Errorf("failed to get method by ID, ID: %v, err: %w", txData[:4], err)
return nil, fmt.Errorf("failed to get method by ID, ID: %v, err: %w", txData[:methodIDLength], err)
}

values, err := method.Inputs.Unpack(txData[methodIDLength:])
if err != nil {
return nil, fmt.Errorf("failed to unpack transaction data using ABI: %v", err)
return nil, fmt.Errorf("failed to unpack transaction data using ABI, tx data: %v, err: %w", txData, err)
}

type commitBatchArgs struct {
Expand All @@ -345,11 +363,11 @@ func (s *RollupSyncService) decodeChunkBlockRanges(txData []byte) ([]*rawdb.Chun
var args commitBatchArgs
err = method.Inputs.Copy(&args, values)
if err != nil {
return nil, fmt.Errorf("failed to decode calldata into commitBatch args, err: %w", err)
return nil, fmt.Errorf("failed to decode calldata into commitBatch args, values: %+v, err: %w", values, err)
}

if args.Version != batchHeaderVersion {
return nil, fmt.Errorf("unexpected batch version, expected: %d, got: %v", batchHeaderVersion, args.Version)
return nil, fmt.Errorf("unexpected batch version, expected: %v, got: %v", batchHeaderVersion, args.Version)
}

return DecodeChunkBlockRanges(args.Chunks)
Expand All @@ -358,52 +376,55 @@ func (s *RollupSyncService) decodeChunkBlockRanges(txData []byte) ([]*rawdb.Chun
// validateBatch verifies the consistency between the L1 contract and L2 node data.
// The function will terminate the node and exit if any consistency check fails.
// It returns the number of the end block, a finalized batch meta data, and an error if any.
func validateBatch(event *L1FinalizeBatchEvent, parentBatchMeta *rawdb.FinalizedBatchMeta, chunks []*Chunk) (uint64, *rawdb.FinalizedBatchMeta, error) {
func validateBatch(event *L1FinalizeBatchEvent, parentBatchMeta *rawdb.FinalizedBatchMeta, chunks []*Chunk, stack *node.Node) (uint64, *rawdb.FinalizedBatchMeta, error) {
if len(chunks) == 0 {
return 0, nil, fmt.Errorf("invalid argument: length of chunks is 0")
return 0, nil, fmt.Errorf("invalid argument: length of chunks is 0, batch index: %v", event.BatchIndex.Uint64())
}

startChunk := chunks[0]
if len(startChunk.Blocks) == 0 {
return 0, nil, fmt.Errorf("invalid argument: block count of start chunk is 0")
return 0, nil, fmt.Errorf("invalid argument: block count of start chunk is 0, batch index: %v", event.BatchIndex.Uint64())
}
startBlock := startChunk.Blocks[0]

endChunk := chunks[len(chunks)-1]
if len(endChunk.Blocks) == 0 {
return 0, nil, fmt.Errorf("invalid argument: block count of end chunk is 0")
return 0, nil, fmt.Errorf("invalid argument: block count of end chunk is 0, batch index: %v", event.BatchIndex.Uint64())
}
endBlock := endChunk.Blocks[len(endChunk.Blocks)-1]

localStateRoot := endBlock.Header.Root
if localStateRoot != event.StateRoot {
log.Error("State root mismatch", "batch index", event.BatchIndex.Uint64(), "start block", startBlock.Header.Number.Uint64(), "end block", endBlock.Header.Number.Uint64(), "parent batch hash", parentBatchMeta.BatchHash.Hex(), "l1 finalized state root", event.StateRoot.Hex(), "l2 state root", localStateRoot.Hex())
syscall.Kill(os.Getpid(), syscall.SIGTERM)
return 0, nil, fmt.Errorf("state root mismatch")
stack.Close()
os.Exit(1)
}

localWithdrawRoot := endBlock.WithdrawRoot
if localWithdrawRoot != event.WithdrawRoot {
log.Error("Withdraw root mismatch", "batch index", event.BatchIndex.Uint64(), "start block", startBlock.Header.Number.Uint64(), "end block", endBlock.Header.Number.Uint64(), "parent batch hash", parentBatchMeta.BatchHash.Hex(), "l1 finalized withdraw root", event.WithdrawRoot.Hex(), "l2 withdraw root", localWithdrawRoot.Hex())
syscall.Kill(os.Getpid(), syscall.SIGTERM)
return 0, nil, fmt.Errorf("withdraw root mismatch")
stack.Close()
os.Exit(1)
}

// Note: All params for NewBatchHeader are calculated locally based on the block data.
batchHeader, err := NewBatchHeader(batchHeaderVersion, event.BatchIndex.Uint64(), parentBatchMeta.TotalL1MessagePopped, parentBatchMeta.BatchHash, chunks)
if err != nil {
return 0, nil, fmt.Errorf("failed to construct batch header, err: %w", err)
return 0, nil, fmt.Errorf("failed to construct batch header, batch index: %v, err: %w", event.BatchIndex.Uint64(), err)
}

// Note: If the batch headers match, this ensures the consistency of blocks and transactions
// (including skipped transactions) between L1 and L2.
localBatchHash := batchHeader.Hash()
if localBatchHash != event.BatchHash {
log.Error("Batch hash mismatch", "batch index", event.BatchIndex.Uint64(), "start block", startBlock.Header.Number.Uint64(), "end block", endBlock.Header.Number.Uint64(), "parent batch hash", parentBatchMeta.BatchHash.Hex(), "parent TotalL1MessagePopped", parentBatchMeta.TotalL1MessagePopped, "l1 finalized batch hash", event.BatchHash.Hex(), "l2 batch hash", localBatchHash.Hex())
chunksJson, _ := json.Marshal(chunks)
chunksJson, err := json.Marshal(chunks)
if err != nil {
log.Error("marshal chunks failed", "err", err)
}
log.Error("Chunks", "chunks", string(chunksJson))
syscall.Kill(os.Getpid(), syscall.SIGTERM)
return 0, nil, fmt.Errorf("batch hash mismatch")
stack.Close()
os.Exit(1)
}

totalL1MessagePopped := parentBatchMeta.TotalL1MessagePopped
Expand Down
19 changes: 15 additions & 4 deletions rollup/rollup_sync_service/rollup_sync_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
)

Expand All @@ -32,7 +33,12 @@ func TestRollupSyncServiceStartAndStop(t *testing.T) {
db := rawdb.NewDatabase(memorydb.New())
l1Client := &mockEthClient{}
bc := &core.BlockChain{}
service, err := NewRollupSyncService(context.Background(), genesisConfig, db, l1Client, bc, 1)
stack, err := node.New(&node.DefaultConfig)
if err != nil {
t.Fatalf("Failed to new P2P node: %v", err)
}
defer stack.Close()
service, err := NewRollupSyncService(context.Background(), genesisConfig, db, l1Client, bc, stack)
if err != nil {
t.Fatalf("Failed to new rollup sync service: %v", err)
}
Expand Down Expand Up @@ -112,7 +118,12 @@ func TestGetChunkRanges(t *testing.T) {
commitBatchRLP: rlpData,
}
bc := &core.BlockChain{}
service, err := NewRollupSyncService(context.Background(), genesisConfig, db, l1Client, bc, 1)
stack, err := node.New(&node.DefaultConfig)
if err != nil {
t.Fatalf("Failed to new P2P node: %v", err)
}
defer stack.Close()
service, err := NewRollupSyncService(context.Background(), genesisConfig, db, l1Client, bc, stack)
if err != nil {
t.Fatalf("Failed to new rollup sync service: %v", err)
}
Expand Down Expand Up @@ -169,7 +180,7 @@ func TestValidateBatch(t *testing.T) {
StateRoot: chunk3.Blocks[len(chunk3.Blocks)-1].Header.Root,
WithdrawRoot: chunk3.Blocks[len(chunk3.Blocks)-1].WithdrawRoot,
}
endBlock1, finalizedBatchMeta1, err := validateBatch(event1, parentBatchMeta1, []*Chunk{chunk1, chunk2, chunk3})
endBlock1, finalizedBatchMeta1, err := validateBatch(event1, parentBatchMeta1, []*Chunk{chunk1, chunk2, chunk3}, nil)
assert.NoError(t, err)
assert.Equal(t, uint64(13), endBlock1)

Expand All @@ -193,7 +204,7 @@ func TestValidateBatch(t *testing.T) {
StateRoot: chunk4.Blocks[len(chunk4.Blocks)-1].Header.Root,
WithdrawRoot: chunk4.Blocks[len(chunk4.Blocks)-1].WithdrawRoot,
}
endBlock2, finalizedBatchMeta2, err := validateBatch(event2, parentBatchMeta2, []*Chunk{chunk4})
endBlock2, finalizedBatchMeta2, err := validateBatch(event2, parentBatchMeta2, []*Chunk{chunk4}, nil)
assert.NoError(t, err)
assert.Equal(t, uint64(17), endBlock2)

Expand Down
1 change: 1 addition & 0 deletions rollup/sync_service/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ type EthClient interface {
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error)
TransactionByHash(ctx context.Context, txHash common.Hash) (tx *types.Transaction, isPending bool, err error)
BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error)
}
Loading