Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
temaniarpit27 committed Aug 29, 2024
1 parent f8cfdfc commit ae637f0
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 27 deletions.
31 changes: 14 additions & 17 deletions paladin-core/src/channel/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@
//!
//! Ok(())
//! }
use anyhow::Result;
use async_trait::async_trait;
//! ```
use crate::{
channel::{Channel, ChannelFactory, ChannelType},
Expand All @@ -95,7 +93,11 @@ use crate::{
},
serializer::Serializable,
};
use anyhow::Result;
use async_trait::async_trait;
use rand::{distributions::Alphanumeric, Rng};

/// Conversion from [`ChannelType`] to [`QueueOptions`].
impl From<ChannelType> for QueueOptions {
fn from(channel_type: ChannelType) -> Self {
match channel_type {
Expand Down Expand Up @@ -156,26 +158,25 @@ impl<
async fn sender<'a, T: Serializable + 'a>(&self) -> Result<Self::Sender<'a, T>> {
let queue = self
.connection
.declare_queue(self.identifier.as_str(), self.channel_type.into())
.declare_queue(&self.identifier, self.channel_type.into())
.await?;

Ok(queue.publisher())
}

/// Get a receiver for the underlying queue.
async fn receiver<'a, T: Serializable + 'a>(&self) -> Result<Self::Receiver<'a, T>> {
// TODO - this is a bit of a hack, but it works for now
use rand::{distributions::Alphanumeric, Rng}; // 0.8
let iden: String = rand::thread_rng()
let identifier: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(7)
.take(10)
.map(char::from)
.collect();

let queue = self
.connection
.declare_queue(self.identifier.as_str(), self.channel_type.into())
.declare_queue(&self.identifier, self.channel_type.into())
.await?;
let consumer = queue.declare_consumer(iden.as_str()).await?;
let consumer = queue.declare_consumer(&identifier).await?;

Ok(consumer)
}
Expand All @@ -186,8 +187,7 @@ impl<
let identifier = self.identifier.clone();

tokio::spawn(async move {
let identifier = identifier.as_str();
_ = conn.delete_queue(identifier).await;
_ = conn.delete_queue(&identifier).await;
});
}
}
Expand All @@ -208,14 +208,11 @@ where
})
}

/// Issue a new channel, generating a new string as the identifier.
/// Issue a new channel, generating a unique identifier.
async fn issue(&self, channel_type: ChannelType) -> Result<(String, Self::Channel)> {
// TODO - Hacky way to generate a unique identifier
use rand::{distributions::Alphanumeric, Rng}; // 0.8

let identifier: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(7)
.take(10)
.map(char::from)
.collect();

Expand Down
2 changes: 2 additions & 0 deletions paladin-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub struct Config {
#[arg(long, help_heading = HELP_HEADING, env = "AMQP_URI", required_if_eq("runtime", "amqp"))]
pub amqp_uri: Option<String>,

/// Provides the routing key for workers to listen on, if the AMQP runtime
#[arg(long, help_heading = HELP_HEADING)]
pub task_bus_routing_key: Option<String>,
}

Expand Down
20 changes: 10 additions & 10 deletions paladin-core/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use std::{
use anyhow::Result;
use dashmap::{mapref::entry::Entry, DashMap};
use futures::{stream::BoxStream, Stream, StreamExt};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use tokio::{select, task::JoinHandle, try_join};
use tracing::{debug_span, error, instrument, trace, warn, Instrument};
Expand Down Expand Up @@ -130,10 +131,7 @@ impl Runtime {
let channel_factory = DynamicChannelFactory::from_config(config).await?;
let task_channel = channel_factory
.get(
config
.task_bus_routing_key
.clone()
.unwrap_or("".to_string()),
config.task_bus_routing_key.clone().unwrap_or_default(),
ChannelType::ExactlyOnce,
)
.await?;
Expand Down Expand Up @@ -413,10 +411,7 @@ impl WorkerRuntime {
let channel_factory = DynamicChannelFactory::from_config(config).await?;
let task_channel = channel_factory
.get(
config
.task_bus_routing_key
.clone()
.unwrap_or("".to_string()),
config.task_bus_routing_key.clone().unwrap_or_default(),
ChannelType::ExactlyOnce,
)
.await?;
Expand Down Expand Up @@ -658,9 +653,14 @@ impl WorkerRuntime {
}
});

let identifier: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(10)
.map(char::from)
.collect();

// Create a watch channel for signaling IPC changes while processing a task.
let (ipc_sig_term_tx, ipc_sig_term_rx) =
tokio::sync::watch::channel::<String>("".to_string());
let (ipc_sig_term_tx, ipc_sig_term_rx) = tokio::sync::watch::channel::<String>(identifier);

// Spawn a task that will listen for IPC termination signals and mark jobs as
// terminated.
Expand Down

0 comments on commit ae637f0

Please sign in to comment.