Skip to content

Commit

Permalink
v0.5.0 (#57)
Browse files Browse the repository at this point in the history
v0.5.0, updating to Tokio 1.0 and async-std 1.8
  • Loading branch information
austinjones authored Jan 11, 2021
1 parent 570454c commit b15ab23
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 12 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "lifeline"
version = "0.4.0"
version = "0.5.0"
description = "Lifeline is a dependency injection library for asynchronous message-based applications."
keywords = ["async", "tokio", "async", "actor", "actors"]
categories = ["asynchronous", "rust-patterns", "web-programming"]
Expand Down Expand Up @@ -31,7 +31,7 @@ regex = "1.3"

tokio = { version = "1.0", default-features = false, optional = true }
tokio-stream = { version = "0.1", optional = true }
async-std = { version = "1.6", default-features = false, optional = true }
async-std = { version = "1.8", default-features = false, optional = true }

[dev-dependencies]
anyhow = "1.0"
Expand Down
4 changes: 2 additions & 2 deletions examples/async-std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ mod bus {
lifeline_bus!(pub struct ExampleBus);

impl Message<ExampleBus> for ExampleSend {
type Channel = async_std::sync::Sender<Self>;
type Channel = async_std::channel::Sender<Self>;
}

impl Message<ExampleBus> for ExampleRecv {
type Channel = async_std::sync::Sender<Self>;
type Channel = async_std::channel::Sender<Self>;
}
}

Expand Down
8 changes: 5 additions & 3 deletions src/channel/async_std.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::Channel;
use crate::error::SendError as LifelineSendError;
use crate::{impl_channel_clone, impl_channel_take};
use async_std::sync::{channel, Receiver, Sender};
use async_std::channel::{bounded, Receiver, Sender};
use async_trait::async_trait;
use std::fmt::Debug;

Expand All @@ -10,7 +10,7 @@ impl<T: Send + 'static> Channel for Sender<T> {
type Rx = Receiver<T>;

fn channel(capacity: usize) -> (Self::Tx, Self::Rx) {
channel(capacity)
bounded(capacity)
}

fn default_capacity() -> usize {
Expand All @@ -27,7 +27,9 @@ where
T: Debug + Send,
{
async fn send(&mut self, value: T) -> Result<(), LifelineSendError<T>> {
Sender::send(self, value).await;
Sender::send(self, value)
.await
.map_err(|err| LifelineSendError::Return(err.0))?;

Ok(())
}
Expand Down
14 changes: 10 additions & 4 deletions src/channel/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ mod channel {

impl<T: Clone> Receiver<T> {
pub async fn recv(&mut self) -> Option<SubscriptionState<T>> {
self.rx.recv().await
match self.rx.changed().await {
Ok(_) => Some(self.rx.borrow().clone()),
Err(_) => None,
}
}
}

Expand Down Expand Up @@ -230,6 +233,8 @@ mod messages {
}

mod service {
use tokio::sync::watch;

use super::messages::{Subscription, SubscriptionState};
use crate::Task;
use crate::{Bus, Lifeline, Service};
Expand All @@ -248,7 +253,8 @@ mod service {

fn spawn(bus: &Self::Bus) -> Self::Lifeline {
let mut rx = bus.rx::<Subscription<T>>()?.into_inner();
let tx = bus.tx::<SubscriptionState<T>>()?.into_inner();
let tx: watch::Sender<SubscriptionState<T>> =
bus.tx::<SubscriptionState<T>>()?.into_inner();
let mut next_id = 0usize;
let lifeline = Self::try_task("run", async move {
let mut state = SubscriptionState::default();
Expand All @@ -260,7 +266,7 @@ mod service {
}

state.subscriptions.insert(id, next_id);
tx.broadcast(state.clone())?;
tx.send(state.clone())?;
next_id += 1;
}
Subscription::Unsubscribe(id) => {
Expand All @@ -269,7 +275,7 @@ mod service {
}

state.subscriptions.remove(&id);
tx.broadcast(state.clone())?;
tx.send(state.clone())?;
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/channel/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ where
T: Clone + Debug + Send + Sync,
{
async fn recv(&mut self) -> Option<T> {
watch::Receiver::recv(self).await
match self.changed().await {
Ok(_) => Some(self.borrow().clone()),
Err(_) => None,
}
}
}

0 comments on commit b15ab23

Please sign in to comment.