
Behavior change in Future / task resolution for Client.submit with complicated arguments #8998

Open · TomAugspurger opened this issue Jan 31, 2025 · 4 comments

Describe the issue:

I'm debugging a behavior change somewhere between distributed 2024.11.2 and 2024.12.1 that's affecting xgboost.

In https://github.com/dmlc/xgboost/blob/a46585a36c4bf30bfd58a2653fe8ae40beea25ce/python-package/xgboost/dask/__init__.py#L358-L488, there's some pretty complicated logic for splitting references to data.

I'm still trying to understand that code, but in the meantime here's a minimal(?) reproducer:

Minimal Complete Verifiable Example:

import dask
from dask.delayed import Delayed
from distributed import Client, LocalCluster, wait
import dask.array as da
import numpy as np
import dask.array.core


def func_plain(x: np.ndarray) -> str:
    assert isinstance(x, np.ndarray), x
    return type(x).__name__


def func_boxed(x: list[np.ndarray]) -> str:
    assert isinstance(x[0], np.ndarray), x
    return type(x).__name__


def main():
    with LocalCluster(n_workers=1) as cluster:
        with Client(cluster) as client:
            arr = da.random.uniform(size=(10, 10), chunks=(10, 10))
            arr = arr.persist()
            delayed = arr.to_delayed().flatten().tolist()

            # # client.submit(func, arg) works fine for a plain `Future` arg.
            # delayed_plain = [dask.delayed(x) for x in delayed]
            # fut_plain = client.compute(delayed_plain)
            # wait(fut_plain)
            # result = client.submit(func_plain, fut_plain[0]).result()
            # print(f"{result=}")

            # Boxing the Delayed object in a list or a dict causes the error
            boxed_parts: list[list[Delayed]] = [[o] for o in delayed]

            # list[Delayed[list[Delayed[np.ndarray]]]]
            delayed_boxed_parts = [dask.delayed(x) for x in boxed_parts]

            fut_boxed_parts = client.compute(delayed_boxed_parts)
            wait(fut_boxed_parts)
            result = client.submit(func_boxed, fut_boxed_parts[0]).result()
            print(f"{result=}")


if __name__ == "__main__":
    main()

This completes fine with dask[complete]==2024.11.2. With dask[complete]==2024.12.1 it raises an AssertionError on the worker inside func_boxed:

2025-01-31 07:16:20,171 - distributed.worker - ERROR - Compute Failed
Key:       func_boxed-b2f2ed77eb007708d94c86cdd1e0640c
State:     executing
Task:  <Task 'func_boxed-b2f2ed77eb007708d94c86cdd1e0640c' func_boxed(...)>
Exception: "AssertionError([('uniform-4fe800e9f8fbe6f6234e25b7c6f797e7', 0, 0)])"
Traceback: '  File "/home/nfs/toaugspurger/gh/dmlc/xgboost/debug.py", line 15, in func_boxed\n    assert isinstance(x[0], np.ndarray), x\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n'

Traceback (most recent call last):
  File "/home/nfs/toaugspurger/gh/dmlc/xgboost/debug.py", line 45, in <module>
    main()
  File "/home/nfs/toaugspurger/gh/dmlc/xgboost/debug.py", line 40, in main
    result = client.submit(func_boxed, fut_boxed_parts[0]).result()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/raid/toaugspurger/envs/xgboost/lib/python3.12/site-packages/distributed/client.py", line 402, in result
    return self.client.sync(self._result, callback_timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/nfs/toaugspurger/gh/dmlc/xgboost/debug.py", line 15, in func_boxed
    assert isinstance(x[0], np.ndarray), x
      ^^^^^^^^^^^^^^^^^
AssertionError: [('uniform-4fe800e9f8fbe6f6234e25b7c6f797e7', 0, 0)]

Previously, the worker got the data that's wrapped in the future (after unboxing several levels), so that func_boxed got an ndarray. Now it gets a task key.
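
For context, here's a minimal sketch of the nesting behavior this relies on: distributed resolves futures nested inside client.submit arguments before the function runs on the worker (a toy example with a local client, not the xgboost code):

from distributed import Client

with Client(n_workers=1) as client:
    fut = client.submit(lambda: 42)
    # The future inside the list is replaced by its result on the
    # worker, so the function sees [42], not [<Future>].
    print(client.submit(lambda xs: xs[0] + 1, [fut]).result())  # 43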

Anything else we need to know?:

I'm still working to understand that xgboost code. Seeing all the Delayed-in-Delayed objects makes me think there might be room for simplification. In the meantime, I wanted to check whether this change was expected. The fact that we're getting a task key like (uniform-4fe..., 0, 0) makes me think it's not deliberate, but I know that the Task machinery has been in flux.

The snippet includes a commented-out block that calls client.submit(func_plain, arg) where the argument is just a Future[ndarray]; that path still works. I'm not sure, but the change in behavior might be in how client.compute handles this complicated data, rather than in client.submit itself.

(Pdb) pp fut_plain[0]
<Future: finished, type: numpy.ndarray, key: ('uniform-48875bc65c093d89f0e4c416d5d471e5', 0, 0)>
(Pdb) pp fut_plain[0].result()[0, :2]
array([0.92037022, 0.86310934])
(Pdb) pp fut_boxed_parts[0]
<Future: finished, type: list, key: list-eaeba21a-5cce-4fbb-8b20-c3e4447988e2>
(Pdb) pp fut_boxed_parts[0].result()
[('uniform-48875bc65c093d89f0e4c416d5d471e5', 0, 0)]

Environment:

  • Dask version: the issue is reproducible with Dask 2024.12.1 through 2025.1.0. I haven't tested with main.
  • Python version: 3.12
  • Operating System: linux
  • Install method (conda, pip, source): pip

TomAugspurger commented Jan 31, 2025

Oh, the title at #8920 (Remove recursion in task spec) looks pretty darn relevant.


TomAugspurger commented Jan 31, 2025

Two probably important points I missed earlier:

  1. The line arr = arr.persist() is necessary to reproduce the failure. If you just have a non-persisted dask Array, there's no issue. The difference is that the task graph of the persisted array contains a Future instead of a Task:

>>> dict(arr.dask)
{('uniform-36f326fdb1f2ebfb4909c489ff907b07',
  0,
  0): <Task ('uniform-36f326fdb1f2ebfb4909c489ff907b07', 0, 0) _apply_random(...)>}

>>> arr2 = arr.persist()
>>> dict(arr2.dask)
{('uniform-36f326fdb1f2ebfb4909c489ff907b07',
  0,
  0): <Future: finished, type: numpy.ndarray, key: ('uniform-36f326fdb1f2ebfb4909c489ff907b07', 0, 0)>}
  2. The Array.to_delayed() call needs optimize_graph=True, which is the default. If the graph isn't optimized there, there's no error.

This gives a slightly simpler reproducer:

import dask
from distributed import LocalCluster
import dask.array as da
import numpy as np


def main():
    with LocalCluster(n_workers=1) as cluster:
        with cluster.get_client() as client:
            arr1 = da.random.uniform(size=(10,), chunks=(10,))
            arr2 = arr1.persist()

            a = dask.delayed([arr1.to_delayed().flatten().tolist()]).compute()
            b = dask.delayed([arr2.to_delayed().flatten().tolist()]).compute()

            assert isinstance(a[0][0], np.ndarray), a  # OK
            assert isinstance(b[0][0], np.ndarray), b  # error



if __name__ == "__main__":
    main()

which fails with

Traceback (most recent call last):
  File "/home/nfs/toaugspurger/gh/dask/distributed/bug2.py", line 22, in <module>
    main()
  File "/home/nfs/toaugspurger/gh/dask/distributed/bug2.py", line 17, in main
    assert isinstance(b[0][0], np.ndarray), b  # error
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError: [[('uniform-f325b9030be3fffa906d1695ac839cc1', 0)]]

(A comment from TomAugspurger has been minimized.)

TomAugspurger commented

I've traced things to where we call convert_legacy_graph in dask.array.optimization.optimize: https://github.com/dask/dask/blob/5544405272578e4be4e780392e620f0c4fd1b19b/dask/array/optimization.py#L61

Going into that call, we have a dictionary with the Future as a value. After the call returns, the dictionary is empty:

-> dsk = convert_legacy_graph(dsk)
(Pdb) pp dsk
{('uniform-cfc740e22295c5a0a775c36c67646be3', 0): <Future: finished, type: numpy.ndarray, key: ('uniform-cfc740e22295c5a0a775c36c67646be3', 0)>}
(Pdb) n
> /Users/toaugspurger/gh/dask/dask/dask/array/optimization.py(63)optimize()
-> dsk = fuse_linear_task_spec(dsk, keys=keys)
(Pdb) pp dsk
{}
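
The emptying reproduces standalone as well. A minimal sketch, assuming convert_legacy_graph is importable from dask._task_spec at these versions (the same function linked above):

import dask.array as da
from distributed import Client
from dask._task_spec import convert_legacy_graph

with Client(n_workers=1) as client:
    arr = da.random.uniform(size=(10,), chunks=(10,)).persist()
    dsk = dict(arr.dask)   # {key: <Future ...>}, as in the trace above
    print(convert_legacy_graph(dsk))  # {} on 2024.12.1+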

The dictionary empties because the (converted) task is an Alias (which I think makes sense for a Future?) whose target is equal to its own key, so at https://github.com/dask/dask/blob/5544405272578e4be4e780392e620f0c4fd1b19b/dask/_task_spec.py#L271-L273 we don't put it into the converted task graph.
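
To make that concrete, here's a toy illustration of the dropping logic (not dask's actual code; the real check is at the _task_spec.py link above):

from dataclasses import dataclass


@dataclass
class Alias:
    target: tuple  # the key this entry forwards to


def convert(graph: dict) -> dict:
    # Mirrors the check described above: a self-referencing alias is
    # skipped on the assumption that its target is produced elsewhere.
    out = {}
    for key, value in graph.items():
        if isinstance(value, Alias) and value.target == key:
            continue  # dropped, but nothing else produces `key`
        out[key] = value
    return out


# A persisted array's graph maps a key to a Future for that same key,
# which converts to a self-alias and then vanishes entirely:
key = ("uniform", 0, 0)
print(convert({key: Alias(target=key)}))  # {}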

I think the problem, though, is that we don't ever have the target of the alias in the task graph, since no one else is putting it there. At this point, I might need some help from @fjetter or someone else familiar with this.
