diff --git a/examples/rechunking/Dockerfile_virtualizarr b/examples/rechunking/Dockerfile_virtualizarr
new file mode 100644
index 00000000..d1793c6a
--- /dev/null
+++ b/examples/rechunking/Dockerfile_virtualizarr
@@ -0,0 +1,59 @@
+# Python 3.11
+FROM python:3.11-slim-buster
+
+
+RUN apt-get update \
+    # Install aws-lambda-cpp build dependencies
+    && apt-get install -y \
+      g++ \
+      make \
+      cmake \
+      unzip \
+    # cleanup package lists, they are not used anymore in this image
+    && rm -rf /var/lib/apt/lists/* \
+    && apt-cache search linux-headers-generic
+
+ARG FUNCTION_DIR="/function"
+
+# Copy function code
+RUN mkdir -p ${FUNCTION_DIR}
+
+# Update pip
+# NB botocore/boto3 are pinned due to https://github.com/boto/boto3/issues/3648
+# using versions from https://github.com/aio-libs/aiobotocore/blob/72b8dd5d7d4ef2f1a49a0ae0c37b47e5280e2070/setup.py
+# due to s3fs dependency
+RUN pip install --upgrade --ignore-installed pip wheel six setuptools \
+    && pip install --upgrade --no-cache-dir --ignore-installed \
+       awslambdaric \
+       botocore==1.29.76 \
+       boto3==1.26.76 \
+       redis \
+       httplib2 \
+       requests \
+       numpy \
+       scipy \
+       pandas \
+       pika \
+       kafka-python \
+       cloudpickle \
+       ps-mem \
+       tblib
+
+# Set working directory to function root directory
+WORKDIR ${FUNCTION_DIR}
+
+# Add Lithops
+COPY lithops_lambda.zip ${FUNCTION_DIR}
+RUN unzip lithops_lambda.zip \
+    && rm lithops_lambda.zip \
+    && mkdir handler \
+    && touch handler/__init__.py \
+    && mv entry_point.py handler/
+
+# Put your dependencies here, using RUN pip install... or RUN apt install...
+
+COPY requirements.txt requirements.txt
+RUN pip install --no-cache-dir -r requirements.txt
+
+ENTRYPOINT [ "/usr/local/bin/python", "-m", "awslambdaric" ]
+CMD [ "handler.entry_point.lambda_handler" ]
diff --git a/examples/rechunking/README.md b/examples/rechunking/README.md
new file mode 100644
index 00000000..44b65ff6
--- /dev/null
+++ b/examples/rechunking/README.md
@@ -0,0 +1,30 @@
+# Example rechunking workflow
+
+1. Set up a Python environment:
+```bash
+conda create --name virtualizarr-rechunk -y python=3.11
+conda activate virtualizarr-rechunk
+pip install -r requirements.txt
+```
+
+2. Set up the Cubed executor by following https://github.com/cubed-dev/cubed/blob/main/examples/lithops/aws/README.md, then point Cubed at the configuration in this directory:
+```bash
+export CUBED_CONFIG=$(pwd)
+```
+
+3. Build a runtime image for Cubed:
+```bash
+export LITHOPS_CONFIG_FILE=$(pwd)/config.aws
+lithops runtime build -b aws_lambda -f Dockerfile_virtualizarr virtualizarr-runtime
+```
+
+4. Run the script:
+```bash
+python cubed-rechunk.py
+```
+
+## Cleaning up
+To rebuild the Lithops image, delete the existing one by running:
+```bash
+lithops runtime delete -b aws_lambda -d virtualizarr-runtime
+```
diff --git a/examples/rechunking/cubed-rechunk.py b/examples/rechunking/cubed-rechunk.py
new file mode 100644
index 00000000..ab803c8d
--- /dev/null
+++ b/examples/rechunking/cubed-rechunk.py
@@ -0,0 +1,79 @@
+# Example rechunking flow using VirtualiZarr, Cubed, and cubed-xarray.
+# Inspired by Pythia's cookbook: https://projectpythia.org/kerchunk-cookbook
+# This script proves the concept but requires further optimization for
+# production. Please suggest improvements.
+
+import fsspec
+import lithops
+import xarray as xr
+
+from virtualizarr import open_virtual_dataset
+
+fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)
+files_paths = fs_read.glob("s3://wrf-se-ak-ar5/ccsm/rcp85/daily/2060/*")
+file_pattern = sorted(["s3://" + f for f in files_paths])
+
+# truncate file_pattern while debugging
+file_pattern = file_pattern[:4]
+
+print(f"{len(file_pattern)} file paths were retrieved.")
+
+
+def map_references(fil):
+    """Open a virtual dataset from a single file path.
+    """
+    vds = open_virtual_dataset(fil,
+                               indexes={},
+                               loadable_variables=['Time'],
+                               cftime_variables=['Time'],
+                               )
+    return vds
+
+
+def reduce_references(results):
+    """Combine virtual datasets into a single virtual dataset.
+
+    """
+    combined_vds = xr.combine_nested(
+        results,
+        concat_dim=['Time'],
+        coords='minimal',
+        compat='override',
+    )
+    # possibly write parquet to s3 here
+    return combined_vds
+
+
+fexec = lithops.FunctionExecutor()  # config=lambda_config
+
+futures = fexec.map_reduce(
+    map_references,
+    file_pattern,
+    reduce_references,
+    spawn_reducer=100,
+)
+
+ds = futures.get_result()
+
+ds.virtualize.to_kerchunk('combined.json', format='json')
+
+# In Jupyter, open_dataset seems to cache the JSON, such that
+# changes aren't propagated until the kernel is restarted.
+combined_ds = xr.open_dataset('combined.json',
+                              engine="kerchunk",
+                              chunks={},
+                              chunked_array_type='cubed',
+                              )
+
+combined_ds['Time'].attrs = {}  # to_zarr complains about attrs
+
+rechunked_ds = combined_ds.chunk(
+    chunks={'Time': 5, 'south_north': 25, 'west_east': 32}
+)
+
+rechunked_ds.to_zarr('rechunked.zarr',
+                     mode='w',
+                     encoding={},  # TODO
+                     consolidated=True,
+                     safe_chunks=False,
+                     )
diff --git a/examples/rechunking/cubed.yaml b/examples/rechunking/cubed.yaml
new file mode 100644
index 00000000..b4d2173c
--- /dev/null
+++ b/examples/rechunking/cubed.yaml
@@ -0,0 +1,7 @@
+spec:
+  work_dir: "s3://cubed-$USER-temp"
+  allowed_mem: "2GB"
+  executor_name: "lithops"
+  executor_options:
+    runtime: "virtualizarr-runtime"
+    runtime_memory: 2000
diff --git a/examples/rechunking/requirements.txt b/examples/rechunking/requirements.txt
new file mode 100644
index 00000000..dfd39b4b
--- /dev/null
+++ b/examples/rechunking/requirements.txt
@@ -0,0 +1,10 @@
+boto
+cftime
+cubed
+cubed-xarray
+h5py
+kerchunk
+lithops
+s3fs
+virtualizarr
+xarray
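
A note on verifying the output: after `cubed-rechunk.py` finishes, it can be worth confirming that `rechunked.zarr` actually carries the intended chunking before pointing downstream jobs at it. The snippet below is a minimal sketch of such a check; it is not part of the patch above, and it simply assumes it runs in the same working directory where the script wrote the store, reusing the chunk sizes requested in `cubed-rechunk.py`:

```python
# Sketch of a post-run sanity check (not part of this PR); assumes
# rechunked.zarr exists in the current working directory.
import xarray as xr

ds = xr.open_zarr("rechunked.zarr", consolidated=True)
print(ds)

# Dataset.chunks maps each dimension to its tuple of chunk lengths;
# interior chunks should match the sizes requested in the script.
expected = {"Time": 5, "south_north": 25, "west_east": 32}
for dim, size in expected.items():
    print(dim, "chunks:", ds.chunks.get(dim, ()), "expected:", size)
```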