-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtask_workflow.py
38 lines (25 loc) · 1.03 KB
/
task_workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio
# Display name of this benchmark. Not referenced anywhere else in the visible
# source; presumably read by an external benchmark harness — TODO confirm.
BENCHMARK_NAME = "Async Task Workflow"
async def subtask(_: str) -> None:
    """Suspend for 0.1 seconds, then return.

    The single positional argument is accepted but never read.
    """
    pause_seconds = 0.1
    await asyncio.sleep(pause_seconds)
def future_callback(fut: asyncio.Future[str]) -> None:
    """Schedule three ``subtask`` coroutines once *fut* has completed.

    Meant to be registered through ``fut.add_done_callback``. The future's
    result string becomes the prefix of each sub-task identifier
    (``"<result>_sub_0"`` .. ``"<result>_sub_2"``).

    Args:
        fut: A completed future. ``fut.result()`` re-raises if the future
            was cancelled or finished with an exception.
    """
    result = fut.result()
    # Use the loop the future is bound to instead of the global
    # ``asyncio.get_event_loop()`` lookup; a done-callback is always invoked
    # by that loop, so the tasks land on the same loop as before.
    loop = fut.get_loop()
    for i in range(3):
        # Fire-and-forget: the created tasks are neither stored nor awaited
        # in this function.
        loop.create_task(subtask(f"{result}_sub_{i}"))
async def consumer(fut: asyncio.Future[str], cid: int) -> None:
    """Wait 0.1 seconds, then resolve *fut* with ``"Consumer_<cid>"``.

    Args:
        fut: Pending future that receives the result string.
        cid: Identifier interpolated into the result string.
    """
    pause_seconds = 0.1
    await asyncio.sleep(pause_seconds)
    payload = f"Consumer_{cid}"
    fut.set_result(payload)
async def main_task(tid: int) -> None:
    """Run one workflow unit: wire up a future, a consumer and a sub-task.

    Creates a future whose done-callback is ``future_callback``, starts a
    ``consumer`` task that will resolve that future, and then awaits a single
    ``subtask`` named ``"main_<tid>"``.

    Args:
        tid: Identifier passed to the consumer and embedded in the sub-task
            name.
    """
    # Inside a coroutine a loop is always running; get_running_loop() is the
    # documented preferred call and raises instead of guessing if it is not.
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    fut.add_done_callback(future_callback)
    # Not awaited here: the consumer resolves ``fut`` in the background,
    # which in turn triggers ``future_callback``.
    asyncio.create_task(consumer(fut, tid))
    immediate_subtask = asyncio.create_task(subtask(f"main_{tid}"))
    # Only this sub-task is awaited; the function returns as soon as it is
    # done.
    await immediate_subtask
async def main(num_tasks: int) -> None:
    """Start ``num_tasks`` concurrent ``main_task`` tasks and wait for all.

    Args:
        num_tasks: Number of ``main_task`` instances to launch; each one
            receives its index in ``range(num_tasks)`` as ``tid``.
    """
    # The generator is fully consumed by the ``*`` unpacking, so every task
    # is created before ``gather`` is called.
    spawned = (asyncio.create_task(main_task(tid)) for tid in range(num_tasks))
    await asyncio.gather(*spawned)
def run(loop: asyncio.AbstractEventLoop, num_producers: int) -> None:
    """Block until ``main(num_producers)`` finishes on the given loop.

    Args:
        loop: Event loop on which the ``main`` coroutine is executed.
        num_producers: Forwarded to ``main`` as its ``num_tasks`` argument.
    """
    workflow = main(num_producers)
    loop.run_until_complete(workflow)