From 4c24d53344a03a4b2c1cfc6e95dc4b05118d6683 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 22 May 2023 12:38:46 +0200 Subject: [PATCH 1/4] Only return `Poll::Ready` from `Sender` if the channel is empty --- futures-channel/src/mpsc/mod.rs | 36 +++++++++++++++++++++++++++ futures-channel/src/mpsc/sink_impl.rs | 8 +----- futures-channel/tests/mpsc.rs | 31 +++++++++++++++++++++++ 3 files changed, 68 insertions(+), 7 deletions(-) diff --git a/futures-channel/src/mpsc/mod.rs b/futures-channel/src/mpsc/mod.rs index edbac7aa38..5a3d308ffe 100644 --- a/futures-channel/src/mpsc/mod.rs +++ b/futures-channel/src/mpsc/mod.rs @@ -616,6 +616,25 @@ impl BoundedSenderInner { self.poll_unparked(Some(cx)).map(Ok) } + /// Polls the channel to determine if it is empty. + /// + /// # Return value + /// + /// This method returns: + /// + /// - `Poll::Ready(()` if there are no messages in the channel; + /// - `Poll::Pending` if there are messages in the channel. + fn poll_is_empty(&mut self, cx: &mut Context<'_>) -> Poll<()> { + let state = decode_state(self.inner.state.load(SeqCst)); + if state.num_messages == 0 { + return Poll::Ready(()); + } + + // If there are messages in the channel, we must park the task unconditionally. + self.sender_task.lock().unwrap().task = Some(cx.waker().clone()); + return Poll::Pending; + } + /// Returns whether the senders send to the same receiver. fn same_receiver(&self, other: &Self) -> bool { Arc::ptr_eq(&self.inner, &other.inner) @@ -755,6 +774,23 @@ impl Sender { let ptr = self.0.as_ref().map(|inner| inner.ptr()); ptr.hash(hasher); } + + /// Polls the channel to determine if it is empty. + /// + /// # Return value + /// + /// This method returns: + /// + /// - `Poll::Ready(()` if there are no messages in the channel or the [`Receiver`] is disconnected. + /// - `Poll::Pending` if there are messages in the channel. + pub(crate) fn poll_is_empty(&mut self, cx: &mut Context<'_>) -> Poll<()> { + let inner = match self.0.as_mut() { + None => return Poll::Ready(()), + Some(inner) => inner, + }; + + inner.poll_is_empty(cx) + } } impl UnboundedSender { diff --git a/futures-channel/src/mpsc/sink_impl.rs b/futures-channel/src/mpsc/sink_impl.rs index 1be20162c2..bf61046978 100644 --- a/futures-channel/src/mpsc/sink_impl.rs +++ b/futures-channel/src/mpsc/sink_impl.rs @@ -15,13 +15,7 @@ impl Sink for Sender { } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match (*self).poll_ready(cx) { - Poll::Ready(Err(ref e)) if e.is_disconnected() => { - // If the receiver disconnected, we consider the sink to be flushed. - Poll::Ready(Ok(())) - } - x => x, - } + (*self).poll_is_empty(cx).map(Ok) } fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { diff --git a/futures-channel/tests/mpsc.rs b/futures-channel/tests/mpsc.rs index f18fc3d66c..8e3e768d0e 100644 --- a/futures-channel/tests/mpsc.rs +++ b/futures-channel/tests/mpsc.rs @@ -60,6 +60,37 @@ fn send_recv_no_buffer() { })); } +#[test] +fn sink_poll_flush() { + // Run on a task context + block_on(poll_fn(move |cx| { + let (tx, rx) = mpsc::channel::(2); + pin_mut!(tx, rx); + + assert!(tx.as_mut().poll_flush(cx).is_ready()); + assert!(tx.as_mut().poll_ready(cx).is_ready()); + + // Send two messages, `poll_flush` should be pending after each of them. + assert!(tx.as_mut().start_send(1).is_ok()); + assert!(tx.as_mut().poll_flush(cx).is_pending()); + + assert!(tx.as_mut().start_send(2).is_ok()); + assert!(tx.as_mut().poll_flush(cx).is_pending()); + + // Take first message + assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(1))); + assert!(tx.as_mut().poll_ready(cx).is_ready()); + assert!(tx.as_mut().poll_flush(cx).is_pending()); + + // Take second message + assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(2))); + assert!(tx.as_mut().poll_ready(cx).is_ready()); + assert!(tx.as_mut().poll_flush(cx).is_ready()); + + Poll::Ready(()) + })); +} + #[test] fn send_shared_recv() { let (mut tx1, rx) = mpsc::channel::(16); From d84a22775777d0d81b3b51a8d117e9c838bda7b0 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 22 May 2023 12:41:17 +0200 Subject: [PATCH 2/4] Fix clippy lint --- futures-channel/src/mpsc/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-channel/src/mpsc/mod.rs b/futures-channel/src/mpsc/mod.rs index 5a3d308ffe..f9ca3d10c5 100644 --- a/futures-channel/src/mpsc/mod.rs +++ b/futures-channel/src/mpsc/mod.rs @@ -632,7 +632,7 @@ impl BoundedSenderInner { // If there are messages in the channel, we must park the task unconditionally. self.sender_task.lock().unwrap().task = Some(cx.waker().clone()); - return Poll::Pending; + Poll::Pending } /// Returns whether the senders send to the same receiver. From 96f22f2662666ddb8265047bd45c808292807f3e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 22 May 2023 15:17:30 +0200 Subject: [PATCH 3/4] Feature-gate new function --- futures-channel/src/mpsc/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/futures-channel/src/mpsc/mod.rs b/futures-channel/src/mpsc/mod.rs index f9ca3d10c5..addc969835 100644 --- a/futures-channel/src/mpsc/mod.rs +++ b/futures-channel/src/mpsc/mod.rs @@ -624,6 +624,7 @@ impl BoundedSenderInner { /// /// - `Poll::Ready(()` if there are no messages in the channel; /// - `Poll::Pending` if there are messages in the channel. + #[cfg(feature = "sink")] fn poll_is_empty(&mut self, cx: &mut Context<'_>) -> Poll<()> { let state = decode_state(self.inner.state.load(SeqCst)); if state.num_messages == 0 { @@ -783,6 +784,7 @@ impl Sender { /// /// - `Poll::Ready(()` if there are no messages in the channel or the [`Receiver`] is disconnected. /// - `Poll::Pending` if there are messages in the channel. + #[cfg(feature = "sink")] pub(crate) fn poll_is_empty(&mut self, cx: &mut Context<'_>) -> Poll<()> { let inner = match self.0.as_mut() { None => return Poll::Ready(()), From 228f75b4c0afc5c0f64b9f1be177a597a9ef39f9 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 22 May 2023 15:38:38 +0200 Subject: [PATCH 4/4] Fix miri failure --- futures/tests/async_await_macros.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/futures/tests/async_await_macros.rs b/futures/tests/async_await_macros.rs index e87a5bf05c..96f806fd29 100644 --- a/futures/tests/async_await_macros.rs +++ b/futures/tests/async_await_macros.rs @@ -90,8 +90,8 @@ fn select_streams() { _ = rx1.next() => panic!(), _ = rx2.next() => panic!(), default => { - tx1.send(2).await.unwrap(); - tx2.send(3).await.unwrap(); + tx1.feed(2).await.unwrap(); + tx2.feed(3).await.unwrap(); tx1_opt = Some(tx1); tx2_opt = Some(tx2); }