diff --git a/.cargo/katex-header.html b/.cargo/katex-header.html
deleted file mode 100644
index 5db5bc0..0000000
--- a/.cargo/katex-header.html
+++ /dev/null
@@ -1,30 +0,0 @@
-
-
-
-
\ No newline at end of file
diff --git a/examples/hello-world-rabbitmq/docker-compose.yml b/examples/hello-world-rabbitmq/docker-compose.yml
index c4be783..3d9fadf 100644
--- a/examples/hello-world-rabbitmq/docker-compose.yml
+++ b/examples/hello-world-rabbitmq/docker-compose.yml
@@ -10,6 +10,7 @@ services:
environment:
- RUST_LOG=info
- AMQP_URI=amqp://rabbitmq:5672
+ - JOB_TIMEOUT=10
worker:
build:
context: ../../
diff --git a/examples/hello-world-rabbitmq/leader/src/main.rs b/examples/hello-world-rabbitmq/leader/src/main.rs
index 6080983..ae57106 100644
--- a/examples/hello-world-rabbitmq/leader/src/main.rs
+++ b/examples/hello-world-rabbitmq/leader/src/main.rs
@@ -1,3 +1,4 @@
+use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
@@ -10,7 +11,7 @@ use paladin::{
directive::{indexed_stream::IndexedStream, Directive},
runtime::Runtime,
};
-use tracing::info;
+use tracing::{error, info, warn};
mod init;
@@ -18,45 +19,50 @@ mod init;
pub struct Cli {
#[command(flatten)]
pub options: Config,
- #[arg(long, short)]
+ /// Optional timeout for job in the seconds
+ #[arg(long, short, env = "JOB_TIMEOUT")]
pub timeout: Option,
}
const INPUT: &str = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.";
-#[tokio::main]
-async fn main() -> Result<()> {
- dotenv().ok();
- init::tracing();
-
- let args = Cli::parse();
- let runtime = std::sync::Arc::new(Runtime::from_config(&args.options, register()).await?);
-
- let input: Vec = INPUT.chars().collect();
- let computation = IndexedStream::from(input)
- .map(&CharToString)
- .fold(&StringConcat);
-
- let runtime_ = runtime.clone();
+async fn set_abort_timeout_job(timeout: u64, runtime: Arc) {
tokio::spawn(async move {
- let command_channel = runtime_
+ let command_channel = runtime
.get_command_ipc_sender()
.await
.expect("retrieved ipc sender");
- println!("Waiting to abort the execution...");
- tokio::time::sleep(Duration::from_secs(10)).await;
- println!("Aborting the execution...");
+ tokio::time::sleep(Duration::from_secs(timeout)).await;
+ warn!("User timeout expired, aborting the execution...");
if let Err(e) = command_channel
.publish(&CommandIpc::Abort {
routing_key: paladin::runtime::COMMAND_IPC_ABORT_ALL_KEY.into(),
})
.await
{
- println!("Unable to send abort signal: {e}");
+ error!("Unable to send abort signal: {e:?}");
} else {
- println!("Abort signal successfully sent");
+ info!("Abort signal successfully sent");
}
});
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ dotenv().ok();
+ init::tracing();
+
+ let args = Cli::parse();
+ let runtime = std::sync::Arc::new(Runtime::from_config(&args.options, register()).await?);
+
+ let input: Vec = INPUT.chars().collect();
+ let computation = IndexedStream::from(input)
+ .map(&CharToString)
+ .fold(&StringConcat);
+
+ if let Some(timeout) = args.timeout {
+ set_abort_timeout_job(timeout, runtime.clone()).await;
+ }
let result = computation.run(&runtime).await;
runtime
diff --git a/examples/hello-world-rabbitmq/ops/src/lib.rs b/examples/hello-world-rabbitmq/ops/src/lib.rs
index 4ae51a6..5c21aba 100644
--- a/examples/hello-world-rabbitmq/ops/src/lib.rs
+++ b/examples/hello-world-rabbitmq/ops/src/lib.rs
@@ -33,7 +33,7 @@ impl Operation for CharToString {
.load(std::sync::atomic::Ordering::SeqCst)
{
return Err(OperationError::Fatal {
- err: anyhow::anyhow!("aborted on command at CharToString iteration {i} for input {input:?}"),
+ err: anyhow::anyhow!("aborted per request at CharToString iteration {i} for input {input:?}"),
strategy: FatalStrategy::Terminate,
});
}