
Improve callback handlers #134

Merged
merged 7 commits into from
Oct 11, 2023

Conversation

DanielePalaia
Collaborator

@DanielePalaia DanielePalaia commented Sep 28, 2023

This closes: #126
This closes: #123

This PR runs the delivery callback of the consumer in a separate task, in order to keep the listener unblocked and able to receive new frames.

It allows the user-defined consumer callback to make synchronous requests, as in the unit test.

@DanielePalaia DanielePalaia marked this pull request as draft September 28, 2023 07:48
@DanielePalaia DanielePalaia force-pushed the improve_callback_handlers branch from 4c608a4 to f8822bc Compare September 28, 2023 08:39
@DanielePalaia DanielePalaia force-pushed the improve_callback_handlers branch from f8822bc to f06ccc9 Compare September 28, 2023 09:04
@DanielePalaia DanielePalaia marked this pull request as ready for review October 2, 2023 07:32
@Gsantomaggio
Member

Hi @garvenlee and @qweeze, do you have time to leave feedback about this?
We are testing it and it looks good.

@garvenlee

garvenlee commented Oct 2, 2023

let the listener unblocked to receive new frame

Yes, it helps. Perhaps do the same for publish_confirm?

And when I use rstream, I may do some network I/O in subscriber_callback. So if the handler in run_delivery_task isn't wrapped in an asyncio.Task, message processing is actually performed sequentially.

Of course, this can be solved on the user side by aggregating these messages and using another coroutine to do the network I/O. But if the subscriber_callback passed in by the user is already non-blocking, then there is no need to use such a run_delivery_task at all.

So I think it depends on whether the ball is thrown entirely to the user, or the library itself solves all the problems.
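The difference described here can be seen in a few lines of plain asyncio (illustrative names, not rstream's API): awaiting each callback inline serializes processing, while wrapping each callback in an asyncio.Task lets them overlap.

```python
import asyncio
import time

async def slow_callback(msg):
    await asyncio.sleep(0.05)   # simulate network I/O inside the callback

async def run_sequential(messages):
    # awaiting inline: the next message waits for the previous callback
    for msg in messages:
        await slow_callback(msg)

async def run_as_tasks(messages):
    # one asyncio.Task per message: callbacks overlap in time
    await asyncio.gather(*(asyncio.create_task(slow_callback(m)) for m in messages))

async def main():
    msgs = range(10)
    t0 = time.perf_counter()
    await run_sequential(msgs)
    sequential = time.perf_counter() - t0

    t0 = time.perf_counter()
    await run_as_tasks(msgs)
    concurrent = time.perf_counter() - t0
    return sequential, concurrent

sequential, concurrent = asyncio.run(main())
print(f"sequential={sequential:.2f}s concurrent={concurrent:.2f}s")
```

With ten messages and a 50 ms callback, the sequential version takes roughly ten times longer than the task-based one.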

@garvenlee

garvenlee commented Oct 2, 2023

message processing is actually performed sequentially

Consider using async for, as in #114, when consuming messages: the block under async for also runs in sequence.

@garvenlee

garvenlee commented Oct 2, 2023

Currently, one Credit is sent back to the RabbitMQ server in _on_delivery before subscriber_callback completes.

It's not clear to me how Credit specifically affects the rate of consumption, but I think the timing of writing it is important.

If subscriber_callback is wrapped in an asyncio.Task, then the task has not actually been processed yet, but the RabbitMQ server is notified that the client can continue to consume, which I think is a bit unreasonable. It may cause a lot of pending tasks, which may be the reason for the performance degradation.

Also, as mentioned in #132, the read operation in Connection may be slow, which affects the rate of consumption.

decode_frame in #133 too.
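One possible way to address the credit-timing concern, sketched with hypothetical names (CreditingConsumer, send_credit, and on_delivery are not rstream's API): defer the credit until the wrapped task actually completes, so the broker's view of available capacity matches real progress.

```python
import asyncio

class CreditingConsumer:
    def __init__(self):
        self.credits_sent = 0
        self.pending = set()

    def send_credit(self):
        # stand-in for writing a Credit frame to the server
        self.credits_sent += 1

    def on_delivery(self, message, callback):
        task = asyncio.ensure_future(callback(message))
        self.pending.add(task)
        # grant credit only once the callback has actually finished
        task.add_done_callback(
            lambda t: (self.pending.discard(t), self.send_credit())
        )
        return task

async def main():
    consumer = CreditingConsumer()

    async def handler(msg):
        await asyncio.sleep(0.01)

    tasks = [consumer.on_delivery(i, handler) for i in range(5)]
    assert consumer.credits_sent == 0   # nothing has finished yet
    await asyncio.gather(*tasks)
    return consumer.credits_sent

credits = asyncio.run(main())
print(credits)
```

Here credit is granted exactly once per completed callback, so pending tasks can never outrun the work the server is told has been done.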

@qweeze
Collaborator

qweeze commented Oct 4, 2023

the listener unblocked to receive new frame

Yes, this PR solves this problem, but there's another one: long-running callbacks can block each other and cause queue overfilling. As @garvenlee mentioned, Deliver handlers are run sequentially, which may not be what the user expects.

As for Credit frames, I'm not sure what the correct way to handle them is. I'm guessing that this approach, where we send credit frames no matter how many callbacks are pending and don't apply any backpressure, isn't the safest one. Maybe check out the other clients' implementations (Java/Go)?

@Gsantomaggio
Member

We should handle the backpressure somehow. For example, in the .NET client there is a setting:

  • MaxInFlight = sent - confirmed

When MaxInFlight > xxxx, the _semaphore blocks the send.

In the Go client v1 there is a channel. It is not 100% perfect (yet).
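A minimal sketch of the MaxInFlight idea (BackpressuredProducer and its methods are illustrative, not the actual .NET or Go client code): a semaphore bounds the number of sent-but-unconfirmed messages, and each confirmation frees a slot.

```python
import asyncio

class BackpressuredProducer:
    def __init__(self, max_in_flight):
        self._slots = asyncio.Semaphore(max_in_flight)
        self.in_flight = 0   # sent - confirmed

    async def send(self, message):
        await self._slots.acquire()   # blocks once max_in_flight is reached
        self.in_flight += 1
        # ... write the Publish frame here ...

    def on_confirmation(self, message):
        self.in_flight -= 1
        self._slots.release()         # confirmed: allow one more send

async def main():
    producer = BackpressuredProducer(max_in_flight=3)
    for i in range(3):
        await producer.send(i)
    assert producer.in_flight == 3
    # a 4th send would now block until on_confirmation() is called
    producer.on_confirmation(0)
    await producer.send(3)
    return producer.in_flight

in_flight = asyncio.run(main())
print(in_flight)
```

The same pattern works on the consumer side by acquiring a slot before spawning a callback task and releasing it when the task completes.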

@DanielePalaia
Collaborator Author

DanielePalaia commented Oct 4, 2023

Yeah, I ran a few tests running callbacks asynchronously with asyncio.create_task, pretty much as we discussed in the connected issue, similarly to: https://github.com/qweeze/rstream/compare/making_user_callbacks_asynchronous

The idea we discussed was to wrap the callbacks inside asyncio.create_task().

I'm also using a Semaphore in order to limit the number of tasks created.
The issue is that performance degrades quite a lot (I tuned the Semaphore with various settings without any improvement). It seems to reach maximum performance with 50 tasks on my laptop, but it is still very slow (about double the time to consume 5 million messages).

Maybe we can start by integrating this PR (which at least solves an important issue and allows users to run synchronous calls inside callbacks, which can be important for some use cases) and come back to the callback part later.

@qweeze
Collaborator

qweeze commented Oct 4, 2023

@DanielePalaia can you tell whether the performance degradation is caused by the Semaphore or by the overhead of spawning asyncio.Tasks? Also, could you share your benchmark code?

@garvenlee

For the consumer of a plain queue, I know there is basic_qos limiting the number of in-flight unacked messages. With the qos setting, aiormq directly wraps the consumer_callback in a task.

Based on this, I think the consumption rate should just depend on the user of rstream, instead of pulling messages into the client's local queue as quickly as possible.

@DanielePalaia
Collaborator Author

DanielePalaia commented Oct 4, 2023

Well, I originally tried without the Semaphore, running as many tasks (asyncio.create_task) as possible, and got very bad results.
Then, as discussed in the issue with garvenlee, I introduced a Semaphore. This improves the situation for some Semaphore values (around 50, at least on my laptop). With lower and higher Semaphore values performance is even worse.

But even with the optimal value of 50 it is still much slower than the original solution.
Unfortunately I no longer have the test benchmark, as I discarded that solution, but it was simply the basic consumer:
https://github.com/qweeze/rstream/blob/master/docs/examples/basic_consumers/basic_consumer.py

reading about 1/5/7 million items.

            if maybe_coro is not None:
                await maybe_coro
        except Exception as e:
            logger.debug("Error reading item from _run_delivery_handlers: " + str(e))
Collaborator

That seems to be a critical error; it probably shouldn't be logged at debug level. Also, we're currently stopping the _run_delivery_handlers task silently, without raising any error, so it's unclear what the user should do about it.

This try/except wraps both library code and the user-provided callback, which is why it's difficult to handle errors carefully. But I think we should at least log an error message (so the user can get a monitoring alert), and if we choose to suppress this kind of error, the loop shouldn't be stopped:

while True:
    try:
        ...
    except Exception:
        logger.exception("Error while handling %s frame", frame.__class__)
        # no break

while True:
    try:
        frame_entry = await self._frames.get()
        maybe_coro = await frame_entry.handler(frame_entry.frame)
Collaborator

There's an extra await here

This thing with if maybe_coro is not None was introduced to allow both sync and async handlers. The equivalent code would be:

if asyncio.iscoroutinefunction(handler):
    await handler(frame)
else:
    handler(frame)

(but if the handler is sync, there's no point in moving it to another asyncio.Task)
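The maybe_coro pattern quoted above can be exercised end to end; this is a self-contained illustration (dispatch, sync_handler, and async_handler are made-up names), not rstream's actual dispatch code.

```python
import asyncio

def sync_handler(frame):
    return None                    # plain function: runs to completion, nothing to await

async def async_handler(frame):
    await asyncio.sleep(0)         # coroutine function: must be awaited

async def dispatch(handler, frame):
    maybe_coro = handler(frame)    # a sync handler has already finished here
    if maybe_coro is not None:
        await maybe_coro           # an async handler returned a coroutine

async def main():
    await dispatch(sync_handler, "frame-1")
    await dispatch(async_handler, "frame-2")
    return True

ok = asyncio.run(main())
print(ok)
```

Calling the handler first and awaiting only if it returned a coroutine avoids an iscoroutinefunction check on every frame.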

maybe_coro = handler(frame)
if maybe_coro is not None:
    await maybe_coro
if frame.__class__ == schema.Deliver:
Collaborator

This special handling of the Deliver frame feels kind of hacky; maybe move it to Consumer._on_deliver?

@@ -107,6 +107,24 @@ async def consumer(pytestconfig, ssl_context):
    await consumer.close()


@pytest.fixture()
async def test_consumer_close(pytestconfig, ssl_context):
Collaborator

This fixture fully duplicates the previous one; do we need it?

if maybe_coro is not None:
    await maybe_coro
if frame.__class__ == schema.Deliver:
    await self._frames.put(FrameEntry(handler, frame))
Collaborator

We can get the corresponding handler for a frame when receiving it from the self._frames queue; it seems a bit redundant to put the handler along with each frame.

Collaborator Author

Hi @qweeze, do you mean to get it from self._handlers in _run_delivery_handlers?

Collaborator Author

Oh never mind I think I've understood now!

Collaborator

do you mean to get it from self._handlers in _run_delivery_handlers?

Yes, exactly

@qweeze
Collaborator

qweeze commented Oct 4, 2023

But still with the optimal value 50 is still much slower than the original solution

What did you use as the callback function? I'd like to run some benchmarks with a "realistic" callback, maybe something like:

async def on_message(msg):
    # simulate some CPU work
    time.sleep(0.001)
    # simulate a DB hit
    await asyncio.sleep(0.015)

@DanielePalaia
Collaborator Author

DanielePalaia commented Oct 4, 2023

@qweeze It was a simple one, mostly just counting the number of messages (through a global variable) and reporting the time after 1/3/5 million messages received.

We can redo the tests simulating some heavier operations.

@qweeze
Collaborator

qweeze commented Oct 4, 2023

It was a simple one mostly like counting the number of messages

I think this kind of usage is somewhat unrealistic, because there's no point in an async function if it doesn't do any I/O: it's bound to be slower than a sync one due to event loop overhead. If no I/O is needed, a user can provide a sync callback and performance won't be affected at all (simply because the code under if maybe_coro is not None won't run).

But when the callback really needs to be async (because it does awaits), the results are different. I ran a couple of tests with the following settings:

LOOP = 5000
BATCH = 50
...
    async def on_message(msg: AMQPMessage, message_context: MessageContext):
        await asyncio.sleep(100 * 1e-6) # 100 us / 0.1 ms

        nonlocal cnt
        cnt += 1
        if cnt == LOOP * BATCH:
            print(time.perf_counter() - start_time)
            asyncio.create_task(consumer.close())

And I got 43.2 seconds on master and 9.5 seconds on the making_user_callbacks_asynchronous branch on my laptop.

@DanielePalaia DanielePalaia force-pushed the improve_callback_handlers branch 2 times, most recently from 5a44507 to e911d01 Compare October 9, 2023 09:00
@DanielePalaia DanielePalaia force-pushed the improve_callback_handlers branch from e911d01 to dc88983 Compare October 9, 2023 09:28
@DanielePalaia
Collaborator Author

DanielePalaia commented Oct 9, 2023

Hi,

Thanks a lot for your feedback on this issue/PR.
After an internal discussion with @Gsantomaggio, we decided that for now we will proceed with the solution described in this PR. This solution is in line with the other stream clients and improves the current situation. We would like to merge it if there is no other feedback.

We also implemented a modification in the last few days to run a different task for every subscriber/consumer, so that if a subscriber does heavy I/O in its callback, the others can work concurrently.

If we receive feedback from users for whom a solution like https://github.com/qweeze/rstream/compare/making_user_callbacks_asynchronous is preferable, we will take it into consideration (we can also leave the issue open for future reference).
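The per-subscriber change can be sketched roughly as follows, assuming one queue and one delivery task per subscriber (DeliveryDispatcher and its methods are hypothetical, not the PR's actual classes):

```python
import asyncio

class DeliveryDispatcher:
    def __init__(self):
        self._frames = {}   # subscriber name -> asyncio.Queue of frames
        self._tasks = {}    # subscriber name -> delivery task

    def ensure_subscriber(self, name, handler):
        if name not in self._frames:
            self._frames[name] = asyncio.Queue()
            self._tasks[name] = asyncio.create_task(
                self._run_delivery_handlers(name, handler)
            )

    async def _run_delivery_handlers(self, name, handler):
        queue = self._frames[name]
        while True:
            frame = await queue.get()
            await handler(frame)
            queue.task_done()

    async def deliver(self, name, frame):
        await self._frames[name].put(frame)

    async def close(self):
        for name, queue in self._frames.items():
            await queue.join()              # drain this subscriber's queue
            self._tasks[name].cancel()

async def main():
    seen = {"fast": 0, "slow": 0}

    async def fast(frame):
        seen["fast"] += 1

    async def slow(frame):
        await asyncio.sleep(0.02)           # heavy I/O in one callback...
        seen["slow"] += 1

    d = DeliveryDispatcher()
    d.ensure_subscriber("fast", fast)
    d.ensure_subscriber("slow", slow)
    for i in range(5):
        await d.deliver("slow", i)
        await d.deliver("fast", i)          # ...does not delay this one
    await d.close()
    return seen

seen = asyncio.run(main())
print(seen)
```

Because each subscriber has its own task, the slow subscriber's sleeps never stall delivery to the fast one.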

@DanielePalaia DanielePalaia force-pushed the improve_callback_handlers branch from 3017bd9 to 55e7ea0 Compare October 9, 2023 15:21
rstream/client.py (outdated, resolved)

if subscriber_name not in self._frames:
    self.start_task(
        "run_delivery_handlers" + subscriber_name,
Collaborator

nit: Python has had f-string interpolation since 3.6:

f"run_delivery_handlers_{subscriber_name}"

            if maybe_coro is not None:
                await maybe_coro
        except Exception as e:
            if frame_entry is not None:
Collaborator

nit: frame_entry cannot be None here, I think this check is redundant

Collaborator Author

Actually, I added this check because mypy was complaining about the possibility of frame_entry being None, but now it doesn't anymore, so OK!

rstream/client.py (outdated, resolved)
                logger.exception("Error while handling %s frame ", str(frame_entry.__class__))
            else:
                logger.exception("Error while handling a frame " + str(e))
            break
Collaborator

I'm not sure about this break; we probably shouldn't stop handling frames if there's an exception in a user-provided callback function.

Collaborator Author

@qweeze it seems like this specific modification causes one of the encoding tests to remain pending.

I'm not able to reproduce the issue in various scenarios, and even here through GitHub Actions it seems to work fine.
But in my local environment this seems to happen (without any exceptions raised).
Could you double-check if you have time?

Collaborator Author

It seems like this is the info we have (from the GitHub Action):

=============================== warnings summary ===============================
tests/test_encoding.py::test_encoding[frame26]
  /home/runner/.cache/pypoetry/virtualenvs/rstream-KivhoGsP-py3.9/lib/python3.9/site-packages/_pytest/unraisableexception.py:78: PytestUnraisableExceptionWarning: Exception ignored in: <coroutine object BaseClient._run_delivery_handlers at 0x7f9b74cf68c0>
  
  Traceback (most recent call last):
    File "/home/runner/.cache/pypoetry/virtualenvs/rstream-KivhoGsP-py3.9/lib/python3.9/site-packages/_pytest/fixtures.py", line 642, in _compute_fixture_value
      with suppress(KeyError):
  RuntimeError: coroutine ignored GeneratorExit
  
    warnings.warn(pytest.PytestUnraisableExceptionWarning(msg))

Collaborator Author

It seems like something happens when we terminate the task, causing that issue.
Originally on my laptop the test encoding frame24 was pending for this reason:

WARNING  rstream.client:client.py:202 Error while handling a frame Event loop is closed
ERROR    asyncio:base_events.py:1771 Task was destroyed but it is pending!
task: <Task cancelling name='Task-726' coro=<BaseClient._run_delivery_handlers() running at /Users/dpalaia/projects/rstream/rstream/client.py:None> wait_for=<Future cancelled> cb=[BaseClient.start_task.<locals>.on_task_done() at /Users/dpalaia/projects/rstream/rstream/client.py:104]>
WARNING  rstream.client:client.py:191 im here
WARNING  rstream.client:client.py:202 Error while handling a frame <Queue at 0x110fbd150 maxsize=0 _getters[1] tasks=1> is bound to a different event loop
WARNING  rstream.client:client.py:191 im here
WARNING  rstream.client:client.py:202 Error while handling a frame <Queue at 0x110fbd150 maxsize=0 _getters[1] tasks=1> is bound to a different event loop

asyncio.Queue shouldn't raise exceptions while get is blocking, so I tried to move it outside the except block.
In this case the test no longer hangs, but I still had this issue:

Traceback (most recent call last):
    File "/Users/dpalaia/projects/rstream/rstream/client.py", line 191, in _run_delivery_handlers
      frame_entry = await self._frames[subscriber_name].get()
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    File "/usr/local/Cellar/[email protected]/3.11.5/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/queues.py", line 160, in get
      getter.cancel()  # Just in case getter is not done yet.
      ^^^^^^^^^^^^^^^
    File "/usr/local/Cellar/[email protected]/3.11.5/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 761, in call_soon
      self._check_closed()
    File "/usr/local/Cellar/[email protected]/3.11.5/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 519, in _check_closed
      raise RuntimeError('Event loop is closed')
  RuntimeError: Event loop is closed

Maybe the best solution would be to rewrite the code this way: remove the while True and terminate the task when we close() the client (in close() we also set self._is_not_closed to False):

        while self._is_not_closed:
            frame_entry = await self._frames[subscriber_name].get()
            try:
                maybe_coro = handler(frame_entry)
                if maybe_coro is not None:
                    await maybe_coro
            except Exception:
                logger.warning("Error while handling %s frame", frame_entry.__class__)

Collaborator

I guess it's happening because test_encoding is the only sync test; for some reason the event loop gets closed before the run_delivery_handlers task finishes.

also set self._is_not_closed to False

Yeah, that seems an OK thing to do

@@ -256,6 +286,9 @@ async def close(self) -> None:
            resp_schema=schema.CloseResponse,
        )

        for subscriber_name in self._frames:
            await self.stop_task("run_delivery_handlers" + subscriber_name)
Collaborator

Just wanted to note that we don't wait for all enqueued frames to be handled at shutdown. Is that OK? It's probably fine, since the streams protocol has offsets to specify where to start from next time, but I'm not 100% sure.
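If draining were wanted, one possible approach (hypothetical, not what the PR implements) is to wait on the queue's join() with a timeout before cancelling the delivery task, so shutdown handles the remaining frames but cannot hang on a stuck callback:

```python
import asyncio

async def stop_after_drain(queue, task, timeout=1.0):
    try:
        # Queue.join() returns once every enqueued frame was task_done()'d
        await asyncio.wait_for(queue.join(), timeout=timeout)
    except asyncio.TimeoutError:
        pass                      # give up on frames still in flight
    task.cancel()

async def main():
    queue = asyncio.Queue()
    handled = []

    async def worker():
        while True:
            frame = await queue.get()
            handled.append(frame)
            queue.task_done()

    task = asyncio.create_task(worker())
    for i in range(10):
        await queue.put(i)
    await stop_after_drain(queue, task)
    return handled

handled = asyncio.run(main())
print(len(handled))
```

Relying on stream offsets instead, as the comment suggests, is also reasonable: unhandled frames are simply redelivered on the next subscription.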

@qweeze
Collaborator

qweeze commented Oct 9, 2023

I added some minor comments, but other than that this LGTM

@DanielePalaia DanielePalaia force-pushed the improve_callback_handlers branch from bb198bf to 25c6cbb Compare October 10, 2023 13:39

Successfully merging this pull request may close these issues.

Make Handler in Client._listener a Task
Consumer.close throws a RuntimeError
4 participants