Obviate 'static lifetime Clone requirements for operations
cpubot committed Nov 30, 2023
1 parent 540735a commit d3f4aed
Showing 26 changed files with 630 additions and 829 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock


21 changes: 21 additions & 0 deletions examples/hello-world-rabbitmq/Cargo.lock


4 changes: 2 additions & 2 deletions examples/hello-world-rabbitmq/leader/src/main.rs
@@ -27,8 +27,8 @@ async fn main() -> Result<()> {
 
     let input = ['h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd', '!'];
     let computation = IndexedStream::from(input)
-        .map(CharToString)
-        .fold(StringConcat);
+        .map(&CharToString)
+        .fold(&StringConcat);
 
     let result = computation.run(&runtime).await;
     runtime.close().await?;
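
The leader now passes operations by reference instead of by value, which is the user-facing side of dropping the `Clone + 'static` requirement named in the commit title. A self-contained sketch of that bound relaxation, using a toy `Operation` trait rather than paladin's real API:

// Toy types for illustration; none of these are paladin's real API.
trait Operation {
    type Input;
    type Output;

    fn execute(&self, input: Self::Input) -> Self::Output;
}

// Before: combinators take the operation by value, so it must be Clone (one
// copy per task) and 'static (tasks may outlive the caller's borrows).
fn run_owned<Op>(op: Op, inputs: Vec<Op::Input>) -> Vec<Op::Output>
where
    Op: Operation + Clone + Send + 'static,
{
    inputs.into_iter().map(|i| op.clone().execute(i)).collect()
}

// After: combinators borrow the operation, so `&CharToString` suffices and the
// operation type needs neither `Clone` nor `'static`.
fn run_borrowed<Op: Operation>(op: &Op, inputs: Vec<Op::Input>) -> Vec<Op::Output> {
    inputs.into_iter().map(|i| op.execute(i)).collect()
}

#[derive(Clone)]
struct CharToString;

impl Operation for CharToString {
    type Input = char;
    type Output = String;

    fn execute(&self, input: char) -> String {
        input.to_string()
    }
}

fn main() {
    assert_eq!(
        run_borrowed(&CharToString, vec!['h', 'i']),
        run_owned(CharToString, vec!['h', 'i'])
    );
}
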
30 changes: 17 additions & 13 deletions examples/hello-world-rabbitmq/ops/src/lib.rs
@@ -1,30 +1,40 @@
-use std::fmt::Debug;
-
 use paladin::{
     operation::{Monoid, Operation, Result},
-    opkind_derive::OpKind,
+    registry, RemoteExecute,
 };
 use serde::{Deserialize, Serialize};
 
-#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
+registry!();
+
+#[derive(Serialize, Deserialize, RemoteExecute)]
 pub struct CharToString;
 
 impl Operation for CharToString {
     type Input = char;
     type Output = String;
-    type Kind = Ops;
 
     fn execute(&self, input: Self::Input) -> Result<Self::Output> {
         Ok(input.to_string())
     }
 }
 
-#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
+#[derive(Serialize, Deserialize, RemoteExecute)]
+pub struct StringLength;
+impl Operation for StringLength {
+    type Input = String;
+    type Output = usize;
+
+    fn execute(&self, input: Self::Input) -> Result<Self::Output> {
+        Ok(input.len())
+    }
+}
+
+#[derive(Serialize, Deserialize, RemoteExecute)]
 pub struct StringConcat;
 
 impl Monoid for StringConcat {
     type Elem = String;
-    type Kind = Ops;
+
     fn empty(&self) -> Self::Elem {
         String::new()
     }
@@ -33,9 +43,3 @@ impl Monoid for StringConcat {
         Ok(a + &b)
     }
 }
-
-#[derive(OpKind, Debug, Serialize, Deserialize, Clone, Copy)]
-pub enum Ops {
-    CharToString(CharToString),
-    StringConcat(StringConcat),
-}
5 changes: 3 additions & 2 deletions examples/hello-world-rabbitmq/worker/src/main.rs
@@ -1,7 +1,7 @@
 use anyhow::Result;
 use clap::Parser;
 use dotenvy::dotenv;
-use ops::Ops;
+use ops::register;
 use paladin::{config::Config, runtime::WorkerRuntime};
 
 mod init;
@@ -16,9 +16,10 @@ pub struct Cli {
 async fn main() -> Result<()> {
     dotenv().ok();
     init::tracing();
+
     let args = Cli::parse();
 
-    let runtime: WorkerRuntime<Ops> = WorkerRuntime::from_config(&args.options).await?;
+    let runtime = WorkerRuntime::from_config(&args.options, register()).await?;
     runtime.main_loop().await?;
 
     Ok(())
1 change: 1 addition & 0 deletions paladin-core/Cargo.toml
@@ -39,6 +39,7 @@ dashmap = "5.5.3"
 bytes = "1.5.0"
 crossbeam = "0.8.2"
 postcard = { version = "1.0.8", features = ["alloc"] }
+linkme = "0.3.17"
 
 # Local dependencies
 paladin-opkind-derive = { path = "../paladin-opkind-derive" }
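
`linkme` provides link-time distributed slices, and its addition here presumably backs the new `registry!()` / `register()` machinery used by the example crates; that connection is an inference from this diff, not something the commit states. A generic distributed-slice registry, unrelated to paladin's internals, looks roughly like this:

use linkme::distributed_slice;

// A link-time registry: every crate linked into the binary can contribute
// entries, and the full slice is visible at runtime with no central list.
#[distributed_slice]
pub static REGISTRY: [fn()] = [..];

fn hello() {
    println!("hello from a registered entry");
}

// Contribute one entry from anywhere in the dependency graph.
#[distributed_slice(REGISTRY)]
static REGISTER_HELLO: fn() = hello;

fn main() {
    // Iterate everything that was registered across the whole binary.
    for f in REGISTRY.iter() {
        f();
    }
}
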
2 changes: 1 addition & 1 deletion paladin-core/src/acker.rs
@@ -79,7 +79,7 @@ use futures::TryFutureExt;
 /// acknowledgement. Implementers of this trait should provide the actual logic
 /// for acknowledging a message in the `ack` and `nack` methods.
 #[async_trait]
-pub trait Acker: Send + Sync + 'static {
+pub trait Acker: Send + Sync {
     async fn ack(&self) -> Result<()>;
 
     async fn nack(&self) -> Result<()>;
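
Relaxing `Acker` to `Send + Sync` means implementations no longer need to be `'static`. For reference, a hypothetical in-memory implementation (not part of this commit, and assuming the trait is exported as `paladin::acker::Acker` with `anyhow::Result` as its `Result`) is small:

use std::sync::atomic::{AtomicBool, Ordering};

use anyhow::Result;
use async_trait::async_trait;
use paladin::acker::Acker;

// Toy acker that records the last decision; real implementations wrap a broker
// delivery handle (e.g. a RabbitMQ delivery tag).
pub struct RecordingAcker {
    acked: AtomicBool,
}

impl RecordingAcker {
    pub fn was_acked(&self) -> bool {
        self.acked.load(Ordering::SeqCst)
    }
}

#[async_trait]
impl Acker for RecordingAcker {
    async fn ack(&self) -> Result<()> {
        self.acked.store(true, Ordering::SeqCst);
        Ok(())
    }

    async fn nack(&self) -> Result<()> {
        self.acked.store(false, Ordering::SeqCst);
        Ok(())
    }
}
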
@@ -61,7 +61,7 @@ fn err_closed<T>() -> anyhow::Result<T> {
     bail!(CoordinatedSinkError::SinkClosed)
 }
 
-impl<T: Unpin, Inner: Sink<T, Error = anyhow::Error>> Sink<T> for CoordinatedSink<T, Inner> {
+impl<T, Inner: Sink<T, Error = anyhow::Error>> Sink<T> for CoordinatedSink<T, Inner> {
     type Error = anyhow::Error;
 
     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
2 changes: 1 addition & 1 deletion paladin-core/src/channel/coordinated_channel/mod.rs
@@ -62,7 +62,7 @@ impl ChannelState {
 /// [`coordinated_channel`] solves these problems by binding the sender and
 /// receiver to a shared state that tracks sender closure and pending sends.
 pub fn coordinated_channel<
-    A: Unpin,
+    A,
     B,
     Sender: Sink<A, Error = anyhow::Error>,
     Receiver: Stream<Item = B>,
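
Dropping the `Unpin` bound leaves the coordination logic itself untouched: sender and receiver still share state that records sender closure and in-flight sends, as the doc comment above describes. A stripped-down sketch of that idea (illustrative only, not paladin's implementation):

use std::sync::{
    atomic::{AtomicBool, AtomicUsize, Ordering},
    Arc,
};

// Shared between the sender and receiver halves of the channel.
#[derive(Default)]
struct SharedState {
    sender_closed: AtomicBool,
    pending_sends: AtomicUsize,
}

impl SharedState {
    fn record_send(&self) {
        self.pending_sends.fetch_add(1, Ordering::SeqCst);
    }

    fn record_receive(&self) {
        self.pending_sends.fetch_sub(1, Ordering::SeqCst);
    }

    fn close_sender(&self) {
        self.sender_closed.store(true, Ordering::SeqCst);
    }

    // The receiver can terminate once the sender is closed and nothing is in flight.
    fn is_exhausted(&self) -> bool {
        self.sender_closed.load(Ordering::SeqCst) && self.pending_sends.load(Ordering::SeqCst) == 0
    }
}

fn main() {
    let state = Arc::new(SharedState::default());
    state.record_send();
    state.close_sender();
    assert!(!state.is_exhausted());
    state.record_receive();
    assert!(state.is_exhausted());
}
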
8 changes: 4 additions & 4 deletions paladin-core/src/channel/mod.rs
@@ -43,17 +43,17 @@ pub enum ChannelType {
 /// is needed.
 #[async_trait]
 pub trait Channel {
-    type Sender<T: Serializable>: Sink<T>;
+    type Sender<'a, T: Serializable + 'a>: Sink<T>;
     type Acker: Acker;
-    type Receiver<T: Serializable>: Stream<Item = (T, Self::Acker)>;
+    type Receiver<'a, T: Serializable + 'a>: Stream<Item = (T, Self::Acker)>;
 
     async fn close(&self) -> Result<()>;
 
     /// Acquire the sender side of the channel.
-    async fn sender<T: Serializable>(&self) -> Result<Self::Sender<T>>;
+    async fn sender<'a, T: Serializable + 'a>(&self) -> Result<Self::Sender<'a, T>>;
 
     /// Acquire the receiver side of the channel.
-    async fn receiver<T: Serializable>(&self) -> Result<Self::Receiver<T>>;
+    async fn receiver<'a, T: Serializable + 'a>(&self) -> Result<Self::Receiver<'a, T>>;
 
     /// Mark the channel for release.
     fn release(&self);
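
The `Sender` and `Receiver` associated types become generic associated types with a lifetime so that channel endpoints can borrow non-`'static` data, such as an operation held by reference. A minimal, paladin-independent illustration of the pattern:

use std::cell::RefCell;

// A GAT with a lifetime lets the handle returned by `sender` borrow from the
// channel itself, which a plain `type Sender<T>` cannot express. Toy trait,
// unrelated to paladin's `Channel`.
trait ToyChannel {
    type Sender<'a>
    where
        Self: 'a;

    fn sender(&self) -> Self::Sender<'_>;
}

struct VecChannel {
    buf: RefCell<Vec<String>>,
}

struct VecSender<'a> {
    chan: &'a VecChannel,
}

impl<'a> VecSender<'a> {
    fn send(&self, msg: impl Into<String>) {
        self.chan.buf.borrow_mut().push(msg.into());
    }
}

impl ToyChannel for VecChannel {
    type Sender<'a>
        = VecSender<'a>
    where
        Self: 'a;

    fn sender(&self) -> Self::Sender<'_> {
        VecSender { chan: self }
    }
}

fn main() {
    let chan = VecChannel {
        buf: RefCell::new(Vec::new()),
    };
    chan.sender().send("hello");
    assert_eq!(chan.buf.borrow()[0], "hello");
}
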
8 changes: 4 additions & 4 deletions paladin-core/src/channel/queue.rs
@@ -146,8 +146,8 @@ impl<
 > Channel for QueueChannel<Conn>
 {
     type Acker = <QHandle as QueueHandle>::Acker;
-    type Sender<T: Serializable> = QueueSink<T, QHandle>;
-    type Receiver<T: Serializable> = <QHandle as QueueHandle>::Consumer<T>;
+    type Sender<'a, T: Serializable + 'a> = QueueSink<'a, T, QHandle>;
+    type Receiver<'a, T: Serializable + 'a> = <QHandle as QueueHandle>::Consumer<T>;
 
     /// Close the underlying connection.
     async fn close(&self) -> Result<()> {
@@ -156,7 +156,7 @@
     }
 
     /// Get a sender for the underlying queue.
-    async fn sender<T: Serializable>(&self) -> Result<Self::Sender<T>> {
+    async fn sender<'a, T: Serializable + 'a>(&self) -> Result<Self::Sender<'a, T>> {
         let queue = self
             .connection
             .declare_queue(&self.identifier, self.channel_type.into())
@@ -165,7 +165,7 @@
     }
 
     /// Get a receiver for the underlying queue.
-    async fn receiver<T: Serializable>(&self) -> Result<Self::Receiver<T>> {
+    async fn receiver<'a, T: Serializable + 'a>(&self) -> Result<Self::Receiver<'a, T>> {
         let queue = self
             .connection
             .declare_queue(&self.identifier, self.channel_type.into())
49 changes: 24 additions & 25 deletions paladin-core/src/directive/indexed_stream/foldable.rs
@@ -56,7 +56,8 @@ impl<Op: Operation> Contiguous for TaskOutput<Op, Metadata> {
     }
 }
 
-type Sender<Op> = Box<dyn Sink<Task<Op, Metadata>, Error = anyhow::Error> + Send + Unpin>;
+type Sender<'a, Op> =
+    Box<dyn Sink<Task<'a, Op, Metadata>, Error = anyhow::Error> + Send + Unpin + 'a>;
 
 /// A [`Dispatcher`] abstracts over the common functionality of queuing and
 /// dispatching contiguous [`Task`]s to worker processes.
@@ -69,19 +70,22 @@ type Sender<Op> = Box<dyn Sink<Task<Op, Metadata>, Error = anyhow::Error> + Send
 /// - Dequeuing [`TaskResult`]s.
 /// - Attempting to dispatch [`TaskResult`]s if they are contiguous with another
 ///   [`TaskResult`].
-struct Dispatcher<Op: Monoid> {
+struct Dispatcher<'a, Op: Monoid> {
+    op: &'a Op,
     assembler: Arc<Mutex<ContiguousQueue<TaskOutput<Op, Metadata>>>>,
-    sender: Arc<Mutex<Sender<Op>>>,
+    sender: Arc<Mutex<Sender<'a, Op>>>,
     channel_identifier: String,
 }
 
-impl<Op: Monoid> Dispatcher<Op> {
+impl<'a, Op: Monoid> Dispatcher<'a, Op> {
     fn new(
+        op: &'a Op,
         assembler: Arc<Mutex<ContiguousQueue<TaskOutput<Op, Metadata>>>>,
-        sender: Arc<Mutex<Sender<Op>>>,
+        sender: Arc<Mutex<Sender<'a, Op>>>,
        channel_identifier: String,
     ) -> Self {
         Self {
+            op,
             assembler,
             sender,
             channel_identifier,
@@ -113,7 +117,7 @@ impl<Op: Monoid> Dispatcher<Op> {
                 metadata: Metadata {
                     range: *lhs.metadata.range.start()..=*rhs.metadata.range.end(),
                 },
-                op: lhs.op.clone(),
+                op: self.op,
                 input: (lhs.output, rhs.output),
             };
             let mut sender = self.sender.lock().await;
@@ -125,18 +129,19 @@ impl<Op: Monoid> Dispatcher<Op> {
 }
 
 #[async_trait]
-impl<A: Send + 'static, B: Send + 'static> Foldable<B> for IndexedStream<A> {
-    async fn f_fold<M: Monoid<Elem = A>>(self, m: M, runtime: &Runtime) -> Result<A> {
-        let (channel_identifier, sender, mut receiver) = runtime
-            .lease_coordinated_task_channel::<M, Metadata>()
-            .await?;
+impl<'a, A: Send + 'a, B: Send + 'a> Foldable<'a, B> for IndexedStream<'a, A> {
+    async fn f_fold<M: Monoid<Elem = A>>(self, m: &'a M, runtime: &Runtime) -> Result<A> {
+        let (channel_identifier, sender, mut receiver) =
+            runtime.lease_coordinated_task_channel().await?;
 
+        // mutable access to the assembler. So we wrap it in an Arc<Mutex<>>.
         let assembler = Arc::new(Mutex::new(ContiguousQueue::new()));
         // Both the initialization step and the result stream need to asynchronous
         // mutable access to the sender. So we wrap it in an Arc<Mutex<>>.
         let sender = Arc::new(Mutex::new(sender));
         // Initialize the dispatcher.
         let dispatcher = Arc::new(Dispatcher::new(
+            m,
             assembler.clone(),
             sender.clone(),
             channel_identifier.clone(),
@@ -165,7 +170,6 @@ impl<A: Send + 'static, B: Send + 'static> Foldable<B> for IndexedStream<A> {
         // of the job by tallying the number of inputs.
         let init = self
            .try_fold(0, |sum, (idx, item)| {
-                let op = m.clone();
                 let dispatcher = dispatcher.clone();
                 let should_dispatch = should_dispatch.clone();
 
@@ -175,7 +179,6 @@ impl<A: Send + 'static, B: Send + 'static> Foldable<B> for IndexedStream<A> {
                 // Because this represents an uncombined input, we set the
                 // range equal to its index.
                 let item_result = TaskOutput {
-                    op,
                     metadata: Metadata { range: idx..=idx },
                     output: item,
                 };
@@ -187,7 +190,7 @@ impl<A: Send + 'static, B: Send + 'static> Foldable<B> for IndexedStream<A> {
                 } else {
                     // Now that we've seen at least two inputs, we can start dispatching.
                     // Notify the result stream that it can start consuming.
-                    should_dispatch.notify_waiters();
+                    should_dispatch.notify_one();
                     // Dispatch the task.
                     dispatcher.try_dispatch(item_result).await?;
                 }
@@ -204,11 +207,10 @@ impl<A: Send + 'static, B: Send + 'static> Foldable<B> for IndexedStream<A> {
                 Ok::<_, anyhow::Error>(size)
             });
 
-        let result_handle = tokio::spawn({
+        let result_handle = {
            let resolved_input_size = resolved_input_size.clone();
            let dispatcher = dispatcher.clone();
            let should_dispatch = should_dispatch.clone();
-            let op = m.clone();
 
            async move {
                // Wait until at least two inputs have been received.
@@ -218,8 +220,9 @@ impl<A: Send + 'static, B: Send + 'static> Foldable<B> for IndexedStream<A> {
                 let result = result?;
                 // Check to see if the input size is known.
                 if let Some(job_size) = *resolved_input_size.read().await {
-                    // If it is, we can check to see if the result is the final result (i.e.,
-                    // its range comprises the size of the input).
+                    // If it is, we can check to see if the result is the final result
+                    // (i.e., its range comprises the size of
+                    // the input).
                     if result.is_final(job_size) {
                         sender.lock().await.close().await?;
                         return Ok(result.output);
@@ -230,21 +233,17 @@ impl<A: Send + 'static, B: Send + 'static> Foldable<B> for IndexedStream<A> {
                    acker.ack().await?;
                }
 
-                Ok(op.empty())
+                Ok(m.empty())
            }
-        });
+        };
 
         let size = init.await?;
         match size {
             0 => {
-                // Abort the result handle, as it will never receive a result.
-                result_handle.abort();
                 // If the input is empty, we return the empty element.
                 return Ok(m.empty());
             }
             1 => {
-                // Abort the result handle, as it will never receive a result.
-                result_handle.abort();
                 // If the input has a single element, we return it.
                 // The dispatcher is guaranteed to have queued the single input element at this
                 // point, so we can safely dequeue it. If it doesn't, this is a
@@ -258,6 +257,6 @@ impl<A: Send + 'static, B: Send + 'static> Foldable<B> for IndexedStream<A> {
             _ => {}
         }
 
-        result_handle.await?
+        result_handle.await
     }
 }
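
The reworked fold borrows the monoid (`m: &'a M`) instead of cloning it into every task, and the final result is awaited inline rather than through a spawned `'static` task. The `Dispatcher` it drives folds two results as soon as their index ranges touch; below is a simplified, self-contained sketch of that contiguity bookkeeping, with string concatenation standing in for the monoid (nothing here is paladin's actual `ContiguousQueue`):

use std::collections::BTreeMap;
use std::ops::RangeInclusive;

// Outputs are keyed by the inclusive index range they already cover. Two
// outputs are contiguous when one range ends right where the other begins,
// and contiguous outputs can be folded together immediately.
struct Assembler {
    // start index -> (range covered, partial result)
    pending: BTreeMap<usize, (RangeInclusive<usize>, String)>,
}

impl Assembler {
    fn new() -> Self {
        Self { pending: BTreeMap::new() }
    }

    // Insert a result; greedily fold it with a contiguous neighbor if one is queued.
    fn insert(&mut self, range: RangeInclusive<usize>, output: String) {
        // Neighbor on the right: a queued range starting at end + 1.
        if let Some((rhs_range, rhs_out)) = self.pending.remove(&(*range.end() + 1)) {
            let merged = *range.start()..=*rhs_range.end();
            return self.insert(merged, output + &rhs_out);
        }
        // Neighbor on the left: a queued range ending at start - 1.
        let left_key = self
            .pending
            .iter()
            .find(|(_, (r, _))| *r.end() + 1 == *range.start())
            .map(|(s, _)| *s);
        if let Some(start) = left_key {
            let (lhs_range, lhs_out) = self.pending.remove(&start).unwrap();
            let merged = *lhs_range.start()..=*range.end();
            return self.insert(merged, lhs_out + &output);
        }
        self.pending.insert(*range.start(), (range, output));
    }
}

fn main() {
    let mut asm = Assembler::new();
    asm.insert(0..=0, "h".into());
    asm.insert(2..=2, "l".into());
    asm.insert(1..=1, "e".into());
    // All three inputs have been folded into one contiguous result covering 0..=2.
    let (range, out) = asm.pending.into_values().next().unwrap();
    assert_eq!((range, out), (0..=2, "hel".to_string()));
}
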