Skip to content

Commit

Permalink
Merge pull request #236 from eqlabs/backoff
Browse files Browse the repository at this point in the history
Reduce sequencer spam when at head of chain
  • Loading branch information
Mirko-von-Leipzig authored Apr 13, 2022
2 parents 327b14e + 541851b commit 379ded1
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 31 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/pathfinder/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pathfinder"
version = "0.1.6"
version = "0.1.7"
edition = "2021"
license = "MIT OR Apache-2.0"
rust-version = "1.58"
Expand Down
22 changes: 13 additions & 9 deletions crates/pathfinder/src/state/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ mod l1;
mod l2;

use std::sync::Arc;
use std::time::Duration;

use crate::{
core::{
Expand Down Expand Up @@ -75,11 +74,12 @@ pub async fn sync(
Arc::clone(&state),
sequencer.clone(),
starting_block,
chain,
));

// Start L1 and L2 sync processes.
let mut l1_handle = tokio::spawn(l1::sync(tx_l1, transport.clone(), chain, l1_head));
let mut l2_handle = tokio::spawn(l2::sync(tx_l2, sequencer.clone(), l2_head));
let mut l2_handle = tokio::spawn(l2::sync(tx_l2, sequencer.clone(), l2_head, chain));

let mut existed = (0, 0);

Expand Down Expand Up @@ -285,7 +285,7 @@ pub async fn sync(
let (new_tx, new_rx) = mpsc::channel(1);
rx_l2 = new_rx;

l2_handle = tokio::spawn(l2::sync(new_tx, sequencer.clone(), l2_head));
l2_handle = tokio::spawn(l2::sync(new_tx, sequencer.clone(), l2_head, chain));
tracing::info!("L2 sync process restarted.");
}
}
Expand All @@ -298,8 +298,11 @@ async fn update_sync_status_latest(
state: Arc<State>,
sequencer: sequencer::Client,
starting_block: StarknetBlockHash,
chain: Chain,
) -> anyhow::Result<()> {
use crate::rpc::types::{BlockNumberOrTag, Tag};
let poll_interval = l2::head_poll_interval(chain);

loop {
// Work-around the sequencer block fetch being flakey.
let latest = loop {
Expand All @@ -322,23 +325,24 @@ async fn update_sync_status_latest(
});

tracing::debug!(
"Updated sync status with latest block hash: {}",
latest.0.to_hex_str()
starting=%starting_block.0,
current=%starting_block.0,
highest=%latest.0,
"Updated sync status",
);
}
SyncStatus::Status(status) => {
if status.highest_block != latest {
status.highest_block = latest;
tracing::debug!(
"Updated sync status with latest block hash: {}",
latest.0.to_hex_str()
highest=%latest.0,
"Updated sync status",
);
}
}
}

// Update once every 10 seconds at most.
tokio::time::sleep(Duration::from_secs(10)).await;
tokio::time::sleep(poll_interval).await;
}
}

Expand Down
56 changes: 36 additions & 20 deletions crates/pathfinder/src/state/sync/l2.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
use std::{collections::HashSet, time::Duration};
use std::collections::HashSet;
use std::time::Duration;

use anyhow::Context;

use tokio::sync::{mpsc, oneshot};

use crate::{
core::{ContractHash, StarknetBlockHash, StarknetBlockNumber},
ethereum::state_update::{ContractUpdate, DeployedContract, StateUpdate, StorageUpdate},
rpc::types::{BlockNumberOrTag, Tag},
sequencer::{
self,
error::SequencerError,
reply::{
state_update::{Contract, StateDiff},
Block,
},
},
state::{contract_hash::extract_abi_code_hash, CompressedContract},
};
use crate::core::{ContractHash, StarknetBlockHash, StarknetBlockNumber};
use crate::ethereum::state_update::{ContractUpdate, DeployedContract, StateUpdate, StorageUpdate};
use crate::rpc::types::{BlockNumberOrTag, Tag};
use crate::sequencer::error::SequencerError;
use crate::sequencer::reply::state_update::{Contract, StateDiff};
use crate::sequencer::reply::Block;
use crate::sequencer::{self};
use crate::state::contract_hash::extract_abi_code_hash;
use crate::state::CompressedContract;

#[derive(Debug, Clone, Copy)]
pub struct Timings {
Expand Down Expand Up @@ -56,6 +51,7 @@ pub async fn sync(
tx_event: mpsc::Sender<Event>,
sequencer: sequencer::Client,
mut head: Option<(StarknetBlockNumber, StarknetBlockHash)>,
chain: crate::ethereum::Chain,
) -> anyhow::Result<()> {
'outer: loop {
// Get the next block from L2.
Expand All @@ -68,7 +64,11 @@ pub async fn sync(
let block = loop {
match download_block(next, &sequencer).await? {
DownloadBlock::Block(block) => break block,
DownloadBlock::AtHead => tokio::time::sleep(Duration::from_secs(5)).await,
DownloadBlock::AtHead => {
let poll_interval = head_poll_interval(chain);
tracing::info!(poll_interval=?poll_interval, "At head of chain");
tokio::time::sleep(poll_interval).await;
}
DownloadBlock::Reorg => {
let some_head = head.unwrap();
head = reorg(some_head, &tx_event, &sequencer)
Expand Down Expand Up @@ -176,8 +176,8 @@ async fn download_block(
Ok(block) => Ok(DownloadBlock::Block(block)),
Err(SequencerError::StarknetError(err)) if err.code == BlockNotFound => {
// This would occur if we queried past the head of the chain. We now need to check that
// a reorg hasn't put us too far in the future. This does run into race conditions with the
// sequencer but this is the best we can do I think.
// a reorg hasn't put us too far in the future. This does run into race conditions with
// the sequencer but this is the best we can do I think.
let latest = sequencer
.block_by_number(BlockNumberOrTag::Tag(Tag::Latest))
.await
Expand Down Expand Up @@ -225,7 +225,7 @@ async fn reorg(
.with_context(|| format!("Download block {} from sequencer", previous_block_number.0))?
{
DownloadBlock::Block(block) if block.block_hash.unwrap() == previous_hash => {
break Some((previous_block_number, previous_hash))
break Some((previous_block_number, previous_hash));
}
_ => {}
};
Expand Down Expand Up @@ -358,3 +358,19 @@ async fn download_and_compress_contract(
hash,
})
}

/// Returns the interval to be used when polling the sequencer while at the head of the chain. The
/// interval is chosen to provide a good balance between spamming the sequencer and getting new
/// block information as it is available.
///
/// The interval is based on the block creation time, which is 2 minutes for Goerlie and 2 hours for
/// Mainnet.
pub fn head_poll_interval(chain: crate::ethereum::Chain) -> Duration {
use crate::ethereum::Chain::*;
match chain {
// 15 minute interval for a 2 hour block time.
Mainnet => Duration::from_secs(60 * 15),
// 30 second interval for a 2 minute block time.
Goerli => Duration::from_secs(30),
}
}

0 comments on commit 379ded1

Please sign in to comment.