Skip to content

Commit

Permalink
Merge branch 'main' into dtype-meta-vlen
Browse files Browse the repository at this point in the history
  • Loading branch information
magland authored Apr 19, 2024
2 parents df759d0 + 3c4562f commit d269450
Show file tree
Hide file tree
Showing 13 changed files with 571 additions and 28 deletions.
17 changes: 9 additions & 8 deletions lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import base64
from typing import Union, List, IO, Any, Dict, Literal
from typing import Union, List, IO, Any, Dict
from dataclasses import dataclass
import numpy as np
import zarr
Expand All @@ -12,7 +12,8 @@
_get_chunk_byte_range,
_get_byte_range_for_contiguous_dataset,
_join,
_get_chunk_names_for_dataset
_get_chunk_names_for_dataset,
_write_rfs_to_file,
)
from ..conversion.attr_conversion import h5_to_zarr_attr
from ..conversion.reformat_json import reformat_json
Expand Down Expand Up @@ -460,17 +461,17 @@ def listdir(self, path: str = "") -> List[str]:
else:
return []

def to_file(self, file_name: str, *, file_type: Literal["zarr.json"] = "zarr.json"):
def write_reference_file_system(self, output_file_name: str):
"""Write a reference file system corresponding to this store to a file.
This can then be loaded using LindiH5pyFile.from_reference_file_system(file_name)
"""
if file_type != "zarr.json":
raise Exception(f"Unsupported file type: {file_type}")

ret = self.to_reference_file_system()
with open(file_name, "w") as f:
json.dump(ret, f, indent=2)
if not output_file_name.endswith(".lindi.json"):
raise Exception("The output file name must end with .lindi.json")

rfs = self.to_reference_file_system()
_write_rfs_to_file(rfs=rfs, output_file_name=output_file_name)

def to_reference_file_system(self) -> dict:
"""Create a reference file system corresponding to this store.
Expand Down
8 changes: 8 additions & 0 deletions lindi/LindiH5ZarrStore/_util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import IO, List
import json
import numpy as np
import h5py

Expand Down Expand Up @@ -83,3 +84,10 @@ def _get_chunk_names_for_dataset(chunk_coords_shape: List[int]) -> List[str]:
for name0 in names0:
names.append(f"{i}.{name0}")
return names


def _write_rfs_to_file(*, rfs: dict, output_file_name: str):
"""Write a reference file system to a file.
"""
with open(output_file_name, "w") as f:
json.dump(rfs, f, indent=2, sort_keys=True)
31 changes: 26 additions & 5 deletions lindi/LindiH5pyFile/LindiH5pyFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from .LindiH5pyReference import LindiH5pyReference
from .LindiReferenceFileSystemStore import LindiReferenceFileSystemStore

from ..LindiStagingStore.StagingArea import StagingArea
from ..LindiStagingStore.LindiStagingStore import LindiStagingStore


class LindiH5pyFile(h5py.File):
def __init__(self, _file_object: Union[h5py.File, zarr.Group], *, _zarr_store: Union[ZarrStore, None] = None, _mode: Literal["r", "r+"] = "r"):
Expand All @@ -29,7 +32,7 @@ def __init__(self, _file_object: Union[h5py.File, zarr.Group], *, _zarr_store: U
self._id = f'{id(self._file_object)}/'

@staticmethod
def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] = "r"):
def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] = "r", staging_area: Union[StagingArea, None] = None):
"""
Create a LindiH5pyFile from a reference file system.
Expand All @@ -47,6 +50,10 @@ def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] =
to_reference_file_system() to export the updated reference file
system to the same file or a new file.
"""
if staging_area is not None:
if mode not in ['r+']:
raise Exception("Staging area cannot be used in read-only mode")

if isinstance(rfs, str):
if rfs.startswith("http") or rfs.startswith("https"):
with tempfile.TemporaryDirectory() as tmpdir:
Expand All @@ -55,15 +62,17 @@ def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] =
with open(filename, "r") as f:
data = json.load(f)
assert isinstance(data, dict) # prevent infinite recursion
return LindiH5pyFile.from_reference_file_system(data, mode=mode)
return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area)
else:
with open(rfs, "r") as f:
data = json.load(f)
assert isinstance(data, dict) # prevent infinite recursion
return LindiH5pyFile.from_reference_file_system(data, mode=mode)
return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area)
elif isinstance(rfs, dict):
# This store does not need to be closed
store = LindiReferenceFileSystemStore(rfs)
if staging_area:
store = LindiStagingStore(base_store=store, staging_area=staging_area)
return LindiH5pyFile.from_zarr_store(store, mode=mode)
else:
raise Exception(f"Unhandled type for rfs: {type(rfs)}")
Expand Down Expand Up @@ -131,9 +140,12 @@ def to_reference_file_system(self):
"""
if self._zarr_store is None:
raise Exception("Cannot convert to reference file system without zarr store")
if not isinstance(self._zarr_store, LindiReferenceFileSystemStore):
zarr_store = self._zarr_store
if isinstance(zarr_store, LindiStagingStore):
zarr_store = zarr_store._base_store
if not isinstance(zarr_store, LindiReferenceFileSystemStore):
raise Exception(f"Unexpected type for zarr store: {type(self._zarr_store)}")
rfs = self._zarr_store.rfs
rfs = zarr_store.rfs
rfs_copy = json.loads(json.dumps(rfs))
LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts(rfs_copy)
return rfs_copy
Expand Down Expand Up @@ -341,6 +353,15 @@ def require_dataset(self, name, shape, dtype, exact=False, **kwds):
raise Exception("Cannot require dataset in read-only mode")
return self._the_group.require_dataset(name, shape, dtype, exact=exact, **kwds)

##############################
# staging store
@property
def staging_store(self):
store = self._zarr_store
if not isinstance(store, LindiStagingStore):
return None
return store


def _download_file(url: str, filename: str) -> None:
headers = {
Expand Down
46 changes: 44 additions & 2 deletions lindi/LindiH5pyFile/writers/LindiH5pyGroupWriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import h5py
import numpy as np
import zarr
import numcodecs
from numcodecs.abc import Codec

from ..LindiH5pyDataset import LindiH5pyDataset
from ..LindiH5pyReference import LindiH5pyReference
Expand All @@ -11,6 +13,8 @@

from ...conversion.create_zarr_dataset_from_h5_data import create_zarr_dataset_from_h5_data

_compression_not_specified_ = object()


class LindiH5pyGroupWriter:
def __init__(self, p: 'LindiH5pyGroup'):
Expand Down Expand Up @@ -39,15 +43,52 @@ def require_group(self, name):
return ret
return self.create_group(name)

def create_dataset(self, name, shape=None, dtype=None, data=None, **kwds):
def create_dataset(
self,
name,
shape=None,
dtype=None,
data=None,
**kwds
):
chunks = None
compression = _compression_not_specified_
compression_opts = None
for k, v in kwds.items():
if k == 'chunks':
chunks = v
elif k == 'compression':
compression = v
elif k == 'compression_opts':
compression_opts = v
else:
raise Exception(f'Unsupported kwds in create_dataset: {k}')

if compression is _compression_not_specified_:
_zarr_compressor = 'default'
if compression_opts is not None:
raise Exception('compression_opts is only supported when compression is provided')
elif isinstance(compression, Codec):
_zarr_compressor = compression
if compression_opts is not None:
raise Exception('compression_opts is not supported when compression is provided as a Codec')
elif isinstance(compression, str):
if compression == 'gzip':
if compression_opts is None:
level = 4 # default for h5py
elif isinstance(compression_opts, int):
level = compression_opts
else:
raise Exception(f'Unexpected type for compression_opts: {type(compression_opts)}')
_zarr_compressor = numcodecs.GZip(level=level)
else:
raise Exception(f'Compression {compression} is not supported')
else:
raise Exception(f'Unexpected type for compression: {type(compression)}')

if isinstance(self.p._group_object, h5py.Group):
if _zarr_compressor != 'default':
raise Exception('zarr_compressor is not supported when _group_object is h5py.Group')
return LindiH5pyDataset(
self._group_object.create_dataset(name, shape=shape, dtype=dtype, data=data, chunks=chunks), # type: ignore
self.p._file
Expand Down Expand Up @@ -77,7 +118,8 @@ def create_dataset(self, name, shape=None, dtype=None, data=None, **kwds):
h5_shape=shape,
h5_dtype=dtype,
h5_data=data,
h5f=None
h5f=None,
zarr_compressor=_zarr_compressor
)
return LindiH5pyDataset(ds, self.p._file)
else:
Expand Down
Loading

0 comments on commit d269450

Please sign in to comment.