Skip to content

Commit

Permalink
fix: stress test
Browse files Browse the repository at this point in the history
  • Loading branch information
goran-ethernal committed Sep 19, 2024
1 parent ed525f0 commit d0df831
Show file tree
Hide file tree
Showing 12 changed files with 268 additions and 104 deletions.
2 changes: 1 addition & 1 deletion l1infotreesync/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func buildAppender(client EthClienter, globalExitRoot, rollupManager common.Addr
l, err,
)
}
log.Infof("updateL1InfoTreeSignatureV2: expected root: %s", common.Bytes2Hex(l1InfoTreeUpdate.CurrentL1InfoRoot[:]))
log.Infof("updateL1InfoTreeSignatureV2: expected root: %s", common.BytesToHash(l1InfoTreeUpdate.CurrentL1InfoRoot[:]).String())

return nil
}
Expand Down
81 changes: 40 additions & 41 deletions l1infotreesync/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestWithReorgs(t *testing.T) {
rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 30)})
require.NoError(t, err)
require.NoError(t, rd.Start(ctx))
syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerAddr, verifyAddr, 10, etherman.LatestBlock, rd, client.Client(), time.Millisecond, 0, time.Second, 5)
syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerAddr, verifyAddr, 10, etherman.LatestBlock, rd, client.Client(), time.Millisecond, 0, time.Second, 25)
require.NoError(t, err)
go syncer.Start(ctx)

Expand Down Expand Up @@ -272,12 +272,12 @@ func TestWithReorgs(t *testing.T) {

func TestStressAndReorgs(t *testing.T) {
const (
totalIterations = 200 // Have tested with much larger number (+10k)
enableReorgs = true // test fails when set to true
reorgEveryXIterations = 53
maxReorgDepth = 12
maxEventsPerBlock = 7
maxRollups = 31
totalIterations = 2 // Have tested with much larger number (+10k)
blocksInIteration = 140
reorgEveryXIterations = 70
reorgSizeInBlocks = 2
maxRollupID = 31
extraBlocksToMine = 10
)

ctx := context.Background()
Expand All @@ -292,51 +292,49 @@ func TestStressAndReorgs(t *testing.T) {
rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)})
require.NoError(t, err)
require.NoError(t, rd.Start(ctx))
syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerAddr, verifyAddr, 10, etherman.LatestBlock, rd, client.Client(), time.Millisecond, 0, time.Second, 5)
syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerAddr, verifyAddr, 10, etherman.LatestBlock, rd, client.Client(), time.Millisecond, 0, time.Second, 100)
require.NoError(t, err)
go syncer.Start(ctx)

var extraBlocksToMine int
for i := 0; i < totalIterations; i++ {
for j := 0; j < i%maxEventsPerBlock; j++ {
switch j % 3 {
case 0: // Update L1 Info Tree
_, err := gerSc.UpdateExitRoot(auth, common.HexToHash(strconv.Itoa(i)))
require.NoError(t, err)
case 1: // Update L1 Info Tree + Rollup Exit Tree
newLocalExitRoot := common.HexToHash(strconv.Itoa(i) + "ffff" + strconv.Itoa(j))
_, err := verifySC.VerifyBatches(auth, 1+uint32(i%maxRollups), 0, newLocalExitRoot, common.Hash{}, true)
require.NoError(t, err)
case 2: // Update Rollup Exit Tree
newLocalExitRoot := common.HexToHash(strconv.Itoa(i) + "ffff" + strconv.Itoa(j))
_, err := verifySC.VerifyBatches(auth, 1+uint32(i%maxRollups), 0, newLocalExitRoot, common.Hash{}, false)
require.NoError(t, err)
}
}
updateL1InfoTreeAndRollupExitTree := func(i, j int, rollupID uint32) {
// Update L1 Info Tree
_, err := gerSc.UpdateExitRoot(auth, common.HexToHash(strconv.Itoa(i)))
require.NoError(t, err)

time.Sleep(time.Microsecond * 30) // Sleep just enough for goroutine to switch
// Update L1 Info Tree + Rollup Exit Tree
newLocalExitRoot := common.HexToHash(strconv.Itoa(i) + "ffff" + strconv.Itoa(j))
_, err = verifySC.VerifyBatches(auth, rollupID, 0, newLocalExitRoot, common.Hash{}, true)
require.NoError(t, err)

if enableReorgs && i > 0 && i%reorgEveryXIterations == 0 {
reorgDepth := i%maxReorgDepth + 1
extraBlocksToMine += reorgDepth + 1
currentBlockNum, err := client.Client().BlockNumber(ctx)
require.NoError(t, err)
// Update Rollup Exit Tree
newLocalExitRoot = common.HexToHash(strconv.Itoa(i) + "fffa" + strconv.Itoa(j))
_, err = verifySC.VerifyBatches(auth, rollupID, 0, newLocalExitRoot, common.Hash{}, false)
require.NoError(t, err)
}

targetReorgBlockNum := currentBlockNum
if uint64(reorgDepth) <= currentBlockNum {
targetReorgBlockNum -= uint64(reorgDepth)
}
for i := 1; i <= totalIterations; i++ {
for j := 1; j <= blocksInIteration; j++ {
commitBlocks(t, client, 1, time.Millisecond*10)

if targetReorgBlockNum < currentBlockNum { // we are dealing with uints...
reorgBlock, err := client.Client().BlockByNumber(ctx, big.NewInt(int64(targetReorgBlockNum)))
if j%reorgEveryXIterations == 0 {
currentBlockNum, err := client.Client().BlockNumber(ctx)
require.NoError(t, err)
err = client.Fork(reorgBlock.Hash())

block, err := client.Client().BlockByNumber(ctx, big.NewInt(int64(currentBlockNum-reorgSizeInBlocks)))
require.NoError(t, err)
reorgFrom := block.Hash()
err = client.Fork(reorgFrom)
require.NoError(t, err)

fmt.Println("reorg from block:", block.Number().Uint64()+1)
} else {
updateL1InfoTreeAndRollupExitTree(i, j, uint32(j%maxRollupID)+1)
}
}
}

commitBlocks(t, client, extraBlocksToMine, time.Millisecond*100)
commitBlocks(t, client, 1, time.Millisecond*10)

waitForSyncerToCatchUp(ctx, t, syncer, client)

// Assert rollup exit root
Expand All @@ -356,8 +354,9 @@ func TestStressAndReorgs(t *testing.T) {
info, err := syncer.GetInfoByIndex(ctx, lastRoot.Index)
require.NoError(t, err, fmt.Sprintf("index: %d", lastRoot.Index))

require.Equal(t, common.Hash(expectedL1InfoRoot), lastRoot.Hash)
t.Logf("lastRoot: %+v", lastRoot)
require.Equal(t, common.Hash(expectedGER), info.GlobalExitRoot, fmt.Sprintf("%+v", info))
require.Equal(t, common.Hash(expectedL1InfoRoot), lastRoot.Hash)
}

func waitForSyncerToCatchUp(ctx context.Context, t *testing.T, syncer *l1infotreesync.L1InfoTreeSync, client *simulated.Backend) {
Expand All @@ -366,7 +365,7 @@ func waitForSyncerToCatchUp(ctx context.Context, t *testing.T, syncer *l1infotre
syncerUpToDate := false
var errMsg string

for i := 0; i < 50; i++ {
for i := 0; i < 200; i++ {
lpb, err := syncer.GetLastProcessedBlock(ctx)
require.NoError(t, err)
lb, err := client.Client().BlockNumber(ctx)
Expand Down
7 changes: 6 additions & 1 deletion l1infotreesync/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
return err
}

_, err = tx.Exec(`DELETE FROM l1info_leaf WHERE block_num >= $1;`, firstReorgedBlock)
if err != nil {
return err
}

if err = p.l1InfoTree.Reorg(tx, firstReorgedBlock); err != nil {
return err
}
Expand Down Expand Up @@ -240,7 +245,7 @@ func (p *processor) ProcessBlock(ctx context.Context, b sync.Block) error {
}
}()

if _, err := tx.Exec(`INSERT INTO block (num) VALUES ($1)`, b.Num); err != nil {
if _, err = tx.Exec(`INSERT INTO block (num) VALUES ($1)`, b.Num); err != nil {
return fmt.Errorf("err: %w", err)
}

Expand Down
6 changes: 5 additions & 1 deletion lastgersync/evmdownloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ func (d *downloader) Download(ctx context.Context, fromBlock uint64, downloadedC
break
}

blockHeader := d.GetBlockHeader(ctx, lastBlock)
blockHeader, isCanceled := d.GetBlockHeader(ctx, lastBlock)
if isCanceled {
return
}

block := &sync.EVMBlock{
EVMBlockHeader: sync.EVMBlockHeader{
Num: blockHeader.Num,
Expand Down
19 changes: 15 additions & 4 deletions reorgdetector/reorgdetector.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,21 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
errGroup errgroup.Group
)

rd.trackedBlocksLock.Lock()
defer rd.trackedBlocksLock.Unlock()
subscriberIDs := rd.getSubscriberIDs()

for id, hdrs := range rd.trackedBlocks {
for _, id := range subscriberIDs {
id := id
hdrs := hdrs

// This is done like this because of a possible deadlock between AddBlocksToTrack and detectReorgInTrackedList
// because detectReorgInTrackedList would take the trackedBlocksLock and try to notify the subscriber in case of a reorg
// but the subscriber would be trying to add a block to track and save it to trackedBlocks, resulting in a deadlock
rd.trackedBlocksLock.RLock()
hdrs, ok := rd.trackedBlocks[id]
rd.trackedBlocksLock.RUnlock()

if !ok {
continue
}

errGroup.Go(func() error {
headers := hdrs.getSorted()
Expand Down Expand Up @@ -153,6 +162,8 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
continue
}

log.Info("[ReorgDetector] Reorg detected", "blockNum", hdr.Num)

// Notify the subscriber about the reorg
rd.notifySubscriber(id, hdr)

Expand Down
6 changes: 5 additions & 1 deletion reorgdetector/reorgdetector_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,18 @@ func (rd *ReorgDetector) getTrackedBlocks(ctx context.Context) (map[string]*head

// saveTrackedBlock saves the tracked block for a subscriber in db and in memory
func (rd *ReorgDetector) saveTrackedBlock(ctx context.Context, id string, b header) error {
rd.trackedBlocksLock.Lock()

// this has to go after the lock, because of a possible deadlock between AddBlocksToTrack and detectReorgInTrackedList
// because AddBlocksToTrack would start a transaction on db, but detectReorgInTrackedList would lock the trackedBlocksLock
// and then try to start a transaction on db, resulting in a deadlock
tx, err := rd.db.BeginRw(ctx)
if err != nil {
return err
}

defer tx.Rollback()

rd.trackedBlocksLock.Lock()
hdrs, ok := rd.trackedBlocks[id]
if !ok || hdrs.isEmpty() {
hdrs = newHeadersList(b)
Expand Down
19 changes: 17 additions & 2 deletions reorgdetector/reorgdetector_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,24 @@ func (rd *ReorgDetector) Subscribe(id string) (*Subscription, error) {
func (rd *ReorgDetector) notifySubscriber(id string, startingBlock header) {
// Notify subscriber about this particular reorg
rd.subscriptionsLock.RLock()
if sub, ok := rd.subscriptions[id]; ok {
sub, ok := rd.subscriptions[id]
rd.subscriptionsLock.RUnlock()

if ok {
sub.ReorgedBlock <- startingBlock.Num
<-sub.ReorgProcessed
}
rd.subscriptionsLock.RUnlock()
}

// getSubscriberIDs returns a list of subscriber IDs
func (rd *ReorgDetector) getSubscriberIDs() []string {
rd.subscriptionsLock.RLock()
defer rd.subscriptionsLock.RUnlock()

ids := make([]string, 0, len(rd.subscriptions))
for id := range rd.subscriptions {
ids = append(ids, id)
}

return ids
}
Loading

0 comments on commit d0df831

Please sign in to comment.