Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MOD-2454: add async_chain #2418

Open
wants to merge 26 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3ac8aad
async_concat
kramstrom Oct 28, 2024
9536aa0
Merge branch 'main' of github.com:modal-labs/modal-client into kramst…
kramstrom Oct 28, 2024
78003c9
prints
kramstrom Oct 28, 2024
70c0251
cancellation
kramstrom Oct 29, 2024
9b7ac42
asyncio cancellation handling + tests
kramstrom Oct 29, 2024
88ac3c6
remove unneccessary aclosing
kramstrom Oct 29, 2024
fd9b6a8
typing
kramstrom Oct 29, 2024
7c5a7f0
aclosing
kramstrom Oct 29, 2024
5d86a18
timeout test
kramstrom Oct 29, 2024
b39bff7
timeout test
kramstrom Oct 29, 2024
69fda56
Merge branch 'main' into kramstrom/mod-2454-remove-aiostream-dependen…
kramstrom Oct 29, 2024
b562fac
Merge branch 'main' into kramstrom/mod-2454-remove-aiostream-dependen…
kramstrom Oct 30, 2024
4e6ea48
Merge branch 'main' into kramstrom/mod-2454-remove-aiostream-dependen…
kramstrom Oct 30, 2024
82be122
Merge branch 'main' of github.com:modal-labs/modal-client into kramst…
kramstrom Oct 30, 2024
5a819e0
only take async gens
kramstrom Oct 30, 2024
1f1fb08
Merge branch 'kramstrom/mod-2454-remove-aiostream-dependency-cancella…
kramstrom Oct 30, 2024
d5ff477
async_conat --> async_chain
kramstrom Oct 30, 2024
b5a4b5f
aclose
kramstrom Oct 30, 2024
92dc574
Merge branch 'kramstrom/mod-2454-remove-aiostream-dependency-cancella…
kramstrom Oct 30, 2024
1bfa896
aclose
kramstrom Oct 30, 2024
c250694
Merge branch 'kramstrom/mod-2454-remove-aiostream-dependency-cancella…
kramstrom Oct 30, 2024
d0141c9
Merge branch 'main' of github.com:modal-labs/modal-client into kramst…
kramstrom Oct 31, 2024
9fd7249
rename
kramstrom Oct 31, 2024
4f596d8
closing generators in concat
kramstrom Oct 31, 2024
ac98e86
remove redundant aclosing wrappers
kramstrom Oct 31, 2024
c33d97f
Merge branch 'main' of github.com:modal-labs/modal-client into kramst…
kramstrom Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions modal/_utils/async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,3 +631,21 @@ async def producer(generator: AsyncGenerator[T, None]):

async def callable_to_agen(awaitable: Callable[[], Awaitable[T]]) -> AsyncGenerator[T, None]:
yield await awaitable()


async def async_chain(*generators: AsyncGenerator[T, None]) -> AsyncGenerator[T, None]:
try:
for gen in generators:
async for item in gen:
yield item
finally:
first_exception = None
for gen in generators:
try:
await gen.aclose()
except BaseException as e:
if first_exception is None:
first_exception = e
logger.exception(f"Error closing async generator: {e}")
if first_exception is not None:
raise first_exception
173 changes: 173 additions & 0 deletions test/async_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from modal._utils.async_utils import (
TaskContext,
aclosing,
async_chain,
async_merge,
async_zip,
callable_to_agen,
Expand Down Expand Up @@ -723,6 +724,178 @@ async def foo():
assert result == [await foo()]


@pytest.mark.asyncio
async def test_async_chain():
async def gen1():
await asyncio.sleep(0.1)
yield 1
yield 2

async def gen2():
yield 3
await asyncio.sleep(0.1)
yield 4

async def gen3():
yield 5
yield 6

result = []
async for item in async_chain(gen1(), gen2(), gen3()):
result.append(item)

assert result == [1, 2, 3, 4, 5, 6]


@pytest.mark.asyncio
async def test_async_chain_sequential():
ev = asyncio.Event()

async def gen1():
await asyncio.sleep(0.1)
yield 1
await ev.wait()
yield 2

async def gen2():
yield 3
ev.set()
await asyncio.sleep(0.1)
yield 4

results = []

async def concat_coro():
async with aclosing(async_chain(gen1(), gen2())) as stream:
async for item in stream:
results.append(item)

concat_task = asyncio.create_task(concat_coro())
await asyncio.sleep(0.5)
concat_task.cancel()
with pytest.raises(asyncio.CancelledError):
await concat_task

assert results == [1]


@pytest.mark.asyncio
async def test_async_chain_exception():
# test exception bubbling up
result = []
states = []

async def gen1():
states.append("enter 1")
try:
yield 1
yield 2
finally:
states.append("exit 1")

async def gen2():
states.append("enter 2")
try:
await asyncio.sleep(0.1)
yield 3
raise SampleException("test")
yield 4
finally:
await asyncio.sleep(0)
states.append("exit 2")

with pytest.raises(SampleException):
async for item in async_chain(gen1(), gen2()):
result.append(item)

assert result == [1, 2, 3]
assert states == ["enter 1", "exit 1", "enter 2", "exit 2"]


@pytest.mark.asyncio
async def test_async_chain_cancellation():
ev = asyncio.Event()

async def gen1():
await asyncio.sleep(0.1)
yield 1
await ev.wait()
raise asyncio.CancelledError()
yield 2

async def gen2():
yield 3
await asyncio.sleep(0.1)
yield 4

async def concat_coro():
async with aclosing(async_chain(gen1(), gen2())) as stream:
async for _ in stream:
pass

concat_task = asyncio.create_task(concat_coro())
await asyncio.sleep(0.1)
concat_task.cancel()
with pytest.raises(asyncio.CancelledError):
await concat_task


@pytest.mark.asyncio
async def test_async_chain_producer_cancellation():
async def gen1():
await asyncio.sleep(0.1)
yield 1
raise asyncio.CancelledError()
yield 2

async def gen2():
yield 3
await asyncio.sleep(0.1)
yield 4

await asyncio.sleep(0.1)
with pytest.raises(asyncio.CancelledError):
async with aclosing(async_chain(gen1(), gen2())) as stream:
async for _ in stream:
pass


@pytest.mark.asyncio
async def test_async_chain_cleanup():
# test cleanup of generators
result = []
states = []

async def gen1():
states.append("enter 1")
try:
await asyncio.sleep(0.1)
yield 1
yield 2
finally:
await asyncio.sleep(0)
states.append("exit 1")

async def gen2():
states.append("enter 2")
try:
yield 3
await asyncio.sleep(0.1)
yield 4
finally:
await asyncio.sleep(0)
states.append("exit 2")

async with aclosing(async_chain(gen1(), gen2())) as stream:
async for item in stream:
result.append(item)
if item == 3:
break

assert result == [1, 2, 3]
assert states == ["enter 1", "exit 1", "enter 2", "exit 2"]


def test_sigint_run_async_gen_shuts_down_gracefully():
code = textwrap.dedent(
"""
Expand Down
Loading