Skip to content

Commit

Permalink
Move strict condition further up & fix typing
Browse files Browse the repository at this point in the history
  • Loading branch information
smheidrich committed Jul 11, 2024
1 parent 220afb3 commit dd3c2a0
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
4 changes: 2 additions & 2 deletions aiostream/aiter_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ def anext(obj: AsyncIterator[T]) -> Awaitable[T]:


@overload
def anext(obj: AsyncIterator[T], default: U) -> Awaitable[T] | U:
def anext(obj: AsyncIterator[T], default: U) -> Awaitable[T | U]:
pass

Check warning on line 68 in aiostream/aiter_utils.py

View check run for this annotation

Codecov / codecov/patch

aiostream/aiter_utils.py#L68

Added line #L68 was not covered by tests


def anext(obj: AsyncIterator[T], default: Any = UNSET) -> Awaitable[T] | Any:
def anext(obj: AsyncIterator[T], default: Any = UNSET) -> Awaitable[T | Any]:
"""Access anext magic method."""
assert_async_iterator(obj)
if default is UNSET:
Expand Down
23 changes: 13 additions & 10 deletions aiostream/stream/combine.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,23 @@ async def zip(
# Loop over items
_StopSentinelType = enum.Enum("_StopSentinelType", "STOP_SENTINEL")
STOP_SENTINEL = _StopSentinelType.STOP_SENTINEL
items: list[T]
while True:
coros = (
anext(streamer, STOP_SENTINEL) if strict else anext(streamer)
for streamer in streamers
)
try:
items = await asyncio.gather(*coros)
except StopAsyncIteration: # can only happen in non-strict mode
break
if strict:
if all(item == STOP_SENTINEL for item in items):
coros = (anext(streamer, STOP_SENTINEL) for streamer in streamers)
_items = await asyncio.gather(*coros)
if all(item == STOP_SENTINEL for item in _items):
break
elif any(item == STOP_SENTINEL for item in items):
elif any(item == STOP_SENTINEL for item in _items):
raise ValueError("iterables have different lengths")
# This holds because we've ruled out STOP_SENTINEL above:
items = cast(list[T], _items)
else:
coros = (anext(streamer) for streamer in streamers)
try:
items = await asyncio.gather(*coros)
except StopAsyncIteration:
break
yield tuple(items)


Expand Down

0 comments on commit dd3c2a0

Please sign in to comment.