Parallelized time series k means barycenter update procedure #321

Open · wants to merge 4 commits into base: main
90 changes: 56 additions & 34 deletions tslearn/barycenters/dba.py
@@ -3,6 +3,7 @@
from scipy.interpolate import interp1d
from sklearn.exceptions import ConvergenceWarning
from sklearn.utils import check_random_state
from joblib import parallel_backend, Parallel, delayed

from ..metrics import dtw_path
from ..utils import to_time_series_dataset, ts_size
@@ -148,7 +149,8 @@ def dtw_barycenter_averaging_petitjean(X, barycenter_size=None,
for dynamic time warping, with applications to clustering. Pattern
Recognition, Elsevier, 2011, Vol. 44, Num. 3, pp. 678-693
"""
X_ = to_time_series_dataset(X)
Member: I understand that you want to avoid a copy here, but making sure that the data is in the right format is important. Maybe we should tweak the to_time_series_dataset function to add an avoid_copy_if_possible parameter, WDYT?
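For illustration, a minimal sketch of what such an avoid_copy_if_possible flag could look like. The parameter name comes from the comment above and is not part of the current tslearn API, and the real to_time_series_dataset also handles variable-length inputs, which this sketch ignores:

import numpy

def to_time_series_dataset_sketch(dataset, dtype=numpy.float64,
                                  avoid_copy_if_possible=False):
    arr = numpy.asarray(dataset)
    if avoid_copy_if_possible and arr.ndim == 3 and arr.dtype == dtype:
        # Already a (n_ts, sz, d) array of the right dtype: hand it back as-is.
        return arr
    if arr.ndim == 2:
        # Univariate input (n_ts, sz): add the trailing feature dimension.
        arr = arr[:, :, numpy.newaxis]
    return arr.astype(dtype)  # may copy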

#X_ = to_time_series_dataset(X)
X_ = X
if barycenter_size is None:
barycenter_size = X_.shape[1]
weights = _set_weights(weights, X_.shape[0])
@@ -176,7 +178,7 @@ def dtw_barycenter_averaging_petitjean(X, barycenter_size=None,
return barycenter


def _mm_assignment(X, barycenter, weights, metric_params=None):
def _mm_assignment(X, barycenter, weights, metric_params=None, n_treads=15):
Member: We use n_jobs everywhere as a parameter name, and None as its default; we should stick to the same convention here, I think.

"""Computes item assignement based on DTW alignments and return cost as a
bonus.

@@ -210,15 +212,23 @@ def _mm_assignment(X, barycenter, weights, metric_params=None):
n = X.shape[0]
cost = 0.
list_p_k = []
for i in range(n):
path, dist_i = dtw_path(barycenter, X[i], **metric_params)
cost += dist_i ** 2 * weights[i]
list_p_k.append(path)
#with parallel_backend('threading'):
res = Parallel(backend='threading',prefer="threads",require='sharedmem',n_jobs=n_treads,verbose=10)(delayed(dtw_path)(barycenter, X[i], global_constraint="sakoe_chiba",sakoe_chiba_radius=1) for i in range(n))
Member: Should we define a cdist_dtw_path function instead?

Reply: I think we should design a high-efficiency cdist_dtw_path function instead. The main bottlenecks are the _mm_assignment and _subgradient_valence_warping functions, in my view. However, since DBA is communication-intensive, its parallelization needs careful tuning; a good target is to keep all the available computing units at the desired utilization level.
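A rough sketch of what such a helper could look like; the cdist_dtw_path name and signature are only the suggestion from this thread, not an existing tslearn function:

import numpy
from joblib import Parallel, delayed
from tslearn.metrics import dtw_path

def cdist_dtw_path(dataset, barycenter, n_jobs=None, verbose=0, **metric_params):
    # Compute, in parallel, the DTW path and distance of every series in
    # `dataset` against a single reference series (here, the barycenter).
    res = Parallel(n_jobs=n_jobs, prefer="threads", verbose=verbose)(
        delayed(dtw_path)(barycenter, s, **metric_params) for s in dataset
    )
    paths, dists = zip(*res)
    return list(paths), numpy.asarray(dists)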

paths, dists = zip(*res)
paths = list(paths)
Member: Do you really need these 2 casts?

dists = list(dists)
cost = numpy.sum(numpy.multiply(numpy.power(dists, 2),weights))
cost /= weights.sum()
list_p_k = paths
#for i in range(n):
# path, dist_i = dtw_path(barycenter, X[i], **metric_params)
# cost += dist_i ** 2 * weights[i]
# list_p_k.append(path)
#cost /= weights.sum()
return list_p_k, cost
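Pulling the suggestions above together (n_jobs naming with a None default, no extra casts, constraints passed through metric_params rather than hard-coded), _mm_assignment might end up looking roughly like this. This is a sketch built on the hypothetical cdist_dtw_path from the earlier comment, not the merged code:

import numpy

def _mm_assignment_sketch(X, barycenter, weights, metric_params=None, n_jobs=None):
    # n_jobs follows the scikit-learn convention: None means one job unless a
    # joblib.parallel_backend context is active, -1 means all processors.
    if metric_params is None:
        metric_params = {}
    list_p_k, dists = cdist_dtw_path(X, barycenter, n_jobs=n_jobs,
                                     **metric_params)
    cost = float(numpy.sum(dists ** 2 * weights) / weights.sum())
    return list_p_k, cost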


def _subgradient_valence_warping(list_p_k, barycenter_size, weights):
def _subgradient_valence_warping(list_p_k, barycenter_size, weights, n_treads):
Member: We use n_jobs everywhere as a parameter name, and None as its default; we should stick to the same convention here, I think.

"""Compute Valence and Warping matrices from paths.

Valence matrices are denoted :math:`V^{(k)}` and Warping matrices are
@@ -253,19 +263,22 @@ def _subgradient_valence_warping(list_p_k, barycenter_size, weights):
for Averaging in Dynamic Time Warping Spaces.
Pattern Recognition, 74, 340-358.
"""
list_v_k = []
list_w_k = []
for k, p_k in enumerate(list_p_k):
sz_k = p_k[-1][1] + 1
w_k = numpy.zeros((barycenter_size, sz_k))
def create_w_k(p_k, barycenter_size):
w_k = numpy.zeros((barycenter_size, p_k[-1][1] + 1))
for i, j in p_k:
w_k[i, j] = 1.
list_w_k.append(w_k * weights[k])
list_v_k.append(w_k.sum(axis=1) * weights[k])
return w_k
w_k_ones = Parallel(backend='threading',prefer='threads',require='sharedmem', n_jobs=n_treads, verbose = True)(delayed(create_w_k)(p_k,barycenter_size) for p_k in list_p_k)
Member: Is this part really crucial to parallelize? I would expect it to be negligible in terms of running times.

Reply: This function cannot fully utilize the CPU cores; even if we force DBA to be parallelized over the different clusters, it then becomes the bottleneck.

def mult_w_k_weights(w_k, weight):
return w_k * weight
list_w_k = Parallel(backend='threading',prefer='threads',require='sharedmem', n_jobs=n_treads, verbose = True)(delayed(mult_w_k_weights)(w_k,weights[k]) for k,w_k in enumerate(w_k_ones))
def mult_w_k_sum_wieghts(w_k, weight):
return w_k.sum(axis=1) * weight
list_v_k = Parallel(backend='threading',prefer='threads',require='sharedmem', n_jobs=n_treads, verbose = True)(delayed(mult_w_k_sum_wieghts)(w_k,weight) for (w_k, weight) in zip(w_k_ones, weights))
return list_v_k, list_w_k
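On the exchange above: if this step does show up in profiles, vectorizing the construction of each w_k may pay off more than threading the Python loop. A sketch of a non-threaded version that keeps the original semantics:

import numpy

def _subgradient_valence_warping_sketch(list_p_k, barycenter_size, weights):
    list_v_k, list_w_k = [], []
    for k, p_k in enumerate(list_p_k):
        p_k = numpy.asarray(p_k)
        w_k = numpy.zeros((barycenter_size, p_k[-1, 1] + 1))
        w_k[p_k[:, 0], p_k[:, 1]] = 1.  # fancy indexing instead of an inner loop
        list_w_k.append(w_k * weights[k])
        list_v_k.append(w_k.sum(axis=1) * weights[k])
    return list_v_k, list_w_k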


def _mm_valence_warping(list_p_k, barycenter_size, weights):
def _mm_valence_warping(list_p_k, barycenter_size, weights, n_treads):
Member: We use n_jobs everywhere as a parameter name, and None as its default; we should stick to the same convention here, I think.

"""Compute Valence and Warping matrices from paths.

Valence matrices are denoted :math:`V^{(k)}` and Warping matrices are
@@ -304,13 +317,15 @@ def _mm_valence_warping(list_p_k, barycenter_size, weights):
list_p_k=list_p_k,
barycenter_size=barycenter_size,
weights=weights)
diag_sum_v_k = numpy.zeros(list_v_k[0].shape)
for v_k in list_v_k:
diag_sum_v_k += v_k
#diag_sum_v_k = numpy.zeros(list_v_k[0].shape)
#for v_k in list_v_k:
# diag_sum_v_k += v_k
diag_sum_v_k = Parallel(backend='threading',prefer='threads',require='sharedmem', n_jobs=n_treads, verbose = True)(delayed(numpy.sum)(x) for x in zip(*numpy.array(list_v_k)))
Member: Once again, I'm not sure we need to parallelize this part.

diag_sum_v_k = numpy.array(diag_sum_v_k)
return diag_sum_v_k, list_w_k
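On the doubt above: the per-column threaded sum can be replaced by a single vectorized reduction, for example (sketch, assuming numpy is imported as in the module):

import numpy
# All v_k share the shape (barycenter_size,), so one reduction suffices.
diag_sum_v_k = numpy.sum(numpy.stack(list_v_k), axis=0)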


def _mm_update_barycenter(X, diag_sum_v_k, list_w_k):
def _mm_update_barycenter(X, diag_sum_v_k, list_w_k, n_treads):
Member: We use n_jobs everywhere as a parameter name, and None as its default; we should stick to the same convention here, I think.

"""Update barycenters using the formula from Algorithm 2 in [1]_.

Parameters
@@ -336,12 +351,17 @@ def _mm_update_barycenter(X, diag_sum_v_k, list_w_k):
for Averaging in Dynamic Time Warping Spaces.
Pattern Recognition, 74, 340-358.
"""
d = X.shape[2]
barycenter_size = diag_sum_v_k.shape[0]
sum_w_x = numpy.zeros((barycenter_size, d))
for k, (w_k, x_k) in enumerate(zip(list_w_k, X)):
sum_w_x += w_k.dot(x_k[:ts_size(x_k)])
barycenter = numpy.diag(1. / diag_sum_v_k).dot(sum_w_x)
#d = X.shape[2]
#barycenter_size = diag_sum_v_k.shape[0]
#sum_w_x = numpy.zeros((barycenter_size, d))
#for k, (w_k, x_k) in enumerate(zip(list_w_k, X)):
# sum_w_x += w_k.dot(x_k[:ts_size(x_k)])
sum_w_x = Parallel(backend='threading',prefer='threads',require='sharedmem', n_jobs=n_treads, verbose = True)(delayed(numpy.dot)(w_k,x_k[:ts_size(x_k)]) for (w_k, x_k) in zip(list_w_k, X))
Member: Once again, I'm not sure we need to parallelize this part.

sum_w_x = Parallel(backend='threading',prefer='threads',require='sharedmem',n_jobs=n_treads, verbose = True)(delayed(numpy.sum)(x) for x in zip(*numpy.array(sum_w_x)))
sum_w_x = numpy.array(sum_w_x).reshape((len(sum_w_x), 1))
inverse_diag_sum_v_k = numpy.reciprocal(diag_sum_v_k)
inverse_diag_sum_v_k = numpy.diag(inverse_diag_sum_v_k)
barycenter = inverse_diag_sum_v_k.dot(sum_w_x)
return barycenter
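Likewise for the update itself: the weighted sums can stay sequential (or be vectorized) and the diagonal inverse can be folded into a broadcast division. A sketch matching the formula used by the original code:

import numpy
from tslearn.utils import ts_size

def _mm_update_barycenter_sketch(X, diag_sum_v_k, list_w_k):
    barycenter_size, d = diag_sum_v_k.shape[0], X.shape[2]
    sum_w_x = numpy.zeros((barycenter_size, d))
    for w_k, x_k in zip(list_w_k, X):
        sum_w_x += w_k.dot(x_k[:ts_size(x_k)])
    # Equivalent to numpy.diag(1. / diag_sum_v_k).dot(sum_w_x), without
    # materializing the diagonal matrix.
    return sum_w_x / diag_sum_v_k.reshape((-1, 1))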


@@ -392,7 +412,7 @@ def _subgradient_update_barycenter(X, list_diag_v_k, list_w_k, weights_sum,


def dtw_barycenter_averaging(X, barycenter_size=None, init_barycenter=None,
max_iter=30, tol=1e-5, weights=None,
max_iter=30, tol=1e-5, weights=None, n_treads=15,
Member: We use n_jobs everywhere as a parameter name, and None as its default; we should stick to the same convention here, I think.

metric_params=None,
verbose=False, n_init=1):
"""DTW Barycenter Averaging (DBA) method estimated through
@@ -514,6 +534,7 @@ def dtw_barycenter_averaging(X, barycenter_size=None, init_barycenter=None,
def dtw_barycenter_averaging_one_init(X, barycenter_size=None,
init_barycenter=None,
max_iter=30, tol=1e-5, weights=None,
n_treads=15,
Member: We use n_jobs everywhere as a parameter name, and None as its default; we should stick to the same convention here, I think.

metric_params=None,
verbose=False):
"""DTW Barycenter Averaging (DBA) method estimated through
@@ -576,23 +597,23 @@ def dtw_barycenter_averaging_one_init(X, barycenter_size=None,
for Averaging in Dynamic Time Warping Spaces.
Pattern Recognition, 74, 340-358.
"""
X_ = to_time_series_dataset(X)
Member: See comment above about the array copy issue.

#X_ = to_time_series_dataset(X)
if barycenter_size is None:
barycenter_size = X_.shape[1]
weights = _set_weights(weights, X_.shape[0])
barycenter_size = X.shape[1]
weights = _set_weights(weights, X.shape[0])
if init_barycenter is None:
barycenter = _init_avg(X_, barycenter_size)
barycenter = _init_avg(X, barycenter_size)
else:
barycenter_size = init_barycenter.shape[0]
barycenter = init_barycenter
cost_prev, cost = numpy.inf, numpy.inf
for it in range(max_iter):
list_p_k, cost = _mm_assignment(X_, barycenter, weights, metric_params)
list_p_k, cost = _mm_assignment(X, barycenter, weights, n_treads, metric_params)
diag_sum_v_k, list_w_k = _mm_valence_warping(list_p_k, barycenter_size,
weights)
weights, n_treads)
if verbose:
print("[DBA] epoch %d, cost: %.3f" % (it + 1, cost))
barycenter = _mm_update_barycenter(X_, diag_sum_v_k, list_w_k)
barycenter = _mm_update_barycenter(X, diag_sum_v_k, list_w_k, n_treads)
if abs(cost_prev - cost) < tol:
break
elif cost_prev < cost:
@@ -698,7 +719,8 @@ def dtw_barycenter_averaging_subgradient(X, barycenter_size=None,
"""
rng = check_random_state(random_state)

X_ = to_time_series_dataset(X)
#X_ = to_time_series_dataset(X)
X_ = X
if barycenter_size is None:
barycenter_size = X_.shape[1]
weights = _set_weights(weights, X_.shape[0])
57 changes: 36 additions & 21 deletions tslearn/clustering/kmeans.py
@@ -12,6 +12,7 @@
from scipy.spatial.distance import cdist
import numpy
import warnings
from joblib import parallel_backend, Parallel, delayed

from tslearn.metrics import cdist_gak, cdist_dtw, cdist_soft_dtw, sigma_gak
from tslearn.barycenters import euclidean_barycenter, \
@@ -506,7 +507,7 @@ class TimeSeriesKMeans(TransformerMixin, ClusterMixin,

verbose : int (default: 0)
If nonzero, print information about the inertia while learning
the model and joblib progress messages are printed.
the model and joblib progress messages are printed.

random_state : integer or numpy.RandomState, optional
Generator used to initialize the centers. If an integer is given, it
@@ -662,11 +663,11 @@ def _transform(self, X):
metric_params = self._get_metric_params()
if self.metric == "euclidean":
return cdist(X.reshape((X.shape[0], -1)),
self.cluster_centers_.reshape((self.n_clusters, -1)),
metric="euclidean")
self.cluster_centers_.reshape((self.n_clusters, -1)),
metric="euclidean")
elif self.metric == "dtw":
return cdist_dtw(X, self.cluster_centers_, n_jobs=self.n_jobs,
verbose=self.verbose, **metric_params)
verbose=self.verbose, **metric_params)
elif self.metric == "softdtw":
return cdist_soft_dtw(X, self.cluster_centers_, **metric_params)
else:
@@ -692,23 +693,37 @@ def _assign(self, X, update_class_attributes=True):

def _update_centroids(self, X):
metric_params = self._get_metric_params()
X_list = []
Member: Here, I guess we could use a list comprehension for better readability.

for k in range(self.n_clusters):
if self.metric == "dtw":
self.cluster_centers_[k] = dtw_barycenter_averaging(
X=X[self.labels_ == k],
barycenter_size=None,
init_barycenter=self.cluster_centers_[k],
metric_params=metric_params,
verbose=False)
elif self.metric == "softdtw":
self.cluster_centers_[k] = softdtw_barycenter(
X=X[self.labels_ == k],
max_iter=self.max_iter_barycenter,
init=self.cluster_centers_[k],
**metric_params)
else:
self.cluster_centers_[k] = euclidean_barycenter(
X=X[self.labels_ == k])
X_list.append(X[self.labels_ == k])
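As suggested in the comment above, the loop that gathers the per-cluster subsets could simply become (drop-in sketch):

X_list = [X[self.labels_ == k] for k in range(self.n_clusters)]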
cluster_centers = Parallel(backend='threading', prefer='threads', require='sharedmem', n_jobs=self.n_clusters, verbose=5)(delayed(dtw_barycenter_averaging)(
Member: Take care that here, the function to be called depends on the value of self.metric.

Reply: Simply parallelizing DBA over the clusters yields a speed-up of less than 0.5x; more effort is required.

X=X_list[k],
barycenter_size=None,
init_barycenter=self.cluster_centers_[k],
metric_params=metric_params, max_iter = 3,
verbose=True) for k in range(self.n_clusters))
for k in range(self.n_clusters):
self.cluster_centers_[k] = cluster_centers[k]
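On the metric-dispatch concern above, one option is to pick the barycenter routine once, outside the parallel loop. A sketch only: the _barycenter_fn helper name is illustrative, and the threading backend keeps shared-memory access to X:

from joblib import Parallel, delayed
from tslearn.barycenters import euclidean_barycenter, \
    dtw_barycenter_averaging, softdtw_barycenter

def _barycenter_fn(self, metric_params):
    if self.metric == "dtw":
        return lambda X_k, init: dtw_barycenter_averaging(
            X=X_k, barycenter_size=None, init_barycenter=init,
            metric_params=metric_params, verbose=False)
    elif self.metric == "softdtw":
        return lambda X_k, init: softdtw_barycenter(
            X=X_k, max_iter=self.max_iter_barycenter, init=init, **metric_params)
    return lambda X_k, init: euclidean_barycenter(X=X_k)

# inside _update_centroids:
#   fn = self._barycenter_fn(metric_params)
#   centers = Parallel(n_jobs=self.n_jobs, prefer="threads")(
#       delayed(fn)(X[self.labels_ == k], self.cluster_centers_[k])
#       for k in range(self.n_clusters))
#   for k, c in enumerate(centers):
#       self.cluster_centers_[k] = c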
# def _update_centroids(self, X):
# metric_params = self._get_metric_params()
# for k in range(self.n_clusters):
# if self.metric == "dtw":
# self.cluster_centers_[k] = (dtw_barycenter_averaging(
# X=X[self.labels_ == k],
# barycenter_size=None,
# init_barycenter=self.cluster_centers_[k],
# metric_params=metric_params, max_iter = 3,
# verbose=False))
# elif self.metric == "softdtw":
# self.cluster_centers_[k] = softdtw_barycenter(
# X=X[self.labels_ == k],
# max_iter=self.max_iter_barycenter,
# init=self.cluster_centers_[k],
# **metric_params)
# else:
# self.cluster_centers_[k] = euclidean_barycenter(
# X=X[self.labels_ == k])
# print(self.cluster_centers_[0])

def fit(self, X, y=None):
"""Compute k-means clustering.
@@ -740,7 +755,7 @@ def fit(self, X, y=None):

max_attempts = max(self.n_init, 10)

X_ = to_time_series_dataset(X)
X_ = X
rs = check_random_state(self.random_state)

if self.init == "k-means++" and self.metric == "euclidean":
2 changes: 1 addition & 1 deletion tslearn/metrics/dtw_variants.py
@@ -24,7 +24,7 @@ def _local_squared_dist(x, y):
@njit()
def njit_accumulated_matrix(s1, s2, mask):
"""Compute the accumulated cost matrix score between two time series.

Parameters
----------
s1 : array, shape = (sz1,)
Expand Down
30 changes: 11 additions & 19 deletions tslearn/metrics/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import numpy
from joblib import Parallel, delayed
from joblib import Parallel, delayed, parallel_backend
from tslearn.utils import to_time_series_dataset

__author__ = 'Romain Tavenard romain.tavenard[at]univ-rennes2.fr'
Expand All @@ -9,48 +9,39 @@ def _cdist_generic(dist_fun, dataset1, dataset2, n_jobs, verbose,
compute_diagonal=True, dtype=numpy.float, *args, **kwargs):
"""Compute cross-similarity matrix with joblib parallelization for a given
similarity function.

Member: Could you please revert all these line deletions? They tend to pack the docstrings.

Parameters
----------
dist_fun : function
Similarity function to be used

dataset1 : array-like
A dataset of time series

dataset2 : array-like (default: None)
Another dataset of time series. If `None`, self-similarity of
`dataset1` is returned.

n_jobs : int or None, optional (default=None)
The number of jobs to run in parallel.
``None`` means 1 unless in a :obj:`joblib.parallel_backend` context.
``-1`` means using all processors. See scikit-learns'
`Glossary <https://scikit-learn.org/stable/glossary.html#term-n-jobs>`__
for more details.

verbose : int, optional (default=0)
The verbosity level: if non zero, progress messages are printed.
Above 50, the output is sent to stdout.
The frequency of the messages increases with the verbosity level.
If it more than 10, all iterations are reported.
`Glossary <https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation>`__
for more details.

compute_diagonal : bool (default: True)
Whether diagonal terms should be computed or assumed to be 0 in the
self-similarity case. Used only if `dataset2` is `None`.

*args and **kwargs :
Optional additional parameters to be passed to the similarity function.


Returns
-------
cdist : numpy.ndarray
Cross-similarity matrix
""" # noqa: E501
dataset1 = to_time_series_dataset(dataset1, dtype=dtype)
#dataset1 = to_time_series_dataset(dataset1, dtype=dtype)

if dataset2 is None:
# Inspired from code by @GillesVandewiele:
@@ -60,7 +51,7 @@ def _cdist_generic(dist_fun, dataset1, dataset2, n_jobs, verbose,
k=0 if compute_diagonal else 1,
m=len(dataset1))
matrix[indices] = Parallel(n_jobs=n_jobs,
prefer="threads",
Member: Should we always prefer this? Or should it be specified when calling the function?

prefer="processes",
verbose=verbose)(
delayed(dist_fun)(
dataset1[i], dataset1[j],
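Regarding the question above about hard-coding the executor: one alternative is to surface it as a parameter that is forwarded to joblib, so each caller can decide. A sketch with an illustrative helper name:

import numpy
from joblib import Parallel, delayed

def _pairwise_generic(dist_fun, dataset1, dataset2, n_jobs=None, verbose=0,
                      prefer=None, **kwargs):
    # prefer=None lets joblib (or an enclosing parallel_backend context)
    # decide; callers can pass "threads" or "processes" per call.
    scores = Parallel(n_jobs=n_jobs, prefer=prefer, verbose=verbose)(
        delayed(dist_fun)(s1, s2, **kwargs)
        for s1 in dataset1 for s2 in dataset2
    )
    return numpy.array(scores).reshape((len(dataset1), len(dataset2)))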
@@ -74,12 +65,13 @@ def _cdist_generic(dist_fun, dataset1, dataset2, n_jobs, verbose,
matrix[indices] = matrix.T[indices]
return matrix
else:
dataset2 = to_time_series_dataset(dataset2, dtype=dtype)
matrix = Parallel(n_jobs=n_jobs, prefer="threads", verbose=verbose)(
delayed(dist_fun)(
dataset1[i], dataset2[j],
*args, **kwargs
#dataset2 = to_time_series_dataset(dataset2, dtype=dtype)
with parallel_backend('loky'):
Member: Is it a good idea to specify the parallel_backend here? Will loky always be a better alternative, or could it depend on some specificities of one's data?

matrix = Parallel(backend='loky', n_jobs=n_jobs, prefer="processes", verbose=verbose)(
delayed(dist_fun)(
dataset1[i], dataset2[j],
*args, **kwargs
)
for i in range(len(dataset1)) for j in range(len(dataset2))
)
for i in range(len(dataset1)) for j in range(len(dataset2))
)
return numpy.array(matrix).reshape((len(dataset1), -1))
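On the backend question above: whether loky (processes) beats threads depends on the data (number and length of the series) and on how much of the work releases the GIL, so one option is to leave the backend choice to the caller via a joblib context rather than pinning it inside the library. A usage sketch with the existing public cdist_dtw wrapper:

import numpy
from joblib import parallel_backend
from tslearn.metrics import cdist_dtw

X = numpy.random.rand(20, 64, 1)   # toy dataset: 20 univariate series of length 64
with parallel_backend("loky"):     # caller opts into process-based parallelism
    D = cdist_dtw(X, n_jobs=-1)
print(D.shape)                     # (20, 20)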