diff --git a/src/enlyze/api_clients/timeseries/models.py b/src/enlyze/api_clients/timeseries/models.py index 1347bd0..59531db 100644 --- a/src/enlyze/api_clients/timeseries/models.py +++ b/src/enlyze/api_clients/timeseries/models.py @@ -75,6 +75,28 @@ def extend(self, other: "TimeseriesData") -> None: """Add records from ``other`` after the existing records.""" self.records.extend(other.records) + def merge(self, other: "TimeseriesData") -> "TimeseriesData": + """Merge records from ``other`` into the existing records.""" + if not (slen := len(self.records)) == (olen := len(other.records)): + raise ValueError( + "Cannot merge. Number of records in both instances has to be the same," + f" trying to merge an instance with {olen} records into an instance" + f" with {slen} records." + ) + + self.columns.extend(other.columns[1:]) + + for s, o in zip(self.records, other.records): + if s[0] != o[0]: + raise ValueError( + "Cannot merge. Attempted to merge records " + f"with mismatched timestamps {s[0]}, {o[0]}" + ) + + s.extend(o[1:]) + + return self + def to_user_model( self, start: datetime, diff --git a/src/enlyze/client.py b/src/enlyze/client.py index ce93707..a29f6bf 100644 --- a/src/enlyze/client.py +++ b/src/enlyze/client.py @@ -1,6 +1,7 @@ +from collections import abc from datetime import datetime -from functools import cache -from typing import Iterator, Mapping, Optional, Sequence +from functools import cache, reduce +from typing import Any, Iterator, Mapping, Optional, Sequence, Tuple, Union from uuid import UUID import enlyze.api_clients.timeseries.models as timeseries_api_models @@ -10,9 +11,11 @@ from enlyze.api_clients.timeseries.client import TimeseriesApiClient from enlyze.constants import ( ENLYZE_BASE_URL, + MAXIMUM_NUMBER_OF_VARIABLES_PER_TIMESERIES_REQUEST, VARIABLE_UUID_AND_RESAMPLING_METHOD_SEPARATOR, ) -from enlyze.errors import EnlyzeError +from enlyze.errors import EnlyzeError, ResamplingValidationError +from enlyze.iterable_tools import chunk from enlyze.validators import ( validate_datetime, validate_resampling_interval, @@ -21,6 +24,8 @@ validate_timeseries_arguments, ) +FETCHING_TIMESERIES_DATA_ERROR_MSG = "Error occurred when fetching timeseries data." + def _get_timeseries_data_from_pages( pages: Iterator[timeseries_api_models.TimeseriesData], @@ -42,6 +47,38 @@ def _get_timeseries_data_from_pages( return timeseries_data +def _get_variables_sequence_and_query_parameter_list( + variables: Union[ + Sequence[user_models.Variable], + Mapping[user_models.Variable, user_models.ResamplingMethod], + ], + resampling_interval: Optional[int], +) -> Tuple[Sequence[user_models.Variable], Sequence[str]]: + if isinstance(variables, abc.Sequence) and resampling_interval is not None: + raise ResamplingValidationError( + "`variables` must be a mapping {variable: ResamplingMethod}" + ) + + if resampling_interval: + validate_resampling_interval(resampling_interval) + variables_sequence = [] + variables_query_parameter_list = [] + for variable, resampling_method in variables.items(): # type: ignore + variables_sequence.append(variable) + variables_query_parameter_list.append( + f"{variable.uuid}" + f"{VARIABLE_UUID_AND_RESAMPLING_METHOD_SEPARATOR}" + f"{resampling_method.value}" + ) + + validate_resampling_method_for_data_type( + resampling_method, variable.data_type + ) + return variables_sequence, variables_query_parameter_list + + return variables, [str(v.uuid) for v in variables] # type: ignore + + class EnlyzeClient: """Main entrypoint for interacting with the ENLYZE platform. @@ -150,6 +187,95 @@ def get_variables( for variable in self._get_variables(machine.uuid) ] + def _get_paginated_timeseries( + self, + *, + machine_uuid: str, + start: datetime, + end: datetime, + variables: Sequence[str], + resampling_interval: Optional[int], + ) -> Iterator[timeseries_api_models.TimeseriesData]: + params: dict[str, Any] = { + "appliance": machine_uuid, + "start_datetime": start.isoformat(), + "end_datetime": end.isoformat(), + "variables": ",".join(variables), + } + + if resampling_interval: + params["resampling_interval"] = resampling_interval + + return self._timeseries_api_client.get_paginated( + "timeseries", timeseries_api_models.TimeseriesData, params=params + ) + + def _get_timeseries( + self, + start: datetime, + end: datetime, + variables: Union[ + Sequence[user_models.Variable], + Mapping[user_models.Variable, user_models.ResamplingMethod], + ], + resampling_interval: Optional[int] = None, + ) -> Optional[user_models.TimeseriesData]: + variables_sequence, variables_query_parameter_list = ( + _get_variables_sequence_and_query_parameter_list( + variables, resampling_interval + ) + ) + + start, end, machine_uuid = validate_timeseries_arguments( + start, end, variables_sequence + ) + + try: + chunks = chunk( + variables_query_parameter_list, + MAXIMUM_NUMBER_OF_VARIABLES_PER_TIMESERIES_REQUEST, + ) + except ValueError as e: + raise EnlyzeError(FETCHING_TIMESERIES_DATA_ERROR_MSG) from e + + chunks_pages = ( + self._get_paginated_timeseries( + machine_uuid=machine_uuid, + start=start, + end=end, + variables=chunk, + resampling_interval=resampling_interval, + ) + for chunk in chunks + ) + + timeseries_data_chunked = [ + _get_timeseries_data_from_pages(pages) for pages in chunks_pages + ] + + if not timeseries_data_chunked or all( + data is None for data in timeseries_data_chunked + ): + return None + + if any(data is None for data in timeseries_data_chunked) and any( + data is not None for data in timeseries_data_chunked + ): + raise EnlyzeError( + "The timeseries API didn't return data for some of the variables." + ) + + try: + timeseries_data = reduce(lambda x, y: x.merge(y), timeseries_data_chunked) # type: ignore # noqa + except ValueError as e: + raise EnlyzeError(FETCHING_TIMESERIES_DATA_ERROR_MSG) from e + + return timeseries_data.to_user_model( # type: ignore + start=start, + end=end, + variables=variables_sequence, + ) + def get_timeseries( self, start: datetime, @@ -180,28 +306,7 @@ def get_timeseries( """ - start, end, machine_uuid = validate_timeseries_arguments(start, end, variables) - - pages = self._timeseries_api_client.get_paginated( - "timeseries", - timeseries_api_models.TimeseriesData, - params={ - "appliance": machine_uuid, - "start_datetime": start.isoformat(), - "end_datetime": end.isoformat(), - "variables": ",".join(str(v.uuid) for v in variables), - }, - ) - - timeseries_data = _get_timeseries_data_from_pages(pages) - if timeseries_data is None: - return None - - return timeseries_data.to_user_model( - start=start, - end=end, - variables=variables, - ) + return self._get_timeseries(start, end, variables) def get_timeseries_with_resampling( self, @@ -241,48 +346,7 @@ def get_timeseries_with_resampling( request """ # noqa: E501 - variables_sequence = [] - variables_query_parameter_list = [] - for variable, resampling_method in variables.items(): - variables_sequence.append(variable) - variables_query_parameter_list.append( - f"{variable.uuid}" - f"{VARIABLE_UUID_AND_RESAMPLING_METHOD_SEPARATOR}" - f"{resampling_method.value}" - ) - - validate_resampling_method_for_data_type( - resampling_method, variable.data_type - ) - - start, end, machine_uuid = validate_timeseries_arguments( - start, - end, - variables_sequence, - ) - validate_resampling_interval(resampling_interval) - - pages = self._timeseries_api_client.get_paginated( - "timeseries", - timeseries_api_models.TimeseriesData, - params={ - "appliance": machine_uuid, - "start_datetime": start.isoformat(), - "end_datetime": end.isoformat(), - "variables": ",".join(variables_query_parameter_list), - "resampling_interval": resampling_interval, - }, - ) - - timeseries_data = _get_timeseries_data_from_pages(pages) - if timeseries_data is None: - return None - - return timeseries_data.to_user_model( - start=start, - end=end, - variables=variables_sequence, - ) + return self._get_timeseries(start, end, variables, resampling_interval) def _get_production_runs( self, diff --git a/src/enlyze/constants.py b/src/enlyze/constants.py index 90c8eaf..337f312 100644 --- a/src/enlyze/constants.py +++ b/src/enlyze/constants.py @@ -9,7 +9,7 @@ #: HTTP timeout for requests to the Timeseries API. #: -#: Reference: https://www.python-httpx.org/advanced/#timeout-configuration +#: Reference: https://www.python-httpx.org/advanced/timeouts/ HTTPX_TIMEOUT = 30.0 #: The separator to use when to separate the variable UUID and the resampling method @@ -18,3 +18,7 @@ #: The minimum allowed resampling interval when resampling timeseries data. MINIMUM_RESAMPLING_INTERVAL = 10 + +#: The maximum number of variables that can be used in a single request when querying +#: timeseries data. +MAXIMUM_NUMBER_OF_VARIABLES_PER_TIMESERIES_REQUEST = 100 diff --git a/src/enlyze/iterable_tools.py b/src/enlyze/iterable_tools.py new file mode 100644 index 0000000..64ca17c --- /dev/null +++ b/src/enlyze/iterable_tools.py @@ -0,0 +1,12 @@ +from typing import Iterable, Sequence, TypeVar + +MINIMUM_CHUNK_SIZE = 1 + +T = TypeVar("T") + + +def chunk(seq: Sequence[T], chunk_size: int) -> Iterable[Sequence[T]]: + if chunk_size < MINIMUM_CHUNK_SIZE: + raise ValueError(f"{chunk_size=} is less than {MINIMUM_CHUNK_SIZE=}") + + return (seq[i : i + chunk_size] for i in range(0, len(seq), chunk_size)) diff --git a/tests/enlyze/api_clients/timeseries/test_models.py b/tests/enlyze/api_clients/timeseries/test_models.py new file mode 100644 index 0000000..0fbc0a0 --- /dev/null +++ b/tests/enlyze/api_clients/timeseries/test_models.py @@ -0,0 +1,83 @@ +from datetime import datetime, timedelta, timezone + +import pytest + +from enlyze.api_clients.timeseries.models import TimeseriesData + +# We use this to skip columns that contain the timestamp assuming +# it starts at the beginning of the sequence. We also use it +# when computing lengths to account for a timestamp column. +TIMESTAMP_OFFSET = 1 + + +@pytest.fixture +def timestamp(): + return datetime.now(tz=timezone.utc) + + +@pytest.fixture +def timeseries_data_1(timestamp): + return TimeseriesData( + columns=["time", "var1", "var2"], + records=[ + [timestamp.isoformat(), 1, 2], + [(timestamp - timedelta(minutes=10)).isoformat(), 3, 4], + ], + ) + + +@pytest.fixture +def timeseries_data_2(timestamp): + return TimeseriesData( + columns=["time", "var3"], + records=[ + [timestamp.isoformat(), 5], + [(timestamp - timedelta(minutes=10)).isoformat(), 6], + ], + ) + + +class TestTimeseriesData: + def test_merge(self, timeseries_data_1, timeseries_data_2): + timeseries_data_1_records_len = len(timeseries_data_1.records) + timeseries_data_1_columns_len = len(timeseries_data_1.columns) + timeseries_data_2_records_len = len(timeseries_data_2.records) + timeseries_data_2_columns_len = len(timeseries_data_2.columns) + expected_merged_record_len = len(timeseries_data_1.records[0]) + len( + timeseries_data_2.records[0][TIMESTAMP_OFFSET:] + ) + + merged = timeseries_data_1.merge(timeseries_data_2) + + assert merged is timeseries_data_1 + assert ( + len(merged.records) + == timeseries_data_1_records_len + == timeseries_data_2_records_len + ) + assert ( + len(merged.columns) + == timeseries_data_1_columns_len + + timeseries_data_2_columns_len + - TIMESTAMP_OFFSET + ) + + for r in merged.records: + assert len(r) == expected_merged_record_len == len(merged.columns) + + def test_merge_raises_number_of_records_mismatch( + self, timeseries_data_1, timeseries_data_2 + ): + timeseries_data_2.records = timeseries_data_2.records[1:] + with pytest.raises( + ValueError, match="Number of records in both instances has to be the same" + ): + timeseries_data_1.merge(timeseries_data_2) + + def test_merge_raises_mismatched_timestamps( + self, timeseries_data_1, timeseries_data_2, timestamp + ): + timeseries_data_2.records[0][0] = (timestamp - timedelta(days=1)).isoformat() + + with pytest.raises(ValueError, match="mismatched timestamps"): + timeseries_data_1.merge(timeseries_data_2) diff --git a/tests/enlyze/test_client.py b/tests/enlyze/test_client.py index 5a43939..9bfe547 100644 --- a/tests/enlyze/test_client.py +++ b/tests/enlyze/test_client.py @@ -23,10 +23,11 @@ from enlyze.client import EnlyzeClient from enlyze.constants import ( ENLYZE_BASE_URL, + MAXIMUM_NUMBER_OF_VARIABLES_PER_TIMESERIES_REQUEST, PRODUCTION_RUNS_API_SUB_PATH, TIMESERIES_API_SUB_PATH, ) -from enlyze.errors import EnlyzeError +from enlyze.errors import EnlyzeError, ResamplingValidationError from tests.conftest import ( datetime_before_today_strategy, datetime_today_until_now_strategy, @@ -34,7 +35,7 @@ MOCK_RESPONSE_HEADERS = {"Content-Type": "application/json"} -APPLIANCE_UUID = "ebef7e5a-5921-4cf3-9a52-7ff0e98e8306" +MACHINE_UUID = "ebef7e5a-5921-4cf3-9a52-7ff0e98e8306" PRODUCT_CODE = "product-code" PRODUCTION_ORDER = "production-order" SITE_ID = 1 @@ -62,7 +63,7 @@ start=datetime_before_today_strategy, end=datetime_today_until_now_strategy, appliance=st.builds( - production_runs_api_models.Machine, uuid=st.just(APPLIANCE_UUID) + production_runs_api_models.Machine, uuid=st.just(MACHINE_UUID) ), product=st.builds( production_runs_api_models.Product, @@ -335,6 +336,74 @@ def test_get_timeseries_returns_none_on_empty_response( ) +@given( + data_strategy=st.data(), + records=st.lists( + st.tuples( + datetime_today_until_now_strategy.map(datetime.isoformat), + st.integers(), + ), + min_size=2, + max_size=5, + ), + machine=st.builds(timeseries_api_models.Machine, uuid=st.just(MACHINE_UUID)), +) +@settings(suppress_health_check=[HealthCheck.function_scoped_fixture]) +def test__get_timeseries_raises_on_mixed_response( + data_strategy, + start_datetime, + end_datetime, + records, + machine, +): + """ + Tests that an `EnlyzeError` is raised if the timeseries API returns + data for some of the variables but not all of them. + """ + client = make_client() + variables = data_strategy.draw( + st.lists( + st.builds( + user_models.Variable, + data_type=st.just("INTEGER"), + machine=st.just(machine), + ), + min_size=MAXIMUM_NUMBER_OF_VARIABLES_PER_TIMESERIES_REQUEST + 1, + max_size=MAXIMUM_NUMBER_OF_VARIABLES_PER_TIMESERIES_REQUEST + 5, + ) + ) + + with respx_mock_with_base_url(TIMESERIES_API_SUB_PATH) as mock: + mock.get("timeseries").mock( + side_effect=[ + PaginatedTimeseriesApiResponse( + data=timeseries_api_models.TimeseriesData( + columns=[ + "time", + *[ + str(variable.uuid) + for variable in variables[ + :MAXIMUM_NUMBER_OF_VARIABLES_PER_TIMESERIES_REQUEST + ] + ], + ], + records=records, + ).model_dump(), + ), + PaginatedTimeseriesApiResponse( + data=timeseries_api_models.TimeseriesData( + columns=[], + records=[], + ).model_dump(), + ), + ] + ) + with pytest.raises( + EnlyzeError, match="didn't return data for some of the variables" + ): + client._get_timeseries(start_datetime, end_datetime, variables) + + def test_get_timeseries_raises_no_variables(start_datetime, end_datetime): client = make_client() with pytest.raises(EnlyzeError, match="at least one variable"): @@ -390,6 +459,79 @@ def test_get_timeseries_raises_api_returned_no_timestamps( client.get_timeseries(start_datetime, end_datetime, [variable]) +@given( + variable=st.builds(user_models.Variable), +) +@settings(suppress_health_check=[HealthCheck.function_scoped_fixture]) +def test__get_timeseries_raises_variables_without_resampling_method( + start_datetime, end_datetime, variable +): + """ + Test that `get_timeseries` will raise an `EnlyzeError` when a + `resampling_interval` is specified but variables don't have + resampling methods. + """ + client = make_client() + with pytest.raises(ResamplingValidationError): + client._get_timeseries(start_datetime, end_datetime, [variable], 30) + + +@given( + variable=st.builds(user_models.Variable), +) +@settings(suppress_health_check=[HealthCheck.function_scoped_fixture]) +def test__get_timeseries_raises_on_chunk_value_error( + start_datetime, end_datetime, variable, monkeypatch +): + monkeypatch.setattr( + "enlyze.client.MAXIMUM_NUMBER_OF_VARIABLES_PER_TIMESERIES_REQUEST", 0 + ) + client = make_client() + with pytest.raises(EnlyzeError) as exc_info: + client._get_timeseries(start_datetime, end_datetime, [variable]) + assert isinstance(exc_info.value.__cause__, ValueError) + + +@given( + start=datetime_before_today_strategy, + end=datetime_today_until_now_strategy, + variable=st.builds( + user_models.Variable, + data_type=st.just("INTEGER"), + machine=st.builds(timeseries_api_models.Machine), + ), + records=st.lists( + st.tuples( + datetime_today_until_now_strategy.map(datetime.isoformat), + st.integers(), + ), + min_size=2, + ), +) +@settings(suppress_health_check=[HealthCheck.function_scoped_fixture]) +def test__get_timeseries_raises_on_merge_value_error( + start, end, variable, records, monkeypatch +): + client = make_client() + + def f(*args, **kwargs): + raise ValueError + + monkeypatch.setattr("enlyze.client.reduce", f) + + with respx_mock_with_base_url(TIMESERIES_API_SUB_PATH) as mock: + mock.get("timeseries").mock( + PaginatedTimeseriesApiResponse( + data=timeseries_api_models.TimeseriesData( + columns=["time", str(variable.uuid)], + records=records, + ).model_dump() + ) + ) + with pytest.raises(EnlyzeError): + client._get_timeseries(start, end, [variable]) + + @given( production_order=st.just(PRODUCTION_ORDER), product=st.one_of( @@ -399,7 +541,7 @@ def test_get_timeseries_raises_api_returned_no_timestamps( machine=st.builds( timeseries_api_models.Machine, site=st.just(SITE_ID), - uuid=st.just(APPLIANCE_UUID), + uuid=st.just(MACHINE_UUID), ), site=st.builds(timeseries_api_models.Site, id=st.just(SITE_ID)), start=st.one_of(datetime_before_today_strategy, st.none()), diff --git a/tests/enlyze/test_iterable_tools.py b/tests/enlyze/test_iterable_tools.py new file mode 100644 index 0000000..eb9f034 --- /dev/null +++ b/tests/enlyze/test_iterable_tools.py @@ -0,0 +1,26 @@ +from typing import Sequence + +import pytest +from hypothesis import given +from hypothesis.strategies import integers, lists + +from enlyze.iterable_tools import MINIMUM_CHUNK_SIZE, chunk + + +@given( + seq=lists(integers()), + chunk_size=integers(min_value=MINIMUM_CHUNK_SIZE), +) +def test_chunk(seq: Sequence[int], chunk_size: int): + result = list(chunk(seq, chunk_size)) + assert sum(len(sublist) for sublist in result) == len(seq) + assert all(len(sublist) <= chunk_size for sublist in result) + + +@given( + seq=lists(integers()), + chunk_size=integers(max_value=MINIMUM_CHUNK_SIZE - 1), +) +def test_chunk_raises_invalid_chunk_size(seq: Sequence[int], chunk_size: int): + with pytest.raises(ValueError): + chunk(seq, chunk_size)