From 0d4c55d13ded51506f8f689982abe54fa7e03fb5 Mon Sep 17 00:00:00 2001 From: Roman Volosatovs Date: Wed, 14 Aug 2024 16:11:14 +0200 Subject: [PATCH] chore(examples): replace `async-stream` usage by explicit streams Signed-off-by: Roman Volosatovs --- Cargo.lock | 25 +------------ Cargo.toml | 1 - examples/rust/streams-nats-client/Cargo.toml | 2 +- examples/rust/streams-nats-client/src/main.rs | 37 +++++++++++-------- examples/rust/streams-nats-server/Cargo.toml | 1 - 5 files changed, 23 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f0fe8c0..5f123aac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -157,28 +157,6 @@ dependencies = [ "url", ] -[[package]] -name = "async-stream" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" -dependencies = [ - "async-stream-impl", - "futures-core", - "pin-project-lite", -] - -[[package]] -name = "async-stream-impl" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "async-trait" version = "0.1.81" @@ -2238,11 +2216,11 @@ version = "0.1.0" dependencies = [ "anyhow", "async-nats-wrpc", - "async-stream", "bytes", "clap", "futures", "tokio", + "tokio-stream", "tracing", "tracing-subscriber", "url", @@ -2256,7 +2234,6 @@ version = "0.1.0" dependencies = [ "anyhow", "async-nats-wrpc", - "async-stream", "bytes", "clap", "futures", diff --git a/Cargo.toml b/Cargo.toml index 48c9c6a9..14a0c1d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,7 +96,6 @@ wrpc-cli = { workspace = true } [workspace.dependencies] anyhow = { version = "1", default-features = false } async-nats = { package = "async-nats-wrpc", version = "0.35.1", default-features = false } -async-stream = { version = "0.3", default-features = false } bitflags = { version = "2", default-features = false } bytes = { version = "1", default-features = false } clap = { version = "4", default-features = false } diff --git a/examples/rust/streams-nats-client/Cargo.toml b/examples/rust/streams-nats-client/Cargo.toml index 9a8dd4e7..5adc38a3 100644 --- a/examples/rust/streams-nats-client/Cargo.toml +++ b/examples/rust/streams-nats-client/Cargo.toml @@ -11,7 +11,6 @@ repository.workspace = true [dependencies] anyhow = { workspace = true } async-nats = { workspace = true } -async-stream = { workspace = true } bytes = { workspace = true } clap = { workspace = true, features = [ "color", @@ -24,6 +23,7 @@ clap = { workspace = true, features = [ ] } futures = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread"] } +tokio-stream = { workspace = true, features = ["time"] } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = [ "ansi", diff --git a/examples/rust/streams-nats-client/src/main.rs b/examples/rust/streams-nats-client/src/main.rs index 4741731a..8c9591b7 100644 --- a/examples/rust/streams-nats-client/src/main.rs +++ b/examples/rust/streams-nats-client/src/main.rs @@ -1,12 +1,12 @@ use core::time::Duration; use anyhow::Context as _; -use async_stream::stream; use bytes::Bytes; use clap::Parser; -use futures::StreamExt as _; -use tokio::time::sleep; -use tokio::{sync::mpsc, try_join}; +use futures::{stream, StreamExt as _}; +use tokio::sync::mpsc; +use tokio::{time, try_join}; +use tokio_stream::wrappers::IntervalStream; use tracing::debug; use tracing_subscriber::layer::SubscriberExt as _; use tracing_subscriber::util::SubscriberInitExt as _; @@ -50,18 +50,23 @@ async fn main() -> anyhow::Result<()> { .await .context("failed to connect to NATS.io")?; for prefix in prefixes { - let numbers = Box::pin(stream! { - for i in 1..=10 { - yield vec![i]; - sleep(Duration::from_secs(1)).await; - } - }); - let bytes = Box::pin(stream! { - for i in 1..=10 { - yield Bytes::from(i.to_string()); - sleep(Duration::from_secs(1)).await; - } - }); + let numbers = Box::pin( + stream::iter(1..) + .take(10) + .zip(IntervalStream::new(time::interval(Duration::from_secs(1)))) + .map(|(i, _)| i) + .ready_chunks(10), + ); + + // `stream` items are chunked using [`Bytes`] + let bytes = Box::pin( + stream::iter(b"foo bar baz") + .zip(IntervalStream::new(time::interval(Duration::from_secs(1)))) + .map(|(i, _)| *i) + .ready_chunks(10) + .map(Bytes::from), + ); + let wrpc = wrpc_transport_nats::Client::new(nats.clone(), prefix.clone(), None); let (mut numbers, mut bytes, io) = echo(&wrpc, None, Req { numbers, bytes }) .await diff --git a/examples/rust/streams-nats-server/Cargo.toml b/examples/rust/streams-nats-server/Cargo.toml index cf943727..3ade2092 100644 --- a/examples/rust/streams-nats-server/Cargo.toml +++ b/examples/rust/streams-nats-server/Cargo.toml @@ -11,7 +11,6 @@ repository.workspace = true [dependencies] anyhow = { workspace = true } async-nats = { workspace = true } -async-stream = { workspace = true } bytes = { workspace = true } clap = { workspace = true, features = [ "color",