Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A0-4155: Unit saving pipeline #432

Merged
merged 2 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ More details are available [in the book][reference-link-implementation-details].
- Import AlephBFT in your crate
```toml
[dependencies]
aleph-bft = "^0.36"
aleph-bft = "^0.37"
```
- The main entry point is the `run_session` function, which returns a Future that runs the
consensus algorithm.
Expand Down
2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft"
version = "0.36.3"
version = "0.37.0"
edition = "2021"
authors = ["Cardinal Cryptography"]
categories = ["algorithms", "data-structures", "cryptography", "database"]
Expand Down
23 changes: 17 additions & 6 deletions consensus/src/backup/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use log::{error, info, warn};

use crate::{
units::{UncheckedSignedUnit, Unit, UnitCoord},
Data, Hasher, NodeIndex, Round, SessionId, Signature,
Data, Hasher, NodeIndex, NodeMap, Round, SessionId, Signature,
};

const LOG_TARGET: &str = "AlephBFT-backup-loader";
Expand Down Expand Up @@ -91,7 +91,8 @@ impl<H: Hasher, D: Data, S: Signature, R: AsyncRead> BackupLoader<H, D, S, R> {
let input = &mut &buf[..];
let mut result = Vec::new();
while !input.is_empty() {
result.push(<UncheckedSignedUnit<H, D, S>>::decode(input)?);
// TODO(A0-4234): We should use the parents information to respond to reconstruction failures.
result.push(<(UncheckedSignedUnit<H, D, S>, NodeMap<H::Hash>)>::decode(input)?.0);
}
Ok(result)
}
Expand Down Expand Up @@ -241,15 +242,15 @@ mod tests {
use codec::Encode;
use futures::channel::oneshot;

use aleph_bft_mock::{Data, Hasher64, Keychain, Loader, Signature};
use aleph_bft_mock::{Data, Hash64, Hasher64, Keychain, Loader, Signature};

use crate::{
backup::BackupLoader,
units::{
create_preunits, creator_set, preunit_to_full_unit, preunit_to_unchecked_signed_unit,
UncheckedSignedUnit as GenericUncheckedSignedUnit,
UncheckedSignedUnit as GenericUncheckedSignedUnit, Unit,
},
NodeCount, NodeIndex, Round, SessionId,
NodeCount, NodeIndex, NodeMap, Round, SessionId,
};

type UncheckedSignedUnit = GenericUncheckedSignedUnit<Hasher64, Data, Signature>;
Expand Down Expand Up @@ -308,7 +309,17 @@ mod tests {
}

fn encode_all(items: Vec<UncheckedSignedUnit>) -> Vec<Vec<u8>> {
items.iter().map(|u| u.encode()).collect()
items
.iter()
.map(|u| {
(
u,
// for now encode empty parents as we ignore them anyway
NodeMap::<Hash64>::with_size(u.as_signable().control_hash().n_members()),
)
.encode()
})
.collect()
}

fn prepare_test(encoded_items: Vec<u8>) -> PrepareTestResponse<impl futures::Future> {
Expand Down
63 changes: 35 additions & 28 deletions consensus/src/backup/saver.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::pin::Pin;

use crate::{units::UncheckedSignedUnit, Data, Hasher, Receiver, Sender, Signature, Terminator};
use crate::{
dag::DagUnit,
units::{UncheckedSignedUnit, WrappedUnit},
Data, Hasher, MultiKeychain, Receiver, Sender, Terminator,
};
use codec::Encode;
use futures::{AsyncWrite, AsyncWriteExt, FutureExt, StreamExt};
use log::{debug, error};
Expand All @@ -10,30 +14,29 @@ const LOG_TARGET: &str = "AlephBFT-backup-saver";
/// Component responsible for saving units into backup.
/// It waits for items to appear on its receivers, and writes them to backup.
/// It announces a successful write through an appropriate response sender.
pub struct BackupSaver<H: Hasher, D: Data, S: Signature, W: AsyncWrite> {
units_from_runway: Receiver<UncheckedSignedUnit<H, D, S>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, S>>,
pub struct BackupSaver<H: Hasher, D: Data, MK: MultiKeychain, W: AsyncWrite> {
units_from_runway: Receiver<DagUnit<H, D, MK>>,
responses_for_runway: Sender<DagUnit<H, D, MK>>,
backup: Pin<Box<W>>,
}

impl<H: Hasher, D: Data, S: Signature, W: AsyncWrite> BackupSaver<H, D, S, W> {
impl<H: Hasher, D: Data, MK: MultiKeychain, W: AsyncWrite> BackupSaver<H, D, MK, W> {
pub fn new(
units_from_runway: Receiver<UncheckedSignedUnit<H, D, S>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, S>>,
units_from_runway: Receiver<DagUnit<H, D, MK>>,
responses_for_runway: Sender<DagUnit<H, D, MK>>,
backup: W,
) -> BackupSaver<H, D, S, W> {
) -> BackupSaver<H, D, MK, W> {
BackupSaver {
units_from_runway,
responses_for_runway,
backup: Box::pin(backup),
}
}

pub async fn save_item(
&mut self,
item: &UncheckedSignedUnit<H, D, S>,
) -> Result<(), std::io::Error> {
self.backup.write_all(&item.encode()).await?;
pub async fn save_unit(&mut self, unit: &DagUnit<H, D, MK>) -> Result<(), std::io::Error> {
let parents = unit.parents().clone();
let unit: UncheckedSignedUnit<_, _, _> = unit.clone().unpack().into();
timorleph marked this conversation as resolved.
Show resolved Hide resolved
self.backup.write_all(&(unit, parents).encode()).await?;
woocash2 marked this conversation as resolved.
Show resolved Hide resolved
self.backup.flush().await
}

Expand All @@ -49,7 +52,7 @@ impl<H: Hasher, D: Data, S: Signature, W: AsyncWrite> BackupSaver<H, D, S, W> {
break;
},
};
if let Err(e) = self.save_item(&item).await {
if let Err(e) = self.save_unit(&item).await {
error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e);
break;
}
Expand Down Expand Up @@ -80,16 +83,17 @@ mod tests {
StreamExt,
};

use aleph_bft_mock::{Data, Hasher64, Keychain, Saver, Signature};
use aleph_bft_mock::{Data, Hasher64, Keychain, Saver};

use crate::{
backup::BackupSaver,
units::{creator_set, preunit_to_unchecked_signed_unit, UncheckedSignedUnit},
NodeCount, NodeIndex, Terminator,
dag::ReconstructedUnit,
units::{creator_set, preunit_to_signed_unit, TestingSignedUnit},
NodeCount, Terminator,
};

type TestBackupSaver = BackupSaver<Hasher64, Data, Signature, Saver>;
type TestUnit = UncheckedSignedUnit<Hasher64, Data, Signature>;
type TestUnit = ReconstructedUnit<TestingSignedUnit>;
type TestBackupSaver = BackupSaver<Hasher64, Data, Keychain, Saver>;
struct PrepareSaverResponse<F: futures::Future> {
task: F,
units_for_saver: mpsc::UnboundedSender<TestUnit>,
Expand Down Expand Up @@ -122,6 +126,7 @@ mod tests {

#[tokio::test]
async fn test_proper_relative_responses_ordering() {
let node_count = NodeCount(5);
let PrepareSaverResponse {
task,
units_for_saver,
Expand All @@ -133,17 +138,19 @@ mod tests {
task.await;
});

let creators = creator_set(NodeCount(5));
let keychains: Vec<_> = (0..5)
.map(|id| Keychain::new(NodeCount(5), NodeIndex(id)))
let creators = creator_set(node_count);
let keychains: Vec<_> = node_count
.into_iterator()
.map(|id| Keychain::new(node_count, id))
.collect();
let units: Vec<TestUnit> = (0..5)
.map(|k| {
preunit_to_unchecked_signed_unit(
creators[k].create_unit(0).unwrap(),
let units: Vec<TestUnit> = node_count
.into_iterator()
.map(|id| {
ReconstructedUnit::initial(preunit_to_signed_unit(
creators[id.0].create_unit(0).unwrap(),
0,
&keychains[k],
)
&keychains[id.0],
))
})
.collect();

Expand Down
24 changes: 10 additions & 14 deletions consensus/src/dag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ use validation::{Error as ValidationError, Validator};

const LOG_TARGET: &str = "AlephBFT-dag";

pub type DagUnit<H, D, MK> = ReconstructedUnit<SignedUnit<H, D, MK>>;

/// The result of sending some information to the Dag.
pub struct DagResult<H: Hasher, D: Data, MK: MultiKeychain> {
/// Units added to the dag.
pub units: Vec<ReconstructedUnit<SignedUnit<H, D, MK>>>,
pub units: Vec<DagUnit<H, D, MK>>,
/// Requests for more information.
pub requests: Vec<Request<H>>,
/// Alerts raised due to encountered forks.
Expand Down Expand Up @@ -114,25 +116,16 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Dag<H, D, MK> {
}
}

fn handle_result(&mut self, result: &DagResult<H, D, MK>) {
// just clean the validator cache of units that we are returning
for unit in &result.units {
self.validator.finished_processing(&unit.hash());
}
}

/// Add a unit to the Dag.
pub fn add_unit<U: WrappedUnit<H, Wrapped = SignedUnit<H, D, MK>>>(
&mut self,
unit: UncheckedSignedUnit<H, D, MK::Signature>,
store: &UnitStore<U>,
) -> DagResult<H, D, MK> {
let result = match self.validator.validate(unit, store) {
match self.validator.validate(unit, store) {
Ok(unit) => self.reconstruction.add_unit(unit).into(),
Err(e) => Self::handle_validation_error(e),
};
self.handle_result(&result);
result
}
}

/// Add parents of a unit to the Dag.
Expand Down Expand Up @@ -180,7 +173,6 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Dag<H, D, MK> {
.add_parents(unit_hash, parent_hashes)
.into(),
);
self.handle_result(&result);
result
}

Expand Down Expand Up @@ -208,10 +200,14 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Dag<H, D, MK> {
}
}
}
self.handle_result(&result);
result
}

/// Notify the dag that a unit has finished processing and can be cleared from the cache.
pub fn finished_processing(&mut self, hash: &H::Hash) {
self.validator.finished_processing(hash);
}

pub fn status(&self) -> DagStatus {
self.validator.status()
}
Expand Down
41 changes: 20 additions & 21 deletions consensus/src/runway/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
alerts::{Alert, ForkingNotification, NetworkMessage},
creation,
dag::{Dag, DagResult, DagStatus, ReconstructedUnit, Request as ReconstructionRequest},
dag::{Dag, DagResult, DagStatus, DagUnit, Request as ReconstructionRequest},
extension::{ExtenderUnit, Service as Extender},
handle_task_termination,
member::UnitMessage,
Expand Down Expand Up @@ -109,7 +109,7 @@ where
{
missing_coords: HashSet<UnitCoord>,
missing_parents: HashSet<H::Hash>,
store: UnitStore<ReconstructedUnit<SignedUnit<H, D, MK>>>,
store: UnitStore<DagUnit<H, D, MK>>,
keychain: MK,
dag: Dag<H, D, MK>,
alerts_for_alerter: Sender<Alert<H, D, MK::Signature>>,
Expand All @@ -118,12 +118,12 @@ where
unit_messages_for_network: Sender<RunwayNotificationOut<H, D, MK::Signature>>,
responses_for_collection: Sender<CollectionResponse<H, D, MK>>,
resolved_requests: Sender<Request<H>>,
parents_for_creator: Sender<ReconstructedUnit<SignedUnit<H, D, MK>>>,
parents_for_creator: Sender<DagUnit<H, D, MK>>,
ordered_batch_rx: Receiver<Vec<H::Hash>>,
finalization_handler: FH,
backup_units_for_saver: Sender<UncheckedSignedUnit<H, D, MK::Signature>>,
backup_units_for_saver: Sender<DagUnit<H, D, MK>>,
units_for_extender: Sender<ExtenderUnit<H>>,
backup_units_from_saver: Receiver<UncheckedSignedUnit<H, D, MK::Signature>>,
backup_units_from_saver: Receiver<DagUnit<H, D, MK>>,
new_units_from_creation: Receiver<SignedUnit<H, D, MK>>,
exiting: bool,
}
Expand Down Expand Up @@ -210,15 +210,15 @@ impl<'a, H: Hasher> Display for RunwayStatus<'a, H> {

struct RunwayConfig<H: Hasher, D: Data, FH: FinalizationHandler<D>, MK: MultiKeychain> {
finalization_handler: FH,
backup_units_for_saver: Sender<UncheckedSignedUnit<H, D, MK::Signature>>,
backup_units_for_saver: Sender<DagUnit<H, D, MK>>,
units_for_extender: Sender<ExtenderUnit<H>>,
backup_units_from_saver: Receiver<UncheckedSignedUnit<H, D, MK::Signature>>,
backup_units_from_saver: Receiver<DagUnit<H, D, MK>>,
alerts_for_alerter: Sender<Alert<H, D, MK::Signature>>,
notifications_from_alerter: Receiver<ForkingNotification<H, D, MK::Signature>>,
unit_messages_from_network: Receiver<RunwayNotificationIn<H, D, MK::Signature>>,
unit_messages_for_network: Sender<RunwayNotificationOut<H, D, MK::Signature>>,
responses_for_collection: Sender<CollectionResponse<H, D, MK>>,
parents_for_creator: Sender<ReconstructedUnit<SignedUnit<H, D, MK>>>,
parents_for_creator: Sender<DagUnit<H, D, MK>>,
ordered_batch_rx: Receiver<Vec<H::Hash>>,
resolved_requests: Sender<Request<H>>,
new_units_from_creation: Receiver<SignedUnit<H, D, MK>>,
Expand Down Expand Up @@ -455,10 +455,18 @@ where
}
}

fn on_unit_reconstructed(&mut self, unit: ReconstructedUnit<SignedUnit<H, D, MK>>) {
fn on_unit_reconstructed(&mut self, unit: DagUnit<H, D, MK>) {
let unit_hash = unit.hash();
trace!(target: "AlephBFT-runway", "Unit {:?} {} reconstructed.", unit_hash, unit.coord());
if self.backup_units_for_saver.unbounded_send(unit).is_err() {
error!(target: "AlephBFT-runway", "{:?} A unit couldn't be sent to backup: {:?}.", self.index(), unit_hash);
}
}

fn on_unit_backup_saved(&mut self, unit: DagUnit<H, D, MK>) {
let unit_hash = unit.hash();
self.store.insert(unit.clone());
self.dag.finished_processing(&unit_hash);
self.resolve_missing_parents(&unit_hash);
self.resolve_missing_coord(&unit.coord());
if self
Expand All @@ -477,21 +485,12 @@ where
warn!(target: "AlephBFT-runway", "Creator channel should be open.");
self.exiting = true;
}
if self
.backup_units_for_saver
.unbounded_send(unit.unpack().into())
.is_err()
{
error!(target: "AlephBFT-runway", "{:?} A unit couldn't be sent to backup: {:?}.", self.index(), unit_hash);
}
}

fn on_unit_backup_saved(&mut self, unit: UncheckedSignedUnit<H, D, MK::Signature>) {
self.send_message_for_network(RunwayNotificationOut::NewAnyUnit(unit.clone()));
let unit = unit.unpack();
self.send_message_for_network(RunwayNotificationOut::NewAnyUnit(unit.clone().into()));

if unit.as_signable().creator() == self.index() {
trace!(target: "AlephBFT-runway", "{:?} Sending a unit {:?}.", self.index(), unit.as_signable().hash());
self.send_message_for_network(RunwayNotificationOut::NewSelfUnit(unit));
self.send_message_for_network(RunwayNotificationOut::NewSelfUnit(unit.into()));
}
}

Expand Down
11 changes: 8 additions & 3 deletions consensus/src/testing/crash_recovery.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{
testing::{init_log, spawn_honest_member, HonestMember, Network, ReconnectSender},
units::{UncheckedSignedUnit, Unit, UnitCoord},
NodeCount, NodeIndex, SpawnHandle, TaskHandle,
NodeCount, NodeIndex, NodeMap, SpawnHandle, TaskHandle,
};
use aleph_bft_mock::{Data, Hasher64, Router, Signature, Spawner};
use aleph_bft_mock::{Data, Hash64, Hasher64, Router, Signature, Spawner};
use codec::Decode;
use futures::{
channel::{mpsc, oneshot},
Expand Down Expand Up @@ -129,7 +129,12 @@ fn verify_backup(buf: &mut &[u8]) -> HashSet<UnitCoord> {
let mut already_saved = HashSet::new();

while !buf.is_empty() {
let unit = UncheckedSignedUnit::<Hasher64, Data, Signature>::decode(buf).unwrap();
let unit = <(
UncheckedSignedUnit<Hasher64, Data, Signature>,
NodeMap<Hash64>,
)>::decode(buf)
.unwrap()
.0;
let full_unit = unit.as_signable();
let coord = full_unit.coord();
let parent_ids = &full_unit.as_pre_unit().control_hash().parents_mask;
Expand Down
Loading
Loading