diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs
index dafddeffcc750..e9f18ae21b367 100644
--- a/crates/sui-core/src/authority.rs
+++ b/crates/sui-core/src/authority.rs
@@ -1255,7 +1255,9 @@ impl AuthorityState {
         )
         .await
         .tap_err(|e| info!("process_certificate failed: {e}"))
-        .tap_ok(|_| debug!("process_certificate succeeded"))
+        .tap_ok(
+            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
+        )
     }
 
     pub fn read_objects_for_execution(
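The `authority.rs` change above swaps a bare success log for one that records both the transaction digest and the effects digest via `tap_ok`. Below is a minimal, self-contained sketch of that `tap`/`tracing` pattern; the `Effects` type, the digest values, and the function bodies are illustrative stand-ins, not Sui types.

```rust
use tap::TapFallible; // provides tap_ok / tap_err on Result without consuming it
use tracing::{debug, info};

// Illustrative stand-in for the real transaction-effects type.
#[derive(Debug)]
struct Effects {
    digest: u64,
}

// Pretend execution path: returns effects plus an ignored second value,
// mirroring the `(fx, _)` tuple destructured in the diff.
fn execute(tx_digest: u64) -> Result<(Effects, ()), String> {
    Ok((Effects { digest: tx_digest.rotate_left(7) }, ()))
}

fn process_certificate(tx_digest: u64) -> Result<(Effects, ()), String> {
    execute(tx_digest)
        // Log failures, leaving the Err in place for the caller.
        .tap_err(|e| info!("process_certificate failed: {e}"))
        // Log both digests on success, again passing the Ok value through untouched.
        .tap_ok(|(fx, _)| debug!(?tx_digest, fx_digest = ?fx.digest, "process_certificate succeeded"))
}

fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();
    let _ = process_certificate(42);
}
```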
diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
index 6030006dc4fc3..c6204b6299003 100644
--- a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
+++ b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
@@ -436,7 +436,7 @@ impl CheckpointExecutor {
 
     /// Post processing and plumbing after we executed a checkpoint. This function is guaranteed
     /// to be called in the order of checkpoint sequence number.
-    #[instrument(level = "debug", skip_all)]
+    #[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number()))]
     async fn process_executed_checkpoint(
         &self,
         epoch_store: &AuthorityPerEpochStore,
@@ -447,7 +447,7 @@ impl CheckpointExecutor {
     ) {
         // Commit all transaction effects to disk
         let cache_commit = self.state.get_cache_commit();
-        debug!(seq = ?checkpoint.sequence_number, "committing checkpoint transactions to disk");
+        debug!("committing checkpoint transactions to disk");
         cache_commit
             .commit_transaction_outputs(
                 epoch_store.epoch(),
@@ -1040,8 +1040,8 @@ fn extract_end_of_epoch_tx(
     let change_epoch_tx = VerifiedExecutableTransaction::new_from_checkpoint(
         (*change_epoch_tx.unwrap_or_else(||
             panic!(
-                "state-sync should have ensured that transaction with digest {:?} exists for checkpoint: {checkpoint:?}",
-                digests.transaction,
+                "state-sync should have ensured that transaction with digests {:?} exists for checkpoint: {checkpoint:?}",
+                digests
             )
         )).clone(),
         epoch_store.epoch(),
@@ -1100,16 +1100,15 @@ fn get_unexecuted_transactions(
     // Remove the change epoch transaction so that we can special case its execution.
     checkpoint.end_of_epoch_data.as_ref().tap_some(|_| {
-        let change_epoch_tx_digest = execution_digests
+        let digests = execution_digests
             .pop()
-            .expect("Final checkpoint must have at least one transaction")
-            .transaction;
+            .expect("Final checkpoint must have at least one transaction");
 
         let change_epoch_tx = cache_reader
-            .get_transaction_block(&change_epoch_tx_digest)
+            .get_transaction_block(&digests.transaction)
             .unwrap_or_else(|| panic!(
-                "state-sync should have ensured that transaction with digest {change_epoch_tx_digest:?} exists for checkpoint: {}",
+                "state-sync should have ensured that transaction with digests {digests:?} exists for checkpoint: {}",
                 checkpoint.sequence_number()
             )
         );
@@ -1138,7 +1137,7 @@ fn get_unexecuted_transactions(
         let maybe_randomness_tx = cache_reader.get_transaction_block(&first_digest.transaction)
             .unwrap_or_else(|| panic!(
-                "state-sync should have ensured that transaction with digest {first_digest:?} exists for checkpoint: {}",
+                "state-sync should have ensured that transaction with digests {first_digest:?} exists for checkpoint: {}",
                 checkpoint.sequence_number()
             )
         );
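In the `checkpoint_executor` change above, the checkpoint sequence number moves from the inner `debug!` call into the `#[instrument]` span fields, so every event emitted inside `process_executed_checkpoint` carries `seq` automatically. A small sketch of that mechanism, using a hypothetical `Checkpoint` type rather than the real one:

```rust
use tracing::{debug, instrument, Level};

// Hypothetical stand-in for the real checkpoint summary type.
struct Checkpoint {
    sequence_number: u64,
}

// `fields(seq = ...)` records the value on the span itself, while `skip_all`
// keeps the (potentially large) arguments out of it. Any event emitted inside
// the function, like the debug! below, is inside this span and therefore shows
// `seq` without repeating it at each call site.
#[instrument(level = "info", skip_all, fields(seq = checkpoint.sequence_number))]
async fn process_executed_checkpoint(checkpoint: &Checkpoint) {
    debug!("committing checkpoint transactions to disk");
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().with_max_level(Level::DEBUG).init();
    process_executed_checkpoint(&Checkpoint { sequence_number: 7 }).await;
}
```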
diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs
index 59e14a435f073..a4878e21a6641 100644
--- a/crates/sui-node/src/lib.rs
+++ b/crates/sui-node/src/lib.rs
@@ -12,12 +12,14 @@ use anyhow::Result;
 use arc_swap::ArcSwap;
 use fastcrypto_zkp::bn254::zk_login::JwkId;
 use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
+use futures::future::BoxFuture;
 use futures::TryFutureExt;
 use mysten_common::debug_fatal;
 use mysten_network::server::SUI_TLS_SERVER_NAME;
 use prometheus::Registry;
 use std::collections::{BTreeSet, HashMap, HashSet};
 use std::fmt;
+use std::future::Future;
 use std::path::PathBuf;
 use std::str::FromStr;
 #[cfg(msim)]
@@ -151,7 +153,7 @@ mod handle;
 pub mod metrics;
 
 pub struct ValidatorComponents {
-    validator_server_handle: JoinHandle<Result<()>>,
+    validator_server_handle: ValidatorGrpcServerHandle,
     validator_overload_monitor_handle: Option<JoinHandle<()>>,
     consensus_manager: ConsensusManager,
     consensus_store_pruner: ConsensusStorePruner,
@@ -836,26 +838,30 @@ impl SuiNode {
         let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));
 
         let validator_components = if state.is_validator(&epoch_store) {
-            Self::reexecute_pending_consensus_certs(&epoch_store, &state).await;
+            let (components, _) = futures::join!(
+                Self::construct_validator_components(
+                    config.clone(),
+                    state.clone(),
+                    committee,
+                    epoch_store.clone(),
+                    checkpoint_store.clone(),
+                    state_sync_handle.clone(),
+                    randomness_handle.clone(),
+                    Arc::downgrade(&accumulator),
+                    backpressure_manager.clone(),
+                    connection_monitor_status.clone(),
+                    &registry_service,
+                    sui_node_metrics.clone(),
+                ),
+                Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
+            );
+            let mut components = components?;
 
-            let components = Self::construct_validator_components(
-                config.clone(),
-                state.clone(),
-                committee,
-                epoch_store.clone(),
-                checkpoint_store.clone(),
-                state_sync_handle.clone(),
-                randomness_handle.clone(),
-                Arc::downgrade(&accumulator),
-                backpressure_manager.clone(),
-                connection_monitor_status.clone(),
-                &registry_service,
-                sui_node_metrics.clone(),
-            )
-            .await?;
-
             // This is only needed during cold start.
             components.consensus_adapter.submit_recovered(&epoch_store);
 
+            // Start the gRPC server
+            components.validator_server_handle = components.validator_server_handle.start();
+
             Some(components)
         } else {
             None
@@ -1325,7 +1331,7 @@ impl SuiNode {
         consensus_store_pruner: ConsensusStorePruner,
         accumulator: Weak<StateAccumulator>,
         backpressure_manager: Arc<BackpressureManager>,
-        validator_server_handle: JoinHandle<Result<()>>,
+        validator_server_handle: ValidatorGrpcServerHandle,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
         checkpoint_metrics: Arc<CheckpointMetrics>,
         sui_node_metrics: Arc<SuiNodeMetrics>,
@@ -1505,7 +1511,7 @@ impl SuiNode {
         state: Arc<AuthorityState>,
         consensus_adapter: Arc<ConsensusAdapter>,
         prometheus_registry: &Registry,
-    ) -> Result<JoinHandle<Result<()>>> {
+    ) -> Result<ValidatorGrpcServerHandle> {
         let validator_service = ValidatorService::new(
             state.clone(),
             consensus_adapter,
@@ -1533,9 +1539,10 @@ impl SuiNode {
             .map_err(|err| anyhow!(err.to_string()))?;
         let local_addr = server.local_addr();
         info!("Listening to traffic on {local_addr}");
-        let grpc_server = spawn_monitored_task!(server.serve().map_err(Into::into));
 
-        Ok(grpc_server)
+        Ok(ValidatorGrpcServerHandle::new(
+            server.serve().map_err(Into::into),
+        ))
     }
 
     async fn reexecute_pending_consensus_certs(
@@ -1898,23 +1905,25 @@ impl SuiNode {
 
                 if self.state.is_validator(&new_epoch_store) {
                     info!("Promoting the node from fullnode to validator, starting grpc server");
-                    Some(
-                        Self::construct_validator_components(
-                            self.config.clone(),
-                            self.state.clone(),
-                            Arc::new(next_epoch_committee.clone()),
-                            new_epoch_store.clone(),
-                            self.checkpoint_store.clone(),
-                            self.state_sync_handle.clone(),
-                            self.randomness_handle.clone(),
-                            weak_accumulator,
-                            self.backpressure_manager.clone(),
-                            self.connection_monitor_status.clone(),
-                            &self.registry_service,
-                            self.metrics.clone(),
-                        )
-                        .await?,
+                    let mut components = Self::construct_validator_components(
+                        self.config.clone(),
+                        self.state.clone(),
+                        Arc::new(next_epoch_committee.clone()),
+                        new_epoch_store.clone(),
+                        self.checkpoint_store.clone(),
+                        self.state_sync_handle.clone(),
+                        self.randomness_handle.clone(),
+                        weak_accumulator,
+                        self.backpressure_manager.clone(),
+                        self.connection_monitor_status.clone(),
+                        &self.registry_service,
+                        self.metrics.clone(),
                     )
+                    .await?;
+
+                    components.validator_server_handle = components.validator_server_handle.start();
+
+                    Some(components)
                 } else {
                     None
                 }
@@ -2042,6 +2051,30 @@ impl SuiNode {
     }
 }
 
+enum ValidatorGrpcServerHandle {
+    // Mutex is only needed to make ValidatorGrpcServerHandle Send
+    Unstarted(Mutex<BoxFuture<'static, Result<()>>>),
+    #[allow(unused)]
+    Started(JoinHandle<Result<()>>),
+}
+
+impl ValidatorGrpcServerHandle {
+    pub fn new(future: impl Future<Output = Result<()>> + Send + 'static) -> Self {
+        Self::Unstarted(Mutex::new(Box::pin(future)))
+    }
+
+    pub fn start(self) -> Self {
+        match self {
+            Self::Unstarted(future) => {
+                let future = future.into_inner();
+                let handle = tokio::spawn(future);
+                Self::Started(handle)
+            }
+            Self::Started(_) => self,
+        }
+    }
+}
+
 /// Notify state-sync that a new list of trusted peers are now available.
 fn send_trusted_peer_change(
     config: &NodeConfig,
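The `sui-node` changes above introduce a construct-now, start-later handle: `start_grpc_server` binds the listener and builds the serve future but returns it unstarted, validator component construction runs concurrently with `reexecute_pending_consensus_certs` via `futures::join!`, and the gRPC future is only spawned once recovery has finished. Below is a stripped-down sketch of that handle pattern outside Sui; `DeferredTask`, the `String` error type, and the async body are illustrative, and `std::sync::Mutex` (hence the `expect` on `into_inner`) merely stands in for whichever mutex the real code uses.

```rust
use futures::future::BoxFuture;
use std::future::Future;
use std::sync::Mutex;
use tokio::task::JoinHandle;

// Construct-now, start-later: the future is built eagerly but not polled
// until `start()` hands it to the runtime.
enum DeferredTask {
    // The Mutex only exists so the enum stays Send + Sync when stored in other structs.
    Unstarted(Mutex<BoxFuture<'static, Result<(), String>>>),
    Started(JoinHandle<Result<(), String>>),
}

impl DeferredTask {
    fn new(future: impl Future<Output = Result<(), String>> + Send + 'static) -> Self {
        Self::Unstarted(Mutex::new(Box::pin(future)))
    }

    // Idempotent: starting an already-started task is a no-op.
    fn start(self) -> Self {
        match self {
            Self::Unstarted(future) => {
                let future = future.into_inner().expect("mutex not poisoned");
                Self::Started(tokio::spawn(future))
            }
            Self::Started(_) => self,
        }
    }
}

#[tokio::main]
async fn main() {
    // Build the task up front (e.g. while other startup work runs concurrently)...
    let server = DeferredTask::new(async {
        println!("serving");
        Ok::<(), String>(())
    });

    // ...and only begin executing it once the rest of startup has completed.
    if let DeferredTask::Started(handle) = server.start() {
        handle.await.expect("task panicked").expect("task failed");
    }
}
```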