From 2954896b38e26b4b9729a6cda6c0a268666153f7 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Thu, 19 Oct 2023 09:22:01 -0300 Subject: [PATCH] fix: avoid stage timeout when waiting for blocks (#119) --- src/bin/dolos/daemon.rs | 20 ++++++++++++-------- src/bin/dolos/main.rs | 23 ++--------------------- src/bin/dolos/sync.rs | 21 +++++++++++++-------- src/sync/mod.rs | 26 +++++++++++++++++++++++++- 4 files changed, 52 insertions(+), 38 deletions(-) diff --git a/src/bin/dolos/daemon.rs b/src/bin/dolos/daemon.rs index f2605a00..73007d68 100644 --- a/src/bin/dolos/daemon.rs +++ b/src/bin/dolos/daemon.rs @@ -4,11 +4,7 @@ use miette::{Context, IntoDiagnostic}; pub struct Args {} #[tokio::main] -pub async fn run( - config: super::Config, - policy: &gasket::runtime::Policy, - _args: &Args, -) -> miette::Result<()> { +pub async fn run(config: super::Config, _args: &Args) -> miette::Result<()> { crate::common::setup_tracing(&config.logging)?; let (wal, chain, ledger) = crate::common::open_data_stores(&config)?; @@ -23,9 +19,17 @@ pub async fn run( chain.clone(), )); - dolos::sync::pipeline(&config.upstream, wal, chain, ledger, byron_genesis, policy) - .unwrap() - .block(); + dolos::sync::pipeline( + &config.upstream, + wal, + chain, + ledger, + byron_genesis, + &config.retries, + ) + .into_diagnostic() + .context("bootstrapping sync pipeline")? 
+ .block(); server.abort(); diff --git a/src/bin/dolos/main.rs b/src/bin/dolos/main.rs index 01a8003e..89739c76 100644 --- a/src/bin/dolos/main.rs +++ b/src/bin/dolos/main.rs @@ -89,34 +89,15 @@ impl Config { } } -fn define_gasket_policy(config: Option<&gasket::retries::Policy>) -> gasket::runtime::Policy { - let default_policy = gasket::retries::Policy { - max_retries: 20, - backoff_unit: Duration::from_secs(1), - backoff_factor: 2, - max_backoff: Duration::from_secs(60), - dismissible: false, - }; - - gasket::runtime::Policy { - tick_timeout: std::time::Duration::from_secs(120).into(), - bootstrap_retry: config.cloned().unwrap_or(default_policy.clone()), - work_retry: config.cloned().unwrap_or(default_policy.clone()), - teardown_retry: config.cloned().unwrap_or(default_policy.clone()), - } -} - fn main() -> Result<()> { let args = Cli::parse(); let config = Config::new(&args.config) .into_diagnostic() .context("parsing configuration")?; - let retries = define_gasket_policy(config.retries.as_ref()); - match args.command { - Command::Daemon(x) => daemon::run(config, &retries, &x)?, - Command::Sync(x) => sync::run(&config, &retries, &x)?, + Command::Daemon(x) => daemon::run(config, &x)?, + Command::Sync(x) => sync::run(&config, &x)?, Command::Data(x) => data::run(&config, &x)?, Command::Serve(x) => serve::run(config, &x)?, }; diff --git a/src/bin/dolos/sync.rs b/src/bin/dolos/sync.rs index ba636041..a7c98d81 100644 --- a/src/bin/dolos/sync.rs +++ b/src/bin/dolos/sync.rs @@ -1,13 +1,10 @@ use dolos::prelude::*; +use miette::{Context, IntoDiagnostic}; #[derive(Debug, clap::Args)] pub struct Args {} -pub fn run( - config: &super::Config, - policy: &gasket::runtime::Policy, - _args: &Args, -) -> miette::Result<()> { +pub fn run(config: &super::Config, _args: &Args) -> miette::Result<()> { crate::common::setup_tracing(&config.logging)?; let (wal, chain, ledger) = crate::common::open_data_stores(config)?; @@ -15,9 +12,17 @@ pub fn run( let byron_genesis = 
pallas::ledger::configs::byron::from_file(&config.byron.path).map_err(Error::config)?; - dolos::sync::pipeline(&config.upstream, wal, chain, ledger, byron_genesis, policy) - .unwrap() - .block(); + dolos::sync::pipeline( + &config.upstream, + wal, + chain, + ledger, + byron_genesis, + &config.retries, + ) + .into_diagnostic() + .context("bootstrapping sync pipeline")? + .block(); Ok(()) } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 72f5b8a3..255f0518 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use gasket::messaging::{RecvPort, SendPort}; use pallas::ledger::configs::byron::GenesisFile; use pallas::storage::rolldb::chain::Store as ChainStore; @@ -19,13 +21,33 @@ pub struct Config { network_magic: u64, } +fn define_gasket_policy(config: &Option<gasket::retries::Policy>) -> gasket::runtime::Policy { + let default_retries = gasket::retries::Policy { + max_retries: 20, + backoff_unit: Duration::from_secs(1), + backoff_factor: 2, + max_backoff: Duration::from_secs(60), + dismissible: false, + }; + + let retries = config.clone().unwrap_or(default_retries); + + gasket::runtime::Policy { + //be generous with tick timeout to avoid timeout during block awaits + tick_timeout: std::time::Duration::from_secs(600).into(), + bootstrap_retry: retries.clone(), + work_retry: retries.clone(), + teardown_retry: retries.clone(), + } +} + pub fn pipeline( config: &Config, wal: WalStore, chain: ChainStore, ledger: ApplyDB, genesis: GenesisFile, - policy: &gasket::runtime::Policy, + retries: &Option<gasket::retries::Policy>, ) -> Result { let pull_cursor = wal .intersect_options(5) @@ -65,6 +87,8 @@ pub fn pipeline( // output to outside of out pipeline // apply.downstream.connect(output); + let policy = define_gasket_policy(retries); + let pull = gasket::runtime::spawn_stage(pull, policy.clone()); let roll = gasket::runtime::spawn_stage(roll, policy.clone()); let chain = gasket::runtime::spawn_stage(chain, policy.clone());