From f33250e0b565e949130b137fb82083cc6037ac07 Mon Sep 17 00:00:00 2001 From: timorleph Date: Wed, 27 Mar 2024 10:43:20 +0100 Subject: [PATCH] Factor unit validation out of runway This required quite a lot of other changes as well, so this is quite large. It also looks... not ideal and in places _more_ convoluted than beforehand, but completely untangling all this in a single commit would require it to be even larger. --- Cargo.lock | 2 +- consensus/Cargo.toml | 2 +- consensus/src/alerts/handler.rs | 1 + consensus/src/alerts/mod.rs | 5 +- consensus/src/alerts/service.rs | 3 +- consensus/src/backup/loader.rs | 8 +- consensus/src/backup/saver.rs | 2 +- consensus/src/consensus.rs | 47 +- consensus/src/creation/creator.rs | 91 ++-- consensus/src/creation/mod.rs | 52 +-- consensus/src/lib.rs | 1 + consensus/src/member.rs | 2 +- consensus/src/reconstruction/dag.rs | 55 +-- consensus/src/reconstruction/mod.rs | 194 ++++---- consensus/src/reconstruction/parents.rs | 151 +++--- consensus/src/runway/collection.rs | 8 +- consensus/src/runway/mod.rs | 589 +++++++++++------------- consensus/src/testing/byzantine.rs | 2 +- consensus/src/testing/consensus.rs | 294 ++---------- consensus/src/testing/crash_recovery.rs | 2 +- consensus/src/testing/creation.rs | 22 +- consensus/src/testing/dag.rs | 193 +++++--- consensus/src/testing/unreliable.rs | 1 + consensus/src/units/mod.rs | 122 +++-- consensus/src/units/store.rs | 452 +++++++++--------- consensus/src/units/testing.rs | 48 +- consensus/src/units/validator.rs | 13 +- consensus/src/validation.rs | 483 +++++++++++++++++++ docs/src/internals.md | 18 +- 29 files changed, 1572 insertions(+), 1291 deletions(-) create mode 100644 consensus/src/validation.rs diff --git a/Cargo.lock b/Cargo.lock index b93ed13d..ef7c2147 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,7 +28,7 @@ dependencies = [ [[package]] name = "aleph-bft" -version = "0.36.1" +version = "0.36.2" dependencies = [ "aleph-bft-mock", "aleph-bft-rmc", diff --git 
a/consensus/Cargo.toml b/consensus/Cargo.toml index 36234e02..31fb3614 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft" -version = "0.36.1" +version = "0.36.2" edition = "2021" authors = ["Cardinal Cryptography"] categories = ["algorithms", "data-structures", "cryptography", "database"] diff --git a/consensus/src/alerts/handler.rs b/consensus/src/alerts/handler.rs index 15a755cc..214f462e 100644 --- a/consensus/src/alerts/handler.rs +++ b/consensus/src/alerts/handler.rs @@ -1,5 +1,6 @@ use crate::{ alerts::{Alert, AlertMessage, ForkProof, ForkingNotification}, + units::Unit, Data, Hasher, Keychain, MultiKeychain, Multisigned, NodeIndex, PartialMultisignature, Recipient, SessionId, Signature, Signed, UncheckedSigned, }; diff --git a/consensus/src/alerts/mod.rs b/consensus/src/alerts/mod.rs index c70608b9..52b2f315 100644 --- a/consensus/src/alerts/mod.rs +++ b/consensus/src/alerts/mod.rs @@ -1,6 +1,7 @@ use crate::{ - units::UncheckedSignedUnit, Data, Hasher, Index, Keychain, MultiKeychain, Multisigned, - NodeIndex, PartialMultisignature, Signable, Signature, UncheckedSigned, + units::{UncheckedSignedUnit, Unit}, + Data, Hasher, Index, Keychain, MultiKeychain, Multisigned, NodeIndex, PartialMultisignature, + Signable, Signature, UncheckedSigned, }; use aleph_bft_rmc::Message as RmcMessage; use codec::{Decode, Encode}; diff --git a/consensus/src/alerts/service.rs b/consensus/src/alerts/service.rs index ae319f4a..6d452dd1 100644 --- a/consensus/src/alerts/service.rs +++ b/consensus/src/alerts/service.rs @@ -7,7 +7,7 @@ use crate::{ }; use aleph_bft_rmc::{DoublingDelayScheduler, Message as RmcMessage}; use futures::{FutureExt, StreamExt}; -use log::{debug, error, warn}; +use log::{debug, error, trace, warn}; use std::time::Duration; const LOG_TARGET: &str = "AlephBFT-alerter"; @@ -147,6 +147,7 @@ impl Service { } fn handle_alert_from_runway(&mut self, alert: Alert) { + trace!(target: LOG_TARGET, "Handling alert 
{:?}.", alert); let (message, recipient, hash) = self.handler.on_own_alert(alert.clone()); self.send_message_for_network(message, recipient); if let Some(multisigned) = self.rmc_service.start_rmc(hash) { diff --git a/consensus/src/backup/loader.rs b/consensus/src/backup/loader.rs index dfe945b4..97fefb7b 100644 --- a/consensus/src/backup/loader.rs +++ b/consensus/src/backup/loader.rs @@ -10,7 +10,7 @@ use futures::{channel::oneshot, AsyncRead, AsyncReadExt}; use log::{error, info, warn}; use crate::{ - units::{UncheckedSignedUnit, UnitCoord}, + units::{UncheckedSignedUnit, Unit, UnitCoord}, Data, Hasher, NodeIndex, Round, SessionId, Signature, }; @@ -246,7 +246,7 @@ mod tests { use crate::{ backup::BackupLoader, units::{ - create_preunits, creator_set, preunit_to_unchecked_signed_unit, preunit_to_unit, + create_preunits, creator_set, preunit_to_full_unit, preunit_to_unchecked_signed_unit, UncheckedSignedUnit as GenericUncheckedSignedUnit, }, NodeCount, NodeIndex, Round, SessionId, @@ -277,14 +277,14 @@ mod tests { let units: Vec<_> = pre_units .iter() - .map(|(pre_unit, _)| preunit_to_unit(pre_unit.clone(), session_id)) + .map(|pre_unit| preunit_to_full_unit(pre_unit.clone(), session_id)) .collect(); for creator in creators.iter_mut() { creator.add_units(&units); } let mut unchecked_signed_units = Vec::with_capacity(pre_units.len()); - for ((pre_unit, _), keychain) in pre_units.into_iter().zip(keychains.iter()) { + for (pre_unit, keychain) in pre_units.into_iter().zip(keychains.iter()) { unchecked_signed_units.push(preunit_to_unchecked_signed_unit( pre_unit, session_id, keychain, )) diff --git a/consensus/src/backup/saver.rs b/consensus/src/backup/saver.rs index f13e25a6..7e37fefd 100644 --- a/consensus/src/backup/saver.rs +++ b/consensus/src/backup/saver.rs @@ -140,7 +140,7 @@ mod tests { let units: Vec = (0..5) .map(|k| { preunit_to_unchecked_signed_unit( - creators[k].create_unit(0).unwrap().0, + creators[k].create_unit(0).unwrap(), 0, &keychains[k], ) diff 
--git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index fc7dfacf..51f543c7 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -7,20 +7,20 @@ use log::{debug, error}; use crate::{ config::Config, - creation::{self, SignedUnitWithParents}, - extension::Service as Extender, - handle_task_termination, - reconstruction::Service as ReconstructionService, - runway::{NotificationIn, NotificationOut}, + creation, handle_task_termination, + reconstruction::{ReconstructedUnit, Request, Service as ReconstructionService}, + runway::ExplicitParents, + units::SignedUnit, Data, DataProvider, Hasher, MultiKeychain, Receiver, Round, Sender, SpawnHandle, Terminator, }; pub struct IO> { - pub incoming_notifications: Receiver>, - pub outgoing_notifications: Sender>, - pub units_for_runway: Sender>, + pub units_from_runway: Receiver>, + pub parents_from_runway: Receiver>, + pub units_for_runway: Sender>>, + pub requests_for_runway: Sender>, + pub new_units_for_runway: Sender>, pub data_provider: DP, - pub ordered_batch_tx: Sender>, pub starting_round: oneshot::Receiver>, } @@ -33,30 +33,22 @@ pub async fn run>( ) { debug!(target: "AlephBFT", "{:?} Starting all services...", conf.node_ix()); let IO { - incoming_notifications, - outgoing_notifications, + units_from_runway, + parents_from_runway, units_for_runway, + requests_for_runway, + new_units_for_runway, data_provider, - ordered_batch_tx, starting_round, } = io; let index = conf.node_ix(); - let (electors_tx, electors_rx) = mpsc::unbounded(); - let extender = Extender::::new(index, electors_rx, ordered_batch_tx); - let extender_terminator = terminator.add_offspring_connection("AlephBFT-extender"); - let mut extender_handle = spawn_handle - .spawn_essential("consensus/extender", async move { - extender.run(extender_terminator).await - }) - .fuse(); - let (parents_for_creator, parents_from_dag) = mpsc::unbounded(); let creator_terminator = terminator.add_offspring_connection("creator"); let io = 
creation::IO { - outgoing_units: units_for_runway, + outgoing_units: new_units_for_runway, incoming_parents: parents_from_dag, data_provider, }; @@ -75,10 +67,11 @@ pub async fn run>( }; let reconstruction = ReconstructionService::new( - incoming_notifications, - outgoing_notifications, + units_from_runway, + parents_from_runway, + requests_for_runway, + units_for_runway, parents_for_creator, - electors_tx, ); let reconstruction_terminator = terminator.add_offspring_connection("reconstruction"); @@ -97,9 +90,6 @@ pub async fn run>( _ = creator_panic_handle.fuse() => { error!(target: "AlephBFT-consensus", "{:?} creator task terminated early with its task being dropped.", index); }, - _ = extender_handle => { - debug!(target: "AlephBFT-consensus", "{:?} extender task terminated early.", index); - } } debug!(target: "AlephBFT", "{:?} All services stopping.", index); @@ -114,7 +104,6 @@ pub async fn run>( ) .await; handle_task_termination(creator_handle, "AlephBFT-consensus", "Creator", index).await; - handle_task_termination(extender_handle, "AlephBFT-consensus", "Extender", index).await; debug!(target: "AlephBFT", "{:?} All services stopped.", index); } diff --git a/consensus/src/creation/creator.rs b/consensus/src/creation/creator.rs index 5124ea52..594f028d 100644 --- a/consensus/src/creation/creator.rs +++ b/consensus/src/creation/creator.rs @@ -27,7 +27,7 @@ impl UnitsCollector { } } - pub fn add_unit(&mut self, unit: &Unit) { + pub fn add_unit>(&mut self, unit: &U) { let node_id = unit.creator(); let hash = unit.hash(); @@ -51,18 +51,6 @@ impl UnitsCollector { } } -fn create_unit( - node_id: NodeIndex, - parents: NodeMap, - round: Round, -) -> (PreUnit, Vec) { - let control_hash = ControlHash::new(&parents); - let parent_hashes = parents.into_values().collect(); - - let new_preunit = PreUnit::new(node_id, round, control_hash); - (new_preunit, parent_hashes) -} - pub struct Creator { round_collectors: Vec>, node_id: NodeIndex, @@ -95,23 +83,24 @@ impl Creator { 
/// To create a new unit, we need to have at least the consensus threshold of parents available in previous round. /// Additionally, our unit from previous round must be available. - pub fn create_unit(&self, round: Round) -> Result<(PreUnit, Vec)> { - if round == 0 { - let parents = NodeMap::with_size(self.n_members); - return Ok(create_unit(self.node_id, parents, round)); - } - let prev_round = usize::from(round - 1); - - let parents = self - .round_collectors - .get(prev_round) - .ok_or(ConstraintError::NotEnoughParents)? - .prospective_parents(self.node_id)?; - - Ok(create_unit(self.node_id, parents.clone(), round)) + pub fn create_unit(&self, round: Round) -> Result> { + let parents = match round.checked_sub(1) { + None => NodeMap::with_size(self.n_members), + Some(prev_round) => self + .round_collectors + .get(usize::from(prev_round)) + .ok_or(ConstraintError::NotEnoughParents)? + .prospective_parents(self.node_id)? + .clone(), + }; + Ok(PreUnit::new( + self.node_id, + round, + ControlHash::new(&parents), + )) } - pub fn add_unit(&mut self, unit: &Unit) { + pub fn add_unit>(&mut self, unit: &U) { self.get_or_initialize_collector_for_round(unit.round()) .add_unit(unit); } @@ -122,7 +111,7 @@ mod tests { use super::{Creator as GenericCreator, UnitsCollector}; use crate::{ creation::creator::ConstraintError, - units::{create_preunits, creator_set, preunit_to_unit}, + units::{create_preunits, creator_set, preunit_to_full_unit, Unit}, NodeCount, NodeIndex, }; use aleph_bft_mock::Hasher64; @@ -136,11 +125,10 @@ mod tests { let round = 0; let creator = Creator::new(NodeIndex(0), n_members); assert_eq!(creator.current_round(), round); - let (preunit, parent_hashes) = creator + let preunit = creator .create_unit(round) .expect("Creation should succeed."); assert_eq!(preunit.round(), round); - assert_eq!(parent_hashes.len(), 0); } #[test] @@ -150,18 +138,16 @@ mod tests { let new_units = create_preunits(creators.iter(), 0); let new_units: Vec<_> = new_units 
.into_iter() - .map(|(pu, _)| preunit_to_unit(pu, 0)) + .map(|pu| preunit_to_full_unit(pu, 0)) .collect(); - let expected_hashes: Vec<_> = new_units.iter().map(|u| u.hash()).collect(); let creator = &mut creators[0]; creator.add_units(&new_units); let round = 1; assert_eq!(creator.current_round(), 0); - let (preunit, parent_hashes) = creator + let preunit = creator .create_unit(round) .expect("Creation should succeed."); assert_eq!(preunit.round(), round); - assert_eq!(parent_hashes, expected_hashes); } fn create_unit_with_minimal_parents(n_members: NodeCount) { @@ -170,18 +156,16 @@ mod tests { let new_units = create_preunits(creators.iter().take(n_parents), 0); let new_units: Vec<_> = new_units .into_iter() - .map(|(pu, _)| preunit_to_unit(pu, 0)) + .map(|pu| preunit_to_full_unit(pu, 0)) .collect(); - let expected_hashes: Vec<_> = new_units.iter().map(|u| u.hash()).collect(); let creator = &mut creators[0]; creator.add_units(&new_units); let round = 1; assert_eq!(creator.current_round(), 0); - let (preunit, parent_hashes) = creator + let preunit = creator .create_unit(round) .expect("Creation should succeed."); assert_eq!(preunit.round(), round); - assert_eq!(parent_hashes, expected_hashes); } #[test] @@ -210,7 +194,7 @@ mod tests { let new_units = create_preunits(creators.iter().take(n_parents.0), 0); let new_units: Vec<_> = new_units .into_iter() - .map(|(pu, _)| preunit_to_unit(pu, 0)) + .map(|pu| preunit_to_full_unit(pu, 0)) .collect(); let creator = &mut creators[0]; creator.add_units(&new_units); @@ -243,38 +227,25 @@ mod tests { fn creates_two_units_when_possible() { let n_members = NodeCount(7); let mut creators = creator_set(n_members); - let mut expected_hashes_per_round = Vec::new(); for round in 0..2 { let new_units = create_preunits(creators.iter().skip(1), round); let new_units: Vec<_> = new_units .into_iter() - .map(|(pu, _)| preunit_to_unit(pu, 0)) + .map(|pu| preunit_to_full_unit(pu, 0)) .collect(); - let expected_hashes: HashSet<_> = 
new_units.iter().map(|u| u.hash()).collect(); for creator in creators.iter_mut() { creator.add_units(&new_units); } - expected_hashes_per_round.push(expected_hashes); } let creator = &mut creators[0]; assert_eq!(creator.current_round(), 1); for round in 0..3 { - let (preunit, parent_hashes) = creator + let preunit = creator .create_unit(round) .expect("Creation should succeed."); assert_eq!(preunit.round(), round); - let parent_hashes: HashSet<_> = parent_hashes.into_iter().collect(); - if round != 0 { - assert_eq!( - parent_hashes, - expected_hashes_per_round[(round - 1) as usize] - ); - } - let unit = preunit_to_unit(preunit, 0); + let unit = preunit_to_full_unit(preunit, 0); creator.add_unit(&unit); - if round < 2 { - expected_hashes_per_round[round as usize].insert(unit.hash()); - } } } @@ -285,7 +256,7 @@ mod tests { let new_units = create_preunits(creators.iter().skip(1), 0); let new_units: Vec<_> = new_units .into_iter() - .map(|(pu, _)| preunit_to_unit(pu, 0)) + .map(|pu| preunit_to_full_unit(pu, 0)) .collect(); let creator = &mut creators[0]; creator.add_units(&new_units); @@ -300,7 +271,7 @@ mod tests { let new_units = create_preunits(creators.iter(), 0); let new_units: Vec<_> = new_units .into_iter() - .map(|(pu, _)| preunit_to_unit(pu, 0)) + .map(|pu| preunit_to_full_unit(pu, 0)) .collect(); let mut units_collector = UnitsCollector::new(n_members); @@ -325,7 +296,7 @@ mod tests { let new_units = create_preunits(creators.iter().take(2), 0); let new_units: Vec<_> = new_units .into_iter() - .map(|(pu, _)| preunit_to_unit(pu, 0)) + .map(|pu| preunit_to_full_unit(pu, 0)) .collect(); let mut units_collector = UnitsCollector::new(n_members); @@ -347,7 +318,7 @@ mod tests { let new_units = create_preunits(creators.iter().take(3), 0); let new_units: Vec<_> = new_units .into_iter() - .map(|(pu, _)| preunit_to_unit(pu, 0)) + .map(|pu| preunit_to_full_unit(pu, 0)) .collect(); let mut units_collector = UnitsCollector::new(n_members); diff --git 
a/consensus/src/creation/mod.rs b/consensus/src/creation/mod.rs index 24125b3f..e8400e07 100644 --- a/consensus/src/creation/mod.rs +++ b/consensus/src/creation/mod.rs @@ -1,7 +1,7 @@ use crate::{ config::Config, units::{PreUnit, SignedUnit, Unit}, - Data, DataProvider, Hasher, MultiKeychain, Receiver, Round, Sender, Terminator, + Data, DataProvider, MultiKeychain, Receiver, Round, Sender, Terminator, }; use futures::{ channel::{ @@ -32,19 +32,17 @@ impl From> for CreatorError { } } -pub type SignedUnitWithParents = (SignedUnit, Vec<::Hash>); - -pub struct IO> { - pub incoming_parents: Receiver>, - pub outgoing_units: Sender>, +pub struct IO> { + pub incoming_parents: Receiver, + pub outgoing_units: Sender>, pub data_provider: DP, } -async fn create_unit( +async fn create_unit( round: Round, - creator: &mut Creator, - incoming_parents: &mut Receiver>, -) -> Result<(PreUnit, Vec), CreatorError> { + creator: &mut Creator, + incoming_parents: &mut Receiver, +) -> Result, CreatorError> { loop { match creator.create_unit(round) { Ok(unit) => return Ok(unit), @@ -58,9 +56,9 @@ async fn create_unit( /// Tries to process a single parent from given `incoming_parents` receiver. /// Returns error when `incoming_parents` channel is closed. 
-async fn process_unit( - creator: &mut Creator, - incoming_parents: &mut Receiver>, +async fn process_unit( + creator: &mut Creator, + incoming_parents: &mut Receiver, ) -> anyhow::Result<(), CreatorError> { let unit = incoming_parents .next() @@ -70,18 +68,18 @@ async fn process_unit( Ok(()) } -async fn keep_processing_units( - creator: &mut Creator, - incoming_parents: &mut Receiver>, +async fn keep_processing_units( + creator: &mut Creator, + incoming_parents: &mut Receiver, ) -> anyhow::Result<(), CreatorError> { loop { process_unit(creator, incoming_parents).await?; } } -async fn keep_processing_units_until( - creator: &mut Creator, - incoming_parents: &mut Receiver>, +async fn keep_processing_units_until( + creator: &mut Creator, + incoming_parents: &mut Receiver, until: Delay, ) -> anyhow::Result<(), CreatorError> { futures::select! { @@ -108,9 +106,9 @@ async fn keep_processing_units_until( /// /// We refer to the documentation https://cardinal-cryptography.github.io/AlephBFT/internals.html /// Section 5.1 for a discussion of this component. 
-pub async fn run>( +pub async fn run>( conf: Config, - mut io: IO, + mut io: IO, keychain: MK, mut starting_round: oneshot::Receiver>, mut terminator: Terminator, @@ -126,13 +124,13 @@ pub async fn run>( } async fn read_starting_round_and_run_creator< - H: Hasher, + U: Unit, D: Data, MK: MultiKeychain, DP: DataProvider, >( conf: Config, - io: &mut IO, + io: &mut IO, keychain: MK, starting_round: &mut oneshot::Receiver>, ) { @@ -161,9 +159,9 @@ async fn read_starting_round_and_run_creator< } } -async fn run_creator>( +async fn run_creator>( conf: Config, - io: &mut IO, + io: &mut IO, keychain: MK, starting_round: Round, ) -> anyhow::Result<(), CreatorError> { @@ -190,13 +188,13 @@ async fn run_creator> keep_processing_units_until(&mut creator, incoming_parents, delay).await?; } - let (preunit, parent_hashes) = create_unit(round, &mut creator, incoming_parents).await?; + let preunit = create_unit(round, &mut creator, incoming_parents).await?; trace!(target: LOG_TARGET, "Created a new preunit {:?} at round {:?}.", preunit, round); let data = data_provider.get_data().await; trace!(target: LOG_TARGET, "Received data: {:?}.", data); let unit = packer.pack(preunit, data); - outgoing_units.unbounded_send((unit, parent_hashes))?; + outgoing_units.unbounded_send(unit)?; } warn!(target: LOG_TARGET, "Maximum round reached. 
Not creating another unit."); diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 704c9e98..97abfe62 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -13,6 +13,7 @@ mod reconstruction; mod runway; mod terminator; mod units; +mod validation; mod backup; mod task_queue; diff --git a/consensus/src/member.rs b/consensus/src/member.rs index 4ff66d66..73fe67fb 100644 --- a/consensus/src/member.rs +++ b/consensus/src/member.rs @@ -7,7 +7,7 @@ use crate::{ RunwayNotificationOut, }, task_queue::TaskQueue, - units::{UncheckedSignedUnit, UnitCoord}, + units::{UncheckedSignedUnit, Unit, UnitCoord}, Config, Data, DataProvider, FinalizationHandler, Hasher, MultiKeychain, Network, NodeIndex, Receiver, Recipient, Round, Sender, Signature, SpawnHandle, Terminator, UncheckedSigned, }; diff --git a/consensus/src/reconstruction/dag.rs b/consensus/src/reconstruction/dag.rs index a47592cc..dcd17323 100644 --- a/consensus/src/reconstruction/dag.rs +++ b/consensus/src/reconstruction/dag.rs @@ -1,17 +1,20 @@ -use crate::{reconstruction::ReconstructedUnit, Hasher}; +use crate::{ + reconstruction::ReconstructedUnit, + units::{HashFor, Unit}, +}; use std::collections::{HashMap, HashSet, VecDeque}; -struct OrphanedUnit { - unit: ReconstructedUnit, - missing_parents: HashSet, +struct OrphanedUnit { + unit: ReconstructedUnit, + missing_parents: HashSet>, } -impl OrphanedUnit { +impl OrphanedUnit { /// If there are no missing parents then returns just the internal unit. pub fn new( - unit: ReconstructedUnit, - missing_parents: HashSet, - ) -> Result> { + unit: ReconstructedUnit, + missing_parents: HashSet>, + ) -> Result> { match missing_parents.is_empty() { true => Err(unit), false => Ok(OrphanedUnit { @@ -22,7 +25,7 @@ impl OrphanedUnit { } /// If this was the last missing parent return the reconstructed unit. 
- pub fn resolve_parent(self, parent: H::Hash) -> Result, Self> { + pub fn resolve_parent(self, parent: HashFor) -> Result, Self> { let OrphanedUnit { unit, mut missing_parents, @@ -38,27 +41,25 @@ impl OrphanedUnit { } /// The hash of the unit. - pub fn hash(&self) -> H::Hash { + pub fn hash(&self) -> HashFor { self.unit.hash() } /// The set of still missing parents. - pub fn missing_parents(&self) -> &HashSet { + pub fn missing_parents(&self) -> &HashSet> { &self.missing_parents } } /// A structure ensuring that units added to it are output in an order /// in agreement with the DAG order. -/// TODO: This should likely be the final destination of units going through a pipeline. -/// This requires quite a bit more of a refactor. -pub struct Dag { - orphaned_units: HashMap>, - waiting_for: HashMap>, - dag_units: HashSet, +pub struct Dag { + orphaned_units: HashMap, OrphanedUnit>, + waiting_for: HashMap, Vec>>, + dag_units: HashSet>, } -impl Dag { +impl Dag { /// Create a new empty DAG. pub fn new() -> Self { Dag { @@ -68,7 +69,7 @@ impl Dag { } } - fn move_to_dag(&mut self, unit: ReconstructedUnit) -> Vec> { + fn move_to_dag(&mut self, unit: ReconstructedUnit) -> Vec> { let mut result = Vec::new(); let mut ready_units = VecDeque::from([unit]); while let Some(unit) = ready_units.pop_front() { @@ -94,7 +95,7 @@ impl Dag { /// Add a unit to the Dag. Returns all the units that now have all their parents in the Dag, /// in an order agreeing with the Dag structure. - pub fn add_unit(&mut self, unit: ReconstructedUnit) -> Vec> { + pub fn add_unit(&mut self, unit: ReconstructedUnit) -> Vec> { if self.dag_units.contains(&unit.hash()) { // Deduplicate. 
return Vec::new(); @@ -123,10 +124,10 @@ impl Dag { mod test { use crate::{ reconstruction::{dag::Dag, ReconstructedUnit}, - units::{random_full_parent_units_up_to, FullUnit}, + units::{random_full_parent_units_up_to, TestingFullUnit, Unit}, Hasher, NodeCount, NodeIndex, NodeMap, }; - use aleph_bft_mock::{Data, Hasher64}; + use aleph_bft_mock::Hasher64; use std::collections::HashSet; fn full_parents_to_map( @@ -141,19 +142,19 @@ mod test { // silly clippy, the map below doesn't work with &[..] #[allow(clippy::ptr_arg)] - fn unit_hashes(units: &Vec>) -> Vec<::Hash> { + fn unit_hashes(units: &Vec) -> Vec<::Hash> { units.iter().map(|unit| unit.hash()).collect() } fn reconstructed( - dag: Vec>>, - ) -> Vec>> { + dag: Vec>, + ) -> Vec>> { let hashes: Vec<_> = dag.iter().map(unit_hashes).collect(); let initial_units: Vec<_> = dag .get(0) .expect("only called on nonempty dags") .iter() - .map(|unit| ReconstructedUnit::initial(unit.unit())) + .map(|unit| ReconstructedUnit::initial(unit.clone())) .collect(); let mut result = vec![initial_units]; for (units, parents) in dag.iter().skip(1).zip(hashes) { @@ -161,7 +162,7 @@ mod test { let reconstructed = units .iter() .map(|unit| { - ReconstructedUnit::with_parents(unit.unit(), parents.clone()) + ReconstructedUnit::with_parents(unit.clone(), parents.clone()) .expect("parents are correct") }) .collect(); diff --git a/consensus/src/reconstruction/mod.rs b/consensus/src/reconstruction/mod.rs index 07f76537..51ac5b35 100644 --- a/consensus/src/reconstruction/mod.rs +++ b/consensus/src/reconstruction/mod.rs @@ -1,31 +1,34 @@ use crate::{ extension::ExtenderUnit, - runway::{NotificationIn, NotificationOut}, - units::{ControlHash, Unit}, - Hasher, NodeMap, Receiver, Sender, Terminator, + runway::ExplicitParents, + units::{ControlHash, HashFor, Unit, UnitCoord, WrappedUnit}, + NodeMap, Receiver, Sender, Terminator, }; use futures::{FutureExt, StreamExt}; -use log::{debug, warn}; +use log::{debug, trace, warn}; mod dag; mod parents; 
use dag::Dag; -use parents::{Reconstruction, ReconstructionResult, Request}; +pub use parents::Request; +use parents::{Reconstruction, ReconstructionResult}; const LOG_TARGET: &str = "AlephBFT-reconstruction"; /// A unit with its parents represented explicitly. #[derive(Debug, PartialEq, Eq, Clone)] -pub struct ReconstructedUnit { - unit: Unit, - parents: NodeMap, +pub struct ReconstructedUnit { + unit: U, + parents: NodeMap>, } -impl ReconstructedUnit { +impl ReconstructedUnit { /// Returns a reconstructed unit if the parents agree with the hash, errors out otherwise. - pub fn with_parents(unit: Unit, parents: NodeMap) -> Result> { - match unit.control_hash().combined_hash == ControlHash::::combine_hashes(&parents) { + pub fn with_parents(unit: U, parents: NodeMap>) -> Result { + match unit.control_hash().combined_hash + == ControlHash::::combine_hashes(&parents) + { true => Ok(ReconstructedUnit { unit, parents }), false => Err(unit), } @@ -34,7 +37,7 @@ impl ReconstructedUnit { /// Reconstructs empty parents for a round 0 unit. /// Assumes obviously incorrect units with wrong control hashes have been rejected earlier. /// Will panic if called for any other kind of unit. - pub fn initial(unit: Unit) -> Self { + pub fn initial(unit: U) -> Self { let n_members = unit.control_hash().n_members(); assert!(unit.round() == 0, "Only the zeroth unit can be initial."); ReconstructedUnit { @@ -44,20 +47,12 @@ impl ReconstructedUnit { } /// The reconstructed parents, guaranteed to be correct. - pub fn parents(&self) -> &NodeMap { + pub fn parents(&self) -> &NodeMap> { &self.parents } - /// The hash of the unit. - pub fn hash(&self) -> H::Hash { - self.unit.hash() - } - - fn unit(&self) -> Unit { - self.unit.clone() - } - - fn extender_unit(&self) -> ExtenderUnit { + /// Create an extender unit from this one. 
+ pub fn extender_unit(&self) -> ExtenderUnit { ExtenderUnit::new( self.unit.creator(), self.unit.round(), @@ -67,109 +62,84 @@ impl ReconstructedUnit { } } +impl Unit for ReconstructedUnit { + type Hasher = U::Hasher; + + fn hash(&self) -> HashFor { + self.unit.hash() + } + + fn coord(&self) -> UnitCoord { + self.unit.coord() + } + + fn control_hash(&self) -> &ControlHash { + self.unit.control_hash() + } +} + +impl WrappedUnit for ReconstructedUnit { + type Wrapped = U; + + fn unpack(self) -> U { + self.unit + } +} + /// The service responsible for reconstructing the structure of the Dag. /// Receives units containing control hashes and eventually outputs versions /// with explicit parents in an order conforming to the Dag order. -pub struct Service { - reconstruction: Reconstruction, - dag: Dag, - notifications_from_runway: Receiver>, - notifications_for_runway: Sender>, - units_for_creator: Sender>, - units_for_extender: Sender>, -} - -enum Output { - Unit(ReconstructedUnit), - Request(Request), +pub struct Service { + reconstruction: Reconstruction, + dag: Dag, + units_from_runway: Receiver, + parents_from_runway: Receiver>, + requests_for_runway: Sender>, + units_for_runway: Sender>, + units_for_creator: Sender>, } -impl Service { +impl Service { /// Create a new reconstruction service with the provided IO channels. 
pub fn new( - notifications_from_runway: Receiver>, - notifications_for_runway: Sender>, - units_for_creator: Sender>, - units_for_extender: Sender>, + units_from_runway: Receiver, + parents_from_runway: Receiver>, + requests_for_runway: Sender>, + units_for_runway: Sender>, + units_for_creator: Sender>, ) -> Self { let reconstruction = Reconstruction::new(); let dag = Dag::new(); Service { reconstruction, dag, - notifications_from_runway, - notifications_for_runway, + units_from_runway, + parents_from_runway, + requests_for_runway, + units_for_runway, units_for_creator, - units_for_extender, } } fn handle_reconstruction_result( &mut self, - reconstruction_result: ReconstructionResult, - ) -> Vec> { - use Output::*; - let mut result = Vec::new(); + reconstruction_result: ReconstructionResult, + ) -> bool { let (units, requests) = reconstruction_result.into(); - result.append(&mut requests.into_iter().map(Request).collect()); - for unit in units { - result.append(&mut self.dag.add_unit(unit).into_iter().map(Unit).collect()); - } - result - } - - fn handle_notification(&mut self, notification: NotificationIn) -> Vec> { - use NotificationIn::*; - let mut result = Vec::new(); - match notification { - NewUnits(units) => { - for unit in units { - let reconstruction_result = self.reconstruction.add_unit(unit); - result.append(&mut self.handle_reconstruction_result(reconstruction_result)); - } - } - UnitParents(unit, parents) => { - let reconstruction_result = self.reconstruction.add_parents(unit, parents); - result.append(&mut self.handle_reconstruction_result(reconstruction_result)) + trace!(target: LOG_TARGET, "Reconstructed {} units, and have {} requests.", units.len(), requests.len()); + for request in requests { + if self.requests_for_runway.unbounded_send(request).is_err() { + warn!(target: LOG_TARGET, "Request channel should be open."); + return false; } } - result - } - - fn handle_output(&mut self, output: Output) -> bool { - use Output::*; - match output { - 
Request(request) => { - if self - .notifications_for_runway - .unbounded_send(request.into()) - .is_err() - { - warn!(target: LOG_TARGET, "Notification channel should be open."); - return false; - } - } - Unit(unit) => { - if self.units_for_creator.unbounded_send(unit.unit()).is_err() { + for unit in units { + for unit in self.dag.add_unit(unit) { + if self.units_for_creator.unbounded_send(unit.clone()).is_err() { warn!(target: LOG_TARGET, "Creator channel should be open."); return false; } - if self - .units_for_extender - .unbounded_send(unit.extender_unit()) - .is_err() - { - warn!(target: LOG_TARGET, "Extender channel should be open."); - return false; - } - if self - .notifications_for_runway - .unbounded_send(NotificationOut::AddedToDag( - unit.hash(), - unit.parents().clone().into_values().collect(), - )) - .is_err() - { + if self.units_for_runway.unbounded_send(unit).is_err() { warn!(target: LOG_TARGET, "Notification channel should be open."); return false; } @@ -181,15 +151,18 @@ impl Service { /// Run the reconstruction service until terminated. pub async fn run(mut self, mut terminator: Terminator) { loop { - futures::select! { - n = self.notifications_from_runway.next() => match n { - Some(notification) => for output in self.handle_notification(notification) { - if !self.handle_output(output) { - return; - } - }, + let reconstruction_result = futures::select! 
{ + unit = self.units_from_runway.next() => match unit { + Some(unit) => self.reconstruction.add_unit(unit), + None => { + warn!(target: LOG_TARGET, "Units for reconstruction unexpectedly ended."); + return; + } + }, + parents = self.parents_from_runway.next() => match parents { + Some((unit, parents)) => self.reconstruction.add_parents(unit, parents), None => { - warn!(target: LOG_TARGET, "Notifications for reconstruction unexpectedly ended."); + warn!(target: LOG_TARGET, "Parents for reconstruction unexpectedly ended."); return; } }, @@ -197,6 +170,9 @@ impl Service { debug!(target: LOG_TARGET, "Received exit signal."); break; } + }; + if !self.handle_reconstruction_result(reconstruction_result) { + return; } } debug!(target: LOG_TARGET, "Reconstruction decided to exit."); diff --git a/consensus/src/reconstruction/parents.rs b/consensus/src/reconstruction/parents.rs index 2c25b0af..1692806a 100644 --- a/consensus/src/reconstruction/parents.rs +++ b/consensus/src/reconstruction/parents.rs @@ -1,29 +1,28 @@ use crate::{ reconstruction::ReconstructedUnit, - runway::NotificationOut, - units::{ControlHash, Unit, UnitCoord}, + units::{ControlHash, HashFor, Unit, UnitCoord}, Hasher, NodeIndex, NodeMap, }; use std::collections::{hash_map::Entry, HashMap}; /// A unit in the process of reconstructing its parents. #[derive(Debug, PartialEq, Eq, Clone)] -enum ReconstructingUnit { +enum ReconstructingUnit { /// We are trying to optimistically reconstruct the unit from potential parents we get. - Reconstructing(Unit, NodeMap), + Reconstructing(U, NodeMap>), /// We are waiting for receiving an explicit list of unit parents. 
- WaitingForParents(Unit), + WaitingForParents(U), } -enum SingleParentReconstructionResult { - Reconstructed(ReconstructedUnit), - InProgress(ReconstructingUnit), - RequestParents(ReconstructingUnit), +enum SingleParentReconstructionResult { + Reconstructed(ReconstructedUnit), + InProgress(ReconstructingUnit), + RequestParents(ReconstructingUnit), } -impl ReconstructingUnit { +impl ReconstructingUnit { /// Produces a new reconstructing unit and a list of coordinates of parents we need for the reconstruction. Will panic if called for units of round 0. - fn new(unit: Unit) -> (Self, Vec) { + fn new(unit: U) -> (Self, Vec) { let n_members = unit.control_hash().n_members(); let round = unit.round(); assert!( @@ -44,8 +43,8 @@ impl ReconstructingUnit { fn reconstruct_parent( self, parent_id: NodeIndex, - parent_hash: H::Hash, - ) -> SingleParentReconstructionResult { + parent_hash: HashFor, + ) -> SingleParentReconstructionResult { use ReconstructingUnit::*; use SingleParentReconstructionResult::*; match self { @@ -66,24 +65,34 @@ impl ReconstructingUnit { } } - fn control_hash(&self) -> &ControlHash { + fn control_hash(&self) -> &ControlHash { self.as_unit().control_hash() } - fn as_unit(&self) -> &Unit { + fn as_unit(&self) -> &U { use ReconstructingUnit::*; match self { Reconstructing(unit, _) | WaitingForParents(unit) => unit, } } - fn with_parents(self, parent_hashes: Vec) -> Result, Self> { - let control_hash = self.control_hash(); - let mut parents = NodeMap::with_size(control_hash.n_members()); - for (parent_id, parent_hash) in control_hash.parents().zip(parent_hashes.into_iter()) { - parents.insert(parent_id, parent_hash); + fn with_parents( + self, + parents: HashMap>, + ) -> Result, Self> { + let control_hash = self.control_hash().clone(); + if parents.len() != control_hash.parents().count() { + return Err(self); + } + let mut parents_map = NodeMap::with_size(control_hash.n_members()); + for parent_id in control_hash.parents() { + match 
parents.get(&UnitCoord::new(self.as_unit().round() - 1, parent_id)) { + Some(parent_hash) => parents_map.insert(parent_id, *parent_hash), + // The parents were inconsistent with the control hash. + None => return Err(self), + } } - ReconstructedUnit::with_parents(self.as_unit().clone(), parents).map_err(|_| self) + ReconstructedUnit::with_parents(self.as_unit().clone(), parents_map).map_err(|_| self) } } @@ -97,27 +106,15 @@ pub enum Request { ParentsOf(H::Hash), } -impl From> for NotificationOut { - fn from(request: Request) -> Self { - use NotificationOut::*; - use Request::*; - match request { - // This is a tad weird, but should get better after the runway refactor. - Coord(coord) => MissingUnits(vec![coord]), - ParentsOf(unit) => WrongControlHash(unit), - } - } -} - /// The result of a reconstruction attempt. Might contain multiple reconstructed units, /// as well as requests for some data that is needed for further reconstruction. #[derive(Debug, PartialEq, Eq)] -pub struct ReconstructionResult { - reconstructed_units: Vec>, - requests: Vec>, +pub struct ReconstructionResult { + reconstructed_units: Vec>, + requests: Vec>, } -impl ReconstructionResult { +impl ReconstructionResult { fn new() -> Self { ReconstructionResult { reconstructed_units: Vec::new(), @@ -125,29 +122,29 @@ impl ReconstructionResult { } } - fn reconstructed(unit: ReconstructedUnit) -> Self { + fn reconstructed(unit: ReconstructedUnit) -> Self { ReconstructionResult { reconstructed_units: vec![unit], requests: Vec::new(), } } - fn request(request: Request) -> Self { + fn request(request: Request) -> Self { ReconstructionResult { reconstructed_units: Vec::new(), requests: vec![request], } } - fn add_unit(&mut self, unit: ReconstructedUnit) { + fn add_unit(&mut self, unit: ReconstructedUnit) { self.reconstructed_units.push(unit); } - fn add_request(&mut self, request: Request) { + fn add_request(&mut self, request: Request) { self.requests.push(request); } - fn accumulate(&mut self, other: 
ReconstructionResult) { + fn accumulate(&mut self, other: ReconstructionResult) { let ReconstructionResult { mut reconstructed_units, mut requests, @@ -157,8 +154,10 @@ impl ReconstructionResult { } } -impl From> for (Vec>, Vec>) { - fn from(result: ReconstructionResult) -> Self { +impl From> + for (Vec>, Vec>) +{ + fn from(result: ReconstructionResult) -> Self { let ReconstructionResult { reconstructed_units, requests, @@ -168,13 +167,13 @@ impl From> for (Vec>, Ve } /// Receives units with control hashes and reconstructs their parents. -pub struct Reconstruction { - reconstructing_units: HashMap>, - units_by_coord: HashMap, - waiting_for_coord: HashMap>, +pub struct Reconstruction { + reconstructing_units: HashMap, ReconstructingUnit>, + units_by_coord: HashMap>, + waiting_for_coord: HashMap>>, } -impl Reconstruction { +impl Reconstruction { /// A new parent reconstruction widget. pub fn new() -> Self { Reconstruction { @@ -186,10 +185,10 @@ impl Reconstruction { fn reconstruct_parent( &mut self, - child_hash: H::Hash, + child_hash: HashFor, parent_id: NodeIndex, - parent_hash: H::Hash, - ) -> ReconstructionResult { + parent_hash: HashFor, + ) -> ReconstructionResult { use SingleParentReconstructionResult::*; match self.reconstructing_units.remove(&child_hash) { Some(child) => match child.reconstruct_parent(parent_id, parent_hash) { @@ -211,7 +210,7 @@ impl Reconstruction { } /// Add a unit and start reconstructing its parents. - pub fn add_unit(&mut self, unit: Unit) -> ReconstructionResult { + pub fn add_unit(&mut self, unit: U) -> ReconstructionResult { let mut result = ReconstructionResult::new(); let unit_hash = unit.hash(); if self.reconstructing_units.contains_key(&unit_hash) { @@ -267,9 +266,9 @@ impl Reconstruction { /// Add an explicit list of a units' parents, perhaps reconstructing it. 
pub fn add_parents( &mut self, - unit_hash: H::Hash, - parents: Vec, - ) -> ReconstructionResult { + unit_hash: HashFor, + parents: HashMap>, + ) -> ReconstructionResult { // If we don't have the unit, just ignore this response. match self.reconstructing_units.remove(&unit_hash) { Some(unit) => match unit.with_parents(parents) { @@ -286,12 +285,14 @@ impl Reconstruction { #[cfg(test)] mod test { + use std::collections::HashMap; + use crate::{ reconstruction::{ parents::{Reconstruction, Request}, ReconstructedUnit, }, - units::{random_full_parent_units_up_to, UnitCoord}, + units::{random_full_parent_units_up_to, Unit, UnitCoord}, NodeCount, NodeIndex, }; @@ -299,12 +300,11 @@ mod test { fn reconstructs_initial_units() { let mut reconstruction = Reconstruction::new(); for unit in &random_full_parent_units_up_to(0, NodeCount(4), 43)[0] { - let unit = unit.unit(); let (mut reconstructed_units, requests) = reconstruction.add_unit(unit.clone()).into(); assert!(requests.is_empty()); assert_eq!(reconstructed_units.len(), 1); let reconstructed_unit = reconstructed_units.pop().expect("just checked its there"); - assert_eq!(reconstructed_unit, ReconstructedUnit::initial(unit)); + assert_eq!(reconstructed_unit, ReconstructedUnit::initial(unit.clone())); assert_eq!(reconstructed_unit.parents().item_count(), 0); } } @@ -315,7 +315,6 @@ mod test { let dag = random_full_parent_units_up_to(7, NodeCount(4), 43); for units in &dag { for unit in units { - let unit = unit.unit(); let round = unit.round(); let (mut reconstructed_units, requests) = reconstruction.add_unit(unit.clone()).into(); @@ -324,7 +323,7 @@ mod test { let reconstructed_unit = reconstructed_units.pop().expect("just checked its there"); match round { 0 => { - assert_eq!(reconstructed_unit, ReconstructedUnit::initial(unit)); + assert_eq!(reconstructed_unit, ReconstructedUnit::initial(unit.clone())); assert_eq!(reconstructed_unit.parents().item_count(), 0); } round => { @@ -351,8 +350,7 @@ mod test { .get(1) 
.expect("just created") .last() - .expect("we have a unit") - .unit(); + .expect("we have a unit"); let (reconstructed_units, requests) = reconstruction.add_unit(unit.clone()).into(); assert!(reconstructed_units.is_empty()); assert_eq!(requests.len(), 4); @@ -363,15 +361,13 @@ mod test { let mut reconstruction = Reconstruction::new(); let dag = random_full_parent_units_up_to(1, NodeCount(4), 43); for unit in dag.get(0).expect("just created").iter().skip(1) { - let unit = unit.unit(); reconstruction.add_unit(unit.clone()); } let unit = dag .get(1) .expect("just created") .last() - .expect("we have a unit") - .unit(); + .expect("we have a unit"); let (reconstructed_units, requests) = reconstruction.add_unit(unit.clone()).into(); assert!(reconstructed_units.is_empty()); assert_eq!(requests.len(), 1); @@ -387,7 +383,6 @@ mod test { let mut dag = random_full_parent_units_up_to(7, NodeCount(4), 43); dag.reverse(); for unit in dag.get(0).expect("we have the top units") { - let unit = unit.unit(); let (reconstructed_units, requests) = reconstruction.add_unit(unit.clone()).into(); assert!(reconstructed_units.is_empty()); assert_eq!(requests.len(), 4); @@ -396,12 +391,10 @@ mod test { for mut units in dag.into_iter().skip(1) { let last_unit = units.pop().expect("we have the unit"); for unit in units { - let unit = unit.unit(); let (reconstructed_units, _) = reconstruction.add_unit(unit.clone()).into(); total_reconstructed += reconstructed_units.len(); } - let unit = last_unit.unit(); - let (reconstructed_units, _) = reconstruction.add_unit(unit.clone()).into(); + let (reconstructed_units, _) = reconstruction.add_unit(last_unit.clone()).into(); total_reconstructed += reconstructed_units.len(); assert!(reconstructed_units.len() >= 4); } @@ -413,7 +406,6 @@ mod test { let mut reconstruction = Reconstruction::new(); let dag = random_full_parent_units_up_to(0, NodeCount(4), 43); for unit in dag.get(0).expect("just created") { - let unit = unit.unit(); 
reconstruction.add_unit(unit.clone()); } let other_dag = random_full_parent_units_up_to(1, NodeCount(4), 43); @@ -421,8 +413,7 @@ mod test { .get(1) .expect("just created") .last() - .expect("we have a unit") - .unit(); + .expect("we have a unit"); let unit_hash = unit.hash(); let (reconstructed_units, requests) = reconstruction.add_unit(unit.clone()).into(); assert!(reconstructed_units.is_empty()); @@ -431,11 +422,11 @@ mod test { requests.last().expect("just checked"), &Request::ParentsOf(unit_hash), ); - let parent_hashes: Vec<_> = other_dag + let parent_hashes: HashMap<_, _> = other_dag .get(0) .expect("other dag has initial units") .iter() - .map(|unit| unit.hash()) + .map(|unit| (unit.coord(), unit.hash())) .collect(); let (mut reconstructed_units, requests) = reconstruction .add_parents(unit_hash, parent_hashes.clone()) @@ -444,11 +435,11 @@ mod test { assert_eq!(reconstructed_units.len(), 1); let reconstructed_unit = reconstructed_units.pop().expect("just checked its there"); assert_eq!(reconstructed_unit.parents().item_count(), 4); - for (parent, reconstructed_parent) in parent_hashes - .iter() - .zip(reconstructed_unit.parents().values()) - { - assert_eq!(parent, reconstructed_parent); + for (coord, parent_hash) in parent_hashes { + assert_eq!( + Some(&parent_hash), + reconstructed_unit.parents().get(coord.creator()) + ); } } } diff --git a/consensus/src/runway/collection.rs b/consensus/src/runway/collection.rs index b03392b7..8de52016 100644 --- a/consensus/src/runway/collection.rs +++ b/consensus/src/runway/collection.rs @@ -1,6 +1,6 @@ use crate::{ runway::Request, - units::{UncheckedSignedUnit, ValidationError, Validator}, + units::{UncheckedSignedUnit, Unit, ValidationError, Validator}, Data, Hasher, Keychain, NodeCount, NodeIndex, NodeMap, Receiver, Round, Sender, Signable, Signature, SignatureError, UncheckedSigned, }; @@ -456,7 +456,7 @@ mod tests { let creator = Creator::new(creator_id, n_members); let validator = Validator::new(session_id, 
*keychain, max_round); let (mut collection, salt) = Collection::new(keychain, &validator); - let (preunit, _) = creator.create_unit(0).expect("Creation should succeed."); + let preunit = creator.create_unit(0).expect("Creation should succeed."); let unit = preunit_to_unchecked_signed_unit(preunit, session_id, keychain); let responses = create_responses( keychains @@ -519,7 +519,7 @@ mod tests { let creator = Creator::new(creator_id, n_members); let validator = Validator::new(session_id, *keychain, max_round); let (mut collection, salt) = Collection::new(keychain, &validator); - let (preunit, _) = creator.create_unit(0).expect("Creation should succeed."); + let preunit = creator.create_unit(0).expect("Creation should succeed."); let unit = preunit_to_unchecked_signed_unit(preunit, wrong_session_id, keychain); let responses = create_responses( keychains.iter().skip(1).zip(repeat(Some(unit))), @@ -547,7 +547,7 @@ mod tests { let creator = Creator::new(other_creator_id, n_members); let validator = Validator::new(session_id, *keychain, max_round); let (mut collection, salt) = Collection::new(keychain, &validator); - let (preunit, _) = creator.create_unit(0).expect("Creation should succeed."); + let preunit = creator.create_unit(0).expect("Creation should succeed."); let unit = preunit_to_unchecked_signed_unit(preunit, session_id, &keychains[1]); let responses = create_responses( keychains.iter().skip(1).zip(repeat(Some(unit))), diff --git a/consensus/src/runway/mod.rs b/consensus/src/runway/mod.rs index 73740882..4564f9d0 100644 --- a/consensus/src/runway/mod.rs +++ b/consensus/src/runway/mod.rs @@ -1,16 +1,18 @@ use crate::{ - alerts::{Alert, ForkProof, ForkingNotification, NetworkMessage}, + alerts::{Alert, ForkingNotification, NetworkMessage}, consensus, - creation::SignedUnitWithParents, + extension::{ExtenderUnit, Service as Extender}, handle_task_termination, member::UnitMessage, + reconstruction::{ReconstructedUnit, Request as ReconstructionRequest}, units::{ - 
ControlHash, SignedUnit, UncheckedSignedUnit, Unit, UnitCoord, UnitStore, UnitStoreStatus, - Validator, + SignedUnit, UncheckedSignedUnit, Unit, UnitCoord, UnitStore, UnitStoreStatus, + Validator as UnitValidator, WrappedUnit, }, + validation::{Error as ValidationError, Validator, ValidatorStatus}, Config, Data, DataProvider, FinalizationHandler, Hasher, Index, Keychain, MultiKeychain, - NodeCount, NodeIndex, NodeMap, Receiver, Round, Sender, Signature, Signed, SpawnHandle, - Terminator, UncheckedSigned, + NodeIndex, Receiver, Round, Sender, Signature, Signed, SpawnHandle, Terminator, + UncheckedSigned, }; use aleph_bft_types::Recipient; use futures::AsyncWrite; @@ -21,7 +23,14 @@ use futures::{ use futures_timer::Delay; use itertools::Itertools; use log::{debug, error, info, trace, warn}; -use std::{collections::HashSet, convert::TryFrom, fmt, marker::PhantomData, time::Duration}; +use std::{ + cmp::max, + collections::{HashMap, HashSet}, + convert::TryFrom, + fmt::{Display, Formatter, Result as FmtResult}, + marker::PhantomData, + time::Duration, +}; mod collection; @@ -30,27 +39,7 @@ use crate::backup::{BackupLoader, BackupSaver}; use collection::{Collection, IO as CollectionIO}; pub use collection::{NewestUnitResponse, Salt}; -/// Type for incoming notifications: Runway to Consensus. -#[derive(Clone, Eq, PartialEq)] -pub(crate) enum NotificationIn { - /// A notification carrying units. This might come either from multicast or - /// from a response to a request. This is of no importance at this layer. - NewUnits(Vec>), - /// Response to a request to decode parents when the control hash is wrong. - UnitParents(H::Hash, Vec), -} - -/// Type for outgoing notifications: Consensus to Runway. -#[derive(Debug, Eq, PartialEq)] -pub(crate) enum NotificationOut { - /// Notification that some units are needed but missing. The role of the Member - /// is to fetch these unit (somehow). 
- MissingUnits(Vec), - /// Notification that Consensus has parents incompatible with the control hash. - WrongControlHash(H::Hash), - /// Notification that a new unit has been added to the DAG, list of decoded parents provided - AddedToDag(H::Hash, Vec), -} +pub type ExplicitParents = (::Hash, HashMap::Hash>); /// Possible requests for information from other nodes. pub enum Request { @@ -123,45 +112,42 @@ where { missing_coords: HashSet, missing_parents: HashSet, - store: UnitStore, + store: UnitStore>>, keychain: MK, - validator: Validator, + validator: Validator, alerts_for_alerter: Sender>, notifications_from_alerter: Receiver>, unit_messages_from_network: Receiver>, unit_messages_for_network: Sender>, responses_for_collection: Sender>, resolved_requests: Sender>, - tx_consensus: Sender>, - rx_consensus: Receiver>, + units_for_reconstruction: Sender>, + parents_for_reconstruction: Sender>, + units_from_reconstruction: Receiver>>, + requests_from_reconstruction: Receiver>, ordered_batch_rx: Receiver>, finalization_handler: FH, backup_units_for_saver: Sender>, + units_for_extender: Sender>, backup_units_from_saver: Receiver>, - signed_units_from_creation: Receiver>, + new_units_from_creation: Receiver>, exiting: bool, } struct RunwayStatus<'a, H: Hasher> { - status: UnitStoreStatus<'a>, missing_coords: &'a HashSet, missing_parents: &'a HashSet, + validator_status: ValidatorStatus, + dag_status: UnitStoreStatus, } impl<'a, H: Hasher> RunwayStatus<'a, H> { - fn new( - status: UnitStoreStatus<'a>, - missing_coords: &'a HashSet, - missing_parents: &'a HashSet, - ) -> Self { - Self { - status, - missing_coords, - missing_parents, - } - } - - fn short_report(rounds_behind: Round, missing_coords: usize) -> String { + fn short_report(&self) -> String { + let rounds_behind = max( + self.validator_status.top_round(), + self.dag_status.top_round(), + ) - self.dag_status.top_round(); + let missing_coords = self.missing_coords.len(); match (rounds_behind, missing_coords) { 
(0..=2, 0) => "healthy".to_string(), (0..=2, 1..) => format!("syncing - missing {missing_coords} unit(s)"), @@ -207,14 +193,9 @@ impl<'a, H: Hasher> RunwayStatus<'a, H> { } } -impl<'a, H: Hasher> fmt::Display for RunwayStatus<'a, H> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "Runway status report: {}", - Self::short_report(self.status.rounds_behind(), self.missing_coords.len()) - )?; - write!(f, ". {}", self.status)?; +impl<'a, H: Hasher> Display for RunwayStatus<'a, H> { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + write!(f, "Runway status report: {}", self.short_report())?; if !self.missing_coords.is_empty() { let v_coords: Vec<(usize, Round)> = self .missing_coords @@ -236,20 +217,22 @@ impl<'a, H: Hasher> fmt::Display for RunwayStatus<'a, H> { } struct RunwayConfig, MK: MultiKeychain> { - max_round: Round, finalization_handler: FH, backup_units_for_saver: Sender>, + units_for_extender: Sender>, backup_units_from_saver: Receiver>, alerts_for_alerter: Sender>, notifications_from_alerter: Receiver>, - tx_consensus: Sender>, - rx_consensus: Receiver>, + units_for_reconstruction: Sender>, + parents_for_reconstruction: Sender>, + units_from_reconstruction: Receiver>>, + requests_from_reconstruction: Receiver>, unit_messages_from_network: Receiver>, unit_messages_for_network: Sender>, responses_for_collection: Sender>, ordered_batch_rx: Receiver>, resolved_requests: Sender>, - signed_units_from_creation: Receiver>, + new_units_from_creation: Receiver>, } impl Runway @@ -259,25 +242,28 @@ where FH: FinalizationHandler, MK: MultiKeychain, { - fn new(config: RunwayConfig, keychain: MK, validator: Validator) -> Self { + fn new(config: RunwayConfig, keychain: MK, validator: UnitValidator) -> Self { let n_members = keychain.node_count(); let RunwayConfig { - max_round, finalization_handler, backup_units_for_saver, + units_for_extender, backup_units_from_saver, alerts_for_alerter, notifications_from_alerter, - tx_consensus, - 
rx_consensus, + units_for_reconstruction, + parents_for_reconstruction, + units_from_reconstruction, + requests_from_reconstruction, unit_messages_from_network, unit_messages_for_network, responses_for_collection, ordered_batch_rx, resolved_requests, - signed_units_from_creation, + new_units_from_creation, } = config; - let store = UnitStore::new(n_members, max_round); + let store = UnitStore::new(n_members); + let validator = Validator::new(validator); Runway { store, @@ -290,14 +276,17 @@ where notifications_from_alerter, unit_messages_from_network, unit_messages_for_network, - tx_consensus, - rx_consensus, + units_for_reconstruction, + parents_for_reconstruction, + units_from_reconstruction, + requests_from_reconstruction, ordered_batch_rx, finalization_handler, backup_units_for_saver, + units_for_extender, backup_units_from_saver, responses_for_collection, - signed_units_from_creation, + new_units_from_creation, exiting: false, } } @@ -306,15 +295,18 @@ where self.keychain.index() } - fn node_count(&self) -> NodeCount { - self.keychain.node_count() + fn on_unit_received(&mut self, unit: UncheckedSignedUnit) { + match self.validator.validate(unit, &self.store) { + Ok(unit) => self.send_unit_for_reconstruction(unit), + Err(e) => self.handle_validation_error(e), + } } fn on_unit_message(&mut self, message: RunwayNotificationIn) { match message { RunwayNotificationIn::NewUnit(u) => { trace!(target: "AlephBFT-runway", "{:?} New unit received {:?}.", self.index(), &u); - self.on_unit_received(u, false) + self.on_unit_received(u) } RunwayNotificationIn::Request(request, node_id) => match request { @@ -335,7 +327,7 @@ where RunwayNotificationIn::Response(res) => match res { Response::Coord(u) => { trace!(target: "AlephBFT-runway", "{:?} Fetch response received {:?}.", self.index(), &u); - self.on_unit_received(u, false) + self.on_unit_received(u) } Response::Parents(u_hash, parents) => { trace!(target: "AlephBFT-runway", "{:?} Response parents received {:?}.", 
self.index(), u_hash); @@ -352,117 +344,63 @@ where } } - fn on_unit_received(&mut self, uu: UncheckedSignedUnit, alert: bool) { - match self.validator.validate_unit(uu) { - Ok(su) => { - self.resolve_missing_coord(&su.as_signable().coord()); - if alert { - // Units from alerts explicitly come from forkers, and we want them anyway. - self.store.add_unit(su, true); - } else { - self.add_unit_to_store_unless_fork(su); - } - } - Err(e) => warn!(target: "AlephBFT-member", "Received unit failing validation: {}", e), - } - } - fn resolve_missing_coord(&mut self, coord: &UnitCoord) { if self.missing_coords.remove(coord) { self.send_resolved_request_notification(Request::Coord(*coord)); } } - fn add_unit_to_store_unless_fork(&mut self, su: SignedUnit) { - let full_unit = su.as_signable(); - trace!(target: "AlephBFT-member", "{:?} Adding member unit to store {:?}", self.index(), full_unit); - if self.store.is_forker(full_unit.creator()) { - trace!(target: "AlephBFT-member", "{:?} Ignoring forker's unit {:?}", self.index(), full_unit); - return; - } - - if let Some(sv) = self.store.is_new_fork(full_unit) { - let creator = full_unit.creator(); - if !self.store.is_forker(creator) { - // We need to mark the forker if it is not known yet. - let proof = (su.into(), sv.into()); - self.on_new_forker_detected(creator, proof); - } - // We ignore this unit. If it is legit, it will arrive in some alert and we need to wait anyway. - // There is no point in keeping this unit in any kind of buffer. 
- return; - } - - self.store.add_unit(su, false); - } - - fn on_new_forker_detected(&mut self, forker: NodeIndex, proof: ForkProof) { - let alerted_units = self.store.mark_forker(forker); - let alert = self.form_alert(proof, alerted_units); - if self.alerts_for_alerter.unbounded_send(alert).is_err() { - warn!(target: "AlephBFT-runway", "{:?} Channel to alerter should be open", self.index()); - self.exiting = true; - } - } - - fn form_alert( - &self, - proof: ForkProof, - units: Vec>, - ) -> Alert { - Alert::new( - self.index(), - proof, - units.into_iter().map(|signed| signed.into()).collect(), - ) - } - fn on_request_coord(&mut self, node_id: NodeIndex, coord: UnitCoord) { debug!(target: "AlephBFT-runway", "{:?} Received fetch request for coord {:?} from {:?}.", self.index(), coord, node_id); - let maybe_su = (self.store.unit_by_coord(coord)).cloned(); - - if let Some(su) = maybe_su { - trace!(target: "AlephBFT-runway", "{:?} Answering fetch request for coord {:?} from {:?}.", self.index(), coord, node_id); - self.send_message_for_network(RunwayNotificationOut::Response( - Response::Coord(su.into()), - node_id, - )); - } else { - trace!(target: "AlephBFT-runway", "{:?} Not answering fetch request for coord {:?}. Unit not in store.", self.index(), coord); + match self.store.canonical_unit(coord).cloned() { + Some(su) => { + trace!(target: "AlephBFT-runway", "{:?} Answering fetch request for coord {:?} from {:?}.", self.index(), coord, node_id); + self.send_message_for_network(RunwayNotificationOut::Response( + Response::Coord(su.unpack().into()), + node_id, + )); + } + None => { + trace!(target: "AlephBFT-runway", "{:?} Not answering fetch request for coord {:?}. 
Unit not in store.", self.index(), coord); + } } } fn on_request_parents(&mut self, node_id: NodeIndex, u_hash: H::Hash) { debug!(target: "AlephBFT-runway", "{:?} Received parents request for hash {:?} from {:?}.", self.index(), u_hash, node_id); - if let Some(p_hashes) = self.store.get_parents(u_hash) { - let p_hashes = p_hashes.clone(); - trace!(target: "AlephBFT-runway", "{:?} Answering parents request for hash {:?} from {:?}.", self.index(), u_hash, node_id); - let mut full_units = Vec::new(); - for hash in p_hashes.iter() { - if let Some(fu) = self.store.unit_by_hash(hash) { - full_units.push(fu.clone().into()); - } else { - debug!(target: "AlephBFT-runway", "{:?} Not answering parents request, one of the parents missing from store.", self.index()); - //This can happen if we got a parents response from someone, but one of the units was a fork and we dropped it. - //Either this parent is legit and we will soon get it in alert or the parent is not legit in which case - //the unit u, whose parents are beeing seeked here is not legit either. - //In any case, if a node added u to its Dag, then it should never reach this place in code when answering - //a parents request (as all the parents must be legit an thus must be in store). - return; - } + match self.store.unit(&u_hash) { + Some(unit) => { + trace!(target: "AlephBFT-runway", "{:?} Answering parents request for hash {:?} from {:?}.", self.index(), u_hash, node_id); + let parents = unit + .parents() + .values() + .map(|parent_hash| { + self.store + .unit(parent_hash) + .expect("We add units to the store in order.") + .clone() + .unpack() + .into_unchecked() + }) + .collect(); + self.send_message_for_network(RunwayNotificationOut::Response( + Response::Parents(u_hash, parents), + node_id, + )); + } + None => { + trace!(target: "AlephBFT-runway", "{:?} Not answering parents request for hash {:?}. 
Unit not in DAG yet.", self.index(), u_hash); } - self.send_message_for_network(RunwayNotificationOut::Response( - Response::Parents(u_hash, full_units), - node_id, - )); - } else { - trace!(target: "AlephBFT-runway", "{:?} Not answering parents request for hash {:?}. Unit not in DAG yet.", self.index(), u_hash); } } fn on_request_newest(&mut self, requester: NodeIndex, salt: u64) { - let unit = self.store.newest_unit(requester); + let unit = self + .store + .canonical_units(requester) + .last() + .map(|unit| unit.clone().unpack().into_unchecked()); let response = NewestUnitResponse::new(requester, self.index(), unit, salt); let signed_response = Signed::sign(response, &self.keychain).into_unchecked(); @@ -478,71 +416,75 @@ where } } + fn handle_validation_error(&mut self, e: ValidationError) { + use ValidationError::*; + match e { + Invalid(e) => { + warn!(target: "AlephBFT-runway", "Received unit failing validation: {}", e) + } + Duplicate(su) => { + trace!(target: "AlephBFT-runway", "Received unit with hash {:?} again.", su.hash()) + } + Uncommitted(su) => { + debug!(target: "AlephBFT-runway", "Received unit with hash {:?} created by known forker {:?} for which we don't have a commitment, discarding.", su.hash(), su.creator()) + } + NewForker(alert) => { + warn!(target: "AlephBFT-runway", "New forker detected."); + trace!(target: "AlephBFT-runway", "Created alert: {:?}.", alert); + if self.alerts_for_alerter.unbounded_send(*alert).is_err() { + warn!(target: "AlephBFT-runway", "{:?} Channel to alerter should be open", self.index()); + self.exiting = true; + } + } + } + } + fn on_parents_response( &mut self, u_hash: H::Hash, parents: Vec>, ) { - if self.store.get_parents(u_hash).is_some() { - trace!(target: "AlephBFT-runway", "{:?} We got parents response but already know the parents.", self.index()); + use ValidationError::*; + if self.store.unit(&u_hash).is_some() { + trace!(target: "AlephBFT-runway", "{:?} We got parents response but already imported the 
unit.", self.index()); return; } - let (u_round, u_control_hash, parent_ids) = match self.store.unit_by_hash(&u_hash) { - Some(su) => { - let full_unit = su.as_signable(); - let parent_ids: Vec<_> = full_unit.control_hash().parents().collect(); - ( - full_unit.round(), - full_unit.control_hash().combined_hash, - parent_ids, - ) - } - None => { - trace!(target: "AlephBFT-runway", "{:?} We got parents but don't even know the unit. Ignoring.", self.index()); - return; - } - }; - - if parent_ids.len() != parents.len() { - warn!(target: "AlephBFT-runway", "{:?} In received parent response expected {} parents got {} for unit {:?}.", self.index(), parents.len(), parent_ids.len(), u_hash); - return; - } - - let mut p_hashes_node_map = NodeMap::with_size(self.node_count()); - for (i, uu) in parents.into_iter().enumerate() { - let su = match self.validator.validate_unit(uu) { - Ok(su) => su, - Err(e) => { + let mut parent_hashes = HashMap::new(); + for unit in parents.into_iter() { + let su = match self.validator.validate(unit, &self.store) { + Ok(su) => { + self.send_unit_for_reconstruction(su.clone()); + su + } + Err(Duplicate(su)) => { + trace!(target: "AlephBFT-runway", "Already have parent {:?}.", su.hash()); + su + } + Err(Uncommitted(su)) => { + debug!(target: "AlephBFT-runway", "Received uncommitted parent {:?}, we should get the commitment soon.", su.hash()); + su + } + Err(NewForker(alert)) => { + warn!(target: "AlephBFT-runway", "New forker detected."); + trace!(target: "AlephBFT-runway", "Created alert: {:?}.", alert); + if self.alerts_for_alerter.unbounded_send(*alert).is_err() { + warn!(target: "AlephBFT-runway", "{:?} Channel to alerter should be open", self.index()); + self.exiting = true; + } + // technically this was a correct unit, so we could have passed it on, + // but this will happen at most once and we will receive the parent + // response again, so we just discard it now + return; + } + Err(Invalid(e)) => { warn!(target: "AlephBFT-runway", "{:?} In 
received parent response received a unit that does not pass validation: {}", self.index(), e); return; } }; - let full_unit = su.as_signable(); - if full_unit.round() + 1 != u_round { - warn!(target: "AlephBFT-runway", "{:?} In received parent response received a unit with wrong round.", self.index()); - return; - } - if full_unit.creator() != parent_ids[i] { - warn!(target: "AlephBFT-runway", "{:?} In received parent response received a unit with wrong creator.", self.index()); - return; - } - let p_hash = full_unit.hash(); - let ix = full_unit.creator(); - p_hashes_node_map.insert(ix, p_hash); - // There might be some optimization possible here to not validate twice, but overall - // this piece of code should be executed extremely rarely. - self.resolve_missing_coord(&su.as_signable().coord()); - self.add_unit_to_store_unless_fork(su); + parent_hashes.insert(su.coord(), su.hash()); } - if ControlHash::::combine_hashes(&p_hashes_node_map) != u_control_hash { - warn!(target: "AlephBFT-runway", "{:?} In received parent response the control hash is incorrect {:?}.", self.index(), p_hashes_node_map); - return; - } - let p_hashes: Vec<_> = p_hashes_node_map.into_values().collect(); - self.store.add_parents(u_hash, p_hashes.clone()); - trace!(target: "AlephBFT-runway", "{:?} Succesful parents response for {:?}.", self.index(), u_hash); - self.send_consensus_notification(NotificationIn::UnitParents(u_hash, p_hashes)); + self.send_parents_for_reconstruction((u_hash, parent_hashes)); } fn resolve_missing_parents(&mut self, u_hash: &H::Hash) { @@ -551,52 +493,50 @@ where } } - fn on_created(&mut self, signed_unit: SignedUnit) { + fn on_created(&mut self, unit: SignedUnit) { debug!(target: "AlephBFT-runway", "{:?} On create notification.", self.index()); - self.store.add_unit(signed_unit, false); + let signed_unit = self + .validator + .validate(unit.into_unchecked(), &self.store) + .expect("unit created by us is correct and not a fork"); + // we are guaranteed this will 
immediately succeed, because creation uses the first units we add to the dag, + // i.e. the canonical units in the store + self.send_unit_for_reconstruction(signed_unit); } - fn on_alert_notification(&mut self, notification: ForkingNotification) { - use ForkingNotification::*; - match notification { - Forker(proof) => { - let forker = proof.0.index(); - if !self.store.is_forker(forker) { - self.on_new_forker_detected(forker, proof); - } + fn on_reconstruction_request(&mut self, request: ReconstructionRequest) { + use ReconstructionRequest::*; + match request { + Coord(coord) => { + self.on_missing_coord(coord); } - - Units(units) => { - for uu in units { - self.on_unit_received(uu, true); - } + ParentsOf(h) => { + self.on_wrong_control_hash(h); } } } - fn on_consensus_notification(&mut self, notification: NotificationOut) { - match notification { - NotificationOut::MissingUnits(coords) => { - self.on_missing_coords(coords); - } - NotificationOut::WrongControlHash(h) => { - self.on_wrong_control_hash(h); - } - NotificationOut::AddedToDag(h, p_hashes) => { - self.store.add_parents(h, p_hashes); - self.resolve_missing_parents(&h); - if let Some(su) = self.store.unit_by_hash(&h).cloned() { - if self - .backup_units_for_saver - .unbounded_send(su.into()) - .is_err() - { - error!(target: "AlephBFT-runway", "{:?} A unit couldn't be sent to backup: {:?}.", self.index(), h); - } - } else { - error!(target: "AlephBFT-runway", "{:?} A unit already added to DAG is not in our store: {:?}.", self.index(), h); - } - } + fn on_unit_reconstructed(&mut self, unit: ReconstructedUnit>) { + let unit_hash = unit.hash(); + trace!(target: "AlephBFT-runway", "Unit {:?} {} reconstructed.", unit_hash, unit.coord()); + self.validator.finished_processing(&unit_hash); + self.store.insert(unit.clone()); + self.resolve_missing_parents(&unit_hash); + self.resolve_missing_coord(&unit.coord()); + if self + .units_for_extender + .unbounded_send(unit.extender_unit()) + .is_err() + { + error!(target: 
"AlephBFT-runway", "{:?} A unit couldn't be sent to extender: {:?}.", self.index(), unit_hash); + self.exiting = true; + } + if self + .backup_units_for_saver + .unbounded_send(unit.unpack().into()) + .is_err() + { + error!(target: "AlephBFT-runway", "{:?} A unit couldn't be sent to backup: {:?}.", self.index(), unit_hash); } } @@ -609,11 +549,11 @@ where } } - fn on_missing_coords(&mut self, mut coords: Vec) { - trace!(target: "AlephBFT-runway", "{:?} Dealing with missing coords notification {:?}.", self.index(), coords); - coords.retain(|coord| !self.store.contains_coord(coord)); - for coord in coords { - if self.missing_coords.insert(coord) { + fn on_missing_coord(&mut self, coord: UnitCoord) { + trace!(target: "AlephBFT-runway", "{:?} Dealing with missing coord notification {:?}.", self.index(), coord); + if self.store.canonical_unit(coord).is_none() { + let new_request = self.missing_coords.insert(coord); + if new_request { self.send_message_for_network(RunwayNotificationOut::Request(Request::Coord( coord, ))); @@ -623,14 +563,7 @@ where fn on_wrong_control_hash(&mut self, u_hash: H::Hash) { trace!(target: "AlephBFT-runway", "{:?} Dealing with wrong control hash notification {:?}.", self.index(), u_hash); - if let Some(p_hashes) = self.store.get_parents(u_hash) { - // We have the parents by some strange reason (someone sent us parents - // without us requesting them). 
- let p_hashes = p_hashes.clone(); - trace!(target: "AlephBFT-runway", "{:?} We have the parents for {:?} even though we did not request them.", self.index(), u_hash); - let notification = NotificationIn::UnitParents(u_hash, p_hashes); - self.send_consensus_notification(notification); - } else if self.missing_parents.insert(u_hash) { + if self.missing_parents.insert(u_hash) { self.send_message_for_network(RunwayNotificationOut::Request(Request::Parents(u_hash))); } } @@ -639,14 +572,15 @@ where for hash in batch { let unit = self .store - .unit_by_hash(&hash) + .unit(&hash) .expect("Ordered units must be in store") - .as_signable(); + .clone() + .unpack(); self.finalization_handler.unit_finalized( unit.creator(), unit.round(), - unit.data().clone(), + unit.as_signable().data().clone(), ) } } @@ -672,30 +606,36 @@ where } } - fn send_consensus_notification(&mut self, notification: NotificationIn) { - if self.tx_consensus.unbounded_send(notification).is_err() { - warn!(target: "AlephBFT-runway", "{:?} Channel to consensus should be open", self.index()); + fn send_unit_for_reconstruction(&mut self, unit: SignedUnit) { + trace!(target: "AlephBFT-runway", "Sending unit {:?} {} to reconstruction.", unit.hash(), unit.coord()); + if self.units_for_reconstruction.unbounded_send(unit).is_err() { + warn!(target: "AlephBFT-runway", "{:?} Unit channel to reconstruction should be open", self.index()); self.exiting = true; } } - fn move_units_to_consensus(&mut self) { - let units_to_move = self - .store - .yield_buffer_units() - .into_iter() - .map(|su| su.as_signable().unit()) - .collect(); - self.send_consensus_notification(NotificationIn::NewUnits(units_to_move)) + fn send_parents_for_reconstruction(&mut self, parents: ExplicitParents) { + if self + .parents_for_reconstruction + .unbounded_send(parents) + .is_err() + { + warn!(target: "AlephBFT-runway", "{:?} Parents channel to reconstruction should be open", self.index()); + self.exiting = true; + } + } + + fn status(&self) 
-> RunwayStatus<'_, H> { + RunwayStatus { + missing_coords: &self.missing_coords, + missing_parents: &self.missing_parents, + validator_status: self.validator.status(), + dag_status: self.store.status(), + } } fn status_report(&self) { - let runway_status: RunwayStatus = RunwayStatus::new( - self.store.get_status_of(self.index()), - &self.missing_coords, - &self.missing_parents, - ); - info!(target: "AlephBFT-runway", "{}", runway_status); + info!(target: "AlephBFT-runway", "{}", self.status()); } async fn run( @@ -713,7 +653,7 @@ where match data_from_backup.await { Ok(units) => { for unit in units { - self.on_unit_received(unit, false); + self.on_unit_received(unit); } } Err(e) => { @@ -725,16 +665,23 @@ where debug!(target: "AlephBFT-runway", "{:?} Runway started.", index); loop { futures::select! { - notification = self.rx_consensus.next() => match notification { - Some(notification) => self.on_consensus_notification(notification), + unit = self.units_from_reconstruction.next() => match unit { + Some(unit) => self.on_unit_reconstructed(unit), None => { - error!(target: "AlephBFT-runway", "{:?} Consensus notification stream closed.", index); + error!(target: "AlephBFT-runway", "{:?} Reconstructed unit stream closed.", index); + break; + } + }, + request = self.requests_from_reconstruction.next() => match request { + Some(request) => self.on_reconstruction_request(request), + None => { + error!(target: "AlephBFT-runway", "{:?} Reconstruction request stream closed.", index); break; } }, - signed_unit = self.signed_units_from_creation.next() => match signed_unit { - Some((signed_unit, _)) => self.on_created(signed_unit), + signed_unit = self.new_units_from_creation.next() => match signed_unit { + Some(signed_unit) => self.on_created(signed_unit), None => { error!(target: "AlephBFT-runway", "{:?} Creation stream closed.", index); break; @@ -743,7 +690,16 @@ where notification = self.notifications_from_alerter.next() => match notification { Some(notification) => { - 
self.on_alert_notification(notification) + trace!(target: "AlephBFT-runway", "Received alerter notification: {:?}.", notification); + for result in self.validator.process_forking_notification(notification, &self.store) { + match result { + Ok(unit) => { + trace!(target: "AlephBFT-runway", "Validated unit {:?} from alerter.", unit.hash()); + self.send_unit_for_reconstruction(unit) + }, + Err(e) => self.handle_validation_error(e), + } + } }, None => { error!(target: "AlephBFT-runway", "{:?} Alert notification stream closed.", index); @@ -784,7 +740,6 @@ where self.exiting = true; } } - self.move_units_to_consensus(); if self.exiting { debug!(target: "AlephBFT-runway", "{:?} Runway decided to exit.", index); @@ -808,7 +763,7 @@ pub(crate) struct NetworkIO { #[cfg(feature = "initial_unit_collection")] fn initial_unit_collection<'a, H: Hasher, D: Data, MK: MultiKeychain>( keychain: &'a MK, - validator: &'a Validator, + validator: &'a UnitValidator, unit_messages_for_network: &Sender>, unit_collection_sender: oneshot::Sender, responses_from_runway: Receiver>, @@ -909,10 +864,12 @@ pub(crate) async fn run( _phantom: _, } = runway_io; - let (tx_consensus, consensus_stream) = mpsc::unbounded(); - let (consensus_sink, rx_consensus) = mpsc::unbounded(); + let (units_for_reconstruction, units_from_runway) = mpsc::unbounded(); + let (parents_for_reconstruction, parents_from_runway) = mpsc::unbounded(); + let (units_for_runway, units_from_reconstruction) = mpsc::unbounded(); + let (requests_for_runway, requests_from_reconstruction) = mpsc::unbounded(); let (ordered_batch_tx, ordered_batch_rx) = mpsc::unbounded(); - let (signed_units_for_runway, signed_units_from_creation) = mpsc::unbounded(); + let (new_units_for_runway, new_units_from_creation) = mpsc::unbounded(); let consensus_terminator = terminator.add_offspring_connection("AlephBFT-consensus"); let consensus_config = config.clone(); @@ -924,11 +881,12 @@ pub(crate) async fn run( consensus::run( consensus_config, 
consensus::IO { - incoming_notifications: consensus_stream, - outgoing_notifications: consensus_sink, - units_for_runway: signed_units_for_runway, + units_from_runway, + parents_from_runway, + units_for_runway, + requests_for_runway, + new_units_for_runway, data_provider, - ordered_batch_tx, starting_round, }, consensus_keychain, @@ -983,7 +941,7 @@ pub(crate) async fn run( .fuse(); let index = keychain.index(); - let validator = Validator::new(config.session_id(), keychain.clone(), config.max_round()); + let validator = UnitValidator::new(config.session_id(), keychain.clone(), config.max_round()); let (responses_for_collection, responses_from_runway) = mpsc::unbounded(); let (unit_collections_sender, unit_collection_result) = oneshot::channel(); let (loaded_data_tx, loaded_data_rx) = oneshot::channel(); @@ -1024,23 +982,34 @@ pub(crate) async fn run( }; pin_mut!(starting_round_handle); + let (units_for_extender, units_from_runway) = mpsc::unbounded(); + let extender = Extender::::new(index, units_from_runway, ordered_batch_tx); + let extender_terminator = terminator.add_offspring_connection("AlephBFT-extender"); + let mut extender_handle = spawn_handle + .spawn_essential("runway/extender", async move { + extender.run(extender_terminator).await + }) + .fuse(); + let runway_handle = spawn_handle .spawn_essential("runway", { let runway_config = RunwayConfig { finalization_handler, backup_units_for_saver, + units_for_extender, backup_units_from_saver, alerts_for_alerter, notifications_from_alerter, - tx_consensus, - rx_consensus, + units_for_reconstruction, + parents_for_reconstruction, + units_from_reconstruction, + requests_from_reconstruction, unit_messages_from_network: network_io.unit_messages_from_network, unit_messages_for_network: network_io.unit_messages_for_network, ordered_batch_rx, responses_for_collection, resolved_requests: network_io.resolved_requests, - max_round: config.max_round(), - signed_units_from_creation, + new_units_from_creation, }; let 
runway_terminator = terminator.add_offspring_connection("AlephBFT-runway"); let validator = validator.clone(); @@ -1058,6 +1027,9 @@ pub(crate) async fn run( debug!(target: "AlephBFT-runway", "{:?} Runway task terminated early.", index); break; }, + _ = extender_handle => { + debug!(target: "AlephBFT-runway", "{:?} Extender task terminated early.", index); + }, _ = alerter_handle => { debug!(target: "AlephBFT-runway", "{:?} Alerter task terminated early.", index); break; @@ -1085,6 +1057,7 @@ pub(crate) async fn run( debug!(target: "AlephBFT-runway", "{:?} Ending run.", index); terminator.terminate_sync().await; + handle_task_termination(extender_handle, "AlephBFT-runway", "Extender", index).await; handle_task_termination(consensus_handle, "AlephBFT-runway", "Consensus", index).await; handle_task_termination(alerter_handle, "AlephBFT-runway", "Alerter", index).await; handle_task_termination(runway_handle, "AlephBFT-runway", "Runway", index).await; diff --git a/consensus/src/testing/byzantine.rs b/consensus/src/testing/byzantine.rs index 9b8bad88..aaa77263 100644 --- a/consensus/src/testing/byzantine.rs +++ b/consensus/src/testing/byzantine.rs @@ -2,7 +2,7 @@ use crate::{ member::UnitMessage::NewUnit, network::NetworkDataInner::Units, testing::{init_log, spawn_honest_member, HonestMember, Network, NetworkData}, - units::{ControlHash, FullUnit, PreUnit, SignedUnit, UnitCoord}, + units::{ControlHash, FullUnit, PreUnit, SignedUnit, Unit, UnitCoord}, Hasher, Network as NetworkT, NetworkData as NetworkDataT, NodeCount, NodeIndex, NodeMap, Recipient, Round, SessionId, Signed, SpawnHandle, TaskHandle, }; diff --git a/consensus/src/testing/consensus.rs b/consensus/src/testing/consensus.rs index 269838bf..3c3ea75b 100644 --- a/consensus/src/testing/consensus.rs +++ b/consensus/src/testing/consensus.rs @@ -1,235 +1,19 @@ use crate::{ consensus, - creation::SignedUnitWithParents as GenericSignedUnitWithParents, - runway::{NotificationIn, NotificationOut}, + 
reconstruction::Request as GenericRequest, testing::{complete_oneshot, gen_config, gen_delay_config, init_log}, - units::{ControlHash, PreUnit, Unit, UnitCoord}, - Hasher, NodeCount, NodeIndex, NodeMap, SpawnHandle, Terminator, + units::{preunit_to_full_unit, random_full_parent_units_up_to, ControlHash, PreUnit, Unit}, + Hasher, NodeCount, NodeIndex, NodeMap, Signed, SpawnHandle, Terminator, }; -use aleph_bft_mock::{Data, DataProvider, Hasher64, Keychain, Spawner}; +use aleph_bft_mock::{DataProvider, Hasher64, Keychain, Spawner}; use futures::{ - channel::{ - mpsc::{unbounded, UnboundedReceiver, UnboundedSender}, - oneshot, - }, + channel::{mpsc::unbounded, oneshot}, sink::SinkExt, stream::StreamExt, - Future, }; use log::trace; -use std::{ - collections::HashMap, - pin::Pin, - task::{Context, Poll}, -}; - -type SignedUnitWithParents = GenericSignedUnitWithParents; - -// This struct allows to create a Hub to interconnect several instances of the Consensus engine, without -// requiring the Member wrapper. The Hub notifies all connected instances about newly created units and -// is able to answer unit requests as well. WrongControlHashes are not supported, which means that this -// Hub should be used to run simple tests in honest scenarios only. -// Usage: 1) create an instance using new(n_members), 2) connect all n_members instances, 0, 1, 2, ..., n_members - 1. -// 3) run the HonestHub instance as a Future. 
-pub(crate) struct HonestHub { - n_members: NodeCount, - ntfct_out_rxs: HashMap>>, - ntfct_in_txs: HashMap>>, - units_from_consensus: HashMap>, - units_by_coord: HashMap>, -} - -impl HonestHub { - pub(crate) fn new(n_members: NodeCount) -> Self { - HonestHub { - n_members, - ntfct_out_rxs: HashMap::new(), - ntfct_in_txs: HashMap::new(), - units_from_consensus: HashMap::new(), - units_by_coord: HashMap::new(), - } - } - - pub(crate) fn connect( - &mut self, - node_ix: NodeIndex, - ) -> ( - UnboundedSender>, - UnboundedReceiver>, - UnboundedSender, - ) { - let (tx_in, rx_in) = unbounded(); - let (tx_out, rx_out) = unbounded(); - let (units_for_hub, units_from_consensus) = unbounded(); - self.ntfct_in_txs.insert(node_ix, tx_in); - self.ntfct_out_rxs.insert(node_ix, rx_out); - self.units_from_consensus - .insert(node_ix, units_from_consensus); - (tx_out, rx_in, units_for_hub) - } - - fn send_to_all(&mut self, ntfct: NotificationIn) { - assert!( - self.ntfct_in_txs.len() == self.n_members.0, - "Must connect to all nodes before running the hub." 
- ); - for (_ix, tx) in self.ntfct_in_txs.iter() { - tx.unbounded_send(ntfct.clone()).ok(); - } - } - - fn send_to_node(&mut self, node_ix: NodeIndex, ntfct: NotificationIn) { - let tx = self - .ntfct_in_txs - .get(&node_ix) - .expect("Must connect to all nodes before running the hub."); - tx.unbounded_send(ntfct).expect("Channel should be open"); - } - - fn on_notification(&mut self, node_ix: NodeIndex, ntfct: NotificationOut) { - match ntfct { - NotificationOut::MissingUnits(coords) => { - let mut response_units = Vec::new(); - for coord in coords { - match self.units_by_coord.get(&coord) { - Some(unit) => { - response_units.push(unit.clone()); - } - None => { - panic!("Unit requested that the hub does not know."); - } - } - } - let ntfct = NotificationIn::NewUnits(response_units); - self.send_to_node(node_ix, ntfct); - } - NotificationOut::WrongControlHash(_u_hash) => { - panic!("No support for forks in testing."); - } - NotificationOut::AddedToDag(_u_hash, _hashes) => { - // Safe to ignore in testing. - // Normally this is used in Member to answer parents requests. 
- } - } - } - - fn on_unit(&mut self, (unit, _): SignedUnitWithParents) { - let u = unit.into_unchecked().as_signable().unit(); - let coord = UnitCoord::new(u.round(), u.creator()); - self.units_by_coord.insert(coord, u.clone()); - self.send_to_all(NotificationIn::NewUnits(vec![u])); - } -} - -impl Future for HonestHub { - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut ready_ixs: Vec = Vec::new(); - let mut ready_unit_ixs: Vec = Vec::new(); - let mut buffer = Vec::new(); - let mut unit_buffer = Vec::new(); - for (ix, rx) in self.ntfct_out_rxs.iter_mut() { - loop { - match rx.poll_next_unpin(cx) { - Poll::Ready(Some(ntfct)) => { - buffer.push((*ix, ntfct)); - } - Poll::Ready(None) => { - ready_ixs.push(*ix); - break; - } - Poll::Pending => { - break; - } - } - } - } - for (ix, units_from_consensus) in self.units_from_consensus.iter_mut() { - loop { - match units_from_consensus.poll_next_unpin(cx) { - Poll::Ready(Some(unit)) => { - unit_buffer.push(unit); - } - Poll::Ready(None) => { - ready_unit_ixs.push(*ix); - break; - } - Poll::Pending => { - break; - } - } - } - } - for unit in unit_buffer { - self.on_unit(unit); - } - for (ix, ntfct) in buffer { - self.on_notification(ix, ntfct); - } - for ix in ready_ixs { - self.ntfct_out_rxs.remove(&ix); - } - if self.ntfct_out_rxs.is_empty() { - return Poll::Ready(()); - } - Poll::Pending - } -} -#[tokio::test] -async fn agree_on_first_batch() { - init_log(); - let n_members = NodeCount(16); - let mut hub = HonestHub::new(n_members); - - let mut exits = vec![]; - let mut batch_rxs = vec![]; - let spawner = Spawner::new(); - - let mut handles = vec![]; - - for node_ix in n_members.into_iterator() { - let (tx, rx, units_for_hub) = hub.connect(node_ix); - let conf = gen_config(node_ix, n_members, gen_delay_config()); - let (exit_tx, exit_rx) = oneshot::channel(); - exits.push(exit_tx); - let (batch_tx, batch_rx) = unbounded(); - batch_rxs.push(batch_rx); - let starting_round = 
complete_oneshot(Some(0)); - let keychain = Keychain::new(n_members, node_ix); - let data_provider = DataProvider::new(); - handles.push(spawner.spawn_essential( - "consensus", - consensus::run( - conf, - consensus::IO { - incoming_notifications: rx, - outgoing_notifications: tx, - units_for_runway: units_for_hub, - data_provider, - ordered_batch_tx: batch_tx, - starting_round, - }, - keychain, - spawner, - Terminator::create_root(exit_rx, "AlephBFT-consensus"), - ), - )); - } - - spawner.spawn("hub", hub); - - let mut batches = vec![]; - for mut rx in batch_rxs.drain(..) { - let batch = rx.next().await.unwrap(); - assert!(!batch.is_empty()); - batches.push(batch); - } - - for node_ix in 1..n_members.0 { - assert_eq!(batches[0], batches[node_ix]); - } -} +type Request = GenericRequest; #[tokio::test] async fn catches_wrong_control_hash() { @@ -237,15 +21,20 @@ async fn catches_wrong_control_hash() { let n_nodes = NodeCount(4); let spawner = Spawner::new(); let node_ix = NodeIndex(0); - let (mut tx_in, rx_in) = unbounded(); - let (tx_out, mut rx_out) = unbounded(); + let (mut units_for_consensus, units_from_us) = unbounded(); + let (_parents_for_consensus, parents_from_us) = unbounded(); let (units_for_us, _units_from_consensus) = unbounded(); + let (requests_for_us, mut requests_from_consensus) = unbounded(); + let (new_units_for_us, _new_units_from_consensus) = unbounded(); let conf = gen_config(node_ix, n_nodes, gen_delay_config()); let (exit_tx, exit_rx) = oneshot::channel(); - let (batch_tx, _batch_rx) = unbounded(); let starting_round = complete_oneshot(Some(0)); - let keychain = Keychain::new(n_nodes, node_ix); + let keychains: Vec<_> = n_nodes + .into_iterator() + .map(|node_id| Keychain::new(n_nodes, node_id)) + .collect(); + let keychain = keychains[node_ix.0]; let data_provider = DataProvider::new(); let consensus_handle = spawner.spawn_essential( @@ -253,11 +42,12 @@ async fn catches_wrong_control_hash() { consensus::run( conf, consensus::IO { - 
incoming_notifications: rx_in, - outgoing_notifications: tx_out, + units_from_runway: units_from_us, + parents_from_runway: parents_from_us, units_for_runway: units_for_us, + requests_for_runway: requests_for_us, + new_units_for_runway: new_units_for_us, data_provider, - ordered_batch_tx: batch_tx, starting_round, }, keychain, @@ -265,16 +55,23 @@ async fn catches_wrong_control_hash() { Terminator::create_root(exit_rx, "AlephBFT-consensus"), ), ); - let empty_control_hash = ControlHash::new(&(vec![None; n_nodes.0]).into()); - let other_initial_units: Vec<_> = n_nodes - .into_iterator() - .skip(1) - .map(|creator| PreUnit::::new(creator, 0, empty_control_hash.clone())) - .map(|pu| Unit::new(pu, rand::random())) + let other_initial_units: Vec<_> = random_full_parent_units_up_to(0, n_nodes, 0) + .pop() + .expect("created initial units") + .into_iter() + .map(|unit| { + let keychain = keychains + .get(unit.creator().0) + .expect("we have the keychains"); + Signed::sign(unit, keychain) + }) .collect(); - let _ = tx_in - .send(NotificationIn::NewUnits(other_initial_units.clone())) - .await; + for unit in &other_initial_units { + units_for_consensus + .send(unit.clone()) + .await + .expect("channel works"); + } let mut parent_hashes = NodeMap::with_size(n_nodes); for unit in other_initial_units.into_iter() { parent_hashes.insert(unit.creator(), unit.hash()); @@ -288,14 +85,21 @@ async fn catches_wrong_control_hash() { let mut control_hash = bad_pu.control_hash().clone(); control_hash.combined_hash = bad_control_hash; let bad_pu = PreUnit::new(bad_pu.creator(), bad_pu.round(), control_hash); - let some_hash: ::Hash = [0, 1, 0, 1, 0, 1, 0, 1]; - let bad_unit = Unit::new(bad_pu, some_hash); - let _ = tx_in.send(NotificationIn::NewUnits(vec![bad_unit])).await; + let keychain = &keychains[bad_pu.creator().0]; + let bad_unit = Signed::sign(preunit_to_full_unit(bad_pu, 0), keychain); + let unit_hash = bad_unit.hash(); + units_for_consensus + .send(bad_unit) + .await + 
.expect("channel is open"); loop { - let notification = rx_out.next().await.unwrap(); - trace!(target: "consensus-test", "notification {:?}", notification); - if let NotificationOut::WrongControlHash(h) = notification { - assert_eq!(h, some_hash, "Expected notification for our bad unit."); + let request = requests_from_consensus + .next() + .await + .expect("channel is open"); + trace!(target: "consensus-test", "request {:?}", request); + if let Request::ParentsOf(h) = request { + assert_eq!(h, unit_hash, "Expected notification for our bad unit."); break; } } diff --git a/consensus/src/testing/crash_recovery.rs b/consensus/src/testing/crash_recovery.rs index 5b0eb023..9c531dab 100644 --- a/consensus/src/testing/crash_recovery.rs +++ b/consensus/src/testing/crash_recovery.rs @@ -1,6 +1,6 @@ use crate::{ testing::{init_log, spawn_honest_member, HonestMember, Network, ReconnectSender}, - units::{UncheckedSignedUnit, UnitCoord}, + units::{UncheckedSignedUnit, Unit, UnitCoord}, NodeCount, NodeIndex, SpawnHandle, TaskHandle, }; use aleph_bft_mock::{Data, Hasher64, Router, Signature, Spawner}; diff --git a/consensus/src/testing/creation.rs b/consensus/src/testing/creation.rs index 6ffe3c68..4fe4c235 100644 --- a/consensus/src/testing/creation.rs +++ b/consensus/src/testing/creation.rs @@ -1,7 +1,7 @@ use crate::{ - creation::{run, SignedUnitWithParents as GenericSignedUnitWithParents, IO}, + creation::{run, IO}, testing::{gen_config, gen_delay_config}, - units::Unit as GenericUnit, + units::{SignedUnit as GenericSignedUnit, Unit as GenericUnit}, NodeCount, Receiver, Round, Sender, Terminator, }; use aleph_bft_mock::{Data, DataProvider, Hasher64, Keychain}; @@ -10,19 +10,18 @@ use futures::{ FutureExt, StreamExt, }; -type Unit = GenericUnit; -type SignedUnitWithParents = GenericSignedUnitWithParents; +type SignedUnit = GenericSignedUnit; struct TestController { max_round_per_creator: Vec, - parents_for_creators: Sender, - units_from_creators: Receiver, + 
parents_for_creators: Sender, + units_from_creators: Receiver, } impl TestController { fn new( - parents_for_creators: Sender, - units_from_creators: Receiver, + parents_for_creators: Sender, + units_from_creators: Receiver, n_members: NodeCount, ) -> Self { TestController { @@ -35,12 +34,11 @@ impl TestController { async fn control_until(&mut self, max_round: Round) { let mut round_reached = 0; while round_reached < max_round { - let (unit, _) = self + let unit = self .units_from_creators .next() .await .expect("Creator output channel isn't closed."); - let unit = unit.into_unchecked().as_signable().unit(); if unit.round() > round_reached { round_reached = unit.round(); } @@ -56,8 +54,8 @@ struct TestSetup { test_controller: TestController, killers: Vec>, handles: Vec>, - units_from_controller: Receiver, - units_for_creators: Vec>, + units_from_controller: Receiver, + units_for_creators: Vec>, } fn setup_test(n_members: NodeCount) -> TestSetup { diff --git a/consensus/src/testing/dag.rs b/consensus/src/testing/dag.rs index 8de2a1cc..07943c2c 100644 --- a/consensus/src/testing/dag.rs +++ b/consensus/src/testing/dag.rs @@ -1,10 +1,12 @@ use crate::{ consensus, - creation::SignedUnitWithParents as GenericSignedUnitWithParents, - runway::{NotificationIn as GenericNotificationIn, NotificationOut as GenericNotificationOut}, + extension::Service as Extender, + reconstruction::{ReconstructedUnit as GenericReconstructedUnit, Request as GenericRequest}, + runway::ExplicitParents as GenericExplicitParents, testing::{complete_oneshot, gen_config, gen_delay_config}, - units::{ControlHash, PreUnit, Unit}, - NodeCount, NodeIndex, NodeMap, NodeSubset, Receiver, Round, Sender, SpawnHandle, Terminator, + units::{ControlHash, FullUnit, PreUnit, SignedUnit as GenericSignedUnit, Unit, UnitCoord}, + NodeCount, NodeIndex, NodeMap, NodeSubset, Receiver, Round, Sender, Signed, SpawnHandle, + Terminator, }; use aleph_bft_mock::{Data, DataProvider, Hash64, Hasher64, Keychain, Spawner}; use 
futures::{ @@ -13,39 +15,32 @@ use futures::{ FutureExt, }; use futures_timer::Delay; -use log::{debug, error, trace}; +use log::{debug, trace}; use rand::{distributions::Open01, prelude::*}; use std::{cmp, collections::HashMap, time::Duration}; -type NotificationIn = GenericNotificationIn; -type NotificationOut = GenericNotificationOut; -type SignedUnitWithParents = GenericSignedUnitWithParents; +type Request = GenericRequest; +type ExplicitParents = GenericExplicitParents; +type SignedUnit = GenericSignedUnit; +type ReconstructedUnit = GenericReconstructedUnit; #[derive(Clone)] struct UnitWithParents { - unit: Unit, + unit: SignedUnit, parent_hashes: NodeMap, } -fn unit_hash(round: Round, creator: NodeIndex, variant: usize) -> Hash64 { - let mut hash = Hash64::default(); - hash[0] = round as u8; - hash[1] = creator.0 as u8; - hash[2] = variant as u8; - hash -} - impl UnitWithParents { fn new( round: Round, creator: NodeIndex, - variant: usize, + variant: Data, parent_hashes: NodeMap, ) -> Self { + let keychain = Keychain::new(parent_hashes.size(), creator); let control_hash = ControlHash::new(&parent_hashes); let pre_unit = PreUnit::new(creator, round, control_hash); - let hash = unit_hash(round, creator, variant); - let unit = Unit::new(pre_unit, hash); + let unit = Signed::sign(FullUnit::new(pre_unit, Some(variant), 0), &keychain); UnitWithParents { unit, parent_hashes, @@ -55,83 +50,117 @@ impl UnitWithParents { self.unit.hash() } - fn parent_hashes_vec(&self) -> Vec { - self.parent_hashes.values().cloned().collect() + fn parent_hashes_map(&self) -> HashMap { + let mut result = HashMap::new(); + let round = self.unit.round(); + for (creator, hash) in self.parent_hashes.iter() { + result.insert(UnitCoord::new(round - 1, creator), *hash); + } + result } } struct ConsensusDagFeeder { - tx_in: Sender, - rx_out: Receiver, - units_from_creator: Receiver, + units_for_consensus: Sender, + parents_for_consensus: Sender, + requests_from_consensus: Receiver, + 
reconstructed_units_from_consensus: Receiver, + units_from_creator: Receiver, units: Vec, units_map: HashMap, } +type DagFeederParts = ( + ConsensusDagFeeder, + Receiver, + Receiver, + Sender, + Sender, + Sender, +); + impl ConsensusDagFeeder { - fn new( - units: Vec, - ) -> ( - Self, - Receiver, - Sender, - Sender, - ) { + fn new(units: Vec) -> DagFeederParts { let units_map = units.iter().map(|u| (u.hash(), u.clone())).collect(); - let (tx_in, rx_in) = mpsc::unbounded(); - let (tx_out, rx_out) = mpsc::unbounded(); + let (units_for_consensus, units_from_feeder) = mpsc::unbounded(); + let (parents_for_consensus, parents_from_feeder) = mpsc::unbounded(); + let (requests_for_feeder, requests_from_consensus) = mpsc::unbounded(); + let (reconstructed_units_for_feeder, reconstructed_units_from_consensus) = + mpsc::unbounded(); let (units_for_feeder, units_from_creator) = mpsc::unbounded(); let cdf = ConsensusDagFeeder { - tx_in, - rx_out, + units_for_consensus, + parents_for_consensus, + requests_from_consensus, + reconstructed_units_from_consensus, units_from_creator, units, units_map, }; - (cdf, rx_in, tx_out, units_for_feeder) + ( + cdf, + units_from_feeder, + parents_from_feeder, + requests_for_feeder, + reconstructed_units_for_feeder, + units_for_feeder, + ) } - fn on_consensus_notification(&self, notification: NotificationOut) { - match notification { - NotificationOut::WrongControlHash(h) => { + fn on_request(&self, request: Request) { + use GenericRequest::*; + match request { + ParentsOf(h) => { // We need to answer these requests as otherwise reconstruction cannot make progress - let parent_hashes = self.units_map.get(&h).unwrap().parent_hashes_vec(); - let notification = NotificationIn::UnitParents(h, parent_hashes); - self.tx_in.unbounded_send(notification).unwrap(); - } - NotificationOut::AddedToDag(h, p_hashes) => { - let expected_hashes = self.units_map.get(&h).unwrap().parent_hashes_vec(); - assert!(p_hashes == expected_hashes); + let parent_hashes = 
self.units_map.get(&h).unwrap().parent_hashes_map(); + let parents = (h, parent_hashes); + self.parents_for_consensus.unbounded_send(parents).unwrap(); } - _ => { - //We ignore the remaining notifications. We don't need to answer missing units requests. + Coord(_) => { + // We don't need to answer missing units requests. } } } + fn on_reconstructed_unit(&self, unit: ReconstructedUnit) { + let h = unit.hash(); + let round = unit.round(); + let parents = unit.parents(); + let expected_hashes = self + .units_map + .get(&h) + .expect("we have the unit") + .parent_hashes_map(); + assert_eq!(parents.item_count(), expected_hashes.len()); + for (creator, hash) in parents { + assert_eq!( + Some(hash), + expected_hashes.get(&UnitCoord::new(round - 1, creator)) + ); + } + } + async fn run(mut self) { for unit in &self.units { - let notification = NotificationIn::NewUnits(vec![unit.unit.clone()]); - self.tx_in.unbounded_send(notification).unwrap(); + self.units_for_consensus + .unbounded_send(unit.unit.clone()) + .expect("channel should be open"); } loop { - let notification = loop { - futures::select! { - notification = self.rx_out.next() => { - break notification; + futures::select! 
{ + request = self.requests_from_consensus.next() => match request { + Some(request) => self.on_request(request), + None => break, + }, + unit = self.reconstructed_units_from_consensus.next() => match unit { + Some(unit) => self.on_reconstructed_unit(unit), + None => break, }, _ = self.units_from_creator.next() => continue, - } }; - match notification { - Some(notification) => self.on_consensus_notification(notification), - None => { - error!(target: "dag-test", "Consensus notification stream closed."); - break; - } - } } + debug!(target: "dag-test", "Consensus stream closed."); } } @@ -141,23 +170,35 @@ async fn run_consensus_on_dag( deadline_ms: u64, ) -> Vec> { let node_id = NodeIndex(0); - let (feeder, rx_in, tx_out, units_for_feeder) = ConsensusDagFeeder::new(units); + let ( + feeder, + units_from_feeder, + parents_from_feeder, + requests_for_feeder, + reconstructed_units_for_feeder, + units_for_feeder, + ) = ConsensusDagFeeder::new(units); let conf = gen_config(node_id, n_members, gen_delay_config()); let keychain = Keychain::new(n_members, node_id); let (_exit_tx, exit_rx) = oneshot::channel(); + let (_extender_exit_tx, extender_exit_rx) = oneshot::channel(); + let (reconstructed_units_for_us, mut reconstructed_units_from_consensus) = mpsc::unbounded(); let (batch_tx, mut batch_rx) = mpsc::unbounded(); let spawner = Spawner::new(); let starting_round = complete_oneshot(Some(0)); + let (units_for_extender, units_from_us) = mpsc::unbounded(); + let extender = Extender::::new(node_id, units_from_us, batch_tx); spawner.spawn( "consensus", consensus::run( conf, consensus::IO { - incoming_notifications: rx_in, - outgoing_notifications: tx_out, - units_for_runway: units_for_feeder, + units_from_runway: units_from_feeder, + parents_from_runway: parents_from_feeder, + units_for_runway: reconstructed_units_for_us, + requests_for_runway: requests_for_feeder, + new_units_for_runway: units_for_feeder, data_provider: DataProvider::new(), - ordered_batch_tx: batch_tx, 
starting_round, }, keychain, @@ -165,6 +206,13 @@ async fn run_consensus_on_dag( Terminator::create_root(exit_rx, "AlephBFT-consensus"), ), ); + spawner.spawn( + "extender", + extender.run(Terminator::create_root( + extender_exit_rx, + "AlephBFT-extender", + )), + ); spawner.spawn("feeder", feeder.run()); let mut batches = Vec::new(); let mut delay_fut = Delay::new(Duration::from_millis(deadline_ms)).fuse(); @@ -173,6 +221,11 @@ async fn run_consensus_on_dag( batch = batch_rx.next() => { batches.push(batch.unwrap()); }, + unit = reconstructed_units_from_consensus.next() => { + let unit = unit.expect("consensus is operating"); + units_for_extender.unbounded_send(unit.extender_unit()).expect("extender is operating"); + reconstructed_units_for_feeder.unbounded_send(unit).expect("feeder is operating"); + } _ = &mut delay_fut => { break; } @@ -202,7 +255,7 @@ fn generate_random_dag(n_members: NodeCount, height: Round, seed: u64) -> Vec FmtResult { + write!(f, "(#{} by {})", self.round, self.creator.0) + } +} + /// Combined hashes of the parents of a unit together with the set of indices of creators of the /// parents #[derive(Clone, Eq, PartialEq, Hash, Debug, Decode, Encode)] @@ -146,18 +153,6 @@ impl FullUnit { pub(crate) fn as_pre_unit(&self) -> &PreUnit { &self.pre_unit } - pub(crate) fn creator(&self) -> NodeIndex { - self.pre_unit.creator() - } - pub(crate) fn round(&self) -> Round { - self.pre_unit.round() - } - pub(crate) fn control_hash(&self) -> &ControlHash { - self.pre_unit.control_hash() - } - pub(crate) fn coord(&self) -> UnitCoord { - self.pre_unit.coord - } pub(crate) fn data(&self) -> &Option { &self.data } @@ -167,26 +162,12 @@ impl FullUnit { pub(crate) fn session_id(&self) -> SessionId { self.session_id } - pub(crate) fn hash(&self) -> H::Hash { - let hash = *self.hash.read(); - match hash { - Some(hash) => hash, - None => { - let hash = self.using_encoded(H::hash); - *self.hash.write() = Some(hash); - hash - } - } - } - pub(crate) fn unit(&self) -> 
Unit { - Unit::new(self.pre_unit.clone(), self.hash()) - } } impl Signable for FullUnit { type Hash = H::Hash; fn hash(&self) -> H::Hash { - self.hash() + Unit::hash(self) } } @@ -200,34 +181,77 @@ pub(crate) type UncheckedSignedUnit = UncheckedSigned, S pub(crate) type SignedUnit = Signed, K>; -#[derive(Clone, Eq, PartialEq, Debug, Decode, Encode)] -pub struct Unit { - pre_unit: PreUnit, - hash: H::Hash, -} +/// Abstract representation of a unit from the Dag point of view. +pub trait Unit: 'static + Send + Clone { + type Hasher: Hasher; + + fn hash(&self) -> ::Hash; + + fn coord(&self) -> UnitCoord; -impl Unit { - pub(crate) fn new(pre_unit: PreUnit, hash: H::Hash) -> Self { - Unit { pre_unit, hash } + fn control_hash(&self) -> &ControlHash; + + fn creator(&self) -> NodeIndex { + self.coord().creator() } - pub(crate) fn creator(&self) -> NodeIndex { - self.pre_unit.creator() + + fn round(&self) -> Round { + self.coord().round() } - pub(crate) fn round(&self) -> Round { - self.pre_unit.round() +} + +pub trait WrappedUnit: Unit { + type Wrapped: Unit; + + fn unpack(self) -> Self::Wrapped; +} + +impl Unit for FullUnit { + type Hasher = H; + + fn hash(&self) -> H::Hash { + let hash = *self.hash.read(); + match hash { + Some(hash) => hash, + None => { + let hash = self.using_encoded(H::hash); + *self.hash.write() = Some(hash); + hash + } + } } - pub(crate) fn control_hash(&self) -> &ControlHash { + + fn coord(&self) -> UnitCoord { + self.pre_unit.coord + } + + fn control_hash(&self) -> &ControlHash { self.pre_unit.control_hash() } - pub(crate) fn hash(&self) -> H::Hash { - self.hash +} + +impl Unit for SignedUnit { + type Hasher = H; + + fn hash(&self) -> H::Hash { + Unit::hash(self.as_signable()) + } + + fn coord(&self) -> UnitCoord { + self.as_signable().coord() + } + + fn control_hash(&self) -> &ControlHash { + self.as_signable().control_hash() } } +pub type HashFor = <::Hasher as Hasher>::Hash; + #[cfg(test)] pub mod tests { use crate::{ - 
units::{random_full_parent_units_up_to, ControlHash, FullUnit}, + units::{random_full_parent_units_up_to, ControlHash, FullUnit, Unit}, Hasher, NodeCount, }; use aleph_bft_mock::{Data, Hasher64}; diff --git a/consensus/src/units/store.rs b/consensus/src/units/store.rs index 2792410c..a6ab8eea 100644 --- a/consensus/src/units/store.rs +++ b/consensus/src/units/store.rs @@ -1,291 +1,273 @@ -use super::*; -use itertools::Itertools; -use log::{trace, warn}; -use std::{collections::HashSet, fmt}; - -#[derive(Clone, Eq, PartialEq, Hash)] -pub struct UnitStoreStatus<'a> { - index: NodeIndex, - forkers: &'a NodeSubset, +use std::{ + collections::HashMap, + fmt::{Display, Formatter, Result as FmtResult}, +}; + +use crate::{ + units::{HashFor, Unit, UnitCoord}, + NodeCount, NodeIndex, NodeMap, Round, +}; + +/// An overview of what is in the unit store. +pub struct UnitStoreStatus { size: usize, - height: Round, top_row: NodeMap, - first_missing_rounds: NodeMap, } -impl<'a> UnitStoreStatus<'a> { - fn new( - index: NodeIndex, - forkers: &'a NodeSubset, - size: usize, - height: Round, - top_row: NodeMap, - first_missing_rounds: NodeMap, - ) -> Self { - Self { - index, - forkers, - size, - height, - top_row, - first_missing_rounds, - } - } - - pub fn rounds_behind(&self) -> Round { - self.height - .saturating_sub(self.top_row.get(self.index).cloned().unwrap_or(0)) +impl UnitStoreStatus { + /// Highest round among units in the store. 
+ pub fn top_round(&self) -> Round { + self.top_row.values().max().cloned().unwrap_or(0) } } -impl<'a> fmt::Display for UnitStoreStatus<'a> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "DAG size - {}; DAG height - {}", self.size, self.height)?; - if self.first_missing_rounds.item_count() > 0 { - write!( - f, - "; DAG first missing rounds - {}", - self.first_missing_rounds - )?; - } - write!(f, "; DAG top row - {}", self.top_row)?; - if !self.forkers.is_empty() { - write!(f, "; forkers - {}", self.forkers)?; - } - Ok(()) +impl Display for UnitStoreStatus { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "total units: {}, top row: {}", self.size, self.top_row) } } -/// A component for temporarily storing units before they are declared "legit" and sent -/// to the Terminal. We refer to the documentation https://cardinal-cryptography.github.io/AlephBFT/internals.html -/// Section 5.4 for a discussion of this component and the notion of "legit" units. - -pub(crate) struct UnitStore { - by_coord: HashMap>, - by_hash: HashMap>, - parents: HashMap>, - //the number of unique nodes that we hold units for a given round - is_forker: NodeSubset, - legit_buffer: Vec>, - max_round: Round, +/// Stores units and remembers the first instance of any unit with a specific coordinate inserted. +pub struct UnitStore { + by_hash: HashMap, U>, + by_coord: NodeMap>>, } -impl UnitStore { - pub(crate) fn new(n_nodes: NodeCount, max_round: Round) -> Self { +impl UnitStore { + /// Create a new unit store for the given number of nodes. 
+ pub fn new(node_count: NodeCount) -> Self { + let mut by_coord = NodeMap::with_size(node_count); + for node_id in node_count.into_iterator() { + by_coord.insert(node_id, HashMap::new()); + } UnitStore { - by_coord: HashMap::new(), by_hash: HashMap::new(), - parents: HashMap::new(), - // is_forker is initialized with default values for bool, i.e., false - is_forker: NodeSubset::with_size(n_nodes), - legit_buffer: Vec::new(), - max_round, + by_coord, } } - pub fn get_status_of(&self, node: NodeIndex) -> UnitStoreStatus { - let n_nodes: NodeCount = self.is_forker.size().into(); - let gm = self - .by_coord - .keys() - .map(|c| (c.creator, c.round)) - .into_grouping_map(); - let top_row = NodeMap::from_hashmap(n_nodes, gm.clone().max()); - let first_missing_rounds = NodeMap::from_hashmap( - n_nodes, - gm.collect::>() - .into_iter() - .filter_map(|(id, rounds)| match top_row.get(id) { - Some(&row) => (0..row) - .position(|round| !rounds.contains(&round)) - .map(|round| (id, round as Round)), - None => None, - }) - .collect(), - ); - UnitStoreStatus::new( - node, - &self.is_forker, - self.by_coord.len(), - self.by_coord.keys().map(|k| k.round).max().unwrap_or(0), - top_row, - first_missing_rounds, - ) - } - - pub(crate) fn unit_by_coord(&self, coord: UnitCoord) -> Option<&SignedUnit> { - self.by_coord.get(&coord) + fn mut_hashes_by(&mut self, creator: NodeIndex) -> &mut HashMap> { + self.by_coord + .get_mut(creator) + .expect("all hashmaps initialized") } - pub(crate) fn unit_by_hash(&self, hash: &H::Hash) -> Option<&SignedUnit> { - self.by_hash.get(hash) - } - - pub(crate) fn contains_hash(&self, hash: &H::Hash) -> bool { - self.by_hash.contains_key(hash) + fn hashes_by(&self, creator: NodeIndex) -> &HashMap> { + self.by_coord + .get(creator) + .expect("all hashmaps initialized") } - pub(crate) fn contains_coord(&self, coord: &UnitCoord) -> bool { - self.by_coord.contains_key(coord) + // only call this for canonical units + fn canonical_by_hash(&self, hash: &HashFor) 
-> &U { + self.by_hash.get(hash).expect("we have all canonical units") } - pub(crate) fn newest_unit( - &self, - index: NodeIndex, - ) -> Option> { - Some( - self.by_coord - .values() - .filter(|su| su.as_signable().creator() == index) - .max_by_key(|su| su.as_signable().round())? - .clone() - .into_unchecked(), - ) - } - - // Outputs new legit units that are supposed to be sent to Consensus and empties the buffer. - pub(crate) fn yield_buffer_units(&mut self) -> Vec> { - std::mem::take(&mut self.legit_buffer) + /// Insert a unit. If no other unit with this coord is in the store it becomes canonical. + pub fn insert(&mut self, unit: U) { + let unit_hash = unit.hash(); + let unit_coord = unit.coord(); + if self.canonical_unit(unit_coord).is_none() { + self.mut_hashes_by(unit_coord.creator()) + .insert(unit.coord().round(), unit_hash); + } + self.by_hash.insert(unit_hash, unit); } - // Outputs None if this is not a newly-discovered fork or Some(sv) where (su, sv) form a fork - pub(crate) fn is_new_fork(&self, fu: &FullUnit) -> Option> { - if self.contains_hash(&fu.hash()) { - return None; + /// Remove a unit with a given hash. Notably if you remove a unit another might become canonical in its place in the future. + pub fn remove(&mut self, hash: &HashFor) { + if let Some(unit) = self.by_hash.remove(hash) { + let creator_hashes = self.mut_hashes_by(unit.creator()); + if creator_hashes.get(&unit.round()) == Some(&unit.hash()) { + creator_hashes.remove(&unit.round()); + } } - self.unit_by_coord(fu.coord()).cloned() } - pub(crate) fn is_forker(&self, node_id: NodeIndex) -> bool { - self.is_forker[node_id] + /// The canonical unit for the given coord if it exists. + pub fn canonical_unit(&self, coord: UnitCoord) -> Option<&U> { + self.hashes_by(coord.creator()) + .get(&coord.round()) + .map(|hash| self.canonical_by_hash(hash)) } - // Marks a node as a forker and outputs all units in store created by this node. - // The returned vector is sorted w.r.t. increasing rounds. 
- pub(crate) fn mark_forker(&mut self, forker: NodeIndex) -> Vec> { - if self.is_forker[forker] { - warn!(target: "AlephBFT-unit-store", "Trying to mark the node {:?} as forker for the second time.", forker); - } - self.is_forker.insert(forker); - (0..=self.max_round) - .filter_map(|r| self.unit_by_coord(UnitCoord::new(r, forker)).cloned()) - .collect() + /// All the canonical units for the given creator, in order of rounds. + pub fn canonical_units(&self, creator: NodeIndex) -> impl Iterator { + let canonical_hashes = self.hashes_by(creator); + let max_round = canonical_hashes.keys().max().cloned().unwrap_or(0); + (0..=max_round) + .filter_map(|round| canonical_hashes.get(&round)) + .map(|hash| self.canonical_by_hash(hash)) } - pub(crate) fn add_unit(&mut self, su: SignedUnit, alert: bool) { - let hash = su.as_signable().hash(); - let creator = su.as_signable().creator(); + /// The unit for the given hash, if present. + pub fn unit(&self, hash: &HashFor) -> Option<&U> { + self.by_hash.get(hash) + } - if alert { - trace!(target: "AlephBFT-unit-store", "Adding unit with alert {:?}.", su.as_signable()); - assert!( - self.is_forker[creator], - "The forker must be marked before adding alerted units." - ); - } - if self.contains_hash(&hash) { - // Ignoring a duplicate. - trace!(target: "AlephBFT-unit-store", "A unit ignored as a duplicate {:?}.", su.as_signable()); - return; + /// The status summary of this store. 
+ pub fn status(&self) -> UnitStoreStatus { + let mut top_row = NodeMap::with_size(self.by_coord.size()); + for (creator, units) in self.by_coord.iter() { + if let Some(round) = units.keys().max() { + top_row.insert(creator, *round); + } } - self.by_hash.insert(hash, su.clone()); - self.by_coord.insert(su.as_signable().coord(), su.clone()); - - if alert || !self.is_forker[creator] { - self.legit_buffer.push(su); + UnitStoreStatus { + size: self.by_hash.len(), + top_row, } } - - pub(crate) fn add_parents(&mut self, hash: H::Hash, parents: Vec) { - self.parents.insert(hash, parents); - } - - pub(crate) fn get_parents(&mut self, hash: H::Hash) -> Option<&Vec> { - self.parents.get(&hash) - } } #[cfg(test)] -mod tests { +mod test { + use std::collections::HashSet; + use crate::{ - units::{ControlHash, FullUnit, PreUnit, SignedUnit, UnitCoord, UnitStore}, - NodeCount, NodeIndex, NodeMap, Round, Signed, + units::{random_full_parent_units_up_to, TestingFullUnit, Unit, UnitCoord, UnitStore}, + NodeCount, NodeIndex, }; - use aleph_bft_mock::{Data, Hasher64, Keychain}; - fn create_unit( - round: Round, - node_idx: NodeIndex, - count: NodeCount, - session_id: u64, - keychain: &Keychain, - ) -> SignedUnit { - let preunit = PreUnit::::new( - node_idx, - round, - ControlHash::new(&NodeMap::with_size(count)), - ); - let full_unit = FullUnit::new(preunit, Some(0), session_id); - Signed::sign(full_unit, keychain) + #[test] + fn empty_has_no_units() { + let node_count = NodeCount(7); + let store = UnitStore::::new(node_count); + assert!(store + .canonical_unit(UnitCoord::new(0, NodeIndex(0))) + .is_none()); + assert!(store.canonical_units(NodeIndex(0)).next().is_none()); } #[test] - fn mark_forker_restore_state() { - let n_nodes = NodeCount(10); - - let mut store = UnitStore::::new(n_nodes, 100); + fn single_unit_basic_operations() { + let node_count = NodeCount(7); + let mut store = UnitStore::new(node_count); + let unit = random_full_parent_units_up_to(0, node_count, 43) + .get(0) 
+ .expect("we have the first round") + .get(0) + .expect("we have the initial unit for the zeroth creator") + .clone(); + store.insert(unit.clone()); + assert_eq!(store.unit(&unit.hash()), Some(&unit)); + assert_eq!(store.canonical_unit(unit.coord()), Some(&unit)); + { + // in block to drop the iterator + let mut canonical_units = store.canonical_units(unit.creator()); + assert_eq!(canonical_units.next(), Some(&unit)); + assert_eq!(canonical_units.next(), None); + } + store.remove(&unit.hash()); + assert_eq!(store.unit(&unit.hash()), None); + assert_eq!(store.canonical_unit(unit.coord()), None); + assert_eq!(store.canonical_units(unit.creator()).next(), None); + } - let keychains: Vec<_> = (0..=4) - .map(|i| Keychain::new(n_nodes, NodeIndex(i))) + #[test] + fn first_variant_is_canonical() { + let node_count = NodeCount(7); + let mut store = UnitStore::new(node_count); + // only unique variants + #[allow(clippy::mutable_key_type)] + let variants: HashSet<_> = (0..15) + .map(|_| { + random_full_parent_units_up_to(0, node_count, 43) + .get(0) + .expect("we have the first round") + .get(0) + .expect("we have the initial unit for the zeroth creator") + .clone() + }) .collect(); + let variants: Vec<_> = variants.into_iter().collect(); + for unit in &variants { + store.insert(unit.clone()); + } + for unit in &variants { + assert_eq!(store.unit(&unit.hash()), Some(unit)); + } + let canonical_unit = variants.get(0).expect("we have the unit").clone(); + assert_eq!( + store.canonical_unit(canonical_unit.coord()), + Some(&canonical_unit) + ); + { + // in block to drop the iterator + let mut canonical_units = store.canonical_units(canonical_unit.creator()); + assert_eq!(canonical_units.next(), Some(&canonical_unit)); + assert_eq!(canonical_units.next(), None); + } + store.remove(&canonical_unit.hash()); + assert_eq!(store.unit(&canonical_unit.hash()), None); + // we don't have a canonical unit any more + assert_eq!(store.canonical_unit(canonical_unit.coord()), None); + 
assert_eq!(store.canonical_units(canonical_unit.creator()).next(), None); + // we still have all this other units + for unit in variants.iter().skip(1) { + assert_eq!(store.unit(&unit.hash()), Some(unit)); + } + } - let mut forker_hashes = Vec::new(); - - for round in 0..4 { - for (i, keychain) in keychains.iter().enumerate() { - let unit = create_unit(round, NodeIndex(i), n_nodes, 0, keychain); - if i == 0 { - forker_hashes.push(unit.as_signable().hash()); - } - store.add_unit(unit, false); + #[test] + fn stores_lots_of_units() { + let node_count = NodeCount(7); + let mut store = UnitStore::new(node_count); + let max_round = 15; + let units = random_full_parent_units_up_to(max_round, node_count, 43); + for round_units in &units { + for unit in round_units { + store.insert(unit.clone()); } } - - // Forker's units - for round in 4..7 { - let unit = create_unit(round, NodeIndex(0), n_nodes, 0, &keychains[0]); - forker_hashes.push(unit.as_signable().hash()); - store.add_unit(unit, false); + for round_units in &units { + for unit in round_units { + assert_eq!(store.unit(&unit.hash()), Some(unit)); + assert_eq!(store.canonical_unit(unit.coord()), Some(unit)); + } } - - let forker_units: Vec<_> = store - .mark_forker(NodeIndex(0)) - .iter() - .map(|unit| unit.clone().into_unchecked().as_signable().round()) - .collect(); - - assert_eq!(vec![0, 1, 2, 3, 4, 5, 6], forker_units); - assert!(store.is_forker[NodeIndex(0)]); - - // All rounds still have forker's units - for (round, hash) in forker_hashes[0..4].iter().enumerate() { - let round = round as Round; - let coord = UnitCoord::new(round, NodeIndex(0)); - assert!(store.by_coord.contains_key(&coord)); - assert!(store.by_hash.contains_key(hash)); + for node_id in node_count.into_iterator() { + let mut canonical_units = store.canonical_units(node_id); + for round in 0..=max_round { + assert_eq!( + canonical_units.next(), + Some(&units[round as usize][node_id.0]) + ); + } + assert_eq!(canonical_units.next(), None); } + } - 
assert!(store - .by_coord - .contains_key(&UnitCoord::new(4, NodeIndex(0)))); - assert!(store.by_hash.contains_key(&forker_hashes[4])); - - for (round, hash) in forker_hashes[5..7].iter().enumerate() { - let round = round as Round; - let round = round + 5; - let coord = UnitCoord::new(round, NodeIndex(0)); - assert!(store.by_coord.contains_key(&coord)); - assert!(store.by_hash.contains_key(hash)); + #[test] + fn handles_fragmented_canonical() { + let node_count = NodeCount(7); + let mut store = UnitStore::new(node_count); + let max_round = 15; + let units = random_full_parent_units_up_to(max_round, node_count, 43); + for round_units in &units { + for unit in round_units { + store.insert(unit.clone()); + } + } + for round_units in &units { + for unit in round_units { + // remove some units with a weird criterion + if unit.round() as usize % (unit.creator().0 + 1) == 0 { + store.remove(&unit.hash()); + } + } + } + for node_id in node_count.into_iterator() { + let mut canonical_units = store.canonical_units(node_id); + for round in 0..=max_round { + if round as usize % (node_id.0 + 1) != 0 { + assert_eq!( + canonical_units.next(), + Some(&units[round as usize][node_id.0]) + ); + } + } + assert_eq!(canonical_units.next(), None); } } } diff --git a/consensus/src/units/testing.rs b/consensus/src/units/testing.rs index 2c7cdfc3..99fa56b5 100644 --- a/consensus/src/units/testing.rs +++ b/consensus/src/units/testing.rs @@ -2,18 +2,46 @@ use crate::{ creation::Creator as GenericCreator, units::{ ControlHash as GenericControlHash, FullUnit as GenericFullUnit, PreUnit as GenericPreUnit, - UncheckedSignedUnit as GenericUncheckedSignedUnit, Unit as GenericUnit, + SignedUnit as GenericSignedUnit, UncheckedSignedUnit as GenericUncheckedSignedUnit, Unit, + UnitCoord, WrappedUnit, }, - Hasher, NodeCount, NodeIndex, NodeMap, Round, SessionId, Signed, + NodeCount, NodeIndex, NodeMap, Round, SessionId, Signed, }; -use aleph_bft_mock::{Data, Hasher64, Keychain, Signature}; +use 
aleph_bft_mock::{Data, Hash64, Hasher64, Keychain, Signature}; type ControlHash = GenericControlHash; type Creator = GenericCreator; type PreUnit = GenericPreUnit; -type Unit = GenericUnit; -type FullUnit = GenericFullUnit; +pub type FullUnit = GenericFullUnit; type UncheckedSignedUnit = GenericUncheckedSignedUnit; +pub type SignedUnit = GenericSignedUnit; + +#[derive(Clone)] +pub struct WrappedSignedUnit(pub SignedUnit); + +impl Unit for WrappedSignedUnit { + type Hasher = Hasher64; + + fn hash(&self) -> Hash64 { + self.0.hash() + } + + fn coord(&self) -> UnitCoord { + self.0.coord() + } + + fn control_hash(&self) -> &ControlHash { + self.0.control_hash() + } +} + +impl WrappedUnit for WrappedSignedUnit { + type Wrapped = SignedUnit; + + fn unpack(self) -> Self::Wrapped { + self.0 + } +} pub fn creator_set(n_members: NodeCount) -> Vec { (0..n_members.0) @@ -24,22 +52,18 @@ pub fn creator_set(n_members: NodeCount) -> Vec { pub fn create_preunits<'a, C: Iterator>( creators: C, round: Round, -) -> Vec<(PreUnit, Vec<::Hash>)> { +) -> Vec { creators .map(|c| c.create_unit(round).expect("Creation should succeed.")) .collect() } -fn preunit_to_full_unit(preunit: PreUnit, session_id: SessionId) -> FullUnit { +pub fn preunit_to_full_unit(preunit: PreUnit, session_id: SessionId) -> FullUnit { FullUnit::new(preunit, rand::random(), session_id) } -pub fn preunit_to_unit(preunit: PreUnit, session_id: SessionId) -> Unit { - preunit_to_full_unit(preunit, session_id).unit() -} - impl Creator { - pub fn add_units(&mut self, units: &[Unit]) { + pub fn add_units>(&mut self, units: &[U]) { for unit in units { self.add_unit(unit); } diff --git a/consensus/src/units/validator.rs b/consensus/src/units/validator.rs index 545b1de5..53c9cebd 100644 --- a/consensus/src/units/validator.rs +++ b/consensus/src/units/validator.rs @@ -1,6 +1,7 @@ use crate::{ - units::{ControlHash, FullUnit, PreUnit, SignedUnit, UncheckedSignedUnit}, - Data, Hasher, Keychain, NodeCount, NodeMap, Round, 
SessionId, Signature, SignatureError, + units::{ControlHash, FullUnit, PreUnit, SignedUnit, UncheckedSignedUnit, Unit}, + Data, Hasher, Keychain, NodeCount, NodeIndex, NodeMap, Round, SessionId, Signature, + SignatureError, }; use std::{ fmt::{Display, Formatter, Result as FmtResult}, @@ -79,6 +80,14 @@ impl Validator { } } + pub fn node_count(&self) -> NodeCount { + self.keychain.node_count() + } + + pub fn index(&self) -> NodeIndex { + self.keychain.index() + } + pub fn validate_unit( &self, uu: UncheckedSignedUnit, diff --git a/consensus/src/validation.rs b/consensus/src/validation.rs new file mode 100644 index 00000000..bc0ba262 --- /dev/null +++ b/consensus/src/validation.rs @@ -0,0 +1,483 @@ +use std::fmt::{Debug, Display, Formatter, Result as FmtResult}; + +use crate::{ + alerts::{Alert, ForkingNotification}, + units::{ + SignedUnit, UncheckedSignedUnit, Unit, UnitStore, UnitStoreStatus, ValidationError, + Validator as UnitValidator, WrappedUnit, + }, + Data, Hasher, MultiKeychain, NodeIndex, NodeSubset, Round, +}; + +/// What can go wrong when validating a unit. +#[derive(Eq, PartialEq)] +pub enum Error { + Invalid(ValidationError), + Duplicate(SignedUnit), + Uncommitted(SignedUnit), + NewForker(Box>), +} + +impl Debug for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + use Error::*; + match self { + Invalid(e) => write!(f, "Invalid({:?})", e), + Duplicate(u) => write!(f, "Duplicate({:?})", u.clone().into_unchecked()), + Uncommitted(u) => write!(f, "Uncommitted({:?})", u.clone().into_unchecked()), + NewForker(a) => write!(f, "NewForker({:?})", a), + } + } +} + +impl From> + for Error +{ + fn from(e: ValidationError) -> Self { + Error::Invalid(e) + } +} + +/// The summary status of the validator. +pub struct ValidatorStatus { + processing_units: UnitStoreStatus, + known_forkers: NodeSubset, +} + +impl ValidatorStatus { + /// The highest round among the units that are currently processing. 
+ pub fn top_round(&self) -> Round { + self.processing_units.top_round() + } +} + +impl Display for ValidatorStatus { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!( + f, + "processing units: ({}), forkers: {}", + self.processing_units, self.known_forkers + ) + } +} + +type ValidatorResult = Result, Error>; + +/// A validator that checks basic properties of units and catches forks. +pub struct Validator { + unit_validator: UnitValidator, + processing_units: UnitStore>, + known_forkers: NodeSubset, +} + +impl Validator { + /// A new validator using the provided unit validator under the hood. + pub fn new(unit_validator: UnitValidator) -> Self { + let node_count = unit_validator.node_count(); + Validator { + unit_validator, + processing_units: UnitStore::new(node_count), + known_forkers: NodeSubset::with_size(node_count), + } + } + + fn is_forker(&self, node_id: NodeIndex) -> bool { + self.known_forkers[node_id] + } + + fn mark_forker>>( + &mut self, + forker: NodeIndex, + store: &UnitStore, + ) -> Vec> { + assert!(!self.is_forker(forker), "we shouldn't mark a forker twice"); + self.known_forkers.insert(forker); + store + .canonical_units(forker) + .cloned() + .map(WrappedUnit::unpack) + // In principle we can have "canonical" processing units that are forks of store canonical units, + // but only after we already marked a node as a forker, so not yet. + // Also note that these units can be from different branches and we still commit to them here. + // This is somewhat confusing, but not a problem for any theoretical guarantees. 
+ .chain(self.processing_units.canonical_units(forker).cloned()) + .map(|unit| unit.into_unchecked()) + .collect() + } + + fn pre_validate>>( + &mut self, + unit: UncheckedSignedUnit, + store: &UnitStore, + ) -> ValidatorResult { + let unit = self.unit_validator.validate_unit(unit)?; + let unit_hash = unit.as_signable().hash(); + if store.unit(&unit_hash).is_some() || self.processing_units.unit(&unit_hash).is_some() { + return Err(Error::Duplicate(unit)); + } + Ok(unit) + } + + /// Validate an incoming unit. + pub fn validate>>( + &mut self, + unit: UncheckedSignedUnit, + store: &UnitStore, + ) -> ValidatorResult { + use Error::*; + let unit = self.pre_validate(unit, store)?; + let unit_coord = unit.as_signable().coord(); + if self.is_forker(unit_coord.creator()) { + return Err(Uncommitted(unit)); + } + if let Some(canonical_unit) = store + .canonical_unit(unit_coord) + .map(|unit| unit.clone().unpack()) + .or(self.processing_units.canonical_unit(unit_coord).cloned()) + { + let proof = (canonical_unit.into(), unit.into()); + let committed_units = self.mark_forker(unit_coord.creator(), store); + return Err(NewForker(Box::new(Alert::new( + self.unit_validator.index(), + proof, + committed_units, + )))); + } + self.processing_units.insert(unit.clone()); + Ok(unit) + } + + fn validate_committed>>( + &mut self, + unit: UncheckedSignedUnit, + store: &UnitStore, + ) -> ValidatorResult { + let unit = self.pre_validate(unit, store)?; + assert!( + self.is_forker(unit.creator()), + "We should only receive committed units for known forkers." + ); + self.processing_units.insert(unit.clone()); + Ok(unit) + } + + /// Process a forking notification, potentially returning a lot of unit processing results. 
+ pub fn process_forking_notification>>( + &mut self, + notification: ForkingNotification, + store: &UnitStore, + ) -> Vec> { + use ForkingNotification::*; + match notification { + Forker((unit, other_unit)) => { + // Just treat them as normal incoming units, if they are a forking proof + // this will either trigger a new forker or we already knew about this one. + vec![self.validate(unit, store), self.validate(other_unit, store)] + } + Units(units) => units + .into_iter() + .map(|unit| self.validate_committed(unit, store)) + .collect(), + } + } + + /// Signal that a unit finished processing and thus it's copy no longer has to be kept for fork detection. + /// NOTE: This is only a memory optimization, if the units stay there forever everything still works. + pub fn finished_processing(&mut self, unit: &H::Hash) { + self.processing_units.remove(unit) + } + + /// The status summary of this validator. + pub fn status(&self) -> ValidatorStatus { + ValidatorStatus { + processing_units: self.processing_units.status(), + known_forkers: self.known_forkers.clone(), + } + } +} + +#[cfg(test)] +mod test { + use crate::{ + alerts::ForkingNotification, + units::{ + random_full_parent_units_up_to, Unit, UnitStore, Validator as UnitValidator, + WrappedSignedUnit, + }, + validation::{Error, Validator}, + NodeCount, NodeIndex, Signed, + }; + use aleph_bft_mock::Keychain; + + #[test] + fn validates_trivially_correct() { + let node_count = NodeCount(7); + let session_id = 0; + let max_round = 2137; + let keychains: Vec<_> = node_count + .into_iterator() + .map(|node_id| Keychain::new(node_count, node_id)) + .collect(); + let store = UnitStore::::new(node_count); + let mut validator = Validator::new(UnitValidator::new(session_id, keychains[0], max_round)); + for unit in random_full_parent_units_up_to(4, node_count, session_id) + .iter() + .flatten() + .map(|unit| Signed::sign(unit.clone(), &keychains[unit.creator().0])) + { + assert_eq!( + validator.validate(unit.clone().into(), 
&store), + Ok(unit.clone()) + ); + } + } + + #[test] + fn refuses_processing_duplicates() { + let node_count = NodeCount(7); + let session_id = 0; + let max_round = 2137; + let keychains: Vec<_> = node_count + .into_iterator() + .map(|node_id| Keychain::new(node_count, node_id)) + .collect(); + let store = UnitStore::::new(node_count); + let mut validator = Validator::new(UnitValidator::new(session_id, keychains[0], max_round)); + let unit = random_full_parent_units_up_to(0, node_count, session_id) + .get(0) + .expect("we have the first round") + .get(0) + .expect("we have the initial unit for the zeroth creator") + .clone(); + let unit = Signed::sign(unit, &keychains[0]); + assert_eq!( + validator.validate(unit.clone().into(), &store), + Ok(unit.clone()) + ); + assert_eq!( + validator.validate(unit.clone().into(), &store), + Err(Error::Duplicate(unit.clone())) + ); + } + + #[test] + fn refuses_external_duplicates() { + let node_count = NodeCount(7); + let session_id = 0; + let max_round = 2137; + let keychains: Vec<_> = node_count + .into_iterator() + .map(|node_id| Keychain::new(node_count, node_id)) + .collect(); + let mut store = UnitStore::new(node_count); + let mut validator = Validator::new(UnitValidator::new(session_id, keychains[0], max_round)); + let unit = random_full_parent_units_up_to(0, node_count, session_id) + .get(0) + .expect("we have the first round") + .get(0) + .expect("we have the initial unit for the zeroth creator") + .clone(); + let unit = Signed::sign(unit, &keychains[0]); + store.insert(WrappedSignedUnit(unit.clone())); + assert_eq!( + validator.validate(unit.clone().into(), &store), + Err(Error::Duplicate(unit.clone())) + ); + } + + #[test] + fn detects_processing_fork() { + let node_count = NodeCount(7); + let session_id = 0; + let max_round = 2137; + let produced_round = 4; + let keychains: Vec<_> = node_count + .into_iterator() + .map(|node_id| Keychain::new(node_count, node_id)) + .collect(); + let store = 
UnitStore::::new(node_count); + let mut validator = Validator::new(UnitValidator::new(session_id, keychains[0], max_round)); + for unit in random_full_parent_units_up_to(produced_round, node_count, session_id) + .iter() + .flatten() + .map(|unit| Signed::sign(unit.clone(), &keychains[unit.creator().0])) + { + assert_eq!( + validator.validate(unit.clone().into(), &store), + Ok(unit.clone()) + ); + } + let fork = random_full_parent_units_up_to(2, node_count, session_id) + .get(2) + .expect("we have the requested round") + .get(0) + .expect("we have the unit for the zeroth creator") + .clone(); + let fork = Signed::sign(fork, &keychains[0]); + assert!(matches!( + validator.validate(fork.clone().into(), &store), + Err(Error::NewForker(_)) + )); + } + + #[test] + fn detects_external_fork() { + let node_count = NodeCount(7); + let session_id = 0; + let max_round = 2137; + let produced_round = 4; + let keychains: Vec<_> = node_count + .into_iterator() + .map(|node_id| Keychain::new(node_count, node_id)) + .collect(); + let mut store = UnitStore::new(node_count); + let mut validator = Validator::new(UnitValidator::new(session_id, keychains[0], max_round)); + for unit in random_full_parent_units_up_to(produced_round, node_count, session_id) + .iter() + .flatten() + .map(|unit| Signed::sign(unit.clone(), &keychains[unit.creator().0])) + { + store.insert(WrappedSignedUnit(unit)); + } + let fork = random_full_parent_units_up_to(2, node_count, session_id) + .get(2) + .expect("we have the requested round") + .get(0) + .expect("we have the unit for the zeroth creator") + .clone(); + let fork = Signed::sign(fork, &keychains[0]); + assert!(matches!( + validator.validate(fork.clone().into(), &store), + Err(Error::NewForker(_)) + )); + } + + #[test] + fn refuses_uncommitted() { + let node_count = NodeCount(7); + let session_id = 0; + let max_round = 2137; + let produced_round = 4; + let keychains: Vec<_> = node_count + .into_iterator() + .map(|node_id| Keychain::new(node_count, 
node_id)) + .collect(); + let store = UnitStore::::new(node_count); + let mut validator = Validator::new(UnitValidator::new(session_id, keychains[0], max_round)); + let fork = random_full_parent_units_up_to(2, node_count, session_id) + .get(2) + .expect("we have the requested round") + .get(0) + .expect("we have the unit for the zeroth creator") + .clone(); + let fork = Signed::sign(fork, &keychains[0]); + for unit in random_full_parent_units_up_to(produced_round, node_count, session_id) + .iter() + .flatten() + .filter(|unit| unit.creator() == NodeIndex(0)) + .map(|unit| Signed::sign(unit.clone(), &keychains[unit.creator().0])) + { + match unit.round() { + 0..=1 => assert_eq!( + validator.validate(unit.clone().into(), &store), + Ok(unit.clone()) + ), + 2 => { + assert_eq!( + validator.validate(unit.clone().into(), &store), + Ok(unit.clone()) + ); + assert!(matches!( + validator.validate(fork.clone().into(), &store), + Err(Error::NewForker(_)) + )) + } + 3.. => assert_eq!( + validator.validate(unit.clone().into(), &store), + Err(Error::Uncommitted(unit.clone())) + ), + } + } + } + + #[test] + fn detects_fork_through_notification() { + let node_count = NodeCount(7); + let session_id = 0; + let max_round = 2137; + let keychains: Vec<_> = node_count + .into_iterator() + .map(|node_id| Keychain::new(node_count, node_id)) + .collect(); + let store = UnitStore::::new(node_count); + let mut validator = Validator::new(UnitValidator::new(session_id, keychains[0], max_round)); + let unit = random_full_parent_units_up_to(2, node_count, session_id) + .get(2) + .expect("we have the requested round") + .get(0) + .expect("we have the unit for the zeroth creator") + .clone(); + let unit = Signed::sign(unit, &keychains[0]); + let fork = random_full_parent_units_up_to(2, node_count, session_id) + .get(2) + .expect("we have the requested round") + .get(0) + .expect("we have the unit for the zeroth creator") + .clone(); + let fork = Signed::sign(fork, &keychains[0]); + let results = 
validator.process_forking_notification( + ForkingNotification::Forker((unit.clone().into(), fork.into())), + &store, + ); + assert_eq!(results.len(), 2); + assert_eq!(results[0], Ok(unit.clone())); + assert!(matches!(results[1], Err(Error::NewForker(_)))); + } + + #[test] + fn accepts_committed() { + let node_count = NodeCount(7); + let session_id = 0; + let max_round = 2137; + let produced_round = 4; + let keychains: Vec<_> = node_count + .into_iterator() + .map(|node_id| Keychain::new(node_count, node_id)) + .collect(); + let store = UnitStore::::new(node_count); + let mut validator = Validator::new(UnitValidator::new(session_id, keychains[0], max_round)); + let fork = random_full_parent_units_up_to(2, node_count, session_id) + .get(2) + .expect("we have the requested round") + .get(0) + .expect("we have the unit for the zeroth creator") + .clone(); + let fork = Signed::sign(fork, &keychains[0]); + let units: Vec<_> = random_full_parent_units_up_to(produced_round, node_count, session_id) + .iter() + .flatten() + .filter(|unit| unit.creator() == NodeIndex(0)) + .map(|unit| Signed::sign(unit.clone(), &keychains[unit.creator().0])) + .collect(); + for unit in units.iter().take(3) { + assert_eq!( + validator.validate(unit.clone().into(), &store), + Ok(unit.clone()) + ); + } + assert!(matches!( + validator.validate(fork.clone().into(), &store), + Err(Error::NewForker(_)) + )); + let results = validator.process_forking_notification( + ForkingNotification::Units(units.clone().into_iter().map(|unit| unit.into()).collect()), + &store, + ); + for (unit, result) in units.iter().zip(results.iter()).take(3) { + assert_eq!(result, &Err(Error::Duplicate(unit.clone()))); + } + for (unit, result) in units.iter().zip(results.iter()).skip(3) { + assert_eq!(result, &Ok(unit.clone())); + } + } +} diff --git a/docs/src/internals.md b/docs/src/internals.md index 9801bf50..d7ceca79 100644 --- a/docs/src/internals.md +++ b/docs/src/internals.md @@ -4,20 +4,20 @@ To explain the inner 
workings of AlephBFT it is instructive to follow the path o 1. The unit is created by one of the node's `Creator` component -- implemented in `creation/`. Creator sends the produced unit to `runway/`, which then sends it to `member.rs`. 2. A recurring task of broadcasting this unit is put in the task queue. The unit will be broadcast to all other nodes a few times (with some delays in between). -3. The unit is received by another node -- happens in `member.rs` and immediately send to `runway/` where it passes some validation (signature checks etc.). If all these checks pass and the unit is not detected to be a fork, then it is placed in the `UnitStore` -- the `store` field of the `Runway` struct. -4. The idea is that this store keeps only **legit units** in the sense defined in [the section on alerts](how_alephbft_does_it.md#25-alerts----dealing-with-fork-spam). Thus no fork is ever be put there unless coming from an alert. -5. At a suitable moment the units from the store are further moved to a component responsible for reconstructing the explicit parents for these units -- implemented in `reconstruction/parents.rs`. -6. Each unit whose parents are successfully decoded, is added to the "Dag". Each unit in the Dag is legit + has all its parents in the Dag. This is ensured by the implementation in `reconstruction/dag.rs`. -7. Dag units are passed to a component called the `Extender` -- see the files in `extension/`. The role of the extender is to efficiently run the `OrderData` algorithm, described in the [section on AlephBFT](how_alephbft_does_it.md). -8. Once a unit's data is placed in one of batches by the `Extender` then its path is over and can be safely discarded. +3. The unit is received by another node -- happens in `member.rs` and immediately sent to `runway/` for further processing in `validation.rs`. +4. Validation checks signatures and basic unit properties, plus catches forks. 
This means that only **legit units**, in the sense defined in [the section on alerts](how_alephbft_does_it.md#25-alerts----dealing-with-fork-spam), are sent further. Thus no fork is ever passed on unless coming from an alert. +5. The units are further moved to a component responsible for reconstructing the explicit parents for these units -- implemented in `reconstruction/parents.rs`. +6. Each unit whose parents are successfully decoded is passed on to `reconstruction/dag.rs`, which ensures that units are passed on only when their parents already were. They are then put in a store in `runway/`. Each unit in the store is legit + has all its parents in the store. +7. Such units are passed to a component called the `Extender` -- see the files in `extension/`. The role of the extender is to efficiently run the `OrderData` algorithm, described in the [section on AlephBFT](how_alephbft_does_it.md). +8. Once a unit's data is placed in one of the batches by the `Extender` then its path is over, although we keep it in the runway store to be able to send it to other nodes on request. ### 5.1 Creator The creator produces units according to the AlephBFT protocol rules. It will wait until the prespecified delay has passed and attempt to create a unit using a maximal number of parents. If it is not possible yet, it will wait till the first moment enough parents are available. After creating the last unit, the creator stops producing new ones, although this is never expected to happen during correct execution. -### 5.2 Unit Store in Runway +### 5.2 Validation -As mentioned, the idea is that this stores only legit units and passes them to the reconstructing component. In case a fork is detected by a node `i`, all `i`'s units are attached to the appropriate alert. 
+The validation process consists of checking basic properties of units (correct number of parents, correct session etc.), the signatures, and whether the unit is a fork based on the units that the node either already has or at least started processing. As mentioned, the idea is that only legit units are passed to the reconstructing component. In case a fork by a node `i` is detected, all of `i`'s units are attached to the appropriate alert, so that other nodes can accept them as legit. ### 5.3 Reconstruction @@ -36,7 +36,7 @@ In any case the reconstruction triggers a request to `Member` to download the fu #### 5.3.2 Dag -The units parents might, for many reasons, not be reconstructed in an order agreeing with the Dag order, i.e. some of their ancestors might not yet be reconstructed. The Dag component ensures that units are only added to the Dag after their parents were already added, and thus any units emitted by this component are in an order agreeing with the Dag order. +The units' parents might, for many reasons, not be reconstructed in an order agreeing with the Dag order, i.e. some of their ancestors might not yet be reconstructed. The Dag component ensures that units are only added to the store after their parents were already added, and thus any units emitted by this component are in an order agreeing with the Dag order. ### 5.4 Extender