From ffbcaff65c9c081a1a603a6265854a5eab5e9548 Mon Sep 17 00:00:00 2001 From: timorleph Date: Fri, 12 Apr 2024 10:06:58 +0200 Subject: [PATCH 1/2] Make extender synchronous --- Cargo.lock | 2 +- consensus/Cargo.toml | 2 +- consensus/src/dag/reconstruction/dag.rs | 28 ++- consensus/src/dag/reconstruction/mod.rs | 34 ++-- consensus/src/dag/reconstruction/parents.rs | 2 +- consensus/src/extension/election.rs | 163 +++++++++-------- consensus/src/extension/extender.rs | 78 ++------ consensus/src/extension/mod.rs | 189 +++----------------- consensus/src/extension/units.rs | 86 ++++----- consensus/src/runway/mod.rs | 69 +------ consensus/src/testing/dag.rs | 142 ++++++--------- consensus/src/units/mod.rs | 24 ++- consensus/src/units/testing.rs | 58 +++++- 13 files changed, 336 insertions(+), 541 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d23cb39e..d45641dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,7 +28,7 @@ dependencies = [ [[package]] name = "aleph-bft" -version = "0.36.4" +version = "0.36.5" dependencies = [ "aleph-bft-mock", "aleph-bft-rmc", diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 5bd5a906..0cc2d57a 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft" -version = "0.36.4" +version = "0.36.5" edition = "2021" authors = ["Cardinal Cryptography"] categories = ["algorithms", "data-structures", "cryptography", "database"] diff --git a/consensus/src/dag/reconstruction/dag.rs b/consensus/src/dag/reconstruction/dag.rs index 40871bf8..c22159c0 100644 --- a/consensus/src/dag/reconstruction/dag.rs +++ b/consensus/src/dag/reconstruction/dag.rs @@ -1,20 +1,14 @@ -use crate::{ - dag::reconstruction::ReconstructedUnit, - units::{HashFor, Unit}, -}; +use crate::units::{HashFor, UnitWithParents}; use std::collections::{HashMap, HashSet, VecDeque}; -struct OrphanedUnit { - unit: ReconstructedUnit, +struct OrphanedUnit { + unit: U, missing_parents: HashSet>, } -impl OrphanedUnit { +impl OrphanedUnit { /// If there are no missing parents then returns just the internal unit. - pub fn new( - unit: ReconstructedUnit, - missing_parents: HashSet>, - ) -> Result> { + pub fn new(unit: U, missing_parents: HashSet>) -> Result { match missing_parents.is_empty() { true => Err(unit), false => Ok(OrphanedUnit { @@ -24,8 +18,8 @@ impl OrphanedUnit { } } - /// If this was the last missing parent return the reconstructed unit. - pub fn resolve_parent(self, parent: HashFor) -> Result, Self> { + /// If this was the last missing parent return the unit. + pub fn resolve_parent(self, parent: HashFor) -> Result { let OrphanedUnit { unit, mut missing_parents, @@ -53,13 +47,13 @@ impl OrphanedUnit { /// A structure ensuring that units added to it are output in an order /// in agreement with the DAG order. -pub struct Dag { +pub struct Dag { orphaned_units: HashMap, OrphanedUnit>, waiting_for: HashMap, Vec>>, dag_units: HashSet>, } -impl Dag { +impl Dag { /// Create a new empty DAG. pub fn new() -> Self { Dag { @@ -69,7 +63,7 @@ impl Dag { } } - fn move_to_dag(&mut self, unit: ReconstructedUnit) -> Vec> { + fn move_to_dag(&mut self, unit: U) -> Vec { let mut result = Vec::new(); let mut ready_units = VecDeque::from([unit]); while let Some(unit) = ready_units.pop_front() { @@ -95,7 +89,7 @@ impl Dag { /// Add a unit to the Dag. Returns all the units that now have all their parents in the Dag, /// in an order agreeing with the Dag structure. - pub fn add_unit(&mut self, unit: ReconstructedUnit) -> Vec> { + pub fn add_unit(&mut self, unit: U) -> Vec { if self.dag_units.contains(&unit.hash()) { // Deduplicate. return Vec::new(); diff --git a/consensus/src/dag/reconstruction/mod.rs b/consensus/src/dag/reconstruction/mod.rs index 9123bdab..09bb1ea5 100644 --- a/consensus/src/dag/reconstruction/mod.rs +++ b/consensus/src/dag/reconstruction/mod.rs @@ -1,9 +1,8 @@ use std::collections::HashMap; use crate::{ - extension::ExtenderUnit, - units::{ControlHash, HashFor, Unit, UnitCoord, WrappedUnit}, - Hasher, NodeMap, + units::{ControlHash, HashFor, Unit, UnitCoord, UnitWithParents, WrappedUnit}, + Hasher, NodeMap, SessionId, }; mod dag; @@ -41,21 +40,6 @@ impl ReconstructedUnit { parents: NodeMap::with_size(n_members), } } - - /// The reconstructed parents, guaranteed to be correct. - pub fn parents(&self) -> &NodeMap> { - &self.parents - } - - /// Create an extender unit from this one. - pub fn extender_unit(&self) -> ExtenderUnit { - ExtenderUnit::new( - self.unit.creator(), - self.unit.round(), - self.hash(), - self.parents.clone(), - ) - } } impl Unit for ReconstructedUnit { @@ -72,6 +56,10 @@ impl Unit for ReconstructedUnit { fn control_hash(&self) -> &ControlHash { self.unit.control_hash() } + + fn session_id(&self) -> SessionId { + self.unit.session_id() + } } impl WrappedUnit for ReconstructedUnit { @@ -82,6 +70,12 @@ impl WrappedUnit for ReconstructedUnit { } } +impl UnitWithParents for ReconstructedUnit { + fn parents(&self) -> &NodeMap> { + &self.parents + } +} + /// What we need to request to reconstruct units. #[derive(Debug, PartialEq, Eq)] pub enum Request { @@ -148,7 +142,7 @@ impl ReconstructionResult { /// it eventually outputs versions with explicit parents in an order conforming to the Dag order. pub struct Reconstruction { parents: ParentReconstruction, - dag: Dag, + dag: Dag>, } impl Reconstruction { @@ -194,7 +188,7 @@ mod test { use crate::{ dag::reconstruction::{ReconstructedUnit, Reconstruction, ReconstructionResult, Request}, - units::{random_full_parent_units_up_to, Unit, UnitCoord}, + units::{random_full_parent_units_up_to, Unit, UnitCoord, UnitWithParents}, NodeCount, NodeIndex, }; diff --git a/consensus/src/dag/reconstruction/parents.rs b/consensus/src/dag/reconstruction/parents.rs index b1bed339..8fb6fcd5 100644 --- a/consensus/src/dag/reconstruction/parents.rs +++ b/consensus/src/dag/reconstruction/parents.rs @@ -221,7 +221,7 @@ mod test { dag::reconstruction::{ parents::Reconstruction, ReconstructedUnit, ReconstructionResult, Request, }, - units::{random_full_parent_units_up_to, Unit, UnitCoord}, + units::{random_full_parent_units_up_to, Unit, UnitCoord, UnitWithParents}, NodeCount, NodeIndex, }; diff --git a/consensus/src/extension/election.rs b/consensus/src/extension/election.rs index 501d499c..fca1e225 100644 --- a/consensus/src/extension/election.rs +++ b/consensus/src/extension/election.rs @@ -1,7 +1,8 @@ use std::collections::HashMap; use crate::{ - extension::{units::Units, ExtenderUnit}, + extension::units::Units, + units::{HashFor, UnitWithParents}, Hasher, NodeCount, NodeIndex, NodeMap, Round, }; @@ -22,25 +23,25 @@ enum CandidateOutcome { ElectionDone(H::Hash), } -struct CandidateElection { +struct CandidateElection { round: Round, candidate_creator: NodeIndex, - candidate_hash: H::Hash, - votes: HashMap, + candidate_hash: HashFor, + votes: HashMap, bool>, } -impl CandidateElection { +impl CandidateElection { /// Creates an election for the given candidate. /// The candidate will eventually either get elected or eliminated. /// Might immediately return an outcome. pub fn for_candidate( - candidate: &ExtenderUnit, - units: &Units, - ) -> Result> { + candidate: &U, + units: &Units, + ) -> Result> { CandidateElection { - round: candidate.round, - candidate_creator: candidate.creator, - candidate_hash: candidate.hash, + round: candidate.round(), + candidate_creator: candidate.creator(), + candidate_hash: candidate.hash(), votes: HashMap::new(), } .compute_votes(units) @@ -48,8 +49,8 @@ impl CandidateElection { fn parent_votes( &mut self, - parents: &NodeMap, - ) -> Result<(NodeCount, NodeCount), CandidateOutcome> { + parents: &NodeMap>, + ) -> Result<(NodeCount, NodeCount), CandidateOutcome> { let (mut votes_for, mut votes_against) = (NodeCount(0), NodeCount(0)); for parent in parents.values() { match self.votes.get(parent).expect("units are added in order") { @@ -62,9 +63,9 @@ impl CandidateElection { fn vote_from_parents( &mut self, - parents: &NodeMap, + parents: &NodeMap>, relative_round: Round, - ) -> Result> { + ) -> Result> { use CandidateOutcome::*; let threshold = parents.size().consensus_threshold(); // Gather parents' votes. @@ -91,28 +92,28 @@ impl CandidateElection { }) } - fn vote(&mut self, voter: &ExtenderUnit) -> Result<(), CandidateOutcome> { + fn vote(&mut self, voter: &U) -> Result<(), CandidateOutcome> { // If the vote is already computed we are done. - if self.votes.get(&voter.hash).is_some() { + if self.votes.get(&voter.hash()).is_some() { return Ok(()); } // Votes for old units are never used, so we just return. - if voter.round <= self.round { + if voter.round() <= self.round { return Ok(()); } - let relative_round = voter.round - self.round; + let relative_round = voter.round() - self.round; let vote = match relative_round { 0 => unreachable!("just checked that voter and election rounds are not equal"), // Direct descendands vote for, all other units of that round against. - 1 => voter.parents.get(self.candidate_creator) == Some(&self.candidate_hash), + 1 => voter.parents().get(self.candidate_creator) == Some(&self.candidate_hash), // Otherwise we compute the vote based on the parents' votes. - _ => self.vote_from_parents(&voter.parents, relative_round)?, + _ => self.vote_from_parents(voter.parents(), relative_round)?, }; - self.votes.insert(voter.hash, vote); + self.votes.insert(voter.hash(), vote); Ok(()) } - fn compute_votes(mut self, units: &Units) -> Result> { + fn compute_votes(mut self, units: &Units) -> Result> { for round in self.round + 1..=units.highest_round() { for voter in units.in_round(round).expect("units are added in order") { self.vote(voter)?; @@ -123,32 +124,32 @@ impl CandidateElection { /// Add a single voter and compute their vote. This might end up electing or eliminating the candidate. /// Might panic if called for a unit before its parents. - pub fn add_voter(mut self, voter: &ExtenderUnit) -> Result> { + pub fn add_voter(mut self, voter: &U) -> Result> { self.vote(voter).map(|()| self) } } /// Election for a single round. -pub struct RoundElection { - // Remaining candidates for this round's head, in reverese order. - candidates: Vec, - voting: CandidateElection, +pub struct RoundElection { + // Remaining candidates for this round's head, in reverse order. + candidates: Vec>, + voting: CandidateElection, } /// An election result. -pub enum ElectionResult { +pub enum ElectionResult { /// The election is not done yet. - Pending(RoundElection), + Pending(RoundElection), /// The head has been elected. - Elected(H::Hash), + Elected(HashFor), } -impl RoundElection { +impl RoundElection { /// Create a new round election. It might immediately be decided, so this might return an election result rather than a pending election. /// Returns an error when it's too early to finalize the candidate list, i.e. we are not at least 3 rounds ahead of the election round. /// /// Note: it is crucial that units are added to `Units` only when all their parents are there, otherwise this might panic. - pub fn for_round(round: Round, units: &Units) -> Result, ()> { + pub fn for_round(round: Round, units: &Units) -> Result, ()> { // If we don't yet have a unit of round + 3 we might not know about the winning candidate, so we cannot start the election. if units.highest_round() < round + 3 { return Err(()); @@ -159,7 +160,7 @@ impl RoundElection { .in_round(round) .expect("units come in order, so we definitely have units from this round") .iter() - .map(|candidate| candidate.hash) + .map(|candidate| candidate.hash()) .collect(); candidates.sort(); // We will be `pop`ing the candidates from the back. @@ -175,10 +176,10 @@ impl RoundElection { } fn handle_candidate_election_result( - result: Result, CandidateOutcome>, - mut candidates: Vec, - units: &Units, - ) -> ElectionResult { + result: Result, CandidateOutcome>, + mut candidates: Vec>, + units: &Units, + ) -> ElectionResult { use CandidateOutcome::*; use ElectionResult::*; match result { @@ -202,7 +203,7 @@ impl RoundElection { /// Add a single voter to the election. /// Might panic if not all parents were added previously. - pub fn add_voter(self, voter: &ExtenderUnit, units: &Units) -> ElectionResult { + pub fn add_voter(self, voter: &U, units: &Units) -> ElectionResult { let RoundElection { candidates, voting } = self; Self::handle_candidate_election_result(voting.add_voter(voter), candidates, units) } @@ -213,16 +214,18 @@ mod test { use crate::{ extension::{ election::{ElectionResult, RoundElection}, - tests::{construct_unit, construct_unit_all_parents}, units::Units, }, - NodeCount, NodeIndex, + units::{ + random_full_parent_reconstrusted_units_up_to, random_reconstructed_unit_with_parents, + TestingDagUnit, Unit, + }, + NodeCount, }; - use aleph_bft_mock::Hasher64; #[test] fn refuses_to_elect_without_units() { - let units = Units::::new(); + let units = Units::::new(); assert!(RoundElection::for_round(0, &units).is_err()); } @@ -231,9 +234,12 @@ mod test { let mut units = Units::new(); let n_members = NodeCount(4); let max_round = 2; - for round in 0..=max_round { - for creator in n_members.into_iterator() { - units.add_unit(construct_unit_all_parents(creator, round, n_members)); + let session_id = 2137; + for round_units in + random_full_parent_reconstrusted_units_up_to(max_round, n_members, session_id) + { + for unit in round_units { + units.add_unit(unit); } } assert!(RoundElection::for_round(0, &units).is_err()); @@ -244,10 +250,12 @@ mod test { use ElectionResult::*; let mut units = Units::new(); let n_members = NodeCount(4); - let max_round = 3; - for round in 0..=max_round { - for creator in n_members.into_iterator() { - units.add_unit(construct_unit_all_parents(creator, round, n_members)); + let max_round = 4; + let session_id = 2137; + let dag = random_full_parent_reconstrusted_units_up_to(max_round, n_members, session_id); + for round_units in dag.iter().take(4) { + for unit in round_units { + units.add_unit(unit.clone()); } } let election = RoundElection::for_round(0, &units).expect("we have enough rounds"); @@ -255,12 +263,12 @@ mod test { Pending(election) => election, Elected(_) => panic!("elected head without units of round + 4"), }; - let last_voter = construct_unit_all_parents(NodeIndex(0), 4, n_members); + let last_voter = dag[4].last().expect("created all units").clone(); units.add_unit(last_voter.clone()); match election.add_voter(&last_voter, &units) { Pending(_) => panic!("failed to elect obvious head"), Elected(head) => { - assert_eq!(units.get(&head).expect("we have the head").round, 0); + assert_eq!(units.get(&head).expect("we have the head").round(), 0); } } } @@ -271,16 +279,19 @@ mod test { let mut units = Units::new(); let n_members = NodeCount(4); let max_round = 4; - for round in 0..=max_round { - for creator in n_members.into_iterator() { - units.add_unit(construct_unit_all_parents(creator, round, n_members)); + let session_id = 2137; + for round_units in + random_full_parent_reconstrusted_units_up_to(max_round, n_members, session_id) + { + for unit in round_units { + units.add_unit(unit.clone()); } } let election = RoundElection::for_round(0, &units).expect("we have enough rounds"); match election { Pending(_) => panic!("should have elected"), Elected(head) => { - assert_eq!(units.get(&head).expect("we have the head").round, 0); + assert_eq!(units.get(&head).expect("we have the head").round(), 0); } } } @@ -291,35 +302,37 @@ mod test { let mut units = Units::new(); let n_members = NodeCount(4); let max_round = 4; - for creator in n_members.into_iterator() { - units.add_unit(construct_unit_all_parents(creator, 0, n_members)); + let session_id = 2137; + for unit in random_full_parent_reconstrusted_units_up_to(0, n_members, session_id) + .last() + .expect("just created") + { + units.add_unit(unit.clone()); } let mut candidate_hashes: Vec<_> = units .in_round(0) .expect("just added these") .iter() - .map(|candidate| candidate.hash) + .map(|candidate| candidate.hash()) .collect(); candidate_hashes.sort(); - let skipped_parent = units + let inactive_node = units .get(&candidate_hashes[0]) .expect("we just got it") - .creator; - let active_nodes: Vec<_> = n_members - .into_iterator() - .filter(|parent_id| parent_id != &skipped_parent) - .collect(); + .creator(); for round in 1..=max_round { - for creator in &active_nodes { - units.add_unit(construct_unit( - *creator, - round, - active_nodes - .iter() - .map(|parent_id| (*parent_id, round - 1)) - .collect(), - n_members, - )); + let parents: Vec = units + .in_round(round - 1) + .expect("created in order") + .into_iter() + .filter(|unit| unit.creator() != inactive_node) + .cloned() + .collect(); + for creator in n_members + .into_iterator() + .filter(|node_id| node_id != &inactive_node) + { + units.add_unit(random_reconstructed_unit_with_parents(creator, &parents)); } } let election = RoundElection::for_round(0, &units).expect("we have enough rounds"); diff --git a/consensus/src/extension/extender.rs b/consensus/src/extension/extender.rs index f4e95564..222f15f6 100644 --- a/consensus/src/extension/extender.rs +++ b/consensus/src/extension/extender.rs @@ -2,18 +2,18 @@ use crate::{ extension::{ election::{ElectionResult, RoundElection}, units::Units, - ExtenderUnit, }, - Hasher, Round, + units::UnitWithParents, + Round, }; -pub struct Extender { - election: Option>, - units: Units, +pub struct Extender { + election: Option>, + units: Units, round: Round, } -impl Extender { +impl Extender { /// Create a new extender with no units. pub fn new() -> Self { Extender { @@ -23,7 +23,7 @@ impl Extender { } } - fn handle_election_result(&mut self, result: ElectionResult) -> Option> { + fn handle_election_result(&mut self, result: ElectionResult) -> Option> { use ElectionResult::*; match result { // Wait for more voters for this election. @@ -34,14 +34,14 @@ impl Extender { // Advance to the next round and return the ordered batch. Elected(head) => { self.round += 1; - Some(self.units.remove_batch(head)) + Some(self.units.remove_batch(&head)) } } } /// Add a unit to the extender. Might return several batches of ordered units as a result. - pub fn add_unit(&mut self, u: ExtenderUnit) -> Vec> { - let hash = u.hash; + pub fn add_unit(&mut self, u: U) -> Vec> { + let hash = u.hash(); self.units.add_unit(u); let unit = self.units.get(&hash).expect("just added"); let mut result = Vec::new(); @@ -71,25 +71,22 @@ impl Extender { #[cfg(test)] mod test { use crate::{ - extension::{ - extender::Extender, - tests::{construct_unit, construct_unit_all_parents}, - }, - NodeCount, NodeIndex, Round, + extension::extender::Extender, units::random_full_parent_reconstrusted_units_up_to, + NodeCount, Round, }; - use std::iter; #[test] fn easy_elections() { let mut extender = Extender::new(); let n_members = NodeCount(4); let max_round: Round = 43; + let session_id = 2137; let mut batches = Vec::new(); - for round in 0..=max_round { - for creator in n_members.into_iterator() { - batches.append( - &mut extender.add_unit(construct_unit_all_parents(creator, round, n_members)), - ); + for round_units in + random_full_parent_reconstrusted_units_up_to(max_round, n_members, session_id) + { + for unit in round_units { + batches.append(&mut extender.add_unit(unit)); } } assert_eq!(batches.len(), (max_round - 3).into()); @@ -98,43 +95,4 @@ mod test { assert_eq!(batch.len(), n_members.0); } } - - // TODO(A0-1047): Rewrite this once we order all the data, even unpopular. - #[test] - fn ignores_sufficiently_unpopular() { - let mut extender = Extender::new(); - let n_members = NodeCount(4); - let max_round: Round = 43; - let active_nodes: Vec<_> = n_members.into_iterator().skip(1).collect(); - let mut batches = Vec::new(); - for round in 0..=max_round { - for creator in n_members.into_iterator() { - let parent_coords = match round.checked_sub(1) { - None => Vec::new(), - Some(parent_round) => match creator { - NodeIndex(0) => n_members - .into_iterator() - .zip(iter::repeat(parent_round)) - .collect(), - _ => active_nodes - .iter() - .cloned() - .zip(iter::repeat(parent_round)) - .collect(), - }, - }; - batches.append(&mut extender.add_unit(construct_unit( - creator, - round, - parent_coords, - n_members, - ))); - } - } - assert_eq!(batches.len(), (max_round - 3).into()); - assert_eq!(batches[0].len(), 1); - for batch in batches.iter().skip(1) { - assert_eq!(batch.len(), n_members.0 - 1); - } - } } diff --git a/consensus/src/extension/mod.rs b/consensus/src/extension/mod.rs index 1102d3d8..f205ec96 100644 --- a/consensus/src/extension/mod.rs +++ b/consensus/src/extension/mod.rs @@ -1,8 +1,8 @@ -use futures::{FutureExt, StreamExt}; - -use log::{debug, warn}; - -use crate::{Hasher, NodeIndex, NodeMap, Receiver, Round, Sender, Terminator}; +use crate::{ + dag::DagUnit, + units::{Unit, WrappedUnit}, + Data, FinalizationHandler, Hasher, MultiKeychain, +}; mod election; mod extender; @@ -10,180 +10,43 @@ mod units; use extender::Extender; -const LOG_TARGET: &str = "AlephBFT-extender"; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ExtenderUnit { - creator: NodeIndex, - round: Round, - parents: NodeMap, - hash: H::Hash, -} - -impl ExtenderUnit { - pub fn new(creator: NodeIndex, round: Round, hash: H::Hash, parents: NodeMap) -> Self { - ExtenderUnit { - creator, - round, - hash, - parents, - } - } -} - -/// A process responsible for executing the Consensus protocol on a local copy of the Dag. -/// It receives units via a channel `electors` which are guaranteed to be eventually in the Dags +/// A struct responsible for executing the Consensus protocol on a local copy of the Dag. +/// It receives units which are guaranteed to be eventually in the Dags /// of all honest nodes. The static Aleph Consensus algorithm is then run on this Dag in order /// to finalize subsequent rounds of the Dag. More specifically whenever a new unit is received /// this process checks whether a new round can be finalized and if so, it computes the batch of -/// hashes of units that should be finalized, and pushes such a batch to a channel via the -/// finalizer_tx endpoint. +/// units that should be finalized, and uses the finalization handler to report that to the user. /// /// We refer to the documentation https://cardinal-cryptography.github.io/AlephBFT/internals.html /// Section 5.4 for a discussion of this component. - -pub struct Service { - node_id: NodeIndex, - extender: Extender, - electors: Receiver>, - finalizer_tx: Sender>, +pub struct Ordering> { + extender: Extender>, + finalization_handler: FH, } -impl Service { - pub fn new( - node_id: NodeIndex, - electors: Receiver>, - finalizer_tx: Sender>, - ) -> Self { +impl> Ordering { + pub fn new(finalization_handler: FH) -> Self { let extender = Extender::new(); - Service { - node_id, + Ordering { extender, - electors, - finalizer_tx, - } - } - - pub async fn run(mut self, mut terminator: Terminator) { - let mut exiting = false; - let mut round = 0; - loop { - futures::select! { - v = self.electors.next() => match v { - Some(u) => { - debug!(target: LOG_TARGET, "{:?} New unit in Extender round {:?} creator {:?} hash {:?}.", self.node_id, u.round, u.creator, u.hash); - for batch in self.extender.add_unit(u) { - let head = *batch.last().expect("all batches are nonempty"); - if self.finalizer_tx.unbounded_send(batch).is_err() { - warn!(target: LOG_TARGET, "{:?} Channel for batches should be open", self.node_id); - exiting = true; - } - debug!(target: LOG_TARGET, "{:?} Finalized round {:?} with head {:?}.", self.node_id, round, head); - round += 1; - } - }, - None => { - warn!(target: LOG_TARGET, "{:?} Units for extender unexpectedly ended.", self.node_id); - exiting = true; - } - }, - _ = terminator.get_exit().fuse() => { - debug!(target: LOG_TARGET, "{:?} received exit signal.", self.node_id); - exiting = true; - } - } - if exiting { - debug!(target: LOG_TARGET, "{:?} Extender decided to exit.", self.node_id); - terminator.terminate_sync().await; - break; - } + finalization_handler, } } -} - -#[cfg(test)] -mod tests { - use crate::{ - extension::{ExtenderUnit, Service}, - NodeCount, NodeIndex, NodeMap, Round, Terminator, - }; - use aleph_bft_mock::Hasher64; - use futures::{ - channel::{mpsc, oneshot}, - StreamExt, - }; - use std::iter; - - fn coord_to_number(creator: NodeIndex, round: Round, n_members: NodeCount) -> u64 { - (round as usize * n_members.0 + creator.0) as u64 - } - pub fn construct_unit( - creator: NodeIndex, - round: Round, - parent_coords: Vec<(NodeIndex, Round)>, - n_members: NodeCount, - ) -> ExtenderUnit { - assert!(round > 0 || parent_coords.is_empty()); - let mut parents = NodeMap::with_size(n_members); - for (creator, round) in parent_coords { - parents.insert( - creator, - coord_to_number(creator, round, n_members).to_ne_bytes(), - ); + fn handle_batch(&mut self, batch: Vec>) { + for unit in batch { + let unit = unit.unpack(); + self.finalization_handler.unit_finalized( + unit.creator(), + unit.round(), + unit.as_signable().data().clone(), + ) } - ExtenderUnit::new( - creator, - round, - coord_to_number(creator, round, n_members).to_ne_bytes(), - parents, - ) } - pub fn construct_unit_all_parents( - creator: NodeIndex, - round: Round, - n_members: NodeCount, - ) -> ExtenderUnit { - let parent_coords = match round.checked_sub(1) { - None => Vec::new(), - Some(parent_round) => n_members - .into_iterator() - .zip(iter::repeat(parent_round)) - .collect(), - }; - - construct_unit(creator, round, parent_coords, n_members) - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 3)] - async fn finalize_rounds_01() { - let n_members = NodeCount(4); - let rounds = 6; - let (batch_tx, mut batch_rx) = mpsc::unbounded(); - let (electors_tx, electors_rx) = mpsc::unbounded(); - let extender = Service::::new(0.into(), electors_rx, batch_tx); - let (exit_tx, exit_rx) = oneshot::channel(); - let extender_handle = tokio::spawn(async move { - extender - .run(Terminator::create_root(exit_rx, "AlephBFT-extender")) - .await - }); - - for round in 0..rounds { - for creator in n_members.into_iterator() { - let unit = construct_unit_all_parents(creator, round, n_members); - electors_tx - .unbounded_send(unit) - .expect("Channel should be open"); - } + pub fn add_unit(&mut self, unit: DagUnit) { + for batch in self.extender.add_unit(unit) { + self.handle_batch(batch); } - let batch_round_0 = batch_rx.next().await.unwrap(); - assert!(!batch_round_0.is_empty()); - - let batch_round_1 = batch_rx.next().await.unwrap(); - assert!(!batch_round_1.is_empty()); - let _ = exit_tx.send(()); - let _ = extender_handle.await; } } diff --git a/consensus/src/extension/units.rs b/consensus/src/extension/units.rs index 0d2184b5..4ede12ab 100644 --- a/consensus/src/extension/units.rs +++ b/consensus/src/extension/units.rs @@ -1,15 +1,18 @@ use std::collections::{HashMap, VecDeque}; -use crate::{extension::ExtenderUnit, Hasher, Round}; +use crate::{ + units::{HashFor, UnitWithParents}, + Round, +}; /// Units kept in a way optimized for easy batch extraction. -pub struct Units { - units: HashMap>, - by_round: HashMap>, +pub struct Units { + units: HashMap, U>, + by_round: HashMap>>, highest_round: Round, } -impl Units { +impl Units { /// Create empty unit store. pub fn new() -> Self { Units { @@ -20,24 +23,24 @@ impl Units { } /// Add a unit to the store. - pub fn add_unit(&mut self, u: ExtenderUnit) { - let round = u.round; + pub fn add_unit(&mut self, u: U) { + let round = u.round(); if round > self.highest_round { self.highest_round = round; } - self.by_round.entry(round).or_default().push(u.hash); - self.units.insert(u.hash, u); + self.by_round.entry(round).or_default().push(u.hash()); + self.units.insert(u.hash(), u); } - pub fn get(&self, hash: &H::Hash) -> Option<&ExtenderUnit> { + pub fn get(&self, hash: &HashFor) -> Option<&U> { self.units.get(hash) } /// Get the list of unit hashes from the given round. /// Panics if called for a round greater or equal to the round /// of the highest head of a removed batch. - pub fn in_round(&self, round: Round) -> Option>> { + pub fn in_round(&self, round: Round) -> Option> { self.by_round.get(&round).map(|hashes| { hashes .iter() @@ -52,21 +55,21 @@ impl Units { } /// Remove a batch of units, deterministically ordered based on the given head. - pub fn remove_batch(&mut self, head: H::Hash) -> Vec { + pub fn remove_batch(&mut self, head: &HashFor) -> Vec { let mut batch = Vec::new(); let mut queue = VecDeque::new(); queue.push_back( self.units - .remove(&head) + .remove(head) .expect("head is picked among units we have"), ); while let Some(u) = queue.pop_front() { - batch.push(u.hash); - for u_hash in u.parents.into_values() { + for u_hash in u.parents().clone().into_values() { if let Some(v) = self.units.remove(&u_hash) { queue.push_back(v); } } + batch.push(u); } // Since we construct the batch using BFS, the ordering is canonical and respects the DAG partial order. @@ -79,14 +82,14 @@ impl Units { #[cfg(test)] mod test { use crate::{ - extension::{tests::construct_unit_all_parents, units::Units}, - NodeCount, NodeIndex, + extension::units::Units, + units::{random_full_parent_reconstrusted_units_up_to, TestingDagUnit, Unit}, + NodeCount, }; - use aleph_bft_mock::Hasher64; #[test] fn initially_empty() { - let units = Units::::new(); + let units = Units::::new(); assert!(units.in_round(0).is_none()); assert_eq!(units.highest_round(), 0); } @@ -95,11 +98,12 @@ mod test { fn accepts_unit() { let mut units = Units::new(); let n_members = NodeCount(4); - let unit = construct_unit_all_parents(NodeIndex(0), 0, n_members); + let session_id = 2137; + let unit = &random_full_parent_reconstrusted_units_up_to(0, n_members, session_id)[0][0]; units.add_unit(unit.clone()); assert_eq!(units.highest_round(), 0); - assert_eq!(units.in_round(0), Some(vec![&unit])); - assert_eq!(units.get(&unit.hash), Some(&unit)); + assert_eq!(units.in_round(0), Some(vec![unit])); + assert_eq!(units.get(&unit.hash()), Some(unit)); } #[test] @@ -107,20 +111,22 @@ mod test { let mut units = Units::new(); let n_members = NodeCount(4); let max_round = 43; + let session_id = 2137; let mut heads = Vec::new(); - for round in 0..=max_round { - for creator in n_members.into_iterator() { - let unit = construct_unit_all_parents(creator, round, n_members); - if round as usize % n_members.0 == creator.0 { - heads.push(unit.hash) - } + for (round, round_units) in + random_full_parent_reconstrusted_units_up_to(max_round, n_members, session_id) + .into_iter() + .enumerate() + { + heads.push(round_units[round % n_members.0].clone()); + for unit in round_units { units.add_unit(unit); } } assert_eq!(units.highest_round(), max_round); assert_eq!(units.in_round(max_round + 1), None); for head in heads { - let mut batch = units.remove_batch(head); + let mut batch = units.remove_batch(&head.hash()); assert_eq!(batch.pop(), Some(head)); } } @@ -131,24 +137,24 @@ mod test { let mut units_but_backwards = Units::new(); let n_members = NodeCount(4); let max_round = 43; + let session_id = 2137; let mut heads = Vec::new(); - for round in 0..=max_round { - let mut round_units = Vec::new(); - for creator in n_members.into_iterator() { - let unit = construct_unit_all_parents(creator, round, n_members); - if round as usize % n_members.0 == creator.0 { - heads.push(unit.hash) - } - round_units.push(unit.clone()); - units.add_unit(unit); + for (round, round_units) in + random_full_parent_reconstrusted_units_up_to(max_round, n_members, session_id) + .into_iter() + .enumerate() + { + heads.push(round_units[round % n_members.0].clone()); + for unit in &round_units { + units.add_unit(unit.clone()); } for unit in round_units.into_iter().rev() { units_but_backwards.add_unit(unit); } } for head in heads { - let batch1 = units.remove_batch(head); - let batch2 = units_but_backwards.remove_batch(head); + let batch1 = units.remove_batch(&head.hash()); + let batch2 = units_but_backwards.remove_batch(&head.hash()); assert_eq!(batch1, batch2); } } diff --git a/consensus/src/runway/mod.rs b/consensus/src/runway/mod.rs index b21d3e2c..49c04660 100644 --- a/consensus/src/runway/mod.rs +++ b/consensus/src/runway/mod.rs @@ -2,12 +2,12 @@ use crate::{ alerts::{Alert, ForkingNotification, NetworkMessage}, creation, dag::{Dag, DagResult, DagStatus, DagUnit, Request as ReconstructionRequest}, - extension::{ExtenderUnit, Service as Extender}, + extension::Ordering, handle_task_termination, member::UnitMessage, units::{ - SignedUnit, UncheckedSignedUnit, Unit, UnitCoord, UnitStore, UnitStoreStatus, Validator, - WrappedUnit, + SignedUnit, UncheckedSignedUnit, Unit, UnitCoord, UnitStore, UnitStoreStatus, + UnitWithParents, Validator, WrappedUnit, }, Config, Data, DataProvider, FinalizationHandler, Hasher, Index, Keychain, MultiKeychain, NodeIndex, Receiver, Round, Sender, Signature, Signed, SpawnHandle, Terminator, @@ -112,6 +112,7 @@ where store: UnitStore>, keychain: MK, dag: Dag, + ordering: Ordering, alerts_for_alerter: Sender>, notifications_from_alerter: Receiver>, unit_messages_from_network: Receiver>, @@ -119,10 +120,7 @@ where responses_for_collection: Sender>, resolved_requests: Sender>, parents_for_creator: Sender>, - ordered_batch_rx: Receiver>, - finalization_handler: FH, backup_units_for_saver: Sender>, - units_for_extender: Sender>, backup_units_from_saver: Receiver>, new_units_from_creation: Receiver>, exiting: bool, @@ -211,7 +209,6 @@ impl<'a, H: Hasher> Display for RunwayStatus<'a, H> { struct RunwayConfig, MK: MultiKeychain> { finalization_handler: FH, backup_units_for_saver: Sender>, - units_for_extender: Sender>, backup_units_from_saver: Receiver>, alerts_for_alerter: Sender>, notifications_from_alerter: Receiver>, @@ -219,7 +216,6 @@ struct RunwayConfig, MK: MultiKey unit_messages_for_network: Sender>, responses_for_collection: Sender>, parents_for_creator: Sender>, - ordered_batch_rx: Receiver>, resolved_requests: Sender>, new_units_from_creation: Receiver>, } @@ -236,7 +232,6 @@ where let RunwayConfig { finalization_handler, backup_units_for_saver, - units_for_extender, backup_units_from_saver, alerts_for_alerter, notifications_from_alerter, @@ -244,17 +239,18 @@ where unit_messages_for_network, responses_for_collection, parents_for_creator, - ordered_batch_rx, resolved_requests, new_units_from_creation, } = config; let store = UnitStore::new(n_members); let dag = Dag::new(validator); + let ordering = Ordering::new(finalization_handler); Runway { store, keychain, dag, + ordering, missing_coords: HashSet::new(), missing_parents: HashSet::new(), resolved_requests, @@ -263,10 +259,7 @@ where unit_messages_from_network, unit_messages_for_network, parents_for_creator, - ordered_batch_rx, - finalization_handler, backup_units_for_saver, - units_for_extender, backup_units_from_saver, responses_for_collection, new_units_from_creation, @@ -469,14 +462,7 @@ where self.dag.finished_processing(&unit_hash); self.resolve_missing_parents(&unit_hash); self.resolve_missing_coord(&unit.coord()); - if self - .units_for_extender - .unbounded_send(unit.extender_unit()) - .is_err() - { - error!(target: "AlephBFT-runway", "{:?} A unit couldn't be sent to extender: {:?}.", self.index(), unit_hash); - self.exiting = true; - } + self.ordering.add_unit(unit.clone()); if self .parents_for_creator .unbounded_send(unit.clone()) @@ -513,23 +499,6 @@ where } } - fn on_ordered_batch(&mut self, batch: Vec) { - for hash in batch { - let unit = self - .store - .unit(&hash) - .expect("Ordered units must be in store") - .clone() - .unpack(); - - self.finalization_handler.unit_finalized( - unit.creator(), - unit.round(), - unit.as_signable().data().clone(), - ) - } - } - fn send_message_for_network( &mut self, notification: RunwayNotificationOut, @@ -625,14 +594,6 @@ where } }, - batch = self.ordered_batch_rx.next() => match batch { - Some(batch) => self.on_ordered_batch(batch), - None => { - error!(target: "AlephBFT-runway", "{:?} Ordered batch stream closed.", index); - break; - } - }, - _ = &mut status_ticker => { self.status_report(); status_ticker = Delay::new(status_ticker_delay).fuse(); @@ -767,7 +728,6 @@ pub(crate) async fn run( _phantom: _, } = runway_io; - let (ordered_batch_tx, ordered_batch_rx) = mpsc::unbounded(); let (new_units_for_runway, new_units_from_creation) = mpsc::unbounded(); let (parents_for_creator, parents_from_runway) = mpsc::unbounded(); @@ -888,28 +848,17 @@ pub(crate) async fn run( }; pin_mut!(starting_round_handle); - let (units_for_extender, units_from_runway) = mpsc::unbounded(); - let extender = Extender::::new(index, units_from_runway, ordered_batch_tx); - let extender_terminator = terminator.add_offspring_connection("AlephBFT-extender"); - let mut extender_handle = spawn_handle - .spawn_essential("runway/extender", async move { - extender.run(extender_terminator).await - }) - .fuse(); - let runway_handle = spawn_handle .spawn_essential("runway", { let runway_config = RunwayConfig { finalization_handler, backup_units_for_saver, - units_for_extender, backup_units_from_saver, alerts_for_alerter, notifications_from_alerter, unit_messages_from_network: network_io.unit_messages_from_network, unit_messages_for_network: network_io.unit_messages_for_network, parents_for_creator, - ordered_batch_rx, responses_for_collection, resolved_requests: network_io.resolved_requests, new_units_from_creation, @@ -930,9 +879,6 @@ pub(crate) async fn run( debug!(target: "AlephBFT-runway", "{:?} Runway task terminated early.", index); break; }, - _ = extender_handle => { - debug!(target: "AlephBFT-runway", "{:?} Extender task terminated early.", index); - }, _ = alerter_handle => { debug!(target: "AlephBFT-runway", "{:?} Alerter task terminated early.", index); break; @@ -960,7 +906,6 @@ pub(crate) async fn run( debug!(target: "AlephBFT-runway", "{:?} Ending run.", index); terminator.terminate_sync().await; - handle_task_termination(extender_handle, "AlephBFT-runway", "Extender", index).await; handle_task_termination(creation_handle, "AlephBFT-runway", "Creator", index).await; handle_task_termination(alerter_handle, "AlephBFT-runway", "Alerter", index).await; handle_task_termination(runway_handle, "AlephBFT-runway", "Runway", index).await; diff --git a/consensus/src/testing/dag.rs b/consensus/src/testing/dag.rs index ec5f92ce..dc9921bb 100644 --- a/consensus/src/testing/dag.rs +++ b/consensus/src/testing/dag.rs @@ -4,25 +4,20 @@ use crate::{ Dag as GenericDag, DagResult, ReconstructedUnit as GenericReconstructedUnit, Request as GenericRequest, }, - extension::Service as Extender, + extension::Ordering, units::{ - ControlHash, FullUnit, PreUnit, SignedUnit as GenericSignedUnit, Unit, UnitStore, Validator, + ControlHash, FullUnit, PreUnit, SignedUnit as GenericSignedUnit, Unit, UnitStore, + UnitWithParents as _, Validator, }, - NodeCount, NodeIndex, NodeMap, NodeSubset, Round, Signed, SpawnHandle, Terminator, + FinalizationHandler, NodeCount, NodeIndex, NodeMap, NodeSubset, Round, Signed, }; -use aleph_bft_mock::{Data, Hash64, Hasher64, Keychain, Spawner}; -use futures::{ - channel::{mpsc, oneshot}, - stream::StreamExt, - FutureExt, -}; -use futures_timer::Delay; -use log::{debug, trace}; +use aleph_bft_mock::{Data, Hash64, Hasher64, Keychain}; +use log::debug; +use parking_lot::Mutex; use rand::{distributions::Open01, prelude::*}; use std::{ - cmp, collections::{HashMap, HashSet}, - time::Duration, + sync::Arc, }; type Request = GenericRequest; @@ -206,43 +201,41 @@ impl DagFeeder { } } -async fn run_consensus_on_dag( +struct RecordingHandler { + finalized: Arc>>, +} + +impl RecordingHandler { + fn new() -> (Self, Arc>>) { + let finalized = Arc::new(Mutex::new(Vec::new())); + ( + RecordingHandler { + finalized: finalized.clone(), + }, + finalized, + ) + } +} + +impl FinalizationHandler for RecordingHandler { + fn data_finalized(&mut self, data: Data) { + self.finalized.lock().push(data); + } +} + +fn run_consensus_on_dag( units: Vec, forker_units: HashMap>, - deadline_ms: u64, -) -> Vec> { +) -> Vec { let node_id = NodeIndex(0); let feeder = DagFeeder::new(node_id, units, forker_units); - let spawner = Spawner::new(); - let (_extender_exit_tx, extender_exit_rx) = oneshot::channel(); - let (batch_tx, mut batch_rx) = mpsc::unbounded(); - let (units_for_extender, units_from_us) = mpsc::unbounded(); - let extender = Extender::::new(node_id, units_from_us, batch_tx); - spawner.spawn( - "extender", - extender.run(Terminator::create_root( - extender_exit_rx, - "AlephBFT-extender", - )), - ); + let (recording_handler, finalized) = RecordingHandler::new(); + let mut ordering = Ordering::new(recording_handler); for unit in feeder.feed() { - units_for_extender - .unbounded_send(unit.extender_unit()) - .expect("channel should be open"); + ordering.add_unit(unit); } - let mut batches = Vec::new(); - let mut delay_fut = Delay::new(Duration::from_millis(deadline_ms)).fuse(); - loop { - futures::select! { - batch = batch_rx.next() => { - batches.push(batch.unwrap()); - }, - _ = &mut delay_fut => { - break; - } - }; - } - batches + let finalized = finalized.lock().clone(); + finalized } fn generate_random_dag( @@ -361,15 +354,6 @@ fn generate_random_dag( (dag_units, forker_units) } -fn batch_lists_consistent(batches1: &[Vec], batches2: &[Vec]) -> bool { - for i in 0..cmp::min(batches1.len(), batches2.len()) { - if batches1[i] != batches2[i] { - return false; - } - } - true -} - #[tokio::test] async fn ordering_random_dag_consistency_under_permutations() { for seed in 0..4u64 { @@ -377,50 +361,28 @@ async fn ordering_random_dag_consistency_under_permutations() { let n_members = NodeCount(rng.gen_range(1..11)); let height = rng.gen_range(3..11); let (mut units, forker_units) = generate_random_dag(n_members, height, seed); - let batch_on_sorted = run_consensus_on_dag( - units.clone(), - forker_units.clone(), - 80 + (n_members.0 as u64) * 5, - ) - .await; + let finalized_data = run_consensus_on_dag(units.clone(), forker_units.clone()); debug!(target: "dag-test", - "seed {:?} n_members {:?} height {:?} batch_len {:?}", + "seed {:?} n_members {:?} height {:?} data_len {:?}", seed, n_members, height, - batch_on_sorted.len() + finalized_data.len() ); for i in 0..8 { units.shuffle(&mut rng); - let mut batch = run_consensus_on_dag( - units.clone(), - forker_units.clone(), - 25 + (n_members.0 as u64) * 5, - ) - .await; - if batch != batch_on_sorted { - if batch_lists_consistent(&batch, &batch_on_sorted) { - // there might be some timing issue here, we run it with more time - batch = run_consensus_on_dag(units.clone(), forker_units.clone(), 200).await; - } - if batch != batch_on_sorted { - debug!(target: "dag-test", - "seed {:?} n_members {:?} height {:?} i {:?}", - seed, n_members, height, i - ); - debug!(target: "dag-test", - "batch lens {:?} \n {:?}", - batch_on_sorted.len(), - batch.len() - ); - trace!(target: "dag-test", "batches {:?} \n {:?}", batch_on_sorted, batch); - assert!(batch == batch_on_sorted); - } else { - debug!( - "False alarm at seed {:?} n_members {:?} height {:?}!", - seed, n_members, height - ); - } + let other_finalized_data = run_consensus_on_dag(units.clone(), forker_units.clone()); + if other_finalized_data != finalized_data { + debug!(target: "dag-test", + "seed {:?} n_members {:?} height {:?} i {:?}", + seed, n_members, height, i + ); + debug!(target: "dag-test", + "batch lens {:?} \n {:?}", + finalized_data.len(), + other_finalized_data.len() + ); + assert_eq!(other_finalized_data, finalized_data); } } } diff --git a/consensus/src/units/mod.rs b/consensus/src/units/mod.rs index 218d2248..bfc0fbb3 100644 --- a/consensus/src/units/mod.rs +++ b/consensus/src/units/mod.rs @@ -16,9 +16,10 @@ pub(crate) use store::*; #[cfg(test)] pub use testing::{ create_preunits, creator_set, full_unit_to_unchecked_signed_unit, preunit_to_full_unit, - preunit_to_signed_unit, preunit_to_unchecked_signed_unit, random_full_parent_units_up_to, - random_unit_with_parents, FullUnit as TestingFullUnit, SignedUnit as TestingSignedUnit, - WrappedSignedUnit, + preunit_to_signed_unit, preunit_to_unchecked_signed_unit, + random_full_parent_reconstrusted_units_up_to, random_full_parent_units_up_to, + random_reconstructed_unit_with_parents, random_unit_with_parents, DagUnit as TestingDagUnit, + FullUnit as TestingFullUnit, SignedUnit as TestingSignedUnit, WrappedSignedUnit, }; pub use validator::{ValidationError, Validator}; @@ -160,9 +161,6 @@ impl FullUnit { pub(crate) fn included_data(&self) -> Vec { self.data.iter().cloned().collect() } - pub(crate) fn session_id(&self) -> SessionId { - self.session_id - } } impl Signable for FullUnit { @@ -192,6 +190,8 @@ pub trait Unit: 'static + Send + Clone { fn control_hash(&self) -> &ControlHash; + fn session_id(&self) -> SessionId; + fn creator(&self) -> NodeIndex { self.coord().creator() } @@ -207,6 +207,10 @@ pub trait WrappedUnit: Unit { fn unpack(self) -> Self::Wrapped; } +pub trait UnitWithParents: Unit { + fn parents(&self) -> &NodeMap>; +} + impl Unit for FullUnit { type Hasher = H; @@ -229,6 +233,10 @@ impl Unit for FullUnit { fn control_hash(&self) -> &ControlHash { self.pre_unit.control_hash() } + + fn session_id(&self) -> SessionId { + self.session_id + } } impl Unit for SignedUnit { @@ -245,6 +253,10 @@ impl Unit for SignedUnit { fn control_hash(&self) -> &ControlHash { self.as_signable().control_hash() } + + fn session_id(&self) -> SessionId { + self.as_signable().session_id() + } } pub type HashFor = <::Hasher as Hasher>::Hash; diff --git a/consensus/src/units/testing.rs b/consensus/src/units/testing.rs index 3aafe119..19bfa8cb 100644 --- a/consensus/src/units/testing.rs +++ b/consensus/src/units/testing.rs @@ -1,5 +1,6 @@ use crate::{ creation::Creator as GenericCreator, + dag::ReconstructedUnit, units::{ ControlHash as GenericControlHash, FullUnit as GenericFullUnit, PreUnit as GenericPreUnit, SignedUnit as GenericSignedUnit, UncheckedSignedUnit as GenericUncheckedSignedUnit, Unit, @@ -15,6 +16,7 @@ type PreUnit = GenericPreUnit; pub type FullUnit = GenericFullUnit; type UncheckedSignedUnit = GenericUncheckedSignedUnit; pub type SignedUnit = GenericSignedUnit; +pub type DagUnit = ReconstructedUnit; #[derive(Clone)] pub struct WrappedSignedUnit(pub SignedUnit); @@ -33,6 +35,10 @@ impl Unit for WrappedSignedUnit { fn control_hash(&self) -> &ControlHash { self.0.control_hash() } + + fn session_id(&self) -> SessionId { + self.0.session_id() + } } impl WrappedUnit for WrappedSignedUnit { @@ -105,17 +111,27 @@ fn initial_preunit(n_members: NodeCount, node_id: NodeIndex) -> PreUnit { ) } -fn random_initial_units(n_members: NodeCount, session_id: SessionId) -> Vec { +fn random_initial_reconstructed_units(n_members: NodeCount, session_id: SessionId) -> Vec { n_members .into_iterator() .map(|node_id| initial_preunit(n_members, node_id)) - .map(|preunit| preunit_to_full_unit(preunit, session_id)) + .map(|preunit| ReconstructedUnit::initial(preunit_to_full_unit(preunit, session_id))) + .collect() +} + +fn random_initial_units(n_members: NodeCount, session_id: SessionId) -> Vec { + random_initial_reconstructed_units(n_members, session_id) + .into_iter() + .map(|unit| unit.unpack()) .collect() } -pub fn random_unit_with_parents(creator: NodeIndex, parents: &Vec) -> FullUnit { +pub fn random_reconstructed_unit_with_parents>( + creator: NodeIndex, + parents: &Vec, +) -> DagUnit { let representative_parent = parents.last().expect("there are parents"); - let n_members = representative_parent.as_pre_unit().n_members(); + let n_members = representative_parent.control_hash().n_members(); let session_id = representative_parent.session_id(); let round = representative_parent.round() + 1; let mut parent_map = NodeMap::with_size(n_members); @@ -123,7 +139,18 @@ pub fn random_unit_with_parents(creator: NodeIndex, parents: &Vec) -> parent_map.insert(parent.creator(), parent.hash()); } let control_hash = ControlHash::new(&parent_map); - preunit_to_full_unit(PreUnit::new(creator, round, control_hash), session_id) + ReconstructedUnit::with_parents( + preunit_to_full_unit(PreUnit::new(creator, round, control_hash), session_id), + parent_map, + ) + .expect("correct parents") +} + +pub fn random_unit_with_parents>( + creator: NodeIndex, + parents: &Vec, +) -> FullUnit { + random_reconstructed_unit_with_parents(creator, parents).unpack() } pub fn random_full_parent_units_up_to( @@ -143,3 +170,24 @@ pub fn random_full_parent_units_up_to( } result } + +pub fn random_full_parent_reconstrusted_units_up_to( + round: Round, + n_members: NodeCount, + session_id: SessionId, +) -> Vec> { + let mut result = vec![random_initial_reconstructed_units(n_members, session_id)]; + for _ in 0..round { + let units = n_members + .into_iterator() + .map(|node_id| { + random_reconstructed_unit_with_parents( + node_id, + result.last().expect("previous round present"), + ) + }) + .collect(); + result.push(units); + } + result +} From 5b1906e1d3c7afabd6cc690b0e687767024c25f3 Mon Sep 17 00:00:00 2001 From: timorleph Date: Mon, 15 Apr 2024 16:39:05 +0200 Subject: [PATCH 2/2] Compute order extension last Probably doesn't actually impact sending delays, but it also doesn't hurt. --- consensus/src/extension/mod.rs | 2 +- consensus/src/runway/mod.rs | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/consensus/src/extension/mod.rs b/consensus/src/extension/mod.rs index f205ec96..708a0df5 100644 --- a/consensus/src/extension/mod.rs +++ b/consensus/src/extension/mod.rs @@ -11,7 +11,7 @@ mod units; use extender::Extender; /// A struct responsible for executing the Consensus protocol on a local copy of the Dag. -/// It receives units which are guaranteed to be eventually in the Dags +/// It receives units which are guaranteed to eventually appear in the Dags /// of all honest nodes. The static Aleph Consensus algorithm is then run on this Dag in order /// to finalize subsequent rounds of the Dag. More specifically whenever a new unit is received /// this process checks whether a new round can be finalized and if so, it computes the batch of diff --git a/consensus/src/runway/mod.rs b/consensus/src/runway/mod.rs index 49c04660..7dcf0243 100644 --- a/consensus/src/runway/mod.rs +++ b/consensus/src/runway/mod.rs @@ -462,7 +462,6 @@ where self.dag.finished_processing(&unit_hash); self.resolve_missing_parents(&unit_hash); self.resolve_missing_coord(&unit.coord()); - self.ordering.add_unit(unit.clone()); if self .parents_for_creator .unbounded_send(unit.clone()) @@ -471,13 +470,16 @@ where warn!(target: "AlephBFT-runway", "Creator channel should be open."); self.exiting = true; } - let unit = unit.unpack(); - self.send_message_for_network(RunwayNotificationOut::NewAnyUnit(unit.clone().into())); - - if unit.as_signable().creator() == self.index() { - trace!(target: "AlephBFT-runway", "{:?} Sending a unit {:?}.", self.index(), unit.as_signable().hash()); - self.send_message_for_network(RunwayNotificationOut::NewSelfUnit(unit.into())); + let unpacked_unit = unit.clone().unpack(); + self.send_message_for_network(RunwayNotificationOut::NewAnyUnit( + unpacked_unit.clone().into(), + )); + + if unit.creator() == self.index() { + trace!(target: "AlephBFT-runway", "{:?} Sending a unit {:?}.", self.index(), unit.hash()); + self.send_message_for_network(RunwayNotificationOut::NewSelfUnit(unpacked_unit.into())); } + self.ordering.add_unit(unit.clone()); } fn on_missing_coord(&mut self, coord: UnitCoord) {