diff --git a/Cargo.lock b/Cargo.lock index 361897fb..2d76c560 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -238,6 +238,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51" + [[package]] name = "base64ct" version = "1.6.0" @@ -302,7 +308,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93f2635620bf0b9d4576eb7bb9a38a55df78bd1205d26fa994b25911a69f212f" dependencies = [ "bitcoin_hashes", - "rand_core 0.5.1", + "rand_core 0.6.4", "serde", "unicode-normalization", ] @@ -1902,16 +1908,16 @@ checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" [[package]] name = "pallas" -version = "0.23.0" -source = "git+https://github.com/txpipe/pallas.git#edbf4461880291edb25dbf76997d64dfb1e4ffd4" +version = "0.24.0" +source = "git+https://github.com/txpipe/pallas.git#00ece5d3008c1e808fb700914992853ece37a06b" dependencies = [ "pallas-addresses", "pallas-applying", - "pallas-codec 0.23.0", + "pallas-codec 0.24.0", "pallas-configs", - "pallas-crypto 0.23.0", + "pallas-crypto 0.24.0", "pallas-hardano", - "pallas-network 0.23.0", + "pallas-network 0.24.0", "pallas-primitives", "pallas-rolldb", "pallas-traverse", @@ -1922,28 +1928,28 @@ dependencies = [ [[package]] name = "pallas-addresses" -version = "0.23.0" -source = "git+https://github.com/txpipe/pallas.git#edbf4461880291edb25dbf76997d64dfb1e4ffd4" +version = "0.24.0" +source = "git+https://github.com/txpipe/pallas.git#00ece5d3008c1e808fb700914992853ece37a06b" dependencies = [ "base58", "bech32 0.9.1", "crc", "hex", - "pallas-codec 0.23.0", - "pallas-crypto 0.23.0", + "pallas-codec 0.24.0", + "pallas-crypto 0.24.0", "sha3", "thiserror", ] [[package]] name = "pallas-applying" -version = "0.23.0" -source = "git+https://github.com/txpipe/pallas.git#edbf4461880291edb25dbf76997d64dfb1e4ffd4" +version = "0.24.0" +source = "git+https://github.com/txpipe/pallas.git#00ece5d3008c1e808fb700914992853ece37a06b" dependencies = [ "hex", "pallas-addresses", - "pallas-codec 0.23.0", - "pallas-crypto 0.23.0", + "pallas-codec 0.24.0", + "pallas-crypto 0.24.0", "pallas-primitives", "pallas-traverse", "rand", @@ -1963,8 +1969,8 @@ dependencies = [ [[package]] name = "pallas-codec" -version = "0.23.0" -source = "git+https://github.com/txpipe/pallas.git#edbf4461880291edb25dbf76997d64dfb1e4ffd4" +version = "0.24.0" +source = "git+https://github.com/txpipe/pallas.git#00ece5d3008c1e808fb700914992853ece37a06b" dependencies = [ "hex", "minicbor", @@ -1974,14 +1980,14 @@ dependencies = [ [[package]] name = "pallas-configs" -version = "0.23.0" -source = "git+https://github.com/txpipe/pallas.git#edbf4461880291edb25dbf76997d64dfb1e4ffd4" +version = "0.24.0" +source = "git+https://github.com/txpipe/pallas.git#00ece5d3008c1e808fb700914992853ece37a06b" dependencies = [ - "base64 0.21.7", + "base64 0.22.0", "hex", "pallas-addresses", - "pallas-codec 0.23.0", - "pallas-crypto 0.23.0", + "pallas-codec 0.24.0", + "pallas-crypto 0.24.0", "serde", "serde_json", ] @@ -2002,12 +2008,12 @@ dependencies = [ [[package]] name = "pallas-crypto" -version = "0.23.0" -source = "git+https://github.com/txpipe/pallas.git#edbf4461880291edb25dbf76997d64dfb1e4ffd4" +version = "0.24.0" +source = "git+https://github.com/txpipe/pallas.git#00ece5d3008c1e808fb700914992853ece37a06b" dependencies = [ "cryptoxide", "hex", - "pallas-codec 0.23.0", + "pallas-codec 0.24.0", "rand_core 0.6.4", "serde", "thiserror", @@ -2015,11 +2021,11 @@ dependencies = [ [[package]] name = "pallas-hardano" -version = "0.23.0" -source = "git+https://github.com/txpipe/pallas.git#edbf4461880291edb25dbf76997d64dfb1e4ffd4" +version = "0.24.0" +source = "git+https://github.com/txpipe/pallas.git#00ece5d3008c1e808fb700914992853ece37a06b" dependencies = [ "binary-layout", - "pallas-network 0.23.0", + "pallas-network 0.24.0", "pallas-traverse", "tap", "thiserror", @@ -2046,14 +2052,14 @@ dependencies = [ [[package]] name = "pallas-network" -version = "0.23.0" -source = "git+https://github.com/txpipe/pallas.git#edbf4461880291edb25dbf76997d64dfb1e4ffd4" +version = "0.24.0" +source = "git+https://github.com/txpipe/pallas.git#00ece5d3008c1e808fb700914992853ece37a06b" dependencies = [ "byteorder", "hex", "itertools 0.12.1", - "pallas-codec 0.23.0", - "pallas-crypto 0.23.0", + "pallas-codec 0.24.0", + "pallas-crypto 0.24.0", "rand", "socket2", "thiserror", @@ -2063,29 +2069,29 @@ dependencies = [ [[package]] name = "pallas-primitives" -version = "0.23.0" -source = "git+https://github.com/txpipe/pallas.git#edbf4461880291edb25dbf76997d64dfb1e4ffd4" +version = "0.24.0" +source = "git+https://github.com/txpipe/pallas.git#00ece5d3008c1e808fb700914992853ece37a06b" dependencies = [ "base58", "bech32 0.9.1", "hex", "log", - "pallas-codec 0.23.0", - "pallas-crypto 0.23.0", + "pallas-codec 0.24.0", + "pallas-crypto 0.24.0", "serde", "serde_json", ] [[package]] name = "pallas-rolldb" -version = "0.23.0" -source = "git+https://github.com/txpipe/pallas.git#edbf4461880291edb25dbf76997d64dfb1e4ffd4" +version = "0.24.0" +source = "git+https://github.com/txpipe/pallas.git#00ece5d3008c1e808fb700914992853ece37a06b" dependencies = [ "async-stream", "bincode", "futures-core", "futures-util", - "pallas-crypto 0.23.0", + "pallas-crypto 0.24.0", "rocksdb", "serde", "thiserror", @@ -2095,13 +2101,13 @@ dependencies = [ [[package]] name = "pallas-traverse" -version = "0.23.0" -source = "git+https://github.com/txpipe/pallas.git#edbf4461880291edb25dbf76997d64dfb1e4ffd4" +version = "0.24.0" +source = "git+https://github.com/txpipe/pallas.git#00ece5d3008c1e808fb700914992853ece37a06b" dependencies = [ "hex", "pallas-addresses", - "pallas-codec 0.23.0", - "pallas-crypto 0.23.0", + "pallas-codec 0.24.0", + "pallas-crypto 0.24.0", "pallas-primitives", "paste", "serde", @@ -2110,13 +2116,13 @@ dependencies = [ [[package]] name = "pallas-txbuilder" -version = "0.23.0" -source = "git+https://github.com/txpipe/pallas.git#edbf4461880291edb25dbf76997d64dfb1e4ffd4" +version = "0.24.0" +source = "git+https://github.com/txpipe/pallas.git#00ece5d3008c1e808fb700914992853ece37a06b" dependencies = [ "hex", "pallas-addresses", - "pallas-codec 0.23.0", - "pallas-crypto 0.23.0", + "pallas-codec 0.24.0", + "pallas-crypto 0.24.0", "pallas-primitives", "pallas-traverse", "pallas-wallet", @@ -2127,10 +2133,10 @@ dependencies = [ [[package]] name = "pallas-utxorpc" -version = "0.23.0" -source = "git+https://github.com/txpipe/pallas.git#edbf4461880291edb25dbf76997d64dfb1e4ffd4" +version = "0.24.0" +source = "git+https://github.com/txpipe/pallas.git#00ece5d3008c1e808fb700914992853ece37a06b" dependencies = [ - "pallas-codec 0.23.0", + "pallas-codec 0.24.0", "pallas-primitives", "pallas-traverse", "utxorpc-spec", @@ -2138,14 +2144,14 @@ dependencies = [ [[package]] name = "pallas-wallet" -version = "0.23.0" -source = "git+https://github.com/txpipe/pallas.git#edbf4461880291edb25dbf76997d64dfb1e4ffd4" +version = "0.24.0" +source = "git+https://github.com/txpipe/pallas.git#00ece5d3008c1e808fb700914992853ece37a06b" dependencies = [ "bech32 0.9.1", "bip39", "cryptoxide", "ed25519-bip32", - "pallas-crypto 0.23.0", + "pallas-crypto 0.24.0", "rand", "thiserror", ] diff --git a/src/bin/dolos/common.rs b/src/bin/dolos/common.rs index d0d8924b..04f0b4f0 100644 --- a/src/bin/dolos/common.rs +++ b/src/bin/dolos/common.rs @@ -23,6 +23,7 @@ pub fn open_data_stores(config: &crate::Config) -> Result { let wal = wal::Store::open( rolldb_path.join("wal"), config.rolldb.k_param.unwrap_or(1000), + config.rolldb.immutable_overlap.clone(), ) .map_err(Error::storage)?; diff --git a/src/bin/dolos/main.rs b/src/bin/dolos/main.rs index e3a65902..3aa9afc0 100644 --- a/src/bin/dolos/main.rs +++ b/src/bin/dolos/main.rs @@ -43,6 +43,7 @@ struct Cli { pub struct RolldbConfig { path: Option, k_param: Option, + immutable_overlap: Option, } #[derive(Deserialize)] diff --git a/src/submit/grpc/mod.rs b/src/submit/grpc/mod.rs index 7f3d7dc9..a045b6a2 100644 --- a/src/submit/grpc/mod.rs +++ b/src/submit/grpc/mod.rs @@ -84,6 +84,7 @@ pub fn pipeline( let mut pull = sync::pull::Stage::new( config.peer_addresses[0].clone(), config.peer_magic, + 50, sync::pull::Intersection::Tip, ); diff --git a/src/sync/mod.rs b/src/sync/mod.rs index fb562a81..0b528437 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -20,6 +20,7 @@ pub mod roll; pub struct Config { pub peer_address: String, pub network_magic: u64, + pub block_fetch_batch_size: Option, pub network_id: u8, pub phase1_validation_enabled: bool, } @@ -62,6 +63,7 @@ pub fn pipeline( let mut pull = pull::Stage::new( config.peer_address.clone(), config.network_magic, + config.block_fetch_batch_size.unwrap_or(50), pull_cursor, ); @@ -72,7 +74,6 @@ pub fn pipeline( info!(?cursor_ledger, "ledger cursor found"); let mut roll = roll::Stage::new(wal, cursor_chain, cursor_ledger); - let mut chain = chain::Stage::new(chain); let mut ledger = ledger::Stage::new(ledger, byron, shelley, config.phase1_validation_enabled); diff --git a/src/sync/pull.rs b/src/sync/pull.rs index 5805c5a1..1ff1c273 100644 --- a/src/sync/pull.rs +++ b/src/sync/pull.rs @@ -4,9 +4,11 @@ use gasket::framework::*; use tracing::{debug, info}; use pallas::crypto::hash::Hash; -use pallas::ledger::traverse::MultiEraHeader; +use pallas::ledger::traverse::{MultiEraBlock, MultiEraHeader}; use pallas::network::facades::PeerClient; -use pallas::network::miniprotocols::chainsync::{self, HeaderContent, NextResponse}; +use pallas::network::miniprotocols::chainsync::{ + self, HeaderContent, N2NClient, NextResponse, RollbackBuffer, RollbackEffect, Tip, +}; use pallas::network::miniprotocols::Point; use crate::prelude::*; @@ -95,71 +97,56 @@ async fn intersect(peer: &mut PeerClient, intersection: &Intersection) -> Result Ok(()) } +enum PullBatch { + BlockRange(Point, Point), + OutOfScopeRollback(Point), + Empty, +} + +pub enum WorkUnit { + Pull, + Await, +} + pub struct Worker { peer_session: PeerClient, } impl Worker { - async fn process_next( - &mut self, - stage: &mut Stage, - next: &NextResponse, - ) -> Result<(), WorkerError> { - match next { - NextResponse::RollForward(header, tip) => { - let header = to_traverse(header).or_panic()?; - let slot = header.slot(); - let hash = header.hash(); - - debug!(slot, %hash, "chain sync roll forward"); - - let block = self - .peer_session - .blockfetch() - .fetch_single(Point::Specific(slot, hash.to_vec())) - .await - .or_restart()?; - - stage - .downstream - .send(PullEvent::RollForward(slot, hash, block).into()) - .await - .or_panic()?; - - stage.intersection.add_breadcrumb(slot, hash.as_ref()); - - stage.chain_tip.set(tip.0.slot_or_default() as i64); + async fn define_pull_batch(&mut self, stage: &mut Stage) -> Result { + let client = self.peer_session.chainsync(); + let mut buffer = RollbackBuffer::new(); - Ok(()) - } - chainsync::NextResponse::RollBackward(point, tip) => { - match &point { - Point::Origin => debug!("rollback to origin"), - Point::Specific(slot, _) => debug!(slot, "rollback"), - }; + while buffer.size() < PULL_BATCH_SIZE { + let next = client.request_next().await.or_restart()?; - stage - .downstream - .send(PullEvent::Rollback(point.clone()).into()) - .await - .or_panic()?; + match next { + NextResponse::RollForward(header, tip) => { + let header = to_traverse(&header).or_panic()?; + let point = Point::Specific(header.slot(), header.hash().to_vec()); + buffer.roll_forward(point); - if let Point::Specific(slot, hash) = &point { - stage.intersection.add_breadcrumb(*slot, hash); + stage.track_tip(&tip); } - - stage.chain_tip.set(tip.0.slot_or_default() as i64); - - Ok(()) - } - chainsync::NextResponse::Await => { - info!("chain-sync reached the tip of the chain"); - Ok(()) + NextResponse::RollBackward(point, _) => match buffer.roll_back(&point) { + RollbackEffect::OutOfScope => return Ok(PullBatch::OutOfScopeRollback(point)), + RollbackEffect::Handled => (), + }, + NextResponse::Await => break, } } + + let range = match (buffer.oldest(), buffer.latest()) { + (Some(a), Some(b)) => PullBatch::BlockRange(a.clone(), b.clone()), + _ => PullBatch::Empty, + }; + + Ok(range) } } +const PULL_BATCH_SIZE: usize = 200; + #[async_trait::async_trait(?Send)] impl gasket::framework::Worker for Worker { async fn bootstrap(stage: &Stage) -> Result { @@ -185,37 +172,82 @@ impl gasket::framework::Worker for Worker { async fn schedule( &mut self, _stage: &mut Stage, - ) -> Result>, WorkerError> { + ) -> Result, WorkerError> { let client = self.peer_session.chainsync(); - let next = match client.has_agency() { - true => { - debug!("requesting next block"); - client.request_next().await.or_restart()? - } - false => { - debug!("awaiting next block (blocking)"); - client.recv_while_must_reply().await.or_restart()? + if client.has_agency() { + debug!("should request next batch of blocks"); + Ok(WorkSchedule::Unit(WorkUnit::Pull)) + } else { + debug!("should await next block"); + Ok(WorkSchedule::Unit(WorkUnit::Await)) + } + } + + async fn execute(&mut self, unit: &WorkUnit, stage: &mut Stage) -> Result<(), WorkerError> { + match unit { + WorkUnit::Pull => { + let batch = self.define_pull_batch(stage).await?; + + match batch { + PullBatch::BlockRange(start, end) => { + let blocks = self + .peer_session + .blockfetch() + .fetch_range((start, end)) + .await + .or_restart()?; + + stage.flush_blocks(blocks).await?; + } + PullBatch::OutOfScopeRollback(point) => { + stage.flush_rollback(point).await?; + } + PullBatch::Empty => (), + }; } - }; + WorkUnit::Await => { + let next = self + .peer_session + .chainsync() + .recv_while_must_reply() + .await + .or_restart()?; - Ok(WorkSchedule::Unit(next)) - } + match next { + NextResponse::RollForward(header, tip) => { + let header = to_traverse(&header).or_panic()?; + let point = Point::Specific(header.slot(), header.hash().to_vec()); + + let block = self + .peer_session + .blockfetch() + .fetch_single(point) + .await + .or_restart()?; + + stage.flush_blocks(vec![block]).await?; + stage.track_tip(&tip); + } + NextResponse::RollBackward(point, tip) => { + stage.flush_rollback(point).await?; + stage.track_tip(&tip); + } + NextResponse::Await => (), + } + } + } - async fn execute( - &mut self, - unit: &NextResponse, - stage: &mut Stage, - ) -> Result<(), WorkerError> { - self.process_next(stage, unit).await + Ok(()) } } #[derive(Stage)] -#[stage(name = "pull", unit = "NextResponse", worker = "Worker")] +#[stage(name = "pull", unit = "WorkUnit", worker = "Worker")] pub struct Stage { peer_address: String, network_magic: u64, + block_fetch_batch_size: usize, intersection: Intersection, pub downstream: DownstreamPort, @@ -228,14 +260,60 @@ pub struct Stage { } impl Stage { - pub fn new(peer_address: String, network_magic: u64, intersection: Intersection) -> Self { + pub fn new( + peer_address: String, + network_magic: u64, + block_fetch_batch_size: usize, + intersection: Intersection, + ) -> Self { Self { peer_address, network_magic, intersection, + block_fetch_batch_size, downstream: Default::default(), block_count: Default::default(), chain_tip: Default::default(), } } + + async fn flush_blocks(&mut self, blocks: Vec) -> Result<(), WorkerError> { + for cbor in blocks { + // TODO: can we avoid decoding in this stage? + let block = MultiEraBlock::decode(&cbor).or_panic()?; + let slot = block.slot(); + let hash = block.hash(); + + self.downstream + .send(PullEvent::RollForward(slot, hash, cbor).into()) + .await + .or_panic()?; + + self.intersection.add_breadcrumb(slot, hash.as_ref()); + } + + Ok(()) + } + + async fn flush_rollback(&mut self, point: Point) -> Result<(), WorkerError> { + match &point { + Point::Origin => debug!("rollback to origin"), + Point::Specific(slot, _) => debug!(slot, "rollback"), + }; + + self.downstream + .send(PullEvent::Rollback(point.clone()).into()) + .await + .or_panic()?; + + if let Point::Specific(slot, hash) = &point { + self.intersection.add_breadcrumb(*slot, hash); + } + + Ok(()) + } + + fn track_tip(&self, tip: &Tip) { + self.chain_tip.set(tip.0.slot_or_default() as i64); + } } diff --git a/src/tests/submit/mod.rs b/src/tests/submit/mod.rs index 46c40800..7654528b 100644 --- a/src/tests/submit/mod.rs +++ b/src/tests/submit/mod.rs @@ -27,7 +27,7 @@ mod tests { let config: crate::submit::Config = s.build().unwrap().try_deserialize().unwrap(); - let wal = pallas::storage::rolldb::wal::Store::open("tmp", 10000).unwrap(); + let wal = pallas::storage::rolldb::wal::Store::open("tmp", 10000, None).unwrap(); let daemon = crate::submit::grpc::pipeline(config.grpc, wal, true).unwrap(); diff --git a/src/tests/upstream.rs b/src/tests/upstream.rs index 52676980..a3ae982e 100644 --- a/src/tests/upstream.rs +++ b/src/tests/upstream.rs @@ -50,7 +50,7 @@ fn test_mainnet_upstream() { ) .unwrap(); - let rolldb = pallas::storage::rolldb::wal::Store::open("tmp", 10).unwrap(); + let rolldb = pallas::storage::rolldb::wal::Store::open("tmp", 10, None).unwrap(); let intersection = rolldb.intersect_options(5).unwrap().into_iter().collect(); @@ -59,6 +59,7 @@ fn test_mainnet_upstream() { let mut upstream = crate::sync::pull::Stage::new( "relays-new.cardano-mainnet.iohk.io:3001".into(), 764824073, + 20, intersection, );