diff --git a/length_prefixed_stream/Cargo.toml b/length_prefixed_stream/Cargo.toml index 3a38538..332236d 100644 --- a/length_prefixed_stream/Cargo.toml +++ b/length_prefixed_stream/Cargo.toml @@ -4,11 +4,12 @@ edition = "2018" license = "BSD-3-Clause" name = "length-prefixed-stream" readme = "readme.md" -version = "1.0.0" +version = "1.1.0" [dependencies] -async-std = "1.9.0" desert = { path = "../desert" } -futures = "0.3.28" -futures-core = "0.3.28" +futures = { version = "0.3.29", default-features = false, features = ["std"] } pin-project-lite = "0.2.10" + +[dev-dependencies] +smol = "1.3.0" diff --git a/length_prefixed_stream/README.md b/length_prefixed_stream/README.md index d81e6cb..128e5b3 100644 --- a/length_prefixed_stream/README.md +++ b/length_prefixed_stream/README.md @@ -2,34 +2,42 @@ Decode a byte stream of varint length-encoded messages into a stream of chunks. -This crate is similar to and compatible with the +This crate is async-runtime agnostic and is similar to and compatible with the [javascript length-prefixed-stream](https://www.npmjs.com/package/length-prefixed-stream) package. -# example +# Example + +Note that we're using the [smol](https://crates.io/crates/smol) async runtime in this example. +One could just as easily use [tokio](https://crates.io/crates/tokio) or [async-std](https://crates.io/crates/async-std). ```rust -use async_std::{prelude::*, stream, task}; -use futures::stream::TryStreamExt; +use futures::stream::{self, TryStreamExt, StreamExt}; +use smol; + use length_prefixed_stream::decode; + type Error = Box; -// this program will print: +// This program will print: +// // [97,98,99,100,101,102] // [65,66,67,68] fn main() -> Result<(), Error> { - task::block_on(async { - let input = stream::from_iter(vec![ + smol::block_on(async { + let input = stream::iter(vec![ Ok(vec![6, 97, 98, 99]), Ok(vec![100, 101]), Ok(vec![102, 4, 65, 66]), Ok(vec![67, 68]), ]) .into_async_read(); + let mut decoder = decode(input); while let Some(chunk) = decoder.next().await { println!["{:?}", chunk?]; } + Ok(()) }) } diff --git a/length_prefixed_stream/examples/decode.rs b/length_prefixed_stream/examples/decode.rs index dba5271..d560685 100644 --- a/length_prefixed_stream/examples/decode.rs +++ b/length_prefixed_stream/examples/decode.rs @@ -1,25 +1,30 @@ -use async_std::{prelude::*, stream, task}; -use futures::stream::TryStreamExt; +use futures::stream::{self, StreamExt, TryStreamExt}; + use length_prefixed_stream::decode; + type Error = Box; -// this program will print: +// This program will print: +// // [97,98,99,100,101,102] // [65,66,67,68] fn main() -> Result<(), Error> { - task::block_on(async { - let input = stream::from_iter(vec![ + smol::block_on(async { + let input = stream::iter(vec![ Ok(vec![6, 97, 98, 99]), Ok(vec![100, 101]), Ok(vec![102, 4, 65, 66]), Ok(vec![67, 68]), ]) .into_async_read(); + let mut decoder = decode(input); + while let Some(chunk) = decoder.next().await { println!["{:?}", chunk?]; } + Ok(()) }) } diff --git a/length_prefixed_stream/src/lib.rs b/length_prefixed_stream/src/lib.rs index 8e5100f..fdb9aa3 100644 --- a/length_prefixed_stream/src/lib.rs +++ b/length_prefixed_stream/src/lib.rs @@ -16,9 +16,8 @@ mod unfold; use std::{collections::VecDeque, marker::Unpin}; -use async_std::{prelude::*, stream::Stream}; use desert::varint; -use futures::io::AsyncRead; +use futures::{io::AsyncRead, stream::Stream, AsyncReadExt}; pub use error::{DecodeError, DecodeErrorKind}; use unfold::unfold; diff --git a/length_prefixed_stream/src/unfold.rs b/length_prefixed_stream/src/unfold.rs index bf5758f..ea110da 100644 --- a/length_prefixed_stream/src/unfold.rs +++ b/length_prefixed_stream/src/unfold.rs @@ -1,4 +1,4 @@ -// vendored version of futures::stream::unfold +// Vendored version of futures::stream::unfold // modified to use async_std // The original source file from which this is derived is @@ -9,12 +9,13 @@ // https://github.com/rust-lang/futures-rs/blob/master/LICENSE-MIT // https://github.com/rust-lang/futures-rs/blob/master/LICENSE-APACHE -use async_std::future::Future; -use async_std::stream::Stream; -use async_std::task::{Context, Poll}; -use core::fmt; -use core::pin::Pin; -use futures_core::ready; +use core::{fmt, pin::Pin}; +use futures::{ + future::Future, + ready, + stream::Stream, + task::{Context, Poll}, +}; pub fn unfold(init: T, f: F) -> Unfold where diff --git a/length_prefixed_stream/tests/decode.rs b/length_prefixed_stream/tests/decode.rs index fdd3919..87c8c67 100644 --- a/length_prefixed_stream/tests/decode.rs +++ b/length_prefixed_stream/tests/decode.rs @@ -1,46 +1,52 @@ -use async_std::{prelude::*, stream, task}; -use futures::stream::TryStreamExt; +use futures::{stream, StreamExt, TryStreamExt}; + use length_prefixed_stream::decode; + type Error = Box; #[test] fn simple_0() -> Result<(), Error> { - task::block_on(async { - let input = stream::from_iter(vec![ + smol::block_on(async { + let input = stream::iter(vec![ Ok(vec![6, 97, 98, 99]), Ok(vec![100, 101]), Ok(vec![102, 4, 65, 66]), Ok(vec![67, 68]), ]) .into_async_read(); + let mut decoder = decode(input); let mut observed = vec![]; while let Some(chunk) = decoder.next().await { observed.push(chunk?); } + assert_eq![ observed, vec![vec![97, 98, 99, 100, 101, 102], vec![65, 66, 67, 68],] ]; + Ok(()) }) } #[test] fn simple_1() -> Result<(), Error> { - task::block_on(async { - let input = stream::from_iter(vec![ + smol::block_on(async { + let input = stream::iter(vec![ Ok(vec![3, 10, 20, 30, 5]), Ok(vec![11, 12, 13, 14, 15]), Ok(vec![1, 6, 3, 103]), Ok(vec![102, 101]), ]) .into_async_read(); + let mut decoder = decode(input); let mut observed = vec![]; while let Some(chunk) = decoder.next().await { observed.push(chunk?); } + assert_eq![ observed, vec![ @@ -50,14 +56,15 @@ fn simple_1() -> Result<(), Error> { vec![103, 102, 101], ] ]; + Ok(()) }) } #[test] fn multibyte_msg_len() -> Result<(), Error> { - task::block_on(async { - let input = stream::from_iter(vec![ + smol::block_on(async { + let input = stream::iter(vec![ Ok(vec![4, 200, 201, 202, 203, 144]), // encode(400) = [144,3] Ok([vec![3], (0..200).collect()].concat()), Ok((200..395).map(|c| (c % 256) as u8).collect()), @@ -67,11 +74,13 @@ fn multibyte_msg_len() -> Result<(), Error> { Ok(vec![55]), ]) .into_async_read(); + let mut decoder = decode(input); let mut observed = vec![]; while let Some(chunk) = decoder.next().await { observed.push(chunk?); } + assert_eq![ observed, vec![ @@ -82,6 +91,7 @@ fn multibyte_msg_len() -> Result<(), Error> { vec![55], ] ]; + Ok(()) }) } diff --git a/length_prefixed_stream/tests/options.rs b/length_prefixed_stream/tests/options.rs index 8ce6721..fa0433d 100644 --- a/length_prefixed_stream/tests/options.rs +++ b/length_prefixed_stream/tests/options.rs @@ -1,29 +1,34 @@ -use async_std::{prelude::*, stream, task}; -use futures::stream::TryStreamExt; +use futures::{stream, StreamExt, TryStreamExt}; + use length_prefixed_stream::{decode_with_options, DecodeOptions}; + type Error = Box; #[test] fn options_include_len() -> Result<(), Error> { - task::block_on(async { - let input = stream::from_iter(vec![ + smol::block_on(async { + let input = stream::iter(vec![ Ok(vec![6, 97, 98, 99]), Ok(vec![100, 101]), Ok(vec![102, 4, 65, 66]), Ok(vec![67, 68]), ]) .into_async_read(); + let mut options = DecodeOptions::default(); options.include_len = true; + let mut decoder = decode_with_options(input, options); let mut observed = vec![]; while let Some(chunk) = decoder.next().await { observed.push(chunk?); } + assert_eq![ observed, vec![vec![6, 97, 98, 99, 100, 101, 102], vec![4, 65, 66, 67, 68],] ]; + Ok(()) }) }