Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: extract trend from signal #8

Draft
wants to merge 24 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions examples/filter/plot_hilbert_huang_transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""
=====================================================================================
Trend extraction using the Hilbert-Huang Transform
=====================================================================================
Example of trend extraction from non-linear, non-stationary signals using Empirical Mode Decomposition (EMD) and the
Hilbert-Huang Transform. We generate a synthetic signal composed of:
* Three oscillatory signals of different but significant amplitudes
* Two polynomial functions or trends
* Data drift
To make the case more realistic, from an industrial perspective, the timestamps are modified to make them nonuniform
and 35% of the data points are randomly removed. Finally, Gaussian noise with a signal-to-noise ratio of 10 db is
added to it.
The figure below shows each of the components of the synthetic signal (except for the Gaussian noise), the resulting
synthetic signal and the trend obtained by means of Empirical Mode Decomposition and the Hilbert-Huang method
implemented. It can be seen that the trend reflects the general signal behaviour.
"""

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from indsl.signals import insert_data_gaps, line, perturb_timestamp, sine_wave, white_noise
from matplotlib.dates import DateFormatter
from indsl.filter import hilbert_huang_transform

start_date = pd.Timestamp("2023-07-14")
end_date = pd.Timestamp("2023-07-16")

# Wave 1: Small amplitude, long wave period
wave01 = sine_wave(
start_date=start_date,
end_date=end_date,
sample_freq=pd.Timedelta("1m"),
wave_period=pd.Timedelta("6h"),
wave_mean=0,
wave_amplitude=6.5,
wave_phase=0,
)
wave01 = np.exp(wave01)

# Wave 2: Large amplitude, short wave period
wave02 = sine_wave(
start_date=start_date,
end_date=end_date,
sample_freq=pd.Timedelta("1m"),
wave_period=pd.Timedelta("1h"),
wave_mean=0,
wave_amplitude=100,
wave_phase=0,
)

# Wave 3: Large amplitude, short wave period
wave03 = sine_wave(
start_date=start_date,
end_date=end_date,
sample_freq=pd.Timedelta("1m"),
wave_period=pd.Timedelta("3h"),
wave_mean=5,
wave_amplitude=35,
wave_phase=np.pi,
)

# Trends
trend_01 = (
line(start_date=start_date, end_date=end_date, sample_freq=pd.Timedelta("1m"), slope=0.00008, intercept=1) ** 3
)

trend_02 = (
line(start_date=start_date, end_date=end_date, sample_freq=pd.Timedelta("1m"), slope=-0.00005, intercept=5) ** 5
)

drift = line(start_date=start_date, end_date=end_date, sample_freq=pd.Timedelta("1m"), slope=0.00005, intercept=0)

signal = (
pd.Series(wave01)
+ pd.Series(wave02)
+ pd.Series(wave03)
+ pd.Series(trend_01)
+ pd.Series(trend_02)
- pd.Series(drift)
)
signal_w_noise = perturb_timestamp(white_noise(signal, snr_db=10))
signal_to_detrend = insert_data_gaps(signal_w_noise, fraction=0.35)

# Unpack the trend values from the tuple
trend_rho = hilbert_huang_transform(signal_to_detrend)

fig, ax = plt.subplots(3, 1, figsize=[9, 7])

ax[0].plot(wave01, label="Wave 1")
ax[0].plot(wave02, label="Wave 2")
ax[0].plot(wave03, label="Wave 3")
ax[0].set_title("Oscillatory components")
ax[0].set_ylabel("Amplitude")
ax[0].legend()

ax[1].plot(trend_01, label="Polynomial 1")
ax[1].plot(trend_02, label="Polynomial 2")
ax[1].set_title("Trends & Drift")
ax[1].set_ylabel("Magnitude")
ax[1].legend()

color = "tab:red"
ax2 = ax[1].twinx()
ax2.plot(-drift, color=color)
ax2.set_ylabel("Drift", color=color)
ax2.tick_params(axis="y", labelcolor=color)

ax[2].plot(signal, label="Signal without noise")
ax[2].set_title("Signal without noise")
ax[2].set_ylabel("Magnitude")
ax[2].set_xlabel("Date")

# sphinx_gallery_thumbnail_number = 2
fig2, axs = plt.subplots(figsize=[9, 7])

# original signal
axs.plot(signal_to_detrend.index, signal_to_detrend.values, label="Signal")

# Trend extracted from the signal using rho
axs.plot(signal_to_detrend.index, trend_rho, label="Trend of the signal")

axs.set_title("Trend found using the Hilbert-Huang Transform")

# Formatting x axis
axs.xaxis.set_major_formatter(DateFormatter("%b %d, %H:%M"))
plt.setp(axs.get_xticklabels(), rotation=45)
axs.legend(loc="lower right")
plt.tight_layout()
plt.show()
3 changes: 2 additions & 1 deletion indsl/filter/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# Copyright 2023 Cognite AS
from .hilbert_huang_transform import hilbert_huang_transform
from .simple_filters import status_flag_filter
from .wavelet_filter import wavelet_filter


TOOLBOX_NAME = "Filter"

__all__ = ["wavelet_filter", "status_flag_filter"]
__all__ = ["hilbert_huang_transform", "wavelet_filter", "status_flag_filter"]

__cognite__ = ["wavelet_filter", "status_flag_filter"]
Copy link
Contributor

@neringaalt neringaalt Nov 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add it here as well

205 changes: 205 additions & 0 deletions indsl/filter/hilbert_huang_transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
from typing import Optional

import numpy as np
import pandas as pd

from scipy.signal import hilbert

from indsl.ts_utils import get_timestamps


# DROP_SENTINAL is used to get the smallest (most negative)
# number that can be represented with a 32-bit signed integer.
DROP_SENTINAL = np.iinfo(np.int32).min
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you rename it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And use it directly in the code



def _compute_hilbert_huang_2d_spectrum(values_X, values_Z, x_boundaries):
"""Compute 2D Hilbert-Huang spectrum.

Calculate a 2D Hilbert-Huang power distribution in sparse format.
This utility function generates a sparse matrix that captures a two-dimensional
power distribution.
"""
from sparse import COO

# Calculates bin indices for values_X using the np.digitize() function.
# This function returns an array of indices that can be used to bin the
# input values_X into the bins defined by x_boundaries. Any values in
# values_X that are out of the range defined by x_boundaries are assigned
# the DROP_SENTINAL value.
primary_indices = np.digitize(values_X, x_boundaries) - 1
out_of_bounds = np.logical_or(values_X < x_boundaries[0], values_X >= x_boundaries[-1])
primary_indices[out_of_bounds] = DROP_SENTINAL

# Create meshgrids for time and IMF indices
time_indices, imf_indices = np.meshgrid(np.arange(values_X.shape[0]), np.arange(values_X.shape[1]), indexing="ij")

# Flatten arrays for COO matrix creation
flattened_coordinates = np.vstack([primary_indices.ravel(), time_indices.ravel(), imf_indices.ravel()])
exclusions = np.any(flattened_coordinates == DROP_SENTINAL, axis=0)

# Apply the exclusions mask to values_Z
values_Z_masked = np.ma.masked_array(values_Z, mask=exclusions.reshape(values_Z.shape)).compressed()

# Adjust coordinates for valid points
valid_coordinates = flattened_coordinates[:, ~exclusions]

# Create the sparse COO matrix
spectrum_shape = (x_boundaries.shape[0] - 1, values_X.shape[0], values_X.shape[1])
sparse_spectrum = COO(valid_coordinates, values_Z_masked, shape=spectrum_shape)

return sparse_spectrum


def _calculate_histogram_bins_and_centers(dataset, margin=1e-8):
"""Calculate histogram bins and centers.

Determine bin edges for a histogram based on the provided dataset.
"""
# Determine the data range with margin
data_lower_bound = dataset.min() - margin
data_upper_bound = dataset.max() + margin

# Calculate the number of bins
bin_count = min(int(np.sqrt(dataset.size)), 2048) # Assuming a bin limit of 2048

# Establish bin edges
bin_edges = np.linspace(data_lower_bound, data_upper_bound, bin_count + 1)

return bin_edges


def _hilbert_spectrum(
instant_freq,
instant_amp,
use_sparse_format=True,
):
"""Compute a Hilbert-Huang Spectrum (HHT).

Calculate the Hilbert-Huang Transform (HHT) from instantaneous frequency and amplitude.
This transform depicts the energy of a time-series signal over both time and frequency domains. By default, the
output is aggregated over time and IMFs, giving only the frequency spectrum.
"""
# Preliminary checks and setups. Add a dimension if necessary.
instant_freq = instant_freq[:, np.newaxis] if instant_freq.ndim == 1 else instant_freq
instant_amp = instant_amp[:, np.newaxis] if instant_amp.ndim == 1 else instant_amp

bin_edges = _calculate_histogram_bins_and_centers(instant_freq.ravel())

# Calculate the 2D Hilbert spectrum
spectral_data = _compute_hilbert_huang_2d_spectrum(instant_freq, instant_amp, bin_edges)

# Calculate the final spectrum as a power spectrum and return the result in the desired format (sparse or dense)
final_spectra = spectral_data**2 if use_sparse_format else spectral_data.todense()
return final_spectra


def hilbert_huang_transform(
signal: pd.Series,
sift_thresh: float = 1e-8,
max_num_imfs: Optional[int] = None,
error_tolerance: float = 0.05,
return_trend: bool = True,
) -> pd.Series:
r"""Perform the Hilbert-Huang Transform (HHT) to find the trend of a signal.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can look at the description from removed function to get some idea how to write documentation.
You dont need to explain what different type of signals are...


The Hilbert-Huang Transform is a technique that combines Empirical Mode Decomposition (EMD)
and the Hilbert Transform to analyze non-stationary and non-linear time series data,
where the Hilbert transform is applied to each mode after having performed the EMD.
Non-stationary signals are signals that vary in frequency and amplitude over time,
and cannot be adequately represented by fixed parameters, whereas non-linear signals are signals
that cannot be represented by a linear function and can exhibit complex and unpredictable behavior.
Signals from different physical phenomena in the world are rarely purely linear or
stationary, so the HHT is a useful tool for analyzing real-world signals.

Given their complexity, it is often difficult to study the entire signals as a whole.
Therefore, the HHT aims to capture the time-varying nature and non-linear dynamics of such signals by
decomposing them into individual oscillatory components by the use the EMD.
These components are oscillatory modes of the full signal with well-defined instantaneous frequencies called
Intrinsic Mode Functions (IMFs). We calculate the hilbert spectrum of the IMFs to determine the significant ones
from which we extract the trend as their sum. You can read more about this in the given sources.

Args:
signal (pd.Series): Input time series signal.
sift_thresh (float, optional): The threshold used for sifting convergence. Defaults to 1e-8.
max_num_imfs (int, optional): The maximum number of oscillatory components to extract. If None, extract all IMFs. Defaults to None.
error_tolerance (float, optional): The tolerance for determining significant IMFs. Defaults to 0.05.
return_trend (bool, optional): Flag indicating whether to return the trend component of the signal. Defaults to True.

Returns:
pd.Series: The detrended signal if `return_trend` is False, otherwise, the trend component of the signal.

Raises:
ValueError: If `sift_thresh` is not a number higher than zero.
ValueError: If `max_num_imfs` is not an integer higher than zero.
ValueError: If `error_tolerance` is not higher than zero.
Any exceptions that may occur during Empirical Mode Decomposition (EMD).


References:
- Huang, Norden E., et al. "The empirical mode decomposition and the Hilbert spectrum for nonlinear and non-stationary time series analysis."
Proceedings of the Royal Society of London. Series A: Mathematical, Physical and Engineering Sciences 454.1971 (1998): 903-995.
- Yang, Zhijing & Bingham, Chris & Ling, Bingo & Gallimore, Michael & Stewart, Paul & Zhang, Yu. (2012).
"Trend extraction based on Hilbert-Huang transform." 1-5. 10.1109/CSNDSP.2012.6292713.
"""
from PyEMD import EMD

if sift_thresh <= 0:
raise ValueError("The sifting threshold must be positive")
if max_num_imfs is not None and max_num_imfs <= 0:
raise ValueError("The maximum number of IMFs must be positive")
if error_tolerance <= 0:
raise ValueError("Error tolerance must be positive")

signal_array = signal.to_numpy()

# Empirical Mode Decomposition
emd = EMD()
# If max_num_imfs is None, Python's slicing will automatically include all IMFs.
imfs = emd(signal_array)[:max_num_imfs]

# Compute the analytic signal for each IMF using the Hilbert transform and store them in a list
analytic_signals = [hilbert(imf) for imf in imfs]

# Find the total number of IMFs and the index of the last IMF
index_of_the_last_imf = imfs.shape[1] - 1
significant_imf_index_rho = len(imfs) - 1

# Compute the instantaneous frequency and amplitude of each IMF
phase = np.unwrap(np.angle(analytic_signals))
dt = np.mean(np.diff(get_timestamps(signal, "s").to_numpy()))
frequency = np.gradient(phase) / (2 * np.pi * dt)
amplitude = np.abs(analytic_signals)

# Transpose to get the right shape for the array
frequency_array = np.array(frequency).T

# average over time
avg_frequency_array = np.mean(frequency_array, axis=-1)

amplitude_array = np.array(amplitude)

# Ensure that the arrays are 1D by flattening them
# to avoid errors if the arrays have more than 2 dimensions
flat_amplitude = amplitude_array.flatten()
flat_avg_frequency = avg_frequency_array.flatten()

# compute Hilbert spectrum
hht = _hilbert_spectrum(flat_avg_frequency, flat_amplitude)

# Loop for rho calculations
for i in range(index_of_the_last_imf - 1, -1, -1):
rho = np.sum(hht[:, i] * hht[:, i + 1]) / (np.sum(hht[:, i]) * np.sum(hht[:, i + 1])) # type: ignore

if rho < error_tolerance:
break
else:
significant_imf_index_rho -= 1

trend_rho = np.sum(imfs[significant_imf_index_rho:], axis=0, dtype=np.float64)
trend_series_rho = pd.Series(trend_rho, index=signal.index)

# Return the trend component of the signal if return_trend is True, otherwise, return the detrended signal
result_rho = trend_series_rho if return_trend else signal - trend_series_rho

return result_rho
Loading
Loading