WIP: Add virtual-rechunk example #520

Open · wants to merge 2 commits into main

Conversation

thodson-usgs
Contributor

Rechunk a virtual dataset

This example demonstrates how to rechunk a collection of netCDF files on S3 into a single Zarr store.

First, lithops and VirtualiZarr construct a virtual dataset composed of the netCDF files on S3. Then, cubed-xarray rechunks the virtual dataset into a Zarr store. Inspired by the Pythia cookbook by @norlandrhagen.
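
In outline, the construction step looks roughly like the sketch below (the bucket listing, concatenation dimension, and combine options are placeholders; the example script in this PR is the authoritative version):

import lithops
import xarray as xr
from virtualizarr import open_virtual_dataset

# Placeholder listing of the source netCDF files on S3.
netcdf_files = ["s3://my-bucket/WRFDS_2060-01-01.nc", "s3://my-bucket/WRFDS_2060-01-02.nc"]

def virtualize(path):
    # Build a reference-only (virtual) dataset for a single netCDF file.
    return open_virtual_dataset(path, indexes={})

# Virtualize each file in parallel with lithops, then combine the references.
fexec = lithops.FunctionExecutor()
fexec.map(virtualize, netcdf_files)
virtual_datasets = fexec.get_result()
combined = xr.combine_nested(virtual_datasets, concat_dim="Time", coords="minimal", compat="override")

# Persist the combined references so the rechunk step can open them with engine="kerchunk".
combined.virtualize.to_kerchunk("combined.json", format="json")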

STATUS

Currently fails during the chunk operation. I'm hoping someone with more experience might catch an obvious error on my part.

2024-07-25 09:19:10,806 [WARNING] future.py:254 -- ExecutorID 7a9116-3 | JobID M002 - CallID: 00000 - There was an exception - Activation ID: 65c097fe-6362-4aeb-8a77-074977f98559 - FileNotFoundError
2024-07-25 09:19:10,806 [INFO] invokers.py:107 -- ExecutorID 7a9116-3 | JobID M006 - Selected Runtime: virtualizarr-runtime - 2000MB
2024-07-25 09:19:10,806 [INFO] invokers.py:174 -- ExecutorID 7a9116-3 | JobID M006 - Starting function invocation: <lambda>() - Total: 1 activations
2024-07-25 09:19:10,806 [INFO] invokers.py:213 -- ExecutorID 7a9116-3 | JobID M006 - View execution logs at /private/var/folders/zr/jc46d6cs531df81bpgj4681sz25hhs/T/lithops-thodson/logs/7a9116-3-M006.log

The execution log complains FileNotFoundError: [Errno 2] No such file or directory: '/function/combined.json/.zmetadata':

Activation: 'virtualizarr-runtime' (445a08aa-60ba-48db-a3ee-659be326444f)
[
    2024-07-25 14:19:08,140 [INFO] handler.py:177 -- Lithops v3.4.1 - Starting AWS Lambda execution
    2024-07-25 14:19:08,140 [INFO] handler.py:178 -- Execution ID: 7a9116-3-M005/00000
    2024-07-25 14:19:08,243 [INFO] aws_s3.py:59 -- S3 client created - Region: us-west-2
    ----------------------- EXCEPTION !-----------------------
    Traceback (most recent call last):
      File "/function/lithops/worker/jobrunner.py", line 210, in run
        func = pickle.loads(self.job.func)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/fsspec/spec.py", line 33, in make_instance
        return cls(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/fsspec/spec.py", line 81, in __call__
        obj = super().__call__(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/fsspec/implementations/reference.py", line 713, in __init__
        for ref in self.references.values():
      File "/usr/local/lib/python3.11/site-packages/fsspec/implementations/reference.py", line 59, in __iter__
        for val in self._mapping.zmetadata.values():
                   ^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/fsspec/implementations/reference.py", line 141, in __getattr__
        self.setup()
      File "/usr/local/lib/python3.11/site-packages/fsspec/implementations/reference.py", line 148, in setup
        self._items[".zmetadata"] = self.fs.cat_file(
                                    ^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/fsspec/spec.py", line 773, in cat_file
        with self.open(path, "rb", **kwargs) as f:
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/fsspec/spec.py", line 1303, in open
        f = self._open(
            ^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/fsspec/implementations/local.py", line 191, in _open
        return LocalFileOpener(path, mode, fs=self, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/fsspec/implementations/local.py", line 355, in __init__
        self._open()
      File "/usr/local/lib/python3.11/site-packages/fsspec/implementations/local.py", line 360, in _open
        self.f = open(self.path, mode=self.mode)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    FileNotFoundError: [Errno 2] No such file or directory: '/function/combined.json/.zmetadata'
    ----------------------------------------------------------
    2024-07-25 14:19:09,259 [INFO] jobrunner.py:312 -- Process finished
]

@norlandrhagen
Contributor

This is super cool @thodson-usgs! Excited to see a rechunking <-> virtualizarr <-> cubed example.

@norlandrhagen
Contributor

@thodson-usgs have you tried a version of this with rechunker?

@thodson-usgs
Contributor Author

@thodson-usgs have you tried a version of this with rechunker?

I haven't. Cubed uses the rechunker algorithm, but this could help to isolate the problem. I'll add that the workflow runs fine with Dask.

@TomNicholas
Member

Looking at this a bit more closely @thodson-usgs ... So you're doing three pretty unrelated things in this example:

  1. Using virtualizarr on your specific netCDF files
  2. Using lithops to do the concatenation of virtualizarr virtual datasets
  3. Using cubed to perform a rechunk

That is awesome (especially (2), which I haven't even tried to do myself yet), but I think we should make sure each of these steps works individually first.

The FileNotFoundError you're currently seeing is entirely to do with virtualizarr / kerchunk, nothing to do with Cubed.

I still think that putting this rechunking example in the Cubed repo is the right call though, to show how Cubed is basically a superset of Rechunker. But perhaps it should be broken up - i.e. have an example of (1) and (2) together in the virtualizarr library, and example (3) lives here. Then if you want a full end-to-end notebook example, that's maybe more of a Pythia-level thing. But the important thing is to get it working first.

Also note that you could test cubed's rechunk just by using xr.open_mfdataset to open your netcdf files. That would allow you to bypass any problems with virtualizarr until they are fixed upstream.

Speaking of which, when I try to use open_virtual_dataset on just one of your netcdf files, save it as kerchunk references, and open it with engine='kerchunk', I get a different error. I'll raise an issue on virtualizarr to track that (thanks for making your example so easily reproducible!)
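
For reference, a minimal sketch of that open_mfdataset bypass might look like this (the file glob and chunk sizes are placeholders, untested here):

import xarray as xr

# Open the netCDF files directly, skipping virtualizarr/kerchunk entirely.
ds = xr.open_mfdataset("WRFDS_*.nc", chunks={}, chunked_array_type="cubed")

rechunked = ds.chunk({"Time": 5, "south_north": 25, "west_east": 32}, chunked_array_type="cubed")
rechunked.to_zarr("rechunked.zarr", mode="w", safe_chunks=False)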

@TomNicholas added the bug, xarray-integration, and upstream labels on Jul 25, 2024
@thodson-usgs
Contributor Author

The FileNotFoundError you're currently seeing is entirely to do with virtualizarr / kerchunk, nothing to do with Cubed.

Debugging this has been tricky. Invariably something is inconsistent with my environment and the behavior changes. All that is good justification for breaking up the problem, which I'm happy to do in the long run. In the meantime, I'll try all your good suggestions.

On your last point, I'd reiterate that the workflow runs if I comment out one line:

   # chunked_array_type='cubed',

which makes me think that this bug is not in lithops, virtualizarr, kerchunk, or dask...

@TomNicholas
Member

On your last point, I'd reiterate that the workflow runs if I comment out one line:

Okay what. That's really surprising to me and breaks my mental model of what's going on here.

breaking up the problem

Yes, let's try to get xr.open_dataset/open_mfdataset + cubed working without virtualizarr, and get virtualizarr working without lithops or cubed (i.e. zarr-developers/VirtualiZarr#201), then combine them all back together at the end.

@tomwhite
Member

Thanks for opening this @thodson-usgs!

Using a local Lithops executor (or even just the default single-threaded Cubed executor on your local machine) might be helpful in isolating where the problem is.
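
A localhost Lithops setup only needs a minimal config, something along these lines (check the Lithops docs for the exact keys and file location):

# ~/.lithops/config
lithops:
    backend: localhost
    storage: localhost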

@thodson-usgs
Contributor Author

Indeed, there was an issue with my cubed environment, and I needed to re-pass my spec to .chunk or else the array was converted back to dask, which leads to the previous error. I suspect we might be hitting multiple bugs; I'll isolate one here.

Following @tomwhite's suggestion, I tried:

from cubed import Spec
spec = Spec(work_dir="tmp", allowed_mem="2GB")

combined_ds = xr.open_dataset('combined.json',
                              engine="kerchunk",
                              chunks={},
                              chunked_array_type='cubed', # optional so long as the spec is passed to .chunk
                              from_array_kwargs={'spec': spec}, #optional
                              )

combined_ds['Time'].attrs = {}  # to_zarr complains about attrs

rechunked_ds = combined_ds.chunk(
    chunks={'Time': 5, 'south_north': 25, 'west_east': 32},
    chunked_array_type='cubed',
    from_array_kwargs={'spec': spec},
)

# this succeeds
rechunked_ds.compute()

# but this fails
rechunked_ds.to_zarr('rechunked.zarr',
                     mode='w',
                     encoding={},  # TODO
                     consolidated=True,
                     safe_chunks=False,
                     )

which complains

ValueError: Arrays must have same spec in single computation. Specs: [cubed.Spec(work_dir=tmp, allowed_mem=2000000000, reserved_mem=0, executor=None, storage_options=None), cubed.Spec(work_dir=tmp, allowed_mem=2000000000, reserved_mem=0, executor=None, storage_options=None), cubed.Spec(work_dir=None, allowed_mem=200000000, reserved_mem=100000000, executor=None, storage_options=None)]

Apparently to_zarr is defaulting to another cubed spec, nor can I pass a spec to to_zarr.

If I comment out every instance of from_array_kwargs={'spec': spec} so that everything uses the default spec, the script runs to completion.

Anyway, I don't think this is the bug I'm looking for, so I'll keep at it.

@TomNicholas
Member

🙏 for helping us surface these issues @thodson-usgs !

Indeed, there was an issue with my cubed environment, and I needed to re-pass my spec to .chunk or else the array was converted back to dask, which leads to the previous error.

That makes a lot more sense, but also it sounds like potentially a bug with xarray's ChunkManager system (which dispatches to dask or cubed as available).

Apparently to_zarr is defaulting to another cubed spec

Again this could be another problem with the ChunkManager, or maybe with cubed.

nor can I pass a spec to to_zarr

You should be able to pass cubed-related arguments via the chunkmanager_store_kwargs keyword argument to .to_zarr. Though I'm not sure you want to pass a spec in again.
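
As a rough illustration of the syntax (optimize_graph is only a stand-in for whatever Cubed option you actually need; I haven't checked which keys cubed-xarray will accept here):

rechunked_ds.to_zarr("rechunked.zarr",
                     mode="w",
                     consolidated=True,
                     safe_chunks=False,
                     # forwarded by xarray to the chunk manager's store() call
                     chunkmanager_store_kwargs={"optimize_graph": False},
                     )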

@thodson-usgs
Contributor Author

thodson-usgs commented Aug 1, 2024

After resolving the regression in the recent version of VirtualiZarr, we still hit an error during the final rechunk operation that runs within to_zarr(). The same error occurs with .chunk(...).compute().

The script succeeds for certain cases:

  1. substituting a local Dask cluster
  2. or even a local Cubed executor configured like:
spec:
  work_dir: "tmp"
  allowed_mem: "2GB"

But it fails with the Lambda executor, both for the virtual dataset and for open_mfdataset(), with different errors.
(At one point I was getting JSON serialization errors in both cases, but I can't reproduce that now.)

Furthermore, I don't have a great strategy for debugging a Lambda-only failure mode. In the meantime, I'll carefully rebuild my runtime environment.

Error messages

For the virtual dataset:

Traceback (most recent call last):
  File "/Users/thodson/Desktop/dev/software/cubed/examples/virtual-rechunk/rechunk-only.py", line 29, in <module>
    rechunked_ds.to_zarr("rechunked.zarr",
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/xarray/core/dataset.py", line 2549, in to_zarr
    return to_zarr(  # type: ignore[call-overload,misc]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/xarray/backends/api.py", line 1698, in to_zarr
    writes = writer.sync(
             ^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/xarray/backends/common.py", line 267, in sync
    delayed_store = chunkmanager.store(
                    ^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed_xarray/cubedmanager.py", line 207, in store
    return store(
           ^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/core/ops.py", line 168, in store
    compute(*arrays, executor=executor, _return_in_memory_array=False, **kwargs)
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/core/array.py", line 282, in compute
    plan.execute(
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/core/plan.py", line 212, in execute
    executor.execute_dag(
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/runtime/executors/lithops.py", line 265, in execute_dag
    execute_dag(
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/runtime/executors/lithops.py", line 190, in execute_dag
    for _, stats in map_unordered(
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/runtime/executors/lithops.py", line 119, in map_unordered
    future.status(throw_except=True)
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/runtime/executors/lithops_retries.py", line 83, in status
    reraise(*self.response_future._exception)
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/six.py", line 719, in reraise
    raise value
  File "/function/lithops/worker/jobrunner.py", line 210, in run
  File "/usr/local/lib/python3.11/site-packages/fsspec/spec.py", line 33, in make_instance
  File "/usr/local/lib/python3.11/site-packages/fsspec/spec.py", line 81, in __call__
  File "/usr/local/lib/python3.11/site-packages/fsspec/implementations/reference.py", line 713, in __init__
  File "/usr/local/lib/python3.11/site-packages/fsspec/implementations/reference.py", line 59, in __iter__
  File "/usr/local/lib/python3.11/site-packages/fsspec/implementations/reference.py", line 141, in __getattr__
  File "/usr/local/lib/python3.11/site-packages/fsspec/implementations/reference.py", line 148, in setup
  File "/usr/local/lib/python3.11/site-packages/fsspec/spec.py", line 773, in cat_file
  File "/usr/local/lib/python3.11/site-packages/fsspec/spec.py", line 1303, in open
  File "/usr/local/lib/python3.11/site-packages/fsspec/implementations/local.py", line 191, in _open
  File "/usr/local/lib/python3.11/site-packages/fsspec/implementations/local.py", line 355, in __init__
  File "/usr/local/lib/python3.11/site-packages/fsspec/implementations/local.py", line 360, in _open
FileNotFoundError: [Errno 2] No such file or directory: '/function/combined.json/.zmetadata'

For the open_mfdataset() case:

Traceback (most recent call last):
  File "/Users/thodson/Desktop/dev/software/cubed/examples/virtual-rechunk/mf-rechunk.py", line 46, in <module>
    rechunked_ds.to_zarr("rechunked.zarr",
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/xarray/core/dataset.py", line 2549, in to_zarr
    return to_zarr(  # type: ignore[call-overload,misc]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/xarray/backends/api.py", line 1698, in to_zarr
    writes = writer.sync(
             ^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/xarray/backends/common.py", line 267, in sync
    delayed_store = chunkmanager.store(
                    ^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed_xarray/cubedmanager.py", line 207, in store
    return store(
           ^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/core/ops.py", line 168, in store
    compute(*arrays, executor=executor, _return_in_memory_array=False, **kwargs)
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/core/array.py", line 282, in compute
    plan.execute(
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/core/plan.py", line 212, in execute
    executor.execute_dag(
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/runtime/executors/lithops.py", line 265, in execute_dag
    execute_dag(
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/runtime/executors/lithops.py", line 190, in execute_dag
    for _, stats in map_unordered(
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/runtime/executors/lithops.py", line 119, in map_unordered
    future.status(throw_except=True)
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/runtime/executors/lithops_retries.py", line 83, in status
    reraise(*self.response_future._exception)
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/six.py", line 718, in reraise
    raise value.with_traceback(tb)
  File "/function/lithops/worker/jobrunner.py", line 210, in run
  File "/usr/local/lib/python3.11/site-packages/zarr/core.py", line 2570, in __setstate__
  File "/usr/local/lib/python3.11/site-packages/zarr/core.py", line 170, in __init__
  File "/usr/local/lib/python3.11/site-packages/zarr/core.py", line 193, in _load_metadata
  File "/usr/local/lib/python3.11/site-packages/zarr/core.py", line 204, in _load_metadata_nosync
zarr.errors.ArrayNotFoundError: array not found at path %r' "array not found at path %r' 'SNOWH'"

Here the error was associated with the SNOWH variable, but the variable changes from run to run.

@tomwhite
Member

tomwhite commented Aug 1, 2024

Furthermore, I don't have a great strategy for debugging a Lambda-only failure mode.

Does it work with Cubed running on local Lithops (https://lithops-cloud.github.io/docs/source/compute_config/localhost.html)?

@thodson-usgs
Contributor Author

Does it work with Cubed running on local Lithops (https://lithops-cloud.github.io/docs/source/compute_config/localhost.html)?

Good suggestion. This fails, but it does reproduce the JSON serialization error. I suppose the next step is to try entering the debugger within the container runtime, or else litter print statements into the source until I've isolated the error. Should I dig into cubed or cubed-xarray first? Do you have a suspect?

Traceback (most recent call last):
  File "/Users/thodson/Desktop/dev/software/cubed/examples/virtual-rechunk/rechunk-only.py", line 29, in <module>
    rechunked_ds.to_zarr("rechunked.zarr",
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/xarray/core/dataset.py", line 2549, in to_zarr
    return to_zarr(  # type: ignore[call-overload,misc]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/xarray/backends/api.py", line 1698, in to_zarr
    writes = writer.sync(
             ^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/xarray/backends/common.py", line 267, in sync
    delayed_store = chunkmanager.store(
                    ^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed_xarray/cubedmanager.py", line 207, in store
    return store(
           ^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/core/ops.py", line 168, in store
    compute(*arrays, executor=executor, _return_in_memory_array=False, **kwargs)
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/core/array.py", line 282, in compute
    plan.execute(
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/core/plan.py", line 212, in execute
    executor.execute_dag(
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/runtime/executors/lithops.py", line 265, in execute_dag
    execute_dag(
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/runtime/executors/lithops.py", line 190, in execute_dag
    for _, stats in map_unordered(
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/runtime/executors/lithops.py", line 93, in map_unordered
    futures = lithops_function_executor.map(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/cubed/runtime/executors/lithops_retries.py", line 120, in map
    futures_list = self.executor.map(
                   ^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/lithops/executors.py", line 276, in map
    futures = self.invoker.run_job(job)
              ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/lithops/invokers.py", line 268, in run_job
    futures = self._run_job(job)
              ^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/lithops/invokers.py", line 210, in _run_job
    raise e
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/lithops/invokers.py", line 207, in _run_job
    self._invoke_job(job)
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/lithops/invokers.py", line 255, in _invoke_job
    activation_id = self.compute_handler.invoke(payload)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/lithops/localhost/v2/localhost.py", line 140, in invoke
    self.env.run_job(job_payload)
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/site-packages/lithops/localhost/v2/localhost.py", line 225, in run_job
    self.work_queue.put(json.dumps(task_payload))
                        ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/json/encoder.py", line 200, in encode
    chunks = self.iterencode(o, _one_shot=True)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/json/encoder.py", line 258, in iterencode
    return _iterencode(o, 0)
           ^^^^^^^^^^^^^^^^^
  File "/Users/thodson/micromamba/envs/cubed-dev/lib/python3.11/json/encoder.py", line 180, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type slice is not JSON serializable

@tomwhite
Member

tomwhite commented Aug 2, 2024

Can you try with Lithops localhost version 1? That's what we use in the Cubed unit tests, as version 2 had some problems when we last tried it.

Another thing to try would be the Cubed processes executor, which uses a different serialization mechanism to Lithops. If that worked, it would show that the problem is something to do with how Lithops is doing serialization.

Is the data to run this publicly available? I could try to look at it next week if there's a minimal reproducible example.

@thodson-usgs
Contributor Author

Thanks for the suggestions, but neither worked for me.

To replicate the error, download the data

aws s3api get-object --region us-west-2 --no-sign-request --bucket wrf-se-ak-ar5 --key ccsm/rcp85/daily/2060/WRFDS_2060-01-01.nc WRFDS_2060-01-01.nc

then run this example.py:

import xarray as xr

combined_ds = xr.open_dataset("WRFDS_2060-01-01.nc", chunks={})

combined_ds['Time'].attrs = {}  # to_zarr complains about attrs

rechunked_ds = combined_ds.chunk(
    chunks={'Time': 1, 'south_north': 25, 'west_east': 32},
    chunked_array_type="cubed",
)

rechunked_ds.to_zarr("rechunked.zarr",
                     mode="w",
                     consolidated=True,
                     safe_chunks=False,
                     )

@tomwhite
Member

tomwhite commented Aug 5, 2024

Thanks for the instructions @thodson-usgs! I have managed to reproduce this - getting the Object of type slice is not JSON serializable error with the local lithops executor.

With the processes executor I got a different error: TypeError: run_func() got an unexpected keyword argument 'lock'. This led me to cubed-xarray's store function, which is being passed various arguments from Xarray that it doesn't know how to handle (namely flush, lock, compute, and regions):
https://github.com/pydata/xarray/blob/1ac19c4231e3e649392add503ae5a13d8e968aef/xarray/backends/common.py#L267-L274

When I remove these kwargs in a local copy of cubed-xarray, the rechunk works using the local processes executor.

I will look into writing a proper fix.
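
Roughly, the kind of local change I mean looks like this (illustrative only; the class name and exact signature are stand-ins, not the actual cubed-xarray code):

from cubed.core.ops import store as cubed_store

class PatchedCubedManager:
    # Illustrative stand-in for cubed_xarray's CubedManager.

    def store(self, sources, targets, **kwargs):
        # Xarray's Zarr writer passes Dask-oriented options that Cubed's
        # store() does not understand; drop them before dispatching.
        for dask_only in ("flush", "lock", "compute", "regions"):
            kwargs.pop(dask_only, None)
        return cubed_store(sources, targets, **kwargs)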

@TomNicholas
Member

the rechunk works using the local processes executor.

Yay progress! Thank you @tomwhite !

This led me to cubed-xarray's store function, which is being passed various arguments from Xarray that it doesn't know how to handle (namely flush, lock, compute, and regions):

Ah my bad for apparently not properly testing that.

I will look into writing a proper fix.

Is that cubed's fault for not understanding dask-relevant kwargs, xarray's for passing them, or cubed-xarray somehow? We should raise an issue specifically to track this bug.

@tomwhite
Member

tomwhite commented Aug 5, 2024

Is that cubed's fault for not understanding dask-relevant kwargs, xarray's for passing them, or cubed-xarray somehow? We should raise an issue specifically to track this bug.

Not sure. I have opened cubed-dev/cubed-xarray#14 to show a possible fix.

@thodson-usgs
Contributor Author

thodson-usgs commented Aug 30, 2024

PEBCAK! This may have worked all along. The problem was that I was passing local file paths off to Lambda. I just moved the kerchunk JSON and the target Zarr store to S3, and everything ran, albeit slowly. I should increase the chunk sizes, but then this should be ready.
The question now is whether to contribute this to cubed or cubed-xarray...

I'll add a bit more explanation, but a minimal example is given below. We might reference the VirtualiZarr example for instructions on how to create combined.json rather than doing it here.

import xarray as xr
import fsspec

combined_ds = xr.open_dataset("s3://wma-uncertainty/scratch/combined.json",
                              engine="kerchunk",
                              chunks={},
                              chunked_array_type="cubed",
                              )

combined_ds['Time'].attrs = {}  # to_zarr complains about attrs

rechunked_ds = combined_ds.chunk(
    chunks={'Time': 5, 'south_north': 25, 'west_east': 32},
    chunked_array_type="cubed",
)

target = fsspec.get_mapper("s3://wma-uncertainty/scratch/rechunked.zarr",
                           client_kwargs={'region_name': 'us-west-2'})


rechunked_ds.to_zarr(target,
                     mode="w",
                     consolidated=True,
                     safe_chunks=False,
                     )
