Skip to content

Commit

Permalink
fix: avoid stage timeout when waiting for blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Oct 19, 2023
1 parent 3d22d50 commit 8c56e5e
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 38 deletions.
20 changes: 12 additions & 8 deletions src/bin/dolos/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@ use miette::{Context, IntoDiagnostic};
pub struct Args {}

#[tokio::main]
pub async fn run(
config: super::Config,
policy: &gasket::runtime::Policy,
_args: &Args,
) -> miette::Result<()> {
pub async fn run(config: super::Config, _args: &Args) -> miette::Result<()> {
crate::common::setup_tracing(&config.logging)?;

let (wal, chain, ledger) = crate::common::open_data_stores(&config)?;
Expand All @@ -23,9 +19,17 @@ pub async fn run(
chain.clone(),
));

dolos::sync::pipeline(&config.upstream, wal, chain, ledger, byron_genesis, policy)
.unwrap()
.block();
dolos::sync::pipeline(
&config.upstream,
wal,
chain,
ledger,
byron_genesis,
&config.retries,
)
.into_diagnostic()
.context("bootstrapping sync pipeline")?
.block();

server.abort();

Expand Down
23 changes: 2 additions & 21 deletions src/bin/dolos/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,34 +89,15 @@ impl Config {
}
}

fn define_gasket_policy(config: Option<&gasket::retries::Policy>) -> gasket::runtime::Policy {
let default_policy = gasket::retries::Policy {
max_retries: 20,
backoff_unit: Duration::from_secs(1),
backoff_factor: 2,
max_backoff: Duration::from_secs(60),
dismissible: false,
};

gasket::runtime::Policy {
tick_timeout: std::time::Duration::from_secs(120).into(),
bootstrap_retry: config.cloned().unwrap_or(default_policy.clone()),
work_retry: config.cloned().unwrap_or(default_policy.clone()),
teardown_retry: config.cloned().unwrap_or(default_policy.clone()),
}
}

fn main() -> Result<()> {
let args = Cli::parse();
let config = Config::new(&args.config)
.into_diagnostic()
.context("parsing configuration")?;

let retries = define_gasket_policy(config.retries.as_ref());

match args.command {
Command::Daemon(x) => daemon::run(config, &retries, &x)?,
Command::Sync(x) => sync::run(&config, &retries, &x)?,
Command::Daemon(x) => daemon::run(config, &x)?,
Command::Sync(x) => sync::run(&config, &x)?,
Command::Data(x) => data::run(&config, &x)?,
Command::Serve(x) => serve::run(config, &x)?,
};
Expand Down
21 changes: 13 additions & 8 deletions src/bin/dolos/sync.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
use dolos::prelude::*;
use miette::{Context, IntoDiagnostic};

#[derive(Debug, clap::Args)]
pub struct Args {}

pub fn run(
config: &super::Config,
policy: &gasket::runtime::Policy,
_args: &Args,
) -> miette::Result<()> {
pub fn run(config: &super::Config, _args: &Args) -> miette::Result<()> {
crate::common::setup_tracing(&config.logging)?;

let (wal, chain, ledger) = crate::common::open_data_stores(config)?;

let byron_genesis =
pallas::ledger::configs::byron::from_file(&config.byron.path).map_err(Error::config)?;

dolos::sync::pipeline(&config.upstream, wal, chain, ledger, byron_genesis, policy)
.unwrap()
.block();
dolos::sync::pipeline(
&config.upstream,
wal,
chain,
ledger,
byron_genesis,
&config.retries,
)
.into_diagnostic()
.context("bootstrapping sync pipeline")?
.block();

Ok(())
}
26 changes: 25 additions & 1 deletion src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use gasket::messaging::{RecvPort, SendPort};
use pallas::ledger::configs::byron::GenesisFile;
use pallas::storage::rolldb::chain::Store as ChainStore;
Expand All @@ -19,13 +21,33 @@ pub struct Config {
network_magic: u64,
}

fn define_gasket_policy(config: &Option<gasket::retries::Policy>) -> gasket::runtime::Policy {
let default_retries = gasket::retries::Policy {
max_retries: 20,
backoff_unit: Duration::from_secs(1),
backoff_factor: 2,
max_backoff: Duration::from_secs(60),
dismissible: false,
};

let retries = config.clone().unwrap_or(default_retries);

gasket::runtime::Policy {
//be generous with tick timeout to avoid timeout during block awaits
tick_timeout: std::time::Duration::from_secs(600).into(),
bootstrap_retry: retries.clone(),
work_retry: retries.clone(),
teardown_retry: retries.clone(),
}
}

pub fn pipeline(
config: &Config,
wal: WalStore,
chain: ChainStore,
ledger: ApplyDB,
genesis: GenesisFile,
policy: &gasket::runtime::Policy,
retries: &Option<gasket::retries::Policy>,
) -> Result<gasket::daemon::Daemon, Error> {
let pull_cursor = wal
.intersect_options(5)
Expand Down Expand Up @@ -65,6 +87,8 @@ pub fn pipeline(
// output to outside of out pipeline
// apply.downstream.connect(output);

let policy = define_gasket_policy(retries);

let pull = gasket::runtime::spawn_stage(pull, policy.clone());
let roll = gasket::runtime::spawn_stage(roll, policy.clone());
let chain = gasket::runtime::spawn_stage(chain, policy.clone());
Expand Down

0 comments on commit 8c56e5e

Please sign in to comment.