From 79ab386169dbae203a8ee15486d0af167f9933db Mon Sep 17 00:00:00 2001 From: Ari Seyhun Date: Wed, 15 Jan 2025 13:58:43 +0800 Subject: [PATCH 1/3] sync: add `len` and `is_empty` methods to mpsc senders --- tokio/src/sync/mpsc/bounded.rs | 41 +++++++++++++++++++++- tokio/src/sync/mpsc/chan.rs | 18 ++++++++++ tokio/src/sync/mpsc/unbounded.rs | 41 +++++++++++++++++++++- tokio/tests/sync_mpsc.rs | 58 ++++++++++++++++++++++++++++++++ 4 files changed, 156 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 99a3f0d5c4e..ef173fb8abc 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -515,7 +515,6 @@ impl Receiver { /// tx.send(0).await.unwrap(); /// assert!(!rx.is_empty()); /// } - /// /// ``` pub fn is_empty(&self) -> bool { self.chan.is_empty() @@ -1060,6 +1059,46 @@ impl Sender { self.chan.is_closed() } + /// Checks if a channel is empty. + /// + /// This method returns `true` if the channel has no messages. + /// + /// # Examples + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = mpsc::unbounded_channel(); + /// assert!(rx.is_empty()); + /// + /// tx.send(0).await.unwrap(); + /// assert!(!rx.is_empty()); + /// } + /// ``` + pub fn is_empty(&self) -> bool { + self.chan.semaphore().bound - self.chan.semaphore().semaphore.available_permits() == 0 + } + + /// Returns the number of messages in the channel. + /// + /// # Examples + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = mpsc::unbounded_channel(); + /// assert_eq!(0, rx.len()); + /// + /// tx.send(0).await.unwrap(); + /// assert_eq!(1, rx.len()); + /// } + /// ``` + pub fn len(&self) -> usize { + self.chan.semaphore().bound - self.chan.semaphore().semaphore.available_permits() + } + /// Waits for channel capacity. Once capacity to send one message is /// available, it is reserved for the caller. /// diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 1e6eaab1798..2cb679b9291 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -44,6 +44,8 @@ pub(crate) trait Semaphore { fn add_permits(&self, n: usize); + fn available_permits(&self) -> usize; + fn close(&self); fn is_closed(&self) -> bool; @@ -193,6 +195,14 @@ impl Tx { } impl Tx { + pub(super) fn is_empty(&self) -> bool { + self.inner.semaphore.available_permits() == 0 + } + + pub(super) fn len(&self) -> usize { + self.inner.semaphore.available_permits() + } + pub(crate) fn is_closed(&self) -> bool { self.inner.semaphore.is_closed() } @@ -576,6 +586,10 @@ impl Semaphore for bounded::Semaphore { self.semaphore.release(n) } + fn available_permits(&self) -> usize { + self.semaphore.available_permits() + } + fn is_idle(&self) -> bool { self.semaphore.available_permits() == self.bound } @@ -610,6 +624,10 @@ impl Semaphore for unbounded::Semaphore { } } + fn available_permits(&self) -> usize { + self.0.load(Acquire) >> 1 + } + fn is_idle(&self) -> bool { self.0.load(Acquire) >> 1 == 0 } diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index a9232dc934c..7480a3a931c 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -382,7 +382,6 @@ impl UnboundedReceiver { /// tx.send(0).unwrap(); /// assert!(!rx.is_empty()); /// } - /// /// ``` pub fn is_empty(&self) -> bool { self.chan.is_empty() @@ -577,6 +576,46 @@ impl UnboundedSender { } } + /// Checks if a channel is empty. + /// + /// This method returns `true` if the channel has no messages. + /// + /// # Examples + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = mpsc::unbounded_channel(); + /// assert!(tx.is_empty()); + /// + /// tx.send(0).unwrap(); + /// assert!(!tx.is_empty()); + /// } + /// ``` + pub fn is_empty(&self) -> bool { + self.chan.is_empty() + } + + /// Returns the number of messages in the channel. + /// + /// # Examples + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = mpsc::unbounded_channel(); + /// assert_eq!(0, tx.len()); + /// + /// tx.send(0).unwrap(); + /// assert_eq!(1, tx.len()); + /// } + /// ``` + pub fn len(&self) -> usize { + self.chan.len() + } + /// Completes when the receiver has dropped. /// /// This allows the producers to get notified when interest in the produced diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 577e9c35faa..25ec78b81da 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1039,6 +1039,64 @@ async fn test_tx_capacity() { assert_eq!(tx.max_capacity(), 10); } +#[tokio::test] +async fn test_bounded_tx_len() { + let (tx, mut rx) = mpsc::channel::<()>(10); + + // initially len should be 0 + assert_eq!(tx.len(), 0); + assert!(tx.is_empty()); + + // queue one message, and len should be 1 + tx.send(()).await.unwrap(); + assert_eq!(tx.len(), 1); + assert!(!tx.is_empty()); + + // queue a second message, and len should be 2 + tx.send(()).await.unwrap(); + assert_eq!(tx.len(), 2); + assert!(!tx.is_empty()); + + // consume a message, and len should be 1 + let _ = rx.recv().await; + assert_eq!(tx.len(), 1); + assert!(!tx.is_empty()); + + // consume a final message, and len should be 0 + let _ = rx.recv().await; + assert_eq!(tx.len(), 0); + assert!(tx.is_empty()); +} + +#[tokio::test] +async fn test_unbounded_tx_len() { + let (tx, mut rx) = mpsc::unbounded_channel(); + + // initially len should be 0 + assert_eq!(tx.len(), 0); + assert!(tx.is_empty()); + + // queue one message, and len should be 1 + tx.send(()).unwrap(); + assert_eq!(tx.len(), 1); + assert!(!tx.is_empty()); + + // queue a second message, and len should be 2 + tx.send(()).unwrap(); + assert_eq!(tx.len(), 2); + assert!(!tx.is_empty()); + + // consume a message, and len should be 1 + let _ = rx.recv().await; + assert_eq!(tx.len(), 1); + assert!(!tx.is_empty()); + + // consume a final message, and len should be 0 + let _ = rx.recv().await; + assert_eq!(tx.len(), 0); + assert!(tx.is_empty()); +} + #[tokio::test] async fn test_rx_is_closed_when_calling_close_with_sender() { // is_closed should return true after calling close but still has a sender From 1689ba15c934090c56e4fc476d957f98ee8251da Mon Sep 17 00:00:00 2001 From: Ari Seyhun Date: Wed, 15 Jan 2025 14:08:26 +0800 Subject: [PATCH 2/3] fix bounded sender len and is_empty examples --- tokio/src/sync/mpsc/bounded.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index ef173fb8abc..aeb178527c7 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1069,7 +1069,7 @@ impl Sender { /// /// #[tokio::main] /// async fn main() { - /// let (tx, rx) = mpsc::unbounded_channel(); + /// let (tx, rx) = mpsc::channel(10); /// assert!(rx.is_empty()); /// /// tx.send(0).await.unwrap(); @@ -1088,7 +1088,7 @@ impl Sender { /// /// #[tokio::main] /// async fn main() { - /// let (tx, rx) = mpsc::unbounded_channel(); + /// let (tx, rx) = mpsc::channel(10); /// assert_eq!(0, rx.len()); /// /// tx.send(0).await.unwrap(); From c879b7bf0ed785dc7278aec30fff27319687ea0d Mon Sep 17 00:00:00 2001 From: Ari Seyhun Date: Wed, 22 Jan 2025 12:29:49 +0800 Subject: [PATCH 3/3] docs: correct bounded `Sender` `is_empty` and `len` examples --- tokio/src/sync/mpsc/bounded.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index aeb178527c7..4f2eecfcfe2 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1070,10 +1070,10 @@ impl Sender { /// #[tokio::main] /// async fn main() { /// let (tx, rx) = mpsc::channel(10); - /// assert!(rx.is_empty()); + /// assert!(tx.is_empty()); /// /// tx.send(0).await.unwrap(); - /// assert!(!rx.is_empty()); + /// assert!(!tx.is_empty()); /// } /// ``` pub fn is_empty(&self) -> bool { @@ -1089,10 +1089,10 @@ impl Sender { /// #[tokio::main] /// async fn main() { /// let (tx, rx) = mpsc::channel(10); - /// assert_eq!(0, rx.len()); + /// assert_eq!(0, tx.len()); /// /// tx.send(0).await.unwrap(); - /// assert_eq!(1, rx.len()); + /// assert_eq!(1, tx.len()); /// } /// ``` pub fn len(&self) -> usize {