forked from wmvanvliet/mne_microstates
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmne_microstates.py
274 lines (234 loc) · 9.9 KB
/
mne_microstates.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
"""
Functions to segment EEG into microstates. Based on the Microsegment toolbox
for EEGlab, written by Andreas Trier Poulsen [1]_.
Author: Marijn van Vliet <[email protected]>
References
----------
.. [1] Poulsen, A. T., Pedroni, A., Langer, N., & Hansen, L. K. (2018).
Microstate EEGlab toolbox: An introductory guide. bioRxiv.
"""
import warnings
import numpy as np
from scipy.stats import zscore
from scipy.signal import find_peaks
import matplotlib as mpl
from matplotlib import pyplot as plt
import mne
from mne.utils import logger, verbose
__version__ = '0.4dev0'
@verbose
def segment(data, n_states=4, n_inits=10, max_iter=1000, thresh=1e-6,
            normalize=False, min_peak_dist=2, max_n_peaks=10000,
            return_polarity=False, random_state=None, verbose=None):
    """Segment a continuous signal into microstates.

    Peaks in the global field power (GFP) are used to find microstates, using a
    modified K-means algorithm. Several runs of the modified K-means algorithm
    are performed, using different random initializations. The run that
    resulted in the best segmentation, as measured by global explained variance
    (GEV), is used.

    Parameters
    ----------
    data : ndarray, shape (n_channels, n_samples)
        The data to find the microstates in
    n_states : int
        The number of unique microstates to find. Defaults to 4.
    n_inits : int
        The number of random initializations to use for the k-means algorithm.
        The best fitting segmentation across all initializations is used.
        Defaults to 10.
    max_iter : int
        The maximum number of iterations to perform in the k-means algorithm.
        Defaults to 1000.
    thresh : float
        The threshold of convergence for the k-means algorithm, based on
        relative change in noise variance. Defaults to 1e-6.
    normalize : bool
        Whether to normalize (z-score) the data across time before running the
        k-means algorithm. Defaults to ``False``.
    min_peak_dist : int
        Minimum distance (in samples) between peaks in the GFP. Defaults to 2.
    max_n_peaks : int | None
        Maximum number of GFP peaks to use in the k-means algorithm. Chosen
        randomly. Set to ``None`` to use all peaks. Defaults to 10000.
    return_polarity : bool
        Whether to return the polarity of the activation.
        Defaults to ``False``.
    random_state : int | numpy.random.RandomState | None
        The seed or ``RandomState`` for the random number generator. Defaults
        to ``None``, in which case a different seed is chosen each time this
        function is called.
    verbose : int | bool | None
        Controls the verbosity.

    Returns
    -------
    maps : ndarray, shape (n_states, n_channels)
        The topographic maps of the found unique microstates, one per row.
    segmentation : ndarray, shape (n_samples,)
        For each sample, the index of the microstate to which the sample has
        been assigned.
    polarity : ndarray, shape (n_samples,)
        For each sample, the polarity (+1 or -1) of the activation on the
        currently active map. Only returned when ``return_polarity`` is
        ``True``.

    References
    ----------
    .. [1] Pascual-Marqui, R. D., Michel, C. M., & Lehmann, D. (1995).
           Segmentation of brain electrical activity into microstates: model
           estimation and validation. IEEE Transactions on Biomedical
           Engineering.
    """
    logger.info('Finding %d microstates, using %d random intitializations' %
                (n_states, n_inits))

    # Build the generator once, up front, so that every k-means run below
    # draws from one shared stream. Forwarding a bare int seed would make each
    # run re-seed identically and produce the same initialization n_inits
    # times.
    if not isinstance(random_state, np.random.RandomState):
        random_state = np.random.RandomState(random_state)

    if normalize:
        data = zscore(data, axis=1)

    # Find peaks in the global field power (GFP)
    gfp = np.std(data, axis=0)
    peaks, _ = find_peaks(gfp, distance=min_peak_dist)
    n_peaks = len(peaks)

    # Limit the number of peaks by randomly selecting them
    if max_n_peaks is not None:
        max_n_peaks = min(n_peaks, max_n_peaks)
        chosen_peaks = random_state.choice(n_peaks, size=max_n_peaks,
                                           replace=False)
        peaks = peaks[chosen_peaks]

    # Values that stay the same across all runs
    peak_data = data[:, peaks]
    gfp_sum_sq = np.sum(gfp ** 2)
    sample_idx = np.arange(gfp.shape[0])

    # Do several runs of the k-means algorithm, keep track of the best
    # segmentation.
    best_gev = 0
    best_maps = None
    best_segmentation = None
    best_polarity = None
    for _ in range(n_inits):
        maps = _mod_kmeans(peak_data, n_states, n_inits, max_iter, thresh,
                           random_state, verbose)
        # Back-fit the maps found on the GFP peaks onto every sample
        activation = maps.dot(data)
        segmentation = np.argmax(np.abs(activation), axis=0)
        map_corr = _corr_vectors(data, maps[segmentation].T)

        # Compare across iterations using global explained variance (GEV) of
        # the found microstates.
        gev = np.sum((gfp * map_corr) ** 2) / gfp_sum_sq
        logger.info('GEV of found microstates: %f' % gev)
        if gev > best_gev:
            best_gev, best_maps, best_segmentation = gev, maps, segmentation
            # Pick, for each sample, the activation of its assigned map.
            # Plain indexing is used instead of np.choose, which caps the
            # number of choice arrays and hence the number of states.
            best_polarity = np.sign(activation[segmentation, sample_idx])

    if return_polarity:
        return best_maps, best_segmentation, best_polarity
    else:
        return best_maps, best_segmentation
@verbose
def _mod_kmeans(data, n_states=4, n_inits=10, max_iter=1000, thresh=1e-6,
                random_state=None, verbose=None):
    """Run the modified K-means clustering algorithm once.

    See :func:`segment` for the meaning of the parameters and return
    values.

    Parameters
    ----------
    data : ndarray, shape (n_channels, n_samples)
        The samples to cluster.
    n_states : int
        The number of maps to fit. Defaults to 4.
    n_inits : int
        Not used by this function; kept so existing positional calls keep
        working.
    max_iter : int
        The maximum number of iterations. Defaults to 1000.
    thresh : float
        Iteration stops once the decrease of the residual is smaller than
        ``thresh`` times the current residual. Defaults to 1e-6.
    random_state : int | numpy.random.RandomState | None
        The seed or ``RandomState`` used to pick the initial maps.
    verbose : int | bool | None
        Controls the verbosity.

    Returns
    -------
    maps : ndarray, shape (n_states, n_channels)
        The fitted maps, one per row, each scaled to unit norm. A state that
        ended up without any assigned sample has an all-zero map.

    Raises
    ------
    ValueError
        If there are fewer samples than requested states.
    """
    if not isinstance(random_state, np.random.RandomState):
        random_state = np.random.RandomState(random_state)

    # The maps are normalized in-place below, which fails on integer arrays.
    data = np.asarray(data)
    if not np.issubdtype(data.dtype, np.inexact):
        data = data.astype(float)

    n_channels, n_samples = data.shape
    if n_samples < n_states:
        raise ValueError('Cannot find %d microstates in only %d samples.'
                         % (n_states, n_samples))

    # Cache this value for later
    data_sum_sq = np.sum(data ** 2)

    # Select random timepoints for our initial topographic maps
    init_times = random_state.choice(n_samples, size=n_states, replace=False)
    maps = data[:, init_times].T  # fancy indexing copies, `data` stays intact
    maps /= np.linalg.norm(maps, axis=1, keepdims=True)  # Normalize the maps

    prev_residual = np.inf
    for iteration in range(max_iter):
        # Assign each sample to the best matching microstate
        activation = maps.dot(data)
        segmentation = np.argmax(np.abs(activation), axis=0)

        # Recompute the topographic maps of the microstates, based on the
        # samples that were assigned to each state.
        for state in range(n_states):
            idx = (segmentation == state)
            if not idx.any():
                warnings.warn('Some microstates are never activated')
                maps[state] = 0
                continue
            maps[state] = data[:, idx].dot(activation[state, idx])
            maps[state] /= np.linalg.norm(maps[state])

        # Estimate residual noise
        act_sum_sq = np.sum(np.sum(maps[segmentation].T * data, axis=0) ** 2)
        residual = abs(data_sum_sq - act_sum_sq)
        residual /= float(n_samples * (n_channels - 1))

        # Have we converged?
        if (prev_residual - residual) < (thresh * residual):
            logger.info('Converged at %d iterations.' % iteration)
            break

        prev_residual = residual
    else:
        # Only reached when the loop ran out of iterations without a break
        warnings.warn('Modified K-means algorithm failed to converge.')

    return maps
def _corr_vectors(A, B, axis=0):
    """Compute pairwise correlation of multiple pairs of vectors.

    Fast way to compute correlation of multiple pairs of vectors without
    computing all pairs as would with corr(A,B). Borrowed from Oli at Stack
    overflow. Note the resulting coefficients vary slightly from the ones
    obtained from corr due differences in the order of the calculations.
    (Differences are of a magnitude of 1e-9 to 1e-17 depending of the tested
    data).

    Parameters
    ----------
    A : ndarray, shape (n, m)
        The first collection of vectors
    B : ndarray, shape (n, m)
        The second collection of vectors
    axis : int
        The axis that contains the elements of each vector. Defaults to 0.

    Returns
    -------
    corr : ndarray
        For each pair of vectors, the correlation between them. With the
        default ``axis=0`` this has shape (m,).
    """
    A = np.asarray(A)
    B = np.asarray(B)
    # keepdims=True keeps the reduced axis as length 1, so the subtraction and
    # division broadcast correctly for any `axis`, not only for axis=0.
    An = A - np.mean(A, axis=axis, keepdims=True)
    Bn = B - np.mean(B, axis=axis, keepdims=True)
    An /= np.linalg.norm(An, axis=axis, keepdims=True)
    Bn /= np.linalg.norm(Bn, axis=axis, keepdims=True)
    return np.sum(An * Bn, axis=axis)
def plot_segmentation(segmentation, data, times, polarity=None):
    """Plot a microstate segmentation.

    Draws the global field power (GFP) of the data over time and fills the
    area under the curve with a color per microstate.

    Parameters
    ----------
    segmentation : list of int
        For each sample in time, the index of the state to which the sample has
        been assigned.
    data : ndarray
        The data that was segmented. Its standard deviation along the first
        axis is plotted as the GFP, so the second axis is presumably time and
        should match ``times`` (as for :func:`segment`: (n_channels,
        n_samples)).
    times : list of float
        The time-stamp for each sample.
    polarity : list of int | None
        For each sample in time, the polarity (+1 or -1) of the activation.
        When given, the GFP is multiplied by it before plotting.
    """
    # A plain list would make `segmentation == state` a single bool instead of
    # a per-sample mask.
    segmentation = np.asarray(segmentation)

    gfp = np.std(data, axis=0)
    if polarity is not None:
        gfp *= polarity

    # Use the labels that actually occur, so that a missing state index does
    # not leave the highest-numbered states uncolored.
    states = np.unique(segmentation)
    n_states = len(states)

    plt.figure(figsize=(6 * np.ptp(times), 2))
    # plt.cm.get_cmap was removed in matplotlib 3.9; plt.get_cmap accepts the
    # same (name, lut) arguments.
    cmap = plt.get_cmap('plasma', n_states)
    plt.plot(times, gfp, color='black', linewidth=1)
    for state, color in zip(states, cmap.colors):
        plt.fill_between(times, gfp, color=color,
                         where=(segmentation == state))

    norm = mpl.colors.Normalize(vmin=0, vmax=n_states)
    sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
    sm.set_array([])
    plt.colorbar(sm)
    plt.xlabel('Time (s)')
    plt.title('Segmentation into %d microstates' % n_states)
    plt.autoscale(tight=True)
    plt.tight_layout()
def plot_maps(maps, info):
    """Plot prototypical microstate maps.

    Creates one figure with a single row of topographic plots, one per map,
    each titled with the index of the map.

    Parameters
    ----------
    maps : ndarray
        The prototypical microstate maps. The first axis is iterated over, so
        every row is drawn as one map.
    info : instance of mne.io.Info
        The info structure of the dataset, containing the location of the
        sensors.
    """
    n_maps = len(maps)
    plt.figure(figsize=(2 * n_maps, 2))
    # Subplot positions are 1-based, titles show the 0-based map index
    for position, topo in enumerate(maps, start=1):
        plt.subplot(1, n_maps, position)
        mne.viz.plot_topomap(topo, info)
        plt.title('%d' % (position - 1))