Skip to content

Commit

Permalink
[buffer manager] move epoch notification to after commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Zekun Li authored and zekun000 committed Jan 16, 2025
1 parent f69dc31 commit ea88731
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 15 deletions.
9 changes: 1 addition & 8 deletions consensus/src/pipeline/buffer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ use aptos_network::protocols::{rpc::error::RpcError, wire::handshake::v1::Protoc
use aptos_reliable_broadcast::{DropGuard, ReliableBroadcast};
use aptos_time_service::TimeService;
use aptos_types::{
account_address::AccountAddress, epoch_change::EpochChangeProof, epoch_state::EpochState,
ledger_info::LedgerInfoWithSignatures,
account_address::AccountAddress, epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures,
};
use bytes::Bytes;
use fail::fail_point;
Expand Down Expand Up @@ -544,12 +543,6 @@ impl BufferManager {
}))
.await
.expect("Failed to send persist request");
// this needs to be done after creating the persisting request to avoid it being lost
if commit_proof.ledger_info().ends_epoch() {
self.commit_msg_tx
.send_epoch_change(EpochChangeProof::new(vec![commit_proof], false))
.await;
}
info!("Advance head to {:?}", self.buffer.head_cursor());
self.previous_commit_time = Instant::now();
return;
Expand Down
5 changes: 3 additions & 2 deletions consensus/src/pipeline/decoupled_execution_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ pub fn prepare_phases_and_buffer_manager(
let (persisting_phase_request_tx, persisting_phase_request_rx) =
create_channel::<CountedRequest<PersistingRequest>>();
let (persisting_phase_response_tx, persisting_phase_response_rx) = create_channel();
let commit_msg_tx = Arc::new(commit_msg_tx);

let persisting_phase_processor = PersistingPhase::new(persisting_proxy);
let persisting_phase_processor = PersistingPhase::new(persisting_proxy, commit_msg_tx.clone());
let persisting_phase = PipelinePhase::new(
persisting_phase_request_rx,
Some(persisting_phase_response_tx),
Expand All @@ -120,7 +121,7 @@ pub fn prepare_phases_and_buffer_manager(
execution_wait_phase_response_rx,
signing_phase_request_tx,
signing_phase_response_rx,
Arc::new(commit_msg_tx),
commit_msg_tx,
commit_msg_rx,
persisting_phase_request_tx,
persisting_phase_response_rx,
Expand Down
24 changes: 19 additions & 5 deletions consensus/src/pipeline/persisting_phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
network::NetworkSender,
pipeline::pipeline_phase::StatelessPipeline,
state_replication::{StateComputer, StateComputerCommitCallBackType},
};
use aptos_consensus_types::{common::Round, pipelined_block::PipelinedBlock};
use aptos_executor_types::ExecutorResult;
use aptos_types::ledger_info::LedgerInfoWithSignatures;
use aptos_types::{epoch_change::EpochChangeProof, ledger_info::LedgerInfoWithSignatures};
use async_trait::async_trait;
use std::{
fmt::{Debug, Display, Formatter},
Expand Down Expand Up @@ -46,11 +47,18 @@ pub type PersistingResponse = ExecutorResult<Round>;

pub struct PersistingPhase {
persisting_handle: Arc<dyn StateComputer>,
commit_msg_tx: Arc<NetworkSender>,
}

impl PersistingPhase {
pub fn new(persisting_handle: Arc<dyn StateComputer>) -> Self {
Self { persisting_handle }
pub fn new(
persisting_handle: Arc<dyn StateComputer>,
commit_msg_tx: Arc<NetworkSender>,
) -> Self {
Self {
persisting_handle,
commit_msg_tx,
}
}
}

Expand All @@ -68,7 +76,7 @@ impl StatelessPipeline for PersistingPhase {
callback,
} = req;

if blocks
let response = if blocks
.last()
.expect("Blocks can't be empty")
.pipeline_enabled()
Expand All @@ -86,9 +94,15 @@ impl StatelessPipeline for PersistingPhase {
} else {
let round = commit_ledger_info.ledger_info().round();
self.persisting_handle
.commit(&blocks, commit_ledger_info, callback)
.commit(&blocks, commit_ledger_info.clone(), callback)
.await
.map(|_| round)
};
if commit_ledger_info.ledger_info().ends_epoch() {
self.commit_msg_tx
.send_epoch_change(EpochChangeProof::new(vec![commit_ledger_info], false))
.await;
}
response
}
}

0 comments on commit ea88731

Please sign in to comment.