Add fast Hankel matrix product to the ESST
Lucas Weber authored and Lucas Weber committed Aug 14, 2024
1 parent cf265aa commit aea7925
Showing 1 changed file with 82 additions and 32 deletions.
114 changes: 82 additions & 32 deletions changepoynt/algorithms/esst.py
@@ -7,6 +7,7 @@
from changepoynt.utils import linalg as lg
import fbpca
import multiprocessing as mp
import numba as nb


class ESST(Algorithm):
@@ -17,16 +18,17 @@ class ESST(Algorithm):
"""

def __init__(self, window_length: int, n_windows: int = None, lag: int = None, rank: int = 5,
scale: bool = True, method: str = 'fbrsvd', random_rank: int = None, scoring_step: int = 1,
parallel: bool = False, use_fast_hankel: bool = False, threads: int = 2) -> None:
"""
Experimental change point detection method evaluating the prevalence of change points within a signal
by comparing the difference in eigenvectors between two points in time.
:param window_length: This specifies the length of the time series (in samples), which will be used to extract
the representative "eigensignals" to be compared before and after the lag. The window length should be big
enough to cover any wanted patterns (e.g., bigger than the periodicity of periodic signals).
:param n_windows: This specifies the number of consecutive time windows used to extract the "eigensignals" from
the given time series. It should be big enough to cover different parts of the target behavior. If one does not
specify this value, we take as many time windows as the specified window length (rule of thumb).
@@ -37,40 +39,71 @@ def __init__(self, window_length: int, n_windows: int = None, lag: int = None, r
cover the behavior.
:param rank: This parameter specifies the number of "eigensignals" which will be used to measure the
dissimilarity of the signal in the future behavior. As a rule of thumb, we take the five most dominant
"eigensignals" if you do not specify otherwise.
:param scale: Due to numeric stability, we REALLY RECOMMEND scaling the signal into a restricted value range. Per
default, we use a min max scaling to ensure a restricted value range. In the presence of extreme outliers, this
could cause problems, as the signal will be squished.
:param random_rank: To use the randomized singular value decomposition, one needs to provide a
randomized rank (size of the second matrix dimension for the randomized matrix) as specified in [3]. The lower
this value, the faster the computation but the higher the error (as the approximation gets worse).
:param scoring_step: The distance between scoring steps in samples (e.g., 2 would halve the computation).
:param parallel: The execution for the different steps can be parallelized in different processes.
:param use_fast_hankel: Whether to deploy the fast Hankel matrix product.
:param threads: The number of threads the fast Hankel matrix product is allowed to use.
"""

# save the specified parameters into instance variables
self.window_length = window_length
self.n_windows = n_windows
self.rank = rank
self.scale = scale
self.random_rank = random_rank
self.lag = lag
self.scoring_step = scoring_step
self.parallel = parallel
self.use_fast_hankel = use_fast_hankel
self.method = method
self.threads = threads

# set some default values when they have not been specified
if self.n_windows is None:
self.n_windows = self.window_length//2
if self.lag is None:
# rule of thumb
self.lag = self.n_windows

if self.random_rank is None:
# compute the rank as specified in [3] and
# https://scikit-learn.org/stable/modules/generated/sklearn.utils.extmath.randomized_svd.html
self.random_rank = min(self.rank + 10, self.window_length, self.n_windows)

# specify the methods and their corresponding functions as partial functions expecting only the hankel matrix
self.methods = {'rsvd': partial(left_entropy, rank=self.rank, random_rank=self.random_rank,
method='rsvd', threads=self.threads),
'fbrsvd': partial(left_entropy, rank=self.rank, random_rank=self.random_rank,
method='fbrsvd', threads=self.threads)}
if self.method not in self.methods:
raise ValueError(f'Method {self.method} not defined. Possible methods: {list(self.methods.keys())}.')

# set up the function used to construct the hankel matrix (either the explicit matrix or its FFT representation)
if use_fast_hankel and self.method == 'fbrsvd':
raise ValueError('The fbrsvd method is not available with use_fast_hankel=True.')
self.hankel_construction = {
False: lg.compile_hankel,
True: lg.HankelFFTRepresentation
}
# TODO: Create tests for the ESST
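The use_fast_hankel switch selects lg.HankelFFTRepresentation instead of lg.compile_hankel; that helper is not part of this diff. The usual idea behind such a representation is that a Hankel matrix-vector product is a correlation of the underlying series with the vector, so it can be evaluated with an FFT in O(n log n) without materializing the matrix. A minimal numpy sketch of that idea, where fast_hankel_matvec and the indexing convention H[i, j] = series[start + i + j] are assumptions rather than the library's API:

import numpy as np

def fast_hankel_matvec(series: np.ndarray, start: int, window_length: int, n_windows: int,
                       v: np.ndarray) -> np.ndarray:
    """Compute H @ v for H[i, j] = series[start + i + j] without ever building H."""
    # the window_length + n_windows - 1 samples that fully determine the Hankel matrix
    s = series[start:start + window_length + n_windows - 1]
    n_fft = len(s) + n_windows - 1  # length of the full linear convolution
    spectrum = np.fft.rfft(s, n_fft) * np.fft.rfft(v[::-1], n_fft)
    conv = np.fft.irfft(spectrum, n_fft)
    # (H @ v)[i] = sum_j s[i + j] * v[j] sits at position i + n_windows - 1 of the convolution
    return conv[n_windows - 1:n_windows - 1 + window_length]

# sanity check against an explicitly built Hankel matrix
rng = np.random.default_rng(0)
x, L, K, start = rng.standard_normal(500), 30, 20, 100
H = np.stack([x[start + i:start + i + K] for i in range(L)])  # H[i, j] = x[start + i + j]
v = rng.standard_normal(K)
assert np.allclose(H @ v, fast_hankel_matvec(x, start, L, K, v))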

def transform(self, time_series: np.ndarray) -> np.ndarray:
"""
This function calculates the anomaly score for each sample within the time series.
It also runs some assertions regarding the time series length.
@@ -94,24 +127,28 @@ def transform(self, time_series: np.ndarray) -> np.ndarray:
else:
time_series = time_series.copy()

# look up the scoring and hankel construction functions
scoring_function = self.methods[self.method]
hankel_function = self.hankel_construction[self.use_fast_hankel]

if self.parallel:
return _transform_parallel(time_series, starting_point, self.window_length, self.n_windows, self.lag,
self.scoring_step, scoring_function, hankel_function)
else:
return _transform(time_series, starting_point, self.window_length, self.n_windows, self.lag,
self.scoring_step, scoring_function, hankel_function)


def _transform(time_series: np.ndarray, start_idx: int, window_length: int, n_windows: int, lag: int, scoring_step: int,
scoring_function: Callable, hankel_construction_function: Callable) -> np.ndarray:
"""
Computation-heavy and hopefully jit-compilable score computation for the SST method. It does not do any parameter
checking and can throw cryptic errors. It is only intended for internal use as a private function.
:param time_series: 1D array containing the time series to be scored
:param start_idx: integer defining the start sample index for the score computation
:param window_length: the size of the time series window for each column of the hankel matrix
:param n_windows: number of columns in the hankel matrix
"""

# initialize a scoring array with no values yet
@@ -124,10 +161,10 @@ def _transform(time_series: np.ndarray, start_idx: int, window_length: int, n_wi
for idx in range(start_idx, time_series.shape[0], scoring_step):

# compile the past hankel matrix (H1)
hankel_past = hankel_construction_function(time_series, idx - lag, window_length, n_windows)

# compile the future hankel matrix (H2)
hankel_future = hankel_construction_function(time_series, idx, window_length, n_windows)

# compute the score and save the returned feedback vector
score[idx-offset-scoring_step//2:idx-offset+(scoring_step+1)//2] = \
@@ -137,15 +174,16 @@ def _transform(time_series: np.ndarray, start_idx: int, window_length: int, n_wi


def _transform_parallel(time_series: np.ndarray, start_idx: int, window_length: int, n_windows: int, lag: int,
scoring_step: int, scoring_function: Callable,
hankel_construction_function: Callable) -> np.ndarray:
"""
Computation-heavy and hopefully jit-compilable score computation for the SST method. It does not do any parameter
checking and can throw cryptic errors. It is only intended for internal use as a private function.
:param time_series: 1D array containing the time series to be scored
:param start_idx: integer defining the start sample index for the score computation
:param window_length: the size of the time series window for each column of the hankel matrix
:param n_windows: number of columns in the hankel matrix
"""

# initialize a scoring array with no values yet
@@ -156,8 +194,8 @@ def _transform_parallel(time_series: np.ndarray, start_idx: int, window_length:

# make the generator for the hankel matrices
gener = (np.concatenate(
(hankel_construction_function(time_series, idx-lag, window_length, n_windows),
hankel_construction_function(time_series, idx, window_length, n_windows)
),
axis=1)
for idx in range(start_idx, time_series.shape[0], scoring_step))
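The lines that consume this generator are collapsed in the diff view. Presumably they map scoring_function over it with a process pool and write each result into the slice around its index, mirroring the serial loop in _transform. A sketch of that pattern, reusing the names defined in this function (score and offset come from the collapsed initialization; pool size and chunksize are illustrative assumptions):

# hypothetical consumption of the generator; not the file's actual code
with mp.Pool(processes=mp.cpu_count()) as pool:
    for counter, current_score in enumerate(pool.imap(scoring_function, gener, chunksize=10)):
        idx = start_idx + counter * scoring_step
        score[idx - offset - scoring_step // 2:idx - offset + (scoring_step + 1) // 2] = current_score
return score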
@@ -194,16 +232,28 @@ def left_right_diff(left_eigenvectors: np.ndarray):
return np.mean(left_eigenvectors[:, :n]-left_eigenvectors[:, n:], axis=1)


def left_entropy(hankel: np.ndarray, rank: int, random_rank: int, method: str, threads: int) -> float:
"""
Entropy Singular Spectrum Transformation.
:param hankel: the hankel matrix of the signal
:param rank: the number of (approximated) eigenvectors as subspace of H1
:param random_rank: the sampling parameter for the randomized svd
:param method: which rsvd method to use
:param threads: the number of threads numba is allowed to use
:return: the change point score
"""
# compute the left and right eigenvectors using the randomized svd for fast computation
threads_before = nb.get_num_threads()
nb.set_num_threads(threads)
if method == 'fbrsvd':
right_eigenvectors, eigenvalues, left_eigenvectors = fbpca.pca(hankel, rank, True)
elif method == 'rsvd':
right_eigenvectors, eigenvalues, left_eigenvectors = lg.randomized_hankel_svd(hankel, rank,
oversampling_p=random_rank - rank)
else:
raise NotImplementedError(f'Method {method} is not available.')
nb.set_num_threads(threads_before)

# shift the left eigenvectors up
left_eigenvectors = left_eigenvectors - np.min(left_eigenvectors, axis=1)[:, None] + 1
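lg.randomized_hankel_svd is also not shown in this diff. The oversampling_p=random_rank - rank argument and the random_rank = rank + 10 default above match the standard randomized SVD of Halko et al. (presumably reference [3]), which touches the input matrix only through products such as A @ Omega, which is exactly where the fast Hankel product pays off. A minimal dense sketch under those assumptions, with randomized_svd_sketch as a hypothetical stand-in for the library function:

import numpy as np

def randomized_svd_sketch(a: np.ndarray, rank: int, random_rank: int, seed: int = 0):
    """Truncated SVD via random range finding; random_rank = rank + oversampling."""
    rng = np.random.default_rng(seed)
    omega = rng.standard_normal((a.shape[1], random_rank))  # random test matrix
    q, _ = np.linalg.qr(a @ omega)                          # orthonormal basis of the sampled range
    u_small, s, vt = np.linalg.svd(q.T @ a, full_matrices=False)
    return (q @ u_small)[:, :rank], s[:rank], vt[:rank, :]  # truncate to the requested rank

# with a fast Hankel representation, a @ omega and q.T @ a become FFT-based products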
@@ -235,15 +285,15 @@ def _main():
# make synthetic step function
np.random.seed(123)
# synthetic (frequency change)
x0 = np.sin(2 * np.pi * 1 * np.linspace(0, 10, 3000))
x1 = np.sin(2 * np.pi * 2 * np.linspace(0, 10, 3000))
x2 = np.sin(2 * np.pi * 8 * np.linspace(0, 10, 3000))
x3 = np.sin(2 * np.pi * 4 * np.linspace(0, 10, 3000))
x = np.hstack([x0, x1, x2, x3])
x += np.random.rand(x.size)

# create the method
esst_recognizer = ESST(960, method='rsvd', parallel=False, use_fast_hankel=False)

# compute the score
start = time()
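For completeness, a hedged usage sketch of the options this commit introduces (parameter values are illustrative, not recommendations): method='rsvd' combined with use_fast_hankel=True routes the randomized SVD through the FFT-based Hankel representation, while the fbpca-backed 'fbrsvd' only works on explicitly compiled Hankel matrices, as the ValueError in __init__ enforces.

from changepoynt.algorithms.esst import ESST

# fast variant: randomized SVD on the FFT-based Hankel representation, two numba threads
fast_esst = ESST(window_length=960, method='rsvd', use_fast_hankel=True, threads=2)
# baseline variant: fbpca randomized SVD on explicitly compiled Hankel matrices
baseline_esst = ESST(window_length=960, method='fbrsvd', use_fast_hankel=False)
# scores = fast_esst.transform(x)  # x: a 1D numpy array such as the synthetic signal in _main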
