From 15d140439da185607c6238680e0f75ef7d45e18b Mon Sep 17 00:00:00 2001 From: Marcel Arpogaus <38564291+MArpogaus@users.noreply.github.com> Date: Thu, 12 Sep 2024 18:02:53 +0200 Subject: [PATCH 1/6] feat: allows to filter out patches containing NaN values --- src/tensorflow_time_series_dataset/factory.py | 3 ++- .../pipeline/patch_generator.py | 23 ++++++++++++++++--- .../pipeline/windowed_time_series_pipeline.py | 8 +++++-- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/tensorflow_time_series_dataset/factory.py b/src/tensorflow_time_series_dataset/factory.py index ff47474..e0107f4 100644 --- a/src/tensorflow_time_series_dataset/factory.py +++ b/src/tensorflow_time_series_dataset/factory.py @@ -4,7 +4,7 @@ # author : Marcel Arpogaus # # created : 2022-01-07 09:02:38 (Marcel Arpogaus) -# changed : 2024-02-19 12:57:42 (Marcel Arpogaus) +# changed : 2024-09-12 15:43:12 (Marcel Arpogaus) # DESCRIPTION ################################################################# # ... # LICENSE ##################################################################### @@ -53,6 +53,7 @@ class WindowedTimeSeriesDatasetFactory: "cycle_length": 1, "shuffle_buffer_size": 1000, "cache": True, + "filter_nans": True, } def __init__( diff --git a/src/tensorflow_time_series_dataset/pipeline/patch_generator.py b/src/tensorflow_time_series_dataset/pipeline/patch_generator.py index 447373e..7d037df 100644 --- a/src/tensorflow_time_series_dataset/pipeline/patch_generator.py +++ b/src/tensorflow_time_series_dataset/pipeline/patch_generator.py @@ -4,7 +4,7 @@ # author : Marcel Arpogaus # # created : 2022-01-07 09:02:38 (Marcel Arpogaus) -# changed : 2024-02-19 12:52:06 (Marcel Arpogaus) +# changed : 2024-09-12 15:52:32 (Marcel Arpogaus) # DESCRIPTION ################################################################# # ... # LICENSE ##################################################################### @@ -35,20 +35,25 @@ class PatchGenerator: The size of each patch. shift : int The shift between patches. + filter_nans : int + Apply a filter function to drop patches containing NaN values. """ - def __init__(self, window_size: int, shift: int) -> None: + def __init__(self, window_size: int, shift: int, filter_nans: bool) -> None: """Parameters ---------- window_size : int The size of each patch. shift : int The shift between patches. + filter_nans : int + If True, apply a filter function to drop patches containing NaN values. """ self.window_size: int = window_size self.shift: int = shift + self.filter_nans: bool = filter_nans def __call__(self, data: tf.Tensor) -> tf.data.Dataset: """Converts input data into patches of provided window size. @@ -71,6 +76,18 @@ def __call__(self, data: tf.Tensor) -> tf.data.Dataset: size=self.window_size, shift=self.shift, drop_remainder=True, - ).flat_map(lambda sub: sub.batch(self.window_size, drop_remainder=True)) + ) + + def sub_to_patch(sub): + return sub.batch(self.window_size, drop_remainder=True) + + data_set = data_set.flat_map(sub_to_patch) + + if self.filter_nans: + + def filter_func(patch): + return tf.reduce_all(tf.logical_not(tf.math.is_nan(patch))) + + data_set = data_set.filter(filter_func) return data_set diff --git a/src/tensorflow_time_series_dataset/pipeline/windowed_time_series_pipeline.py b/src/tensorflow_time_series_dataset/pipeline/windowed_time_series_pipeline.py index 36ea891..bfb549c 100644 --- a/src/tensorflow_time_series_dataset/pipeline/windowed_time_series_pipeline.py +++ b/src/tensorflow_time_series_dataset/pipeline/windowed_time_series_pipeline.py @@ -4,7 +4,7 @@ # author : Marcel Arpogaus # # created : 2022-01-07 09:02:38 (Marcel Arpogaus) -# changed : 2024-02-19 12:53:06 (Marcel Arpogaus) +# changed : 2024-09-12 16:01:27 (Marcel Arpogaus) # DESCRIPTION ################################################################# # ... # LICENSE ##################################################################### @@ -64,6 +64,8 @@ class WindowedTimeSeriesPipeline: Whether to cache the dataset in memory or to a specific file. drop_remainder : bool Whether to drop the remainder of batches that are not equal to the batch size. + filter_nans : int + Apply a filter function to drop patches containing NaN values. Raises ------ @@ -85,6 +87,7 @@ def __init__( shuffle_buffer_size: int, cache: Union[str, bool], drop_remainder: bool, + filter_nans: bool, ) -> None: assert ( prediction_size > 0 @@ -101,6 +104,7 @@ def __init__( self.shuffle_buffer_size = shuffle_buffer_size self.cache = cache self.drop_remainder = drop_remainder + self.filter_nans = filter_nans def __call__(self, ds: Dataset) -> Dataset: """Applies the pipeline operations to the given dataset. @@ -117,7 +121,7 @@ def __call__(self, ds: Dataset) -> Dataset: """ ds = ds.interleave( - PatchGenerator(self.window_size, self.shift), + PatchGenerator(self.window_size, self.shift, self.filter_nans), cycle_length=self.cycle_length, num_parallel_calls=tf.data.experimental.AUTOTUNE, ) From f9b17e8455962f0fd1a5fc4c216abd53aae04d8e Mon Sep 17 00:00:00 2001 From: Marcel Arpogaus <38564291+MArpogaus@users.noreply.github.com> Date: Thu, 12 Sep 2024 18:03:20 +0200 Subject: [PATCH 2/6] ci: update pipeline test to reflect latest changes to PatchGenerator --- tests/test_pipleine.py | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/tests/test_pipleine.py b/tests/test_pipleine.py index 14e8100..6627ea4 100644 --- a/tests/test_pipleine.py +++ b/tests/test_pipleine.py @@ -34,7 +34,7 @@ def patched_dataset( ds = tf.data.Dataset.from_tensors(df[sorted(used_cols)]) ds = ds.interleave( - PatchGenerator(window_size=window_size, shift=shift), + PatchGenerator(window_size=window_size, shift=shift, filter_nans=False), num_parallel_calls=tf.data.experimental.AUTOTUNE, ) return ds, df, window_size, shift @@ -53,7 +53,7 @@ def test_patch_generator(time_series_df, window_size, shift): ds = tf.data.Dataset.from_tensors(df) ds_patched = ds.interleave( - PatchGenerator(window_size=window_size, shift=shift), + PatchGenerator(window_size=window_size, shift=shift, filter_nans=False), num_parallel_calls=tf.data.experimental.AUTOTUNE, ) for i, patch in enumerate(ds_patched.as_numpy_iterator()): @@ -65,6 +65,36 @@ def test_patch_generator(time_series_df, window_size, shift): assert i + 1 == patches, "Not enough patches" +@pytest.mark.parametrize("window_size,shift", [(2 * 48, 48), (48 + 1, 1)]) +def test_patch_generator_filter_nans(time_series_df, window_size, shift): + df = time_series_df.set_index("date_time") + # randomly set 20% of elemnts in the dataset for nans + + df = time_series_df.set_index("date_time") + nan_mask = np.random.default_rng(1).uniform(0, 1, df.shape) < 0.01 + df[nan_mask] = np.nan + + initial_size = window_size - shift + data_size = df.index.size - initial_size + patches = data_size // shift + + expected_shape = (window_size, len(df.columns)) + + ds = tf.data.Dataset.from_tensors(df) + ds_patched = ds.interleave( + PatchGenerator(window_size=window_size, shift=shift, filter_nans=True), + num_parallel_calls=tf.data.experimental.AUTOTUNE, + ) + for i, patch in enumerate(ds_patched.as_numpy_iterator()): + assert patch.shape == expected_shape, "Wrong shape" + x1 = patch[0, 0] + idx = int(x1 % 1e5) + expected_values = df.iloc[idx : idx + window_size] + assert np.all(patch == expected_values), "Patch contains wrong data" + assert not np.isnan(patch).any(), "Patch contains NaNs." + assert i + 1 < patches, "No patches have been dropped" + + @pytest.mark.parametrize("window_size,shift", [(2 * 48, 48), (48 + 1, 1)]) def test_patch_generator_groupby(groupby_dataset, window_size, shift): ds, df = groupby_dataset @@ -78,7 +108,7 @@ def test_patch_generator_groupby(groupby_dataset, window_size, shift): expected_shape = (window_size, len(columns)) ds_patched = ds.interleave( - PatchGenerator(window_size=window_size, shift=shift), + PatchGenerator(window_size=window_size, shift=shift, filter_nans=True), num_parallel_calls=tf.data.experimental.AUTOTUNE, ) From de62125f5278f798b33067f450bb19a5239db68e Mon Sep 17 00:00:00 2001 From: Marcel Arpogaus <38564291+MArpogaus@users.noreply.github.com> Date: Thu, 12 Sep 2024 16:09:13 +0000 Subject: [PATCH 3/6] ci: remove macos platform test, since they dont support python 3.7 --- .github/workflows/test.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index ba6c48c..8e7eb5a 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -7,7 +7,7 @@ jobs: strategy: matrix: - platform: [ubuntu-latest, macos-latest, windows-latest] + platform: [ubuntu-latest, windows-latest] python-version: [3.7, 3.8, 3.9, '3.10', '3.11'] runs-on: ${{ matrix.platform }} From 740e51cb55139f95c1594c5b1a60cecf0648cb63 Mon Sep 17 00:00:00 2001 From: Marcel Arpogaus <38564291+MArpogaus@users.noreply.github.com> Date: Thu, 12 Sep 2024 18:21:40 +0200 Subject: [PATCH 4/6] fix: disable NaN filtering per default to ensure errors if they are unexpected --- src/tensorflow_time_series_dataset/factory.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tensorflow_time_series_dataset/factory.py b/src/tensorflow_time_series_dataset/factory.py index e0107f4..ca015c7 100644 --- a/src/tensorflow_time_series_dataset/factory.py +++ b/src/tensorflow_time_series_dataset/factory.py @@ -4,7 +4,7 @@ # author : Marcel Arpogaus # # created : 2022-01-07 09:02:38 (Marcel Arpogaus) -# changed : 2024-09-12 15:43:12 (Marcel Arpogaus) +# changed : 2024-09-12 16:21:24 (Marcel Arpogaus) # DESCRIPTION ################################################################# # ... # LICENSE ##################################################################### @@ -53,7 +53,7 @@ class WindowedTimeSeriesDatasetFactory: "cycle_length": 1, "shuffle_buffer_size": 1000, "cache": True, - "filter_nans": True, + "filter_nans": False, } def __init__( From d5729f57a9f8d9dd4fe1b27beea5e22943d85772 Mon Sep 17 00:00:00 2001 From: Marcel Arpogaus <38564291+MArpogaus@users.noreply.github.com> Date: Thu, 12 Sep 2024 18:26:08 +0200 Subject: [PATCH 5/6] ci: add missing filter_nans kwarg to Pipeline test --- tests/test_pipleine.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/test_pipleine.py b/tests/test_pipleine.py index 6627ea4..8d070fd 100644 --- a/tests/test_pipleine.py +++ b/tests/test_pipleine.py @@ -196,7 +196,9 @@ def test_windowed_time_series_pipeline( batch_size=batch_size, drop_remainder=True, ) - pipeline_kwargs = dict(cycle_length=1, shuffle_buffer_size=100, cache=True) + pipeline_kwargs = dict( + cycle_length=1, shuffle_buffer_size=100, cache=True, filter_nans=False + ) with validate_args( history_size=history_size, @@ -239,7 +241,9 @@ def test_windowed_time_series_pipeline_groupby( batch_size=batch_size, drop_remainder=False, ) - pipeline_kwargs = dict(cycle_length=len(ids), shuffle_buffer_size=1000, cache=True) + pipeline_kwargs = dict( + cycle_length=len(ids), shuffle_buffer_size=1000, cache=True, filter_nans=False + ) with validate_args( history_size=history_size, From 7fc54bd50c4dd58123838bc5e55b01741db1793c Mon Sep 17 00:00:00 2001 From: Marcel Arpogaus <38564291+MArpogaus@users.noreply.github.com> Date: Thu, 12 Sep 2024 18:57:50 +0200 Subject: [PATCH 6/6] =?UTF-8?q?bump:=20version=200.1.2=20=E2=86=92=200.2.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a8e9e4f..bc3d93f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## v0.2.0 (2024-09-12) + +### Feat + +- allows to filter out patches containing NaN values + +### Fix + +- disable NaN filtering per default to ensure errors if they are unexpected + ## v0.1.2 (2024-02-19) ## v0.1.1 (2024-02-19)