Improve parallelization of IO for readers #99

Open
mgrover1 opened this issue Mar 17, 2023 · 4 comments

@mgrover1
Collaborator

  • xradar version: 0.1.0

Description

We should take a look at how we can speed up the xarray backends, and whether there are more levels of parallelization possible.

I wonder if upstream enhancements to xarray (pydata/xarray#7437) might help with this, enabling us to plug in the IO directly and benefit from more parallelization here.

What I Did

I read the data with the following code:

import xarray as xr
import xradar as xd
import numpy as np

def fix_angle(ds):
    """
    Aligns the radar volumes
    """
    ds["time"] = ds.time.load()  # Convert time from dask to numpy

    start_ang = 0  # Set consistent start/end values
    stop_ang = 360

    # Angle resolution (hard-coded here rather than estimated from the data)
    angle_res = 0.5

    # Determine whether the radar is spinning clockwise or counterclockwise
    median_diff = ds.azimuth.diff("time").median()
    ascending = median_diff > 0
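    # note: the truth test on the next line may trigger a small eager dask compute if azimuth is chunked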
    direction = 1 if ascending else -1

    # first find exact duplicates and remove
    ds = xd.util.remove_duplicate_rays(ds)

    # second reindex according to retrieved parameters
    ds = xd.util.reindex_angle(
        ds, start_ang, stop_ang, angle_res, direction, method="nearest"
    )

    ds = ds.expand_dims("volume_time")  # Expand for volumes for concatenation

    ds["volume_time"] = [np.nanmin(ds.time.values)]

    return ds

ds = xr.open_mfdataset(
    files,
    preprocess=fix_angle,
    engine="cfradial1",
    group="sweep_0",
    concat_dim="volume_time",
    combine="nested",
    chunks={'range':250},
)

This resulted in the task graph shown below, where green is the open_dataset function.

[Screenshot: dask task graph, 2023-03-17; green = open_dataset]

The graph has quite a bit of whitespace and could use some optimization.
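For context, a rough sketch of one way to capture such a task-stream view, assuming a local dask.distributed cluster (files and fix_angle are the objects defined above; the report filename is arbitrary):

import xarray as xr
from dask.distributed import Client, performance_report

client = Client()  # local cluster; live dashboard at client.dashboard_link

with performance_report(filename="open_mfdataset-report.html"):
    ds = xr.open_mfdataset(
        files,
        preprocess=fix_angle,
        engine="cfradial1",
        group="sweep_0",
        concat_dim="volume_time",
        combine="nested",
        chunks={"range": 250},
    )
    ds.compute()  # force the IO so the tasks actually show up in the report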

@kmuehlbauer
Collaborator

I've tried to reproduce this with some of our ODIM_H5 data, with a similar outcome.

[Screenshot: dask task graph for the ODIM_H5 reproduction]

@kmuehlbauer
Collaborator

@mgrover1 I've tried to track that down now using some GAMIC source data from our BoXPol radar.

In the normal case I get the whitespace shown above in the task graph.

If I remove the additional lines in the gamic backend's open_dataset function after the call to store_entrypoint.open_dataset:

ds = store_entrypoint.open_dataset(

then the call to open_mfdataset returns without triggering any dask operations.

Only when I .load(), .compute(), or otherwise trigger a computation (e.g. plotting) are the files accessed and the data loaded and processed.

That leads to the task graphs shown below:

One timestep, single moment of 15 (time: 12, azimuth: 360, range: 750):
[Screenshot: task graph dask-gamic-01]

All timesteps, single moment of 15 (time: 12, azimuth: 360, range: 750):
[Screenshot: task graph dask-gamic-02]

Computing the whole thing:
[Screenshot: task graph dask-gamic-03]

So as a consequence we might need to make sure that no immediate dask computations are triggered before actually doing something with the data. Would it make sense to create a test repo for that?
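For illustration, a rough sketch (not the actual xradar code) of what keeping a backend's open_dataset lazy looks like: hand back the dataset from xarray's StoreBackendEntrypoint and avoid anything that forces values at open time. The class name and the _open_store helper are hypothetical.

from xarray.backends import BackendEntrypoint, StoreBackendEntrypoint


class LazyRadarBackendEntrypoint(BackendEntrypoint):  # hypothetical example
    def open_dataset(self, filename_or_obj, *, drop_variables=None, **kwargs):
        store = self._open_store(filename_or_obj, **kwargs)  # backend-specific, not shown
        store_entrypoint = StoreBackendEntrypoint()
        ds = store_entrypoint.open_dataset(store, drop_variables=drop_variables)
        # No .load()/.compute()/.sortby()/reindexing here -- anything that
        # pulls values would run eagerly inside open_mfdataset.
        return ds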

@mgrover1
Collaborator Author

Yeah, let's create a test repo to try this out - this is promising! We can take a look at more testing and establish some benchmarks to dig in here.

@mgrover1
Collaborator Author

Maybe xradar-benchmark?
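A first benchmark there could be as simple as timing the lazy open. A hypothetical asv-style sketch (the data path, engine choice, and class/method names are placeholders, not an existing suite):

import glob

import xarray as xr


class OpenMFDatasetSuite:
    def setup(self):
        self.files = sorted(glob.glob("data/cfradial1/*.nc"))  # placeholder path

    def time_open_lazy(self):
        # Opening should only build the dask graph, not load any data.
        xr.open_mfdataset(
            self.files,
            engine="cfradial1",
            group="sweep_0",
            concat_dim="volume_time",
            combine="nested",
            chunks={},
        )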
