Skip to content

Commit

Permalink
cleaned
Browse files Browse the repository at this point in the history
  • Loading branch information
sronilsson committed Aug 2, 2023
1 parent 8939646 commit 4e09ee3
Showing 1 changed file with 106 additions and 86 deletions.
192 changes: 106 additions & 86 deletions simba/mixins/feature_extraction_supplement_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@


class FeatureExtractionSupplemental(FeatureExtractionMixin):

def __init__(self):
FeatureExtractionMixin.__init__(self)

@staticmethod
@jit(nopython=True)
def _helper_euclidean_distance_timeseries_change(distances: np.ndarray,
time_windows: np.ndarray,
fps: int):
def _helper_euclidean_distance_timeseries_change(
distances: np.ndarray, time_windows: np.ndarray, fps: int
):
"""
Private jitted helper called by ``simba.mixins.feature_extraction_supplemental_mixin.FeatureExtractionSupplemental.euclidean_distance_timeseries_change``
"""
Expand All @@ -24,19 +23,21 @@ def _helper_euclidean_distance_timeseries_change(distances: np.ndarray,
shifted_distances = np.copy(distances)
shifted_distances[0:frms] = np.nan
shifted_distances[frms:] = distances[:-frms]
shifted_distances[np.isnan(shifted_distances)] = distances[np.isnan(shifted_distances)]
shifted_distances[np.isnan(shifted_distances)] = distances[
np.isnan(shifted_distances)
]
results[:, window_cnt] = distances - shifted_distances

return results



def euclidean_distance_timeseries_change(self,
location_1: np.ndarray,
location_2: np.ndarray,
fps: int,
px_per_mm: float,
time_windows: np.ndarray = np.array([0.2, 0.4, 0.8, 1.6])) -> np.ndarray:
def euclidean_distance_timeseries_change(
self,
location_1: np.ndarray,
location_2: np.ndarray,
fps: int,
px_per_mm: float,
time_windows: np.ndarray = np.array([0.2, 0.4, 0.8, 1.6]),
) -> np.ndarray:
"""
Compute the difference in distance between two points in the current frame versus N.N seconds ago. E.g.,
computes if two points are traveling away from each other (positive output values) or towards each other
Expand All @@ -58,14 +59,18 @@ def euclidean_distance_timeseries_change(self,
>>> location_2 = np.random.randint(low=0, high=100, size=(2000, 2)).astype('float32')
>>> distances = self.euclidean_distance_timeseries_change(location_1=location_1, location_2=location_2, fps=10, px_per_mm=4.33, time_windows=np.array([0.2, 0.4, 0.8, 1.6]))
"""
distances = self.framewise_euclidean_distance(location_1=location_1, location_2=location_2, px_per_mm=px_per_mm)
return self._helper_euclidean_distance_timeseries_change(distances=distances, fps=fps, time_windows=time_windows).astype(int)
distances = self.framewise_euclidean_distance(
location_1=location_1, location_2=location_2, px_per_mm=px_per_mm
)
return self._helper_euclidean_distance_timeseries_change(
distances=distances, fps=fps, time_windows=time_windows
).astype(int)

@staticmethod
@jit(nopython=True)
def timeseries_independent_sample_t(data: np.ndarray,
group_size_s: int,
fps: int) -> np.ndarray:
def timeseries_independent_sample_t(
data: np.ndarray, group_size_s: int, fps: int
) -> np.ndarray:
"""
Compute independent-sample t-statistics for sequentially binned values in a time-series.
E.g., compute t-test statistics when comparing ``Feature N`` in the current 1s
Expand All @@ -88,17 +93,19 @@ def timeseries_independent_sample_t(data: np.ndarray,
window_size = int(group_size_s * fps)
data = np.split(data, list(range(window_size, data.shape[0], window_size)))
for cnt, i in enumerate(prange(1, len(data))):
start, end = int((cnt + 1) * window_size), int(((cnt + 1) * window_size) + window_size)
mean_1, mean_2 = np.mean(data[i-1]), np.mean(data[i])
stdev_1, stdev_2 = np.std(data[i-1]), np.std(data[i])
results[start:end] = (mean_1 - mean_2) / np.sqrt((stdev_1 / data[i-1].shape[0]) + (stdev_2 / data[i].shape[0]))
start, end = int((cnt + 1) * window_size), int(
((cnt + 1) * window_size) + window_size
)
mean_1, mean_2 = np.mean(data[i - 1]), np.mean(data[i])
stdev_1, stdev_2 = np.std(data[i - 1]), np.std(data[i])
results[start:end] = (mean_1 - mean_2) / np.sqrt(
(stdev_1 / data[i - 1].shape[0]) + (stdev_2 / data[i].shape[0])
)
return results


def two_sample_ks(self,
data: np.ndarray,
group_size_s: int,
fps: int) -> np.ndarray:
def two_sample_ks(
self, data: np.ndarray, group_size_s: int, fps: int
) -> np.ndarray:
"""
Compute Kolmogorov two-sample statistics for sequentially binned values in a time-series.
E.g., compute KS statistics when comparing ``Feature N`` in the current 1s time-window, versus ``Feature N`` in the previous 1s time-window.
Expand All @@ -116,14 +123,15 @@ def two_sample_ks(self,
window_size, results = int(group_size_s * fps), np.full((data.shape[0]), -1.0)
data = np.split(data, list(range(window_size, data.shape[0], window_size)))
for cnt, i in enumerate(prange(1, len(data))):
start, end = int((cnt + 1) * window_size), int(((cnt + 1) * window_size) + window_size)
results[start:end] = stats.ks_2samp(data1=data[i-1], data2=data[i]).statistic
start, end = int((cnt + 1) * window_size), int(
((cnt + 1) * window_size) + window_size
)
results[start:end] = stats.ks_2samp(
data1=data[i - 1], data2=data[i]
).statistic
return results

def shapiro_wilks(self,
data: np.ndarray,
bin_size_s: int,
fps: int) -> np.ndarray:
def shapiro_wilks(self, data: np.ndarray, bin_size_s: int, fps: int) -> np.ndarray:
"""
Compute Shapiro-Wilks normality statistics for sequentially binned values in a time-series. E.g., compute
the normality statistics of ``Feature N`` in each window of ``group_size_s`` seconds.
Expand All @@ -141,15 +149,15 @@ def shapiro_wilks(self,
window_size, results = int(bin_size_s * fps), np.full((data.shape[0]), -1.0)
data = np.split(data, list(range(window_size, data.shape[0], window_size)))
for cnt, i in enumerate(prange(1, len(data))):
start, end = int((cnt + 1) * window_size), int(((cnt + 1) * window_size) + window_size)
start, end = int((cnt + 1) * window_size), int(
((cnt + 1) * window_size) + window_size
)
results[start:end] = stats.shapiro(data[i])[0]
return results

@staticmethod
@jit(nopython=True)
def peak_ratio(data: np.ndarray,
bin_size_s: int,
fps: int):
def peak_ratio(data: np.ndarray, bin_size_s: int, fps: int):
"""
Compute the ratio of peak values relative to number of values within each seqential
time-period represented of ``bin_size_s`` seconds.
Expand All @@ -172,17 +180,22 @@ def peak_ratio(data: np.ndarray,
window_size, results = int(bin_size_s * fps), np.full((data.shape[0]), -1.0)
data = np.split(data, list(range(window_size, data.shape[0], window_size)))
for cnt, i in enumerate(prange(len(data))):
start, end = int((cnt + 1) * window_size), int(((cnt + 1) * window_size) + window_size)
results[start:end] = np.sum((data[i] > np.roll(data[i],1)) & (data[i] > np.roll(data[i],-1))) / data[i].shape[0]
start, end = int((cnt + 1) * window_size), int(
((cnt + 1) * window_size) + window_size
)
results[start:end] = (
np.sum(
(data[i] > np.roll(data[i], 1)) & (data[i] > np.roll(data[i], -1))
)
/ data[i].shape[0]
)
return results


@staticmethod
@jit(nopython=True)
def rolling_categorical_switches(data: np.ndarray,
time_windows: np.ndarray,
fps: int) -> np.ndarray:

def rolling_categorical_switches(
data: np.ndarray, time_windows: np.ndarray, fps: int
) -> np.ndarray:
"""
Compute the count of in categorical feature switches within rolling windows.
Expand All @@ -205,7 +218,7 @@ def rolling_categorical_switches(data: np.ndarray,
for time_window in prange(time_windows.shape[0]):
jump_frms = int(time_windows[time_window] * fps)
for current_frm in prange(jump_frms, data.shape[0]):
time_slice = data[current_frm-jump_frms: current_frm]
time_slice = data[current_frm - jump_frms : current_frm]
current_value, unique_cnt = time_slice[0], 0
for i in prange(1, time_slice.shape[0]):
if time_slice[i] != current_value:
Expand All @@ -216,8 +229,7 @@ def rolling_categorical_switches(data: np.ndarray,

@staticmethod
@jit(nopython=True)
def consecutive_time_series_categories_count(data: np.ndarray,
fps: int):
def consecutive_time_series_categories_count(data: np.ndarray, fps: int):
"""
Compute the count of consecutive milliseconds the feature value has remained static. For example,
compute for how long in milleseconds the animal has remained in the current cardinal direction or the
Expand All @@ -238,20 +250,18 @@ def consecutive_time_series_categories_count(data: np.ndarray,

results = np.full((data.shape[0]), 0.0)
for i in prange(1, data.shape[0]):
if data[i] == data[i-1]:
results[i] = results[i-1]+1
if data[i] == data[i - 1]:
results[i] = results[i - 1] + 1
else:
results[i] = 0

return results / fps


@staticmethod
@jit(nopython=True)
def horizontal_vs_vertical_movement(data: np.ndarray,
pixels_per_mm: float,
time_windows: np.ndarray,
fps: int) -> np.ndarray:
def horizontal_vs_vertical_movement(
data: np.ndarray, pixels_per_mm: float, time_windows: np.ndarray, fps: int
) -> np.ndarray:
"""
Compute the movement along the x-axis relative to the y-axis in rolling time bins.
Expand All @@ -267,24 +277,39 @@ def horizontal_vs_vertical_movement(data: np.ndarray,
for time_window in prange(time_windows.shape[0]):
jump_frms = int(time_windows[time_window] * fps)
for current_frm in prange(jump_frms, results.shape[0]):
x_movement = np.sum(np.abs(np.ediff1d(data[current_frm-jump_frms: current_frm, 0]))) / pixels_per_mm
y_movement = np.sum(np.abs(np.ediff1d(data[current_frm-jump_frms: current_frm, 1]))) / pixels_per_mm
x_movement = (
np.sum(
np.abs(
np.ediff1d(data[current_frm - jump_frms : current_frm, 0])
)
)
/ pixels_per_mm
)
y_movement = (
np.sum(
np.abs(
np.ediff1d(data[current_frm - jump_frms : current_frm, 1])
)
)
/ pixels_per_mm
)
results[current_frm][time_window] = x_movement - y_movement

return results

@staticmethod
@jit(nopython=True)
def border_distances(data: np.ndarray,
pixels_per_mm: float,
img_resolution: np.ndarray,
time_window: float,
fps: int):

def border_distances(
data: np.ndarray,
pixels_per_mm: float,
img_resolution: np.ndarray,
time_window: float,
fps: int,
):
"""
Compute the mean distance of key-point to the top, bottom, left, and right sides of the image in
rolling time-windows. Uses a straight line.
.. image:: _static/img/border_distance.png
:width: 700
:align: center
Expand All @@ -301,22 +326,23 @@ def border_distances(data: np.ndarray,
window_size = int(time_window * fps)
for current_frm in prange(window_size, results.shape[0]):
distances = np.full((4, window_size, 1), np.nan)
windowed_locs = data[current_frm - window_size: current_frm, :]
windowed_locs = data[current_frm - window_size : current_frm, :]
for bp_cnt, bp_loc in enumerate(windowed_locs):
distances[0, bp_cnt] = np.linalg.norm(np.array([bp_loc[0], 0]) - bp_loc)
distances[1, bp_cnt] = np.linalg.norm(np.array([bp_loc[0], img_resolution[0]]) - bp_loc)
distances[1, bp_cnt] = np.linalg.norm(
np.array([bp_loc[0], img_resolution[0]]) - bp_loc
)
distances[2, bp_cnt] = np.linalg.norm(np.array([0, bp_loc[1]]) - bp_loc)
distances[3, bp_cnt] = np.linalg.norm(np.array([0, img_resolution[1]]) - bp_loc)
distances[3, bp_cnt] = np.linalg.norm(
np.array([0, img_resolution[1]]) - bp_loc
)
for i in prange(4):
results[current_frm][i] = np.mean(distances[i]) / pixels_per_mm
return results.astype(np.int32)

@staticmethod
@jit(nopython=True)
def acceleration(data: np.ndarray,
pixels_per_mm: float,
fps: int):

def acceleration(data: np.ndarray, pixels_per_mm: float, fps: int):
"""
Compute acceleration.
Expand All @@ -339,29 +365,26 @@ def acceleration(data: np.ndarray,
for i in prange(fps, shifted_loc.shape[0]):
velocity[i] = np.linalg.norm(shifted_loc[i] - data[i]) / pixels_per_mm
for current_frm in prange(fps, velocity.shape[0], fps):
print(current_frm-fps, current_frm, current_frm, current_frm+fps)
prior_window = np.mean(velocity[current_frm-fps: current_frm])
current_window = np.mean(velocity[current_frm: current_frm+fps])
results[current_frm:current_frm+fps] = current_window - prior_window
print(current_frm - fps, current_frm, current_frm, current_frm + fps)
prior_window = np.mean(velocity[current_frm - fps : current_frm])
current_window = np.mean(velocity[current_frm : current_frm + fps])
results[current_frm : current_frm + fps] = current_window - prior_window
return results




#
#
#
#
#
# start = time.time()
# nose_loc = np.random.randint(low=0, high=500, size=(231, 2)).astype('float32')
#results = FeatureExtractionSupplemental().horizontal_vs_vertical_movement(data=nose_loc, pixels_per_mm=4.33, fps=10, time_windows=np.array([0.4]))
# results = FeatureExtractionSupplemental().horizontal_vs_vertical_movement(data=nose_loc, pixels_per_mm=4.33, fps=10, time_windows=np.array([0.4]))


#results = FeatureExtractionSupplemental().border_distances(data=nose_loc, pixels_per_mm=4.33, fps=10, time_window=0.2, img_resolution=np.array([600, 400]))

#results = FeatureExtractionSupplemental().acceleration(data=nose_loc, pixels_per_mm=4.33, fps=10)
# results = FeatureExtractionSupplemental().border_distances(data=nose_loc, pixels_per_mm=4.33, fps=10, time_window=0.2, img_resolution=np.array([600, 400]))

# results = FeatureExtractionSupplemental().acceleration(data=nose_loc, pixels_per_mm=4.33, fps=10)


# left_ear_loc = np.random.randint(low=0, high=500, size=(10000, 2)).astype('float32')
Expand All @@ -378,13 +401,10 @@ def acceleration(data: np.ndarray,
# static_count = FeatureExtractionSupplemental().consecutive_time_series_categories_count(data=rotation.values, fps=10)


# rolling_angular_dispersion = FeatureExtractionSupplemental().rolling_angular_dispersion(data=angle_data, time_windows=np.array([0.4]), fps=10)


#rolling_angular_dispersion = FeatureExtractionSupplemental().rolling_angular_dispersion(data=angle_data, time_windows=np.array([0.4]), fps=10)



#print(time.time() - start)
# print(time.time() - start)


# # data = np.random.randint(low=0, high=100, size=(223)).astype('float32')
Expand All @@ -396,4 +416,4 @@ def acceleration(data: np.ndarray,
# start = time.time()
# data = np.random.randint(low=0, high=100, size=(50000000)).astype('float32')
# results = FeatureExtractionSupplemental().peak_ratio(data=data, group_size_s=1, fps=10)
# print(time.time() - start)
# print(time.time() - start)

0 comments on commit 4e09ee3

Please sign in to comment.