diff --git a/crates/chain-state/src/in_memory.rs b/crates/chain-state/src/in_memory.rs index 6bef197be..04b37f8cb 100644 --- a/crates/chain-state/src/in_memory.rs +++ b/crates/chain-state/src/in_memory.rs @@ -826,6 +826,16 @@ impl ExecutedBlock { pub fn trie_updates(&self) -> &TrieUpdates { &self.trie } + + /// Returns the state root of the block. + pub fn state_root(&self) -> B256 { + self.block.header.header().state_root + } + + /// Sets the trie updates for the block. + pub fn set_trie_updates(&mut self, trie: TrieUpdates) { + self.trie = Arc::new(trie); + } } /// Non-empty chain of blocks.
diff --git a/crates/cli/commands/src/node.rs b/crates/cli/commands/src/node.rs index 03c48ad9c..dfcf7f377 100644 --- a/crates/cli/commands/src/node.rs +++ b/crates/cli/commands/src/node.rs @@ -188,6 +188,8 @@ impl< pruning, enable_prefetch, skip_state_root_validation: performance_optimization.skip_state_root_validation, + compute_state_root_in_background: performance_optimization + .compute_state_root_in_background, enable_execution_cache: performance_optimization.enable_execution_cache, };
diff --git a/crates/engine/local/src/service.rs b/crates/engine/local/src/service.rs index 02599e227..c4dedbb9a 100644 --- a/crates/engine/local/src/service.rs +++ b/crates/engine/local/src/service.rs @@ -77,6 +77,7 @@ where mode: MiningMode, payload_attributes_builder: B, skip_state_root_validation: bool, + compute_state_root_in_background: bool, enable_prefetch: bool, enable_execution_cache: bool, ) -> Self @@ -87,8 +88,13 @@ where let engine_kind = if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum }; - let persistence_handle = - PersistenceHandle::spawn_service(provider, pruner, sync_metrics_tx, false); + let persistence_handle = PersistenceHandle::spawn_service( + provider, + blockchain_db.clone(), + pruner, + sync_metrics_tx, + false, + ); let payload_validator = ExecutionPayloadValidator::new(chain_spec); let canonical_in_memory_state = blockchain_db.canonical_in_memory_state(); @@ -105,6 +111,7 @@ where invalid_block_hook, engine_kind, skip_state_root_validation, + compute_state_root_in_background, enable_prefetch, enable_execution_cache, );
diff --git a/crates/engine/service/src/service.rs b/crates/engine/service/src/service.rs index 0d5c1848d..67a212359 100644 --- a/crates/engine/service/src/service.rs +++ b/crates/engine/service/src/service.rs @@ -80,6 +80,7 @@ where invalid_block_hook: Box<dyn InvalidBlockHook>, sync_metrics_tx: MetricEventsSender, skip_state_root_validation: bool, + compute_state_root_in_background: bool, enable_prefetch: bool, enable_execution_cache: bool, ) -> Self { @@ -90,6 +91,7 @@ where let persistence_handle = PersistenceHandle::spawn_service( provider, + blockchain_db.clone(), pruner, sync_metrics_tx, enable_execution_cache, @@ -110,6 +112,7 @@ where invalid_block_hook, engine_kind, skip_state_root_validation, + compute_state_root_in_background, enable_prefetch, enable_execution_cache, ); @@ -228,6 +231,7 @@ mod tests { false, false, false, + false, ); } }
diff --git a/crates/engine/tree/src/metrics.rs b/crates/engine/tree/src/metrics.rs index ecbddc68d..dd669da47 100644 --- a/crates/engine/tree/src/metrics.rs +++ b/crates/engine/tree/src/metrics.rs @@ -21,4 +21,6 @@ pub(crate) struct PersistenceMetrics { pub(crate) save_blocks_duration_seconds: Histogram, /// How long it took for blocks to be pruned pub(crate) prune_before_duration_seconds: Histogram, + /// The height of the last persisted block + pub(crate) persistence_height: Gauge, } diff --git 
a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 991d5f196..76e91b4da 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -1,13 +1,19 @@ use crate::metrics::PersistenceMetrics; use alloy_eips::BlockNumHash; +use alloy_primitives::B256; use reth_chain_state::ExecutedBlock; use reth_errors::ProviderError; +use reth_primitives::GotExpected; use reth_provider::{ - providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader, - ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StaticFileProviderFactory, + providers::{ConsistentDbView, ProviderNodeTypes}, + writer::UnifiedStorageWriter, + BlockHashReader, BlockReader, ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, + StateProviderFactory, StateReader, StaticFileProviderFactory, }; use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory}; use reth_stages_api::{MetricEvent, MetricEventsSender}; +use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; +use reth_trie_parallel::parallel_root::{ParallelStateRoot, ParallelStateRootError}; use std::{ sync::mpsc::{Receiver, SendError, Sender}, time::Instant, @@ -24,9 +30,11 @@ use tracing::{debug, error}; /// This should be spawned in its own thread with [`std::thread::spawn`], since this performs /// blocking I/O operations in an endless loop. #[derive(Debug)] -pub struct PersistenceService<N: ProviderNodeTypes> { +pub struct PersistenceService<N: ProviderNodeTypes, P> { /// The provider factory to use provider: ProviderFactory<N>, + /// The view provider + view_provider: P, /// Incoming requests incoming: Receiver<PersistenceAction>, /// The pruner @@ -39,10 +47,15 @@ pub struct PersistenceService<N: ProviderNodeTypes> { enable_state_cache: bool, } -impl<N: ProviderNodeTypes> PersistenceService<N> { +impl<N: ProviderNodeTypes, P> PersistenceService<N, P> +where + P: DatabaseProviderFactory + BlockReader + StateProviderFactory + StateReader + Clone + 'static, + <P as DatabaseProviderFactory>::Provider: BlockReader, +{ /// Create a new persistence service pub fn new( provider: ProviderFactory<N>, + view_provider: P, incoming: Receiver<PersistenceAction>, pruner: PrunerWithFactory<ProviderFactory<N>>, sync_metrics_tx: MetricEventsSender, @@ -50,6 +63,7 @@ impl<N: ProviderNodeTypes> PersistenceService<N> { ) -> Self { Self { provider, + view_provider, incoming, pruner, metrics: PersistenceMetrics::default(), @@ -70,7 +84,11 @@ impl<N: ProviderNodeTypes> PersistenceService<N> { } } -impl<N: ProviderNodeTypes> PersistenceService<N> { +impl<N: ProviderNodeTypes, P> PersistenceService<N, P> +where + P: DatabaseProviderFactory + BlockReader + StateProviderFactory + StateReader + Clone + 'static, + <P as DatabaseProviderFactory>::Provider: BlockReader, +{ /// This is the main loop, that will listen to database events and perform the requested /// database actions pub fn run(mut self) -> Result<(), PersistenceError> { @@ -104,6 +122,30 @@ impl<N: ProviderNodeTypes> PersistenceService<N> { } } } + PersistenceAction::SaveBlocksWithStateRootCalculation( + blocks, + parent_hash, + sender, + ) => { + let result = + self.on_save_block_with_state_root_calculation(blocks, parent_hash)?; + let result_number = result.0.map(|r| r.number); + + // we ignore the error because the caller may or may not care about the result + let _ = sender.send(result); + + if let Some(block_number) = result_number { + // send new sync metrics based on saved blocks + let _ = self + .sync_metrics_tx + .send(MetricEvent::SyncHeight { height: block_number }); + + if self.pruner.is_pruning_needed(block_number) { + // We log `PrunerOutput` inside the `Pruner` + let _ = self.prune_before(block_number)?; + } + } + } PersistenceAction::SaveFinalizedBlock(finalized_block) => { let provider = self.provider.database_provider_rw()?; provider.save_finalized_block_number(finalized_block)?; @@ -166,8 +208,90 @@ impl<N: ProviderNodeTypes> PersistenceService<N> { UnifiedStorageWriter::commit(provider_rw, static_file_provider)?; } self.metrics.save_blocks_duration_seconds.record(start_time.elapsed()); + self.metrics + .persistence_height + .set(last_block_hash_num.as_ref().map(|b| b.number).unwrap_or(0) as f64); Ok(last_block_hash_num) } + + fn on_save_block_with_state_root_calculation( + &self, + mut blocks: Vec<ExecutedBlock>, + parent_hash: B256, + ) -> Result<(Option<BlockNumHash>, TrieUpdates), PersistenceError> { + debug!(target: "engine::persistence", first=?blocks.first().map(|b| b.block.num_hash()), last=?blocks.last().map(|b| b.block.num_hash()), "Saving range of blocks"); + + let state_root_result = self + .compute_state_root_in_batch_blocks(blocks.clone(), parent_hash) + .map_err(PersistenceError::StateRootError)?; + + if let Some(last_block) = blocks.last_mut() { + last_block.set_trie_updates(state_root_result.1.clone()); + } + + let save_blocks_result = self.on_save_blocks(blocks)?; + Ok((save_blocks_result, state_root_result.1)) + } + + fn compute_state_root_in_batch_blocks( + &self, + blocks: Vec<ExecutedBlock>, + parent_hash: B256, + ) -> Result<(B256, TrieUpdates), AdvanceCalculateStateRootError> { + let mut hashed_state = HashedPostState::default(); + for block in &blocks { + hashed_state.extend(block.hashed_state().clone()); + } + let block_number = blocks.last().unwrap().block().number; + let block_hash = blocks.last().unwrap().block().hash(); + let target_state_root = blocks.last().unwrap().state_root(); + + let root_time = Instant::now(); + debug!(target: "engine::persistence", ?block_number, ?block_hash, "Computing state root"); + let state_root_result = match self.compute_state_root_parallel(parent_hash, &hashed_state) { + Ok((state_root, trie_output)) => Some((state_root, trie_output)), + Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => { + debug!(target: "engine::persistence", %error, "Parallel state root computation failed consistency check, falling back"); + None + } + Err(error) => return Err(AdvanceCalculateStateRootError::ComputeFailed(error)), + }; + + let (state_root, trie_output) = if let Some(result) = state_root_result { + result + } else { + return Err(AdvanceCalculateStateRootError::ResultNotFound()); + }; + + let root_elapsed = root_time.elapsed(); + debug!(target: "engine::persistence", ?block_number, ?block_hash, ?state_root, ?root_elapsed, "Computed state root"); + + if state_root
!= target_state_root { + return Err(AdvanceCalculateStateRootError::StateRootDiff(GotExpected { + got: state_root, + expected: target_state_root, + })) + } + + Ok((state_root, trie_output)) + } + + fn compute_state_root_parallel( + &self, + parent_hash: B256, + hashed_state: &HashedPostState, + ) -> Result<(B256, TrieUpdates), ParallelStateRootError> { + let consistent_view = ConsistentDbView::new_with_latest_tip(self.view_provider.clone())?; + let mut input = TrieInput::default(); + + let revert_state = consistent_view.revert_state(parent_hash)?; + input.append(revert_state); + + // Extend with block we are validating root for. + input.append_ref(hashed_state); + + ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates() + } } /// One of the errors that can happen when using the persistence service. @@ -180,6 +304,28 @@ pub enum PersistenceError { /// A provider error #[error(transparent)] ProviderError(#[from] ProviderError), + + /// A state root error + #[error(transparent)] + StateRootError(#[from] AdvanceCalculateStateRootError), +} + +/// An error that can occur while advancing the state root calculation: either a +/// [`ProviderError`], a [`GotExpected`] state root mismatch, or a [`ParallelStateRootError`]. +#[derive(Debug, Error)] +pub enum AdvanceCalculateStateRootError { + /// A provider error + #[error(transparent)] + Provider(#[from] ProviderError), + /// A mismatch between the computed and the expected state root + #[error(transparent)] + StateRootDiff(#[from] GotExpected<B256>), + /// A failure in the parallel state root computation + #[error(transparent)] + ComputeFailed(#[from] ParallelStateRootError), + /// The state root result was not available + #[error("Result not found")] + ResultNotFound(), } /// A signal to the persistence service that part of the tree state can be persisted. @@ -192,6 +338,15 @@ pub enum PersistenceAction { /// Then the execution history-related data will be written to the database. SaveBlocks(Vec<ExecutedBlock>, oneshot::Sender<Option<BlockNumHash>>), + /// The section of tree state that should be persisted. These blocks are expected in order of + /// increasing block number. + /// This action will also calculate the state root for the given blocks. + SaveBlocksWithStateRootCalculation( + Vec<ExecutedBlock>, + B256, + oneshot::Sender<(Option<BlockNumHash>, TrieUpdates)>, + ), + /// Removes block data above the given block number from the database. /// /// This will first update checkpoints from the database, then remove actual block data from @@ -219,12 +374,22 @@ impl PersistenceHandle { } /// Create a new [`PersistenceHandle`], and spawn the persistence service. - pub fn spawn_service<N: ProviderNodeTypes>( + pub fn spawn_service<N: ProviderNodeTypes, P>( provider_factory: ProviderFactory<N>, + view_provider: P, pruner: PrunerWithFactory<ProviderFactory<N>>, sync_metrics_tx: MetricEventsSender, enable_state_cache: bool, - ) -> Self { + ) -> Self + where + P: DatabaseProviderFactory + + BlockReader + + StateProviderFactory + + StateReader + + Clone + + 'static, + <P as DatabaseProviderFactory>::Provider: BlockReader, + { // create the initial channels let (db_service_tx, db_service_rx) = std::sync::mpsc::channel(); @@ -234,6 +399,7 @@ impl PersistenceHandle { // spawn the persistence service let db_service = PersistenceService::new( provider_factory, + view_provider, db_service_rx, pruner, sync_metrics_tx, @@ -276,6 +442,23 @@ impl PersistenceHandle { self.send_action(PersistenceAction::SaveBlocks(blocks, tx)) } + /// Persists the given range of blocks on disk. + /// This will also calculate the state root for the given blocks. + /// The last persisted block number and the resulting [`TrieUpdates`] are + /// returned in the receiver end of the sender argument. + pub fn save_blocks_with_state_root_calculation( + &self, + blocks: Vec<ExecutedBlock>, + parent_hash: B256, + tx: oneshot::Sender<(Option<BlockNumHash>, TrieUpdates)>, + ) -> Result<(), SendError<PersistenceAction>> { + self.send_action(PersistenceAction::SaveBlocksWithStateRootCalculation( + blocks, + parent_hash, + tx, + )) + } + /// Persists the finalized block number on disk. pub fn save_finalized_block_number( &self, @@ -312,12 +495,13 @@ mod tests { use alloy_primitives::B256; use reth_chain_state::test_utils::TestBlockBuilder; use reth_exex_types::FinishedExExHeight; - use reth_provider::test_utils::create_test_provider_factory; + use reth_provider::{providers::BlockchainProvider2, test_utils::create_test_provider_factory}; use reth_prune::Pruner; use tokio::sync::mpsc::unbounded_channel; fn default_persistence_handle() -> PersistenceHandle { let provider = create_test_provider_factory(); + let view_provider = BlockchainProvider2::new(provider.clone()).unwrap(); let (_finished_exex_height_tx, finished_exex_height_rx) = tokio::sync::watch::channel(FinishedExExHeight::NoExExs); @@ -334,7 +518,7 @@ mod tests { ); let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel(); - PersistenceHandle::spawn_service(provider, pruner, sync_metrics_tx, false) + PersistenceHandle::spawn_service(provider, view_provider, pruner, sync_metrics_tx, false) } #[tokio::test]
diff --git a/crates/engine/tree/src/tree/config.rs b/crates/engine/tree/src/tree/config.rs index d252b65a8..8cb710ff5 100644 --- a/crates/engine/tree/src/tree/config.rs +++ b/crates/engine/tree/src/tree/config.rs @@ -1,10 +1,10 @@ //! Engine tree configuration. /// Triggers persistence when the number of canonical blocks in memory exceeds this threshold. -pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2; +pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 16; /// How close to the canonical head we persist blocks. -pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 2; +pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 16; const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = 256; const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index d099cccc0..ddb292a94 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -3,6 +3,7 @@ use crate::{ chain::FromOrchestrator, engine::{DownloadRequest, EngineApiEvent, FromEngine}, persistence::PersistenceHandle, + tree::persistence_state::PersistenceReceiver, }; use alloy_eips::BlockNumHash; use alloy_primitives::{ @@ -515,6 +516,8 @@ pub struct EngineApiTreeHandler { engine_kind: EngineApiKind, /// Flag indicating whether the state root validation should be skipped. skip_state_root_validation: bool, + /// Flag indicating whether to compute the state root in the background. 
+ compute_state_root_in_background: bool, /// Flag indicating whether to enable prefetch. enable_prefetch: bool, /// Flag indicating whether the cache for execution is enabled. @@ -542,6 +545,7 @@ impl std::fmt::Debug .field("invalid_block_hook", &format!("{:p}", self.invalid_block_hook)) .field("engine_kind", &self.engine_kind) .field("skip_state_root_validation", &self.skip_state_root_validation) + .field("compute_state_root_in_background", &self.compute_state_root_in_background) .field("enable_prefetch", &self.enable_prefetch) .field("enable_execution_cache", &self.enable_execution_cache) .finish() @@ -572,11 +576,11 @@ where config: TreeConfig, engine_kind: EngineApiKind, skip_state_root_validation: bool, + compute_state_root_in_background: bool, enable_prefetch: bool, enable_execution_cache: bool, ) -> Self { let (incoming_tx, incoming) = std::sync::mpsc::channel(); - Self { provider, executor_provider, @@ -596,6 +600,7 @@ where invalid_block_hook: Box::new(NoopInvalidBlockHook), engine_kind, skip_state_root_validation, + compute_state_root_in_background, enable_prefetch, enable_execution_cache, } } @@ -624,6 +629,7 @@ where invalid_block_hook: Box<dyn InvalidBlockHook>, kind: EngineApiKind, skip_state_root_validation: bool, + compute_state_root_in_background: bool, enable_prefetch: bool, enable_execution_cache: bool, ) -> (Sender<FromEngine<EngineApiRequest<T>>>, UnboundedReceiver<EngineApiEvent>) { @@ -657,6 +663,7 @@ where config, kind, skip_state_root_validation, + compute_state_root_in_background, enable_prefetch, enable_execution_cache, ); @@ -683,7 +690,7 @@ where /// Run the engine API handler. /// /// This will block the current thread and process incoming messages. - pub fn run(mut self) { + pub fn run(&mut self) { loop { match self.try_recv_engine_message() { Ok(Some(msg)) => { @@ -1178,9 +1185,18 @@ where self.persistence_state.start(rx); } } else if self.should_persist() { - let blocks_to_persist = self.get_canonical_blocks_to_persist(); + let (blocks_to_persist, parent_hash) = self.get_canonical_blocks_to_persist(); if blocks_to_persist.is_empty() { debug!(target: "engine::tree", "Returned empty set of blocks to persist"); + } else if !self.skip_state_root_validation && self.compute_state_root_in_background + { + let (tx, rx) = oneshot::channel(); + let _ = self.persistence.save_blocks_with_state_root_calculation( + blocks_to_persist, + parent_hash, + tx, + ); + self.persistence_state.start_with_root_update(rx); } else { debug!(target: "engine::tree", blocks = ?blocks_to_persist.iter().map(|block| block.block.num_hash()).collect::<Vec<_>>(), "Persisting blocks"); let (tx, rx) = oneshot::channel(); @@ -1191,33 +1207,75 @@ where } if self.persistence_state.in_progress() { - let (mut rx, start_time) = self + let (rx, start_time) = self .persistence_state .rx .take() .expect("if a persistence task is in progress Receiver must be Some"); - // Check if persistence has complete - match rx.try_recv() { - Ok(last_persisted_hash_num) => { - self.metrics.engine.persistence_duration.record(start_time.elapsed()); - let Some(BlockNumHash { - hash: last_persisted_block_hash, - number: last_persisted_block_number, - }) = last_persisted_hash_num - else { - // if this happened, then we persisted no blocks because we sent an - // empty vec of blocks - warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks"); - return Ok(()) - }; - debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish"); - self.persistence_state - .finish(last_persisted_block_hash, last_persisted_block_number); - self.on_new_persisted_block()?; + match rx { + PersistenceReceiver::Simple(mut rx) => { + // Check if persistence has completed + match rx.try_recv() { + Ok(last_persisted_hash_num) => { + self.metrics.engine.persistence_duration.record(start_time.elapsed()); + let Some(BlockNumHash { + hash: last_persisted_block_hash, + number: last_persisted_block_number, + }) = last_persisted_hash_num + else { + // if this happened, then we persisted no blocks because we sent an + // empty vec of blocks + warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks"); + return Ok(()) + }; + + debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish"); + self.persistence_state + .finish(last_persisted_block_hash, last_persisted_block_number); + self.on_new_persisted_block()?; + } + Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()), + Err(TryRecvError::Empty) => { + self.persistence_state.rx = + Some((PersistenceReceiver::Simple(rx), start_time)) + } + } + } + PersistenceReceiver::WithRootUpdates(mut rx) => { + // Check if persistence has completed + match rx.try_recv() { + Ok(persistence_result) => { + let elapsed = start_time.elapsed(); + let (last_persisted_hash_num, root_updates) = persistence_result; + self.metrics + .block_validation + .record_state_root(&root_updates, elapsed.as_secs_f64()); + + self.metrics.engine.persistence_duration.record(elapsed); + let Some(BlockNumHash { + hash: last_persisted_block_hash, + number: last_persisted_block_number, + }) = last_persisted_hash_num + else { + // if this happened, then we persisted no blocks because we sent an + // empty vec of blocks + warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks"); + return Ok(()) + }; + + trace!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish"); + self.persistence_state + .finish(last_persisted_block_hash, 
last_persisted_block_number); - self.on_new_persisted_block()?; + match rx { + PersistenceReceiver::Simple(mut rx) => { + // Check if persistence has complete + match rx.try_recv() { + Ok(last_persisted_hash_num) => { + self.metrics.engine.persistence_duration.record(start_time.elapsed()); + let Some(BlockNumHash { + hash: last_persisted_block_hash, + number: last_persisted_block_number, + }) = last_persisted_hash_num + else { + // if this happened, then we persisted no blocks because we sent an + // empty vec of blocks + warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks"); + return Ok(()) + }; + + debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish"); + self.persistence_state + .finish(last_persisted_block_hash, last_persisted_block_number); + self.on_new_persisted_block()?; + } + Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()), + Err(TryRecvError::Empty) => { + self.persistence_state.rx = + Some((PersistenceReceiver::Simple(rx), start_time)) + } + } + } + PersistenceReceiver::WithRootUpdates(mut rx) => { + // Check if persistence has complete + match rx.try_recv() { + Ok(persistence_result) => { + let elapsed = start_time.elapsed(); + let (last_persisted_hash_num, root_updates) = persistence_result; + self.metrics + .block_validation + .record_state_root(&root_updates, elapsed.as_secs_f64()); + + self.metrics.engine.persistence_duration.record(elapsed); + let Some(BlockNumHash { + hash: last_persisted_block_hash, + number: last_persisted_block_number, + }) = last_persisted_hash_num + else { + // if this happened, then we persisted no blocks because we sent an + // empty vec of blocks + warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks"); + return Ok(()) + }; + + trace!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish"); + self.persistence_state + .finish(last_persisted_block_hash, last_persisted_block_number); + self.on_new_persisted_block()?; + } + Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()), + Err(TryRecvError::Empty) => { + self.persistence_state.rx = + Some((PersistenceReceiver::WithRootUpdates(rx), start_time)) + } + } } - Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()), - Err(TryRecvError::Empty) => self.persistence_state.rx = Some((rx, start_time)), } } Ok(()) @@ -1507,7 +1565,7 @@ where /// Returns a batch of consecutive canonical blocks to persist in the range /// `(last_persisted_number .. canonical_head - threshold]` . The expected /// order is oldest -> newest. 
- fn get_canonical_blocks_to_persist(&self) -> Vec<ExecutedBlock> { + fn get_canonical_blocks_to_persist(&self) -> (Vec<ExecutedBlock>, B256) { let mut blocks_to_persist = Vec::new(); let mut current_hash = self.state.tree_state.canonical_block_hash(); let last_persisted_number = self.persistence_state.last_persisted_block.number; @@ -1533,7 +1591,7 @@ where // reverse the order so that the oldest block comes first blocks_to_persist.reverse(); - blocks_to_persist + (blocks_to_persist, current_hash) } /// This clears the blocks from the in-memory tree state that have been persisted to the @@ -2276,7 +2334,7 @@ where let mut trie_output: TrieUpdates = TrieUpdates::default(); trace!(target: "engine::tree", block=?sealed_block.num_hash(), "Calculating block state root"); - if !self.skip_state_root_validation { + if !self.skip_state_root_validation && !self.compute_state_root_in_background { let root_time = Instant::now(); let mut state_root_result = None; @@ -2831,6 +2889,7 @@ mod tests { false, false, false, + false, ); let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone()); @@ -3152,10 +3211,10 @@ let blocks: Vec<_> = test_block_builder .get_executed_blocks(1..tree_config.persistence_threshold() + 2) .collect(); - let test_harness = TestHarness::new(chain_spec).with_blocks(blocks.clone()); + let mut test_harness = TestHarness::new(chain_spec).with_blocks(blocks.clone()); std::thread::Builder::new() .name("Tree Task".to_string()) - .spawn(|| test_harness.tree.run()) + .spawn(move || test_harness.tree.run()) .unwrap(); // send a message to the tree to enter the main loop. @@ -3657,7 +3716,7 @@ mod tests { .with_persistence_threshold(persistence_threshold) .with_memory_block_buffer_target(memory_block_buffer_target); - let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist(); + let (blocks_to_persist, _) = test_harness.tree.get_canonical_blocks_to_persist(); let expected_blocks_to_persist_length: usize = (canonical_head_number - memory_block_buffer_target - last_persisted_block_number) @@ -3678,7 +3737,7 @@ assert!(test_harness.tree.state.tree_state.block_by_hash(fork_block_hash).is_some()); - let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist(); + let (blocks_to_persist, _) = test_harness.tree.get_canonical_blocks_to_persist(); assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length); // check that the fork block is not included in the blocks to persist
diff --git a/crates/engine/tree/src/tree/persistence_state.rs b/crates/engine/tree/src/tree/persistence_state.rs index b00b7175f..1ce15d27f 100644 --- a/crates/engine/tree/src/tree/persistence_state.rs +++ b/crates/engine/tree/src/tree/persistence_state.rs @@ -1,9 +1,19 @@ use alloy_eips::BlockNumHash; use alloy_primitives::B256; +use reth_trie::updates::TrieUpdates; use std::{collections::VecDeque, time::Instant}; use tokio::sync::oneshot; use tracing::{debug, trace}; +/// Enum to represent different types of persistence receivers. +#[derive(Debug)] +pub(crate) enum PersistenceReceiver { + /// Receiver for simple persistence tasks. + Simple(oneshot::Receiver<Option<BlockNumHash>>), + /// Receiver for persistence tasks with root updates. + WithRootUpdates(oneshot::Receiver<(Option<BlockNumHash>, TrieUpdates)>), +} + /// The state of the persistence task. 
#[derive(Default, Debug)] pub struct PersistenceState { @@ -13,12 +23,12 @@ pub struct PersistenceState { pub(crate) last_persisted_block: BlockNumHash, /// Receiver end of channel where the result of the persistence task will be /// sent when done. A None value means there's no persistence task in progress. - pub(crate) rx: Option<(oneshot::Receiver<Option<BlockNumHash>>, Instant)>, + pub(crate) rx: Option<(PersistenceReceiver, Instant)>, /// The block above which blocks should be removed from disk, because there has been an on disk /// reorg. pub(crate) remove_above_state: VecDeque<u64>, } impl PersistenceState { /// Determines if there is a persistence task in progress by checking if the /// receiver is set. @@ -28,7 +39,15 @@ impl PersistenceState { /// Sets state for a started persistence task. pub(crate) fn start(&mut self, rx: oneshot::Receiver<Option<BlockNumHash>>) { - self.rx = Some((rx, Instant::now())); + self.rx = Some((PersistenceReceiver::Simple(rx), Instant::now())); + } + + /// Sets state for a started persistence task with root updates. + pub(crate) fn start_with_root_update( + &mut self, + rx: oneshot::Receiver<(Option<BlockNumHash>, TrieUpdates)>, + ) { + self.rx = Some((PersistenceReceiver::WithRootUpdates(rx), Instant::now())); } /// Sets the `remove_above_state`, to the new tip number specified, only if it is less than the
diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index bae8ce482..65e33b945 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -237,6 +237,7 @@ where mining_mode, LocalPayloadAttributesBuilder::new(ctx.chain_spec()), ctx.node_config().skip_state_root_validation, + ctx.node_config().compute_state_root_in_background, ctx.node_config().enable_prefetch, ctx.node_config().enable_execution_cache, ); @@ -261,6 +262,7 @@ where ctx.invalid_block_hook()?, ctx.sync_metrics_tx(), ctx.node_config().skip_state_root_validation, + ctx.node_config().compute_state_root_in_background, ctx.node_config().enable_prefetch, ctx.node_config().enable_execution_cache, ); @@ -298,6 +300,7 @@ where ctx.invalid_block_hook()?, ctx.sync_metrics_tx(), ctx.node_config().skip_state_root_validation, + ctx.node_config().compute_state_root_in_background, ctx.node_config().enable_prefetch, ctx.node_config().enable_execution_cache, );
diff --git a/crates/node/core/src/args/performance_optimization.rs b/crates/node/core/src/args/performance_optimization.rs index a4c6f1232..208ba2fcb 100644 --- a/crates/node/core/src/args/performance_optimization.rs +++ b/crates/node/core/src/args/performance_optimization.rs @@ -19,4 +19,13 @@ pub struct PerformanceOptimizationArgs { /// This flag is intended for performance optimization when importing blocks of live-sync. #[arg(long = "optimize.enable-execution-cache", default_value_t = false)] pub enable_execution_cache: bool, + + /// Enables state root computation in the background, using a consistent view of the database. + /// + /// This option is intended for performance optimization when importing blocks + /// during live sync. It allows state root calculations to be performed + /// concurrently with other operations, potentially reducing overall + /// processing time. 
+ #[arg(long = "optimize.compute-state-root-in-background", default_value_t = false)] + pub compute_state_root_in_background: bool, } diff --git a/crates/node/core/src/node_config.rs b/crates/node/core/src/node_config.rs index b8278c69b..baa06ab13 100644 --- a/crates/node/core/src/node_config.rs +++ b/crates/node/core/src/node_config.rs @@ -135,6 +135,9 @@ pub struct NodeConfig { /// Disable hashing stages to skip merkle tree building pub skip_state_root_validation: bool, + /// Compute state root in background + pub compute_state_root_in_background: bool, + /// Enable execution cache during block insertion pub enable_execution_cache: bool, } @@ -167,6 +170,7 @@ impl NodeConfig { datadir: DatadirArgs::default(), enable_prefetch: false, skip_state_root_validation: false, + compute_state_root_in_background: false, enable_execution_cache: false, } } @@ -459,6 +463,7 @@ impl NodeConfig { enable_prefetch: self.enable_prefetch, skip_state_root_validation: self.skip_state_root_validation, enable_execution_cache: self.enable_execution_cache, + compute_state_root_in_background: self.compute_state_root_in_background, } } } @@ -487,6 +492,7 @@ impl Clone for NodeConfig { datadir: self.datadir.clone(), enable_prefetch: false, skip_state_root_validation: false, + compute_state_root_in_background: false, enable_execution_cache: false, } }
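Taken together, the pieces above wire up a single new code path: the node opts in with --optimize.compute-state-root-in-background, and the engine tree only takes the path when state root validation is not being skipped (the `!self.skip_state_root_validation && self.compute_state_root_in_background` guard in tree/mod.rs). A minimal caller-side sketch of driving the new persistence action end to end follows; the helper function and variable names are illustrative only, not part of this diff, and the import paths assume the crates used above:

use alloy_eips::BlockNumHash;
use alloy_primitives::B256;
use reth_chain_state::ExecutedBlock;
use reth_engine_tree::persistence::PersistenceHandle;
use reth_trie::updates::TrieUpdates;
use tokio::sync::oneshot;

// Illustrative helper: enqueue a batch of executed blocks (ordered oldest ->
// newest) together with the hash of the block preceding the batch, then wait
// for the persistence service's reply.
async fn persist_with_background_root(
    handle: &PersistenceHandle,
    blocks: Vec<ExecutedBlock>,
    parent_hash: B256,
) -> Option<(BlockNumHash, TrieUpdates)> {
    let (tx, rx) = oneshot::channel();
    // Sends PersistenceAction::SaveBlocksWithStateRootCalculation. The service
    // computes the state root in parallel from the blocks' hashed post-state,
    // checks it against the last block's header, attaches the trie updates to
    // that block, and only then writes the range to disk.
    handle.save_blocks_with_state_root_calculation(blocks, parent_hash, tx).ok()?;
    // The reply carries the last persisted block (None if the batch was empty)
    // and the trie updates computed for the whole batch.
    let (last_persisted, trie_updates) = rx.await.ok()?;
    last_persisted.map(|num_hash| (num_hash, trie_updates))
}

Note that the engine tree itself does not await the receiver as this sketch does: it polls with try_recv from its main loop (the PersistenceReceiver::WithRootUpdates arm above), re-arming persistence_state.rx while the result is still pending, so block processing continues while the state root is computed and the blocks are persisted off the engine thread.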