Skip to content

Commit

Permalink
Merge #145
Browse files Browse the repository at this point in the history
145: Add Stream::poll_next r=stjepang a=stjepang

Adding `poll_next` to the `Stream` trait will simplify #125.

After a discussion with @yoshuawuyts and @withoutboats, we became confident that the `Stream` trait of the future will never solely rely on `async fn next()` and will always have to rely on `fn poll_next()`.

This PR now makes our `Stream` trait implementable by end users.

I also made a few adjustments around pinning to `all()` and `any()` combinators since they take a `&mut self`, which implies `Self: Unpin`. A rule of thumb is that if a method takes a `&mut self` and then pins `self`, we *have to* require `Self: Unpin`.

Co-authored-by: Stjepan Glavina <[email protected]>
  • Loading branch information
bors[bot] and Stjepan Glavina authored Sep 11, 2019
2 parents a296760 + 2497f4d commit 2ecaf18
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 61 deletions.
37 changes: 15 additions & 22 deletions src/stream/stream/all.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,36 @@
use crate::future::Future;
use crate::task::{Context, Poll};

use std::marker::PhantomData;
use std::pin::Pin;

#[derive(Debug)]
pub struct AllFuture<'a, S, F, T>
where
F: FnMut(T) -> bool,
{
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct AllFuture<'a, S, F, T> {
pub(crate) stream: &'a mut S,
pub(crate) f: F,
pub(crate) result: bool,
pub(crate) __item: PhantomData<T>,
pub(crate) _marker: PhantomData<T>,
}

impl<'a, S, F, T> AllFuture<'a, S, F, T>
where
F: FnMut(T) -> bool,
{
pin_utils::unsafe_pinned!(stream: &'a mut S);
pin_utils::unsafe_unpinned!(result: bool);
pin_utils::unsafe_unpinned!(f: F);
}
impl<S: Unpin, F, T> Unpin for AllFuture<'_, S, F, T> {}

impl<S, F> Future for AllFuture<'_, S, F, S::Item>
where
S: futures_core::stream::Stream + Unpin + Sized,
S: Stream + Unpin + Sized,
F: FnMut(S::Item) -> bool,
{
type Output = bool;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures_core::stream::Stream;
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));

match next {
Some(v) => {
let result = (self.as_mut().f())(v);
*self.as_mut().result() = result;
let result = (&mut self.f)(v);
self.result = result;

if result {
// don't forget to wake this task again to pull the next item from stream
cx.waker().wake_by_ref();
Expand Down
37 changes: 15 additions & 22 deletions src/stream/stream/any.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,36 @@
use crate::future::Future;
use crate::task::{Context, Poll};

use std::marker::PhantomData;
use std::pin::Pin;

#[derive(Debug)]
pub struct AnyFuture<'a, S, F, T>
where
F: FnMut(T) -> bool,
{
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct AnyFuture<'a, S, F, T> {
pub(crate) stream: &'a mut S,
pub(crate) f: F,
pub(crate) result: bool,
pub(crate) __item: PhantomData<T>,
pub(crate) _marker: PhantomData<T>,
}

impl<'a, S, F, T> AnyFuture<'a, S, F, T>
where
F: FnMut(T) -> bool,
{
pin_utils::unsafe_pinned!(stream: &'a mut S);
pin_utils::unsafe_unpinned!(result: bool);
pin_utils::unsafe_unpinned!(f: F);
}
impl<S: Unpin, F, T> Unpin for AnyFuture<'_, S, F, T> {}

impl<S, F> Future for AnyFuture<'_, S, F, S::Item>
where
S: futures_core::stream::Stream + Unpin + Sized,
S: Stream + Unpin + Sized,
F: FnMut(S::Item) -> bool,
{
type Output = bool;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures_core::stream::Stream;
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));

match next {
Some(v) => {
let result = (self.as_mut().f())(v);
*self.as_mut().result() = result;
let result = (&mut self.f)(v);
self.result = result;

if result {
Poll::Ready(true)
} else {
Expand Down
4 changes: 3 additions & 1 deletion src/stream/stream/min_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use std::cmp::Ordering;
use std::pin::Pin;

use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct MinByFuture<S, F, T> {
stream: S,
Expand All @@ -27,7 +29,7 @@ impl<S, F, T> MinByFuture<S, F, T> {

impl<S, F> Future for MinByFuture<S, F, S::Item>
where
S: futures_core::stream::Stream + Unpin + Sized,
S: Stream + Unpin + Sized,
S::Item: Copy,
F: FnMut(&S::Item, &S::Item) -> Ordering,
{
Expand Down
72 changes: 62 additions & 10 deletions src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ use nth::NthFuture;

use std::cmp::Ordering;
use std::marker::PhantomData;
use std::pin::Pin;

use cfg_if::cfg_if;

use crate::task::{Context, Poll};

cfg_if! {
if #[cfg(feature = "docs")] {
#[doc(hidden)]
Expand Down Expand Up @@ -83,6 +86,55 @@ pub trait Stream {
/// The type of items yielded by this stream.
type Item;

/// Attempts to receive the next item from the stream.
///
/// There are several possible return values:
///
/// * `Poll::Pending` means this stream's next value is not ready yet.
/// * `Poll::Ready(None)` means this stream has been exhausted.
/// * `Poll::Ready(Some(item))` means `item` was received out of the stream.
///
/// # Examples
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use std::pin::Pin;
///
/// use async_std::prelude::*;
/// use async_std::stream;
/// use async_std::task::{Context, Poll};
///
/// fn increment(s: impl Stream<Item = i32> + Unpin) -> impl Stream<Item = i32> + Unpin {
/// struct Increment<S>(S);
///
/// impl<S: Stream<Item = i32> + Unpin> Stream for Increment<S> {
/// type Item = S::Item;
///
/// fn poll_next(
/// mut self: Pin<&mut Self>,
/// cx: &mut Context<'_>,
/// ) -> Poll<Option<Self::Item>> {
/// match Pin::new(&mut self.0).poll_next(cx) {
/// Poll::Pending => Poll::Pending,
/// Poll::Ready(None) => Poll::Ready(None),
/// Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)),
/// }
/// }
/// }
///
/// Increment(s)
/// }
///
/// let mut s = increment(stream::once(7));
///
/// assert_eq!(s.next().await, Some(8));
/// assert_eq!(s.next().await, None);
/// #
/// # }) }
/// ```
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

/// Advances the stream and returns the next value.
///
/// Returns [`None`] when iteration is finished. Individual stream implementations may
Expand All @@ -108,7 +160,10 @@ pub trait Stream {
/// ```
fn next(&mut self) -> ret!('_, NextFuture, Option<Self::Item>)
where
Self: Unpin;
Self: Unpin,
{
NextFuture { stream: self }
}

/// Creates a stream that yields its first `n` elements.
///
Expand Down Expand Up @@ -312,13 +367,13 @@ pub trait Stream {
#[inline]
fn all<F>(&mut self, f: F) -> ret!('_, AllFuture, bool, F, Self::Item)
where
Self: Sized,
Self: Unpin + Sized,
F: FnMut(Self::Item) -> bool,
{
AllFuture {
stream: self,
result: true, // the default if the empty stream
__item: PhantomData,
_marker: PhantomData,
f,
}
}
Expand Down Expand Up @@ -436,13 +491,13 @@ pub trait Stream {
#[inline]
fn any<F>(&mut self, f: F) -> ret!('_, AnyFuture, bool, F, Self::Item)
where
Self: Sized,
Self: Unpin + Sized,
F: FnMut(Self::Item) -> bool,
{
AnyFuture {
stream: self,
result: false, // the default if the empty stream
__item: PhantomData,
_marker: PhantomData,
f,
}
}
Expand All @@ -451,10 +506,7 @@ pub trait Stream {
impl<T: futures_core::stream::Stream + Unpin + ?Sized> Stream for T {
type Item = <Self as futures_core::stream::Stream>::Item;

fn next(&mut self) -> ret!('_, NextFuture, Option<Self::Item>)
where
Self: Unpin,
{
NextFuture { stream: self }
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
futures_core::stream::Stream::poll_next(self, cx)
}
}
6 changes: 4 additions & 2 deletions src/stream/stream/next.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use std::pin::Pin;

use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
use std::pin::Pin;

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct NextFuture<'a, T: Unpin + ?Sized> {
pub(crate) stream: &'a mut T,
}

impl<T: futures_core::stream::Stream + Unpin + ?Sized> Future for NextFuture<'_, T> {
impl<T: Stream + Unpin + ?Sized> Future for NextFuture<'_, T> {
type Output = Option<T::Item>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand Down
9 changes: 5 additions & 4 deletions src/stream/stream/take.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::task::{Context, Poll};

use std::pin::Pin;

use crate::stream::Stream;
use crate::task::{Context, Poll};

/// A stream that yields the first `n` items of another stream.
#[derive(Clone, Debug)]
pub struct Take<S> {
Expand All @@ -11,12 +12,12 @@ pub struct Take<S> {

impl<S: Unpin> Unpin for Take<S> {}

impl<S: futures_core::stream::Stream> Take<S> {
impl<S: Stream> Take<S> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(remaining: usize);
}

impl<S: futures_core::stream::Stream> futures_core::stream::Stream for Take<S> {
impl<S: Stream> futures_core::stream::Stream for Take<S> {
type Item = S::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
Expand Down

0 comments on commit 2ecaf18

Please sign in to comment.