Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: Added WeakSender to sync::broadcast::channel #7100

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

tglane
Copy link
Contributor

@tglane tglane commented Jan 14, 2025

Motivation

Closing issue #7003. Add a type WeakSender to the sync::broadcast::channel similar to sync::maps::channel.

Solution

The new WeakSender type just stores an Arc<Shared<T>> just like the normal Sender but active WeakSenders will not prevent the channel from being closed if all Senders are dropped.

Closes #7003.

@github-actions github-actions bot added the R-loom-sync Run loom sync tests on this PR label Jan 14, 2025
@Darksonn Darksonn added A-tokio Area: The main tokio crate M-sync Module: tokio/sync labels Jan 27, 2025
Comment on lines +354 to +355
/// Number of outstanding weak Sender handles.
num_weak_tx: AtomicUsize,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are adding this only to add a weak_count method? You definitely do not need SeqCst for that.

Comment on lines +1365 to +1367
pub fn is_closed(&self) -> bool {
// Channel is closed when there are no strong senders left active
self.shared.num_tx.load(Acquire) == 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Man, the orderings in this file are a mess. Mixing SeqCst and non-SeqCst orderings on the same atomic is not a good idea. The correct orderings for counters is:

  • Increments happen with relaxed.
  • Decrements happen with acqrel.
  • Checks for zero happen with acquire.

We shouldn't need SeqCst for num_tx ever.

@@ -56,6 +56,7 @@ macro_rules! assert_closed {
trait AssertSend: Send + Sync {}
impl AssertSend for broadcast::Sender<i32> {}
impl AssertSend for broadcast::Receiver<i32> {}
impl AssertSend for broadcast::WeakSender<i32> {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also see the async_send_sync.rs file.

match self
.shared
.num_tx
.compare_exchange_weak(tx_count, tx_count + 1, AcqRel, Acquire)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is sufficient. When this succeeds, the operation is an increment for which relaxed is enough. When it fails, we might exit through the zero-check, so it needs acquire.

Suggested change
.compare_exchange_weak(tx_count, tx_count + 1, AcqRel, Acquire)
.compare_exchange_weak(tx_count, tx_count + 1, Relaxed, Acquire)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate M-sync Module: tokio/sync R-loom-sync Run loom sync tests on this PR
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add WeakSender for tokio::sync::broadcast
2 participants