Skip to content

Commit

Permalink
removing handlers from frames structure
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniele Palaia authored and Daniele Palaia committed Oct 9, 2023
1 parent dc88983 commit 3017bd9
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
13 changes: 7 additions & 6 deletions rstream/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,25 +184,26 @@ async def start(self) -> None:
self.add_handler(schema.Heartbeat, self._on_heartbeat)
self.add_handler(schema.Close, self._on_close)

async def run_queue_listener_task(self, subscriber_name: str):
async def run_queue_listener_task(self, subscriber_name: str, handler: HT[FT]):

if subscriber_name not in self._frames:
self.start_task(
"run_delivery_handlers" + subscriber_name, self._run_delivery_handlers(subscriber_name)
"run_delivery_handlers" + subscriber_name,
self._run_delivery_handlers(subscriber_name, handler),
)

async def _run_delivery_handlers(self, subscriber_name: str):
async def _run_delivery_handlers(self, subscriber_name: str, handler: HT[FT]):

while True:
frame_entry = None
try:
frame_entry = await self._frames[subscriber_name].get()
maybe_coro = frame_entry.handler(frame_entry.frame)
maybe_coro = handler(frame_entry)
if maybe_coro is not None:
await maybe_coro
except Exception as e:
if frame_entry is not None:
logger.exception("Error while handling %s frame ", str(frame_entry.frame.__class__))
logger.exception("Error while handling %s frame ", str(frame_entry.__class__))
else:
logger.exception("Error while handling a frame " + str(e))
break
Expand Down Expand Up @@ -239,7 +240,7 @@ async def _listener(self) -> None:
for subscriber_name, handler in self._handlers.get(frame.__class__, {}).items():
try:
if frame.__class__ == schema.Deliver:
await self._frames[subscriber_name].put(FrameEntry(handler, frame))
await self._frames[subscriber_name].put(frame)
else:
maybe_coro = handler(frame)
if maybe_coro is not None:
Expand Down
4 changes: 3 additions & 1 deletion rstream/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,9 @@ async def subscribe(
offset=offset_specification.offset,
)

await subscriber.client.run_queue_listener_task(subscriber_name=subscriber.reference)
await subscriber.client.run_queue_listener_task(
subscriber_name=subscriber.reference, handler=partial(self._on_deliver, subscriber=subscriber)
)

subscriber.client.add_handler(
schema.Deliver,
Expand Down

0 comments on commit 3017bd9

Please sign in to comment.