Skip to content

Commit

Permalink
Make IndexedStream's foldable implementation fully concurrent
Browse files Browse the repository at this point in the history
  • Loading branch information
cpubot committed Dec 3, 2023
1 parent 9b5ce7e commit 62f4d1e
Show file tree
Hide file tree
Showing 22 changed files with 589 additions and 495 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions examples/hello-world-rabbitmq/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions paladin-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ bytes = { version = "1.5.0", features = ["std", "serde"] }
crossbeam = "0.8.2"
postcard = { version = "1.0.8", features = ["use-std"] }
linkme = "0.3.17"
crossbeam-skiplist = "0.1.1"

# Local dependencies
paladin-opkind-derive = { path = "../paladin-opkind-derive" }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
//! The [`Publisher`] end of a coordinated channel.
//!
//! The [`CoordinatedPublisher`] wraps a [`Publisher`] and does the following:
//! - Keeps track of the number of pending sends.
//! - Closes the channel when dropped or explicitly closed.
use std::sync::{atomic::Ordering, Arc};

use anyhow::{bail, Result};
use async_trait::async_trait;
use thiserror::Error;

use super::ChannelState;
use crate::queue::Publisher;

pub struct CoordinatedPublisher<T, Inner> {
inner: Inner,
state: Arc<ChannelState>,
_marker: std::marker::PhantomData<T>,
}

impl<T, Inner> CoordinatedPublisher<T, Inner> {
pub fn new(inner: Inner, state: Arc<ChannelState>) -> Self {
Self {
inner,
state,
_marker: std::marker::PhantomData,
}
}
}

impl<T, Inner> Drop for CoordinatedPublisher<T, Inner> {
fn drop(&mut self) {
self.state.close();
}
}

#[derive(Debug, Error)]
pub enum CoordinatedPublisherError {
#[error("Inner error: {0}")]
Inner(#[from] anyhow::Error),

#[error("Publisher is closed")]
PublisherClosed,
}

fn err_closed<T>() -> anyhow::Result<T> {
bail!(CoordinatedPublisherError::PublisherClosed)
}

#[async_trait]
impl<T, Inner> Publisher<T> for CoordinatedPublisher<T, Inner>
where
T: Send + Sync,
Inner: Publisher<T> + Sync,
{
async fn publish(&self, payload: &T) -> Result<()> {
if self.state.closed.load(Ordering::SeqCst) {
return err_closed();
}

self.state.num_pending_sends.fetch_add(1, Ordering::SeqCst);

self.inner
.publish(payload)
.await
.map_err(CoordinatedPublisherError::Inner)?;

Ok(())
}

async fn close(&self) -> Result<()> {
self.inner.close().await?;
self.state.close();
Ok(())
}
}
122 changes: 0 additions & 122 deletions paladin-core/src/channel/coordinated_channel/coordinated_sink.rs

This file was deleted.

22 changes: 9 additions & 13 deletions paladin-core/src/channel/coordinated_channel/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! State coordination between two distinct channel pipes (a
//! [`Sink`] and a [`Stream`]).
//! [`Publisher`] and a [`Stream`]).
//!
//! Generally when working with channels, the expectation is that closing or
//! dropping a sender will signal to the receiver that the channel is closed.
Expand All @@ -17,11 +17,12 @@ use std::sync::{
Arc,
};

use futures::{Sink, Stream};
use futures::Stream;

use self::{coordinated_sink::CoordinatedSink, coordinated_stream::CoordinatedStream};
use self::{coordinated_publisher::CoordinatedPublisher, coordinated_stream::CoordinatedStream};
use crate::queue::Publisher;

pub mod coordinated_sink;
pub mod coordinated_publisher;
pub mod coordinated_stream;

/// The shared state between the sender and receiver of a coordinated channel.
Expand Down Expand Up @@ -50,7 +51,7 @@ impl ChannelState {
}

/// State coordination between two distinct channel pipes (a
/// [`Sink`] and a [`Stream`]).
/// [`Publisher`] and a [`Stream`]).
///
/// Generally when working with channels, the expectation is that closing or
/// dropping a sender will signal to the receiver that the channel is closed.
Expand All @@ -61,17 +62,12 @@ impl ChannelState {
///
/// [`coordinated_channel`] solves these problems by binding the sender and
/// receiver to a shared state that tracks sender closure and pending sends.
pub fn coordinated_channel<
A,
B,
Sender: Sink<A, Error = anyhow::Error>,
Receiver: Stream<Item = B>,
>(
pub fn coordinated_channel<A, B, Sender: Publisher<A>, Receiver: Stream<Item = B>>(
sender: Sender,
receiver: Receiver,
) -> (CoordinatedSink<A, Sender>, CoordinatedStream<Receiver>) {
) -> (CoordinatedPublisher<A, Sender>, CoordinatedStream<Receiver>) {
let state = Arc::new(ChannelState::new());
let sender = CoordinatedSink::new(sender, state.clone());
let sender = CoordinatedPublisher::new(sender, state.clone());
let receiver = CoordinatedStream::new(receiver, state.clone());

(sender, receiver)
Expand Down
6 changes: 3 additions & 3 deletions paladin-core/src/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ use std::{

use anyhow::Result;
use async_trait::async_trait;
use futures::{Sink, Stream};
use futures::Stream;
use pin_project::{pin_project, pinned_drop};
use uuid::Uuid;

use crate::{acker::Acker, serializer::Serializable};
use crate::{acker::Acker, queue::Publisher, serializer::Serializable};

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ChannelType {
Expand All @@ -44,7 +44,7 @@ pub enum ChannelType {
/// is needed.
#[async_trait]
pub trait Channel {
type Sender<'a, T: Serializable + 'a>: Sink<T>;
type Sender<'a, T: Serializable + 'a>: Publisher<T>;
type Acker: Acker;
type Receiver<'a, T: Serializable + 'a>: Stream<Item = (T, Self::Acker)>;

Expand Down
13 changes: 6 additions & 7 deletions paladin-core/src/channel/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@
//!
//! ```no_run
//! use paladin::{
//! queue::{Connection, amqp::{AMQPConnection, AMQPConnectionOptions}},
//! queue::{Connection, Publisher, amqp::{AMQPConnection, AMQPConnectionOptions}},
//! channel::{Channel, ChannelType, ChannelFactory, queue::QueueChannelFactory},
//! };
//! use uuid::Uuid;
//! use serde::{Serialize, Deserialize};
//! use anyhow::Result;
//! use futures::SinkExt;
//!
//! #[derive(Serialize, Deserialize)]
//! struct MyStruct {
Expand All @@ -41,7 +40,7 @@
//! // Get a sender pipe
//! let mut sender = channel.sender::<MyStruct>().await?;
//! // Dispatch a message
//! sender.send(MyStruct { field: "hello world".to_string() }).await?;
//! sender.publish(&MyStruct { field: "hello world".to_string() }).await?;
//!
//! Ok(())
//! }
Expand Down Expand Up @@ -94,8 +93,7 @@ use uuid::Uuid;
use crate::{
channel::{Channel, ChannelFactory, ChannelType},
queue::{
sink::QueueSink, Connection, DeliveryMode, QueueDurability, QueueHandle, QueueOptions,
SyndicationMode,
Connection, DeliveryMode, QueueDurability, QueueHandle, QueueOptions, SyndicationMode,
},
serializer::Serializable,
};
Expand Down Expand Up @@ -147,7 +145,7 @@ impl<
> Channel for QueueChannel<Conn>
{
type Acker = <QHandle as QueueHandle>::Acker;
type Sender<'a, T: Serializable + 'a> = QueueSink<'a, T, QHandle>;
type Sender<'a, T: Serializable + 'a> = <QHandle as QueueHandle>::Publisher<T>;
type Receiver<'a, T: Serializable + 'a> = <QHandle as QueueHandle>::Consumer<T>;

/// Close the underlying connection.
Expand All @@ -167,7 +165,8 @@ impl<
self.channel_type.into(),
)
.await?;
Ok(QueueSink::new(queue))

Ok(queue.publisher())
}

/// Get a receiver for the underlying queue.
Expand Down
Loading

0 comments on commit 62f4d1e

Please sign in to comment.