From 9b39315f288aed7feca25b841a2b9b83439e3187 Mon Sep 17 00:00:00 2001 From: Marko Atanasievski Date: Wed, 23 Oct 2024 14:17:06 +0200 Subject: [PATCH] fix: timeout --- .../hello-world-rabbitmq/docker-compose.yml | 1 + .../hello-world-rabbitmq/leader/src/main.rs | 50 +++++++++++-------- examples/hello-world-rabbitmq/ops/src/lib.rs | 2 +- 3 files changed, 30 insertions(+), 23 deletions(-) diff --git a/examples/hello-world-rabbitmq/docker-compose.yml b/examples/hello-world-rabbitmq/docker-compose.yml index c4be783..3d9fadf 100644 --- a/examples/hello-world-rabbitmq/docker-compose.yml +++ b/examples/hello-world-rabbitmq/docker-compose.yml @@ -10,6 +10,7 @@ services: environment: - RUST_LOG=info - AMQP_URI=amqp://rabbitmq:5672 + - JOB_TIMEOUT=10 worker: build: context: ../../ diff --git a/examples/hello-world-rabbitmq/leader/src/main.rs b/examples/hello-world-rabbitmq/leader/src/main.rs index 6080983..ae57106 100644 --- a/examples/hello-world-rabbitmq/leader/src/main.rs +++ b/examples/hello-world-rabbitmq/leader/src/main.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::time::Duration; use anyhow::Result; @@ -10,7 +11,7 @@ use paladin::{ directive::{indexed_stream::IndexedStream, Directive}, runtime::Runtime, }; -use tracing::info; +use tracing::{error, info, warn}; mod init; @@ -18,45 +19,50 @@ mod init; pub struct Cli { #[command(flatten)] pub options: Config, - #[arg(long, short)] + /// Optional timeout for job in the seconds + #[arg(long, short, env = "JOB_TIMEOUT")] pub timeout: Option, } const INPUT: &str = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."; -#[tokio::main] -async fn main() -> Result<()> { - dotenv().ok(); - init::tracing(); - - let args = Cli::parse(); - let runtime = std::sync::Arc::new(Runtime::from_config(&args.options, register()).await?); - - let input: Vec = INPUT.chars().collect(); - let computation = IndexedStream::from(input) - .map(&CharToString) - .fold(&StringConcat); - - let runtime_ = runtime.clone(); +async fn set_abort_timeout_job(timeout: u64, runtime: Arc) { tokio::spawn(async move { - let command_channel = runtime_ + let command_channel = runtime .get_command_ipc_sender() .await .expect("retrieved ipc sender"); - println!("Waiting to abort the execution..."); - tokio::time::sleep(Duration::from_secs(10)).await; - println!("Aborting the execution..."); + tokio::time::sleep(Duration::from_secs(timeout)).await; + warn!("User timeout expired, aborting the execution..."); if let Err(e) = command_channel .publish(&CommandIpc::Abort { routing_key: paladin::runtime::COMMAND_IPC_ABORT_ALL_KEY.into(), }) .await { - println!("Unable to send abort signal: {e}"); + error!("Unable to send abort signal: {e:?}"); } else { - println!("Abort signal successfully sent"); + info!("Abort signal successfully sent"); } }); +} + +#[tokio::main] +async fn main() -> Result<()> { + dotenv().ok(); + init::tracing(); + + let args = Cli::parse(); + let runtime = std::sync::Arc::new(Runtime::from_config(&args.options, register()).await?); + + let input: Vec = INPUT.chars().collect(); + let computation = IndexedStream::from(input) + .map(&CharToString) + .fold(&StringConcat); + + if let Some(timeout) = args.timeout { + set_abort_timeout_job(timeout, runtime.clone()).await; + } let result = computation.run(&runtime).await; runtime diff --git a/examples/hello-world-rabbitmq/ops/src/lib.rs b/examples/hello-world-rabbitmq/ops/src/lib.rs index 4ae51a6..5c21aba 100644 --- a/examples/hello-world-rabbitmq/ops/src/lib.rs +++ b/examples/hello-world-rabbitmq/ops/src/lib.rs @@ -33,7 +33,7 @@ impl Operation for CharToString { .load(std::sync::atomic::Ordering::SeqCst) { return Err(OperationError::Fatal { - err: anyhow::anyhow!("aborted on command at CharToString iteration {i} for input {input:?}"), + err: anyhow::anyhow!("aborted per request at CharToString iteration {i} for input {input:?}"), strategy: FatalStrategy::Terminate, }); }