diff --git a/paladin-core/src/channel/queue.rs b/paladin-core/src/channel/queue.rs index d80e18d..3e61274 100644 --- a/paladin-core/src/channel/queue.rs +++ b/paladin-core/src/channel/queue.rs @@ -84,9 +84,7 @@ //! //! Ok(()) //! } - -use anyhow::Result; -use async_trait::async_trait; +//! ``` use crate::{ channel::{Channel, ChannelFactory, ChannelType}, @@ -95,7 +93,11 @@ use crate::{ }, serializer::Serializable, }; +use anyhow::Result; +use async_trait::async_trait; +use rand::{distributions::Alphanumeric, Rng}; +/// Conversion from [`ChannelType`] to [`QueueOptions`]. impl From for QueueOptions { fn from(channel_type: ChannelType) -> Self { match channel_type { @@ -156,7 +158,7 @@ impl< async fn sender<'a, T: Serializable + 'a>(&self) -> Result> { let queue = self .connection - .declare_queue(self.identifier.as_str(), self.channel_type.into()) + .declare_queue(&self.identifier, self.channel_type.into()) .await?; Ok(queue.publisher()) @@ -164,18 +166,17 @@ impl< /// Get a receiver for the underlying queue. async fn receiver<'a, T: Serializable + 'a>(&self) -> Result> { - // TODO - this is a bit of a hack, but it works for now - use rand::{distributions::Alphanumeric, Rng}; // 0.8 - let iden: String = rand::thread_rng() + let identifier: String = rand::thread_rng() .sample_iter(&Alphanumeric) - .take(7) + .take(10) .map(char::from) .collect(); + let queue = self .connection - .declare_queue(self.identifier.as_str(), self.channel_type.into()) + .declare_queue(&self.identifier, self.channel_type.into()) .await?; - let consumer = queue.declare_consumer(iden.as_str()).await?; + let consumer = queue.declare_consumer(&identifier).await?; Ok(consumer) } @@ -186,8 +187,7 @@ impl< let identifier = self.identifier.clone(); tokio::spawn(async move { - let identifier = identifier.as_str(); - _ = conn.delete_queue(identifier).await; + _ = conn.delete_queue(&identifier).await; }); } } @@ -208,14 +208,11 @@ where }) } - /// Issue a new channel, generating a new string as the identifier. + /// Issue a new channel, generating a unique identifier. async fn issue(&self, channel_type: ChannelType) -> Result<(String, Self::Channel)> { - // TODO - Hacky way to generate a unique identifier - use rand::{distributions::Alphanumeric, Rng}; // 0.8 - let identifier: String = rand::thread_rng() .sample_iter(&Alphanumeric) - .take(7) + .take(10) .map(char::from) .collect(); diff --git a/paladin-core/src/config/mod.rs b/paladin-core/src/config/mod.rs index 5020e8c..fcdceea 100644 --- a/paladin-core/src/config/mod.rs +++ b/paladin-core/src/config/mod.rs @@ -41,6 +41,8 @@ pub struct Config { #[arg(long, help_heading = HELP_HEADING, env = "AMQP_URI", required_if_eq("runtime", "amqp"))] pub amqp_uri: Option, + /// Provides the routing key for workers to listen on, if the AMQP runtime + #[arg(long, help_heading = HELP_HEADING)] pub task_bus_routing_key: Option, } diff --git a/paladin-core/src/runtime/mod.rs b/paladin-core/src/runtime/mod.rs index 08e04af..c465c7d 100644 --- a/paladin-core/src/runtime/mod.rs +++ b/paladin-core/src/runtime/mod.rs @@ -56,6 +56,7 @@ use std::{ use anyhow::Result; use dashmap::{mapref::entry::Entry, DashMap}; use futures::{stream::BoxStream, Stream, StreamExt}; +use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use tokio::{select, task::JoinHandle, try_join}; use tracing::{debug_span, error, instrument, trace, warn, Instrument}; @@ -130,10 +131,7 @@ impl Runtime { let channel_factory = DynamicChannelFactory::from_config(config).await?; let task_channel = channel_factory .get( - config - .task_bus_routing_key - .clone() - .unwrap_or("".to_string()), + config.task_bus_routing_key.clone().unwrap_or_default(), ChannelType::ExactlyOnce, ) .await?; @@ -413,10 +411,7 @@ impl WorkerRuntime { let channel_factory = DynamicChannelFactory::from_config(config).await?; let task_channel = channel_factory .get( - config - .task_bus_routing_key - .clone() - .unwrap_or("".to_string()), + config.task_bus_routing_key.clone().unwrap_or_default(), ChannelType::ExactlyOnce, ) .await?; @@ -658,9 +653,14 @@ impl WorkerRuntime { } }); + let identifier: String = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(10) + .map(char::from) + .collect(); + // Create a watch channel for signaling IPC changes while processing a task. - let (ipc_sig_term_tx, ipc_sig_term_rx) = - tokio::sync::watch::channel::("".to_string()); + let (ipc_sig_term_tx, ipc_sig_term_rx) = tokio::sync::watch::channel::(identifier); // Spawn a task that will listen for IPC termination signals and mark jobs as // terminated.