
While registering a scheduler plugin, I get TypeError: PooledRPCCall.__getattr__.<locals>.send_recv_from_rpc() takes 0 positional arguments but 1 was given #9001

Open
markcoletti opened this issue Feb 3, 2025 · 5 comments


@markcoletti

Describe the issue:

When registering a Dask scheduler plugin via client.scheduler.add_plugin (as in the example below), we get the following error:

TypeError: PooledRPCCall.__getattr__.<locals>.send_recv_from_rpc() takes 0 positional arguments but 1 was given

The offending code is in distributed's core.py:

    def __getattr__(self, key):
        async def send_recv_from_rpc(**kwargs):  # THIS IS WHERE THE ERROR IS RAISED (keyword arguments only)
            if self.serializers is not None and kwargs.get("serializers") is None:
                kwargs["serializers"] = self.serializers
            if self.deserializers is not None and kwargs.get("deserializers") is None:
                kwargs["deserializers"] = self.deserializers
            comm = await self.pool.connect(self.addr)
            prev_name, comm.name = comm.name, "ConnectionPool." + key
            try:
                return await send_recv(comm=comm, op=key, **kwargs)
            finally:
                self.pool.reuse(self.addr, comm)
                comm.name = prev_name

        return send_recv_from_rpc
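The traceback follows from that signature: client.scheduler is a pooled RPC object (see the scheduler=<pooled rpc ...> line in the output), so any attribute access, including add_plugin, hands back the send_recv_from_rpc closure, which accepts keyword arguments only. The sketch below reproduces the mechanism without Dask. FakeRPC is a hypothetical stand-in that mimics only the shape of PooledRPCCall.__getattr__ and opens no connection. It also shows that even a keyword-only call merely creates a coroutine, which does nothing until it is awaited.

```python
import asyncio


class FakeRPC:
    """Hypothetical stand-in that mimics only the shape of PooledRPCCall.__getattr__."""

    def __getattr__(self, key):
        # Like send_recv_from_rpc, the returned coroutine function takes
        # keyword arguments only; here it just echoes the operation name.
        async def send_recv_from_rpc(**kwargs):
            return key, kwargs

        return send_recv_from_rpc


rpc = FakeRPC()

# A positional argument is rejected as soon as the function is called,
# before any coroutine object is created.
error_message = None
try:
    rpc.add_plugin(object())
except TypeError as exc:
    error_message = str(exc)
print(error_message)

# Keyword arguments bind fine, but the call only creates a coroutine;
# nothing runs until it is awaited (here via asyncio.run).
result = asyncio.run(rpc.add_plugin(plugin="my-plugin"))
print(result)  # ('add_plugin', {'plugin': 'my-plugin'})
```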

Minimal Complete Verifiable Example:

Taken from your own example of a scheduler plugin, which no longer works:

from distributed import Client, LocalCluster, SchedulerPlugin

class MySchedulerPlugin(SchedulerPlugin):
    def __init__(self):
        self.counter = 0

    def transition(self, key, start, finish, *args, **kwargs):
        if start == 'processing' and finish == 'memory':
            self.counter += 1

    def restart(self, scheduler):
        self.counter = 0

if __name__ == '__main__':
    with Client() as client:
        print(f'scheduler={client.scheduler}')
        my_scheduler_plugin = MySchedulerPlugin()
        client.scheduler.add_plugin(my_scheduler_plugin)
    print('Done')

This produces the following:

scheduler=<pooled rpc to 'tcp://127.0.0.1:57991'>
Traceback (most recent call last):
  File "/Users/may/Projects/scratch/dask_scheduler/scheduler.py", line 22, in <module>
    client.scheduler.add_plugin(my_scheduler_plugin)
TypeError: PooledRPCCall.__getattr__.<locals>.send_recv_from_rpc() takes 0 positional arguments but 1 was given

Anything else we need to know?:
N/A

Environment:

  • Dask version:
    • dask 2024.10.0
    • distributed 2024.10.0
    • The same behavior is observed with 2024.11.2.

  • Python version: 3.10.15 and 3.11.7
  • Operating System:
    • Darwin mac135909 24.2.0 Darwin Kernel Version 24.2.0: Fri Dec 6 18:56:34 PST 2024; root:xnu-11215.61.5~2/RELEASE_ARM64_T6020 arm64
    • Linux login05 5.14.21-150500.55.49_13.0.57-cray_shasta_c #1 SMP Sun May 12 13:35:37 UTC 2024 (33add2b) x86_64 x86_64 x86_64 GNU/Linux
  • Install method (conda, pip, source):
    • conda and pip
@hendrikmakait
Member

Hi, please use Client.register_plugin(...) instead of the undocumented Client.scheduler.add_plugin. FWIW, Client.scheduler is considered private because it's not documented, but we may want to mark it explicitly as private.
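A sketch of the reproduction rewritten against the public Client.register_plugin API, assuming a distributed release that provides it (the 2024.x versions listed above should). The plugin body is unchanged from the report; only the registration call differs.

```python
from distributed import Client, SchedulerPlugin


class MySchedulerPlugin(SchedulerPlugin):
    def __init__(self):
        self.counter = 0

    def transition(self, key, start, finish, *args, **kwargs):
        # Count tasks whose results have just landed in memory.
        if start == "processing" and finish == "memory":
            self.counter += 1

    def restart(self, scheduler):
        self.counter = 0


if __name__ == "__main__":
    with Client() as client:
        # Public API: serializes the plugin on the client and installs
        # the copy on the scheduler.
        client.register_plugin(MySchedulerPlugin())
    print("Done")
```

Because register_plugin serializes the plugin on the client, this route only works for plugins that can be pickled, which is the constraint discussed in the follow-up comments.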

@hendrikmakait
Member

Would you be interested in submitting a PR that fixes the outdated example?

@renan-souza

Hey @hendrikmakait, thanks for your help! I work with @markcoletti and we are debugging this together.

The issue we are seeing is that cluster.scheduler.add_plugin behaves differently from client.register_plugin for the scheduler plugin.

If we use client.register_plugin, the plugin is passed to cloudpickle.dumps, but our scheduler plugin can't be pickled, so an error is raised.
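To find out which attribute blocks serialization before calling register_plugin, each instance attribute can be pickled on its own. The helper unpicklable_attributes and the class ExamplePlugin below are hypothetical. The helper uses the standard-library pickle as an approximation of the cloudpickle.dumps call; cloudpickle can handle some objects that pickle rejects (lambdas, locally defined functions), so the helper may over-report.

```python
import pickle
import threading


def unpicklable_attributes(obj):
    """Return the names of instance attributes (in obj.__dict__) that pickle rejects."""
    bad = []
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value)
        except Exception:  # TypeError, pickle.PicklingError, AttributeError, ...
            bad.append(name)
    return bad


class ExamplePlugin:
    # Hypothetical plugin holding a resource that cannot be pickled.
    def __init__(self):
        self.counter = 0
        self._lock = threading.Lock()


print(unpicklable_attributes(ExamplePlugin()))  # ['_lock']
```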

That's why we have been using cluster.scheduler.add_plugin, which has been working fine as it doesn't seem to pickle anything, but recently we are seeing this PooledRPCCall error reported in the original issue.

Is there anything we could do to avoid having to cloudpickle our scheduler plugin, given that it has a private attribute that can't be pickled?

@hendrikmakait
Member

I assume you're trying to register a scheduler plugin? Otherwise we'd have to serialize your plugin anyhow to ship it to the workers, which means that you'd be out of luck.

> That's why we have been using cluster.scheduler.add_plugin which has been working fine as it doesn't seem to pickle anything, but recently we are seeing this PooledRPCCall error reported in the original issue.

That's rather odd. cluster.scheduler.add_plugin also requires you to be able to serialize the plugin in order to register it, so this shouldn't work based on your description.

There are two possibilities that I see for you:

  1. Instead of initializing the non-serializable attribute in __init__, defer it to SchedulerPlugin.start, which runs on the scheduler when the plugin is added.
  2. Use Dask's preload functionality as shown here, so the plugin is created on the scheduler and never has to be transferred from the client.
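Both options can be sketched as follows, assuming the non-serializable attribute is something like a threading.Lock (a hypothetical stand-in for the real private attribute). Option 1 leaves only a placeholder in __init__ and creates the resource in start, so the object can be pickled before it is sent. Option 2 is a preload module whose dask_setup(scheduler) function builds the plugin inside the scheduler process and adds it with the scheduler's own add_plugin method, so nothing is sent from the client. The module name my_preload.py is illustrative.

```python
import threading

from distributed import SchedulerPlugin


class MySchedulerPlugin(SchedulerPlugin):
    def __init__(self):
        self.counter = 0
        # Placeholder only: a Lock here would make the plugin unpicklable.
        self._lock = None

    async def start(self, scheduler):
        # Option 1: create the unpicklable resource on the scheduler,
        # after the plugin has already been delivered there.
        self._lock = threading.Lock()

    def transition(self, key, start, finish, *args, **kwargs):
        if start == "processing" and finish == "memory":
            with self._lock:
                self.counter += 1


# Option 2: save this file as e.g. my_preload.py and start the scheduler with
#   dask scheduler --preload my_preload.py
# The scheduler imports the module and calls dask_setup() in its own process,
# so the plugin object is never pickled.
def dask_setup(scheduler):
    scheduler.add_plugin(MySchedulerPlugin())
```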

@hendrikmakait
Member

> but recently we are seeing this PooledRPCCall error reported in the original issue.

Did you upgrade any packages recently?
