run zksync-era on local machine
mm-zk authored and vivijj committed Jun 7, 2024
Commit 7af0256 · 1 parent fd41e42
Showing 516 changed files with 4,419 additions and 3,201 deletions.
7 changes: 5 additions & 2 deletions core/bin/block_reverter/src/main.rs
@@ -72,7 +72,8 @@ enum Command {
/// Flag that specifies if snapshot files in GCS should be rolled back.
#[arg(long, requires = "rollback_postgres")]
rollback_snapshots: bool,
/// Flag that allows to roll back already executed blocks. It's ultra dangerous and required only for fixing external nodes.
/// Flag that allows to roll back already executed blocks. It's ultra dangerous and
/// required only for fixing external nodes.
#[arg(long)]
allow_executed_block_reversion: bool,
},
@@ -209,7 +210,9 @@ async fn main() -> anyhow::Result<()> {
}

if allow_executed_block_reversion {
println!("You want to roll back already executed blocks. It's impossible to restore them for the main node");
println!(
"You want to roll back already executed blocks. It's impossible to restore them for the main node"
);
println!("Make sure you are doing it ONLY for external node");
println!("Are you sure? Print y/n");

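Below the hunk, a minimal sketch of the flag relationship this diff documents, assuming the clap crate with its derive feature. The real CLI declares these as fields of a subcommand variant; the standalone struct and the `rollback_postgres` flag shown here are illustrative.

    use clap::Parser;

    /// Illustrative subset of the block-reverter flags from the hunk above.
    #[derive(Debug, Parser)]
    struct RollbackArgs {
        /// Flag that specifies if Postgres should be rolled back.
        #[arg(long)]
        rollback_postgres: bool,
        /// Flag that specifies if snapshot files in GCS should be rolled back.
        /// `requires` makes clap reject `--rollback-snapshots` without `--rollback-postgres`.
        #[arg(long, requires = "rollback_postgres")]
        rollback_snapshots: bool,
        /// Allows rolling back already executed blocks; intended only for fixing external nodes.
        #[arg(long)]
        allow_executed_block_reversion: bool,
    }

    fn main() {
        let args = RollbackArgs::parse();
        println!("{args:?}");
    }

Running this with `--rollback-snapshots` but without `--rollback-postgres` should make clap exit with an error about the missing required flag.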
5 changes: 3 additions & 2 deletions core/bin/contract-verifier/src/verifier.rs
@@ -481,8 +481,9 @@ impl JobProcessor for ContractVerifier {
const TIME_OVERHEAD: Duration = Duration::from_secs(10);

// Considering that jobs that reach compilation timeout will be executed in
// `compilation_timeout` + `non_compilation_time_overhead` (which is significantly less than `compilation_timeout`),
// we re-pick up jobs that are being executed for a bit more than `compilation_timeout`.
// `compilation_timeout` + `non_compilation_time_overhead` (which is significantly less than
// `compilation_timeout`), we re-pick up jobs that are being executed for a bit more
// than `compilation_timeout`.
let job = connection
.contract_verification_dal()
.get_next_queued_verification_request(self.config.compilation_timeout() + TIME_OVERHEAD)
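The comment rewrapped above encodes a small scheduling rule; a sketch of it under illustrative types (the `Job` struct and clock handling are not the verifier's real ones, only `TIME_OVERHEAD` comes from the diff):

    use std::time::{Duration, Instant};

    const TIME_OVERHEAD: Duration = Duration::from_secs(10);

    /// Illustrative representation of a verification job that is currently being processed.
    struct Job {
        picked_up_at: Instant,
    }

    /// A job is eligible to be re-picked only after it has been processing for longer than
    /// `compilation_timeout + TIME_OVERHEAD`; the overhead covers the non-compilation work
    /// that follows a compilation timeout.
    fn is_stuck(job: &Job, compilation_timeout: Duration) -> bool {
        job.picked_up_at.elapsed() > compilation_timeout + TIME_OVERHEAD
    }

    fn main() {
        let job = Job { picked_up_at: Instant::now() - Duration::from_secs(45) };
        // With a 30 s compilation timeout and 10 s overhead, a job running for 45 s is re-picked.
        assert!(is_stuck(&job, Duration::from_secs(30)));
        println!("job would be re-picked");
    }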
6 changes: 3 additions & 3 deletions core/bin/contract-verifier/src/zksolc_utils.rs
@@ -38,8 +38,9 @@ pub struct Source {
}

/// Compiler settings.
/// There are fields like `output_selection`, `is_system`, `force_evmla` which are accessed by contract verifier explicitly.
/// Other fields are accumulated in `other`, this way every field that was in the original request will be passed to a compiler.
/// There are fields like `output_selection`, `is_system`, `force_evmla` which are accessed by
/// contract verifier explicitly. Other fields are accumulated in `other`, this way every field that
/// was in the original request will be passed to a compiler.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Settings {
@@ -77,7 +78,6 @@ impl Default for Optimizer {
impl Optimizer {
///
/// A shortcut constructor.
///
pub fn new(enabled: bool) -> Self {
Self {
enabled,
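The `Settings` doc comment describes a pass-through pattern; a minimal sketch of it, assuming serde's `flatten` attribute and showing only an illustrative subset of the explicit fields:

    use serde::{Deserialize, Serialize};
    use serde_json::{json, Map, Value};

    #[derive(Debug, Serialize, Deserialize)]
    #[serde(rename_all = "camelCase")]
    struct Settings {
        // Fields the contract verifier reads explicitly (illustrative subset).
        #[serde(default)]
        is_system: bool,
        #[serde(default)]
        force_evmla: bool,
        // Everything else from the original request is captured here and handed to the compiler verbatim.
        #[serde(flatten)]
        other: Map<String, Value>,
    }

    fn main() {
        let request = json!({
            "isSystem": true,
            "forceEvmla": false,
            "optimizer": { "enabled": true },      // unknown to the verifier, preserved in `other`
            "metadata": { "bytecodeHash": "none" } // likewise
        });
        let settings: Settings = serde_json::from_value(request).unwrap();
        println!("{settings:?}");
        // Re-serializing flattens `other` back in, so nothing from the original request is lost.
        println!("{}", serde_json::to_string_pretty(&settings).unwrap());
    }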
173 changes: 98 additions & 75 deletions core/bin/external_node/src/config/mod.rs

Large diffs are not rendered by default.

13 changes: 8 additions & 5 deletions core/bin/external_node/src/config/observability.rs
@@ -10,11 +10,12 @@ use super::{ConfigurationSource, Environment};
/// Observability part of the node configuration.
#[derive(Debug, Default, Deserialize)]
pub(crate) struct ObservabilityENConfig {
/// Port to bind the Prometheus exporter server to. If not specified, the server will not be launched.
/// If the push gateway URL is specified, it will prevail.
/// Port to bind the Prometheus exporter server to. If not specified, the server will not be
/// launched. If the push gateway URL is specified, it will prevail.
pub prometheus_port: Option<u16>,
/// Prometheus push gateway to push metrics to. Overrides `prometheus_port`. A full URL must be specified
/// including `job_id` and other path segments; it will be used verbatim as the URL to push data to.
/// Prometheus push gateway to push metrics to. Overrides `prometheus_port`. A full URL must be
/// specified including `job_id` and other path segments; it will be used verbatim as the
/// URL to push data to.
pub prometheus_pushgateway_url: Option<String>,
/// Interval between pushing metrics to the Prometheus push gateway.
#[serde(default = "ObservabilityENConfig::default_prometheus_push_interval_ms")]
@@ -68,7 +69,9 @@ impl ObservabilityENConfig {
match (self.prometheus_port, &self.prometheus_pushgateway_url) {
(_, Some(url)) => {
if self.prometheus_port.is_some() {
tracing::info!("Both Prometheus port and push gateway URLs are specified; the push gateway URL will be used");
tracing::info!(
"Both Prometheus port and push gateway URLs are specified; the push gateway URL will be used"
);
}
let push_interval = Duration::from_millis(self.prometheus_push_interval_ms);
Some(PrometheusExporterConfig::push(url.clone(), push_interval))
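A sketch of the precedence rule stated in the doc comments and log message above: a push gateway URL overrides the exporter port, and setting both only yields an informational log. The enum is a simplified stand-in for the real `PrometheusExporterConfig`, and the port and URL values are made up.

    use std::time::Duration;

    /// Simplified stand-in for the node's Prometheus exporter configuration.
    #[derive(Debug)]
    enum PrometheusConfig {
        /// Pull mode: expose a scrape endpoint on this port.
        Pull { port: u16 },
        /// Push mode: push metrics to the gateway at this interval.
        Push { gateway_url: String, interval: Duration },
    }

    fn resolve(
        port: Option<u16>,
        pushgateway_url: Option<String>,
        push_interval_ms: u64,
    ) -> Option<PrometheusConfig> {
        match (port, pushgateway_url) {
            // The push gateway URL prevails even if a port is also configured.
            (_, Some(url)) => {
                if port.is_some() {
                    println!("Both Prometheus port and push gateway URLs are specified; the push gateway URL will be used");
                }
                Some(PrometheusConfig::Push {
                    gateway_url: url,
                    interval: Duration::from_millis(push_interval_ms),
                })
            }
            (Some(port), None) => Some(PrometheusConfig::Pull { port }),
            // Neither option set: the exporter server is not launched at all.
            (None, None) => None,
        }
    }

    fn main() {
        let cfg = resolve(
            Some(3322),
            Some("http://gateway:9091/metrics/job/zksync-en".into()),
            10_000,
        );
        println!("{cfg:?}");
    }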
4 changes: 2 additions & 2 deletions core/bin/external_node/src/helpers.rs
@@ -197,8 +197,8 @@ impl ValidateChainIdsTask {
}

pub async fn run(self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
// Since check futures are fused, they are safe to poll after getting resolved; they will never resolve again,
// so we'll just wait for another check or a stop signal.
// Since check futures are fused, they are safe to poll after getting resolved; they will
// never resolve again, so we'll just wait for another check or a stop signal.
let eth_client_check = Self::check_eth_client(self.eth_client, self.l1_chain_id).fuse();
let main_node_l1_check =
Self::check_l1_chain_using_main_node(self.main_node_client.clone(), self.l1_chain_id)
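A sketch of the fused-future pattern the comment above relies on, assuming the `futures` and `tokio` crates: fusing lets `select!` keep a resolved check in scope without ever polling it again, so the loop can keep waiting for the stop signal. The check body and the timings are placeholders.

    use std::time::Duration;

    use futures::{future::FutureExt, pin_mut, select};
    use tokio::sync::watch;

    async fn check_eth_client() {
        // Placeholder for the real chain ID check against the Ethereum client.
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    #[tokio::main]
    async fn main() {
        let (stop_sender, mut stop_receiver) = watch::channel(false);

        // `.fuse()` wraps the future so that, once resolved, it reports itself as terminated
        // and `select!` skips it instead of polling a completed future again.
        let eth_check = check_eth_client().fuse();
        let stop = async {
            while !*stop_receiver.borrow_and_update() {
                if stop_receiver.changed().await.is_err() {
                    break;
                }
            }
        }
        .fuse();
        pin_mut!(eth_check, stop);

        // Request a stop shortly after startup, standing in for a real stop signal.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            stop_sender.send_replace(true);
        });

        loop {
            select! {
                () = eth_check => println!("check finished; still waiting for another check or a stop signal"),
                () = stop => {
                    println!("stop signal received");
                    break;
                }
            }
        }
    }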
8 changes: 6 additions & 2 deletions core/bin/external_node/src/init.rs
@@ -52,7 +52,9 @@ pub(crate) async fn ensure_storage_initialized(
InitDecision::Genesis
}
(None, Some(snapshot_recovery)) => {
tracing::info!("Node has no genesis L1 batch and snapshot recovery information: {snapshot_recovery:?}");
tracing::info!(
"Node has no genesis L1 batch and snapshot recovery information: {snapshot_recovery:?}"
);
InitDecision::SnapshotRecovery
}
(None, None) => {
@@ -84,7 +86,9 @@ pub(crate) async fn ensure_storage_initialized(
`EN_SNAPSHOTS_RECOVERY_ENABLED=true` env variable to the node binary, or use a Postgres dump for recovery"
);

tracing::warn!("Proceeding with snapshot recovery. This is an experimental feature; use at your own risk");
tracing::warn!(
"Proceeding with snapshot recovery. This is an experimental feature; use at your own risk"
);
let recovery_config = SnapshotsRecoveryConfig::new()?;
let blob_store = ObjectStoreFactory::new(recovery_config.snapshots_object_store)
.create_store()
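A sketch of the initialization decision this hunk participates in. Only the `(None, Some(_))` arm is taken from the diff; the other arms and all types are placeholders for illustration, not the node's actual policy.

    /// Simplified stand-in for the snapshot recovery status stored in Postgres.
    #[derive(Debug)]
    struct SnapshotRecoveryStatus {
        l1_batch_number: u32,
    }

    #[derive(Debug, PartialEq)]
    enum InitDecision {
        Genesis,
        SnapshotRecovery,
    }

    fn decide(
        genesis_l1_batch: Option<u32>,
        snapshot_recovery: Option<&SnapshotRecoveryStatus>,
    ) -> InitDecision {
        match (genesis_l1_batch, snapshot_recovery) {
            // Arm visible in the diff: no local genesis batch, but recovery info exists,
            // so the node resumes snapshot recovery.
            (None, Some(recovery)) => {
                println!(
                    "Node has no genesis L1 batch and snapshot recovery information: {recovery:?}"
                );
                InitDecision::SnapshotRecovery
            }
            // Placeholder arms; the real choices for these cases are not shown in this hunk.
            (Some(_), _) => InitDecision::Genesis,
            (None, None) => InitDecision::Genesis,
        }
    }

    fn main() {
        let recovery = SnapshotRecoveryStatus { l1_batch_number: 123_456 };
        assert_eq!(decide(None, Some(&recovery)), InitDecision::SnapshotRecovery);
        println!("fresh node decision (placeholder): {:?}", decide(None, None));
    }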
65 changes: 39 additions & 26 deletions core/bin/external_node/src/main.rs
@@ -169,7 +169,9 @@ async fn run_tree(
app_health.insert_custom_component(Arc::new(metadata_calculator.tree_health_check()))?;

if config.optional.pruning_enabled {
tracing::warn!("Proceeding with node state pruning for the Merkle tree. This is an experimental feature; use at your own risk");
tracing::warn!(
"Proceeding with node state pruning for the Merkle tree. This is an experimental feature; use at your own risk"
);

let pruning_task =
metadata_calculator.pruning_task(config.optional.pruning_removal_delay() / 2);
@@ -226,9 +228,11 @@ async fn run_core(

let mut persistence = persistence.with_tx_insertion();
if !config.optional.protective_reads_persistence_enabled {
// **Important:** Disabling protective reads persistence is only sound if the node will never
// run a full Merkle tree.
tracing::warn!("Disabling persisting protective reads; this should be safe, but is considered an experimental option at the moment");
// **Important:** Disabling protective reads persistence is only sound if the node will
// never run a full Merkle tree.
tracing::warn!(
"Disabling persisting protective reads; this should be safe, but is considered an experimental option at the moment"
);
persistence = persistence.without_protective_reads();
}
let tree_writes_persistence = TreeWritesPersistence::new(connection_pool.clone());
@@ -270,11 +274,12 @@ async fn run_core(
let main_node_client = main_node_client.clone();
let mut stop_receiver = stop_receiver.clone();
async move {
// We instantiate the root context here, since the consensus task is the only user of the
// structured concurrency framework.
// Note, however, that awaiting for the `stop_receiver` is related to the root context behavior,
// not the consensus task itself. There may have been any number of tasks running in the root context,
// but we only need to wait for stop signal once, and it will be propagated to all child contexts.
// We instantiate the root context here, since the consensus task is the only user of
// the structured concurrency framework.
// Note, however, that awaiting for the `stop_receiver` is related to the root context
// behavior, not the consensus task itself. There may have been any number
// of tasks running in the root context, but we only need to wait for stop
// signal once, and it will be propagated to all child contexts.
let ctx = ctx::root();
scope::run!(&ctx, |ctx, s| async move {
s.spawn_bg(consensus::era::run_en(
@@ -294,7 +299,9 @@
}));

if config.optional.pruning_enabled {
tracing::warn!("Proceeding with node state pruning for Postgres. This is an experimental feature; use at your own risk");
tracing::warn!(
"Proceeding with node state pruning for Postgres. This is an experimental feature; use at your own risk"
);

let minimum_l1_batch_age = config.optional.pruning_data_retention();
tracing::info!(
@@ -329,8 +336,9 @@ async fn run_core(
remote_diamond_proxy_addr
};

// Run validation asynchronously: the node starting shouldn't depend on Ethereum client availability,
// and the impact of a failed async check is reasonably low (the commitment mode is only used in consistency checker).
// Run validation asynchronously: the node starting shouldn't depend on Ethereum client
// availability, and the impact of a failed async check is reasonably low (the commitment
// mode is only used in consistency checker).
let validation_task = L1BatchCommitmentModeValidationTask::new(
diamond_proxy_addr,
config.optional.l1_batch_commit_data_generator_mode,
@@ -489,7 +497,8 @@ async fn run_api(
.build(
fee_params_fetcher,
Arc::new(vm_concurrency_limiter),
ApiContracts::load_from_disk(), // TODO (BFT-138): Allow to dynamically reload API contracts
ApiContracts::load_from_disk(), /* TODO (BFT-138): Allow to dynamically reload API
* contracts */
storage_caches,
)
.await;
@@ -699,8 +708,8 @@ struct Cli {
/// Revert the pending L1 batch and exit.
#[arg(long)]
revert_pending_l1_batch: bool,
/// Enables consensus-based syncing instead of JSON-RPC based one. This is an experimental and incomplete feature;
/// do not use unless you know what you're doing.
/// Enables consensus-based syncing instead of JSON-RPC based one. This is an experimental and
/// incomplete feature; do not use unless you know what you're doing.
#[arg(long)]
enable_consensus: bool,

@@ -821,7 +830,8 @@ async fn main() -> anyhow::Result<()> {
.await
}

/// Environment for the node encapsulating its interactions. Used in EN tests to mock signal sending etc.
/// Environment for the node encapsulating its interactions. Used in EN tests to mock signal sending
/// etc.
trait NodeEnvironment {
/// Sets the SIGINT handler, returning a future that will resolve when a signal is sent.
fn setup_sigint_handler(&mut self) -> oneshot::Receiver<()>;
@@ -866,12 +876,14 @@ async fn run_node(
connection_pool.clone(),
)))?;

// Start the health check server early into the node lifecycle so that its health can be monitored from the very start.
// Start the health check server early into the node lifecycle so that its health can be
// monitored from the very start.
let healthcheck_handle = HealthCheckHandle::spawn_server(
([0, 0, 0, 0], config.required.healthcheck_port).into(),
app_health.clone(),
);
// Start exporting metrics at the very start so that e.g., snapshot recovery metrics are timely reported.
// Start exporting metrics at the very start so that e.g., snapshot recovery metrics are timely
// reported.
let prometheus_task = if let Some(prometheus) = config.observability.prometheus() {
tracing::info!("Starting Prometheus exporter with configuration: {prometheus:?}");

@@ -938,8 +950,8 @@ async fn run_node(
)
.await?;
let sigint_receiver = env.setup_sigint_handler();
// Spawn reacting to signals in a separate task so that the node is responsive to signals right away
// (e.g., during the initial reorg detection).
// Spawn reacting to signals in a separate task so that the node is responsive to signals right
// away (e.g., during the initial reorg detection).
tokio::spawn({
let stop_sender = stop_sender.clone();
async move {
@@ -959,10 +971,10 @@ async fn run_node(
.enable_rolling_back_state_keeper_cache(config.required.state_cache_path.clone());

let mut reorg_detector = ReorgDetector::new(main_node_client.clone(), connection_pool.clone());
// We're checking for the reorg in the beginning because we expect that if reorg is detected during
// the node lifecycle, the node will exit the same way as it does with any other critical error,
// and would restart. Then, on the 2nd launch reorg would be detected here, then processed and the node
// will be able to operate normally afterwards.
// We're checking for the reorg in the beginning because we expect that if reorg is detected
// during the node lifecycle, the node will exit the same way as it does with any other
// critical error, and would restart. Then, on the 2nd launch reorg would be detected here,
// then processed and the node will be able to operate normally afterwards.
match reorg_detector.run_once(stop_receiver.clone()).await {
Ok(()) if *stop_receiver.borrow() => {
tracing::info!("Stop signal received during initial reorg detection; shutting down");
@@ -1030,8 +1042,9 @@ async fn run_node(
() = tasks.wait_single() => {},
}

// Reaching this point means that either some actor exited unexpectedly or we received a stop signal.
// Broadcast the stop signal (in case it wasn't broadcast previously) to all actors and exit.
// Reaching this point means that either some actor exited unexpectedly or we received a stop
// signal. Broadcast the stop signal (in case it wasn't broadcast previously) to all actors
// and exit.
stop_sender.send_replace(true);
shutdown_components(tasks, healthcheck_handle).await?;
tracing::info!("Stopped");
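Several hunks above follow the same shutdown protocol: components watch a `tokio::sync::watch` channel, and the main routine broadcasts the stop flag (idempotently, via `send_replace`) before draining tasks. A self-contained sketch, with made-up component names and timings:

    use std::time::Duration;

    use tokio::sync::watch;

    /// A long-running component that winds down when the stop flag flips to `true`.
    async fn component(name: &'static str, mut stop_receiver: watch::Receiver<bool>) {
        loop {
            tokio::select! {
                // Real components do useful work here; this stand-in just ticks.
                _ = tokio::time::sleep(Duration::from_millis(200)) => println!("{name}: tick"),
                _ = stop_receiver.changed() => {
                    if *stop_receiver.borrow() {
                        println!("{name}: stop signal received, shutting down");
                        return;
                    }
                }
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let (stop_sender, stop_receiver) = watch::channel(false);
        let tasks = vec![
            tokio::spawn(component("core", stop_receiver.clone())),
            tokio::spawn(component("api", stop_receiver.clone())),
        ];

        // Stand-in for a SIGINT handler firing or for an actor exiting unexpectedly.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Broadcast the stop signal (in case it wasn't broadcast previously) to all components and exit.
        stop_sender.send_replace(true);
        for task in tasks {
            let _ = task.await;
        }
        println!("Stopped");
    }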
7 changes: 4 additions & 3 deletions core/bin/external_node/src/tests.rs
@@ -81,7 +81,8 @@ struct TestEnvironmentHandles {
app_health_receiver: oneshot::Receiver<Arc<AppHealthCheck>>,
}

// The returned components have the fully implemented health check life cycle (i.e., signal their shutdown).
// The returned components have the fully implemented health check life cycle (i.e., signal their
// shutdown).
fn expected_health_components(components: &ComponentsToRun) -> Vec<&'static str> {
let mut output = vec!["reorg_detector"];
if components.0.contains(&Component::Core) {
@@ -134,8 +135,8 @@ async fn external_node_basics(components_str: &'static str) {
let _guard = vlog::ObservabilityBuilder::new().build(); // Enable logging to simplify debugging
let temp_dir = tempfile::TempDir::new().unwrap();

// Simplest case to mock: the EN already has a genesis L1 batch / L2 block, and it's the only L1 batch / L2 block
// in the network.
// Simplest case to mock: the EN already has a genesis L1 batch / L2 block, and it's the only L1
// batch / L2 block in the network.
let connection_pool = ConnectionPool::test_pool().await;
let singleton_pool_builder = ConnectionPool::singleton(connection_pool.database_url().clone());
let mut storage = connection_pool.connection().await.unwrap();
12 changes: 9 additions & 3 deletions core/bin/external_node/src/version_sync_task.rs
@@ -55,7 +55,9 @@ pub async fn sync_versions(
let right_bound_remote_version =
get_l1_batch_remote_protocol_version(main_node_client.as_ref(), right_bound).await?;
if right_bound_remote_version != Some(ProtocolVersionId::Version22) {
anyhow::bail!("Remote protocol versions should be v22 for the first local v22 batch, got {right_bound_remote_version:?}");
anyhow::bail!(
"Remote protocol versions should be v22 for the first local v22 batch, got {right_bound_remote_version:?}"
);
}

while left_bound < right_bound {
@@ -81,7 +83,9 @@
right_bound = mid_batch;
}
Ordering::Greater => {
anyhow::bail!("Unexpected remote protocol version: {mid_protocol_version:?} for miniblock #{mid_miniblock}");
anyhow::bail!(
"Unexpected remote protocol version: {mid_protocol_version:?} for miniblock #{mid_miniblock}"
);
}
}
}
@@ -116,7 +120,9 @@ pub async fn sync_versions(
format!("Postgres is inconsistent: missing miniblocks for L1 batch #{local_first_v22_l1_batch}")
})?;

tracing::info!("Setting version 22 for miniblocks {remote_first_v22_miniblock}..={local_first_v22_miniblock}");
tracing::info!(
"Setting version 22 for miniblocks {remote_first_v22_miniblock}..={local_first_v22_miniblock}"
);
transaction
.blocks_dal()
.reset_protocol_version_for_l2_blocks(
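The hunks above are part of a bisection over L1 batches; a sketch of the underlying search, assuming protocol versions never decrease across batches. The lookup function and the activation batch number are made up; only the compare-and-bail structure mirrors the diff.

    use std::cmp::Ordering;

    /// Stand-in for querying the main node for the protocol version of an L1 batch.
    fn remote_protocol_version(l1_batch: u32) -> u16 {
        if l1_batch >= 4_242 { 22 } else { 21 } // illustrative data: v22 activates at batch 4242
    }

    /// Finds the first L1 batch whose remote protocol version is 22.
    /// Assumes versions never decrease and that `right_bound` is already known to be v22.
    fn first_v22_batch(mut left_bound: u32, mut right_bound: u32) -> u32 {
        while left_bound < right_bound {
            let mid_batch = (left_bound + right_bound) / 2;
            match remote_protocol_version(mid_batch).cmp(&22) {
                // Still on an older version: the boundary lies strictly to the right.
                Ordering::Less => left_bound = mid_batch + 1,
                // Already v22: the boundary is at `mid_batch` or to its left.
                Ordering::Equal => right_bound = mid_batch,
                // A version above v22 below the known-v22 bound contradicts monotonicity.
                Ordering::Greater => panic!("Unexpected remote protocol version for batch #{mid_batch}"),
            }
        }
        left_bound
    }

    fn main() {
        assert_eq!(first_v22_batch(0, 10_000), 4_242);
        println!("first v22 batch: {}", first_v22_batch(0, 10_000));
    }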
3 changes: 2 additions & 1 deletion core/bin/genesis_generator/src/main.rs
@@ -84,7 +84,8 @@ async fn generate_new_config(
let mut updated_genesis = GenesisConfig {
protocol_version: Some(ProtocolSemanticVersion {
minor: ProtocolVersionId::latest(),
patch: 0.into(), // genesis generator proposes some new valid config, so patch 0 works here.
patch: 0.into(), /* genesis generator proposes some new valid config, so patch 0
* works here. */
}),
genesis_root_hash: None,
rollup_last_leaf_index: None,
10 changes: 6 additions & 4 deletions core/bin/snapshots_creator/src/creator.rs
@@ -203,8 +203,8 @@ impl SnapshotCreator {
);
let l1_batch_number = sealed_l1_batch_number - 1;

// Sanity check: the selected L1 batch should have Merkle tree data; otherwise, it could be impossible
// to recover from the generated snapshot.
// Sanity check: the selected L1 batch should have Merkle tree data; otherwise, it could be
// impossible to recover from the generated snapshot.
conn.blocks_dal()
.get_l1_batch_tree_data(l1_batch_number)
.await?
@@ -229,7 +229,8 @@
.get_distinct_storage_logs_keys_count(l1_batch_number)
.await?;
let chunk_size = config.storage_logs_chunk_size;
// We force the minimum number of chunks to avoid situations where only one chunk is created in tests.
// We force the minimum number of chunks to avoid situations where only one chunk is created
// in tests.
let chunk_count = distinct_storage_logs_keys_count
.div_ceil(chunk_size)
.max(min_chunk_count);
@@ -288,7 +289,8 @@ impl SnapshotCreator {
.load_or_initialize_snapshot_progress(&config, min_chunk_count)
.await?
else {
// No snapshot creation is necessary; a snapshot for the current L1 batch is already created
// No snapshot creation is necessary; a snapshot for the current L1 batch is already
// created
return Ok(());
};

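A sketch of the chunk-count rule in the hunk above: storage-log keys are split into `key_count / chunk_size` chunks rounded up, floored at a configured minimum so that tests still produce more than one chunk. The concrete numbers are illustrative.

    /// Number of snapshot chunks for `key_count` storage keys at `chunk_size` keys per chunk,
    /// with a floor of `min_chunk_count`.
    fn chunk_count(key_count: u64, chunk_size: u64, min_chunk_count: u64) -> u64 {
        key_count.div_ceil(chunk_size).max(min_chunk_count)
    }

    fn main() {
        // 2_500_000 distinct keys at 1_000_000 keys per chunk round up to 3 chunks...
        assert_eq!(chunk_count(2_500_000, 1_000_000, 2), 3);
        // ...but the configured minimum wins when it is larger.
        assert_eq!(chunk_count(2_500_000, 1_000_000, 10), 10);
        println!("ok");
    }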