diff --git a/Cargo.toml b/Cargo.toml index 0b691eff..cb74a0a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,2 +1,2 @@ [workspace] -members = ["primary", "node", "store", "crypto", "worker", "consensus", "network", "config", "hotstuff"] +members = ["primary", "node", "store", "crypto", "worker", "consensus", "network", "config"] diff --git a/README.md b/README.md index f4f83b29..b26cd810 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,60 @@ -# Narwhal HotStuff +# Narwhal and Tusk +[![build status](https://img.shields.io/github/workflow/status/facebookresearch/narwhal/Rust/master?style=flat-square&logo=github)](https://github.com/facebookresearch/narwhal/actions) [![rustc](https://img.shields.io/badge/rustc-1.51+-blue?style=flat-square&logo=rust)](https://www.rust-lang.org) [![license](https://img.shields.io/badge/license-Apache-blue.svg?style=flat-square)](LICENSE) -The code in this branch is a prototype of Narwhal HotStuff (Hotstuff-over-Narwhal). It supplements the paper [Narwhal and Tusk: A DAG-based Mempool and Efficient BFT Consensus](https://arxiv.org/pdf/2105.11827.pdf) enabling reproducible results. There are no plans to maintain this branch. The [master branch](https://github.com/asonnino/narwhal) contains the most recent and polished version of this codebase. +This repo provides an implementation of [Narwhal and Tusk](https://arxiv.org/pdf/2105.11827.pdf). The codebase has been designed to be small, efficient, and easy to benchmark and modify. It has not been designed to run in production but uses real cryptography ([dalek](https://doc.dalek.rs/ed25519_dalek)), networking ([tokio](https://docs.rs/tokio)), and storage ([rocksdb](https://docs.rs/rocksdb)). + +## Quick Start +The core protocols are written in Rust, but all benchmarking scripts are written in Python and run with [Fabric](http://www.fabfile.org/). +To deploy and benchmark a testbed of 4 nodes on your local machine, clone the repo and install the python dependencies: +``` +$ git clone https://github.com/facebookresearch/narwhal.git +$ cd narwhal/benchmark +$ pip install -r requirements.txt +``` +You also need to install Clang (required by rocksdb) and [tmux](https://linuxize.com/post/getting-started-with-tmux/#installing-tmux) (which runs all nodes and clients in the background). Finally, run a local benchmark using fabric: +``` +$ fab local +``` +This command may take a long time the first time you run it (compiling rust code in `release` mode may be slow) and you can customize a number of benchmark parameters in `fabfile.py`. When the benchmark terminates, it displays a summary of the execution similarly to the one below. +``` +----------------------------------------- + SUMMARY: +----------------------------------------- + + CONFIG: + Faults: 0 node(s) + Committee size: 4 node(s) + Worker(s) per node: 1 worker(s) + Collocate primary and workers: True + Input rate: 50,000 tx/s + Transaction size: 512 B + Execution time: 19 s + + Header size: 1,000 B + Max header delay: 100 ms + GC depth: 50 round(s) + Sync retry delay: 10,000 ms + Sync retry nodes: 3 node(s) + batch size: 500,000 B + Max batch delay: 100 ms + + + RESULTS: + Consensus TPS: 46,478 tx/s + Consensus BPS: 23,796,531 B/s + Consensus latency: 464 ms + + End-to-end TPS: 46,149 tx/s + End-to-end BPS: 23,628,541 B/s + End-to-end latency: 557 ms +----------------------------------------- +``` + +## Next Steps +The next step is to read the paper [Narwhal and Tusk: A DAG-based Mempool and Efficient BFT Consensus](https://arxiv.org/pdf/2105.11827.pdf). It is then recommended to have a look at the README files of the [worker](https://github.com/facebookresearch/narwhal/tree/master/worker) and [primary](https://github.com/facebookresearch/narwhal/tree/master/primary) crates. An additional resource to better understand the Tusk consensus protocol is the paper [All You Need is DAG](https://arxiv.org/abs/2102.08325) as it describes a similar protocol. + +The README file of the [benchmark folder](https://github.com/facebookresearch/narwhal/tree/master/benchmark) explains how to benchmark the codebase and read benchmarks' results. It also provides a step-by-step tutorial to run benchmarks on [Amazon Web Services (AWS)](https://aws.amazon.com) accross multiple data centers (WAN). ## License This software is licensed as [Apache 2.0](LICENSE). diff --git a/benchmark/benchmark/config.py b/benchmark/benchmark/config.py index bdd03d33..eb39bb88 100644 --- a/benchmark/benchmark/config.py +++ b/benchmark/benchmark/config.py @@ -25,9 +25,6 @@ class Committee: "authorities: { "name": { "stake": 1, - "consensus: { - "consensus_to_consensus": x.x.x.x:x, - }, "primary: { "primary_to_primary": x.x.x.x:x, "worker_to_primary": x.x.x.x:x, @@ -67,11 +64,6 @@ def __init__(self, addresses, base_port): self.json = {'authorities': OrderedDict()} for name, hosts in addresses.items(): host = hosts.pop(0) - consensus_addr = { - 'consensus_to_consensus': f'{host}:{port}', - } - port += 1 - primary_addr = { 'primary_to_primary': f'{host}:{port}', 'worker_to_primary': f'{host}:{port + 1}' @@ -89,7 +81,6 @@ def __init__(self, addresses, base_port): self.json['authorities'][name] = { 'stake': 1, - 'consensus': consensus_addr, 'primary': primary_addr, 'workers': workers_addr } @@ -124,9 +115,6 @@ def ips(self, name=None): ips = set() for name in names: - addresses = self.json['authorities'][name]['consensus'] - ips.add(self.ip(addresses['consensus_to_consensus'])) - addresses = self.json['authorities'][name]['primary'] ips.add(self.ip(addresses['primary_to_primary'])) ips.add(self.ip(addresses['worker_to_primary'])) @@ -169,7 +157,7 @@ def __init__(self, names, port, workers): assert all(isinstance(x, str) for x in names) assert isinstance(port, int) assert isinstance(workers, int) and workers > 0 - addresses = OrderedDict((x, ['127.0.0.1']*(2+workers)) for x in names) + addresses = OrderedDict((x, ['127.0.0.1']*(1+workers)) for x in names) super().__init__(addresses, port) @@ -177,7 +165,6 @@ class NodeParameters: def __init__(self, json): inputs = [] try: - inputs += [json['timeout_delay']] inputs += [json['header_size']] inputs += [json['max_header_delay']] inputs += [json['gc_depth']] @@ -216,6 +203,7 @@ def __init__(self, json): raise ConfigError('Missing input rate') self.rate = [int(x) for x in rate] + self.workers = int(json['workers']) if 'collocate' in json: @@ -224,7 +212,7 @@ def __init__(self, json): self.collocate = True self.tx_size = int(json['tx_size']) - + self.duration = int(json['duration']) self.runs = int(json['runs']) if 'runs' in json else 1 diff --git a/benchmark/benchmark/logs.py b/benchmark/benchmark/logs.py index c3b0671c..47867ad3 100644 --- a/benchmark/benchmark/logs.py +++ b/benchmark/benchmark/logs.py @@ -23,7 +23,7 @@ def __init__(self, clients, primaries, workers, faults=0): self.faults = faults if isinstance(faults, int): self.committee_size = len(primaries) + int(faults) - self.workers = len(workers) // len(primaries) + self.workers = len(workers) // len(primaries) else: self.committee_size = '?' self.workers = '?' @@ -107,9 +107,6 @@ def _parse_primaries(self, log): commits = self._merge_results([tmp]) configs = { - 'timeout_delay': int( - search(r'Timeout delay .* (\d+)', log).group(1) - ), 'header_size': int( search(r'Header size .* (\d+)', log).group(1) ), @@ -134,7 +131,7 @@ def _parse_primaries(self, log): } ip = search(r'booted on (\d+.\d+.\d+.\d+)', log).group(1) - + return proposals, commits, configs, ip def _parse_workers(self, log): @@ -191,7 +188,6 @@ def _end_to_end_latency(self): return mean(latency) if latency else 0 def result(self): - timeout_delay = self.configs[0]['timeout_delay'] header_size = self.configs[0]['header_size'] max_header_delay = self.configs[0]['max_header_delay'] gc_depth = self.configs[0]['gc_depth'] @@ -219,7 +215,6 @@ def result(self): f' Transaction size: {self.size[0]:,} B\n' f' Execution time: {round(duration):,} s\n' '\n' - f' Timeout delay: {timeout_delay:,} ms\n' f' Header size: {header_size:,} B\n' f' Max header delay: {max_header_delay:,} ms\n' f' GC depth: {gc_depth:,} round(s)\n' diff --git a/benchmark/benchmark/plot.py b/benchmark/benchmark/plot.py index 6f219ff9..3d205111 100644 --- a/benchmark/benchmark/plot.py +++ b/benchmark/benchmark/plot.py @@ -7,7 +7,7 @@ from itertools import cycle from benchmark.utils import PathMaker -from benchmark.config import PlotParameters, ConfigError +from benchmark.config import PlotParameters from benchmark.aggregate import LogAggregator @@ -162,8 +162,8 @@ def plot_tps(cls, files, scalability): def plot(cls, params_dict): try: params = PlotParameters(params_dict) - except ConfigError as e: - raise PlotError(e) + except PlotError as e: + raise PlotError('Invalid nodes or bench parameters', e) # Aggregate the logs. LogAggregator(params.max_latency).print() diff --git a/benchmark/fabfile.py b/benchmark/fabfile.py index 8758eea0..b6425f26 100644 --- a/benchmark/fabfile.py +++ b/benchmark/fabfile.py @@ -16,16 +16,15 @@ def local(ctx, debug=True): 'faults': 0, 'nodes': 4, 'workers': 1, - 'rate': 10_000, + 'rate': 50_000, 'tx_size': 512, 'duration': 20, } node_params = { - 'timeout_delay': 500, # ms 'header_size': 1_000, # bytes 'max_header_delay': 200, # ms 'gc_depth': 50, # rounds - 'sync_retry_delay': 5_000, # ms + 'sync_retry_delay': 10_000, # ms 'sync_retry_nodes': 3, # number of nodes 'batch_size': 500_000, # bytes 'max_batch_delay': 200 # ms @@ -38,7 +37,7 @@ def local(ctx, debug=True): @task -def create(ctx, nodes=6): +def create(ctx, nodes=2): ''' Create a testbed''' try: InstanceManager.make().create_instances(nodes) @@ -92,24 +91,23 @@ def install(ctx): @task -def remote(ctx, debug=True): +def remote(ctx, debug=False): ''' Run benchmarks on AWS ''' bench_params = { - 'faults': 0, - 'nodes': [4], - 'workers': 4, - 'collocate': False, - 'rate': [50_000], + 'faults': 3, + 'nodes': [10], + 'workers': 1, + 'collocate': True, + 'rate': [10_000, 110_000], 'tx_size': 512, 'duration': 300, - 'runs': 1, + 'runs': 2, } node_params = { - 'timeout_delay': 5_000, # ms 'header_size': 1_000, # bytes 'max_header_delay': 200, # ms 'gc_depth': 50, # rounds - 'sync_retry_delay': 5_000, # ms + 'sync_retry_delay': 10_000, # ms 'sync_retry_nodes': 3, # number of nodes 'batch_size': 500_000, # bytes 'max_batch_delay': 200 # ms @@ -125,11 +123,11 @@ def plot(ctx): ''' Plot performance using the logs generated by "fab remote" ''' plot_params = { 'faults': [0], - 'nodes': [4], - 'workers': [1, 4, 7, 10], + 'nodes': [10, 20, 50], + 'workers': [1], 'collocate': True, 'tx_size': 512, - 'max_latency': [2_000, 2_500] + 'max_latency': [3_500, 4_500] } try: Ploter.plot(plot_params) diff --git a/benchmark/settings.json b/benchmark/settings.json index a8c248f8..d13629f2 100644 --- a/benchmark/settings.json +++ b/benchmark/settings.json @@ -1,13 +1,13 @@ { "key": { - "name": "aws", - "path": "/Users/asonnino/.ssh/aws" + "name": "aws-fb", + "path": "/Users/asonnino/.ssh/aws-fb.pem" }, "port": 5000, "repo": { "name": "narwhal", - "url": "https://github.com/asonnino/narwhal", - "branch": "narwhal-hs" + "url": "https://github.com/facebookresearch/narwhal", + "branch": "master" }, "instances": { "type": "m5d.8xlarge", diff --git a/config/src/lib.rs b/config/src/lib.rs index a04fc753..b7fed7bb 100644 --- a/config/src/lib.rs +++ b/config/src/lib.rs @@ -60,8 +60,6 @@ pub type WorkerId = u32; #[derive(Deserialize, Clone)] pub struct Parameters { - /// The timeout delay of the consensus protocol. - pub timeout_delay: u64, /// The preferred header size. The primary creates a new header when it has enough parents and /// enough batches' digests to reach `header_size`. Denominated in bytes. pub header_size: usize, @@ -86,7 +84,6 @@ pub struct Parameters { impl Default for Parameters { fn default() -> Self { Self { - timeout_delay: 5_000, header_size: 1_000, max_header_delay: 100, gc_depth: 50, @@ -102,8 +99,6 @@ impl Import for Parameters {} impl Parameters { pub fn log(&self) { - // NOTE: These log entries are needed to compute performance. - info!("Timeout delay set to {} ms", self.timeout_delay); info!("Header size set to {} B", self.header_size); info!("Max header delay set to {} ms", self.max_header_delay); info!("Garbage collection depth set to {} rounds", self.gc_depth); @@ -114,12 +109,6 @@ impl Parameters { } } -#[derive(Clone, Deserialize)] -pub struct ConsensusAddresses { - /// Address to receive messages from other consensus nodes (WAN). - pub consensus_to_consensus: SocketAddr, -} - #[derive(Clone, Deserialize)] pub struct PrimaryAddresses { /// Address to receive messages from other primaries (WAN). @@ -142,8 +131,6 @@ pub struct WorkerAddresses { pub struct Authority { /// The voting power of this authority. pub stake: Stake, - /// The network addresses of the consensus protocol. - pub consensus: ConsensusAddresses, /// The network addresses of the primary. pub primary: PrimaryAddresses, /// Map of workers' id and their network addresses. @@ -193,23 +180,6 @@ impl Committee { (total_votes + 2) / 3 } - /// Returns the consensus addresses of the target consensus node. - pub fn consensus(&self, to: &PublicKey) -> Result { - self.authorities - .get(to) - .map(|x| x.consensus.clone()) - .ok_or_else(|| ConfigError::NotInCommittee(*to)) - } - - /// Returns the addresses of all consensus nodes except `myself`. - pub fn others_consensus(&self, myself: &PublicKey) -> Vec<(PublicKey, ConsensusAddresses)> { - self.authorities - .iter() - .filter(|(name, _)| name != &myself) - .map(|(name, authority)| (*name, authority.consensus.clone())) - .collect() - } - /// Returns the primary addresses of the target primary. pub fn primary(&self, to: &PublicKey) -> Result { self.authorities diff --git a/hotstuff/Cargo.toml b/hotstuff/Cargo.toml deleted file mode 100644 index f7b81a80..00000000 --- a/hotstuff/Cargo.toml +++ /dev/null @@ -1,32 +0,0 @@ -[package] -name = "hotstuff" -version = "0.1.0" -authors = ["Alberto Sonnino "] -edition = "2018" -publish = false - -[dependencies] -thiserror = "1.0.21" -tokio = { version = "1.3.0", features = ["rt", "macros", "sync"] } -ed25519-dalek = "1.0.1" -log = "0.4.0" -serde = { version = "1.0", features = ["derive"] } -bytes = "1.0.1" -bincode = "1.3.1" -futures = "0.3.8" -async-recursion = "0.3.1" -base64 = "0.13.0" -async-trait = "0.1.50" - -store = { path = "../store" } -crypto = { path = "../crypto" } -network = { path = "../network" } -config = { path = "../config" } -primary = { path = "../primary" } - -[dev-dependencies] -tokio-util = { version = "0.6.2", features= ["codec"] } -rand = "0.7.3" - -[features] -benchmark = [] \ No newline at end of file diff --git a/hotstuff/src/aggregator.rs b/hotstuff/src/aggregator.rs deleted file mode 100644 index d4f71356..00000000 --- a/hotstuff/src/aggregator.rs +++ /dev/null @@ -1,139 +0,0 @@ -use crate::consensus::Round; -use crate::error::{ConsensusError, ConsensusResult}; -use crate::messages::{Timeout, Vote, QC, TC}; -use config::{Committee, Stake}; -use crypto::Hash as _; -use crypto::{Digest, PublicKey, Signature}; -use std::collections::{HashMap, HashSet}; - -#[cfg(test)] -#[path = "tests/aggregator_tests.rs"] -pub mod aggregator_tests; - -pub struct Aggregator { - committee: Committee, - votes_aggregators: HashMap>>, - timeouts_aggregators: HashMap>, -} - -impl Aggregator { - pub fn new(committee: Committee) -> Self { - Self { - committee, - votes_aggregators: HashMap::new(), - timeouts_aggregators: HashMap::new(), - } - } - - pub fn add_vote(&mut self, vote: Vote) -> ConsensusResult> { - // TODO [issue #7]: A bad node may make us run out of memory by sending many votes - // with different round numbers or different digests. - - // Add the new vote to our aggregator and see if we have a QC. - self.votes_aggregators - .entry(vote.round) - .or_insert_with(HashMap::new) - .entry(vote.digest()) - .or_insert_with(|| Box::new(QCMaker::new())) - .append(vote, &self.committee) - } - - pub fn add_timeout(&mut self, timeout: Timeout) -> ConsensusResult> { - // TODO: A bad node may make us run out of memory by sending many timeouts - // with different round numbers. - - // Add the new timeout to our aggregator and see if we have a TC. - self.timeouts_aggregators - .entry(timeout.round) - .or_insert_with(|| Box::new(TCMaker::new())) - .append(timeout, &self.committee) - } - - pub fn cleanup(&mut self, round: &Round) { - self.votes_aggregators.retain(|k, _| k >= round); - self.timeouts_aggregators.retain(|k, _| k >= round); - } -} - -struct QCMaker { - weight: Stake, - votes: Vec<(PublicKey, Signature)>, - used: HashSet, -} - -impl QCMaker { - pub fn new() -> Self { - Self { - weight: 0, - votes: Vec::new(), - used: HashSet::new(), - } - } - - /// Try to append a signature to a (partial) quorum. - pub fn append(&mut self, vote: Vote, committee: &Committee) -> ConsensusResult> { - let author = vote.author; - - // Ensure it is the first time this authority votes. - ensure!( - self.used.insert(author), - ConsensusError::AuthorityReuse(author) - ); - - self.votes.push((author, vote.signature)); - self.weight += committee.stake(&author); - if self.weight >= committee.quorum_threshold() { - self.weight = 0; // Ensures QC is only made once. - return Ok(Some(QC { - hash: vote.hash.clone(), - round: vote.round, - votes: self.votes.clone(), - })); - } - Ok(None) - } -} - -struct TCMaker { - weight: Stake, - votes: Vec<(PublicKey, Signature, Round)>, - used: HashSet, -} - -impl TCMaker { - pub fn new() -> Self { - Self { - weight: 0, - votes: Vec::new(), - used: HashSet::new(), - } - } - - /// Try to append a signature to a (partial) quorum. - pub fn append( - &mut self, - timeout: Timeout, - committee: &Committee, - ) -> ConsensusResult> { - let author = timeout.author; - - // Ensure it is the first time this authority votes. - ensure!( - self.used.insert(author), - ConsensusError::AuthorityReuse(author) - ); - - // Add the timeout to the accumulator. - self.votes - .push((author, timeout.signature, timeout.high_qc.round)); - self.weight += committee.stake(&author); - if self.weight >= committee.quorum_threshold() { - self.weight = 0; // Ensures TC is only created once. - return Ok(Some(TC { - round: timeout.round, - votes: self.votes.clone(), - })); - } - Ok(None) - } -} diff --git a/hotstuff/src/committer.rs b/hotstuff/src/committer.rs deleted file mode 100644 index 2ec362f0..00000000 --- a/hotstuff/src/committer.rs +++ /dev/null @@ -1,279 +0,0 @@ -use crate::consensus::{Round, CHANNEL_CAPACITY}; -use crate::error::{ConsensusError, ConsensusResult}; -use config::Committee; -use crypto::Hash as _; -use crypto::{Digest, PublicKey}; -use futures::future::try_join_all; -use futures::stream::FuturesOrdered; -use futures::stream::StreamExt as _; -use log::{debug, error, info, log_enabled}; -use primary::Certificate; -use std::cmp::max; -use std::collections::{HashMap, HashSet}; -use store::Store; -use tokio::sync::mpsc::{channel, Receiver, Sender}; - -/// The representation of the DAG in memory. -type Dag = HashMap>; - -/// The state that needs to be persisted for crash-recovery. -struct State { - /// The last committed round. - last_committed_round: Round, - // Keeps the last committed round for each authority. This map is used to clean up the dag and - // ensure we don't commit twice the same certificate. - last_committed: HashMap, - /// Keeps the latest committed certificate (and its parents) for every authority. Anything older - /// must be regularly cleaned up through the function `update`. - dag: Dag, -} - -impl State { - fn new(genesis: Vec) -> Self { - let genesis = genesis - .into_iter() - .map(|x| (x.origin(), (x.digest(), x))) - .collect::>(); - - Self { - last_committed_round: 0, - last_committed: genesis.iter().map(|(x, (_, y))| (*x, y.round())).collect(), - dag: [(0, genesis)].iter().cloned().collect(), - } - } - - /// Update and clean up internal state base on committed certificates. - fn update(&mut self, certificate: &Certificate, gc_depth: Round) { - self.last_committed - .entry(certificate.origin()) - .and_modify(|r| *r = max(*r, certificate.round())) - .or_insert_with(|| certificate.round()); - - let last_committed_round = *self.last_committed.values().max().unwrap(); - self.last_committed_round = last_committed_round; - - for (name, round) in &self.last_committed { - self.dag.retain(|r, authorities| { - authorities.retain(|n, _| n != name || r >= round); - !authorities.is_empty() && r + gc_depth >= last_committed_round - }); - } - } -} - -pub struct Committer { - gc_depth: Round, - rx_mempool: Receiver, - rx_deliver: Receiver, - genesis: Vec, -} - -impl Committer { - pub fn spawn( - committee: Committee, - store: Store, - gc_depth: Round, - rx_mempool: Receiver, - rx_commit: Receiver, - ) { - let (tx_deliver, rx_deliver) = channel(CHANNEL_CAPACITY); - - tokio::spawn(async move { - CertificateWaiter::spawn(store, rx_commit, tx_deliver); - }); - - tokio::spawn(async move { - Self { - gc_depth, - rx_mempool, - rx_deliver, - genesis: Certificate::genesis(&committee), - } - .run() - .await; - }); - } - - async fn run(&mut self) { - // The consensus state (everything else is immutable). - let mut state = State::new(self.genesis.clone()); - - loop { - tokio::select! { - Some(certificate) = self.rx_mempool.recv() => { - // Add the new certificate to the local storage. - state.dag.entry(certificate.round()).or_insert_with(HashMap::new).insert( - certificate.origin(), - (certificate.digest(), certificate.clone()), - ); - }, - Some(certificate) = self.rx_deliver.recv() => { - debug!("Processing {:?}", certificate); - - // Ensure we didn't already order this certificate. - if let Some(r) = state.last_committed.get(&certificate.origin()) { - if r >= &certificate.round() { - continue; - } - } - - // Flatten the sub-dag referenced by the certificate. - let mut sequence = Vec::new(); - for x in self.order_dag(&certificate, &state) { - // Update and clean up internal state. - state.update(&x, self.gc_depth); - - // Add the certificate to the sequence. - sequence.push(x); - } - - // Log the latest committed round of every authority (for debug). - if log_enabled!(log::Level::Debug) { - for (name, round) in &state.last_committed { - debug!("Latest commit of {}: Round {}", name, round); - } - } - - // Print the committed sequence in the right order. - for certificate in sequence { - info!("Committed {}", certificate.header); - - #[cfg(feature = "benchmark")] - for digest in certificate.header.payload.keys() { - // NOTE: This log entry is used to compute performance. - info!("Committed {} -> {:?}", certificate.header, digest); - } - } - } - } - } - } - - /// Flatten the dag referenced by the input certificate. This is a classic depth-first search (pre-order): - /// https://en.wikipedia.org/wiki/Tree_traversal#Pre-order - fn order_dag(&self, tip: &Certificate, state: &State) -> Vec { - debug!("Processing sub-dag of {:?}", tip); - let mut ordered = Vec::new(); - let mut already_ordered = HashSet::new(); - - let mut buffer = vec![tip]; - while let Some(x) = buffer.pop() { - debug!("Sequencing {:?}", x); - ordered.push(x.clone()); - for parent in &x.header.parents { - let (digest, certificate) = match state - .dag - .get(&(x.round() - 1)) - .map(|x| x.values().find(|(x, _)| x == parent)) - .flatten() - { - Some(x) => x, - None => { - debug!("We already processed and cleaned up {}", parent); - continue; // We already ordered or GC up to here. - } - }; - - // We skip the certificate if we (1) already processed it or (2) we reached a round that we already - // committed for this authority. - let mut skip = already_ordered.contains(&digest); - skip |= state - .last_committed - .get(&certificate.origin()) - .map_or_else(|| false, |r| r == &certificate.round()); - if !skip { - buffer.push(certificate); - already_ordered.insert(digest); - } - } - } - - // Ensure we do not commit garbage collected certificates. - ordered.retain(|x| x.round() + self.gc_depth >= state.last_committed_round); - - // Ordering the output by round is not really necessary but it makes the commit sequence prettier. - ordered.sort_by_key(|x| x.round()); - ordered - } -} - -/// Waits to receive all the ancestors of a certificate before sending it through the output -/// channel. The outputs are in the same order as the input (FIFO). -pub struct CertificateWaiter { - /// The persistent storage. - store: Store, - /// Receives input certificates. - rx_input: Receiver, - /// Outputs the certificates once we have all its parents. - tx_output: Sender, -} - -impl CertificateWaiter { - pub fn spawn(store: Store, rx_input: Receiver, tx_output: Sender) { - tokio::spawn(async move { - Self { - store, - rx_input, - tx_output, - } - .run() - .await - }); - } - - /// Helper function. It waits for particular data to become available in the storage - /// and then delivers the specified header. - async fn waiter( - mut missing: Vec<(Vec, Store)>, - deliver: Certificate, - ) -> ConsensusResult { - let waiting: Vec<_> = missing - .iter_mut() - .map(|(x, y)| y.notify_read(x.to_vec())) - .collect(); - - try_join_all(waiting) - .await - .map(|_| deliver) - .map_err(ConsensusError::from) - } - - async fn run(&mut self) { - let mut waiting = FuturesOrdered::new(); - loop { - tokio::select! { - Some(certificate) = self.rx_input.recv() => { - // Skip genesis' children. - if certificate.round() == 1 { - self.tx_output.send(certificate).await.expect("Failed to send certificate"); - continue; - } - - debug!("Waiting for history of {:?}", certificate); - - // Add the certificate to the waiter pool. The waiter will return it to us - // when all its parents are in the store. - let wait_for = certificate - .header - .parents - .iter() - .cloned() - .map(|x| (x.to_vec(), self.store.clone())) - .collect(); - let fut = Self::waiter(wait_for, certificate); - waiting.push(fut); - } - Some(result) = waiting.next() => match result { - Ok(certificate) => { - debug!("Got all the history of {:?}", certificate); - self.tx_output.send(certificate).await.expect("Failed to send certificate"); - }, - Err(e) => { - error!("{}", e); - panic!("Storage failure: killing node."); - } - }, - } - } - } -} diff --git a/hotstuff/src/config.rs b/hotstuff/src/config.rs deleted file mode 100644 index e21048b7..00000000 --- a/hotstuff/src/config.rs +++ /dev/null @@ -1,85 +0,0 @@ -use crypto::PublicKey; -use log::info; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::net::SocketAddr; - -pub type Stake = u32; -pub type EpochNumber = u128; - -#[derive(Serialize, Deserialize)] -pub struct Parameters { - pub timeout_delay: u64, - pub sync_retry_delay: u64, -} - -impl Default for Parameters { - fn default() -> Self { - Self { - timeout_delay: 5_000, - sync_retry_delay: 10_000, - } - } -} - -impl Parameters { - pub fn log(&self) { - // NOTE: These log entries are used to compute performance. - info!("Timeout delay set to {} rounds", self.timeout_delay); - info!("Sync retry delay set to {} ms", self.sync_retry_delay); - } -} - -#[derive(Clone, Serialize, Deserialize)] -pub struct Authority { - pub stake: Stake, - pub address: SocketAddr, -} - -#[derive(Clone, Serialize, Deserialize)] -pub struct Committee { - pub authorities: HashMap, - pub epoch: EpochNumber, -} - -impl Committee { - pub fn new(info: Vec<(PublicKey, Stake, SocketAddr)>, epoch: EpochNumber) -> Self { - Self { - authorities: info - .into_iter() - .map(|(name, stake, address)| { - let authority = Authority { stake, address }; - (name, authority) - }) - .collect(), - epoch, - } - } - - pub fn size(&self) -> usize { - self.authorities.len() - } - - pub fn stake(&self, name: &PublicKey) -> Stake { - self.authorities.get(&name).map_or_else(|| 0, |x| x.stake) - } - - pub fn quorum_threshold(&self) -> Stake { - // If N = 3f + 1 + k (0 <= k < 3) - // then (2 N + 3) / 3 = 2f + 1 + (2k + 2)/3 = 2f + 1 + k = N - f - let total_votes: Stake = self.authorities.values().map(|x| x.stake).sum(); - 2 * total_votes / 3 + 1 - } - - pub fn address(&self, name: &PublicKey) -> Option { - self.authorities.get(name).map(|x| x.address) - } - - pub fn broadcast_addresses(&self, myself: &PublicKey) -> Vec<(PublicKey, SocketAddr)> { - self.authorities - .iter() - .filter(|(name, _)| name != &myself) - .map(|(name, x)| (*name, x.address)) - .collect() - } -} diff --git a/hotstuff/src/consensus.rs b/hotstuff/src/consensus.rs deleted file mode 100644 index e19481ca..00000000 --- a/hotstuff/src/consensus.rs +++ /dev/null @@ -1,177 +0,0 @@ -use crate::committer::Committer; -use crate::core::Core; -use crate::error::ConsensusError; -use crate::helper::Helper; -use crate::leader::LeaderElector; -use crate::mempool::MempoolDriver; -use crate::messages::{Block, Timeout, Vote, TC}; -use crate::proposer::Proposer; -use crate::synchronizer::Synchronizer; -use async_trait::async_trait; -use bytes::Bytes; -use config::{Committee, Parameters}; -use crypto::{Digest, PublicKey, SignatureService}; -use futures::SinkExt as _; -use log::info; -use network::{MessageHandler, Receiver as NetworkReceiver, Writer}; -use primary::Certificate; -use serde::{Deserialize, Serialize}; -use std::error::Error; -use store::Store; -use tokio::sync::mpsc::{channel, Receiver, Sender}; - -#[cfg(test)] -#[path = "tests/consensus_tests.rs"] -pub mod consensus_tests; - -/// The default channel capacity for each channel of the consensus. -pub const CHANNEL_CAPACITY: usize = 1_000; - -/// The consensus round number. -pub type Round = u64; - -#[derive(Serialize, Deserialize, Debug)] -pub enum ConsensusMessage { - Propose(Block), - Vote(Vote), - Timeout(Timeout), - TC(TC), - SyncRequest(Digest, PublicKey), -} - -pub struct Consensus; - -impl Consensus { - #[allow(clippy::too_many_arguments)] - pub fn spawn( - name: PublicKey, - committee: Committee, - parameters: Parameters, - signature_service: SignatureService, - store: Store, - rx_mempool: Receiver, - tx_mempool: Sender, - tx_output: Sender, - ) { - // NOTE: This log entry is used to compute performance. - parameters.log(); - - let (tx_consensus, rx_consensus) = channel(CHANNEL_CAPACITY); - let (tx_loopback, rx_loopback) = channel(CHANNEL_CAPACITY); - let (tx_proposer, rx_proposer) = channel(CHANNEL_CAPACITY); - let (tx_helper, rx_helper) = channel(CHANNEL_CAPACITY); - let (tx_commit, rx_commit) = channel(CHANNEL_CAPACITY); - let (tx_mempool_copy, rx_mempool_copy) = channel(CHANNEL_CAPACITY); - - // Spawn the network receiver. - let mut address = committee - .consensus(&name) - .expect("Our public key is not in the committee") - .consensus_to_consensus; - address.set_ip("0.0.0.0".parse().unwrap()); - NetworkReceiver::spawn( - address, - /* handler */ - ConsensusReceiverHandler { - tx_consensus, - tx_helper, - }, - ); - info!( - "Node {} listening to consensus messages on {}", - name, address - ); - - // Make the leader election module. - let leader_elector = LeaderElector::new(committee.clone()); - - // Make the mempool driver. - let mempool_driver = MempoolDriver::new(committee.clone(), tx_mempool); - - // Make the synchronizer. - let synchronizer = Synchronizer::new( - name, - committee.clone(), - store.clone(), - tx_loopback.clone(), - parameters.sync_retry_delay, - ); - - // Spawn the consensus core. - Core::spawn( - name, - committee.clone(), - signature_service.clone(), - store.clone(), - leader_elector, - mempool_driver, - synchronizer, - parameters.timeout_delay, - /* rx_message */ rx_consensus, - rx_loopback, - tx_proposer, - tx_commit, - tx_output, - ); - - // Commits the mempool certificates and their sub-dag. - Committer::spawn( - committee.clone(), - store.clone(), - parameters.gc_depth, - rx_mempool_copy, - rx_commit, - ); - - // Spawn the block proposer. - Proposer::spawn( - name, - committee.clone(), - signature_service, - rx_mempool, - /* rx_message */ rx_proposer, - tx_loopback, - tx_mempool_copy, - ); - - // Spawn the helper module. - Helper::spawn(committee, store, /* rx_requests */ rx_helper); - } -} - -/// Defines how the network receiver handles incoming primary messages. -#[derive(Clone)] -struct ConsensusReceiverHandler { - tx_consensus: Sender, - tx_helper: Sender<(Digest, PublicKey)>, -} - -#[async_trait] -impl MessageHandler for ConsensusReceiverHandler { - async fn dispatch(&self, writer: &mut Writer, serialized: Bytes) -> Result<(), Box> { - // Deserialize and parse the message. - match bincode::deserialize(&serialized).map_err(ConsensusError::SerializationError)? { - ConsensusMessage::SyncRequest(missing, origin) => self - .tx_helper - .send((missing, origin)) - .await - .expect("Failed to send consensus message"), - message @ ConsensusMessage::Propose(..) => { - // Reply with an ACK. - let _ = writer.send(Bytes::from("Ack")).await; - - // Pass the message to the consensus core. - self.tx_consensus - .send(message) - .await - .expect("Failed to consensus message") - } - message => self - .tx_consensus - .send(message) - .await - .expect("Failed to consensus message"), - } - Ok(()) - } -} diff --git a/hotstuff/src/core.rs b/hotstuff/src/core.rs deleted file mode 100644 index 679d3dbd..00000000 --- a/hotstuff/src/core.rs +++ /dev/null @@ -1,430 +0,0 @@ -use crate::aggregator::Aggregator; -use crate::consensus::{ConsensusMessage, Round}; -use crate::error::{ConsensusError, ConsensusResult}; -use crate::leader::LeaderElector; -use crate::mempool::MempoolDriver; -use crate::messages::{Block, Timeout, Vote, QC, TC}; -use crate::proposer::ProposerMessage; -use crate::synchronizer::Synchronizer; -use crate::timer::Timer; -use async_recursion::async_recursion; -use bytes::Bytes; -use config::Committee; -use crypto::Hash as _; -use crypto::{PublicKey, SignatureService}; -use log::{debug, error, warn}; -use network::SimpleSender; -use primary::Certificate; -use std::cmp::max; -use std::collections::VecDeque; -use store::Store; -use tokio::sync::mpsc::{Receiver, Sender}; - -#[cfg(test)] -#[path = "tests/core_tests.rs"] -pub mod core_tests; - -pub struct Core { - name: PublicKey, - committee: Committee, - store: Store, - signature_service: SignatureService, - leader_elector: LeaderElector, - mempool_driver: MempoolDriver, - synchronizer: Synchronizer, - rx_message: Receiver, - rx_loopback: Receiver, - tx_proposer: Sender, - tx_commit: Sender, - tx_output: Sender, - round: Round, - last_voted_round: Round, - last_committed_round: Round, - high_qc: QC, - timer: Timer, - aggregator: Aggregator, - network: SimpleSender, -} - -impl Core { - #[allow(clippy::too_many_arguments)] - pub fn spawn( - name: PublicKey, - committee: Committee, - signature_service: SignatureService, - store: Store, - leader_elector: LeaderElector, - mempool_driver: MempoolDriver, - synchronizer: Synchronizer, - timeout_delay: u64, - rx_message: Receiver, - rx_loopback: Receiver, - tx_proposer: Sender, - tx_commit: Sender, - tx_output: Sender, - ) { - tokio::spawn(async move { - Self { - name, - committee: committee.clone(), - signature_service, - store, - leader_elector, - mempool_driver, - synchronizer, - rx_message, - rx_loopback, - tx_proposer, - tx_commit, - tx_output, - round: 1, - last_voted_round: 0, - last_committed_round: 0, - high_qc: QC::genesis(), - timer: Timer::new(timeout_delay), - aggregator: Aggregator::new(committee), - network: SimpleSender::new(), - } - .run() - .await - }); - } - - async fn store_block(&mut self, block: &Block) { - let key = block.digest().to_vec(); - let value = bincode::serialize(block).expect("Failed to serialize block"); - self.store.write(key, value).await; - } - - fn increase_last_voted_round(&mut self, target: Round) { - self.last_voted_round = max(self.last_voted_round, target); - } - - async fn make_vote(&mut self, block: &Block) -> Option { - // Check if we can vote for this block. - let safety_rule_1 = block.round > self.last_voted_round; - let mut safety_rule_2 = block.qc.round + 1 == block.round; - if let Some(ref tc) = block.tc { - let mut can_extend = tc.round + 1 == block.round; - can_extend &= block.qc.round >= *tc.high_qc_rounds().iter().max().expect("Empty TC"); - safety_rule_2 |= can_extend; - } - if !(safety_rule_1 && safety_rule_2) { - return None; - } - - // Ensure we won't vote for contradicting blocks. - self.increase_last_voted_round(block.round); - // TODO [issue #15]: Write to storage preferred_round and last_voted_round. - Some(Vote::new(&block, self.name, self.signature_service.clone()).await) - } - - async fn commit(&mut self, block: Block) -> ConsensusResult<()> { - if self.last_committed_round >= block.round { - return Ok(()); - } - - let mut to_commit = VecDeque::new(); - to_commit.push_back(block.clone()); - - // Ensure we commit the entire chain. This is needed after view-change. - let mut parent = block.clone(); - while self.last_committed_round + 1 < parent.round { - let ancestor = self - .synchronizer - .get_parent_block(&parent) - .await? - .expect("We should have all the ancestors by now"); - to_commit.push_back(ancestor.clone()); - parent = ancestor; - } - - // Save the last committed block. - self.last_committed_round = block.round; - - // Send all the newly committed blocks to the node's application layer. - while let Some(block) = to_commit.pop_back() { - debug!("Committed {:?}", block); - - // Output the block to the top-level application. - if let Err(e) = self.tx_output.send(block.clone()).await { - warn!("Failed to send block through the output channel: {}", e); - } - - // Send the payload to the committer. - for certificate in &block.payload { - self.tx_commit - .send(certificate.clone()) - .await - .expect("Failed to send payload"); - } - - // Cleanup the mempool. - self.mempool_driver.cleanup(block.payload).await; - } - Ok(()) - } - - fn update_high_qc(&mut self, qc: &QC) { - if qc.round > self.high_qc.round { - self.high_qc = qc.clone(); - } - } - - async fn local_timeout_round(&mut self) -> ConsensusResult<()> { - warn!("Timeout reached for round {}", self.round); - - // Increase the last voted round. - self.increase_last_voted_round(self.round); - - // Make a timeout message. - let timeout = Timeout::new( - self.high_qc.clone(), - self.round, - self.name, - self.signature_service.clone(), - ) - .await; - debug!("Created {:?}", timeout); - - // Reset the timer. - self.timer.reset(); - - // Broadcast the timeout message. - debug!("Broadcasting {:?}", timeout); - let addresses = self - .committee - .others_consensus(&self.name) - .into_iter() - .map(|(_, x)| x.consensus_to_consensus) - .collect(); - let message = bincode::serialize(&ConsensusMessage::Timeout(timeout.clone())) - .expect("Failed to serialize timeout message"); - self.network - .broadcast(addresses, Bytes::from(message)) - .await; - - // Process our message. - self.handle_timeout(&timeout).await - } - - #[async_recursion] - async fn handle_vote(&mut self, vote: &Vote) -> ConsensusResult<()> { - debug!("Processing {:?}", vote); - if vote.round < self.round { - return Ok(()); - } - - // Ensure the vote is well formed. - vote.verify(&self.committee)?; - - // Add the new vote to our aggregator and see if we have a quorum. - if let Some(qc) = self.aggregator.add_vote(vote.clone())? { - debug!("Assembled {:?}", qc); - - // Process the QC. - self.process_qc(&qc).await; - - // Make a new block if we are the next leader. - if self.name == self.leader_elector.get_leader(self.round) { - self.generate_proposal(None).await; - } - } - Ok(()) - } - - async fn handle_timeout(&mut self, timeout: &Timeout) -> ConsensusResult<()> { - debug!("Processing {:?}", timeout); - if timeout.round < self.round { - return Ok(()); - } - - // Ensure the timeout is well formed. - timeout.verify(&self.committee)?; - - // Process the QC embedded in the timeout. - self.process_qc(&timeout.high_qc).await; - - // Add the new vote to our aggregator and see if we have a quorum. - if let Some(tc) = self.aggregator.add_timeout(timeout.clone())? { - debug!("Assembled {:?}", tc); - - // Try to advance the round. - self.advance_round(tc.round).await; - - // Broadcast the TC. - debug!("Broadcasting {:?}", tc); - let addresses = self - .committee - .others_consensus(&self.name) - .into_iter() - .map(|(_, x)| x.consensus_to_consensus) - .collect(); - let message = bincode::serialize(&ConsensusMessage::TC(tc.clone())) - .expect("Failed to serialize timeout certificate"); - self.network - .broadcast(addresses, Bytes::from(message)) - .await; - - // Make a new block if we are the next leader. - if self.name == self.leader_elector.get_leader(self.round) { - self.generate_proposal(Some(tc)).await; - } - } - Ok(()) - } - - #[async_recursion] - async fn advance_round(&mut self, round: Round) { - if round < self.round { - return; - } - // Reset the timer and advance round. - self.timer.reset(); - self.round = round + 1; - debug!("Moved to round {}", self.round); - - // Cleanup the vote aggregator. - self.aggregator.cleanup(&self.round); - } - - #[async_recursion] - async fn generate_proposal(&mut self, tc: Option) { - self.tx_proposer - .send(ProposerMessage(self.round, self.high_qc.clone(), tc)) - .await - .expect("Failed to send message to proposer"); - } - - async fn process_qc(&mut self, qc: &QC) { - debug!("Processing {:?}", qc); - self.advance_round(qc.round).await; - self.update_high_qc(qc); - } - - #[async_recursion] - async fn process_block(&mut self, block: &Block) -> ConsensusResult<()> { - debug!("Processing {:?}", block); - - // Let's see if we have the last three ancestors of the block, that is: - // b0 <- |qc0; b1| <- |qc1; block| - // If we don't, the synchronizer asks for them to other nodes. It will - // then ensure we process both ancestors in the correct order, and - // finally make us resume processing this block. - let (b0, b1) = match self.synchronizer.get_ancestors(block).await? { - Some(ancestors) => ancestors, - None => { - debug!("Processing of {} suspended: missing parent", block.digest()); - return Ok(()); - } - }; - - // Store the block only if we have already processed all its ancestors. - self.store_block(block).await; - - // Check if we can commit the head of the 2-chain. - // Note that we commit blocks only if we have all its ancestors. - if b0.round + 1 == b1.round { - self.commit(b0).await?; - } - - // Ensure the block's round is as expected. - // This check is important: it prevents bad leaders from producing blocks - // far in the future that may cause overflow on the round number. - if block.round != self.round { - return Ok(()); - } - - // See if we can vote for this block. - if let Some(vote) = self.make_vote(block).await { - debug!("Created {:?}", vote); - let next_leader = self.leader_elector.get_leader(self.round + 1); - if next_leader == self.name { - self.handle_vote(&vote).await?; - } else { - debug!("Sending {:?} to {}", vote, next_leader); - let address = self - .committee - .consensus(&next_leader) - .expect("The next leader is not in the committee") - .consensus_to_consensus; - let message = bincode::serialize(&ConsensusMessage::Vote(vote)) - .expect("Failed to serialize vote"); - self.network.send(address, Bytes::from(message)).await; - } - } - Ok(()) - } - - async fn handle_proposal(&mut self, block: &Block) -> ConsensusResult<()> { - let digest = block.digest(); - - // Ensure the block proposer is the right leader for the round. - ensure!( - block.author == self.leader_elector.get_leader(block.round), - ConsensusError::WrongLeader { - digest, - leader: block.author, - round: block.round - } - ); - - // Check the block is correctly formed. - block.verify(&self.committee)?; - - // Process the QC. This may allow us to advance round. - self.process_qc(&block.qc).await; - - // Process the TC (if any). This may also allow us to advance round. - if let Some(ref tc) = block.tc { - debug!("Processing (embedded) {:?}", tc); - self.advance_round(tc.round).await; - } - - // Check that the payload certificates are valid. - self.mempool_driver.verify(&block).await?; - - // All check pass, we can process this block. - self.process_block(block).await - } - - async fn handle_tc(&mut self, tc: TC) -> ConsensusResult<()> { - debug!("Processing {:?}", tc); - self.advance_round(tc.round).await; - if self.name == self.leader_elector.get_leader(self.round) { - self.generate_proposal(Some(tc)).await; - } - Ok(()) - } - - pub async fn run(&mut self) { - // Upon booting, generate the very first block (if we are the leader). - // Also, schedule a timer in case we don't hear from the leader. - self.timer.reset(); - if self.name == self.leader_elector.get_leader(self.round) { - self.generate_proposal(None).await; - } - - // This is the main loop: it processes incoming blocks and votes, - // and receive timeout notifications from our Timeout Manager. - loop { - let result = tokio::select! { - Some(message) = self.rx_message.recv() => match message { - ConsensusMessage::Propose(block) => self.handle_proposal(&block).await, - ConsensusMessage::Vote(vote) => self.handle_vote(&vote).await, - ConsensusMessage::Timeout(timeout) => self.handle_timeout(&timeout).await, - ConsensusMessage::TC(tc) => self.handle_tc(tc).await, - _ => panic!("Unexpected protocol message") - }, - Some(block) = self.rx_loopback.recv() => self.process_block(&block).await, - () = &mut self.timer => self.local_timeout_round().await, - }; - match result { - Ok(()) => (), - Err(ConsensusError::StoreError(e)) => error!("{}", e), - Err(ConsensusError::SerializationError(e)) => error!("Store corrupted. {}", e), - Err(e) => warn!("{}", e), - } - } - } -} diff --git a/hotstuff/src/error.rs b/hotstuff/src/error.rs deleted file mode 100644 index cf03b95f..00000000 --- a/hotstuff/src/error.rs +++ /dev/null @@ -1,72 +0,0 @@ -use crate::consensus::Round; -use crypto::{CryptoError, Digest, PublicKey}; -use primary::DagError; -use store::StoreError; -use thiserror::Error; - -#[macro_export] -macro_rules! bail { - ($e:expr) => { - return Err($e); - }; -} - -#[macro_export(local_inner_macros)] -macro_rules! ensure { - ($cond:expr, $e:expr) => { - if !($cond) { - bail!($e); - } - }; -} - -pub type ConsensusResult = Result; - -#[derive(Error, Debug)] -pub enum ConsensusError { - #[error("Network error: {0}")] - NetworkError(#[from] std::io::Error), - - #[error("Serialization error: {0}")] - SerializationError(#[from] Box), - - #[error("Store error: {0}")] - StoreError(#[from] StoreError), - - #[error("Node {0} is not in the committee")] - NotInCommittee(PublicKey), - - #[error("Invalid signature")] - InvalidSignature(#[from] CryptoError), - - #[error("Received more than one vote from {0}")] - AuthorityReuse(PublicKey), - - #[error("Received vote from unknown authority {0}")] - UnknownAuthority(PublicKey), - - #[error("Received QC without a quorum")] - QCRequiresQuorum, - - #[error("Received TC without a quorum")] - TCRequiresQuorum, - - #[error("Malformed block {0}")] - MalformedBlock(Digest), - - #[error("Received block {digest} from leader {leader} at round {round}")] - WrongLeader { - digest: Digest, - leader: PublicKey, - round: Round, - }, - - #[error("Invalid payload")] - InvalidPayload, - - #[error("Message {0} (round {1}) too old")] - TooOld(Digest, Round), - - #[error(transparent)] - DagError(#[from] DagError), -} diff --git a/hotstuff/src/helper.rs b/hotstuff/src/helper.rs deleted file mode 100644 index 8e397491..00000000 --- a/hotstuff/src/helper.rs +++ /dev/null @@ -1,68 +0,0 @@ -use crate::consensus::ConsensusMessage; -use bytes::Bytes; -use config::Committee; -use crypto::{Digest, PublicKey}; -use log::warn; -use network::SimpleSender; -use store::Store; -use tokio::sync::mpsc::Receiver; - -#[cfg(test)] -#[path = "tests/helper_tests.rs"] -pub mod helper_tests; - -/// A task dedicated to help other authorities by replying to their sync requests. -pub struct Helper { - /// The committee information. - committee: Committee, - /// The persistent storage. - store: Store, - /// Input channel to receive sync requests. - rx_requests: Receiver<(Digest, PublicKey)>, - /// A network sender to reply to the sync requests. - network: SimpleSender, -} - -impl Helper { - pub fn spawn(committee: Committee, store: Store, rx_requests: Receiver<(Digest, PublicKey)>) { - tokio::spawn(async move { - Self { - committee, - store, - rx_requests, - network: SimpleSender::new(), - } - .run() - .await; - }); - } - - async fn run(&mut self) { - while let Some((digest, origin)) = self.rx_requests.recv().await { - // TODO [issue #58]: Do some accounting to prevent bad nodes from monopolizing our resources. - - // get the requestors address. - let address = match self.committee.consensus(&origin) { - Ok(x) => x.consensus_to_consensus, - Err(e) => { - warn!("Received unexpected sync request: {}", e); - continue; - } - }; - - // Reply to the request (if we can). - if let Some(bytes) = self - .store - .read(digest.to_vec()) - .await - .expect("Failed to read from storage") - { - let block = - bincode::deserialize(&bytes).expect("Failed to deserialize our own block"); - let message = bincode::serialize(&ConsensusMessage::Propose(block)) - .expect("Failed to serialize block"); - self.network.send(address, Bytes::from(message)).await; - } - } - } -} diff --git a/hotstuff/src/leader.rs b/hotstuff/src/leader.rs deleted file mode 100644 index c85e1153..00000000 --- a/hotstuff/src/leader.rs +++ /dev/null @@ -1,21 +0,0 @@ -use crate::consensus::Round; -use config::Committee; -use crypto::PublicKey; - -pub type LeaderElector = RRLeaderElector; - -pub struct RRLeaderElector { - committee: Committee, -} - -impl RRLeaderElector { - pub fn new(committee: Committee) -> Self { - Self { committee } - } - - pub fn get_leader(&self, round: Round) -> PublicKey { - let mut keys: Vec<_> = self.committee.authorities.keys().cloned().collect(); - keys.sort(); - keys[round as usize % self.committee.size()] - } -} diff --git a/hotstuff/src/lib.rs b/hotstuff/src/lib.rs deleted file mode 100644 index 5425f50e..00000000 --- a/hotstuff/src/lib.rs +++ /dev/null @@ -1,20 +0,0 @@ -#[macro_use] -mod error; -mod aggregator; -mod committer; -mod consensus; -mod core; -mod helper; -mod leader; -mod mempool; -mod messages; -mod proposer; -mod synchronizer; -mod timer; - -#[cfg(test)] -#[path = "tests/common.rs"] -mod common; - -pub use crate::consensus::Consensus; -pub use crate::messages::{Block, QC, TC}; diff --git a/hotstuff/src/mempool.rs b/hotstuff/src/mempool.rs deleted file mode 100644 index 1657f039..00000000 --- a/hotstuff/src/mempool.rs +++ /dev/null @@ -1,37 +0,0 @@ -use crate::error::ConsensusResult; -use crate::messages::Block; -use config::Committee; -use primary::Certificate; -use tokio::sync::mpsc::Sender; - -pub struct MempoolDriver { - committee: Committee, - tx_mempool: Sender, -} - -impl MempoolDriver { - pub fn new(committee: Committee, tx_mempool: Sender) -> Self { - Self { - committee, - tx_mempool, - } - } - - /// Verify the payload certificates. - pub async fn verify(&mut self, block: &Block) -> ConsensusResult<()> { - for certificate in &block.payload { - certificate.verify(&self.committee)?; - } - Ok(()) - } - - /// Cleanup the mempool. - pub async fn cleanup(&mut self, payload: Vec) { - for certificate in payload { - self.tx_mempool - .send(certificate) - .await - .expect("Failed to send cleanup message"); - } - } -} diff --git a/hotstuff/src/messages.rs b/hotstuff/src/messages.rs deleted file mode 100644 index 1cb21cc8..00000000 --- a/hotstuff/src/messages.rs +++ /dev/null @@ -1,327 +0,0 @@ -use crate::consensus::Round; -use crate::error::{ConsensusError, ConsensusResult}; -use config::Committee; -use crypto::{Digest, Hash, PublicKey, Signature, SignatureService}; -use ed25519_dalek::Digest as _; -use ed25519_dalek::Sha512; -use primary::Certificate; -use serde::{Deserialize, Serialize}; -use std::collections::HashSet; -use std::convert::TryInto; -use std::fmt; - -#[cfg(test)] -#[path = "tests/messages_tests.rs"] -pub mod messages_tests; - -#[derive(Serialize, Deserialize, Default, Clone)] -pub struct Block { - pub qc: QC, - pub tc: Option, - pub author: PublicKey, - pub round: Round, - pub payload: Vec, - pub signature: Signature, -} - -impl Block { - pub async fn new( - qc: QC, - tc: Option, - author: PublicKey, - round: Round, - payload: Vec, - mut signature_service: SignatureService, - ) -> Self { - let block = Self { - qc, - tc, - author, - round, - payload, - signature: Signature::default(), - }; - let signature = signature_service.request_signature(block.digest()).await; - Self { signature, ..block } - } - - pub fn genesis() -> Self { - Block::default() - } - - pub fn parent(&self) -> &Digest { - &self.qc.hash - } - - pub fn verify(&self, committee: &Committee) -> ConsensusResult<()> { - // Ensure the authority has voting rights. - let voting_rights = committee.stake(&self.author); - ensure!( - voting_rights > 0, - ConsensusError::UnknownAuthority(self.author) - ); - - // Check the signature. - self.signature.verify(&self.digest(), &self.author)?; - - // Check the embedded QC. - if self.qc != QC::genesis() { - self.qc.verify(committee)?; - } - - // Check the TC embedded in the block (if any). - if let Some(ref tc) = self.tc { - tc.verify(committee)?; - } - Ok(()) - } -} - -impl Hash for Block { - fn digest(&self) -> Digest { - let mut hasher = Sha512::new(); - hasher.update(self.author.0); - hasher.update(self.round.to_le_bytes()); - for x in &self.payload { - hasher.update(&x.header.id); - } - hasher.update(&self.qc.hash); - Digest(hasher.finalize().as_slice()[..32].try_into().unwrap()) - } -} - -impl fmt::Debug for Block { - fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!( - f, - "{}: HSB({}, {}, {:?}, {})", - self.digest(), - self.author, - self.round, - self.qc, - self.payload.len(), - ) - } -} - -impl fmt::Display for Block { - fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(f, "HSB{}", self.round) - } -} - -#[derive(Clone, Serialize, Deserialize)] -pub struct Vote { - pub hash: Digest, - pub round: Round, - pub author: PublicKey, - pub signature: Signature, -} - -impl Vote { - pub async fn new( - block: &Block, - author: PublicKey, - mut signature_service: SignatureService, - ) -> Self { - let vote = Self { - hash: block.digest(), - round: block.round, - author, - signature: Signature::default(), - }; - let signature = signature_service.request_signature(vote.digest()).await; - Self { signature, ..vote } - } - - pub fn verify(&self, committee: &Committee) -> ConsensusResult<()> { - // Ensure the authority has voting rights. - ensure!( - committee.stake(&self.author) > 0, - ConsensusError::UnknownAuthority(self.author) - ); - - // Check the signature. - self.signature.verify(&self.digest(), &self.author)?; - Ok(()) - } -} - -impl Hash for Vote { - fn digest(&self) -> Digest { - let mut hasher = Sha512::new(); - hasher.update(&self.hash); - hasher.update(self.round.to_le_bytes()); - Digest(hasher.finalize().as_slice()[..32].try_into().unwrap()) - } -} - -impl fmt::Debug for Vote { - fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(f, "V({}, {}, {})", self.author, self.round, self.hash) - } -} - -#[derive(Clone, Serialize, Deserialize, Default)] -pub struct QC { - pub hash: Digest, - pub round: Round, - pub votes: Vec<(PublicKey, Signature)>, -} - -impl QC { - pub fn genesis() -> Self { - QC::default() - } - - pub fn timeout(&self) -> bool { - self.hash == Digest::default() && self.round != 0 - } - - pub fn verify(&self, committee: &Committee) -> ConsensusResult<()> { - // Ensure the QC has a quorum. - let mut weight = 0; - let mut used = HashSet::new(); - for (name, _) in self.votes.iter() { - ensure!(!used.contains(name), ConsensusError::AuthorityReuse(*name)); - let voting_rights = committee.stake(name); - ensure!(voting_rights > 0, ConsensusError::UnknownAuthority(*name)); - used.insert(*name); - weight += voting_rights; - } - ensure!( - weight >= committee.quorum_threshold(), - ConsensusError::QCRequiresQuorum - ); - - // Check the signatures. - Signature::verify_batch(&self.digest(), &self.votes).map_err(ConsensusError::from) - } -} - -impl Hash for QC { - fn digest(&self) -> Digest { - let mut hasher = Sha512::new(); - hasher.update(&self.hash); - hasher.update(self.round.to_le_bytes()); - Digest(hasher.finalize().as_slice()[..32].try_into().unwrap()) - } -} - -impl fmt::Debug for QC { - fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(f, "QC({}, {})", self.hash, self.round) - } -} - -impl PartialEq for QC { - fn eq(&self, other: &Self) -> bool { - self.hash == other.hash && self.round == other.round - } -} - -#[derive(Clone, Serialize, Deserialize)] -pub struct Timeout { - pub high_qc: QC, - pub round: Round, - pub author: PublicKey, - pub signature: Signature, -} - -impl Timeout { - pub async fn new( - high_qc: QC, - round: Round, - author: PublicKey, - mut signature_service: SignatureService, - ) -> Self { - let timeout = Self { - high_qc, - round, - author, - signature: Signature::default(), - }; - let signature = signature_service.request_signature(timeout.digest()).await; - Self { - signature, - ..timeout - } - } - - pub fn verify(&self, committee: &Committee) -> ConsensusResult<()> { - // Ensure the authority has voting rights. - ensure!( - committee.stake(&self.author) > 0, - ConsensusError::UnknownAuthority(self.author) - ); - - // Check the signature. - self.signature.verify(&self.digest(), &self.author)?; - - // Check the embedded QC. - if self.high_qc != QC::genesis() { - self.high_qc.verify(committee)?; - } - Ok(()) - } -} - -impl Hash for Timeout { - fn digest(&self) -> Digest { - let mut hasher = Sha512::new(); - hasher.update(self.round.to_le_bytes()); - hasher.update(self.high_qc.round.to_le_bytes()); - Digest(hasher.finalize().as_slice()[..32].try_into().unwrap()) - } -} - -impl fmt::Debug for Timeout { - fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(f, "TV({}, {}, {:?})", self.author, self.round, self.high_qc) - } -} - -#[derive(Clone, Serialize, Deserialize)] -pub struct TC { - pub round: Round, - pub votes: Vec<(PublicKey, Signature, Round)>, -} - -impl TC { - pub fn verify(&self, committee: &Committee) -> ConsensusResult<()> { - // Ensure the QC has a quorum. - let mut weight = 0; - let mut used = HashSet::new(); - for (name, _, _) in self.votes.iter() { - ensure!(!used.contains(name), ConsensusError::AuthorityReuse(*name)); - let voting_rights = committee.stake(name); - ensure!(voting_rights > 0, ConsensusError::UnknownAuthority(*name)); - used.insert(*name); - weight += voting_rights; - } - ensure!( - weight >= committee.quorum_threshold(), - ConsensusError::TCRequiresQuorum - ); - - // Check the signatures. - for (author, signature, high_qc_round) in &self.votes { - let mut hasher = Sha512::new(); - hasher.update(self.round.to_le_bytes()); - hasher.update(high_qc_round.to_le_bytes()); - let digest = Digest(hasher.finalize().as_slice()[..32].try_into().unwrap()); - signature.verify(&digest, &author)?; - } - Ok(()) - } - - pub fn high_qc_rounds(&self) -> Vec { - self.votes.iter().map(|(_, _, r)| r).cloned().collect() - } -} - -impl fmt::Debug for TC { - fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(f, "TC({}, {:?})", self.round, self.high_qc_rounds()) - } -} diff --git a/hotstuff/src/proposer.rs b/hotstuff/src/proposer.rs deleted file mode 100644 index 91de5e6d..00000000 --- a/hotstuff/src/proposer.rs +++ /dev/null @@ -1,172 +0,0 @@ -use crate::consensus::{ConsensusMessage, Round}; -use crate::messages::{Block, QC, TC}; -use bytes::Bytes; -use config::{Committee, Stake}; -use crypto::{PublicKey, SignatureService}; -use futures::stream::futures_unordered::FuturesUnordered; -use futures::stream::StreamExt as _; -use log::{debug, info}; -use network::{CancelHandler, ReliableSender}; -use primary::Certificate; -use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::time::{sleep, Duration, Instant}; - -#[derive(Debug)] -pub struct ProposerMessage(pub Round, pub QC, pub Option); - -pub struct Proposer { - name: PublicKey, - committee: Committee, - signature_service: SignatureService, - max_block_delay: u64, - rx_mempool: Receiver, - rx_message: Receiver, - tx_loopback: Sender, - tx_committer: Sender, - buffer: Vec, - network: ReliableSender, - leader: Option<(Round, QC, Option)>, -} - -impl Proposer { - pub fn spawn( - name: PublicKey, - committee: Committee, - signature_service: SignatureService, - rx_mempool: Receiver, - rx_message: Receiver, - tx_loopback: Sender, - tx_committer: Sender, - ) { - tokio::spawn(async move { - Self { - name, - committee, - signature_service, - max_block_delay: 2_000, - rx_mempool, - rx_message, - tx_loopback, - tx_committer, - buffer: Vec::new(), - network: ReliableSender::new(), - leader: None, - } - .run() - .await; - }); - } - - /// Helper function. It waits for a future to complete and then delivers a value. - async fn waiter(wait_for: CancelHandler, deliver: Stake) -> Stake { - let _ = wait_for.await; - deliver - } - - async fn make_block(&mut self, round: Round, qc: QC, tc: Option) { - // Generate a new block. - let block = Block::new( - qc, - tc, - self.name, - round, - /* payload */ self.buffer.drain(..).collect(), - self.signature_service.clone(), - ) - .await; - - info!("Created {:?}", block); - - // Broadcast our new block. - debug!("Broadcasting {:?}", block); - let (names, addresses): (Vec<_>, _) = self - .committee - .others_consensus(&self.name) - .into_iter() - .map(|(name, x)| (name, x.consensus_to_consensus)) - .unzip(); - let message = bincode::serialize(&ConsensusMessage::Propose(block.clone())) - .expect("Failed to serialize block"); - let handles = self - .network - .broadcast(addresses, Bytes::from(message)) - .await; - - // Send our block to the core for processing. - self.tx_loopback - .send(block) - .await - .expect("Failed to send block"); - - // Control system: Wait for 2f+1 nodes to acknowledge our block before continuing. - let mut wait_for_quorum: FuturesUnordered<_> = names - .into_iter() - .zip(handles.into_iter()) - .map(|(name, handler)| { - let stake = self.committee.stake(&name); - Self::waiter(handler, stake) - }) - .collect(); - - let mut total_stake = self.committee.stake(&self.name); - while let Some(stake) = wait_for_quorum.next().await { - total_stake += stake; - if total_stake >= self.committee.quorum_threshold() { - break; - } - } - - // TODO: Ugly -- needed for small committee sizes. - //sleep(Duration::from_millis(100)).await; - } - - async fn run(&mut self) { - let timer = sleep(Duration::from_millis(self.max_block_delay)); - tokio::pin!(timer); - - loop { - // Check if we can propose a new block. - let timer_expired = timer.is_elapsed(); - let got_payload = !self.buffer.is_empty(); - - if timer_expired || got_payload { - if let Some((round, qc, tc)) = self.leader.take() { - // Make a new block. - self.make_block(round, qc, tc).await; - - // Reschedule the timer. - let deadline = Instant::now() + Duration::from_millis(self.max_block_delay); - timer.as_mut().reset(deadline); - } - } - - tokio::select! { - Some(certificate) = self.rx_mempool.recv() => { - debug!("Received {:?}", certificate); - self.tx_committer - .send(certificate.clone()) - .await - .expect("Failed to send certificate to committer"); - - if self.buffer.is_empty() { - self.buffer.push(certificate); - continue; - } - if self.buffer[0].round() < certificate.round() { - self.buffer.push(certificate); - self.buffer.swap_remove(0); - } - }, - Some(ProposerMessage(round, qc, tc)) = self.rx_message.recv() => { - self.leader = Some((round, qc, tc)); - }, - () = &mut timer => { - // Nothing to do. - } - } - - // Give the change to schedule other tasks. - //tokio::task::yield_now().await; - } - } -} diff --git a/hotstuff/src/synchronizer.rs b/hotstuff/src/synchronizer.rs deleted file mode 100644 index cbf3da97..00000000 --- a/hotstuff/src/synchronizer.rs +++ /dev/null @@ -1,151 +0,0 @@ -use crate::consensus::{ConsensusMessage, CHANNEL_CAPACITY}; -use crate::error::ConsensusResult; -use crate::messages::{Block, QC}; -use bytes::Bytes; -use config::Committee; -use crypto::Hash as _; -use crypto::{Digest, PublicKey}; -use futures::stream::futures_unordered::FuturesUnordered; -use futures::stream::StreamExt as _; -use log::{debug, error}; -use network::SimpleSender; -use std::collections::{HashMap, HashSet}; -use std::time::{SystemTime, UNIX_EPOCH}; -use store::Store; -use tokio::sync::mpsc::{channel, Receiver, Sender}; -use tokio::time::{sleep, Duration, Instant}; - -#[cfg(test)] -#[path = "tests/synchronizer_tests.rs"] -pub mod synchronizer_tests; - -const TIMER_ACCURACY: u64 = 5_000; - -pub struct Synchronizer { - store: Store, - inner_channel: Sender, -} - -impl Synchronizer { - pub fn new( - name: PublicKey, - committee: Committee, - store: Store, - tx_loopback: Sender, - sync_retry_delay: u64, - ) -> Self { - let mut network = SimpleSender::new(); - let (tx_inner, mut rx_inner): (_, Receiver) = channel(CHANNEL_CAPACITY); - - let store_copy = store.clone(); - tokio::spawn(async move { - let mut waiting = FuturesUnordered::new(); - let mut pending = HashSet::new(); - let mut requests = HashMap::new(); - - let timer = sleep(Duration::from_millis(TIMER_ACCURACY)); - tokio::pin!(timer); - loop { - tokio::select! { - Some(block) = rx_inner.recv() => { - if pending.insert(block.digest()) { - let parent = block.parent().clone(); - let author = block.author; - let fut = Self::waiter(store_copy.clone(), parent.clone(), block); - waiting.push(fut); - - if !requests.contains_key(&parent){ - debug!("Requesting sync for block {}", parent); - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Failed to measure time") - .as_millis(); - requests.insert(parent.clone(), now); - let address = committee - .consensus(&author) - .expect("Author of valid block is not in the committee") - .consensus_to_consensus; - let message = ConsensusMessage::SyncRequest(parent, name); - let message = bincode::serialize(&message) - .expect("Failed to serialize sync request"); - network.send(address, Bytes::from(message)).await; - } - } - }, - Some(result) = waiting.next() => match result { - Ok(block) => { - let _ = pending.remove(&block.digest()); - let _ = requests.remove(&block.parent()); - if let Err(e) = tx_loopback.send(block).await { - panic!("Failed to send message through core channel: {}", e); - } - }, - Err(e) => error!("{}", e) - }, - () = &mut timer => { - // This implements the 'perfect point to point link' abstraction. - for (digest, timestamp) in &requests { - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Failed to measure time") - .as_millis(); - if timestamp + (sync_retry_delay as u128) < now { - debug!("Requesting sync for block {} (retry)", digest); - let addresses = committee - .others_consensus(&name) - .into_iter() - .map(|(_, x)| x.consensus_to_consensus) - .collect(); - let message = ConsensusMessage::SyncRequest(digest.clone(), name); - let message = bincode::serialize(&message) - .expect("Failed to serialize sync request"); - network.broadcast(addresses, Bytes::from(message)).await; - } - } - timer.as_mut().reset(Instant::now() + Duration::from_millis(TIMER_ACCURACY)); - } - } - } - }); - Self { - store, - inner_channel: tx_inner, - } - } - - async fn waiter(mut store: Store, wait_on: Digest, deliver: Block) -> ConsensusResult { - let _ = store.notify_read(wait_on.to_vec()).await?; - Ok(deliver) - } - - pub async fn get_parent_block(&mut self, block: &Block) -> ConsensusResult> { - if block.qc == QC::genesis() { - return Ok(Some(Block::genesis())); - } - let parent = block.parent(); - match self.store.read(parent.to_vec()).await? { - Some(bytes) => Ok(Some(bincode::deserialize(&bytes)?)), - None => { - if let Err(e) = self.inner_channel.send(block.clone()).await { - panic!("Failed to send request to synchronizer: {}", e); - } - Ok(None) - } - } - } - - pub async fn get_ancestors( - &mut self, - block: &Block, - ) -> ConsensusResult> { - let b1 = match self.get_parent_block(block).await? { - Some(b) => b, - None => return Ok(None), - }; - let b0 = self - .get_parent_block(&b1) - .await? - .expect("We should have all ancestors of delivered blocks"); - Ok(Some((b0, b1))) - } -} diff --git a/hotstuff/src/tests/aggregator_tests.rs b/hotstuff/src/tests/aggregator_tests.rs deleted file mode 100644 index 4b5ce644..00000000 --- a/hotstuff/src/tests/aggregator_tests.rs +++ /dev/null @@ -1,56 +0,0 @@ -use super::*; -use crate::common::{committee, keys, qc, vote}; - -#[test] -fn add_vote() { - let mut aggregator = Aggregator::new(committee()); - let result = aggregator.add_vote(vote()); - assert!(result.is_ok()); - assert!(result.unwrap().is_none()); -} - -#[test] -fn make_qc() { - let mut aggregator = Aggregator::new(committee()); - let mut keys = keys(); - let qc = qc(); - let hash = qc.digest(); - let round = qc.round; - - // Add 2f+1 votes to the aggregator and ensure it returns the cryptographic - // material to make a valid QC. - let (public_key, secret_key) = keys.pop().unwrap(); - let vote = Vote::new_from_key(hash.clone(), round, public_key, &secret_key); - let result = aggregator.add_vote(vote); - assert!(result.is_ok()); - assert!(result.unwrap().is_none()); - - let (public_key, secret_key) = keys.pop().unwrap(); - let vote = Vote::new_from_key(hash.clone(), round, public_key, &secret_key); - let result = aggregator.add_vote(vote); - assert!(result.is_ok()); - assert!(result.unwrap().is_none()); - - let (public_key, secret_key) = keys.pop().unwrap(); - let vote = Vote::new_from_key(hash.clone(), round, public_key, &secret_key); - match aggregator.add_vote(vote) { - Ok(Some(qc)) => assert!(qc.verify(&committee()).is_ok()), - _ => assert!(false), - } -} - -#[test] -fn cleanup() { - let mut aggregator = Aggregator::new(committee()); - - // Add a vote and ensure it is in the aggregator memory. - let result = aggregator.add_vote(vote()); - assert!(result.is_ok()); - assert_eq!(aggregator.votes_aggregators.len(), 1); - assert!(aggregator.timeouts_aggregators.is_empty()); - - // Clean up the aggregator. - aggregator.cleanup(&2); - assert!(aggregator.votes_aggregators.is_empty()); - assert!(aggregator.timeouts_aggregators.is_empty()); -} diff --git a/hotstuff/src/tests/common.rs b/hotstuff/src/tests/common.rs deleted file mode 100644 index 240a2928..00000000 --- a/hotstuff/src/tests/common.rs +++ /dev/null @@ -1,198 +0,0 @@ -use crate::config::Committee; -use crate::consensus::Round; -use crate::messages::{Block, Timeout, Vote, QC}; -use bytes::Bytes; -use crypto::Hash as _; -use crypto::{generate_keypair, Digest, PublicKey, SecretKey, Signature}; -use futures::sink::SinkExt as _; -use futures::stream::StreamExt as _; -use rand::rngs::StdRng; -use rand::SeedableRng as _; -use std::net::SocketAddr; -use tokio::net::TcpListener; -use tokio::task::JoinHandle; -use tokio_util::codec::{Framed, LengthDelimitedCodec}; - -// Fixture. -pub fn keys() -> Vec<(PublicKey, SecretKey)> { - let mut rng = StdRng::from_seed([0; 32]); - (0..4).map(|_| generate_keypair(&mut rng)).collect() -} - -// Fixture. -pub fn committee() -> Committee { - Committee::new( - keys() - .into_iter() - .enumerate() - .map(|(i, (name, _))| { - let address = format!("127.0.0.1:{}", i).parse().unwrap(); - let stake = 1; - (name, stake, address) - }) - .collect(), - /* epoch */ 100, - ) -} - -// Fixture. -pub fn committee_with_base_port(base_port: u16) -> Committee { - let mut committee = committee(); - for authority in committee.authorities.values_mut() { - let port = authority.address.port(); - authority.address.set_port(base_port + port); - } - committee -} - -impl Block { - pub fn new_from_key( - qc: QC, - author: PublicKey, - round: Round, - payload: Vec, - secret: &SecretKey, - ) -> Self { - let block = Block { - qc, - tc: None, - author, - round, - payload, - signature: Signature::default(), - }; - let signature = Signature::new(&block.digest(), secret); - Self { signature, ..block } - } -} - -impl PartialEq for Block { - fn eq(&self, other: &Self) -> bool { - self.digest() == other.digest() - } -} - -impl Vote { - pub fn new_from_key(hash: Digest, round: Round, author: PublicKey, secret: &SecretKey) -> Self { - let vote = Self { - hash, - round, - author, - signature: Signature::default(), - }; - let signature = Signature::new(&vote.digest(), &secret); - Self { signature, ..vote } - } -} - -impl PartialEq for Vote { - fn eq(&self, other: &Self) -> bool { - self.digest() == other.digest() - } -} - -impl Timeout { - pub fn new_from_key(high_qc: QC, round: Round, author: PublicKey, secret: &SecretKey) -> Self { - let timeout = Self { - high_qc, - round, - author, - signature: Signature::default(), - }; - let signature = Signature::new(&timeout.digest(), &secret); - Self { - signature, - ..timeout - } - } -} - -impl PartialEq for Timeout { - fn eq(&self, other: &Self) -> bool { - self.digest() == other.digest() - } -} - -// Fixture. -pub fn block() -> Block { - let (public_key, secret_key) = keys().pop().unwrap(); - Block::new_from_key(QC::genesis(), public_key, 1, Vec::new(), &secret_key) -} - -// Fixture. -pub fn vote() -> Vote { - let (public_key, secret_key) = keys().pop().unwrap(); - Vote::new_from_key(block().digest(), 1, public_key, &secret_key) -} - -// Fixture. -pub fn qc() -> QC { - let qc = QC { - hash: Digest::default(), - round: 1, - votes: Vec::new(), - }; - let digest = qc.digest(); - let mut keys = keys(); - let votes: Vec<_> = (0..3) - .map(|_| { - let (public_key, secret_key) = keys.pop().unwrap(); - (public_key, Signature::new(&digest, &secret_key)) - }) - .collect(); - QC { votes, ..qc } -} - -// Fixture. -pub fn chain(keys: Vec<(PublicKey, SecretKey)>) -> Vec { - let mut latest_qc = QC::genesis(); - keys.iter() - .enumerate() - .map(|(i, key)| { - // Make a block. - let (public_key, secret_key) = key; - let block = Block::new_from_key( - latest_qc.clone(), - *public_key, - 1 + i as Round, - Vec::new(), - secret_key, - ); - - // Make a qc for that block (it will be used for the next block). - let qc = QC { - hash: block.digest(), - round: block.round, - votes: Vec::new(), - }; - let digest = qc.digest(); - let votes: Vec<_> = keys - .iter() - .map(|(public_key, secret_key)| (*public_key, Signature::new(&digest, secret_key))) - .collect(); - latest_qc = QC { votes, ..qc }; - - // Return the block. - block - }) - .collect() -} - -// Fixture -pub fn listener(address: SocketAddr, expected: Option) -> JoinHandle<()> { - tokio::spawn(async move { - let listener = TcpListener::bind(&address).await.unwrap(); - let (socket, _) = listener.accept().await.unwrap(); - let transport = Framed::new(socket, LengthDelimitedCodec::new()); - let (mut writer, mut reader) = transport.split(); - match reader.next().await { - Some(Ok(received)) => { - writer.send(Bytes::from("Ack")).await.unwrap(); - if let Some(expected) = expected { - assert_eq!(received.freeze(), expected); - } - } - _ => panic!("Failed to receive network message"), - } - }) -} diff --git a/hotstuff/src/tests/consensus_tests.rs b/hotstuff/src/tests/consensus_tests.rs deleted file mode 100644 index 1e60ec75..00000000 --- a/hotstuff/src/tests/consensus_tests.rs +++ /dev/null @@ -1,68 +0,0 @@ -use super::*; -use crate::common::{committee_with_base_port, keys}; -use crate::config::Parameters; -use crypto::SecretKey; -use futures::future::try_join_all; -use std::fs; -use tokio::sync::mpsc::channel; -use tokio::task::JoinHandle; - -fn spawn_nodes( - keys: Vec<(PublicKey, SecretKey)>, - committee: Committee, - store_path: &str, -) -> Vec> { - keys.into_iter() - .enumerate() - .map(|(i, (name, secret))| { - let committee = committee.clone(); - let parameters = Parameters { - timeout_delay: 100, - ..Parameters::default() - }; - let store_path = format!("{}_{}", store_path, i); - let _ = fs::remove_dir_all(&store_path); - let store = Store::new(&store_path).unwrap(); - let signature_service = SignatureService::new(secret); - let (tx_consensus_to_mempool, mut rx_consensus_to_mempool) = channel(10); - let (_tx_mempool_to_consensus, rx_mempool_to_consensus) = channel(1); - let (tx_commit, mut rx_commit) = channel(1); - - // Sink the mempool channel. - tokio::spawn(async move { - loop { - rx_consensus_to_mempool.recv().await; - } - }); - - // Spawn the consensus engine. - tokio::spawn(async move { - Consensus::spawn( - name, - committee, - parameters, - signature_service, - store, - rx_mempool_to_consensus, - tx_consensus_to_mempool, - tx_commit, - ); - - rx_commit.recv().await.unwrap() - }) - }) - .collect() -} - -#[tokio::test] -async fn end_to_end() { - let committee = committee_with_base_port(15_000); - - // Run all nodes. - let store_path = ".db_test_end_to_end"; - let handles = spawn_nodes(keys(), committee, store_path); - - // Ensure all threads terminated correctly. - let blocks = try_join_all(handles).await.unwrap(); - assert!(blocks.windows(2).all(|w| w[0] == w[1])); -} diff --git a/hotstuff/src/tests/core_tests.rs b/hotstuff/src/tests/core_tests.rs deleted file mode 100644 index d32d2d59..00000000 --- a/hotstuff/src/tests/core_tests.rs +++ /dev/null @@ -1,186 +0,0 @@ -use super::*; -use crate::common::{chain, committee, committee_with_base_port, keys, listener}; -use crypto::SecretKey; -use futures::future::try_join_all; -use std::fs; -use tokio::sync::mpsc::channel; - -fn core( - name: PublicKey, - secret: SecretKey, - committee: Committee, - store_path: &str, -) -> ( - Sender, - Receiver, - Receiver, -) { - let (tx_core, rx_core) = channel(1); - let (tx_loopback, rx_loopback) = channel(1); - let (tx_proposer, rx_proposer) = channel(1); - let (tx_mempool, mut rx_mempool) = channel(1); - let (tx_commit, rx_commit) = channel(1); - - let signature_service = SignatureService::new(secret); - let _ = fs::remove_dir_all(store_path); - let store = Store::new(store_path).unwrap(); - let leader_elector = LeaderElector::new(committee.clone()); - let mempool_driver = MempoolDriver::new(store.clone(), tx_mempool, tx_loopback.clone()); - let synchronizer = Synchronizer::new( - name, - committee.clone(), - store.clone(), - tx_loopback, - /* sync_retry_delay */ 100_000, - ); - - tokio::spawn(async move { - loop { - rx_mempool.recv().await; - } - }); - - Core::spawn( - name, - committee, - signature_service, - store, - leader_elector, - mempool_driver, - synchronizer, - /* timeout_delay */ 100, - /* rx_message */ rx_core, - rx_loopback, - tx_proposer, - tx_commit, - ); - - (tx_core, rx_proposer, rx_commit) -} - -fn leader_keys(round: Round) -> (PublicKey, SecretKey) { - let leader_elector = LeaderElector::new(committee()); - let leader = leader_elector.get_leader(round); - keys() - .into_iter() - .find(|(public_key, _)| *public_key == leader) - .unwrap() -} - -#[tokio::test] -async fn handle_proposal() { - let committee = committee_with_base_port(16_000); - - // Make a block and the vote we expect to receive. - let block = chain(vec![leader_keys(1)]).pop().unwrap(); - let (public_key, secret_key) = keys().pop().unwrap(); - let vote = Vote::new_from_key(block.digest(), block.round, public_key, &secret_key); - let expected = bincode::serialize(&ConsensusMessage::Vote(vote)).unwrap(); - - // Run a core instance. - let store_path = ".db_test_handle_proposal"; - let (tx_core, _rx_proposer, _rx_commit) = - core(public_key, secret_key, committee.clone(), store_path); - - // Send a block to the core. - let message = ConsensusMessage::Propose(block.clone()); - tx_core.send(message).await.unwrap(); - - // Ensure the next leaders gets the vote. - let (next_leader, _) = leader_keys(2); - let address = committee.address(&next_leader).unwrap(); - let handle = listener(address, Some(Bytes::from(expected))); - assert!(handle.await.is_ok()); -} - -#[tokio::test] -async fn generate_proposal() { - // Get the keys of the leaders of this round and the next. - let (leader, leader_key) = leader_keys(1); - let (next_leader, next_leader_key) = leader_keys(2); - - // Make a block, votes, and QC. - let block = Block::new_from_key(QC::genesis(), leader, 1, Vec::new(), &leader_key); - let hash = block.digest(); - let votes: Vec<_> = keys() - .iter() - .map(|(public_key, secret_key)| { - Vote::new_from_key(hash.clone(), block.round, *public_key, &secret_key) - }) - .collect(); - let hight_qc = QC { - hash, - round: block.round, - votes: votes - .iter() - .cloned() - .map(|x| (x.author, x.signature)) - .collect(), - }; - - // Run a core instance. - let store_path = ".db_test_generate_proposal"; - let (tx_core, mut rx_proposer, _rx_commit) = - core(next_leader, next_leader_key, committee(), store_path); - - // Send all votes to the core. - for vote in votes.clone() { - let message = ConsensusMessage::Vote(vote); - tx_core.send(message).await.unwrap(); - } - - // Ensure the core sends a new block. - let ProposerMessage(round, qc, tc) = rx_proposer.recv().await.unwrap(); - assert_eq!(round, 2); - assert_eq!(qc, hight_qc); - assert!(tc.is_none()); -} - -#[tokio::test] -async fn commit_block() { - // Get enough distinct leaders to form a quorum. - let leaders = vec![leader_keys(1), leader_keys(2), leader_keys(3)]; - let chain = chain(leaders); - - // Run a core instance. - let store_path = ".db_test_commit_block"; - let (public_key, secret_key) = keys().pop().unwrap(); - let (tx_core, _rx_proposer, mut rx_commit) = - core(public_key, secret_key, committee(), store_path); - - // Send a the blocks to the core. - let committed = chain[0].clone(); - for block in chain { - let message = ConsensusMessage::Propose(block); - tx_core.send(message).await.unwrap(); - } - - // Ensure the core commits the head. - match rx_commit.recv().await { - Some(b) => assert_eq!(b, committed), - _ => assert!(false), - } -} - -#[tokio::test] -async fn local_timeout_round() { - let committee = committee_with_base_port(16_100); - - // Make the timeout vote we expect to send. - let (public_key, secret_key) = leader_keys(3); - let timeout = Timeout::new_from_key(QC::genesis(), 1, public_key, &secret_key); - let expected = bincode::serialize(&ConsensusMessage::Timeout(timeout)).unwrap(); - - // Run a core instance. - let store_path = ".db_test_local_timeout_round"; - let (_tx_core, _rx_proposer, _rx_commit) = - core(public_key, secret_key, committee.clone(), store_path); - - // Ensure the node broadcasts a timeout vote. - let handles: Vec<_> = committee - .broadcast_addresses(&public_key) - .into_iter() - .map(|address| listener(address, Some(Bytes::from(expected.clone())))) - .collect(); - assert!(try_join_all(handles).await.is_ok()); -} diff --git a/hotstuff/src/tests/helper_tests.rs b/hotstuff/src/tests/helper_tests.rs deleted file mode 100644 index c2acb761..00000000 --- a/hotstuff/src/tests/helper_tests.rs +++ /dev/null @@ -1,37 +0,0 @@ -use super::*; -use crate::common::{block, committee_with_base_port, keys, listener}; -use crypto::Hash as _; -use std::fs; -use tokio::sync::mpsc::channel; - -#[tokio::test] -async fn sync_reply() { - let (tx_request, rx_request) = channel(1); - let (requestor, _) = keys().pop().unwrap(); - let committee = committee_with_base_port(13_000); - - // Create a new test store. - let path = ".db_test_sync_reply"; - let _ = fs::remove_dir_all(path); - let mut store = Store::new(path).unwrap(); - - // Add a batch to the store. - let digest = block().digest(); - let serialized = bincode::serialize(&block()).unwrap(); - store.write(digest.to_vec(), serialized.clone()).await; - - // Spawn an `Helper` instance. - Helper::spawn(committee.clone(), store, rx_request); - - // Spawn a listener to receive the sync reply. - let address = committee.address(&requestor).unwrap(); - let message = ConsensusMessage::Propose(block()); - let expected = Bytes::from(bincode::serialize(&message).unwrap()); - let handle = listener(address, Some(expected)); - - // Send a sync request. - tx_request.send((digest, requestor)).await.unwrap(); - - // Ensure the requestor received the batch (ie. it did not panic). - assert!(handle.await.is_ok()); -} diff --git a/hotstuff/src/tests/messages_tests.rs b/hotstuff/src/tests/messages_tests.rs deleted file mode 100644 index 90a3ae07..00000000 --- a/hotstuff/src/tests/messages_tests.rs +++ /dev/null @@ -1,55 +0,0 @@ -use super::*; -use crate::common::{committee, qc}; -use crypto::generate_keypair; -use rand::rngs::StdRng; -use rand::SeedableRng as _; - -#[test] -fn verify_valid_qc() { - assert!(qc().verify(&committee()).is_ok()); -} - -#[test] -fn verify_qc_authority_reuse() { - // Modify QC to reuse one authority. - let mut qc = qc(); - let _ = qc.votes.pop(); - let vote = qc.votes[0].clone(); - qc.votes.push(vote.clone()); - - // Verify the QC. - match qc.verify(&committee()) { - Err(ConsensusError::AuthorityReuse(name)) => assert_eq!(name, vote.0), - _ => assert!(false), - } -} - -#[test] -fn verify_qc_unknown_authority() { - let mut qc = qc(); - - // Modify QC to add one unknown authority. - let mut rng = StdRng::from_seed([1; 32]); - let (unknown, _) = generate_keypair(&mut rng); - let (_, sig) = qc.votes.pop().unwrap(); - qc.votes.push((unknown, sig)); - - // Verify the QC. - match qc.verify(&committee()) { - Err(ConsensusError::UnknownAuthority(name)) => assert_eq!(name, unknown), - _ => assert!(false), - } -} - -#[test] -fn verify_qc_insufficient_stake() { - // Modify QC to remove one authority. - let mut qc = qc(); - let _ = qc.votes.pop(); - - // Verify the QC. - match qc.verify(&committee()) { - Err(ConsensusError::QCRequiresQuorum) => assert!(true), - _ => assert!(false), - } -} diff --git a/hotstuff/src/tests/synchronizer_tests.rs b/hotstuff/src/tests/synchronizer_tests.rs deleted file mode 100644 index a0d6b3a2..00000000 --- a/hotstuff/src/tests/synchronizer_tests.rs +++ /dev/null @@ -1,110 +0,0 @@ -use super::*; -use crate::common::{block, chain, committee, committee_with_base_port, keys, listener}; -use std::fs; - -#[tokio::test] -async fn get_existing_parent_block() { - let mut chain = chain(keys()); - let block = chain.pop().unwrap(); - let b2 = chain.pop().unwrap(); - - // Add the block b2 to the store. - let path = ".db_test_get_existing_parent_block"; - let _ = fs::remove_dir_all(path); - let mut store = Store::new(path).unwrap(); - let key = b2.digest().to_vec(); - let value = bincode::serialize(&b2).unwrap(); - let _ = store.write(key, value).await; - - // Make a new synchronizer. - let (name, _) = keys().pop().unwrap(); - let (tx_loopback, _) = channel(10); - let mut synchronizer = Synchronizer::new( - name, - committee(), - store, - tx_loopback, - /* sync_retry_delay */ 10_000, - ); - - // Ask the predecessor of 'block' to the synchronizer. - match synchronizer.get_parent_block(&block).await { - Ok(Some(b)) => assert_eq!(b, b2), - _ => assert!(false), - } -} - -#[tokio::test] -async fn get_genesis_parent_block() { - // Make a new synchronizer. - let path = ".db_test_get_genesis_parent_block"; - let _ = fs::remove_dir_all(path); - let store = Store::new(path).unwrap(); - let (name, _) = keys().pop().unwrap(); - let (tx_loopback, _) = channel(1); - let mut synchronizer = Synchronizer::new( - name, - committee(), - store, - tx_loopback, - /* sync_retry_delay */ 10_000, - ); - - // Ask the predecessor of 'block' to the synchronizer. - match synchronizer.get_parent_block(&block()).await { - Ok(Some(b)) => assert_eq!(b, Block::genesis()), - _ => assert!(false), - } -} - -#[tokio::test] -async fn get_missing_parent_block() { - let committee = committee_with_base_port(12_000); - let mut chain = chain(keys()); - let block = chain.pop().unwrap(); - let parent_block = chain.pop().unwrap(); - - // Make a new synchronizer. - let path = ".db_test_get_missing_parent_block"; - let _ = fs::remove_dir_all(path); - let mut store = Store::new(path).unwrap(); - let (name, _) = keys().pop().unwrap(); - let (tx_loopback, mut rx_loopback) = channel(1); - let mut synchronizer = Synchronizer::new( - name, - committee.clone(), - store.clone(), - tx_loopback, - /* sync_retry_delay */ 10_000, - ); - - // Spawn a listener to receive our sync request. - let address = committee.address(&block.author).unwrap(); - let message = ConsensusMessage::SyncRequest(parent_block.digest(), name); - let expected = Bytes::from(bincode::serialize(&message).unwrap()); - let listener_handle = listener(address, Some(expected.clone())); - - // Ask for the parent of a block to the synchronizer. The store does not have the parent yet. - let copy = block.clone(); - let handle = tokio::spawn(async move { - let ret = synchronizer.get_parent_block(©).await; - assert!(ret.is_ok()); - assert!(ret.unwrap().is_none()); - }); - - // Ensure the other listeners correctly received the sync request. - assert!(listener_handle.await.is_ok()); - - // Ensure the synchronizer returns None, thus suspending the processing of the block. - assert!(handle.await.is_ok()); - - // Add the parent to the store. - let key = parent_block.digest().to_vec(); - let value = bincode::serialize(&parent_block).unwrap(); - let _ = store.write(key, value).await; - - // Now that we have the parent, ensure the synchronizer loops back the block to the core - // to resume processing. - let delivered = rx_loopback.recv().await.unwrap(); - assert_eq!(delivered, block.clone()); -} diff --git a/hotstuff/src/tests/timer_tests.rs b/hotstuff/src/tests/timer_tests.rs deleted file mode 100644 index 7c204cb9..00000000 --- a/hotstuff/src/tests/timer_tests.rs +++ /dev/null @@ -1,9 +0,0 @@ -use super::*; - -#[tokio::test] -async fn schedule() { - let timer = Timer::new(100); - let now = Instant::now(); - timer.await; - assert!(now.elapsed().as_millis() > 95); -} diff --git a/hotstuff/src/timer.rs b/hotstuff/src/timer.rs deleted file mode 100644 index c31ddd24..00000000 --- a/hotstuff/src/timer.rs +++ /dev/null @@ -1,34 +0,0 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use tokio::time::{sleep, Duration, Instant, Sleep}; - -#[cfg(test)] -#[path = "tests/timer_tests.rs"] -pub mod timer_tests; - -pub struct Timer { - duration: u64, - sleep: Pin>, -} - -impl Timer { - pub fn new(duration: u64) -> Self { - let sleep = Box::pin(sleep(Duration::from_millis(duration))); - Self { duration, sleep } - } - - pub fn reset(&mut self) { - self.sleep - .as_mut() - .reset(Instant::now() + Duration::from_millis(self.duration)); - } -} - -impl Future for Timer { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - self.sleep.as_mut().poll(cx) - } -} diff --git a/node/Cargo.toml b/node/Cargo.toml index 82f2d68e..3a14aa28 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -21,10 +21,10 @@ store = { path = "../store" } crypto = { path = "../crypto" } primary = { path = "../primary" } worker = { path = "../worker" } -hotstuff = { path = "../hotstuff" } +consensus = { path = "../consensus" } [features] -benchmark = ["worker/benchmark", "primary/benchmark", "hotstuff/benchmark"] +benchmark = ["worker/benchmark", "primary/benchmark", "consensus/benchmark"] [[bin]] name = "benchmark_client" diff --git a/node/src/main.rs b/node/src/main.rs index e271d3f5..e0e21f14 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -4,10 +4,9 @@ use clap::{crate_name, crate_version, App, AppSettings, ArgMatches, SubCommand}; use config::Export as _; use config::Import as _; use config::{Committee, KeyPair, Parameters, WorkerId}; -use crypto::SignatureService; +use consensus::Consensus; use env_logger::Env; -use hotstuff::{Block, Consensus}; -use primary::Primary; +use primary::{Certificate, Primary}; use store::Store; use tokio::sync::mpsc::{channel, Receiver}; use worker::Worker; @@ -75,7 +74,6 @@ async fn run(matches: &ArgMatches<'_>) -> Result<()> { // Read the committee and node's keypair from file. let keypair = KeyPair::import(key_file).context("Failed to load the node's keypair")?; - let name = keypair.name; let committee = Committee::import(committee_file).context("Failed to load the committee information")?; @@ -87,9 +85,6 @@ async fn run(matches: &ArgMatches<'_>) -> Result<()> { None => Parameters::default(), }; - // The `SignatureService` provides signatures on input digests. - let signature_service = SignatureService::new(keypair.secret); - // Make the data store. let store = Store::new(store_path).context("Failed to create a store")?; @@ -103,22 +98,18 @@ async fn run(matches: &ArgMatches<'_>) -> Result<()> { let (tx_new_certificates, rx_new_certificates) = channel(CHANNEL_CAPACITY); let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY); Primary::spawn( - name, + keypair, committee.clone(), parameters.clone(), - signature_service.clone(), - store.clone(), + store, /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, ); Consensus::spawn( - name, committee, - parameters, - signature_service, - store, - /* rx_mempool */ rx_new_certificates, - /* tx_mempool */ tx_feedback, + parameters.gc_depth, + /* rx_primary */ rx_new_certificates, + /* tx_primary */ tx_feedback, tx_output, ); } @@ -143,8 +134,8 @@ async fn run(matches: &ArgMatches<'_>) -> Result<()> { } /// Receives an ordered list of certificates and apply any application-specific logic. -async fn analyze(mut rx_output: Receiver) { - while let Some(_block) = rx_output.recv().await { +async fn analyze(mut rx_output: Receiver) { + while let Some(_certificate) = rx_output.recv().await { // NOTE: Here goes the application logic. } } diff --git a/primary/src/core.rs b/primary/src/core.rs index aee76c55..461c1ff5 100644 --- a/primary/src/core.rs +++ b/primary/src/core.rs @@ -306,7 +306,7 @@ impl Core { fn sanitize_header(&mut self, header: &Header) -> DagResult<()> { ensure!( self.gc_round <= header.round, - DagError::HeaderTooOld(header.id.clone(), header.round) + DagError::TooOld(header.id.clone(), header.round) ); // Verify the header's signature. @@ -320,7 +320,7 @@ impl Core { fn sanitize_vote(&mut self, vote: &Vote) -> DagResult<()> { ensure!( self.current_header.round <= vote.round, - DagError::VoteTooOld(vote.digest(), vote.round) + DagError::TooOld(vote.digest(), vote.round) ); // Ensure we receive a vote on the expected header. @@ -338,7 +338,7 @@ impl Core { fn sanitize_certificate(&mut self, certificate: &Certificate) -> DagResult<()> { ensure!( self.gc_round <= certificate.round(), - DagError::CertificateTooOld(certificate.digest(), certificate.round()) + DagError::TooOld(certificate.digest(), certificate.round()) ); // Verify the certificate (and the embedded header). @@ -393,9 +393,7 @@ impl Core { error!("{}", e); panic!("Storage failure: killing node."); } - Err(e @ DagError::HeaderTooOld(..)) => debug!("{}", e), - Err(e @ DagError::VoteTooOld(..)) => debug!("{}", e), - Err(e @ DagError::CertificateTooOld(..)) => debug!("{}", e), + Err(e @ DagError::TooOld(..)) => debug!("{}", e), Err(e) => warn!("{}", e), } @@ -408,7 +406,6 @@ impl Core { self.certificates_aggregators.retain(|k, _| k >= &gc_round); self.cancel_handlers.retain(|k, _| k >= &gc_round); self.gc_round = gc_round; - debug!("GC round moved to {}", self.gc_round); } } } diff --git a/primary/src/error.rs b/primary/src/error.rs index ed2603ef..a8e22826 100644 --- a/primary/src/error.rs +++ b/primary/src/error.rs @@ -54,12 +54,6 @@ pub enum DagError { #[error("Parents of header {0} are not a quorum")] HeaderRequiresQuorum(Digest), - #[error("Header {0} (round {1}) too old")] - HeaderTooOld(Digest, Round), - - #[error("Vote {0} (round {1}) too old")] - VoteTooOld(Digest, Round), - - #[error("Certificate {0} (round {1}) too old")] - CertificateTooOld(Digest, Round), + #[error("Message {0} (round {1}) too old")] + TooOld(Digest, Round), } diff --git a/primary/src/garbage_collector.rs b/primary/src/garbage_collector.rs index f2349487..0c0a0352 100644 --- a/primary/src/garbage_collector.rs +++ b/primary/src/garbage_collector.rs @@ -3,25 +3,19 @@ use crate::messages::Certificate; use crate::primary::PrimaryWorkerMessage; use bytes::Bytes; use config::Committee; -use crypto::Hash as _; use crypto::PublicKey; use network::SimpleSender; use std::net::SocketAddr; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use store::Store; -use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::mpsc::Receiver; /// Receives the highest round reached by consensus and update it for all tasks. pub struct GarbageCollector { - /// The persistent storage. - store: Store, /// The current consensus round (used for cleanup). consensus_round: Arc, /// Receives the ordered certificates from consensus. rx_consensus: Receiver, - /// A loopback channel to the primary's core. - tx_loopback: Sender, /// The network addresses of our workers. addresses: Vec, /// A network sender to notify our workers of cleanup events. @@ -32,10 +26,8 @@ impl GarbageCollector { pub fn spawn( name: &PublicKey, committee: &Committee, - store: Store, consensus_round: Arc, rx_consensus: Receiver, - tx_loopback: Sender, ) { let addresses = committee .our_workers(name) @@ -46,10 +38,8 @@ impl GarbageCollector { tokio::spawn(async move { Self { - store, consensus_round, rx_consensus, - tx_loopback, addresses, network: SimpleSender::new(), } @@ -63,21 +53,6 @@ impl GarbageCollector { while let Some(certificate) = self.rx_consensus.recv().await { // TODO [issue #9]: Re-include batch digests that have not been sequenced into our next block. - // Loop back the certificate from HotStuff in case we haven't seen it. - if self - .store - .read(certificate.digest().to_vec()) - .await - .expect("Failed to read from store") - .is_none() - { - self.tx_loopback - .send(certificate.clone()) - .await - .expect("Failed to loop back certificate to core"); - } - - // Cleanup all the modules. let round = certificate.round(); if round > last_committed_round { last_committed_round = round; diff --git a/primary/src/header_waiter.rs b/primary/src/header_waiter.rs index 8f9f38a5..ec71942d 100644 --- a/primary/src/header_waiter.rs +++ b/primary/src/header_waiter.rs @@ -163,7 +163,7 @@ impl HeaderWaiter { } for (worker_id, digests) in requires_sync { let address = self.committee - .worker(&self.name, &worker_id) + .worker(&author, &worker_id) .expect("Author of valid header is not in the committee") .primary_to_worker; let message = PrimaryWorkerMessage::Synchronize(digests, author); @@ -225,7 +225,6 @@ impl HeaderWaiter { Some(result) = waiting.next() => match result { Ok(Some(header)) => { - debug!("Finished synching {:?}", header); let _ = self.pending.remove(&header.id); for x in header.payload.keys() { let _ = self.batch_requests.remove(x); diff --git a/primary/src/lib.rs b/primary/src/lib.rs index 9918941f..58b6d125 100644 --- a/primary/src/lib.rs +++ b/primary/src/lib.rs @@ -17,6 +17,5 @@ mod synchronizer; #[path = "tests/common.rs"] mod common; -pub use crate::error::DagError; pub use crate::messages::{Certificate, Header}; pub use crate::primary::{Primary, PrimaryWorkerMessage, Round, WorkerPrimaryMessage}; diff --git a/primary/src/messages.rs b/primary/src/messages.rs index 8f58fa28..4d043176 100644 --- a/primary/src/messages.rs +++ b/primary/src/messages.rs @@ -237,16 +237,11 @@ impl fmt::Debug for Certificate { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { write!( f, - "{}: C{}({}, {}, {:?})", + "{}: C{}({}, {})", self.digest(), self.round(), self.origin(), - self.header.id, - self.header - .parents - .iter() - .map(|x| format!("{}", x)) - .collect::>() + self.header.id ) } } diff --git a/primary/src/primary.rs b/primary/src/primary.rs index 8616ac7d..8ba317fe 100644 --- a/primary/src/primary.rs +++ b/primary/src/primary.rs @@ -11,7 +11,7 @@ use crate::proposer::Proposer; use crate::synchronizer::Synchronizer; use async_trait::async_trait; use bytes::Bytes; -use config::{Committee, Parameters, WorkerId}; +use config::{Committee, KeyPair, Parameters, WorkerId}; use crypto::{Digest, PublicKey, SignatureService}; use futures::sink::SinkExt as _; use log::info; @@ -59,10 +59,9 @@ pub struct Primary; impl Primary { pub fn spawn( - name: PublicKey, + keypair: KeyPair, committee: Committee, parameters: Parameters, - signature_service: SignatureService, store: Store, tx_consensus: Sender, rx_consensus: Receiver, @@ -79,9 +78,12 @@ impl Primary { let (tx_cert_requests, rx_cert_requests) = channel(CHANNEL_CAPACITY); // Write the parameters to the logs. - // NOTE: These log entries are needed to compute performance. parameters.log(); + // Parse the public and secret key of this authority. + let name = keypair.name; + let secret = keypair.secret; + // Atomic variable use to synchronizer all tasks with the latest consensus round. This is only // used for cleanup. The only tasks that write into this variable is `GarbageCollector`. let consensus_round = Arc::new(AtomicU64::new(0)); @@ -133,6 +135,9 @@ impl Primary { /* tx_certificate_waiter */ tx_sync_certificates, ); + // The `SignatureService` is used to require signatures on specific digests. + let signature_service = SignatureService::new(secret); + // The `Core` receives and handles headers, votes, and certificates from the other primaries. Core::spawn( name, @@ -151,14 +156,7 @@ impl Primary { ); // Keeps track of the latest consensus round and allows other tasks to clean up their their internal state - GarbageCollector::spawn( - &name, - &committee, - store.clone(), - consensus_round.clone(), - rx_consensus, - tx_certificates_loopback.clone(), - ); + GarbageCollector::spawn(&name, &committee, consensus_round.clone(), rx_consensus); // Receives batch digests from other workers. They are only used to validate headers. PayloadReceiver::spawn(store.clone(), /* rx_workers */ rx_others_digests); diff --git a/primary/src/tests/common.rs b/primary/src/tests/common.rs index 4867c941..408b97c1 100644 --- a/primary/src/tests/common.rs +++ b/primary/src/tests/common.rs @@ -1,7 +1,7 @@ // Copyright(C) Facebook, Inc. and its affiliates. use crate::messages::{Certificate, Header, Vote}; use bytes::Bytes; -use config::{Authority, Committee, ConsensusAddresses, PrimaryAddresses, WorkerAddresses}; +use config::{Authority, Committee, PrimaryAddresses, WorkerAddresses}; use crypto::Hash as _; use crypto::{generate_keypair, PublicKey, SecretKey, Signature}; use futures::sink::SinkExt as _; @@ -38,9 +38,6 @@ pub fn committee() -> Committee { .iter() .enumerate() .map(|(i, (id, _))| { - let consensus = ConsensusAddresses { - consensus_to_consensus: format!("127.0.0.1:{}", 0 + i).parse().unwrap(), - }; let primary = PrimaryAddresses { primary_to_primary: format!("127.0.0.1:{}", 100 + i).parse().unwrap(), worker_to_primary: format!("127.0.0.1:{}", 200 + i).parse().unwrap(), @@ -60,7 +57,6 @@ pub fn committee() -> Committee { *id, Authority { stake: 1, - consensus, primary, workers, },