Re-execution of pending certs must happen concurrently with consensus handling, since there may be dependencies in either direction.
mystenmark committed Jan 29, 2025
1 parent 8e6047e commit 3e72199
Showing 3 changed files with 83 additions and 49 deletions.
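
Note: as a rough illustration of the startup change in crates/sui-node/src/lib.rs below (not the actual sui-node code), the sketch runs validator-component construction and re-execution of pending certificates concurrently via futures::join!, so that neither step has to wait for the other. The names build_components and reexecute_pending_certs are placeholders invented for this sketch, and it assumes the tokio, futures, and anyhow crates are available.

use anyhow::Result;

// Placeholder for re-executing consensus certificates recovered at startup.
async fn reexecute_pending_certs() {
    // ... replay pending certificates here ...
}

// Placeholder for SuiNode::construct_validator_components.
async fn build_components() -> Result<String> {
    Ok("validator components".to_string())
}

#[tokio::main]
async fn main() -> Result<()> {
    // join! drives both futures on the same task until both complete, so
    // progress on either side can unblock dependencies in the other.
    let (components, ()) = futures::join!(build_components(), reexecute_pending_certs());
    let _components = components?;
    Ok(())
}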
4 changes: 3 additions & 1 deletion crates/sui-core/src/authority.rs
@@ -1255,7 +1255,9 @@ impl AuthorityState {
)
.await
.tap_err(|e| info!("process_certificate failed: {e}"))
- .tap_ok(|_| debug!("process_certificate succeeded"))
+ .tap_ok(
+ |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
+ )
}

pub fn read_objects_for_execution(
19 changes: 9 additions & 10 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
@@ -436,7 +436,7 @@ impl CheckpointExecutor {

/// Post processing and plumbing after we executed a checkpoint. This function is guaranteed
/// to be called in the order of checkpoint sequence number.
#[instrument(level = "debug", skip_all)]
#[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number()))]
async fn process_executed_checkpoint(
&self,
epoch_store: &AuthorityPerEpochStore,
@@ -447,7 +447,7 @@
) {
// Commit all transaction effects to disk
let cache_commit = self.state.get_cache_commit();
- debug!(seq = ?checkpoint.sequence_number, "committing checkpoint transactions to disk");
+ debug!("committing checkpoint transactions to disk");
cache_commit
.commit_transaction_outputs(
epoch_store.epoch(),
@@ -1040,8 +1040,8 @@ fn extract_end_of_epoch_tx(
let change_epoch_tx = VerifiedExecutableTransaction::new_from_checkpoint(
(*change_epoch_tx.unwrap_or_else(||
panic!(
"state-sync should have ensured that transaction with digest {:?} exists for checkpoint: {checkpoint:?}",
digests.transaction,
"state-sync should have ensured that transaction with digests {:?} exists for checkpoint: {checkpoint:?}",
digests
)
)).clone(),
epoch_store.epoch(),
@@ -1100,16 +1100,15 @@ fn get_unexecuted_transactions(

// Remove the change epoch transaction so that we can special case its execution.
checkpoint.end_of_epoch_data.as_ref().tap_some(|_| {
- let change_epoch_tx_digest = execution_digests
+ let digests = execution_digests
.pop()
.expect("Final checkpoint must have at least one transaction")
.transaction;
.expect("Final checkpoint must have at least one transaction");

let change_epoch_tx = cache_reader
- .get_transaction_block(&change_epoch_tx_digest)
+ .get_transaction_block(&digests.transaction)
.unwrap_or_else(||
panic!(
"state-sync should have ensured that transaction with digest {change_epoch_tx_digest:?} exists for checkpoint: {}",
"state-sync should have ensured that transaction with digests {digests:?} exists for checkpoint: {}",
checkpoint.sequence_number()
)
);
@@ -1138,7 +1137,7 @@
let maybe_randomness_tx = cache_reader.get_transaction_block(&first_digest.transaction)
.unwrap_or_else(||
panic!(
"state-sync should have ensured that transaction with digest {first_digest:?} exists for checkpoint: {}",
"state-sync should have ensured that transaction with digests {first_digest:?} exists for checkpoint: {}",
checkpoint.sequence_number()
)
);
109 changes: 71 additions & 38 deletions crates/sui-node/src/lib.rs
@@ -12,12 +12,14 @@ use anyhow::Result;
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::JwkId;
use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
+ use futures::future::BoxFuture;
use futures::TryFutureExt;
use mysten_common::debug_fatal;
use mysten_network::server::SUI_TLS_SERVER_NAME;
use prometheus::Registry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt;
+ use std::future::Future;
use std::path::PathBuf;
use std::str::FromStr;
#[cfg(msim)]
@@ -151,7 +153,7 @@ mod handle;
pub mod metrics;

pub struct ValidatorComponents {
- validator_server_handle: JoinHandle<Result<()>>,
+ validator_server_handle: ValidatorGrpcServerHandle,
validator_overload_monitor_handle: Option<JoinHandle<()>>,
consensus_manager: ConsensusManager,
consensus_store_pruner: ConsensusStorePruner,
@@ -836,26 +838,30 @@ impl SuiNode {
let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));

let validator_components = if state.is_validator(&epoch_store) {
- Self::reexecute_pending_consensus_certs(&epoch_store, &state).await;
+ let (components, _) = futures::join!(
+ Self::construct_validator_components(
+ config.clone(),
+ state.clone(),
+ committee,
+ epoch_store.clone(),
+ checkpoint_store.clone(),
+ state_sync_handle.clone(),
+ randomness_handle.clone(),
+ Arc::downgrade(&accumulator),
+ backpressure_manager.clone(),
+ connection_monitor_status.clone(),
+ &registry_service,
+ sui_node_metrics.clone(),
+ ),
+ Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
+ );
+ let mut components = components?;

- let components = Self::construct_validator_components(
- config.clone(),
- state.clone(),
- committee,
- epoch_store.clone(),
- checkpoint_store.clone(),
- state_sync_handle.clone(),
- randomness_handle.clone(),
- Arc::downgrade(&accumulator),
- backpressure_manager.clone(),
- connection_monitor_status.clone(),
- &registry_service,
- sui_node_metrics.clone(),
- )
- .await?;
// This is only needed during cold start.
components.consensus_adapter.submit_recovered(&epoch_store);

+ // Start the gRPC server
+ components.validator_server_handle = components.validator_server_handle.start();

Some(components)
} else {
None
@@ -1325,7 +1331,7 @@ impl SuiNode {
consensus_store_pruner: ConsensusStorePruner,
accumulator: Weak<StateAccumulator>,
backpressure_manager: Arc<BackpressureManager>,
- validator_server_handle: JoinHandle<Result<()>>,
+ validator_server_handle: ValidatorGrpcServerHandle,
validator_overload_monitor_handle: Option<JoinHandle<()>>,
checkpoint_metrics: Arc<CheckpointMetrics>,
sui_node_metrics: Arc<SuiNodeMetrics>,
@@ -1505,7 +1511,7 @@
state: Arc<AuthorityState>,
consensus_adapter: Arc<ConsensusAdapter>,
prometheus_registry: &Registry,
- ) -> Result<tokio::task::JoinHandle<Result<()>>> {
+ ) -> Result<ValidatorGrpcServerHandle> {
let validator_service = ValidatorService::new(
state.clone(),
consensus_adapter,
@@ -1533,9 +1539,10 @@
.map_err(|err| anyhow!(err.to_string()))?;
let local_addr = server.local_addr();
info!("Listening to traffic on {local_addr}");
- let grpc_server = spawn_monitored_task!(server.serve().map_err(Into::into));

- Ok(grpc_server)
+ Ok(ValidatorGrpcServerHandle::new(
+ server.serve().map_err(Into::into),
+ ))
}

async fn reexecute_pending_consensus_certs(
@@ -1898,23 +1905,25 @@ impl SuiNode {
if self.state.is_validator(&new_epoch_store) {
info!("Promoting the node from fullnode to validator, starting grpc server");

- Some(
- Self::construct_validator_components(
- self.config.clone(),
- self.state.clone(),
- Arc::new(next_epoch_committee.clone()),
- new_epoch_store.clone(),
- self.checkpoint_store.clone(),
- self.state_sync_handle.clone(),
- self.randomness_handle.clone(),
- weak_accumulator,
- self.backpressure_manager.clone(),
- self.connection_monitor_status.clone(),
- &self.registry_service,
- self.metrics.clone(),
- )
- .await?,
+ let mut components = Self::construct_validator_components(
+ self.config.clone(),
+ self.state.clone(),
+ Arc::new(next_epoch_committee.clone()),
+ new_epoch_store.clone(),
+ self.checkpoint_store.clone(),
+ self.state_sync_handle.clone(),
+ self.randomness_handle.clone(),
+ weak_accumulator,
+ self.backpressure_manager.clone(),
+ self.connection_monitor_status.clone(),
+ &self.registry_service,
+ self.metrics.clone(),
+ )
+ .await?;
+
+ components.validator_server_handle = components.validator_server_handle.start();
+
+ Some(components)
} else {
None
}
@@ -2042,6 +2051,30 @@ impl SuiNode {
}
}

+ enum ValidatorGrpcServerHandle {
+ // Mutex is only needed to make ValidatorGrpcServerHandle Send
+ Unstarted(Mutex<BoxFuture<'static, Result<()>>>),
+ #[allow(unused)]
+ Started(JoinHandle<Result<()>>),
+ }
+
+ impl ValidatorGrpcServerHandle {
+ pub fn new(future: impl Future<Output = Result<()>> + Send + 'static) -> Self {
+ Self::Unstarted(Mutex::new(Box::pin(future)))
+ }
+
+ pub fn start(self) -> Self {
+ match self {
+ Self::Unstarted(future) => {
+ let future = future.into_inner();
+ let handle = tokio::spawn(future);
+ Self::Started(handle)
+ }
+ Self::Started(_) => self,
+ }
+ }
+ }

/// Notify state-sync that a new list of trusted peers are now available.
fn send_trusted_peer_change(
config: &NodeConfig,
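
For reference, here is a minimal, self-contained sketch of the deferred-start pattern introduced by ValidatorGrpcServerHandle above. It is illustrative only: ServerHandle, new, and start are local stand-ins for the real type, and the sketch assumes std::sync::Mutex (hence the unwrap on into_inner) plus the tokio, futures, and anyhow crates.

use std::future::Future;
use std::sync::Mutex;

use anyhow::Result;
use futures::future::BoxFuture;
use tokio::task::JoinHandle;

// Local stand-in for ValidatorGrpcServerHandle.
enum ServerHandle {
    // The Mutex exists only to make the unstarted boxed future Send.
    Unstarted(Mutex<BoxFuture<'static, Result<()>>>),
    Started(JoinHandle<Result<()>>),
}

impl ServerHandle {
    fn new(fut: impl Future<Output = Result<()>> + Send + 'static) -> Self {
        Self::Unstarted(Mutex::new(Box::pin(fut)))
    }

    fn start(self) -> Self {
        match self {
            // std::sync::Mutex::into_inner returns a LockResult, hence the unwrap.
            Self::Unstarted(fut) => Self::Started(tokio::spawn(fut.into_inner().unwrap())),
            Self::Started(_) => self,
        }
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Phase 1: create the serve future during component construction, but do not run it.
    let handle = ServerHandle::new(async {
        // Stand-in for the gRPC server's serve() future.
        Ok::<(), anyhow::Error>(())
    });

    // ... finish wiring up the remaining components here ...

    // Phase 2: spawn the server only once the node is ready to accept traffic.
    if let ServerHandle::Started(join) = handle.start() {
        join.await??;
    }
    Ok(())
}

The point of the Unstarted variant is that the serve future can be created while the validator components are being built, but is only spawned once startup has completed.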
