-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
89eebc3
commit 15cff6a
Showing
2 changed files
with
77 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
from datetime import datetime, timedelta, timezone | ||
|
||
import pytest | ||
|
||
from enlyze.api_clients.timeseries.models import TimeseriesData | ||
|
||
|
||
@pytest.fixture | ||
def timestamp(): | ||
return datetime.now(tz=timezone.utc) | ||
|
||
|
||
@pytest.fixture | ||
def timeseries_data_1(timestamp): | ||
return TimeseriesData( | ||
columns=["time", "var1", "var2"], | ||
records=[ | ||
[timestamp.isoformat(), 1, 2], | ||
[(timestamp - timedelta(minutes=10)).isoformat(), 3, 4], | ||
], | ||
) | ||
|
||
|
||
@pytest.fixture | ||
def timeseries_data_2(timestamp): | ||
return TimeseriesData( | ||
columns=["time", "var3"], | ||
records=[ | ||
[timestamp.isoformat(), 5], | ||
[(timestamp - timedelta(minutes=10)).isoformat(), 6], | ||
], | ||
) | ||
|
||
|
||
class TestTimeseriesData: | ||
def test_merge(self, timeseries_data_1, timeseries_data_2): | ||
timeseries_data_1_records_len = len(timeseries_data_1.records) | ||
timeseries_data_1_columns_len = len(timeseries_data_1.columns) | ||
timeseries_data_2_records_len = len(timeseries_data_2.records) | ||
timeseries_data_2_columns_len = len(timeseries_data_2.columns) | ||
expected_merged_record_len = len(timeseries_data_1.records[0]) + len( | ||
timeseries_data_2.records[0][1:] | ||
) | ||
|
||
merged = timeseries_data_1.merge(timeseries_data_2) | ||
|
||
assert merged is timeseries_data_1 | ||
assert ( | ||
len(merged.records) | ||
== timeseries_data_1_records_len | ||
== timeseries_data_2_records_len | ||
) | ||
assert ( | ||
len(merged.columns) | ||
== timeseries_data_1_columns_len + timeseries_data_2_columns_len - 1 | ||
) | ||
|
||
for r in merged.records: | ||
assert len(r) == expected_merged_record_len | ||
|
||
def test_merge_raises_number_of_records_mismatch( | ||
self, timeseries_data_1, timeseries_data_2 | ||
): | ||
timeseries_data_2.records = timeseries_data_2.records[1:] | ||
with pytest.raises( | ||
ValueError, match="Number of records in both instances has to be the same" | ||
): | ||
timeseries_data_1.merge(timeseries_data_2) | ||
|
||
def test_merge_raises_mismatched_timestamps( | ||
self, timeseries_data_1, timeseries_data_2, timestamp | ||
): | ||
timeseries_data_2.records[0][0] = (timestamp - timedelta(days=1)).isoformat() | ||
|
||
with pytest.raises(ValueError, match="mismatched timestamps"): | ||
timeseries_data_1.merge(timeseries_data_2) |