Skip to content

Commit

Permalink
Add tokio specific AsyncWaitGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
al8n committed Feb 16, 2024
1 parent b1388e3 commit fbb8d01
Show file tree
Hide file tree
Showing 10 changed files with 795 additions and 242 deletions.
19 changes: 19 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,24 @@ jobs:
run: rustup update stable --no-self-update && rustup default stable
- name: Test
run: cargo test --lib --no-default-features --features future

tokio:
name: tokio
strategy:
matrix:
os:
- ubuntu-latest
- macos-latest
- windows-latest
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
- name: Install Rust
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe.
run: rustup update stable --no-self-update && rustup default stable
- name: Test
run: cargo test --lib --no-default-features --features tokio

sync:
name: sync
strategy:
Expand All @@ -109,6 +127,7 @@ jobs:
name: cargo tarpaulin
runs-on: ubuntu-latest
needs:
- tokio
- future
- sync
- build
Expand Down
19 changes: 18 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ homepage = "https://github.com/al8n/wg"
repository = "https://github.com/al8n/wg.git"
documentation = "https://docs.rs/wg/"
readme = "README.md"
version = "0.6.2"
version = "0.7.0"
license = "MIT OR Apache-2.0"
keywords = ["waitgroup", "async", "sync", "notify", "wake"]
categories = ["asynchronous", "concurrency", "data-structures"]
Expand All @@ -20,16 +20,33 @@ parking_lot = ["dep:parking_lot"]

future = ["event-listener", "event-listener-strategy", "pin-project-lite"]

tokio = ["dep:tokio", "futures-core"]

[dependencies]
parking_lot = { version = "0.12", optional = true }
triomphe = { version = "0.1", optional = true }
event-listener = { version = "5", optional = true }
event-listener-strategy = { version = "0.5", optional = true }
pin-project-lite = { version = "0.2", optional = true }

tokio = { version = "1", default-features = false, optional = true, features = ["sync", "rt"] }
futures-core = { version = "0.3", optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
async-std = { version = "1", features = ["attributes"] }

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[[test]]
name = "tokio"
path = "tests/tokio.rs"
required-features = ["tokio"]

[[test]]
name = "future"
path = "tests/future.rs"
required-features = ["future"]

67 changes: 57 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,41 @@ Golang like WaitGroup implementation for sync/async Rust.
<img alt="license" src="https://img.shields.io/badge/License-Apache%202.0/MIT-blue.svg?style=for-the-badge&fontColor=white&logoColor=f5c076&logo=" height="22">
</div>

## Installation
## Introduction

By default, blocking version `WaitGroup` is enabled, if you want to use non-blocking `AsyncWaitGroup`, you need to
enbale `future` feature in your `Cargo.toml`.
By default, blocking version `WaitGroup` is enabled.

If you are using `tokio`, you need to enable `tokio` feature in your `Cargo.toml` and use `wg::tokio::AsyncWaitGroup`.

If you are using other async runtime, you need to
enbale `future` feature in your `Cargo.toml` and use `wg::future::AsyncWaitGroup`.

### Sync

```toml
[dependencies]
wg = "0.6"
wg = "0.7"
```

### Async
### `tokio`

An async implementation for `tokio` runtime.

```toml
[dependencies]
wg = { version: "0.6", features = ["future"] }
wg = { version: "0.7", features = ["tokio"] }
```

### `future`

## Example
A more generic async implementation.

```toml
[dependencies]
wg = { version: "0.7", features = ["future"] }
```

## Instruction

### Sync

Expand Down Expand Up @@ -69,10 +83,10 @@ fn main() {
}
```

### Async
### `tokio`

```rust
use wg::AsyncWaitGroup;
use wg::tokio::AsyncWaitGroup;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::{spawn, time::{sleep, Duration}};
Expand Down Expand Up @@ -100,9 +114,42 @@ async fn main() {
}
```

### `async-io`

```rust
use wg::future::AsyncWaitGroup;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use async_std::task::{spawn, block_on, sleep};

fn main() {
block_on(async {
let wg = AsyncWaitGroup::new();
let ctr = Arc::new(AtomicUsize::new(0));

for _ in 0..5 {
let ctrx = ctr.clone();
let t_wg = wg.add(1);
spawn(async move {
// mock some time consuming task
sleep(Duration::from_millis(50)).await;
ctrx.fetch_add(1, Ordering::Relaxed);

// mock task is finished
t_wg.done();
});
}

wg.wait().await;
assert_eq!(ctr.load(Ordering::Relaxed), 5);
});
}
```

## Acknowledgements

- Inspired by Golang sync.WaitGroup, [ibraheemdev's `AwaitGroup`] and [`crossbeam_utils::WaitGroup`].
- Inspired by Golang sync.WaitGroup and [`crossbeam_utils::WaitGroup`].

## License

Expand Down
30 changes: 30 additions & 0 deletions examples/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::{
spawn,
time::{sleep, Duration},
};
use wg::future::AsyncWaitGroup;

fn main() {
async_std::task::block_on(async {
let wg = AsyncWaitGroup::new();
let ctr = Arc::new(AtomicUsize::new(0));

for _ in 0..5 {
let ctrx = ctr.clone();
let t_wg = wg.add(1);
spawn(async move {
// mock some time consuming task
sleep(Duration::from_millis(50)).await;
ctrx.fetch_add(1, Ordering::Relaxed);

// mock task is finished
t_wg.done();
});
}

wg.wait().await;
assert_eq!(ctr.load(Ordering::Relaxed), 5);
});
}
4 changes: 2 additions & 2 deletions examples/axync.rs → examples/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use tokio::{
spawn,
time::{sleep, Duration},
};
use wg::AsyncWaitGroup;
use wg::tokio::AsyncWaitGroup;

#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
#[tokio::main]
async fn main() {
let wg = AsyncWaitGroup::new();
let ctr = Arc::new(AtomicUsize::new(0));
Expand Down
Loading

0 comments on commit fbb8d01

Please sign in to comment.