feat: more logs for l1infotreesyncer (#262)

* feat: add new db migration

* feat: save block hash to block table and add more logs

* feat: fix BuildVersions during cargo build (#258)

* fix: increase log level

* fix: processor logger

* fix: e2e tests

* fix: remove unnecessary environment variables from e2e workflow

* fix: update reference to Kurtosis CDK version 0.2.25 in e2e workflow

* fix: refactor e2e workflow (1st attempt)

---------

Co-authored-by: Toni Ramírez <[email protected]>
Co-authored-by: Stefan Negovanović <[email protected]>
3 people authored Jan 13, 2025
1 parent a5b8984 commit 4c9af9c
Showing 17 changed files with 118 additions and 58 deletions.
.github/workflows/test-e2e.yml (2 additions, 2 deletions)

@@ -69,8 +69,8 @@ jobs:
         uses: actions/checkout@v4
         with:
           repository: 0xPolygon/kurtosis-cdk
-          path: "kurtosis-cdk"
-          ref: "v0.2.19"
+          path: kurtosis-cdk
+          ref: v0.2.25
 
       - name: Setup Bats and bats libs
         uses: bats-core/[email protected]
bridgesync/e2e_test.go (1 addition, 1 deletion)

@@ -23,7 +23,7 @@ func TestBridgeEventE2E(t *testing.T) {
     dbPathReorg := path.Join(t.TempDir(), "file::memory:?cache=shared")
 
     client, setup := helpers.SimulatedBackend(t, nil, 0)
-    rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg})
+    rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg}, reorgdetector.L1)
     require.NoError(t, err)
 
     go rd.Start(ctx) //nolint:errcheck
cmd/run.go (4 additions, 3 deletions)

@@ -500,8 +500,9 @@ func newState(c *config.Config, l2ChainID uint64, sqlDB *pgxpool.Pool) *state.St
 func newReorgDetector(
     cfg *reorgdetector.Config,
     client *ethclient.Client,
+    network reorgdetector.Network,
 ) *reorgdetector.ReorgDetector {
-    rd, err := reorgdetector.New(client, *cfg)
+    rd, err := reorgdetector.New(client, *cfg, network)
     if err != nil {
         log.Fatal(err)
     }
@@ -600,7 +601,7 @@ func runReorgDetectorL1IfNeeded(
         components) {
         return nil, nil
     }
-    rd := newReorgDetector(cfg, l1Client)
+    rd := newReorgDetector(cfg, l1Client, reorgdetector.L1)
 
     errChan := make(chan error)
     go func() {
@@ -622,7 +623,7 @@ func runReorgDetectorL2IfNeeded(
     if !isNeeded([]string{cdkcommon.AGGORACLE, cdkcommon.RPC, cdkcommon.AGGSENDER}, components) {
         return nil, nil
     }
-    rd := newReorgDetector(cfg, l2Client)
+    rd := newReorgDetector(cfg, l2Client, reorgdetector.L2)
 
     errChan := make(chan error)
     go func() {
crates/cdk/build.rs (34 additions, 22 deletions)

@@ -2,10 +2,11 @@ use regex::Regex;
 use reqwest::blocking::get;
 use std::env;
 use std::fs::File;
-use std::io::Write;
+use std::io::{self, Write};
 use std::path::Path;
 use std::path::PathBuf;
 use std::process::Command;
+use serde_json::Value;
 
 fn main() {
     let _ = build_versions();
@@ -55,45 +56,56 @@ fn main() {
 }
 
 // build_versions retrieves the versions from the Starlark file and embeds them in the binary.
-fn build_versions() -> std::io::Result<()> {
-    // Retrieve the contents of the file from the URL
+fn build_versions() -> io::Result<()> {
+    // URL of the Starlark file
     let url = "https://raw.githubusercontent.com/0xPolygon/kurtosis-cdk/refs/heads/main/input_parser.star";
+
+    // Download the file content
     let response = get(url).expect("Failed to send request");
     let content = response.text().expect("Failed to read response text");
 
     // Write the contents to a file
     let out_dir = std::env::var("OUT_DIR").unwrap();
     let dest_path = Path::new(&out_dir).join("input_parser.star");
     let mut file = File::create(&dest_path)?;
     file.write_all(content.as_bytes())?;
 
-    // Get the corresponding lines from the contents of the starlark file
-    let versions = content
+    // Extract the relevant lines (skip the first 30 lines, take the next 15)
+    let raw_versions = content
         .lines()
         .skip(30)
         .take(15)
         .collect::<Vec<&str>>()
         .join("\n");
 
-    // Replace the string DEFAULT_IMAGES = from the versions string
-    let versions = versions.replace("DEFAULT_IMAGES = ", "");
+    // Remove the declaration `DEFAULT_IMAGES = `
+    let raw_versions = raw_versions.replace("DEFAULT_IMAGES = ", "");
 
-    // Remove all comments to the end of the line using a regexp
-    let re = Regex::new(r"\s#\s.*\n").unwrap();
-    let versions = re.replace_all(&versions, "");
-    // Replace the trailing comma on the last line
-    let versions = versions.replace(", }", " }");
+    // Clean up the content by removing comments and unnecessary spaces
+    let re_comments = Regex::new(r"#.*$").unwrap(); // Regex to remove comments
+    let re_trailing_commas = Regex::new(r",(\s*})").unwrap(); // Regex to fix trailing commas
+
+    let cleaned_versions = raw_versions
+        .lines()
+        .map(|line| re_comments.replace_all(line, "").trim().to_string()) // Remove comments and trim spaces
+        .filter(|line| !line.is_empty()) // Filter out empty lines
+        .collect::<Vec<_>>()
+        .join("\n");
+
+    // Fix improperly placed trailing commas
+    let cleaned_versions = re_trailing_commas.replace_all(&cleaned_versions, "$1");
 
-    // The versions string is a JSON object we can parse
-    let versions_json: serde_json::Value = serde_json::from_str(&versions).unwrap();
+    // Attempt to parse the cleaned content as JSON
+    let versions_json: Value = match serde_json::from_str(&cleaned_versions) {
+        Ok(json) => json,
+        Err(e) => {
+            eprintln!("Failed to parse JSON: {}", e); // Print the error
+            eprintln!("Input string was: {}", cleaned_versions); // Print the input causing the error
+            return Err(io::Error::new(io::ErrorKind::InvalidData, "JSON parsing failed"));
+        }
+    };
 
-    // Write the versions to a file
+    // Define the output file path for the JSON
    let dest_path = Path::new(".").join("versions.json");
     let mut file = File::create(&dest_path)?;
     file.write_all(
         format!(
             "{}\n",
-            serde_json::to_string_pretty(&versions_json).unwrap()
+            serde_json::to_string_pretty(&versions_json).unwrap() // Pretty-print JSON to the file
         )
         .as_bytes(),
     )?;
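For context, the cleanup pass in build_versions turns the downloaded Starlark fragment into plain JSON before parsing. A hypothetical input fragment (the real keys and image tags live in input_parser.star and will differ):

    DEFAULT_IMAGES = {
        "cdk_node_image": "ghcr.io/0xpolygon/cdk:0.5.0",  # cdk node
    }

After stripping the DEFAULT_IMAGES = declaration, removing #-comments, trimming each line, dropping empty lines, and rewriting the trailing comma before the closing brace, the string becomes valid JSON:

    {
    "cdk_node_image": "ghcr.io/0xpolygon/cdk:0.5.0"
    }

which serde_json can then parse and pretty-print into versions.json.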
l1infotreesync/e2e_test.go (2 additions, 2 deletions)

@@ -160,7 +160,7 @@ func TestWithReorgs(t *testing.T) {
 
     client, auth, gerAddr, verifyAddr, gerSc, verifySC := newSimulatedClient(t)
 
-    rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 30)})
+    rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 30)}, reorgdetector.L1)
     require.NoError(t, err)
     require.NoError(t, rd.Start(ctx))
 
@@ -278,7 +278,7 @@ func TestStressAndReorgs(t *testing.T) {
 
     client, auth, gerAddr, verifyAddr, gerSc, verifySC := newSimulatedClient(t)
 
-    rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)})
+    rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}, reorgdetector.L1)
     require.NoError(t, err)
     require.NoError(t, rd.Start(ctx))
l1infotreesync/migrations/l1infotreesync0003.sql (5 additions, 0 deletions)

@@ -0,0 +1,5 @@
+-- +migrate Down
+ALTER TABLE block DROP COLUMN hash;
+
+-- +migrate Up
+ALTER TABLE block ADD COLUMN hash VARCHAR;
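The new hash column is what lets the processor persist and later report block hashes. A minimal sketch of reading it back with database/sql (the driver choice and DB path are illustrative assumptions; the cdk code accesses SQLite through its own db package):

    package main

    import (
        "database/sql"
        "fmt"

        _ "modernc.org/sqlite" // assumed SQLite driver, for illustration only
    )

    // blockHash reads the hash stored for a block number via the column
    // added by migration l1infotreesync0003.
    func blockHash(db *sql.DB, num uint64) (string, error) {
        var hash string
        // The $1 placeholder style mirrors the INSERT in processor.go below.
        err := db.QueryRow(`SELECT hash FROM block WHERE num = $1`, num).Scan(&hash)
        if err != nil {
            return "", fmt.Errorf("querying block %d: %w", num, err)
        }
        return hash, nil
    }

    func main() {
        db, err := sql.Open("sqlite", "l1infotreesync.sqlite") // illustrative path
        if err != nil {
            panic(err)
        }
        defer db.Close()

        hash, err := blockHash(db, 42)
        if err != nil {
            panic(err)
        }
        fmt.Println("block 42 hash:", hash)
    }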
l1infotreesync/migrations/migrations.go (7 additions, 0 deletions)

@@ -19,6 +19,9 @@ var mig001 string
 //go:embed l1infotreesync0002.sql
 var mig002 string
 
+//go:embed l1infotreesync0003.sql
+var mig003 string
+
 func RunMigrations(dbPath string) error {
     migrations := []types.Migration{
         {
@@ -29,6 +32,10 @@ func RunMigrations(dbPath string) error {
             ID:  "l1infotreesync0002",
             SQL: mig002,
         },
+        {
+            ID:  "l1infotreesync0003",
+            SQL: mig003,
+        },
     }
     for _, tm := range treeMigrations.Migrations {
         migrations = append(migrations, types.Migration{
l1infotreesync/processor.go (24 additions, 14 deletions)

@@ -30,6 +30,7 @@ type processor struct {
     rollupExitTree *tree.UpdatableTree
     halted         bool
     haltedReason   string
+    log            *log.Logger
 }
 
 // UpdateL1InfoTree representation of the UpdateL1InfoTree event
@@ -149,6 +150,7 @@ func newProcessor(dbPath string) (*processor, error) {
         db:             db,
         l1InfoTree:     tree.NewAppendOnlyTree(db, migrations.L1InfoTreePrefix),
         rollupExitTree: tree.NewUpdatableTree(db, migrations.RollupExitTreePrefix),
+        log:            log.WithFields("processor", "l1infotreesync"),
     }, nil
 }
 
@@ -176,7 +178,7 @@ func (p *processor) GetLatestInfoUntilBlock(ctx context.Context, blockNum uint64
     }
     defer func() {
         if err := tx.Rollback(); err != nil {
-            log.Warnf("error rolling back tx: %v", err)
+            p.log.Warnf("error rolling back tx: %v", err)
         }
     }()
 
@@ -233,6 +235,8 @@ func (p *processor) getLastProcessedBlockWithTx(tx db.Querier) (uint64, error) {
 // Reorg triggers a purge and reset process on the processor to leaf it on a state
 // as if the last block processed was firstReorgedBlock-1
 func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
+    p.log.Infof("reorging to block %d", firstReorgedBlock)
+
     tx, err := db.NewTx(ctx, p.db)
     if err != nil {
         return err
@@ -266,6 +270,9 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
     if err := tx.Commit(); err != nil {
         return err
     }
+
+    p.log.Infof("reorged to block %d, %d rows affected", firstReorgedBlock, rowsAffected)
+
     if rowsAffected > 0 {
         p.halted = false
         p.haltedReason = ""
@@ -278,7 +285,7 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
 // and updates the last processed block (can be called without events for that purpose)
 func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
     if p.halted {
-        log.Errorf("processor is halted due to: %s", p.haltedReason)
+        p.log.Errorf("processor is halted due to: %s", p.haltedReason)
         return sync.ErrInconsistentState
     }
     tx, err := db.NewTx(ctx, p.db)
@@ -289,12 +296,12 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
     defer func() {
         if shouldRollback {
             if errRllbck := tx.Rollback(); errRllbck != nil {
-                log.Errorf("error while rolling back tx %v", errRllbck)
+                p.log.Errorf("error while rolling back tx %v", errRllbck)
             }
         }
     }()
 
-    if _, err := tx.Exec(`INSERT INTO block (num) VALUES ($1)`, block.Num); err != nil {
+    if _, err := tx.Exec(`INSERT INTO block (num, hash) VALUES ($1, $2)`, block.Num, block.Hash.String()); err != nil {
         return fmt.Errorf("insert Block. err: %w", err)
     }
 
@@ -340,10 +347,13 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
             if err != nil {
                 return fmt.Errorf("AddLeaf(%s). err: %w", info.String(), err)
             }
-            log.Infof("inserted L1InfoTreeLeaf %s", info.String())
+            p.log.Infof("inserted L1InfoTreeLeaf %s", info.String())
             l1InfoLeavesAdded++
         }
         if event.UpdateL1InfoTreeV2 != nil {
+            p.log.Infof("handle UpdateL1InfoTreeV2 event. Block: %d, block hash: %s. Event root: %s. Event leaf count: %d.",
+                block.Num, block.Hash, event.UpdateL1InfoTreeV2.CurrentL1InfoRoot.String(), event.UpdateL1InfoTreeV2.LeafCount)
+
             root, err := p.l1InfoTree.GetLastRoot(tx)
             if err != nil {
                 return fmt.Errorf("GetLastRoot(). err: %w", err)
@@ -355,33 +365,33 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
             if root.Hash != event.UpdateL1InfoTreeV2.CurrentL1InfoRoot || root.Index+1 != event.UpdateL1InfoTreeV2.LeafCount {
                 errStr := fmt.Sprintf(
                     "failed to check UpdateL1InfoTreeV2. Root: %s vs event:%s. "+
-                        "Index: : %d vs event.LeafCount:%d. Happened on block %d",
-                    root.Hash, common.Bytes2Hex(event.UpdateL1InfoTreeV2.CurrentL1InfoRoot[:]),
+                        "Index: %d vs event.LeafCount: %d. Happened on block %d. Block hash: %s.",
+                    root.Hash, event.UpdateL1InfoTreeV2.CurrentL1InfoRoot.String(),
                     root.Index, event.UpdateL1InfoTreeV2.LeafCount,
-                    block.Num,
+                    block.Num, block.Hash.String(),
                 )
-                log.Error(errStr)
+                p.log.Error(errStr)
                 p.haltedReason = errStr
                 p.halted = true
                 return sync.ErrInconsistentState
             }
         }
         if event.VerifyBatches != nil {
-            log.Debugf("handle VerifyBatches event %s", event.VerifyBatches.String())
+            p.log.Debugf("handle VerifyBatches event %s", event.VerifyBatches.String())
             err = p.processVerifyBatches(tx, block.Num, event.VerifyBatches)
             if err != nil {
                 err = fmt.Errorf("processVerifyBatches. err: %w", err)
-                log.Errorf("error processing VerifyBatches: %v", err)
+                p.log.Errorf("error processing VerifyBatches: %v", err)
                 return err
             }
         }
 
         if event.InitL1InfoRootMap != nil {
-            log.Debugf("handle InitL1InfoRootMap event %s", event.InitL1InfoRootMap.String())
+            p.log.Debugf("handle InitL1InfoRootMap event %s", event.InitL1InfoRootMap.String())
             err = processEventInitL1InfoRootMap(tx, block.Num, event.InitL1InfoRootMap)
             if err != nil {
                 err = fmt.Errorf("initL1InfoRootMap. Err: %w", err)
-                log.Errorf("error processing InitL1InfoRootMap: %v", err)
+                p.log.Errorf("error processing InitL1InfoRootMap: %v", err)
                 return err
             }
         }
@@ -392,7 +402,7 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
     }
     shouldRollback = false
 
-    log.Infof("block %d processed with %d events", block.Num, len(block.Events))
+    p.log.Infof("block %d processed with %d events", block.Num, len(block.Events))
     return nil
 }
reorgdetector/reorgdetector.go (17 additions, 1 deletion)

@@ -18,6 +18,17 @@ import (
     "golang.org/x/sync/errgroup"
 )
 
+type Network string
+
+const (
+    L1 Network = "l1"
+    L2 Network = "l2"
+)
+
+func (n Network) String() string {
+    return string(n)
+}
+
 type EthClient interface {
     SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
     HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error)
@@ -34,9 +45,11 @@ type ReorgDetector struct {
 
     subscriptionsLock sync.RWMutex
     subscriptions     map[string]*Subscription
+
+    log *log.Logger
 }
 
-func New(client EthClient, cfg Config) (*ReorgDetector, error) {
+func New(client EthClient, cfg Config, network Network) (*ReorgDetector, error) {
     err := migrations.RunMigrations(cfg.DBPath)
     if err != nil {
         return nil, err
@@ -52,6 +65,7 @@ func New(client EthClient, cfg Config) (*ReorgDetector, error) {
         checkReorgInterval: cfg.GetCheckReorgsInterval(),
         trackedBlocks:      make(map[string]*headersList),
         subscriptions:      make(map[string]*Subscription),
+        log:                log.WithFields("reorg-detector", network.String()),
     }, nil
 }
 
@@ -122,6 +136,8 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
         errGroup errgroup.Group
     )
 
+    rd.log.Infof("Checking reorgs in tracked blocks up to block %d", lastFinalisedBlock.Number.Uint64())
+
     subscriberIDs := rd.getSubscriberIDs()
 
     for _, id := range subscriberIDs {
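With the new signature, every caller declares which chain the detector watches, and the scoped logger tags each line with "l1" or "l2". A minimal usage sketch mirroring the call sites updated above (the module path, RPC endpoint, and DB path are assumptions for illustration):

    package main

    import (
        "context"
        "log"

        "github.com/0xPolygon/cdk/reorgdetector"
        "github.com/ethereum/go-ethereum/ethclient"
    )

    func main() {
        client, err := ethclient.Dial("http://localhost:8545") // illustrative RPC endpoint
        if err != nil {
            log.Fatal(err)
        }

        // The third argument is new in this commit: it scopes the detector's
        // logger to L1 or L2 so concurrent detectors are distinguishable.
        rd, err := reorgdetector.New(client, reorgdetector.Config{DBPath: "reorgdetector_l1.sqlite"}, reorgdetector.L1)
        if err != nil {
            log.Fatal(err)
        }

        go rd.Start(context.Background()) //nolint:errcheck
    }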
reorgdetector/reorgdetector_db.go (3 additions, 0 deletions)

@@ -60,6 +60,9 @@ func (rd *ReorgDetector) saveTrackedBlock(id string, b header) error {
         hdrs.add(b)
     }
     rd.trackedBlocksLock.Unlock()
+
+    rd.log.Debugf("Tracking block %d for subscriber %s", b.Num, id)
+
     return meddler.Insert(rd.db, "tracked_block", &headerWithSubscriberID{
         SubscriberID: id,
         Num:          b.Num,
reorgdetector/reorgdetector_sub.go (2 additions, 0 deletions)

@@ -37,6 +37,8 @@ func (rd *ReorgDetector) notifySubscriber(id string, startingBlock header) {
     sub, ok := rd.subscriptions[id]
     rd.subscriptionsLock.RUnlock()
 
+    rd.log.Infof("Reorg detected for subscriber %s at block %d", id, startingBlock.Num)
+
     if ok {
         sub.ReorgedBlock <- startingBlock.Num
         <-sub.ReorgProcessed
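notifySubscriber above is the producer half of a two-phase handshake: the detector sends the first reorged block number on ReorgedBlock, then blocks until the subscriber acknowledges on ReorgProcessed. A sketch of the consuming half (the Subscribe method and the channel's bool element type are assumptions; neither is shown in this diff):

    // subscribeAndHandle sketches a subscriber's side of the handshake.
    func subscribeAndHandle(rd *reorgdetector.ReorgDetector, handleReorg func(uint64)) error {
        sub, err := rd.Subscribe("l1infotreesync") // assumed API returning *reorgdetector.Subscription
        if err != nil {
            return err
        }
        go func() {
            for firstReorgedBlock := range sub.ReorgedBlock {
                // Roll local state back as if the last processed block
                // were firstReorgedBlock-1 (cf. processor.Reorg above).
                handleReorg(firstReorgedBlock)
                // Unblock notifySubscriber, which waits on ReorgProcessed.
                sub.ReorgProcessed <- true
            }
        }()
        return nil
    }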
(Diffs for the remaining 6 changed files are not shown.)
