Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add e2e reorg tests to syncers #56

Merged
merged 12 commits into from
Sep 25, 2024
3 changes: 2 additions & 1 deletion l1infotreesync/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ func buildAppender(client EthClienter, globalExitRoot, rollupManager common.Addr
l, err,
)
}
log.Infof("updateL1InfoTreeSignatureV2: expected root: %s", common.Bytes2Hex(l1InfoTreeUpdate.CurrentL1InfoRoot[:]))
log.Infof("updateL1InfoTreeSignatureV2: expected root: %s",
common.BytesToHash(l1InfoTreeUpdate.CurrentL1InfoRoot[:]).String())
goran-ethernal marked this conversation as resolved.
Show resolved Hide resolved

return nil
}
Expand Down
231 changes: 183 additions & 48 deletions l1infotreesync/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,124 @@ func TestE2E(t *testing.T) {
}
}

func TestWithReorgs(t *testing.T) {
ctx := context.Background()
dbPathSyncer := path.Join(t.TempDir(), "file::memory:?cache=shared")
dbPathReorg := t.TempDir()
privateKey, err := crypto.GenerateKey()
require.NoError(t, err)
auth, err := bind.NewKeyedTransactorWithChainID(privateKey, big.NewInt(1337))
require.NoError(t, err)
client, gerAddr, verifyAddr, gerSc, verifySC, err := newSimulatedClient(auth)
require.NoError(t, err)
rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 30)})
require.NoError(t, err)
require.NoError(t, rd.Start(ctx))
syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerAddr, verifyAddr, 10, etherman.LatestBlock, rd, client.Client(), time.Millisecond, 0, time.Second, 25)
require.NoError(t, err)
go syncer.Start(ctx)

// Commit block
header, err := client.Client().HeaderByHash(ctx, client.Commit()) // Block 3
require.NoError(t, err)
reorgFrom := header.Hash()
fmt.Println("start from header:", header.Number)

updateL1InfoTreeAndRollupExitTree := func(i int, rollupID uint32) {
// Update L1 Info Tree
_, err := gerSc.UpdateExitRoot(auth, common.HexToHash(strconv.Itoa(i)))
require.NoError(t, err)

// Update L1 Info Tree + Rollup Exit Tree
newLocalExitRoot := common.HexToHash(strconv.Itoa(i) + "ffff" + strconv.Itoa(1))
_, err = verifySC.VerifyBatches(auth, rollupID, 0, newLocalExitRoot, common.Hash{}, true)
require.NoError(t, err)

// Update Rollup Exit Tree
newLocalExitRoot = common.HexToHash(strconv.Itoa(i) + "ffff" + strconv.Itoa(2))
_, err = verifySC.VerifyBatches(auth, rollupID, 0, newLocalExitRoot, common.Hash{}, false)
require.NoError(t, err)
}

// create some events and update the trees
updateL1InfoTreeAndRollupExitTree(1, 1)

// Block 4
commitBlocks(t, client, 1, time.Second*5)

// Make sure syncer is up to date
waitForSyncerToCatchUp(ctx, t, syncer, client)

// Assert rollup exit root
expectedRollupExitRoot, err := verifySC.GetRollupExitRoot(&bind.CallOpts{Pending: false})
require.NoError(t, err)
actualRollupExitRoot, err := syncer.GetLastRollupExitRoot(ctx)
require.NoError(t, err)
t.Log("exit roots", common.Hash(expectedRollupExitRoot), actualRollupExitRoot.Hash)
require.Equal(t, common.Hash(expectedRollupExitRoot), actualRollupExitRoot.Hash)

// Assert L1 Info tree root
expectedL1InfoRoot, err := gerSc.GetRoot(&bind.CallOpts{Pending: false})
require.NoError(t, err)
expectedGER, err := gerSc.GetLastGlobalExitRoot(&bind.CallOpts{Pending: false})
require.NoError(t, err)
actualL1InfoRoot, err := syncer.GetLastL1InfoTreeRoot(ctx)
require.NoError(t, err)
info, err := syncer.GetInfoByIndex(ctx, actualL1InfoRoot.Index)
require.NoError(t, err)

require.Equal(t, common.Hash(expectedL1InfoRoot), actualL1InfoRoot.Hash)
require.Equal(t, common.Hash(expectedGER), info.GlobalExitRoot, fmt.Sprintf("%+v", info))

// Forking from block 3
err = client.Fork(reorgFrom)
require.NoError(t, err)

// Block 4, 5, 6 after the fork
commitBlocks(t, client, 3, time.Millisecond*500)

// Make sure syncer is up to date
waitForSyncerToCatchUp(ctx, t, syncer, client)

// Assert rollup exit root after the fork - should be zero since there are no events in the block after the fork
expectedRollupExitRoot, err = verifySC.GetRollupExitRoot(&bind.CallOpts{Pending: false})
require.NoError(t, err)
actualRollupExitRoot, err = syncer.GetLastRollupExitRoot(ctx)
require.ErrorContains(t, err, "not found") // rollup exit tree reorged, it does not have any exits in it
t.Log("exit roots", common.Hash(expectedRollupExitRoot), actualRollupExitRoot.Hash)
require.Equal(t, common.Hash(expectedRollupExitRoot), actualRollupExitRoot.Hash)

// Forking from block 3 again
err = client.Fork(reorgFrom)
require.NoError(t, err)
time.Sleep(time.Millisecond * 500)

// create some events and update the trees
updateL1InfoTreeAndRollupExitTree(2, 1)

// Block 4, 5, 6, 7 after the fork
commitBlocks(t, client, 4, time.Millisecond*100)

// Make sure syncer is up to date
waitForSyncerToCatchUp(ctx, t, syncer, client)

// Assert rollup exit root after the fork - should be zero since there are no events in the block after the fork
expectedRollupExitRoot, err = verifySC.GetRollupExitRoot(&bind.CallOpts{Pending: false})
require.NoError(t, err)
actualRollupExitRoot, err = syncer.GetLastRollupExitRoot(ctx)
require.NoError(t, err)
t.Log("exit roots", common.Hash(expectedRollupExitRoot), actualRollupExitRoot.Hash)
require.Equal(t, common.Hash(expectedRollupExitRoot), actualRollupExitRoot.Hash)
}

func TestStressAndReorgs(t *testing.T) {
const (
totalIterations = 200 // Have tested with much larger number (+10k)
enableReorgs = false // test fails when set to true
reorgEveryXIterations = 53
maxReorgDepth = 5
maxEventsPerBlock = 7
maxRollups = 31
totalIterations = 3
blocksInIteration = 140
reorgEveryXIterations = 70
reorgSizeInBlocks = 2
maxRollupID = 31
extraBlocksToMine = 10
)

ctx := context.Background()
Expand All @@ -182,58 +292,50 @@ func TestStressAndReorgs(t *testing.T) {
rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)})
require.NoError(t, err)
require.NoError(t, rd.Start(ctx))
syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerAddr, verifyAddr, 10, etherman.LatestBlock, rd, client.Client(), time.Millisecond, 0, 100*time.Millisecond, 3)
syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerAddr, verifyAddr, 10, etherman.LatestBlock, rd, client.Client(), time.Millisecond, 0, time.Second, 100)
require.NoError(t, err)
go syncer.Start(ctx)

for i := 0; i < totalIterations; i++ {
for j := 0; j < i%maxEventsPerBlock; j++ {
switch j % 3 {
case 0: // Update L1 Info Tree
_, err := gerSc.UpdateExitRoot(auth, common.HexToHash(strconv.Itoa(i)))
require.NoError(t, err)
case 1: // Update L1 Info Tree + Rollup Exit Tree
newLocalExitRoot := common.HexToHash(strconv.Itoa(i) + "ffff" + strconv.Itoa(j))
_, err := verifySC.VerifyBatches(auth, 1+uint32(i%maxRollups), 0, newLocalExitRoot, common.Hash{}, true)
require.NoError(t, err)
case 2: // Update Rollup Exit Tree
newLocalExitRoot := common.HexToHash(strconv.Itoa(i) + "ffff" + strconv.Itoa(j))
_, err := verifySC.VerifyBatches(auth, 1+uint32(i%maxRollups), 0, newLocalExitRoot, common.Hash{}, false)
updateL1InfoTreeAndRollupExitTree := func(i, j int, rollupID uint32) {
// Update L1 Info Tree
_, err := gerSc.UpdateExitRoot(auth, common.HexToHash(strconv.Itoa(i)))
require.NoError(t, err)

// Update L1 Info Tree + Rollup Exit Tree
newLocalExitRoot := common.HexToHash(strconv.Itoa(i) + "ffff" + strconv.Itoa(j))
_, err = verifySC.VerifyBatches(auth, rollupID, 0, newLocalExitRoot, common.Hash{}, true)
require.NoError(t, err)

// Update Rollup Exit Tree
newLocalExitRoot = common.HexToHash(strconv.Itoa(i) + "fffa" + strconv.Itoa(j))
_, err = verifySC.VerifyBatches(auth, rollupID, 0, newLocalExitRoot, common.Hash{}, false)
require.NoError(t, err)
goran-ethernal marked this conversation as resolved.
Show resolved Hide resolved
}

for i := 1; i <= totalIterations; i++ {
for j := 1; j <= blocksInIteration; j++ {
commitBlocks(t, client, 1, time.Millisecond*10)

if j%reorgEveryXIterations == 0 {
currentBlockNum, err := client.Client().BlockNumber(ctx)
require.NoError(t, err)
}
}
client.Commit()
time.Sleep(time.Microsecond * 30) // Sleep just enough for goroutine to switch
if enableReorgs && i%reorgEveryXIterations == 0 {
reorgDepth := i%maxReorgDepth + 1
currentBlockNum, err := client.Client().BlockNumber(ctx)
require.NoError(t, err)
targetReorgBlockNum := currentBlockNum - uint64(reorgDepth)
if targetReorgBlockNum < currentBlockNum { // we are dealing with uints...
reorgBlock, err := client.Client().BlockByNumber(ctx, big.NewInt(int64(targetReorgBlockNum)))

block, err := client.Client().BlockByNumber(ctx, big.NewInt(int64(currentBlockNum-reorgSizeInBlocks)))
require.NoError(t, err)
err = client.Fork(reorgBlock.Hash())
reorgFrom := block.Hash()
err = client.Fork(reorgFrom)
require.NoError(t, err)

fmt.Println("reorg from block:", block.Number().Uint64()+1)
goran-ethernal marked this conversation as resolved.
Show resolved Hide resolved
} else {
updateL1InfoTreeAndRollupExitTree(i, j, uint32(j%maxRollupID)+1)
}
}
}

syncerUpToDate := false
var errMsg string
lb, err := client.Client().BlockNumber(ctx)
require.NoError(t, err)
for i := 0; i < 50; i++ {
lpb, err := syncer.GetLastProcessedBlock(ctx)
require.NoError(t, err)
if lpb == lb {
syncerUpToDate = true
commitBlocks(t, client, 1, time.Millisecond*10)

break
}
time.Sleep(time.Millisecond * 100)
errMsg = fmt.Sprintf("last block from client: %d, last block from syncer: %d", lb, lpb)
}
require.True(t, syncerUpToDate, errMsg)
waitForSyncerToCatchUp(ctx, t, syncer, client)

// Assert rollup exit root
expectedRollupExitRoot, err := verifySC.GetRollupExitRoot(&bind.CallOpts{Pending: false})
Expand All @@ -252,6 +354,39 @@ func TestStressAndReorgs(t *testing.T) {
info, err := syncer.GetInfoByIndex(ctx, lastRoot.Index)
require.NoError(t, err, fmt.Sprintf("index: %d", lastRoot.Index))

require.Equal(t, common.Hash(expectedL1InfoRoot), lastRoot.Hash)
t.Logf("expectedL1InfoRoot: %s", common.Hash(expectedL1InfoRoot).String())
require.Equal(t, common.Hash(expectedGER), info.GlobalExitRoot, fmt.Sprintf("%+v", info))
require.Equal(t, common.Hash(expectedL1InfoRoot), lastRoot.Hash)
}

func waitForSyncerToCatchUp(ctx context.Context, t *testing.T, syncer *l1infotreesync.L1InfoTreeSync, client *simulated.Backend) {
t.Helper()

syncerUpToDate := false
var errMsg string

for i := 0; i < 200; i++ {
lpb, err := syncer.GetLastProcessedBlock(ctx)
require.NoError(t, err)
lb, err := client.Client().BlockNumber(ctx)
require.NoError(t, err)
if lpb == lb {
syncerUpToDate = true
break
}
time.Sleep(time.Second / 2)
errMsg = fmt.Sprintf("last block from client: %d, last block from syncer: %d", lb, lpb)
}

require.True(t, syncerUpToDate, errMsg)
}

// commitBlocks commits the specified number of blocks with the given client and waits for the specified duration after each block
func commitBlocks(t *testing.T, client *simulated.Backend, numBlocks int, waitDuration time.Duration) {
t.Helper()

for i := 0; i < numBlocks; i++ {
client.Commit()
time.Sleep(waitDuration)
}
}
7 changes: 6 additions & 1 deletion l1infotreesync/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
return err
}

_, err = tx.Exec(`DELETE FROM l1info_leaf WHERE block_num >= $1;`, firstReorgedBlock)
goran-ethernal marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

if err = p.l1InfoTree.Reorg(tx, firstReorgedBlock); err != nil {
return err
}
Expand Down Expand Up @@ -240,7 +245,7 @@ func (p *processor) ProcessBlock(ctx context.Context, b sync.Block) error {
}
}()

if _, err := tx.Exec(`INSERT INTO block (num) VALUES ($1)`, b.Num); err != nil {
if _, err = tx.Exec(`INSERT INTO block (num) VALUES ($1)`, b.Num); err != nil {
return fmt.Errorf("err: %w", err)
}

Expand Down
6 changes: 5 additions & 1 deletion lastgersync/evmdownloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ func (d *downloader) Download(ctx context.Context, fromBlock uint64, downloadedC
break
}

blockHeader := d.GetBlockHeader(ctx, lastBlock)
blockHeader, isCanceled := d.GetBlockHeader(ctx, lastBlock)
if isCanceled {
return
}

block := &sync.EVMBlock{
EVMBlockHeader: sync.EVMBlockHeader{
Num: blockHeader.Num,
Expand Down
20 changes: 15 additions & 5 deletions reorgdetector/reorgdetector.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,20 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
errGroup errgroup.Group
)

rd.trackedBlocksLock.Lock()
defer rd.trackedBlocksLock.Unlock()
subscriberIDs := rd.getSubscriberIDs()

for id, hdrs := range rd.trackedBlocks {
for _, id := range subscriberIDs {
id := id
hdrs := hdrs

// This is done like this because of a possible deadlock
// between AddBlocksToTrack and detectReorgInTrackedList
rd.trackedBlocksLock.RLock()
hdrs, ok := rd.trackedBlocks[id]
rd.trackedBlocksLock.RUnlock()

if !ok {
continue
}

errGroup.Go(func() error {
headers := hdrs.getSorted()
Expand All @@ -136,7 +144,7 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
if !ok || currentHeader == nil {
if currentHeader, err = rd.client.HeaderByNumber(ctx, new(big.Int).SetUint64(hdr.Num)); err != nil {
headersCacheLock.Unlock()
return fmt.Errorf("failed to get the header: %w", err)
return fmt.Errorf("failed to get the header %d: %w", hdr.Num, err)
}
headersCache[hdr.Num] = currentHeader
}
Expand All @@ -153,6 +161,8 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
continue
}

log.Info("[ReorgDetector] Reorg detected", " blockNum ", hdr.Num)
goran-ethernal marked this conversation as resolved.
Show resolved Hide resolved

// Notify the subscriber about the reorg
rd.notifySubscriber(id, hdr)

Expand Down
5 changes: 4 additions & 1 deletion reorgdetector/reorgdetector_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,17 @@ func (rd *ReorgDetector) getTrackedBlocks(ctx context.Context) (map[string]*head

// saveTrackedBlock saves the tracked block for a subscriber in db and in memory
func (rd *ReorgDetector) saveTrackedBlock(ctx context.Context, id string, b header) error {
rd.trackedBlocksLock.Lock()

// this has to go after the lock, because of a possible deadlock
// between AddBlocksToTrack and detectReorgInTrackedList
tx, err := rd.db.BeginRw(ctx)
if err != nil {
return err
}

defer tx.Rollback()

rd.trackedBlocksLock.Lock()
hdrs, ok := rd.trackedBlocks[id]
if !ok || hdrs.isEmpty() {
hdrs = newHeadersList(b)
Expand Down
19 changes: 17 additions & 2 deletions reorgdetector/reorgdetector_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,24 @@ func (rd *ReorgDetector) Subscribe(id string) (*Subscription, error) {
func (rd *ReorgDetector) notifySubscriber(id string, startingBlock header) {
// Notify subscriber about this particular reorg
rd.subscriptionsLock.RLock()
if sub, ok := rd.subscriptions[id]; ok {
sub, ok := rd.subscriptions[id]
rd.subscriptionsLock.RUnlock()

if ok {
sub.ReorgedBlock <- startingBlock.Num
<-sub.ReorgProcessed
}
rd.subscriptionsLock.RUnlock()
}

// getSubscriberIDs returns a list of subscriber IDs
func (rd *ReorgDetector) getSubscriberIDs() []string {
rd.subscriptionsLock.RLock()
defer rd.subscriptionsLock.RUnlock()

ids := make([]string, 0, len(rd.subscriptions))
for id := range rd.subscriptions {
ids = append(ids, id)
}

return ids
}
Loading
Loading