100% peaks
ssolson committed Feb 20, 2024
1 parent 3c6f892 commit 7605238
Showing 1 changed file with 89 additions and 67 deletions.
156 changes: 89 additions & 67 deletions mhkit/loads/extreme/peaks.py
@@ -57,13 +57,13 @@ def _calculate_window_size(peaks: NDArray[np.float64], sampling_rate: float) ->
     float
         The window size determined by the auto-correlation function.
     """
-    nlags = int(14 * 24 / sampling_rate)
-    x = peaks - np.mean(peaks)
-    acf = signal.correlate(x, x, mode="full")
-    lag = signal.correlation_lags(len(x), len(x), mode="full")
+    n_lags = int(14 * 24 / sampling_rate)
+    deviations_from_mean = peaks - np.mean(peaks)
+    acf = signal.correlate(deviations_from_mean, deviations_from_mean, mode="full")
+    lag = signal.correlation_lags(len(peaks), len(peaks), mode="full")
     idx_zero = np.argmax(lag == 0)
-    positive_lag = lag[idx_zero : idx_zero + nlags + 1]
-    acf_positive = acf[idx_zero : idx_zero + nlags + 1] / acf[idx_zero]
+    positive_lag = lag[idx_zero : idx_zero + n_lags + 1]
+    acf_positive = acf[idx_zero : idx_zero + n_lags + 1] / acf[idx_zero]

     window_size = sampling_rate * positive_lag[acf_positive < 0.5][0]
     return window_size / sampling_rate
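Stripped of the renames, the helper above normalizes the autocorrelation of the mean-removed peak series and takes the first lag at which it falls below 0.5 as the storm decorrelation window. A self-contained sketch of that calculation on synthetic, autocorrelated Hs peaks (the AR(1) data and the 3-hour sampling rate are illustrative assumptions, not MHKiT defaults):

```python
import numpy as np
from scipy import signal

# Synthetic Hs peaks with AR(1)-style persistence so the ACF decays smoothly
rng = np.random.default_rng(42)
sampling_rate = 3.0  # hours between successive peaks (assumed)
peaks = np.empty(2000)
peaks[0] = 2.0
for i in range(1, len(peaks)):
    peaks[i] = 0.9 * peaks[i - 1] + rng.normal(0.2, 0.3)

n_lags = int(14 * 24 / sampling_rate)  # look back at most 14 days
deviations = peaks - np.mean(peaks)
acf = signal.correlate(deviations, deviations, mode="full")
lag = signal.correlation_lags(len(peaks), len(peaks), mode="full")
idx_zero = np.argmax(lag == 0)
positive_lag = lag[idx_zero : idx_zero + n_lags + 1]
acf_positive = acf[idx_zero : idx_zero + n_lags + 1] / acf[idx_zero]

# First lag at which the normalized ACF drops below 0.5
window = positive_lag[acf_positive < 0.5][0]
print(f"peaks decorrelate after ~{window} samples (~{window * sampling_rate:.0f} h)")
```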
@@ -122,7 +122,7 @@ def _peaks_over_threshold(
     return independent_storm_peaks


-def global_peaks(t: np.ndarray, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
+def global_peaks(time: np.ndarray, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
     """
     Find the global peaks of a zero-centered response time-series.
@@ -131,25 +131,25 @@
     Parameters
     ----------
-    t: np.array
+    time: np.array
         Time array.
     data: np.array
         Response time-series.

     Returns
     -------
-    t_peaks: np.array
+    time_peaks: np.array
         Time array for peaks
     peaks: np.array
         Peak values of the response time-series
     """
-    if not isinstance(t, np.ndarray):
-        raise TypeError(f"t must be of type np.ndarray. Got: {type(t)}")
+    if not isinstance(time, np.ndarray):
+        raise TypeError(f"time must be of type np.ndarray. Got: {type(time)}")
     if not isinstance(data, np.ndarray):
         raise TypeError(f"data must be of type np.ndarray. Got: {type(data)}")

     # Find zero up-crossings
-    inds = upcrossing(t, data)
+    inds = upcrossing(time, data)

     # We also include the final point in the dataset
     inds = np.append(inds, len(data) - 1)
@@ -168,38 +168,38 @@ def find_peak_index(ind1, ind2):
         dtype=int,
     )

-    return t[peak_inds], data[peak_inds]
+    return time[peak_inds], data[peak_inds]
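For context, a minimal usage sketch of the renamed function on a synthetic two-component response; the import path assumes the function is re-exported from mhkit.loads.extreme:

```python
import numpy as np
from mhkit.loads.extreme import global_peaks

# Zero-centered response: a 0.1 Hz carrier with a small 0.9 Hz ripple;
# global_peaks returns one maximum per zero up-crossing interval.
time = np.linspace(0, 100, 10_000)
response = np.sin(2 * np.pi * 0.1 * time) + 0.3 * np.sin(2 * np.pi * 0.9 * time)

time_peaks, peaks = global_peaks(time, response)
print(f"{len(peaks)} global peaks between {time_peaks[0]:.1f} s and {time_peaks[-1]:.1f} s")
```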


-def number_of_short_term_peaks(n: int, t: float, t_st: float) -> float:
+def number_of_short_term_peaks(n_peaks: int, time: float, time_st: float) -> float:
     """
     Estimate the number of peaks in a specified period.

     Parameters
     ----------
-    n : int
+    n_peaks : int
         Number of peaks in analyzed timeseries.
-    t : float
+    time : float
         Length of time of analyzed timeseries.
-    t_st: float
+    time_st: float
         Short-term period for which to estimate the number of peaks.

     Returns
     -------
     n_st : float
         Number of peaks in short term period.
     """
-    if not isinstance(n, int):
-        raise TypeError(f"n must be of type int. Got: {type(n)}")
-    if not isinstance(t, float):
-        raise TypeError(f"t must be of type float. Got: {type(t)}")
-    if not isinstance(t_st, float):
-        raise TypeError(f"t_st must be of type float. Got: {type(t_st)}")
+    if not isinstance(n_peaks, int):
+        raise TypeError(f"n_peaks must be of type int. Got: {type(n_peaks)}")
+    if not isinstance(time, float):
+        raise TypeError(f"time must be of type float. Got: {type(time)}")
+    if not isinstance(time_st, float):
+        raise TypeError(f"time_st must be of type float. Got: {type(time_st)}")

-    return n * t_st / t
+    return n_peaks * time_st / time
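The return expression is a simple linear scaling of the observed peak count, which a one-line worked example makes concrete (import path assumed as above):

```python
from mhkit.loads.extreme import number_of_short_term_peaks

# 120 peaks in a 1-hour record, extrapolated to a 3-hour short-term period:
# 120 * 10800.0 / 3600.0 = 360 peaks. Note the strict int/float type checks.
n_st = number_of_short_term_peaks(120, 3600.0, 10800.0)
print(n_st)  # 360.0
```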


-def peaks_distribution_weibull(x: NDArray[np.float_]) -> rv_continuous:
+def peaks_distribution_weibull(peaks_data: NDArray[np.float_]) -> rv_continuous:
     """
     Estimate the peaks distribution by fitting a Weibull
     distribution to the peaks of the response.
@@ -209,28 +209,33 @@ def peaks_distribution_weibull(x: NDArray[np.float_]) -> rv_continuous:
     Parameters
     ----------
-    x : NDArray[np.float_]
+    peaks_data : NDArray[np.float_]
         Global peaks.

     Returns
     -------
     peaks: scipy.stats.rv_frozen
         Probability distribution of the peaks.
     """
-    if not isinstance(x, np.ndarray):
-        raise TypeError(f"x must be of type np.ndarray. Got: {type(x)}")
+    if not isinstance(peaks_data, np.ndarray):
+        raise TypeError(
+            f"peaks_data must be of type np.ndarray. Got: {type(peaks_data)}"
+        )

     # peaks distribution
-    peaks_params = stats.exponweib.fit(x, f0=1, floc=0)
+    peaks_params = stats.exponweib.fit(peaks_data, f0=1, floc=0)
     param_names = ["a", "c", "loc", "scale"]
-    peaks_params = {k: v for k, v in zip(param_names, peaks_params)}
+    peaks_params = dict(zip(param_names, peaks_params))
     peaks = stats.exponweib(**peaks_params)
     # save the parameter info
     peaks.params = peaks_params
     return peaks
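Because `f0=1` pins exponweib's exponentiation parameter and `floc=0` pins the location, this is effectively a two-parameter Weibull fit. A hedged usage sketch on synthetic peaks (import path assumed):

```python
import numpy as np
from mhkit.loads.extreme import peaks_distribution_weibull

rng = np.random.default_rng(7)
peaks_data = 2.0 * rng.weibull(1.5, 1000)  # synthetic global peaks

peaks_dist = peaks_distribution_weibull(peaks_data)
print(peaks_dist.params)     # fitted {"a": 1, "c": ..., "loc": 0, "scale": ...}
print(peaks_dist.ppf(0.99))  # response level exceeded by 1% of peaks
```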


-def peaks_distribution_weibull_tail_fit(x: NDArray[np.float_]) -> rv_continuous:
+# pylint: disable=R0914
+def peaks_distribution_weibull_tail_fit(
+    peaks_data: NDArray[np.float_],
+) -> rv_continuous:
     """
     Estimate the peaks distribution using the Weibull tail fit
     method.
@@ -240,45 +245,49 @@ def peaks_distribution_weibull_tail_fit(x: NDArray[np.float_]) -> rv_continuous:
     Parameters
     ----------
-    x : np.array
+    peaks_data : np.array
         Global peaks.

     Returns
     -------
     peaks: scipy.stats.rv_frozen
         Probability distribution of the peaks.
     """
-    if not isinstance(x, np.ndarray):
-        raise TypeError(f"x must be of type np.ndarray. Got: {type(x)}")
+    if not isinstance(peaks_data, np.ndarray):
+        raise TypeError(
+            f"peaks_data must be of type np.ndarray. Got: {type(peaks_data)}"
+        )

     # Initial guess for Weibull parameters
-    p0 = stats.exponweib.fit(x, f0=1, floc=0)
-    p0 = np.array([p0[1], p0[3]])
+    p_0 = stats.exponweib.fit(peaks_data, f0=1, floc=0)
+    p_0 = np.array([p_0[1], p_0[3]])
     # Approximate CDF
-    x = np.sort(x)
-    npeaks = len(x)
-    F = np.zeros(npeaks)
-    for i in range(npeaks):
-        F[i] = i / (npeaks + 1.0)
+    peaks_data = np.sort(peaks_data)
+    n_peaks = len(peaks_data)
+    cdf_positions = np.zeros(n_peaks)
+    for i in range(n_peaks):
+        cdf_positions[i] = i / (n_peaks + 1.0)
     # Divide into seven sets & fit Weibull
     subset_shape_params = np.zeros(7)
     subset_scale_params = np.zeros(7)
     set_lim = np.arange(0.60, 0.90, 0.05)

-    def weibull_cdf(x, c, s):
-        return stats.exponweib(a=1, c=c, loc=0, scale=s).cdf(x)
+    def weibull_cdf(data_points, shape, scale):
+        return stats.exponweib(a=1, c=shape, loc=0, scale=scale).cdf(data_points)

     for local_set in range(7):
-        x_set = x[(F > set_lim[local_set])]
-        f_set = F[(F > set_lim[local_set])]
+        global_peaks_set = peaks_data[(cdf_positions > set_lim[local_set])]
+        cdf_positions_set = cdf_positions[(cdf_positions > set_lim[local_set])]
         # pylint: disable=W0632
-        popt, _ = optimize.curve_fit(weibull_cdf, x_set, f_set, p0=p0)
-        subset_shape_params[local_set] = popt[0]
-        subset_scale_params[local_set] = popt[1]
+        p_opt, _ = optimize.curve_fit(
+            weibull_cdf, global_peaks_set, cdf_positions_set, p0=p_0
+        )
+        subset_shape_params[local_set] = p_opt[0]
+        subset_scale_params[local_set] = p_opt[1]
     # peaks distribution
     peaks_params = [1, np.mean(subset_shape_params), 0, np.mean(subset_scale_params)]
     param_names = ["a", "c", "loc", "scale"]
-    peaks_params = {k: v for k, v in zip(param_names, peaks_params)}
+    peaks_params = dict(zip(param_names, peaks_params))
     peaks = stats.exponweib(**peaks_params)
     # save the parameter info
     peaks.params = peaks_params
@@ -287,6 +296,7 @@ def weibull_cdf(x, c, s):
     return peaks
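The tail fit refits the two-parameter Weibull to the upper tail of the empirical CDF seven times, at cutoffs from 0.60 upward in steps of 0.05 (floating-point rounding makes np.arange(0.60, 0.90, 0.05) return seven limits ending at 0.90, which is what range(7) relies on), then averages the subset shape and scale estimates. A usage sketch mirroring the previous example:

```python
import numpy as np
from mhkit.loads.extreme import peaks_distribution_weibull_tail_fit

rng = np.random.default_rng(7)
peaks_data = 2.0 * rng.weibull(1.5, 1000)  # synthetic global peaks

tail_dist = peaks_distribution_weibull_tail_fit(peaks_data)
print(tail_dist.params)      # c and scale are means over the seven tail fits
print(tail_dist.ppf(0.999))  # extreme quantile governed by the tail behaviour
```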


+# pylint: disable=R0914
 def automatic_hs_threshold(
     peaks: NDArray[np.float_],
     sampling_rate: float,
@@ -300,7 +310,8 @@ def automatic_hs_threshold(
     This method was developed by:
     > Neary, V. S., S. Ahn, B. E. Seng, M. N. Allahdadi, T. Wang, Z. Yang and R. He (2020).
-    > "Characterization of Extreme Wave Conditions for Wave Energy Converter Design and Project Risk Assessment.”
+    > "Characterization of Extreme Wave Conditions for Wave Energy Converter Design and
+    > Project Risk Assessment.”
     > J. Mar. Sci. Eng. 2020, 8(4), 289; https://doi.org/10.3390/jmse8040289.

     Please cite this paper if using this method.
@@ -385,7 +396,7 @@ def automatic_hs_threshold(


 def peaks_distribution_peaks_over_threshold(
-    x: NDArray[np.float_], threshold: Optional[float] = None
+    peaks_data: NDArray[np.float_], threshold: Optional[float] = None
 ) -> rv_continuous:
     """
     Estimate the peaks distribution using the peaks over threshold
@@ -401,7 +412,7 @@ def peaks_distribution_peaks_over_threshold(
     Parameters
     ----------
-    x : NDArray[np.float_]
+    peaks_data : NDArray[np.float_]
         Global peaks.
     threshold : Optional[float]
         Threshold value. Only peaks above this value will be used.
@@ -412,24 +423,26 @@ def peaks_distribution_peaks_over_threshold(
     peaks: rv_continuous
         Probability distribution of the peaks.
     """
-    if not isinstance(x, np.ndarray):
-        raise TypeError(f"x must be of type np.ndarray. Got: {type(x)}")
+    if not isinstance(peaks_data, np.ndarray):
+        raise TypeError(
+            f"peaks_data must be of type np.ndarray. Got: {type(peaks_data)}"
+        )
     if threshold is None:
-        threshold = np.mean(x) + 1.4 * np.std(x)
+        threshold = np.mean(peaks_data) + 1.4 * np.std(peaks_data)
     if threshold is not None and not isinstance(threshold, float):
         raise TypeError(
             f"If specified, threshold must be of type float. Got: {type(threshold)}"
         )

     # peaks over threshold
-    x = np.sort(x)
-    pot = x[x > threshold] - threshold
-    npeaks = len(x)
+    peaks_data = np.sort(peaks_data)
+    pot = peaks_data[peaks_data > threshold] - threshold
+    npeaks = len(peaks_data)
     npot = len(pot)
     # Fit a generalized Pareto
     pot_params = stats.genpareto.fit(pot, floc=0.0)
     param_names = ["c", "loc", "scale"]
-    pot_params = {k: v for k, v in zip(param_names, pot_params)}
+    pot_params = dict(zip(param_names, pot_params))
     pot = stats.genpareto(**pot_params)
     # save the parameter info
     pot.params = pot_params
@@ -443,15 +456,24 @@ def __init__(
             self.threshold = threshold
             super().__init__(*args, **kwargs)

-        def _cdf(self, x: NDArray[np.float_], *args, **kwargs) -> NDArray[np.float_]:
-            x = np.atleast_1d(x)
-            out = np.zeros_like(x)
-            out[x < self.threshold] = np.NaN
-            xt = x[x >= self.threshold]
-            if xt.size != 0:
-                pot_ccdf = 1.0 - self.pot.cdf(xt - self.threshold, *args, **kwargs)
+        # pylint: disable=arguments-differ
+        def _cdf(self, data_points, *args, **kwds) -> NDArray[np.float_]:
+            # Convert data_points to a NumPy array if it's not already
+            data_points = np.atleast_1d(data_points)
+            out = np.zeros_like(data_points)
+
+            # Below the instance's threshold the POT model is undefined: mark NaN
+            below_threshold = data_points < self.threshold
+            out[below_threshold] = np.NaN
+
+            above_threshold_indices = ~below_threshold
+            if np.any(above_threshold_indices):
+                points_above_threshold = data_points[above_threshold_indices]
+                pot_ccdf = 1.0 - self.pot.cdf(
+                    points_above_threshold - self.threshold, *args, **kwds
+                )
                 prop_pot = npot / npeaks
-                out[x >= self.threshold] = 1.0 - (prop_pot * pot_ccdf)
+                out[above_threshold_indices] = 1.0 - (prop_pot * pot_ccdf)
             return out

     peaks = _Peaks(name="peaks", pot_distribution=pot, threshold=threshold)
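Beyond this hunk the function returns the `_Peaks` instance, so per the docstring the caller gets an `rv_continuous` whose CDF is NaN below the threshold and follows the scaled generalized-Pareto exceedance law above it. A sketch, assuming the documented return value and the default mean-plus-1.4-standard-deviations threshold:

```python
import numpy as np
from mhkit.loads.extreme import peaks_distribution_peaks_over_threshold

rng = np.random.default_rng(7)
peaks_data = 2.0 * rng.weibull(1.5, 1000)  # synthetic global peaks

pot_dist = peaks_distribution_peaks_over_threshold(peaks_data)  # default threshold
print(pot_dist.cdf(peaks_data.max()))  # defined above the threshold
print(pot_dist.cdf(0.1))               # NaN: below the threshold, the POT fit is silent
```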
