Skip to content

Commit

Permalink
cleaned
Browse files Browse the repository at this point in the history
  • Loading branch information
sronilsson committed Aug 5, 2023
1 parent 1b2be88 commit 8edd278
Showing 1 changed file with 96 additions and 72 deletions.
168 changes: 96 additions & 72 deletions simba/mixins/feature_extraction_circular_mixin.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from typing import List

import numpy as np
from scipy import stats
from numba import jit, prange, typed
from typing import List
from scipy import stats


class FeatureExtractionCircularMixin(object):

Expand All @@ -24,10 +26,9 @@ def __init__(self):
pass

@staticmethod
def rolling_mean_dispersion(data: np.ndarray,
time_windows: np.ndarray,
fps: int) -> np.ndarray:

def rolling_mean_dispersion(
data: np.ndarray, time_windows: np.ndarray, fps: int
) -> np.ndarray:
"""
Compute the angular mean dispersion (circular mean) in degrees within rolling temporal windows.
Expand All @@ -46,12 +47,14 @@ def rolling_mean_dispersion(data: np.ndarray,
>>> [ [-1],[-1],[-1],[-1], [-1],[44],[44],[43],[44],[44],[44],[44],[44],[44],[44],[45],[45],[45],[45],[45]])
"""

results = np.full((data.shape[0], time_windows.shape[0]),-1)
results = np.full((data.shape[0], time_windows.shape[0]), -1)
for time_window in prange(time_windows.shape[0]):
jump_frms = int(time_windows[time_window] * fps)
for current_frm in prange(jump_frms, results.shape[0]+1):
data_window = np.deg2rad(data[current_frm-jump_frms:current_frm])
results[current_frm-1][time_window] = np.rad2deg(stats.circmean(data_window)).astype(int)
for current_frm in prange(jump_frms, results.shape[0] + 1):
data_window = np.deg2rad(data[current_frm - jump_frms : current_frm])
results[current_frm - 1][time_window] = np.rad2deg(
stats.circmean(data_window)
).astype(int)
return results

@staticmethod
Expand All @@ -72,20 +75,19 @@ def degrees_to_compass_cardinal(degree_angles: np.ndarray) -> List[str]:
>>> FeatureExtractionCircularMixin().degrees_to_compass_cardinal(degree_angles=data)
>>> ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW', 'N']
"""
results = typed.List(['str'])
DIRECTIONS = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']
results = typed.List(["str"])
DIRECTIONS = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"]
for i in prange(degree_angles.shape[0]):
ix = round(degree_angles[i] / (360. / len(DIRECTIONS)))
ix = round(degree_angles[i] / (360.0 / len(DIRECTIONS)))
direction = DIRECTIONS[ix % len(DIRECTIONS)]
results.append(direction)
return results[1:]

@staticmethod
@jit(nopython=True)
def direction_three_bps(nose_loc: np.ndarray,
left_ear_loc: np.ndarray,
right_ear_loc: np.ndarray) -> np.ndarray:

def direction_three_bps(
nose_loc: np.ndarray, left_ear_loc: np.ndarray, right_ear_loc: np.ndarray
) -> np.ndarray:
"""
Jitted helper to compute the degree angle from three body-parts. Computes the angle in degrees left_ear <-> nose
and right_ear_nose and returns the midpoint.
Expand All @@ -104,15 +106,24 @@ def direction_three_bps(nose_loc: np.ndarray,

results = np.full((nose_loc.shape[0]), np.nan)
for i in prange(nose_loc.shape[0]):
left_ear_to_nose = np.degrees(np.arctan2(left_ear_loc[i][0] - nose_loc[i][1], left_ear_loc[i][1] - nose_loc[i][0]))
right_ear_nose = np.degrees(np.arctan2(right_ear_loc[i][0] - nose_loc[i][1], right_ear_loc[i][1] - nose_loc[i][0]))
left_ear_to_nose = np.degrees(
np.arctan2(
left_ear_loc[i][0] - nose_loc[i][1],
left_ear_loc[i][1] - nose_loc[i][0],
)
)
right_ear_nose = np.degrees(
np.arctan2(
right_ear_loc[i][0] - nose_loc[i][1],
right_ear_loc[i][1] - nose_loc[i][0],
)
)
results[i] = ((left_ear_to_nose + right_ear_nose) % 360) / 2
return results

@staticmethod
@jit(nopython=True)
def direction_two_bps(bp_x: np.ndarray,
bp_y: np.ndarray) -> np.ndarray:
def direction_two_bps(bp_x: np.ndarray, bp_y: np.ndarray) -> np.ndarray:
"""
Jitted method computing degree directionality from two body-parts. E.g., ``nape`` and ``nose``,
or ``swim_bladder`` and ``tail``.
Expand All @@ -129,19 +140,18 @@ def direction_two_bps(bp_x: np.ndarray,

results = np.full((bp_x.shape[0]), np.nan)
for i in prange(bp_x.shape[0]):
angle_degrees = np.degrees(np.arctan2(bp_x[i][0] - bp_y[i][0], bp_y[i][1] - bp_x[i][1]))
angle_degrees = np.degrees(
np.arctan2(bp_x[i][0] - bp_y[i][0], bp_y[i][1] - bp_x[i][1])
)
angle_degrees = angle_degrees + 360 if angle_degrees < 0 else angle_degrees
results[i] = angle_degrees
return results



@staticmethod
@jit(nopython=True)
def rolling_resultant_vector_length(data: np.ndarray,
fps: int,
time_windows: np.ndarray = np.array([1.0])) -> np.ndarray:

def rolling_resultant_vector_length(
data: np.ndarray, fps: int, time_windows: np.ndarray = np.array([1.0])
) -> np.ndarray:
"""
Jitted helper computing the mean resultant vector within rolling time window.
Expand All @@ -168,28 +178,29 @@ def rolling_resultant_vector_length(data: np.ndarray,
results = np.full((data.shape[0], time_windows.shape[0]), -1.0)
for time_window_cnt in prange(time_windows.shape[0]):
window_size = int(time_windows[time_window_cnt] * fps)
for window_end in prange(window_size, data.shape[0]+1, 1):
window_data = data[window_end - window_size:window_end]
for window_end in prange(window_size, data.shape[0] + 1, 1):
window_data = data[window_end - window_size : window_end]
w = np.ones(window_data.shape[0])
r = np.nansum(np.multiply(w, np.exp(1j * window_data)))
results[window_end-1][time_window_cnt] = np.abs(r) / np.nansum(w)
results[window_end - 1][time_window_cnt] = np.abs(r) / np.nansum(w)
return results


@staticmethod
@jit(nopython=True)
def _helper_rayleigh_z(data: np.ndarray, window_size: int):
results = np.full((data.shape[0], 2), np.nan)
for i in range(data.shape[0]):
r = window_size * data[i]
results[i][0] = (r ** 2) / window_size
results[i][1] = np.exp(np.sqrt(1 + 4 * window_size + 4 * (window_size**2 - r**2)) - (1 + 2 * window_size))
results[i][0] = (r**2) / window_size
results[i][1] = np.exp(
np.sqrt(1 + 4 * window_size + 4 * (window_size**2 - r**2))
- (1 + 2 * window_size)
)
return results

def rolling_rayleigh_z(self,
data: np.ndarray,
fps: int,
time_window: float = 1.0) -> np.array:
def rolling_rayleigh_z(
self, data: np.ndarray, fps: int, time_window: float = 1.0
) -> np.array:
"""
Compute Rayleigh Z (test of non-uniformity) of circular data within rolling time-window.
Expand All @@ -202,17 +213,26 @@ def rolling_rayleigh_z(self,
:returns np.ndarray: Size data.shape[0] x 2 with Rayleigh Z statistics in first column and associated p_values in second column
"""

results, window_size = np.full((data.shape[0], 2), np.nan), int(time_window * fps)
resultant_vector_lengths = FeatureExtractionCircularMixin().rolling_resultant_vector_length(data=data, fps=fps, time_window=time_window)
return np.nan_to_num(self._helper_rayleigh_z(data=resultant_vector_lengths, window_size=window_size), nan=-1.0)
results, window_size = np.full((data.shape[0], 2), np.nan), int(
time_window * fps
)
resultant_vector_lengths = (
FeatureExtractionCircularMixin().rolling_resultant_vector_length(
data=data, fps=fps, time_window=time_window
)
)
return np.nan_to_num(
self._helper_rayleigh_z(
data=resultant_vector_lengths, window_size=window_size
),
nan=-1.0,
)

@staticmethod
@jit(nopython=True)
def rolling_circular_correlation(data_x: np.ndarray,
data_y: np.ndarray,
fps: int,
time_window: float = 1.0) -> np.ndarray:

def rolling_circular_correlation(
data_x: np.ndarray, data_y: np.ndarray, fps: int, time_window: float = 1.0
) -> np.ndarray:
"""
Compute correlations between two angular distributions in rolling time-windows.
Expand All @@ -231,22 +251,32 @@ def rolling_circular_correlation(data_x: np.ndarray,
data_x, data_y = np.deg2rad(data_x), np.deg2rad(data_y)
results = np.full((data_x.shape[0]), np.nan)
window_size = int(time_window * fps)
for window_start in prange(0, data_x.shape[0]-window_size+1):
data_x_window = data_x[window_start:window_start+window_size]
data_y_window = data_y[window_start:window_start+window_size]
x_sin = np.sin(data_x_window - np.angle(np.nansum(np.multiply(1, np.exp(1j * data_x_window)))))
y_sin = np.sin(data_y_window - np.angle(np.nansum(np.multiply(1, np.exp(1j * data_y_window)))))
r = np.sum(x_sin * y_sin) / np.sqrt(np.sum(x_sin ** 2) * np.sum(y_sin ** 2))
results[window_start+window_size] = (np.sqrt((data_x_window.shape[0] * (x_sin ** 2).mean() * (y_sin ** 2).mean()) / np.mean(x_sin ** 2 * y_sin ** 2)) * r)
for window_start in prange(0, data_x.shape[0] - window_size + 1):
data_x_window = data_x[window_start : window_start + window_size]
data_y_window = data_y[window_start : window_start + window_size]
x_sin = np.sin(
data_x_window
- np.angle(np.nansum(np.multiply(1, np.exp(1j * data_x_window))))
)
y_sin = np.sin(
data_y_window
- np.angle(np.nansum(np.multiply(1, np.exp(1j * data_y_window))))
)
r = np.sum(x_sin * y_sin) / np.sqrt(np.sum(x_sin**2) * np.sum(y_sin**2))
results[window_start + window_size] = (
np.sqrt(
(data_x_window.shape[0] * (x_sin**2).mean() * (y_sin**2).mean())
/ np.mean(x_sin**2 * y_sin**2)
)
* r
)

return results


@staticmethod
def rolling_circular_stdev(data: np.ndarray,
fps: int,
time_windows: np.ndarray) -> np.ndarray:

def rolling_circular_stdev(
data: np.ndarray, fps: int, time_windows: np.ndarray
) -> np.ndarray:
"""
Compute standard deviation of angular data in rolling time windows.
Expand All @@ -265,17 +295,13 @@ def rolling_circular_stdev(data: np.ndarray,
results = np.full((data.shape[0], time_windows.shape[0]), 0.0)
for time_window_cnt in prange(time_windows.shape[0]):
window_size = int(time_windows[time_window_cnt] * fps)
for window_end in prange(window_size, data.shape[0]+1, 1):
window_data = data[window_end - window_size:window_end]
results[window_end-1][time_window_cnt] = stats.circvar(window_data)
print(results[window_end-1][time_window_cnt])
for window_end in prange(window_size, data.shape[0] + 1, 1):
window_data = data[window_end - window_size : window_end]
results[window_end - 1][time_window_cnt] = stats.circvar(window_data)
print(results[window_end - 1][time_window_cnt])
return np.round(results, 4)






# nose_loc = np.random.randint(low=0, high=500, size=(200, 2)).astype('float32')
# left_ear_loc = np.random.randint(low=0, high=500, size=(200, 2)).astype('float32')
#
Expand All @@ -295,22 +321,20 @@ def rolling_circular_stdev(data: np.ndarray,
# FeatureExtractionCircularMixin().rolling_mean_dispersion(data=data,time_windows=np.array([0.5]), fps=10)



# data = np.array(list(range(0, 405, 45)))
# results = FeatureExtractionCircularMixin().degrees_to_compass_cardinal(degree_angles=data)


data = np.array(list(range(0, 405, 45)))
results = FeatureExtractionCircularMixin().degrees_to_compass_cardinal(degree_angles=data)

results = FeatureExtractionCircularMixin().degrees_to_compass_cardinal(
degree_angles=data
)


# def direction_two_bps(bp_x: np.ndarray,
# bp_y: np.ndarray) -> np.ndarray:




# right_ear_loc = np.random.randint(low=0, high=500, size=(200, 2)).astype('float32')
# angle_data = FeatureExtractionCircularMixin().head_direction(nose_loc=nose_loc, left_ear_loc=left_ear_loc, right_ear_loc=right_ear_loc)
#
Expand All @@ -320,4 +344,4 @@ def rolling_circular_stdev(data: np.ndarray,

# start = time.time()
# correlation = FeatureExtractionCircularMixin().rolling_circular_correlation(data_x=angle_data.astype(np.int8), data_y=angle_data.astype(np.int8), time_window=2.0, fps=5)
# print(time.time() - start)
# print(time.time() - start)

0 comments on commit 8edd278

Please sign in to comment.