Skip to content

Commit

Permalink
fanout
Browse files Browse the repository at this point in the history
  • Loading branch information
timotheyca committed Dec 30, 2023
0 parents commit aa1d906
Show file tree
Hide file tree
Showing 13 changed files with 952 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/target
/Cargo.lock
12 changes: 12 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "ruchei"
version = "0.1.0"
edition = "2021"

[dependencies]
futures-util = { version = "0.3.30", features = ["sink"] }
pin-project = "1"

[dev-dependencies]
async-std = { version = "1.12.0", features = ["attributes"] }
async-tungstenite = { version = "0.24.0", features = ["async-std-runtime"] }
18 changes: 18 additions & 0 deletions examples/ws-buffered.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use async_std::net::TcpListener;
use futures_util::StreamExt;
use ruchei::{concurrent::ConcurrentExt, echo::EchoExt, fanout_buffered::MulticastBuffered};

#[async_std::main]
async fn main() {
let streams = TcpListener::bind("127.0.0.1:8080").await.unwrap();
streams
.incoming()
.filter_map(|r| async { r.ok() })
.map(async_tungstenite::accept_async)
.concurrent()
.filter_map(|r| async { r.ok() })
.multicast_buffered(|_| {})
.echo()
.await
.unwrap();
}
18 changes: 18 additions & 0 deletions examples/ws-bufferless.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use async_std::net::TcpListener;
use futures_util::StreamExt;
use ruchei::{concurrent::ConcurrentExt, echo::EchoExt, fanout_bufferless::MulticastBufferless};

#[async_std::main]
async fn main() {
let streams = TcpListener::bind("127.0.0.1:8080").await.unwrap();
streams
.incoming()
.filter_map(|r| async { r.ok() })
.map(async_tungstenite::accept_async)
.concurrent()
.filter_map(|r| async { r.ok() })
.multicast_bufferless(|_| {})
.echo()
.await
.unwrap();
}
12 changes: 12 additions & 0 deletions examples/ws-client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use ruchei::echo::EchoExt;

#[async_std::main]
async fn main() {
async_tungstenite::async_std::connect_async("ws://127.0.0.1:8080/")
.await
.unwrap()
.0
.echo()
.await
.unwrap();
}
9 changes: 9 additions & 0 deletions src/callback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
pub trait Callback<E>: Clone {
fn on_close(&self, error: Option<E>);
}

impl<E, F: Clone + Fn(Option<E>)> Callback<E> for F {
fn on_close(&self, error: Option<E>) {
self(error)
}
}
55 changes: 55 additions & 0 deletions src/concurrent.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::{
pin::Pin,
task::{Context, Poll},
};

use futures_util::{
stream::{Fuse, FuturesUnordered},
Future, Stream, StreamExt,
};

#[pin_project::pin_project]
pub struct Concurrent<R, Fut> {
#[pin]
streams: Fuse<R>,
#[pin]
futures: FuturesUnordered<Fut>,
}

impl<Fut: Future, R: Stream<Item = Fut>> Stream for Concurrent<R, Fut> {
type Item = Fut::Output;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
while let Poll::Ready(Some(future)) = this.streams.as_mut().poll_next(cx) {
this.futures.push(future)
}
match this.futures.poll_next(cx) {
Poll::Ready(None) if !this.streams.is_done() => Poll::Pending,
poll => poll,
}
}
}

impl<Fut, R: Stream<Item = Fut>> From<R> for Concurrent<R, Fut> {
fn from(streams: R) -> Self {
Self {
streams: streams.fuse(),
futures: Default::default(),
}
}
}

pub trait ConcurrentExt: Sized {
type Fut;

fn concurrent(self) -> Concurrent<Self, Self::Fut>;
}

impl<Fut, R: Stream<Item = Fut>> ConcurrentExt for R {
type Fut = Fut;

fn concurrent(self) -> Concurrent<Self, Self::Fut> {
self.into()
}
}
77 changes: 77 additions & 0 deletions src/echo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use std::{
collections::VecDeque,
pin::Pin,
task::{Context, Poll},
};

use futures_util::{stream::Fuse, Future, Sink, Stream, StreamExt};
use pin_project::pin_project;

#[pin_project]
pub struct Echo<T, S> {
#[pin]
stream: Fuse<S>,
queue: VecDeque<T>,
item: Option<T>,
started: bool,
}

impl<T, E, S: Stream<Item = Result<T, E>> + Sink<T, Error = E>> Future for Echo<T, S> {
type Output = Result<(), E>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
while let Poll::Ready(Some(t)) = this.stream.as_mut().poll_next(cx)? {
this.queue.push_back(t);
}
loop {
match this.item.take() {
Some(item) => match this.stream.as_mut().poll_ready(cx)? {
Poll::Ready(()) => {
this.stream.as_mut().start_send(item)?;
*this.started = true;
}
Poll::Pending => {
*this.item = Some(item);
break;
}
},
None => match this.queue.pop_front() {
Some(item) => *this.item = Some(item),
None => {
break;
}
},
}
}
if *this.started && this.stream.as_mut().poll_flush(cx)?.is_ready() {
*this.started = false;
}
Poll::Pending
}
}

impl<T, E, S: Stream<Item = Result<T, E>>> From<S> for Echo<T, S> {
fn from(stream: S) -> Self {
Self {
stream: stream.fuse(),
queue: Default::default(),
item: None,
started: false,
}
}
}

pub trait EchoExt: Sized {
type T;

fn echo(self) -> Echo<Self::T, Self>;
}

impl<T, E, S: Stream<Item = Result<T, E>>> EchoExt for S {
type T = T;

fn echo(self) -> Echo<Self::T, Self> {
self.into()
}
}
Loading

0 comments on commit aa1d906

Please sign in to comment.