rewrite frame merge and more logging #581

Draft: wants to merge 7 commits into base: 8.9.x
Changes from all commits
2 changes: 1 addition & 1 deletion iotfunctions/__init__.py
@@ -11,5 +11,5 @@
import os
import pkgutil

-__version__ = '8.8.0'
+__version__ = '8.9.19'
__all__ = list(module for (_, module, _) in pkgutil.iter_modules([os.path.dirname(__file__)]))
59 changes: 33 additions & 26 deletions iotfunctions/anomaly.py
@@ -18,6 +18,7 @@
import logging
import time
import hashlib # encode feature names
+import traceback

import numpy as np
import pandas as pd
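
The new traceback import feeds the extended error logging further down in this diff. A minimal standalone sketch of that pattern, with a hypothetical risky() helper standing in for the score integration step:

import logging
import traceback

logger = logging.getLogger(__name__)

def risky():
    # hypothetical stand-in for the failing step
    raise ValueError('boom')

try:
    risky()
except Exception as e:
    # same idea as the PR's error path: append the full stack trace to the message
    logger.error('score integration failed with ' + str(e) + '\n' + traceback.format_exc())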
@@ -97,6 +98,10 @@
Saliency_normalizer = 1
Generalized_normalizer = 1 / 300

+# Do away with numba logs
+numba_logger = logging.getLogger('numba')
+numba_logger.setLevel(logging.ERROR)

# from
# https://stackoverflow.com/questions/44790072/sliding-window-on-time-series-data
def view_as_windows1(temperature, length, step):
@@ -261,27 +266,6 @@ def transform_spectral_residual(self, values):
return spectral_residual


-def merge_score(dfEntity, dfEntityOrig, column_name, score, mindelta):
-"""
-Fit interpolated score to original entity slice of the full dataframe
-"""

-# equip score with time values, make sure it's positive
-score[score < 0] = 0
-dfEntity[column_name] = score

-# merge
-dfEntityOrig = pd.merge_asof(dfEntityOrig, dfEntity[column_name], left_index=True, right_index=True,
-direction='nearest', tolerance=mindelta)

-if column_name + '_y' in dfEntityOrig:
-merged_score = dfEntityOrig[column_name + '_y'].to_numpy()
-else:
-merged_score = dfEntityOrig[column_name].to_numpy()

-return merged_score


#######################################################################################
# Scalers
#######################################################################################
@@ -563,7 +547,8 @@ def prepare_data(self, dfEntity):

# interpolate gaps - data imputation
try:
-dfe = dfe.dropna(subset=[self.input_item]).interpolate(method="time")
+#dfe = dfe.dropna(subset=[self.input_item]).interpolate(method="time")
+dfe = dfe.interpolate(method="time")
except Exception as e:
logger.error('Prepare data error: ' + str(e))

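For context on the prepare_data change above: method="time" interpolation weights filled values by the elapsed time between neighbouring samples, and with the dropna() removed, leading or trailing NaNs are simply left in place. A small sketch with made-up data, not part of the diff:

import numpy as np
import pandas as pd

idx = pd.date_range('2024-01-01', periods=5, freq='min')
dfe = pd.DataFrame({'temperature': [1.0, np.nan, np.nan, 4.0, 5.0]}, index=idx)

# interior gaps are filled proportionally to the elapsed time
dfe = dfe.interpolate(method='time')
print(dfe['temperature'].tolist())  # [1.0, 2.0, 3.0, 4.0, 5.0]
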
@@ -611,10 +596,15 @@ def _calc(self, df):

# remove all rows with only null entries
dfe = dfe_orig.dropna(how='all')
+logger.info('Anomaly ' + str(df[self.output_items[0]].values.shape) + ', ' +
+str(dfe_orig[self.output_items[0]].values.shape) + ', ' +
+str(dfe[self.output_items[0]].values.shape))

# minimal time delta for merging
mindelta, dfe_orig = min_delta(dfe_orig)

+logger.info('Anomaly II ' + str(dfe_orig[self.output_items[0]].values.shape))

logger.debug('Timedelta:' + str(mindelta) + ' Index: ' + str(dfe_orig.index))

# one dimensional time series - named temperature for catchyness
@@ -658,8 +648,25 @@ def _calc(self, df):
linear_interpolate = sp.interpolate.interp1d(time_series_temperature, scores[i], kind='linear',
fill_value='extrapolate')

-zScoreII = merge_score(dfe, dfe_orig, output_item,
-abs(linear_interpolate(np.arange(0, temperature.size, 1))), mindelta)
+# stretch anomaly score to fit temperature.size
+score = abs(linear_interpolate(np.arange(0, temperature.size, 1)))

+# and make sure it's positive
+score[score < 0] = 0

+dfe[output_item] = score

+# merge so that data is stretched to match the original data w/o gaps and NaNs
+dfe_orig = pd.merge_asof(dfe_orig, dfe[output_item], left_index=True, right_index=True,
+direction='nearest', tolerance=mindelta)

+if output_item + '_y' in dfe_orig:
+zScoreII = dfe_orig[output_item + '_y'].to_numpy()
+else:
+zScoreII = dfe_orig[output_item].to_numpy()

+logger.debug('Merge Score : ' + str(score.shape) + ', ' + str(zScoreII.shape))

# fast path - either cut off or just copy
elif diff < 0:
zScoreII = scores[i][0:temperature.size]
@@ -669,12 +676,12 @@
# make sure shape is correct
try:
df[output_item] = zScoreII
-except Exception as e2:
+except Exception as e2:
df[output_item] = zScoreII.reshape(-1,1)
pass

except Exception as e:
-logger.error(self.whoami + ' score integration failed with ' + str(e))
+logger.error(self.whoami + ' score integration failed with ' + str(e) + '\n' + traceback.format_exc())

logger.debug('--->')

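The rewritten merge above (replacing the removed merge_score helper) relies on pd.merge_asof to snap each interpolated score onto the nearest original timestamp within mindelta, and on merge_asof suffixing clashing column names with _x/_y, which is why the code checks for output_item + '_y'. A self-contained sketch with hypothetical timestamps, scores, and column names:

import pandas as pd

orig_idx = pd.to_datetime(['2024-01-01 00:00', '2024-01-01 00:05', '2024-01-01 00:11'])
score_idx = pd.to_datetime(['2024-01-01 00:00', '2024-01-01 00:05', '2024-01-01 00:10'])

dfe_orig = pd.DataFrame({'anomaly_score': [float('nan')] * 3}, index=orig_idx)
dfe = pd.DataFrame({'anomaly_score': [0.1, 0.7, 0.3]}, index=score_idx)

# nearest-match join within a 2 minute tolerance, mirroring mindelta in the diff
merged = pd.merge_asof(dfe_orig, dfe['anomaly_score'], left_index=True, right_index=True,
                       direction='nearest', tolerance=pd.Timedelta('2min'))

# both inputs carry 'anomaly_score', so the scored values land in 'anomaly_score_y'
print(merged['anomaly_score_y'].to_numpy())  # [0.1 0.7 0.3]
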
8 changes: 4 additions & 4 deletions tests/test_base_functions.py
@@ -49,10 +49,10 @@ def test_base_functions():
df_i['Test2'] = df_i[Temperature] + addl
df_i['Test3'] = df_i[Temperature] + addl
df_i['Test4'] = df_i[Temperature] + addl
-df_i['Test1'][3] = None
-df_i['Test2'][2] = None
-df_i['Test2'][3] = None
-df_i['Test3'][1] = None
+df_i['Test1'][3] = np.nan
+df_i['Test2'][2] = np.nan
+df_i['Test2'][3] = np.nan
+df_i['Test3'][1] = np.nan
df_i['Test4'][1] = 10000.0
df_i['Test4'][3] = 20000.0

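On the test change above: writing np.nan keeps the Test columns as plain float64 data with explicit gaps. A hypothetical variant of the same setup using .iloc, which avoids the chained-indexing pattern on recent pandas versions (frame and column names are made up, not from the repo):

import numpy as np
import pandas as pd

df_i = pd.DataFrame({'Test1': np.arange(5, dtype=float)})

# inject a gap explicitly; .iloc sidesteps the df_i['Test1'][3] = ... chained assignment
df_i.iloc[3, df_i.columns.get_loc('Test1')] = np.nan
print(df_i['Test1'].isna().tolist())  # [False, False, False, True, False]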