Skip to content

Commit

Permalink
Assign specific jobs to dedicated workers (#35)
Browse files Browse the repository at this point in the history
* poc

* fix clippy issues

* remove uuid

* refactor

* fix review comments

* change default queue name

* fix review comments
  • Loading branch information
temaniarpit27 authored Sep 11, 2024
1 parent 63f3f8b commit 7f0e649
Show file tree
Hide file tree
Showing 13 changed files with 112 additions and 101 deletions.
13 changes: 1 addition & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion paladin-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ name = "paladin"
[dependencies]
anyhow = { version = "1.0.75", features = ["backtrace"] }
dotenvy = "0.15.7"
rand = "0.8.5"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
lapin = "2.3.1"
Expand All @@ -30,7 +31,6 @@ serde = "1.0.183"
async-trait = "0.1.73"
ciborium = "0.2.1"
futures = "0.3.28"
uuid = { version = "1.6.1", features = ["v4", "fast-rng", "serde"] }
clap = { version = "4.4.2", features = ["derive", "env"] }
pin-project = "1.1.3"
thiserror = "1.0.50"
Expand Down
14 changes: 6 additions & 8 deletions paladin-core/src/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
//! - It supports a notion of message acknowledgement.
//! - It supports a notion of resource release.
//! - Rather than returning a tuple of `(sender, receiver)`, it breaks each into
//! separate methods.
//! This is because generally senders and receivers are usually instantiated in
//! separate process, as the channel is meant to facilitate inter process
//! communication. This avoids instantiating unnecessary resources when only one
//! is needed.
//! separate methods. This is because generally senders and receivers are
//! usually instantiated in separate process, as the channel is meant to
//! facilitate inter process communication. This avoids instantiating
//! unnecessary resources when only one is needed.

use std::{
pin::Pin,
Expand All @@ -20,7 +19,6 @@ use anyhow::Result;
use async_trait::async_trait;
use futures::Stream;
use pin_project::{pin_project, pinned_drop};
use uuid::Uuid;

use crate::{acker::Acker, queue::Publisher, serializer::Serializable};

Expand Down Expand Up @@ -70,11 +68,11 @@ pub trait ChannelFactory {

/// Retrieve an existing channel. An identifier is provided when a channel
/// is issued.
async fn get(&self, identifier: Uuid, channel_type: ChannelType) -> Result<Self::Channel>;
async fn get(&self, identifier: String, channel_type: ChannelType) -> Result<Self::Channel>;

/// Issue a new channel. An identifier is returned which can be used to
/// retrieve the channel later in some other process.
async fn issue(&self, channel_type: ChannelType) -> Result<(Uuid, Self::Channel)>;
async fn issue(&self, channel_type: ChannelType) -> Result<(String, Self::Channel)>;
}

/// Guard a channel and embed a particular pipe in the lease guard.
Expand Down
42 changes: 17 additions & 25 deletions paladin-core/src/channel/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
//! queue::{Connection, Publisher, amqp::{AMQPConnection, AMQPConnectionOptions}},
//! channel::{Channel, ChannelType, ChannelFactory, queue::QueueChannelFactory},
//! };
//! use uuid::Uuid;
//! use serde::{Serialize, Deserialize};
//! use anyhow::Result;
//!
Expand Down Expand Up @@ -85,19 +84,21 @@
//!
//! Ok(())
//! }
//! ```

use anyhow::Result;
use async_trait::async_trait;
use uuid::Uuid;

use crate::{
channel::{Channel, ChannelFactory, ChannelType},
common::get_random_routing_key,
queue::{
Connection, DeliveryMode, QueueDurability, QueueHandle, QueueOptions, SyndicationMode,
},
serializer::Serializable,
};

/// Conversion from [`ChannelType`] to [`QueueOptions`].
impl From<ChannelType> for QueueOptions {
fn from(channel_type: ChannelType) -> Self {
match channel_type {
Expand Down Expand Up @@ -134,7 +135,7 @@ impl<Conn> QueueChannelFactory<Conn> {
#[derive(Clone)]
pub struct QueueChannel<Conn> {
connection: Conn,
identifier: Uuid,
identifier: String,
channel_type: ChannelType,
}

Expand All @@ -158,42 +159,32 @@ impl<
async fn sender<'a, T: Serializable + 'a>(&self) -> Result<Self::Sender<'a, T>> {
let queue = self
.connection
.declare_queue(
self.identifier
.as_simple()
.encode_lower(&mut Uuid::encode_buffer()),
self.channel_type.into(),
)
.declare_queue(&self.identifier, self.channel_type.into())
.await?;

Ok(queue.publisher())
}

/// Get a receiver for the underlying queue.
async fn receiver<'a, T: Serializable + 'a>(&self) -> Result<Self::Receiver<'a, T>> {
let identifier: String = get_random_routing_key();

let queue = self
.connection
.declare_queue(
self.identifier
.as_simple()
.encode_lower(&mut Uuid::encode_buffer()),
self.channel_type.into(),
)
.declare_queue(&self.identifier, self.channel_type.into())
.await?;
let consumer = queue.declare_consumer(&Uuid::new_v4().to_string()).await?;
let consumer = queue.declare_consumer(&identifier).await?;

Ok(consumer)
}

/// Delete the underlying queue.
fn release(&self) {
let conn = self.connection.clone();
let identifier = self.identifier;
let identifier = self.identifier.clone();

tokio::spawn(async move {
let buffer = &mut Uuid::encode_buffer();
let identifier = identifier.as_simple().encode_lower(buffer);
_ = conn.delete_queue(identifier).await;
_ = conn.delete_queue(&identifier).await;
});
}
}
Expand All @@ -206,19 +197,20 @@ where
type Channel = QueueChannel<Conn>;

/// Get an existing channel.
async fn get(&self, identifier: Uuid, channel_type: ChannelType) -> Result<Self::Channel> {
async fn get(&self, identifier: String, channel_type: ChannelType) -> Result<Self::Channel> {
Ok(QueueChannel {
connection: self.connection.clone(),
identifier,
channel_type,
})
}

/// Issue a new channel, generating a new UUID as the identifier.
async fn issue(&self, channel_type: ChannelType) -> Result<(Uuid, Self::Channel)> {
let identifier = Uuid::new_v4();
/// Issue a new channel, generating a unique identifier.
async fn issue(&self, channel_type: ChannelType) -> Result<(String, Self::Channel)> {
let identifier: String = get_random_routing_key();

Ok((
identifier,
identifier.clone(),
QueueChannel {
connection: self.connection.clone(),
identifier,
Expand Down
9 changes: 9 additions & 0 deletions paladin-core/src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use rand::{distributions::Alphanumeric, Rng};

pub fn get_random_routing_key() -> String {
rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(5)
.map(char::from)
.collect()
}
5 changes: 5 additions & 0 deletions paladin-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ pub struct Config {
/// Provides the URI for the AMQP broker, if the AMQP runtime is selected.
#[arg(long, help_heading = HELP_HEADING, env = "AMQP_URI", required_if_eq("runtime", "amqp"))]
pub amqp_uri: Option<String>,

/// Provides the routing key for workers to listen on, if the AMQP runtime
/// is used in configuration.
#[arg(long, help_heading = HELP_HEADING)]
pub task_bus_routing_key: Option<String>,
}

/// Enumerates the available serialization formats.
Expand Down
5 changes: 2 additions & 3 deletions paladin-core/src/directive/indexed_stream/foldable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use futures::{
};
use serde::{Deserialize, Serialize};
use tokio::{select, sync::Notify};
use uuid::Uuid;

use super::IndexedStream;
use crate::{
Expand Down Expand Up @@ -83,7 +82,7 @@ struct Dispatcher<'a, M: Monoid> {
m: &'a M,
assembler: Arc<ContiguousQueue<TaskOutput<M, Metadata>>>,
tx: Sender<Task<'a, M, Metadata>>,
channel_identifier: Uuid,
channel_identifier: String,
}

impl<'a, Op: Monoid + 'static> Dispatcher<'a, Op> {
Expand All @@ -104,7 +103,7 @@ impl<'a, Op: Monoid + 'static> Dispatcher<'a, Op> {
async fn try_dispatch(&self, result: TaskOutput<Op, Metadata>) -> Result<()> {
if let Some((lhs, rhs)) = self.assembler.acquire_contiguous_pair_or_queue(result) {
let task = Task {
routing_key: self.channel_identifier,
routing_key: self.channel_identifier.clone(),
metadata: Metadata {
range: *lhs.metadata.range.start()..=*rhs.metadata.range.end(),
},
Expand Down
2 changes: 1 addition & 1 deletion paladin-core/src/directive/indexed_stream/functor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl<'a, A: Send + Sync + 'a, B: Send + 'a> Functor<'a, B> for IndexedStream<'a,
// the output stream will be incomplete.
let sender_stream = futures::stream::once(async move {
let task_stream = self.map_ok(|(idx, input)| Task {
routing_key: channel_identifier,
routing_key: channel_identifier.clone(),
metadata: Metadata { idx },
op,
input,
Expand Down
5 changes: 3 additions & 2 deletions paladin-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
//! impl Operation for FibAt {
//! type Input = u64;
//! type Output = u64;
//!
//!
//! fn execute(&self, input: Self::Input) -> Result<Self::Output> {
//! match input {
//! 0 => Ok(0),
Expand Down Expand Up @@ -102,7 +102,7 @@
//! # impl Operation for FibAt {
//! # type Input = u64;
//! # type Output = u64;
//! #
//! #
//! # fn execute(&self, input: Self::Input) -> Result<Self::Output> {
//! # match input {
//! # 0 => Ok(0),
Expand Down Expand Up @@ -208,6 +208,7 @@
//! general, most of your logic should exist in `ops` and `leader`.
pub mod acker;
pub mod channel;
pub mod common;
pub mod config;
pub mod contiguous;
pub mod directive;
Expand Down
5 changes: 2 additions & 3 deletions paladin-core/src/runtime/dynamic_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
use anyhow::{Context, Result};
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use uuid::Uuid;

use crate::{
acker::Acker,
Expand Down Expand Up @@ -98,7 +97,7 @@ pub enum DynamicChannelFactory {
impl ChannelFactory for DynamicChannelFactory {
type Channel = DynamicChannel;

async fn get(&self, identifier: Uuid, channel_type: ChannelType) -> Result<DynamicChannel> {
async fn get(&self, identifier: String, channel_type: ChannelType) -> Result<DynamicChannel> {
match self {
Self::Amqp(factory) => Ok(DynamicChannel::Amqp(
factory.get(identifier, channel_type).await?,
Expand All @@ -109,7 +108,7 @@ impl ChannelFactory for DynamicChannelFactory {
}
}

async fn issue(&self, channel_type: ChannelType) -> Result<(Uuid, DynamicChannel)> {
async fn issue(&self, channel_type: ChannelType) -> Result<(String, DynamicChannel)> {
match self {
Self::Amqp(factory) => {
let (identifier, channel) = factory.issue(channel_type).await?;
Expand Down
Loading

0 comments on commit 7f0e649

Please sign in to comment.