diff --git a/simba/mixins/feature_extraction_circular_mixin.py b/simba/mixins/feature_extraction_circular_mixin.py index db1ade7f9..13224bee6 100644 --- a/simba/mixins/feature_extraction_circular_mixin.py +++ b/simba/mixins/feature_extraction_circular_mixin.py @@ -1,7 +1,9 @@ +from typing import List + import numpy as np -from scipy import stats from numba import jit, prange, typed -from typing import List +from scipy import stats + class FeatureExtractionCircularMixin(object): @@ -24,10 +26,9 @@ def __init__(self): pass @staticmethod - def rolling_mean_dispersion(data: np.ndarray, - time_windows: np.ndarray, - fps: int) -> np.ndarray: - + def rolling_mean_dispersion( + data: np.ndarray, time_windows: np.ndarray, fps: int + ) -> np.ndarray: """ Compute the angular mean dispersion (circular mean) in degrees within rolling temporal windows. @@ -46,12 +47,14 @@ def rolling_mean_dispersion(data: np.ndarray, >>> [ [-1],[-1],[-1],[-1], [-1],[44],[44],[43],[44],[44],[44],[44],[44],[44],[44],[45],[45],[45],[45],[45]]) """ - results = np.full((data.shape[0], time_windows.shape[0]),-1) + results = np.full((data.shape[0], time_windows.shape[0]), -1) for time_window in prange(time_windows.shape[0]): jump_frms = int(time_windows[time_window] * fps) - for current_frm in prange(jump_frms, results.shape[0]+1): - data_window = np.deg2rad(data[current_frm-jump_frms:current_frm]) - results[current_frm-1][time_window] = np.rad2deg(stats.circmean(data_window)).astype(int) + for current_frm in prange(jump_frms, results.shape[0] + 1): + data_window = np.deg2rad(data[current_frm - jump_frms : current_frm]) + results[current_frm - 1][time_window] = np.rad2deg( + stats.circmean(data_window) + ).astype(int) return results @staticmethod @@ -72,20 +75,19 @@ def degrees_to_compass_cardinal(degree_angles: np.ndarray) -> List[str]: >>> FeatureExtractionCircularMixin().degrees_to_compass_cardinal(degree_angles=data) >>> ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW', 'N'] """ - results = typed.List(['str']) - DIRECTIONS = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW'] + results = typed.List(["str"]) + DIRECTIONS = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"] for i in prange(degree_angles.shape[0]): - ix = round(degree_angles[i] / (360. / len(DIRECTIONS))) + ix = round(degree_angles[i] / (360.0 / len(DIRECTIONS))) direction = DIRECTIONS[ix % len(DIRECTIONS)] results.append(direction) return results[1:] @staticmethod @jit(nopython=True) - def direction_three_bps(nose_loc: np.ndarray, - left_ear_loc: np.ndarray, - right_ear_loc: np.ndarray) -> np.ndarray: - + def direction_three_bps( + nose_loc: np.ndarray, left_ear_loc: np.ndarray, right_ear_loc: np.ndarray + ) -> np.ndarray: """ Jitted helper to compute the degree angle from three body-parts. Computes the angle in degrees left_ear <-> nose and right_ear_nose and returns the midpoint. @@ -104,15 +106,24 @@ def direction_three_bps(nose_loc: np.ndarray, results = np.full((nose_loc.shape[0]), np.nan) for i in prange(nose_loc.shape[0]): - left_ear_to_nose = np.degrees(np.arctan2(left_ear_loc[i][0] - nose_loc[i][1], left_ear_loc[i][1] - nose_loc[i][0])) - right_ear_nose = np.degrees(np.arctan2(right_ear_loc[i][0] - nose_loc[i][1], right_ear_loc[i][1] - nose_loc[i][0])) + left_ear_to_nose = np.degrees( + np.arctan2( + left_ear_loc[i][0] - nose_loc[i][1], + left_ear_loc[i][1] - nose_loc[i][0], + ) + ) + right_ear_nose = np.degrees( + np.arctan2( + right_ear_loc[i][0] - nose_loc[i][1], + right_ear_loc[i][1] - nose_loc[i][0], + ) + ) results[i] = ((left_ear_to_nose + right_ear_nose) % 360) / 2 return results @staticmethod @jit(nopython=True) - def direction_two_bps(bp_x: np.ndarray, - bp_y: np.ndarray) -> np.ndarray: + def direction_two_bps(bp_x: np.ndarray, bp_y: np.ndarray) -> np.ndarray: """ Jitted method computing degree directionality from two body-parts. E.g., ``nape`` and ``nose``, or ``swim_bladder`` and ``tail``. @@ -129,19 +140,18 @@ def direction_two_bps(bp_x: np.ndarray, results = np.full((bp_x.shape[0]), np.nan) for i in prange(bp_x.shape[0]): - angle_degrees = np.degrees(np.arctan2(bp_x[i][0] - bp_y[i][0], bp_y[i][1] - bp_x[i][1])) + angle_degrees = np.degrees( + np.arctan2(bp_x[i][0] - bp_y[i][0], bp_y[i][1] - bp_x[i][1]) + ) angle_degrees = angle_degrees + 360 if angle_degrees < 0 else angle_degrees results[i] = angle_degrees return results - - @staticmethod @jit(nopython=True) - def rolling_resultant_vector_length(data: np.ndarray, - fps: int, - time_windows: np.ndarray = np.array([1.0])) -> np.ndarray: - + def rolling_resultant_vector_length( + data: np.ndarray, fps: int, time_windows: np.ndarray = np.array([1.0]) + ) -> np.ndarray: """ Jitted helper computing the mean resultant vector within rolling time window. @@ -168,28 +178,29 @@ def rolling_resultant_vector_length(data: np.ndarray, results = np.full((data.shape[0], time_windows.shape[0]), -1.0) for time_window_cnt in prange(time_windows.shape[0]): window_size = int(time_windows[time_window_cnt] * fps) - for window_end in prange(window_size, data.shape[0]+1, 1): - window_data = data[window_end - window_size:window_end] + for window_end in prange(window_size, data.shape[0] + 1, 1): + window_data = data[window_end - window_size : window_end] w = np.ones(window_data.shape[0]) r = np.nansum(np.multiply(w, np.exp(1j * window_data))) - results[window_end-1][time_window_cnt] = np.abs(r) / np.nansum(w) + results[window_end - 1][time_window_cnt] = np.abs(r) / np.nansum(w) return results - @staticmethod @jit(nopython=True) def _helper_rayleigh_z(data: np.ndarray, window_size: int): results = np.full((data.shape[0], 2), np.nan) for i in range(data.shape[0]): r = window_size * data[i] - results[i][0] = (r ** 2) / window_size - results[i][1] = np.exp(np.sqrt(1 + 4 * window_size + 4 * (window_size**2 - r**2)) - (1 + 2 * window_size)) + results[i][0] = (r**2) / window_size + results[i][1] = np.exp( + np.sqrt(1 + 4 * window_size + 4 * (window_size**2 - r**2)) + - (1 + 2 * window_size) + ) return results - def rolling_rayleigh_z(self, - data: np.ndarray, - fps: int, - time_window: float = 1.0) -> np.array: + def rolling_rayleigh_z( + self, data: np.ndarray, fps: int, time_window: float = 1.0 + ) -> np.array: """ Compute Rayleigh Z (test of non-uniformity) of circular data within rolling time-window. @@ -202,17 +213,26 @@ def rolling_rayleigh_z(self, :returns np.ndarray: Size data.shape[0] x 2 with Rayleigh Z statistics in first column and associated p_values in second column """ - results, window_size = np.full((data.shape[0], 2), np.nan), int(time_window * fps) - resultant_vector_lengths = FeatureExtractionCircularMixin().rolling_resultant_vector_length(data=data, fps=fps, time_window=time_window) - return np.nan_to_num(self._helper_rayleigh_z(data=resultant_vector_lengths, window_size=window_size), nan=-1.0) + results, window_size = np.full((data.shape[0], 2), np.nan), int( + time_window * fps + ) + resultant_vector_lengths = ( + FeatureExtractionCircularMixin().rolling_resultant_vector_length( + data=data, fps=fps, time_window=time_window + ) + ) + return np.nan_to_num( + self._helper_rayleigh_z( + data=resultant_vector_lengths, window_size=window_size + ), + nan=-1.0, + ) @staticmethod @jit(nopython=True) - def rolling_circular_correlation(data_x: np.ndarray, - data_y: np.ndarray, - fps: int, - time_window: float = 1.0) -> np.ndarray: - + def rolling_circular_correlation( + data_x: np.ndarray, data_y: np.ndarray, fps: int, time_window: float = 1.0 + ) -> np.ndarray: """ Compute correlations between two angular distributions in rolling time-windows. @@ -231,22 +251,32 @@ def rolling_circular_correlation(data_x: np.ndarray, data_x, data_y = np.deg2rad(data_x), np.deg2rad(data_y) results = np.full((data_x.shape[0]), np.nan) window_size = int(time_window * fps) - for window_start in prange(0, data_x.shape[0]-window_size+1): - data_x_window = data_x[window_start:window_start+window_size] - data_y_window = data_y[window_start:window_start+window_size] - x_sin = np.sin(data_x_window - np.angle(np.nansum(np.multiply(1, np.exp(1j * data_x_window))))) - y_sin = np.sin(data_y_window - np.angle(np.nansum(np.multiply(1, np.exp(1j * data_y_window))))) - r = np.sum(x_sin * y_sin) / np.sqrt(np.sum(x_sin ** 2) * np.sum(y_sin ** 2)) - results[window_start+window_size] = (np.sqrt((data_x_window.shape[0] * (x_sin ** 2).mean() * (y_sin ** 2).mean()) / np.mean(x_sin ** 2 * y_sin ** 2)) * r) + for window_start in prange(0, data_x.shape[0] - window_size + 1): + data_x_window = data_x[window_start : window_start + window_size] + data_y_window = data_y[window_start : window_start + window_size] + x_sin = np.sin( + data_x_window + - np.angle(np.nansum(np.multiply(1, np.exp(1j * data_x_window)))) + ) + y_sin = np.sin( + data_y_window + - np.angle(np.nansum(np.multiply(1, np.exp(1j * data_y_window)))) + ) + r = np.sum(x_sin * y_sin) / np.sqrt(np.sum(x_sin**2) * np.sum(y_sin**2)) + results[window_start + window_size] = ( + np.sqrt( + (data_x_window.shape[0] * (x_sin**2).mean() * (y_sin**2).mean()) + / np.mean(x_sin**2 * y_sin**2) + ) + * r + ) return results - @staticmethod - def rolling_circular_stdev(data: np.ndarray, - fps: int, - time_windows: np.ndarray) -> np.ndarray: - + def rolling_circular_stdev( + data: np.ndarray, fps: int, time_windows: np.ndarray + ) -> np.ndarray: """ Compute standard deviation of angular data in rolling time windows. @@ -265,17 +295,13 @@ def rolling_circular_stdev(data: np.ndarray, results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for time_window_cnt in prange(time_windows.shape[0]): window_size = int(time_windows[time_window_cnt] * fps) - for window_end in prange(window_size, data.shape[0]+1, 1): - window_data = data[window_end - window_size:window_end] - results[window_end-1][time_window_cnt] = stats.circvar(window_data) - print(results[window_end-1][time_window_cnt]) + for window_end in prange(window_size, data.shape[0] + 1, 1): + window_data = data[window_end - window_size : window_end] + results[window_end - 1][time_window_cnt] = stats.circvar(window_data) + print(results[window_end - 1][time_window_cnt]) return np.round(results, 4) - - - - # nose_loc = np.random.randint(low=0, high=500, size=(200, 2)).astype('float32') # left_ear_loc = np.random.randint(low=0, high=500, size=(200, 2)).astype('float32') # @@ -295,22 +321,20 @@ def rolling_circular_stdev(data: np.ndarray, # FeatureExtractionCircularMixin().rolling_mean_dispersion(data=data,time_windows=np.array([0.5]), fps=10) - # data = np.array(list(range(0, 405, 45))) # results = FeatureExtractionCircularMixin().degrees_to_compass_cardinal(degree_angles=data) data = np.array(list(range(0, 405, 45))) -results = FeatureExtractionCircularMixin().degrees_to_compass_cardinal(degree_angles=data) - +results = FeatureExtractionCircularMixin().degrees_to_compass_cardinal( + degree_angles=data +) # def direction_two_bps(bp_x: np.ndarray, # bp_y: np.ndarray) -> np.ndarray: - - # right_ear_loc = np.random.randint(low=0, high=500, size=(200, 2)).astype('float32') # angle_data = FeatureExtractionCircularMixin().head_direction(nose_loc=nose_loc, left_ear_loc=left_ear_loc, right_ear_loc=right_ear_loc) # @@ -320,4 +344,4 @@ def rolling_circular_stdev(data: np.ndarray, # start = time.time() # correlation = FeatureExtractionCircularMixin().rolling_circular_correlation(data_x=angle_data.astype(np.int8), data_y=angle_data.astype(np.int8), time_window=2.0, fps=5) -# print(time.time() - start) \ No newline at end of file +# print(time.time() - start)