From 980addfdcb25b2c32535a571e89d83e92c3756ea Mon Sep 17 00:00:00 2001 From: Eugene M Date: Thu, 16 May 2024 12:20:11 -0400 Subject: [PATCH 01/25] ENH: write the configuration table --- src/bluesky/callbacks/tiled_writer.py | 32 ++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/src/bluesky/callbacks/tiled_writer.py b/src/bluesky/callbacks/tiled_writer.py index 77b30b23e8..a42e2151f7 100644 --- a/src/bluesky/callbacks/tiled_writer.py +++ b/src/bluesky/callbacks/tiled_writer.py @@ -301,6 +301,7 @@ def descriptor(self, doc: EventDescriptor): desc_node = self.root_node.create_container(key=desc_name, metadata=metadata) desc_node.create_container(key="external") desc_node.create_container(key="internal") + desc_node.create_container(key="configuration") else: # Get existing descriptor node (with fixed and variable metadata saved before) desc_node = self.root_node[desc_name] @@ -308,14 +309,39 @@ def descriptor(self, doc: EventDescriptor): # Update (add new values to) variable fields of the metadata metadata = deep_update(dict(desc_node.metadata), var_fields) desc_node.update_metadata(metadata) + + # Write the configuration data; loop over all detectors + conf_node = desc_node["configuration"] + for det_name, det_dict in conf_dict: + df_dict = {"descriptor_uid": uid} + df_dict.update(det_dict.get("data", {})) + df_dict.update({f"ts_{c}": v for c, v in det_dict.get("timestamps", {}).items()}) + df = pd.DataFrame(df_dict, index=[0]) + if det_name in conf_node.keys(): + conf_node[det_name].append_partition(df, 0) + else: + conf_node.new( + structure_family=StructureFamily.table, + data_sources=[ + DataSource( + structure_family=StructureFamily.table, + structure=TableStructure.from_pandas(df), + mimetype="text/csv", + ), + ], + key=det_name, + ) + conf_node[det_name].write_partition(df, 0) + self._desc_nodes[uid] = desc_node def event(self, doc): descriptor_node = self._desc_nodes[doc["descriptor"]] parent_node = descriptor_node["internal"] - df_dict = {c: [v] for c, v in doc["data"].items()} - df_dict.update({f"ts_{c}": [v] for c, v in doc["timestamps"].items()}) - df = pd.DataFrame(df_dict) + df_dict = {"seq_num": doc["seq_num"]} + df_dict.update(doc["data"]) + df_dict.update({f"ts_{c}": v for c, v in doc["timestamps"].items()}) + df = pd.DataFrame(df_dict, index=[0]) if "events" in parent_node.keys(): parent_node["events"].append_partition(df, 0) else: From 64648dd5cca9d8b5954008572c50990fd570b062 Mon Sep 17 00:00:00 2001 From: Eugene M Date: Thu, 16 May 2024 12:30:12 -0400 Subject: [PATCH 02/25] ENH: write the configuration table --- src/bluesky/callbacks/tiled_writer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/bluesky/callbacks/tiled_writer.py b/src/bluesky/callbacks/tiled_writer.py index a42e2151f7..11c1217e24 100644 --- a/src/bluesky/callbacks/tiled_writer.py +++ b/src/bluesky/callbacks/tiled_writer.py @@ -310,9 +310,9 @@ def descriptor(self, doc: EventDescriptor): metadata = deep_update(dict(desc_node.metadata), var_fields) desc_node.update_metadata(metadata) - # Write the configuration data; loop over all detectors + # Write the configuration data: loop over all detectors conf_node = desc_node["configuration"] - for det_name, det_dict in conf_dict: + for det_name, det_dict in conf_dict.items(): df_dict = {"descriptor_uid": uid} df_dict.update(det_dict.get("data", {})) df_dict.update({f"ts_{c}": v for c, v in det_dict.get("timestamps", {}).items()}) @@ -408,7 +408,7 @@ def stream_datum(self, doc: StreamDatum): # NOTE: Assigning data_source.id in the object and passing it in http params is superflous, but it is currently required by Tiled. # noqa sres_node.refresh() data_source = handler.get_data_source() - data_source.id = sres_node.data_sources()[0]["id"] # ID of the exisiting DataSource record + data_source.id = sres_node.data_sources()[0].id # ID of the exisiting DataSource record endpoint = sres_node.uri.replace("/metadata/", "/data_source/", 1) handle_error( sres_node.context.http_client.put( From b5dfd1c9e8033cb72aa5287251e43fadc9ef9fd1 Mon Sep 17 00:00:00 2001 From: Eugene M Date: Fri, 17 May 2024 15:11:13 -0400 Subject: [PATCH 03/25] ENH: process old-style bluesky documents --- src/bluesky/callbacks/tiled_writer.py | 98 +++++++++++++++++++-------- 1 file changed, 70 insertions(+), 28 deletions(-) diff --git a/src/bluesky/callbacks/tiled_writer.py b/src/bluesky/callbacks/tiled_writer.py index 3e9dc2c71d..694cd7c2de 100644 --- a/src/bluesky/callbacks/tiled_writer.py +++ b/src/bluesky/callbacks/tiled_writer.py @@ -1,10 +1,10 @@ import copy -from typing import Dict, List, Optional, Set, Tuple, Union +from typing import Any, Dict, List, Optional, Set, Tuple, Union import numpy as np import pandas as pd from event_model import DocumentRouter, RunRouter -from event_model.documents import EventDescriptor, StreamDatum, StreamResource +from event_model.documents import Datum, EventDescriptor, Resource, StreamDatum, StreamResource from pydantic.utils import deep_update from tiled.client import from_profile, from_uri from tiled.client.base import BaseClient @@ -21,6 +21,7 @@ "ADHDF5_SWMR_STREAM": "application/x-hdf5", "AD_HDF5_SWMR_SLICE": "application/x-hdf5", "AD_TIFF": "multipart/related;type=image/tiff", + "AD_HDF5_GERM": "application/x-hdf5", } DTYPE_LOOKUP = {"number": " StreamResource: + def _ensure_resource_backcompat(self, doc: StreamResource) -> StreamResource: """Kept for back-compatibility with old StreamResource schema from event_model<1.20.0 Will make changes to and return a shallow copy of StreamRsource dictionary adhering to the new structure. """ - sres = copy.copy(sres) - if ("mimetype" not in sres.keys()) and ("spec" not in sres.keys()): + doc = copy.copy(doc) + if ("mimetype" not in doc.keys()) and ("spec" not in doc.keys()): raise RuntimeError("StreamResource document is missing a mimetype or spec") else: - sres["mimetype"] = sres.get("mimetype") or MIMETYPE_LOOKUP[sres.get("spec")] - if "parameters" not in sres.keys(): - sres["parameters"] = sres.pop("resource_kwargs", {}) - if "uri" not in sres.keys(): - file_path = sres.pop("root").strip("/") + "/" + sres.pop("resource_path").strip("/") - sres["uri"] = "file://localhost/" + file_path + doc["mimetype"] = doc.get("mimetype") or MIMETYPE_LOOKUP[doc.get("spec")] + if "parameters" not in doc.keys(): + doc["parameters"] = doc.pop("resource_kwargs", {}) + if "uri" not in doc.keys(): + file_path = doc.pop("root").strip("/") + "/" + doc.pop("resource_path").strip("/") + doc["uri"] = "file://localhost/" + file_path - return sres + return doc def start(self, doc): self.root_node = self.client.create_container( @@ -310,13 +317,18 @@ def descriptor(self, doc: EventDescriptor): metadata = deep_update(dict(desc_node.metadata), var_fields) desc_node.update_metadata(metadata) + # Keep specifications for external and internal data_keys for faster access + self.data_keys_int.update({k: v for k, v in metadata["data_keys"].items() if "external" not in v.keys()}) + self.data_keys_ext.update({k: v for k, v in metadata["data_keys"].items() if "external" in v.keys()}) + # Write the configuration data: loop over all detectors conf_node = desc_node["configuration"] - for det_name, det_dict in conf_dict.items(): + for det_name, det_dict in conf_dict[uid].items(): + print(det_name, det_dict) df_dict = {"descriptor_uid": uid} df_dict.update(det_dict.get("data", {})) df_dict.update({f"ts_{c}": v for c, v in det_dict.get("timestamps", {}).items()}) - df = pd.DataFrame(df_dict, index=[0]) + df = pd.Series(df_dict).to_frame().T if det_name in conf_node.keys(): conf_node[det_name].append_partition(df, 0) else: @@ -330,18 +342,23 @@ def descriptor(self, doc: EventDescriptor): ), ], key=det_name, + metadata=det_dict["data_keys"], ) conf_node[det_name].write_partition(df, 0) self._desc_nodes[uid] = desc_node def event(self, doc): - descriptor_node = self._desc_nodes[doc["descriptor"]] - parent_node = descriptor_node["internal"] + desc_node = self._desc_nodes[doc["descriptor"]] + + # Process _internal_ data -- those without external flag or those that have been filled + data_keys_spec = {k: v for k, v in self.data_keys_int.items() if doc["filled"].get(k, True)} + data_keys_spec.update({k: v for k, v in self.data_keys_ext.items() if doc["filled"].get(k, False)}) + parent_node = desc_node["internal"] df_dict = {"seq_num": doc["seq_num"]} - df_dict.update(doc["data"]) - df_dict.update({f"ts_{c}": v for c, v in doc["timestamps"].items()}) - df = pd.DataFrame(df_dict, index=[0]) + df_dict.update({k: v for k, v in doc["data"].items() if k in data_keys_spec.keys()}) + df_dict.update({f"ts_{k}": v for k, v in doc["timestamps"].items()}) # Keep all timestamps + df = pd.Series(df_dict).to_frame().T # data_keys become column names in the df if "events" in parent_node.keys(): parent_node["events"].append_partition(df, 0) else: @@ -355,16 +372,41 @@ def event(self, doc): ), ], key="events", + metadata=data_keys_spec, ) parent_node["events"].write_partition(df, 0) - def stream_resource(self, doc: StreamResource): - """Process a StreamResource document + # Process _external_ data: Loop over all referenced Datums + for data_key in self.data_keys_ext.keys(): + if doc["filled"].get(data_key, False): + continue - Only _cache_ the StreamResource for now; add the node when at least one StreamDatum is added - """ + if datum_id := doc["data"].get(data_key): + if datum_id in self._docs_cache.keys(): + # Convert the Datum document to the StreamDatum format + datum_doc = self._docs_cache.pop(datum_id) + datum_doc["uid"] = datum_doc.pop("datum_id") + datum_doc["stream_resource"] = datum_doc.pop("resource") + datum_doc["descriptor"] = doc["descriptor"] # From Event document + datum_doc["indices"] = {"start": doc["seq_num"] - 1, "stop": doc["seq_num"]} + datum_doc["seq_nums"] = {"start": doc["seq_num"], "stop": doc["seq_num"] + 1} - self._sres_cache[doc["uid"]] = self._ensure_sres_backcompat(doc) + # Update the Resource document (add data_key as in StreamResource) + if datum_doc["stream_resource"] in self._docs_cache.keys(): + self._docs_cache[datum_doc["stream_resource"]]["data_key"] = data_key + + self.stream_datum(datum_doc) + else: + raise RuntimeError(f"Datum {datum_id} is referenced before being declared.") + + def datum(self, doc): + self._docs_cache[doc["datum_id"]] = copy.copy(doc) + + def resource(self, doc): + self._docs_cache[doc["uid"]] = self._ensure_resource_backcompat(doc) + + def stream_resource(self, doc: StreamResource): + self._docs_cache[doc["uid"]] = self._ensure_resource_backcompat(doc) def get_sres_node(self, sres_uid: str, desc_uid: Optional[str] = None) -> Tuple[BaseClient, ConsolidatorBase]: """Get Stream Resource node from Tiled, if it already exists, or register it from a cached SR document""" @@ -373,11 +415,11 @@ def get_sres_node(self, sres_uid: str, desc_uid: Optional[str] = None) -> Tuple[ sres_node = self._sres_nodes[sres_uid] handler = self._handlers[sres_uid] - elif sres_uid in self._sres_cache.keys(): + elif sres_uid in self._docs_cache.keys(): if not desc_uid: raise RuntimeError("Descriptor uid must be specified to initialise a Stream Resource node") - sres_doc = self._sres_cache.pop(sres_uid) + sres_doc = self._docs_cache.pop(sres_uid) desc_node = self._desc_nodes[desc_uid] # Initialise a bluesky handler (consolidator) for the StreamResource From a1042665da659fb30c307e509d1b3001c7057ceb Mon Sep 17 00:00:00 2001 From: Eugene M Date: Fri, 17 May 2024 15:40:44 -0400 Subject: [PATCH 04/25] ENH: moved Consolidators to a separate module --- src/bluesky/callbacks/core.py | 8 + src/bluesky/callbacks/tiled_writer.py | 213 +------------------ src/bluesky/consolidators.py | 284 ++++++++++++++++++++++++++ 3 files changed, 300 insertions(+), 205 deletions(-) create mode 100644 src/bluesky/consolidators.py diff --git a/src/bluesky/callbacks/core.py b/src/bluesky/callbacks/core.py index 782af44d7e..7cd7d6ca32 100644 --- a/src/bluesky/callbacks/core.py +++ b/src/bluesky/callbacks/core.py @@ -16,6 +16,14 @@ from ..utils import ensure_uid +MIMETYPE_LOOKUP = { + "hdf5": "application/x-hdf5", + "ADHDF5_SWMR_STREAM": "application/x-hdf5", + "AD_HDF5_SWMR_SLICE": "application/x-hdf5", + "AD_TIFF": "multipart/related;type=image/tiff", + "AD_HDF5_GERM": "application/x-hdf5", +} + logger = logging.getLogger(__name__) diff --git a/src/bluesky/callbacks/tiled_writer.py b/src/bluesky/callbacks/tiled_writer.py index 694cd7c2de..137edbe76b 100644 --- a/src/bluesky/callbacks/tiled_writer.py +++ b/src/bluesky/callbacks/tiled_writer.py @@ -1,218 +1,22 @@ import copy -from typing import Any, Dict, List, Optional, Set, Tuple, Union +from typing import Any, Dict, Optional, Tuple, Union -import numpy as np import pandas as pd -from event_model import DocumentRouter, RunRouter +from event_model import RunRouter from event_model.documents import Datum, EventDescriptor, Resource, StreamDatum, StreamResource from pydantic.utils import deep_update from tiled.client import from_profile, from_uri from tiled.client.base import BaseClient from tiled.client.container import Container from tiled.client.utils import handle_error -from tiled.structures.array import ArrayStructure, BuiltinDtype -from tiled.structures.core import Spec, StructureFamily -from tiled.structures.data_source import Asset, DataSource, Management +from tiled.structures.core import Spec from tiled.structures.table import TableStructure from tiled.utils import safe_json_dump -MIMETYPE_LOOKUP = { - "hdf5": "application/x-hdf5", - "ADHDF5_SWMR_STREAM": "application/x-hdf5", - "AD_HDF5_SWMR_SLICE": "application/x-hdf5", - "AD_TIFF": "multipart/related;type=image/tiff", - "AD_HDF5_GERM": "application/x-hdf5", -} -DTYPE_LOOKUP = {"number": " Tuple[int]: - """Native shape of the data stored in assets - - This includes the leading (0-th) dimension corresponding to the number of rows, including skipped rows, if - any. The number of relevant usable data rows may be lower, which is determined by the `seq_nums` field of - StreamDatum documents.""" - return self._num_rows, *self.datum_shape - - @property - def chunks(self) -> Tuple[Tuple[int, ...], ...]: - """Chunking specification based on the Stream Resource parameter `chunk_size`: - None or 0 -- single chunk for all existing and new elements - int -- fixed-sized chunks with at most `chunk_size` elements, last chunk can be smaller - """ - if not self.chunk_size: - dim0_chunk = [self._num_rows] - else: - dim0_chunk = [self.chunk_size] * int(self._num_rows / self.chunk_size) - if self._num_rows % self.chunk_size: - dim0_chunk.append(self._num_rows % self.chunk_size) - - return tuple(dim0_chunk), *[(d,) for d in self.datum_shape] - - @property - def has_skips(self) -> bool: - """Indicates whether any rows should be skipped when mapping their indices to frame numbers - - This flag is intended to provide a shortcut for more efficient data access when there are no skips, and the - mapping between indices and seq_nums is straightforward. In other case, the _seqnums_to_indices_map needs - to be taken into account. - """ - return self._num_rows > len(self._seqnums_to_indices_map) - - @property - def adapter_parameters(self) -> Dict: - """A dictionary of parameters passed to an Adapter - - These parameters are intended to provide any additional information required to read a data source of a - specific mimetype, e.g. "path" the path into an HDF5 file or "template" the filename pattern of a TIFF - sequence. - """ - return {} - - def consume_stream_datum(self, doc: StreamDatum): - """Process a new StreamDatum and update the internal data structure - - This will be called for every new StreamDatum received to account for the new added rows. - This method _may_need_ to be subclassed and expanded depending on a specific mimetype. - Actions: - - Parse the fields in a new StreamDatum - - Increment the number of rows (implemented by the Base class) - - Keep track of the correspondence between indices and seq_nums (implemented by the Base class) - - Update the list of assets, including their uris, if necessary - - Update shape and chunks - """ - self._num_rows += doc["indices"]["stop"] - doc["indices"]["start"] - new_seqnums = range(doc["seq_nums"]["start"], doc["seq_nums"]["stop"]) - new_indices = range(doc["indices"]["start"], doc["indices"]["stop"]) - self._seqnums_to_indices_map.update(dict(zip(new_seqnums, new_indices))) - - def get_data_source(self) -> DataSource: - """Return a DataSource object reflecting the current state of the streamed dataset. - - The returned DataSource is conceptually similar (and can be an instance of) tiled.structures.DataSource. In - general, it describes associated Assets (filepaths, mimetype) along with their internal data structure - (array shape, chunks, additional parameters) and should contain all information necessary to read the file. - """ - return DataSource( - mimetype=self.mimetype, - assets=self.assets, - structure_family=StructureFamily.array, - structure=ArrayStructure( - data_type=BuiltinDtype.from_numpy_dtype(self.dtype), - shape=self.shape, - chunks=self.chunks, - ), - parameters=self.adapter_parameters, - management=Management.external, - ) - - -class HDF5Consolidator(ConsolidatorBase): - supported_mimetypes = {"application/x-hdf5"} - - def __init__(self, stream_resource: StreamResource, descriptor: EventDescriptor): - super().__init__(stream_resource, descriptor) - self.assets.append(Asset(data_uri=self.uri, is_directory=False, parameter="data_uri")) - self.swmr = self._sres_parameters.get("swmr", True) +from ..consolidators import ConsolidatorBase, DataSource, StructureFamily, consolidator_factory +from .core import MIMETYPE_LOOKUP, CallbackBase - @property - def adapter_parameters(self) -> Dict: - return {"path": self._sres_parameters["path"].strip("/").split("/")} - - -class TIFFConsolidator(ConsolidatorBase): - supported_mimetypes = {"multipart/related;type=image/tiff"} - - def __init__(self, stream_resource: StreamResource, descriptor: EventDescriptor): - super().__init__(stream_resource, descriptor) - self.data_uris: List[str] = [] - - def get_datum_uri(self, indx: int): - """Return a full uri for a datum (an individual TIFF file) based on its index in the sequence. - - This relies on the `template` parameter passed in the StreamResource, which is a string either in the "new" - Python formatting style that can be evaluated to a file name using the `.format(indx)` method given an - integer index, e.g. "{:05}.tif". - """ - return self.uri.strip("/") + "/" + self._sres_parameters["template"].format(indx) - - def consume_stream_datum(self, doc: StreamDatum): - indx = int(doc["uid"].split("/")[1]) - new_datum_uri = self.get_datum_uri(indx) - new_asset = Asset( - data_uri=new_datum_uri, - is_directory=False, - parameter="data_uris", - num=len(self.assets) + 1, - ) - self.assets.append(new_asset) - self.data_uris.append(new_datum_uri) - - super().consume_stream_datum(doc) - - -CONSOLIDATOR_REGISTRY = { - "application/x-hdf5": HDF5Consolidator, - "multipart/related;type=image/tiff": TIFFConsolidator, -} +DTYPE_LOOKUP = {"number": " Tuple[ desc_node = self._desc_nodes[desc_uid] # Initialise a bluesky handler (consolidator) for the StreamResource - handler_class = CONSOLIDATOR_REGISTRY[sres_doc["mimetype"]] - handler = handler_class(sres_doc, dict(desc_node.metadata)) + handler = consolidator_factory(sres_doc, dict(desc_node.metadata)) sres_node = desc_node["external"].new( key=handler.data_key, diff --git a/src/bluesky/consolidators.py b/src/bluesky/consolidators.py new file mode 100644 index 0000000000..009b00429c --- /dev/null +++ b/src/bluesky/consolidators.py @@ -0,0 +1,284 @@ +import collections +import dataclasses +import enum +from typing import Any, Dict, List, Optional, Set, Tuple + +import numpy as np +from event_model.documents import EventDescriptor, StreamDatum, StreamResource +from tiled.mimetypes import DEFAULT_ADAPTERS_BY_MIMETYPE +from tiled.structures.array import ArrayStructure, BuiltinDtype + +DTYPE_LOOKUP = {"number": " Tuple[int]: + """Native shape of the data stored in assets + + This includes the leading (0-th) dimension corresponding to the number of rows, including skipped rows, if + any. The number of relevant usable data rows may be lower, which is determined by the `seq_nums` field of + StreamDatum documents.""" + return self._num_rows, *self.datum_shape + + @property + def chunks(self) -> Tuple[Tuple[int, ...], ...]: + """Chunking specification based on the Stream Resource parameter `chunk_size`: + None or 0 -- single chunk for all existing and new elements + int -- fixed-sized chunks with at most `chunk_size` elements, last chunk can be smaller + """ + if not self.chunk_size: + dim0_chunk = [self._num_rows] + else: + dim0_chunk = [self.chunk_size] * int(self._num_rows / self.chunk_size) + if self._num_rows % self.chunk_size: + dim0_chunk.append(self._num_rows % self.chunk_size) + + return tuple(dim0_chunk), *[(d,) for d in self.datum_shape] + + @property + def has_skips(self) -> bool: + """Indicates whether any rows should be skipped when mapping their indices to frame numbers + + This flag is intended to provide a shortcut for more efficient data access when there are no skips, and the + mapping between indices and seq_nums is straightforward. In other case, the _seqnums_to_indices_map needs + to be taken into account. + """ + return self._num_rows > len(self._seqnums_to_indices_map) + + @property + def adapter_parameters(self) -> Dict: + """A dictionary of parameters passed to an Adapter + + These parameters are intended to provide any additional information required to read a data source of a + specific mimetype, e.g. "path" the path into an HDF5 file or "template" the filename pattern of a TIFF + sequence. + """ + return {} + + def consume_stream_datum(self, doc: StreamDatum): + """Process a new StreamDatum and update the internal data structure + + This will be called for every new StreamDatum received to account for the new added rows. + This method _may_need_ to be subclassed and expanded depending on a specific mimetype. + Actions: + - Parse the fields in a new StreamDatum + - Increment the number of rows (implemented by the Base class) + - Keep track of the correspondence between indices and seq_nums (implemented by the Base class) + - Update the list of assets, including their uris, if necessary + - Update shape and chunks + """ + self._num_rows += doc["indices"]["stop"] - doc["indices"]["start"] + new_seqnums = range(doc["seq_nums"]["start"], doc["seq_nums"]["stop"]) + new_indices = range(doc["indices"]["start"], doc["indices"]["stop"]) + self._seqnums_to_indices_map.update(dict(zip(new_seqnums, new_indices))) + + def get_data_source(self) -> DataSource: + """Return a DataSource object reflecting the current state of the streamed dataset. + + The returned DataSource is conceptually similar (and can be an instance of) tiled.structures.DataSource. In + general, it describes associated Assets (filepaths, mimetype) along with their internal data structure + (array shape, chunks, additional parameters) and should contain all information necessary to read the file. + """ + return DataSource( + mimetype=self.mimetype, + assets=self.assets, + structure_family=StructureFamily.array, + structure=ArrayStructure( + data_type=BuiltinDtype.from_numpy_dtype(self.dtype), + shape=self.shape, + chunks=self.chunks, + ), + parameters=self.adapter_parameters, + management=Management.external, + ) + + def get_adapter(self, adapters_by_mimetype=None): + """Return an Adapter suitable for reading the data + + Uses a dictionary mapping of a mimetype to a callable that returns an Adapter instance. + This might be a class, classmethod constructor, factory function... + it does not matter here; it is just a callable. + """ + + # NOTE: This could be a method on DataSource instead, which seems more appropriate. + + # User-provided adapters take precedence over defaults. + all_adapters_by_mimetype = collections.ChainMap((adapters_by_mimetype or {}), DEFAULT_ADAPTERS_BY_MIMETYPE) + adapter_factory = all_adapters_by_mimetype[self.mimetype] + + # Construct kwargs to pass to Adapter. + parameters = collections.defaultdict(list) + for asset in self.assets: + if asset.parameter is None: + # This asset is not directly opened by the Adapter. It is used indirectly, such as the case of HDF5 + # virtual dataset 'data' files are referenced from 'master' files. + continue + if asset.num is None: + # This parameters takes the URI as a scalar value. + parameters[asset.parameter] = asset.data_uri + else: + # This parameters takes a list of URIs. + parameters[asset.parameter].append(asset.data_uri) + + parameters["structure"] = ArrayStructure( + data_type=BuiltinDtype.from_numpy_dtype(self.dtype), + shape=self.shape, + chunks=self.chunks, + ) + adapter_kwargs = dict(parameters) + adapter_kwargs.update(self.adapter_parameters) + + return adapter_factory(**adapter_kwargs) + + +class HDF5Consolidator(ConsolidatorBase): + supported_mimetypes = {"application/x-hdf5"} + + def __init__(self, stream_resource: StreamResource, descriptor: EventDescriptor): + super().__init__(stream_resource, descriptor) + self.assets.append(Asset(data_uri=self.uri, is_directory=False, parameter="data_uri")) + self.swmr = self._sres_parameters.get("swmr", True) + + @property + def adapter_parameters(self) -> Dict: + return {"path": self._sres_parameters["path"].strip("/").split("/"), "swmr": self.swmr} + + # def get_adapter(self): + # with h5py.File(path_from_uri(self.uri), "r", swmr=True) as f: + # arr = np.array(f["/".join(self.adapter_parameters["path"])]) + # return ArrayAdapter.from_array(arr, shape=self.shape, chunks=self.chunks) + + +class TIFFConsolidator(ConsolidatorBase): + supported_mimetypes = {"multipart/related;type=image/tiff"} + + def __init__(self, stream_resource: StreamResource, descriptor: EventDescriptor): + super().__init__(stream_resource, descriptor) + self.data_uris: List[str] = [] + + def get_datum_uri(self, indx: int): + """Return a full uri for a datum (an individual TIFF file) based on its index in the sequence. + + This relies on the `template` parameter passed in the StreamResource, which is a string either in the "new" + Python formatting style that can be evaluated to a file name using the `.format(indx)` method given an + integer index, e.g. "{:05}.tif". + """ + return self.uri.strip("/") + "/" + self._sres_parameters["template"].format(indx) + + def consume_stream_datum(self, doc: StreamDatum): + indx = int(doc["uid"].split("/")[1]) + new_datum_uri = self.get_datum_uri(indx) + new_asset = Asset( + data_uri=new_datum_uri, + is_directory=False, + parameter="data_uris", + num=len(self.assets) + 1, + ) + self.assets.append(new_asset) + self.data_uris.append(new_datum_uri) + + super().consume_stream_datum(doc) + + +CONSOLIDATOR_REGISTRY = { + "application/x-hdf5": HDF5Consolidator, + "multipart/related;type=image/tiff": TIFFConsolidator, +} + + +def consolidator_factory(stream_resource_doc, descriptor_doc): + consolidator_class = CONSOLIDATOR_REGISTRY[stream_resource_doc["mimetype"]] + return consolidator_class(stream_resource_doc, descriptor_doc) From c711a5d0c6849767eab0ef2518db67e033e802e8 Mon Sep 17 00:00:00 2001 From: Eugene M Date: Fri, 17 May 2024 17:40:13 -0400 Subject: [PATCH 05/25] clean-up --- src/bluesky/consolidators.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/bluesky/consolidators.py b/src/bluesky/consolidators.py index 009b00429c..aecad72cfd 100644 --- a/src/bluesky/consolidators.py +++ b/src/bluesky/consolidators.py @@ -193,8 +193,6 @@ def get_adapter(self, adapters_by_mimetype=None): it does not matter here; it is just a callable. """ - # NOTE: This could be a method on DataSource instead, which seems more appropriate. - # User-provided adapters take precedence over defaults. all_adapters_by_mimetype = collections.ChainMap((adapters_by_mimetype or {}), DEFAULT_ADAPTERS_BY_MIMETYPE) adapter_factory = all_adapters_by_mimetype[self.mimetype] @@ -236,11 +234,6 @@ def __init__(self, stream_resource: StreamResource, descriptor: EventDescriptor) def adapter_parameters(self) -> Dict: return {"path": self._sres_parameters["path"].strip("/").split("/"), "swmr": self.swmr} - # def get_adapter(self): - # with h5py.File(path_from_uri(self.uri), "r", swmr=True) as f: - # arr = np.array(f["/".join(self.adapter_parameters["path"])]) - # return ArrayAdapter.from_array(arr, shape=self.shape, chunks=self.chunks) - class TIFFConsolidator(ConsolidatorBase): supported_mimetypes = {"multipart/related;type=image/tiff"} From 5efbdabee09657e6e611ef08ce016e354706aa90 Mon Sep 17 00:00:00 2001 From: Eugene M Date: Mon, 20 May 2024 11:38:06 -0400 Subject: [PATCH 06/25] ensure backcompatibility woth older version of Tiled --- src/bluesky/callbacks/tiled_writer.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/bluesky/callbacks/tiled_writer.py b/src/bluesky/callbacks/tiled_writer.py index 137edbe76b..dc0f41b021 100644 --- a/src/bluesky/callbacks/tiled_writer.py +++ b/src/bluesky/callbacks/tiled_writer.py @@ -253,7 +253,10 @@ def stream_datum(self, doc: StreamDatum): # NOTE: Assigning data_source.id in the object and passing it in http params is superflous, but it is currently required by Tiled. # noqa sres_node.refresh() data_source = handler.get_data_source() - data_source.id = sres_node.data_sources()[0].id # ID of the exisiting DataSource record + try: + data_source.id = sres_node.data_sources()[0].id # ID of the exisiting DataSource record + except AttributeError: + data_source.id = sres_node.data_sources()[0]["id"] # TODO: Old format, to be removed. endpoint = sres_node.uri.replace("/metadata/", "/data_source/", 1) handle_error( sres_node.context.http_client.put( From 93a5ae8ed8b9f10190364d8facb27a00000ed41f Mon Sep 17 00:00:00 2001 From: Eugene M Date: Mon, 20 May 2024 16:44:07 -0400 Subject: [PATCH 07/25] Rename configuration node to config --- src/bluesky/callbacks/tiled_writer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/bluesky/callbacks/tiled_writer.py b/src/bluesky/callbacks/tiled_writer.py index dc0f41b021..6de8061733 100644 --- a/src/bluesky/callbacks/tiled_writer.py +++ b/src/bluesky/callbacks/tiled_writer.py @@ -112,7 +112,7 @@ def descriptor(self, doc: EventDescriptor): desc_node = self.root_node.create_container(key=desc_name, metadata=metadata) desc_node.create_container(key="external") desc_node.create_container(key="internal") - desc_node.create_container(key="configuration") + desc_node.create_container(key="config") else: # Get existing descriptor node (with fixed and variable metadata saved before) desc_node = self.root_node[desc_name] @@ -126,7 +126,7 @@ def descriptor(self, doc: EventDescriptor): self.data_keys_ext.update({k: v for k, v in metadata["data_keys"].items() if "external" in v.keys()}) # Write the configuration data: loop over all detectors - conf_node = desc_node["configuration"] + conf_node = desc_node["config"] for det_name, det_dict in conf_dict[uid].items(): print(det_name, det_dict) df_dict = {"descriptor_uid": uid} From b5ffb75abc2abdf67b63207985bdfced247c1547 Mon Sep 17 00:00:00 2001 From: Eugene M Date: Tue, 11 Jun 2024 10:21:56 -0400 Subject: [PATCH 08/25] Remove old style of getting DataSource id --- src/bluesky/callbacks/tiled_writer.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/bluesky/callbacks/tiled_writer.py b/src/bluesky/callbacks/tiled_writer.py index 87984330e0..cdbaf694a9 100644 --- a/src/bluesky/callbacks/tiled_writer.py +++ b/src/bluesky/callbacks/tiled_writer.py @@ -254,10 +254,7 @@ def stream_datum(self, doc: StreamDatum): # params is superfluous, but it is currently required by Tiled. sres_node.refresh() data_source = handler.get_data_source() - try: - data_source.id = sres_node.data_sources()[0].id # ID of the exisiting DataSource record - except AttributeError: - data_source.id = sres_node.data_sources()[0]["id"] # TODO: Old format, to be removed. + data_source.id = sres_node.data_sources()[0].id # ID of the exisiting DataSource record endpoint = sres_node.uri.replace("/metadata/", "/data_source/", 1) handle_error( sres_node.context.http_client.put( From 6da456d0222982c6cb814890154e1a42f9f06c0d Mon Sep 17 00:00:00 2001 From: Eugene M Date: Tue, 11 Jun 2024 16:51:31 -0400 Subject: [PATCH 09/25] TST: unit tests for non-stream Resourse/Datum/Event --- src/bluesky/callbacks/tiled_writer.py | 5 +-- src/bluesky/consolidators.py | 2 +- src/bluesky/tests/test_tiled_writer.py | 52 +++++++++++++++++++++++++- 3 files changed, 54 insertions(+), 5 deletions(-) diff --git a/src/bluesky/callbacks/tiled_writer.py b/src/bluesky/callbacks/tiled_writer.py index cdbaf694a9..cce71ced7f 100644 --- a/src/bluesky/callbacks/tiled_writer.py +++ b/src/bluesky/callbacks/tiled_writer.py @@ -128,11 +128,10 @@ def descriptor(self, doc: EventDescriptor): # Write the configuration data: loop over all detectors conf_node = desc_node["config"] for det_name, det_dict in conf_dict[uid].items(): - print(det_name, det_dict) df_dict = {"descriptor_uid": uid} df_dict.update(det_dict.get("data", {})) df_dict.update({f"ts_{c}": v for c, v in det_dict.get("timestamps", {}).items()}) - df = pd.Series(df_dict).to_frame().T + df = pd.DataFrame(df_dict, index=[0], columns=df_dict.keys()) if det_name in conf_node.keys(): conf_node[det_name].append_partition(df, 0) else: @@ -162,7 +161,7 @@ def event(self, doc): df_dict = {"seq_num": doc["seq_num"]} df_dict.update({k: v for k, v in doc["data"].items() if k in data_keys_spec.keys()}) df_dict.update({f"ts_{k}": v for k, v in doc["timestamps"].items()}) # Keep all timestamps - df = pd.Series(df_dict).to_frame().T # data_keys become column names in the df + df = pd.DataFrame(df_dict, index=[0], columns=df_dict.keys()) if "events" in parent_node.keys(): parent_node["events"].append_partition(df, 0) else: diff --git a/src/bluesky/consolidators.py b/src/bluesky/consolidators.py index aecad72cfd..d08cc0d496 100644 --- a/src/bluesky/consolidators.py +++ b/src/bluesky/consolidators.py @@ -249,7 +249,7 @@ def get_datum_uri(self, indx: int): Python formatting style that can be evaluated to a file name using the `.format(indx)` method given an integer index, e.g. "{:05}.tif". """ - return self.uri.strip("/") + "/" + self._sres_parameters["template"].format(indx) + return self.uri + self._sres_parameters["template"].format(indx) def consume_stream_datum(self, doc: StreamDatum): indx = int(doc["uid"].split("/")[1]) diff --git a/src/bluesky/tests/test_tiled_writer.py b/src/bluesky/tests/test_tiled_writer.py index 7c70a054b9..ce5db60df6 100644 --- a/src/bluesky/tests/test_tiled_writer.py +++ b/src/bluesky/tests/test_tiled_writer.py @@ -2,6 +2,7 @@ import h5py import numpy as np +import ophyd.sim import pytest import tifffile as tf from event_model.documents.event_descriptor import DataKey @@ -121,7 +122,7 @@ def _get_tiff_stream(self, data_key: str, index: int) -> Tuple[StreamResource, S parameters={"chunk_size": 1, "template": "{:05d}.tif"}, data_key=data_key, root=self.root, - uri="file://localhost" + self.root, + uri="file://localhost" + self.root + "/", spec="AD_TIFF", mimetype="multipart/related;type=image/tiff", uid=uid, @@ -181,6 +182,29 @@ def read(self) -> Dict[str, Reading]: return {} +class SynSignalWithRegistry(ophyd.sim.SynSignalWithRegistry): + """A readable image detector that writes a sequence of files and generates relevant Bluesky documents. + + Subclassed from ophyd.sim to match the updated schema of Resource documents. + """ + + def __init__(self, *args, dtype_str="uint8", **kwargs): + self.dtype_str = dtype_str + super().__init__(*args, **kwargs) + + def stage(self): + super().stage() + parameters = {"chunk_size": 1, "template": "_{:d}." + self.save_ext} + self._asset_docs_cache[-1][1]["resource_kwargs"].update(parameters) + + def describe(self): + res = super().describe() + for key in res: + res[key]["external"] = "FILESTORE" + res[key]["dtype_str"] = self.dtype_str + return res + + def test_stream_datum_readable_counts(RE, client, tmp_path): tw = TiledWriter(client) det = StreamDatumReadableCollectable(name="det", root=str(tmp_path)) @@ -204,6 +228,32 @@ def test_stream_datum_collectable(RE, client, tmp_path): assert arrs[2].read() is not None +def test_stuff(RE, client, tmp_path): + det = SynSignalWithRegistry( + func=lambda: np.random.randint(0, 255, (10, 15), dtype="uint8"), + dtype_str="uint8", + name="img", + labels={"detectors"}, + save_func=tf.imwrite, + save_path=str(tmp_path), + save_spec="AD_TIFF", + save_ext="tif", + ) + tw = TiledWriter(client) + RE(bp.count([det], 3), tw) + extr = client.values().last()["primary"]["external"]["img"] + intr = client.values().last()["primary"]["internal"]["events"] + conf = client.values().last()["primary"]["config"]["img"] + + assert extr.shape == (3, 10, 15) + assert extr.read() is not None + assert set(intr.columns) == set("seq_num", "ts_img") + assert len(intr.read()) == 3 + assert (intr["seq_num"].read() == [1, 2, 3]).all() + assert set(conf.columns) == set("descriptor_uid", "img", "ts_img") + assert len(conf.read()) == 1 + + def collect_plan(*objs, name="primary"): yield from bps.open_run() yield from bps.declare_stream(*objs, collect=True, name=name) From a0ae8835e23ccdea4051c71ac9138e14c2a009d6 Mon Sep 17 00:00:00 2001 From: Eugene Date: Wed, 12 Jun 2024 13:08:40 -0400 Subject: [PATCH 10/25] Update src/bluesky/callbacks/tiled_writer.py Co-authored-by: Max Rakitin --- src/bluesky/callbacks/tiled_writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bluesky/callbacks/tiled_writer.py b/src/bluesky/callbacks/tiled_writer.py index cce71ced7f..2a7ea09f9e 100644 --- a/src/bluesky/callbacks/tiled_writer.py +++ b/src/bluesky/callbacks/tiled_writer.py @@ -44,7 +44,7 @@ def __call__(self, name, doc): class _RunWriter(CallbackBase): - """Write documents from one Bluesky Run into Tile + """Write documents from one Bluesky Run into Tiled. Datum, Resource, and StreamResource documents are cached until Event or StreamDatum documents are received, after which corresponding nodes are created in Tiled. From a2019019f669b19df2164160c7b521f317244212 Mon Sep 17 00:00:00 2001 From: Eugene Date: Wed, 12 Jun 2024 13:10:47 -0400 Subject: [PATCH 11/25] Update src/bluesky/consolidators.py Co-authored-by: Max Rakitin --- src/bluesky/consolidators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bluesky/consolidators.py b/src/bluesky/consolidators.py index d08cc0d496..f10fb5fd6e 100644 --- a/src/bluesky/consolidators.py +++ b/src/bluesky/consolidators.py @@ -152,7 +152,7 @@ def consume_stream_datum(self, doc: StreamDatum): """Process a new StreamDatum and update the internal data structure This will be called for every new StreamDatum received to account for the new added rows. - This method _may_need_ to be subclassed and expanded depending on a specific mimetype. + This method _may need_ to be subclassed and expanded depending on a specific mimetype. Actions: - Parse the fields in a new StreamDatum - Increment the number of rows (implemented by the Base class) From 432abebf79711bf652d45bc85831f5f57835c634 Mon Sep 17 00:00:00 2001 From: Eugene Date: Wed, 12 Jun 2024 13:11:03 -0400 Subject: [PATCH 12/25] Update src/bluesky/callbacks/tiled_writer.py Co-authored-by: Max Rakitin --- src/bluesky/callbacks/tiled_writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bluesky/callbacks/tiled_writer.py b/src/bluesky/callbacks/tiled_writer.py index 2a7ea09f9e..ec5d9bd0cf 100644 --- a/src/bluesky/callbacks/tiled_writer.py +++ b/src/bluesky/callbacks/tiled_writer.py @@ -253,7 +253,7 @@ def stream_datum(self, doc: StreamDatum): # params is superfluous, but it is currently required by Tiled. sres_node.refresh() data_source = handler.get_data_source() - data_source.id = sres_node.data_sources()[0].id # ID of the exisiting DataSource record + data_source.id = sres_node.data_sources()[0].id # ID of the existing DataSource record endpoint = sres_node.uri.replace("/metadata/", "/data_source/", 1) handle_error( sres_node.context.http_client.put( From 4bf492edc45b23440fe5df55f8c3b32f6b92f62e Mon Sep 17 00:00:00 2001 From: Eugene Date: Wed, 12 Jun 2024 13:57:19 -0400 Subject: [PATCH 13/25] Update src/bluesky/consolidators.py Co-authored-by: Max Rakitin --- src/bluesky/consolidators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bluesky/consolidators.py b/src/bluesky/consolidators.py index f10fb5fd6e..633d5adacc 100644 --- a/src/bluesky/consolidators.py +++ b/src/bluesky/consolidators.py @@ -247,7 +247,7 @@ def get_datum_uri(self, indx: int): This relies on the `template` parameter passed in the StreamResource, which is a string either in the "new" Python formatting style that can be evaluated to a file name using the `.format(indx)` method given an - integer index, e.g. "{:05}.tif". + integer index, e.g. "{:05d}.tif". """ return self.uri + self._sres_parameters["template"].format(indx) From b1870a4df4021c2015d676e4bcb58f4c0ea780d8 Mon Sep 17 00:00:00 2001 From: Eugene M Date: Wed, 12 Jun 2024 17:25:03 -0400 Subject: [PATCH 14/25] cleanup --- src/bluesky/callbacks/core.py | 2 +- src/bluesky/callbacks/tiled_writer.py | 23 +++++++++++++------ src/bluesky/consolidators.py | 7 ++++++ .../tests/test_external_assets_and_paging.py | 2 +- src/bluesky/tests/test_tiled_writer.py | 9 ++++---- 5 files changed, 30 insertions(+), 13 deletions(-) diff --git a/src/bluesky/callbacks/core.py b/src/bluesky/callbacks/core.py index 7cd7d6ca32..b0ae352c50 100644 --- a/src/bluesky/callbacks/core.py +++ b/src/bluesky/callbacks/core.py @@ -18,7 +18,7 @@ MIMETYPE_LOOKUP = { "hdf5": "application/x-hdf5", - "ADHDF5_SWMR_STREAM": "application/x-hdf5", + "AD_HDF5_SWMR_STREAM": "application/x-hdf5", "AD_HDF5_SWMR_SLICE": "application/x-hdf5", "AD_TIFF": "multipart/related;type=image/tiff", "AD_HDF5_GERM": "application/x-hdf5", diff --git a/src/bluesky/callbacks/tiled_writer.py b/src/bluesky/callbacks/tiled_writer.py index cce71ced7f..0b51a7d1f7 100644 --- a/src/bluesky/callbacks/tiled_writer.py +++ b/src/bluesky/callbacks/tiled_writer.py @@ -3,7 +3,16 @@ import pandas as pd from event_model import RunRouter -from event_model.documents import Datum, EventDescriptor, Resource, StreamDatum, StreamResource +from event_model.documents import ( + Datum, + Event, + EventDescriptor, + Resource, + RunStart, + RunStop, + StreamDatum, + StreamResource, +) from pydantic.utils import deep_update from tiled.client import from_profile, from_uri from tiled.client.base import BaseClient @@ -79,14 +88,14 @@ def _ensure_resource_backcompat(self, doc: StreamResource) -> StreamResource: return doc - def start(self, doc): + def start(self, doc: RunStart): self.root_node = self.client.create_container( key=doc["uid"], metadata={"start": doc}, specs=[Spec("BlueskyRun", version="1.0")], ) - def stop(self, doc): + def stop(self, doc: RunStop): if self.root_node is None: raise RuntimeError("RunWriter is properly initialized: no Start document has been recorded.") @@ -151,10 +160,10 @@ def descriptor(self, doc: EventDescriptor): self._desc_nodes[uid] = desc_node - def event(self, doc): + def event(self, doc: Event): desc_node = self._desc_nodes[doc["descriptor"]] - # Process _internal_ data -- those without external flag or those that have been filled + # Process _internal_ data -- those keys without 'external' flag or those that have been filled data_keys_spec = {k: v for k, v in self.data_keys_int.items() if doc["filled"].get(k, True)} data_keys_spec.update({k: v for k, v in self.data_keys_ext.items() if doc["filled"].get(k, False)}) parent_node = desc_node["internal"] @@ -202,10 +211,10 @@ def event(self, doc): else: raise RuntimeError(f"Datum {datum_id} is referenced before being declared.") - def datum(self, doc): + def datum(self, doc: Datum): self._docs_cache[doc["datum_id"]] = copy.copy(doc) - def resource(self, doc): + def resource(self, doc: Resource): self._docs_cache[doc["uid"]] = self._ensure_resource_backcompat(doc) def stream_resource(self, doc: StreamResource): diff --git a/src/bluesky/consolidators.py b/src/bluesky/consolidators.py index d08cc0d496..befd552230 100644 --- a/src/bluesky/consolidators.py +++ b/src/bluesky/consolidators.py @@ -145,6 +145,8 @@ def adapter_parameters(self) -> Dict: These parameters are intended to provide any additional information required to read a data source of a specific mimetype, e.g. "path" the path into an HDF5 file or "template" the filename pattern of a TIFF sequence. + + This property is to be subclassed as necessary. """ return {} @@ -232,6 +234,11 @@ def __init__(self, stream_resource: StreamResource, descriptor: EventDescriptor) @property def adapter_parameters(self) -> Dict: + """Parameters to be passed to the HDF5 adapter, a dictionary with the keys: + + path: List[str] - file path represented as list split at `/` + swmr: bool -- True to enable the single writer / multiple readers regime + """ return {"path": self._sres_parameters["path"].strip("/").split("/"), "swmr": self.swmr} diff --git a/src/bluesky/tests/test_external_assets_and_paging.py b/src/bluesky/tests/test_external_assets_and_paging.py index 02efc2446f..b0720a8f57 100644 --- a/src/bluesky/tests/test_external_assets_and_paging.py +++ b/src/bluesky/tests/test_external_assets_and_paging.py @@ -137,7 +137,7 @@ def collect_asset_docs_stream_datum(self: Named, index: Optional[int] = None) -> data_key=data_key, root="/root", resource_path="/path.h5", - spec="ADHDF5_SWMR_STREAM", + spec="AD_HDF5_SWMR_STREAM", uid=uid, ) yield "stream_resource", stream_resource diff --git a/src/bluesky/tests/test_tiled_writer.py b/src/bluesky/tests/test_tiled_writer.py index ce5db60df6..301728493c 100644 --- a/src/bluesky/tests/test_tiled_writer.py +++ b/src/bluesky/tests/test_tiled_writer.py @@ -79,7 +79,7 @@ def _get_hdf5_stream(self, data_key: str, index: int) -> Tuple[StreamResource, S root=self.root, resource_path="/dataset.h5", uri="file://localhost" + file_path, - spec="ADHDF5_SWMR_STREAM", + spec="AD_HDF5_SWMR_STREAM", mimetype="application/x-hdf5", uid=uid, ) @@ -223,12 +223,13 @@ def test_stream_datum_collectable(RE, client, tmp_path): tw = TiledWriter(client) RE(collect_plan(det, name="primary"), tw) arrs = client.values().last()["primary"]["external"].values() + assert arrs[0].read() is not None assert arrs[1].read() is not None assert arrs[2].read() is not None -def test_stuff(RE, client, tmp_path): +def test_handling_non_stream_resource(RE, client, tmp_path): det = SynSignalWithRegistry( func=lambda: np.random.randint(0, 255, (10, 15), dtype="uint8"), dtype_str="uint8", @@ -247,10 +248,10 @@ def test_stuff(RE, client, tmp_path): assert extr.shape == (3, 10, 15) assert extr.read() is not None - assert set(intr.columns) == set("seq_num", "ts_img") + assert set(intr.columns) == {"seq_num", "ts_img"} assert len(intr.read()) == 3 assert (intr["seq_num"].read() == [1, 2, 3]).all() - assert set(conf.columns) == set("descriptor_uid", "img", "ts_img") + assert set(conf.columns) == {"descriptor_uid", "img", "ts_img"} assert len(conf.read()) == 1 From 49e3e648d9a0795382a0d09e2eec4c728899c5e6 Mon Sep 17 00:00:00 2001 From: Eugene Date: Wed, 12 Jun 2024 18:07:15 -0400 Subject: [PATCH 15/25] Update src/bluesky/consolidators.py Co-authored-by: Max Rakitin --- src/bluesky/consolidators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bluesky/consolidators.py b/src/bluesky/consolidators.py index d2cfcfa870..4686983f74 100644 --- a/src/bluesky/consolidators.py +++ b/src/bluesky/consolidators.py @@ -70,7 +70,7 @@ class ConsolidatorBase: - Interpreting those DataSource and Asset parameters to do I/O (the adapter's job). To implement new Consolidators for other mimetypes, subclass ConsolidatorBase, possibly expand the - `consume_stream_datum` and `get_data_source` methods, and ensure that the returned the `adapter_parameters` + `consume_stream_datum` and `get_data_source` methods, and ensure that the returned `adapter_parameters` property matches the expected adapter signature. Declare a set of supported mimetypes to allow valiadtion and automated discovery of the subclassed Consolidator. """ From 6490febc540f507dcafc462ed260a75dec73a7b6 Mon Sep 17 00:00:00 2001 From: Eugene Date: Wed, 12 Jun 2024 18:08:56 -0400 Subject: [PATCH 16/25] Update src/bluesky/consolidators.py Co-authored-by: Max Rakitin --- src/bluesky/consolidators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bluesky/consolidators.py b/src/bluesky/consolidators.py index 4686983f74..bfbe5c1997 100644 --- a/src/bluesky/consolidators.py +++ b/src/bluesky/consolidators.py @@ -52,7 +52,7 @@ class DataSource: class ConsolidatorBase: - """Consolidator of StremDatums + """Consolidator of StreamDatums A Consolidator consumes documents from RE; it is similar to usual Bluesky Handlers but is designed to work with streaming data (received via StreamResource and StreamDatum documents). It composes details (DataSource From ef200bb7778fb2814a381a76c5d6f0ae763f540e Mon Sep 17 00:00:00 2001 From: Eugene Date: Wed, 12 Jun 2024 18:09:09 -0400 Subject: [PATCH 17/25] Update src/bluesky/consolidators.py Co-authored-by: Max Rakitin --- src/bluesky/consolidators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bluesky/consolidators.py b/src/bluesky/consolidators.py index bfbe5c1997..7f611f01e4 100644 --- a/src/bluesky/consolidators.py +++ b/src/bluesky/consolidators.py @@ -63,7 +63,7 @@ class ConsolidatorBase: We put this code into consolidators so that additional, possibly very unusual, formats can be supported by users without getting a PR merged into Bluesky or Tiled. - The CONSOLIDATOR_REGISTRY (see example below) and the Tiled catalog paramter adapters_by_mimetype can be used + The CONSOLIDATOR_REGISTRY (see example below) and the Tiled catalog parameter adapters_by_mimetype can be used together to support: - Ingesting a new mimetype from Bluesky documents and generating DataSource and Asset with appropriate parameters (the consolidator's job); From b481ef87df9608465a2c1ab77a3368a9f9933aa2 Mon Sep 17 00:00:00 2001 From: Eugene Date: Wed, 12 Jun 2024 18:09:33 -0400 Subject: [PATCH 18/25] Update src/bluesky/consolidators.py Co-authored-by: Max Rakitin --- src/bluesky/consolidators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bluesky/consolidators.py b/src/bluesky/consolidators.py index 7f611f01e4..d7542d73cf 100644 --- a/src/bluesky/consolidators.py +++ b/src/bluesky/consolidators.py @@ -71,7 +71,7 @@ class ConsolidatorBase: To implement new Consolidators for other mimetypes, subclass ConsolidatorBase, possibly expand the `consume_stream_datum` and `get_data_source` methods, and ensure that the returned `adapter_parameters` - property matches the expected adapter signature. Declare a set of supported mimetypes to allow valiadtion and + property matches the expected adapter signature. Declare a set of supported mimetypes to allow validation and automated discovery of the subclassed Consolidator. """ From ce0cae082e26fbec0e3ca1e43f02a866a6059e14 Mon Sep 17 00:00:00 2001 From: Eugene M Date: Wed, 12 Jun 2024 18:12:22 -0400 Subject: [PATCH 19/25] remove duplicate definition --- src/bluesky/callbacks/tiled_writer.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/bluesky/callbacks/tiled_writer.py b/src/bluesky/callbacks/tiled_writer.py index 9a3b33579e..3d8da69f02 100644 --- a/src/bluesky/callbacks/tiled_writer.py +++ b/src/bluesky/callbacks/tiled_writer.py @@ -25,8 +25,6 @@ from ..consolidators import ConsolidatorBase, DataSource, StructureFamily, consolidator_factory from .core import MIMETYPE_LOOKUP, CallbackBase -DTYPE_LOOKUP = {"number": " Date: Thu, 13 Jun 2024 10:14:53 -0400 Subject: [PATCH 20/25] TST: consolidators shape and chunks --- src/bluesky/consolidators.py | 2 +- src/bluesky/tests/test_consolidators.py | 82 +++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 1 deletion(-) create mode 100644 src/bluesky/tests/test_consolidators.py diff --git a/src/bluesky/consolidators.py b/src/bluesky/consolidators.py index d7542d73cf..05b6941c23 100644 --- a/src/bluesky/consolidators.py +++ b/src/bluesky/consolidators.py @@ -126,7 +126,7 @@ def chunks(self) -> Tuple[Tuple[int, ...], ...]: if self._num_rows % self.chunk_size: dim0_chunk.append(self._num_rows % self.chunk_size) - return tuple(dim0_chunk), *[(d,) for d in self.datum_shape] + return tuple(dim0_chunk or [0]), *[(d,) for d in self.datum_shape] @property def has_skips(self) -> bool: diff --git a/src/bluesky/tests/test_consolidators.py b/src/bluesky/tests/test_consolidators.py new file mode 100644 index 0000000000..851eae6675 --- /dev/null +++ b/src/bluesky/tests/test_consolidators.py @@ -0,0 +1,82 @@ +import pytest + +from bluesky.consolidators import HDF5Consolidator + + +@pytest.fixture +def descriptor(): + doc = { + "data_keys": { + "test_img": { + "shape": [10, 15], + "dtype": "array", + "dtype_str": " Date: Thu, 13 Jun 2024 10:15:38 -0400 Subject: [PATCH 21/25] TST: consolidators shape and chunks --- src/bluesky/tests/test_consolidators.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/bluesky/tests/test_consolidators.py b/src/bluesky/tests/test_consolidators.py index 851eae6675..2ac4e1d292 100644 --- a/src/bluesky/tests/test_consolidators.py +++ b/src/bluesky/tests/test_consolidators.py @@ -5,7 +5,7 @@ @pytest.fixture def descriptor(): - doc = { + return { "data_keys": { "test_img": { "shape": [10, 15], @@ -17,7 +17,6 @@ def descriptor(): }, "uid": "descriptor-uid", } - return doc @pytest.fixture From 9a818ff780eb5e35c1227e43cff0b158bbfde89f Mon Sep 17 00:00:00 2001 From: Eugene M Date: Fri, 14 Jun 2024 13:44:55 -0400 Subject: [PATCH 22/25] Use older pyepics version to pass tests on py3.8 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 15de235c4b..b2af42210c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,7 +60,7 @@ dev = [ "pipdeptree", "pre-commit", "pydata-sphinx-theme>=0.12", - "pyepics", + "pyepics==3.5.2", "pyqt5", "pytest", "pytest-cov", From e2b8f24118e6204c370288da0aca9815a619d470 Mon Sep 17 00:00:00 2001 From: Eugene M Date: Fri, 14 Jun 2024 15:23:13 -0400 Subject: [PATCH 23/25] Use older pyepics version to pass tests on py3.8 --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index b2af42210c..7b307ad435 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,6 +60,7 @@ dev = [ "pipdeptree", "pre-commit", "pydata-sphinx-theme>=0.12", + # Fixing the version of pyepics to pass CI/CD tests on py3.8; to be removed once we drop py3.8 support. "pyepics==3.5.2", "pyqt5", "pytest", From 16eca986a10e1b475c906248d1176ed8e21e3ae0 Mon Sep 17 00:00:00 2001 From: Eugene M Date: Fri, 14 Jun 2024 15:37:44 -0400 Subject: [PATCH 24/25] Use older pyepics version to pass tests on py3.8 --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 7b307ad435..075af9917b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,8 +60,8 @@ dev = [ "pipdeptree", "pre-commit", "pydata-sphinx-theme>=0.12", - # Fixing the version of pyepics to pass CI/CD tests on py3.8; to be removed once we drop py3.8 support. - "pyepics==3.5.2", + "pyepics==3.5.2;python_version<'3.9'", # Needed to pass CI/CD tests; To be removed once we drop support for py3.8 + "pyepics;python_version>='3.9'", "pyqt5", "pytest", "pytest-cov", From 0d35152e1eeb3d6efee36d03318853838291e735 Mon Sep 17 00:00:00 2001 From: Eugene M Date: Fri, 14 Jun 2024 15:44:05 -0400 Subject: [PATCH 25/25] Use older pyepics version to pass tests on py3.8 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 075af9917b..d56bbf1691 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,7 +60,7 @@ dev = [ "pipdeptree", "pre-commit", "pydata-sphinx-theme>=0.12", - "pyepics==3.5.2;python_version<'3.9'", # Needed to pass CI/CD tests; To be removed once we drop support for py3.8 + "pyepics<=3.5.2;python_version<'3.9'", # Needed to pass CI/CD tests; To be removed once we drop support for py3.8 "pyepics;python_version>='3.9'", "pyqt5", "pytest",