Skip to content

Commit

Permalink
improving _run_delivery_handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniele Palaia authored and Daniele Palaia committed Oct 10, 2023
1 parent 7016cfc commit 25c6cbb
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions rstream/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def __init__(
self._last_heartbeat: float = 0
self._connection_closed_handler = connection_closed_handler
self._frames: dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)
self._is_not_closed: bool = True

def start_task(self, name: str, coro: Awaitable[None]) -> None:
assert name not in self._tasks
Expand Down Expand Up @@ -187,15 +188,16 @@ async def run_queue_listener_task(self, subscriber_name: str, handler: HT[FT]) -

async def _run_delivery_handlers(self, subscriber_name: str, handler: HT[FT]):

while True:
frame_entry = None
while self._is_not_closed:
frame_entry = await self._frames[subscriber_name].get()
try:
frame_entry = await self._frames[subscriber_name].get()
maybe_coro = handler(frame_entry)
if maybe_coro is not None:
await maybe_coro
except Exception:
logger.exception("Error while handling %s frame ", str(frame_entry.__class__))
except Exception as e:
logger.exception(
"Error while handling %s frame exception raised %s", frame_entry.__class__, e
)

async def _listener(self) -> None:
assert self._conn
Expand Down Expand Up @@ -284,6 +286,8 @@ async def close(self) -> None:

await self.stop_task("listener")

self._is_not_closed = False

for subscriber_name in self._frames:
await self.stop_task(f"run_delivery_handlers_{subscriber_name}")

Expand Down

0 comments on commit 25c6cbb

Please sign in to comment.