Skip to content

Commit

Permalink
Merge pull request #10 from teohhanhui/bump/msrv-1.60
Browse files Browse the repository at this point in the history
Bump MSRV to 1.60
  • Loading branch information
teohhanhui authored May 11, 2022
2 parents 2110f1c + 15ea229 commit 81bfbd8
Show file tree
Hide file tree
Showing 18 changed files with 204 additions and 190 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ jobs:
with:
command: test
toolchain: ${{ matrix.rust }}
args: --no-fail-fast --features trace_futures -- --nocapture --test-threads=1
args: --no-fail-fast --features tracing -- --nocapture --test-threads=1
- name: Install wasm-pack
uses: ./.github/actions/install-wasm-pack
- name: Run tests (wasm32-unknown-unknown, node)
run: rustup run ${{ matrix.rust }} wasm-pack test --node -- --features trace_futures
run: rustup run ${{ matrix.rust }} wasm-pack test --node -- --features tracing
- name: Install cargo-wasi
uses: ./.github/actions/cargo
with:
Expand All @@ -110,7 +110,7 @@ jobs:
with:
command: wasi
toolchain: ${{ matrix.rust }}
args: test --no-fail-fast --features trace -- --nocapture --test-threads=1
args: test --no-fail-fast --features tracing -- --nocapture --test-threads=1
- name: Run doctests
uses: ./.github/actions/cargo
with:
Expand Down
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

### Added

- Add `trace` feature to crate
- Add `tracing` feature to crate

## v0.14.0 - 2022-01-14

Expand Down
27 changes: 19 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "callbag"
version = "0.14.0"
authors = ["Teoh Han Hui <[email protected]>"]
edition = "2021"
rust-version = "1.58"
rust-version = "1.60"
description = "Rust implementation of the callbag spec for reactive/iterable programming"
repository = "https://github.com/teohhanhui/callbag-rs"
license = "MIT OR Apache-2.0"
Expand All @@ -17,21 +17,21 @@ async_nursery = { version = "0.4.0", default-features = false, optional = true }
cfg-if = "1.0.0"
never = "0.1.0"
paste = { version = "1.0.6", optional = true }
tracing = { version = "0.1.29", optional = true }
tracing = { version = "0.1.31", optional = true }
tracing-futures = { version = "0.2.5", optional = true }

[dev-dependencies]
crossbeam-queue = "0.3.3"
paste = "1.0.6"
test-log = { version = "0.2.8", default-features = false, features = ["trace"] }
tracing = "0.1.29"
tracing = "0.1.31"
tracing-futures = "0.2.5"
tracing-subscriber = { version = "0.3.5", features = ["env-filter"] }

[target.'cfg(not(all(target_arch = "wasm32", target_os = "wasi")))'.dev-dependencies]
async-std = { version = "1.10.0", features = ["attributes", "unstable"] }
async_executors = { version = "0.5.1", features = ["async_std", "tracing"] }
async_nursery = { version = "0.4.0", features = ["tracing"] }
tracing-futures = "0.2.5"

[target.'cfg(all(target_arch = "wasm32", not(target_os = "wasi")))'.dev-dependencies]
async_executors = { version = "0.5.1", features = ["timer"] }
Expand All @@ -55,22 +55,33 @@ default = [
"take",
]
browser = []
combine = ["paste"]
combine = [
"dep:paste",
]
concat = []
filter = []
flatten = []
for_each = []
from_iter = []
interval = ["async_executors", "async_nursery"]
interval = [
"dep:async_executors",
"dep:async_nursery",
]
map = []
merge = []
pipe = []
scan = []
share = []
skip = []
take = []
trace = ["paste", "tracing"]
trace_futures = ["trace", "tracing-futures"]
tracing = [
"async_executors?/tracing",
"async_nursery?/futures", # https://github.com/najamelan/async_nursery/issues/4
"async_nursery?/tracing",
"dep:paste",
"dep:tracing",
"dep:tracing-futures",
]

[lib]
doctest = false
Expand Down
18 changes: 9 additions & 9 deletions src/combine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
Message, Source,
};

#[cfg(feature = "trace")]
#[cfg(feature = "tracing")]
use {std::fmt, tracing::Span};

/// Callbag factory that combines the latest data points from multiple (2 or more) callbag sources.
Expand Down Expand Up @@ -103,10 +103,10 @@ macro_rules! combine_impls {
}

impl<$(
#[cfg(not(feature = "trace"))] $T: 'static,
#[cfg(feature = "trace")] $T: fmt::Debug + 'static,
#[cfg(not(feature = "trace"))] [<S $T>]: 'static,
#[cfg(feature = "trace")] [<S $T>]: fmt::Debug + 'static,
#[cfg(not(feature = "tracing"))] $T: 'static,
#[cfg(feature = "tracing")] $T: fmt::Debug + 'static,
#[cfg(not(feature = "tracing"))] [<S $T>]: 'static,
#[cfg(feature = "tracing")] [<S $T>]: fmt::Debug + 'static,
)+> Combine for ($([<S $T>],)+)
where
$(
Expand All @@ -116,9 +116,9 @@ macro_rules! combine_impls {
{
type Output = ($($T,)+);

#[cfg_attr(feature = "trace", tracing::instrument(level = "trace"))]
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace"))]
fn combine(self) -> Source<Self::Output> {
#[cfg(feature = "trace")]
#[cfg(feature = "tracing")]
let combine_fn_span = Span::current();
$(
let [<source_ $idx>] = self.$idx.into_arc_source();
Expand All @@ -141,7 +141,7 @@ macro_rules! combine_impls {
Arc::new(Default::default());
let talkback: Arc<Source<Self::Output>> = Arc::new(
{
#[cfg(feature = "trace")]
#[cfg(feature = "tracing")]
let combine_span = combine_span.clone();
let source_talkbacks = Arc::clone(&source_talkbacks);
move |message| {
Expand Down Expand Up @@ -212,7 +212,7 @@ macro_rules! combine_impls {
[<source_ $idx>],
Message::Handshake(Arc::new(
{
#[cfg(feature = "trace")]
#[cfg(feature = "tracing")]
let combine_span = combine_span.clone();
let sink = Arc::clone(&sink);
let n_start = Arc::clone(&n_start);
Expand Down
114 changes: 57 additions & 57 deletions src/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
Message, Source,
};

#[cfg(feature = "trace")]
#[cfg(feature = "tracing")]
use {std::fmt, tracing::Span};

/// Callbag factory that concatenates the data from multiple (2 or more) callbag sources.
Expand All @@ -23,20 +23,67 @@ use {std::fmt, tracing::Span};
/// Works with both pullable and listenable sources.
///
/// See <https://github.com/staltz/callbag-concat/blob/db3ce91a831309057e165f344a87aa1615b4774e/readme.js#L29-L64>
#[cfg_attr(feature = "trace", tracing::instrument(level = "trace"))]
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
/// use std::sync::Arc;
///
/// use callbag::{concat, for_each, from_iter};
///
/// let actual = Arc::new(SegQueue::new());
///
/// let source = concat!(from_iter(["10", "20", "30"]), from_iter(["a", "b"]));
///
/// for_each({
/// let actual = Arc::clone(&actual);
/// move |x| {
/// println!("{x}");
/// actual.push(x);
/// }
/// })(source);
///
/// assert_eq!(
/// &{
/// let mut v = vec![];
/// while let Some(x) = actual.pop() {
/// v.push(x);
/// }
/// v
/// }[..],
/// ["10", "20", "30", "a", "b"]
/// );
/// ```
#[macro_export]
macro_rules! concat {
($($s:expr),* $(,)?) => {
$crate::concat(::std::vec![$($s),*].into_boxed_slice())
};
}

/// Callbag factory that concatenates the data from multiple (2 or more) callbag sources.
///
/// It starts each source at a time: waits for the previous source to end before starting the next
/// source.
///
/// Works with both pullable and listenable sources.
///
/// See <https://github.com/staltz/callbag-concat/blob/db3ce91a831309057e165f344a87aa1615b4774e/readme.js#L29-L64>
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace"))]
#[doc(hidden)]
pub fn concat<
#[cfg(not(feature = "trace"))] T: 'static,
#[cfg(feature = "trace")] T: fmt::Debug + 'static,
#[cfg(not(feature = "trace"))] S: 'static,
#[cfg(feature = "trace")] S: fmt::Debug + 'static,
#[cfg(not(feature = "tracing"))] T: 'static,
#[cfg(feature = "tracing")] T: fmt::Debug + 'static,
#[cfg(not(feature = "tracing"))] S: 'static,
#[cfg(feature = "tracing")] S: fmt::Debug + 'static,
>(
sources: Box<[S]>,
) -> Source<T>
where
S: Into<Arc<Source<T>>> + Send + Sync,
{
#[cfg(feature = "trace")]
#[cfg(feature = "tracing")]
let concat_fn_span = Span::current();
let sources: Arc<Box<[Arc<Source<T>>]>> =
Arc::new(Vec::from(sources).into_iter().map(|s| s.into()).collect());
Expand All @@ -51,7 +98,7 @@ where
let got_pull = Arc::new(AtomicBool::new(false));
let talkback: Arc<Source<T>> = Arc::new(
{
#[cfg(feature = "trace")]
#[cfg(feature = "tracing")]
let concat_span = concat_span.clone();
let source_talkback = Arc::clone(&source_talkback);
let got_pull = Arc::clone(&got_pull);
Expand Down Expand Up @@ -101,7 +148,7 @@ where
let next_ref: Arc<ArcSwapOption<Box<dyn Fn() + Send + Sync>>> =
Arc::new(ArcSwapOption::from(None));
let next: Arc<Box<dyn Fn() + Send + Sync>> = Arc::new(Box::new({
#[cfg(feature = "trace")]
#[cfg(feature = "tracing")]
let concat_span = concat_span.clone();
let sources = Arc::clone(&sources);
let sink = Arc::clone(&sink);
Expand All @@ -116,7 +163,7 @@ where
sources[i.load(AtomicOrdering::Acquire)],
Message::Handshake(Arc::new(
{
#[cfg(feature = "trace")]
#[cfg(feature = "tracing")]
let concat_span = concat_span.clone();
let sink = Arc::clone(&sink);
let i = Arc::clone(&i);
Expand Down Expand Up @@ -187,50 +234,3 @@ where
})
.into()
}

/// Callbag factory that concatenates the data from multiple (2 or more) callbag sources.
///
/// It starts each source at a time: waits for the previous source to end before starting the next
/// source.
///
/// Works with both pullable and listenable sources.
///
/// See <https://github.com/staltz/callbag-concat/blob/db3ce91a831309057e165f344a87aa1615b4774e/readme.js#L29-L64>
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
/// use std::sync::Arc;
///
/// use callbag::{concat, for_each, from_iter};
///
/// let actual = Arc::new(SegQueue::new());
///
/// let source = concat!(from_iter(["10", "20", "30"]), from_iter(["a", "b"]));
///
/// for_each({
/// let actual = Arc::clone(&actual);
/// move |x| {
/// println!("{x}");
/// actual.push(x);
/// }
/// })(source);
///
/// assert_eq!(
/// &{
/// let mut v = vec![];
/// while let Some(x) = actual.pop() {
/// v.push(x);
/// }
/// v
/// }[..],
/// ["10", "20", "30", "a", "b"]
/// );
/// ```
#[macro_export]
macro_rules! concat {
($($s:expr),* $(,)?) => {
$crate::concat(::std::vec![$($s),*].into_boxed_slice())
};
}
18 changes: 9 additions & 9 deletions src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
Message, Source,
};

#[cfg(feature = "trace")]
#[cfg(feature = "tracing")]
use {std::fmt, tracing::Span};

/// Callbag operator that conditionally lets data pass through.
Expand Down Expand Up @@ -50,12 +50,12 @@ use {std::fmt, tracing::Span};
/// );
/// ```
#[cfg_attr(
feature = "trace",
feature = "tracing",
tracing::instrument(level = "trace", skip(condition))
)]
pub fn filter<
#[cfg(not(feature = "trace"))] I: 'static,
#[cfg(feature = "trace")] I: fmt::Debug + 'static,
#[cfg(not(feature = "tracing"))] I: 'static,
#[cfg(feature = "tracing")] I: fmt::Debug + 'static,
F: 'static,
S,
>(
Expand All @@ -65,15 +65,15 @@ where
F: Fn(&I) -> bool + Clone + Send + Sync,
S: Into<Arc<Source<I>>>,
{
#[cfg(feature = "trace")]
#[cfg(feature = "tracing")]
let filter_fn_span = Span::current();
Box::new(move |source| {
#[cfg(feature = "trace")]
#[cfg(feature = "tracing")]
let _filter_fn_entered = filter_fn_span.enter();
let source: Arc<Source<I>> = source.into();
{
let condition = condition.clone();
#[cfg(feature = "trace")]
#[cfg(feature = "tracing")]
let filter_fn_span = filter_fn_span.clone();
move |message| {
instrument!(follows_from: &filter_fn_span, "filter", filter_span);
Expand All @@ -85,7 +85,7 @@ where
Message::Handshake(Arc::new(
{
let condition = condition.clone();
#[cfg(feature = "trace")]
#[cfg(feature = "tracing")]
let filter_span = filter_span.clone();
move |message| {
instrument!(parent: &filter_span, "source_talkback");
Expand All @@ -98,7 +98,7 @@ where
sink,
Message::Handshake(Arc::new(
{
#[cfg(feature = "trace")]
#[cfg(feature = "tracing")]
let filter_span = filter_span.clone();
move |message| {
instrument!(
Expand Down
Loading

0 comments on commit 81bfbd8

Please sign in to comment.