From e95a77f5147f01fe89dbed4f860d519d6bc7335b Mon Sep 17 00:00:00 2001 From: Mirko von Leipzig Date: Wed, 13 Apr 2022 12:23:34 +0200 Subject: [PATCH 1/2] feat(sync): increased L2 poll interval when at head --- crates/pathfinder/src/state/sync.rs | 22 +++++----- crates/pathfinder/src/state/sync/l2.rs | 56 +++++++++++++++++--------- 2 files changed, 49 insertions(+), 29 deletions(-) diff --git a/crates/pathfinder/src/state/sync.rs b/crates/pathfinder/src/state/sync.rs index 1c15e099d5..a1872a3589 100644 --- a/crates/pathfinder/src/state/sync.rs +++ b/crates/pathfinder/src/state/sync.rs @@ -2,7 +2,6 @@ mod l1; mod l2; use std::sync::Arc; -use std::time::Duration; use crate::{ core::{ @@ -75,11 +74,12 @@ pub async fn sync( Arc::clone(&state), sequencer.clone(), starting_block, + chain, )); // Start L1 and L2 sync processes. let mut l1_handle = tokio::spawn(l1::sync(tx_l1, transport.clone(), chain, l1_head)); - let mut l2_handle = tokio::spawn(l2::sync(tx_l2, sequencer.clone(), l2_head)); + let mut l2_handle = tokio::spawn(l2::sync(tx_l2, sequencer.clone(), l2_head, chain)); let mut existed = (0, 0); @@ -285,7 +285,7 @@ pub async fn sync( let (new_tx, new_rx) = mpsc::channel(1); rx_l2 = new_rx; - l2_handle = tokio::spawn(l2::sync(new_tx, sequencer.clone(), l2_head)); + l2_handle = tokio::spawn(l2::sync(new_tx, sequencer.clone(), l2_head, chain)); tracing::info!("L2 sync process restarted."); } } @@ -298,8 +298,11 @@ async fn update_sync_status_latest( state: Arc, sequencer: sequencer::Client, starting_block: StarknetBlockHash, + chain: Chain, ) -> anyhow::Result<()> { use crate::rpc::types::{BlockNumberOrTag, Tag}; + let poll_interval = l2::head_poll_interval(chain); + loop { // Work-around the sequencer block fetch being flakey. let latest = loop { @@ -322,23 +325,24 @@ async fn update_sync_status_latest( }); tracing::debug!( - "Updated sync status with latest block hash: {}", - latest.0.to_hex_str() + starting=%starting_block.0, + current=%starting_block.0, + highest=%latest.0, + "Updated sync status", ); } SyncStatus::Status(status) => { if status.highest_block != latest { status.highest_block = latest; tracing::debug!( - "Updated sync status with latest block hash: {}", - latest.0.to_hex_str() + highest=%latest.0, + "Updated sync status", ); } } } - // Update once every 10 seconds at most. - tokio::time::sleep(Duration::from_secs(10)).await; + tokio::time::sleep(poll_interval).await; } } diff --git a/crates/pathfinder/src/state/sync/l2.rs b/crates/pathfinder/src/state/sync/l2.rs index 0b8594ab9c..183899056b 100644 --- a/crates/pathfinder/src/state/sync/l2.rs +++ b/crates/pathfinder/src/state/sync/l2.rs @@ -1,23 +1,18 @@ -use std::{collections::HashSet, time::Duration}; +use std::collections::HashSet; +use std::time::Duration; use anyhow::Context; - use tokio::sync::{mpsc, oneshot}; -use crate::{ - core::{ContractHash, StarknetBlockHash, StarknetBlockNumber}, - ethereum::state_update::{ContractUpdate, DeployedContract, StateUpdate, StorageUpdate}, - rpc::types::{BlockNumberOrTag, Tag}, - sequencer::{ - self, - error::SequencerError, - reply::{ - state_update::{Contract, StateDiff}, - Block, - }, - }, - state::{contract_hash::extract_abi_code_hash, CompressedContract}, -}; +use crate::core::{ContractHash, StarknetBlockHash, StarknetBlockNumber}; +use crate::ethereum::state_update::{ContractUpdate, DeployedContract, StateUpdate, StorageUpdate}; +use crate::rpc::types::{BlockNumberOrTag, Tag}; +use crate::sequencer::error::SequencerError; +use crate::sequencer::reply::state_update::{Contract, StateDiff}; +use crate::sequencer::reply::Block; +use crate::sequencer::{self}; +use crate::state::contract_hash::extract_abi_code_hash; +use crate::state::CompressedContract; #[derive(Debug, Clone, Copy)] pub struct Timings { @@ -56,6 +51,7 @@ pub async fn sync( tx_event: mpsc::Sender, sequencer: sequencer::Client, mut head: Option<(StarknetBlockNumber, StarknetBlockHash)>, + chain: crate::ethereum::Chain, ) -> anyhow::Result<()> { 'outer: loop { // Get the next block from L2. @@ -68,7 +64,11 @@ pub async fn sync( let block = loop { match download_block(next, &sequencer).await? { DownloadBlock::Block(block) => break block, - DownloadBlock::AtHead => tokio::time::sleep(Duration::from_secs(5)).await, + DownloadBlock::AtHead => { + let poll_interval = head_poll_interval(chain); + tracing::info!(poll_interval=?poll_interval, "At head of chain"); + tokio::time::sleep(poll_interval).await; + } DownloadBlock::Reorg => { let some_head = head.unwrap(); head = reorg(some_head, &tx_event, &sequencer) @@ -176,8 +176,8 @@ async fn download_block( Ok(block) => Ok(DownloadBlock::Block(block)), Err(SequencerError::StarknetError(err)) if err.code == BlockNotFound => { // This would occur if we queried past the head of the chain. We now need to check that - // a reorg hasn't put us too far in the future. This does run into race conditions with the - // sequencer but this is the best we can do I think. + // a reorg hasn't put us too far in the future. This does run into race conditions with + // the sequencer but this is the best we can do I think. let latest = sequencer .block_by_number(BlockNumberOrTag::Tag(Tag::Latest)) .await @@ -225,7 +225,7 @@ async fn reorg( .with_context(|| format!("Download block {} from sequencer", previous_block_number.0))? { DownloadBlock::Block(block) if block.block_hash.unwrap() == previous_hash => { - break Some((previous_block_number, previous_hash)) + break Some((previous_block_number, previous_hash)); } _ => {} }; @@ -358,3 +358,19 @@ async fn download_and_compress_contract( hash, }) } + +/// Returns the interval to be used when polling the sequencer while at the head of the chain. The +/// interval is chosen to provide a good balance between spamming the sequencer and getting new +/// block information as it is available. +/// +/// The interval is based on the block creation time, which is 2 minutes for Goerlie and 2 hours for +/// Mainnet. +pub fn head_poll_interval(chain: crate::ethereum::Chain) -> Duration { + use crate::ethereum::Chain::*; + match chain { + // 15 minute interval for a 2 hour block time. + Mainnet => Duration::from_secs(60 * 15), + // 30 second interval for a 2 minute block time. + Goerli => Duration::from_secs(30), + } +} From 541851b9a74272dace8636f5b4d8cb4dbabd7d08 Mon Sep 17 00:00:00 2001 From: Mirko von Leipzig Date: Wed, 13 Apr 2022 12:23:40 +0200 Subject: [PATCH 2/2] feat: bump version number --- Cargo.lock | 2 +- crates/pathfinder/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a7eb104184..f51da8678b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1767,7 +1767,7 @@ dependencies = [ [[package]] name = "pathfinder" -version = "0.1.6" +version = "0.1.7" dependencies = [ "anyhow", "assert_matches", diff --git a/crates/pathfinder/Cargo.toml b/crates/pathfinder/Cargo.toml index d0175cef4f..d7515bf7a1 100644 --- a/crates/pathfinder/Cargo.toml +++ b/crates/pathfinder/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pathfinder" -version = "0.1.6" +version = "0.1.7" edition = "2021" license = "MIT OR Apache-2.0" rust-version = "1.58"