-
Notifications
You must be signed in to change notification settings - Fork 1
/
Tasktime.py
216 lines (169 loc) · 8.34 KB
/
Tasktime.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# -*- coding: utf-8 -*-
"""
Estimate Relaxation from Band Powers
This example shows how to buffer, epoch, and transform EEG data from a single
electrode into values for each of the classic frequencies (e.g. alpha, beta, theta)
Furthermore, it shows how ratios of the band powers can be used to estimate
mental state for neurofeedback.
The neurofeedback protocols described here are inspired by
*Neurofeedback: A Comprehensive Review on System Design, Methodology and Clinical Applications* by Marzbani et. al
Adapted from https://github.com/NeuroTechX/bci-workshop
"""
import time as tt
from os import system as sys
from datetime import datetime, timedelta, time
import numpy as np # Module that simplifies computations on matrices
import matplotlib.pyplot as plt # Module used for plotting
from pylsl import StreamInlet, resolve_byprop # Module to receive EEG data
import utils # Our own utility functions
import win32com.client as wincl #Module used for speaking
class Band:
Delta = 0
Theta = 1
Alpha = 2
Beta = 3
""" EXPERIMENTAL PARAMETERS """
# Modify these to change aspects of the signal processing
# Length of the EEG data buffer (in seconds)
# This buffer will hold last n seconds of data and be used for calculations
BUFFER_LENGTH = 5
# Length of the epochs used to compute the FFT (in seconds)
EPOCH_LENGTH = 1
# Amount of overlap between two consecutive epochs (in seconds)
OVERLAP_LENGTH = 0.8
# Amount to 'shift' the start of each next consecutive epoch
SHIFT_LENGTH = EPOCH_LENGTH - OVERLAP_LENGTH
# Index of the channel(s) (electrodes) to be used
# 0 = left ear, 1 = left forehead, 2 = right forehead, 3 = right ear
INDEX_CHANNEL = [0]
if __name__ == "__main__":
""" 1. CONNECT TO EEG STREAM """
# Search for active LSL streams
print('Looking for an EEG stream...')
streams = resolve_byprop('type', 'EEG', timeout=2)
if len(streams) == 0:
raise RuntimeError('Can\'t find EEG stream.')
# Set active EEG stream to inlet and apply time correction
print("Start acquiring data")
inlet = StreamInlet(streams[0], max_chunklen=12)
eeg_time_correction = inlet.time_correction()
# Get the stream info and description
info = inlet.info()
description = info.desc()
# Get the sampling frequency
# This is an important value that represents how many EEG data points are
# collected in a second. This influences our frequency band calculation.
# for the Muse 2016, this should always be 256
fs = int(info.nominal_srate())
""" 2. INITIALIZE BUFFERS """
# Initialize raw EEG data buffer
eeg_buffer = np.zeros((int(fs * BUFFER_LENGTH), 1))
filter_state = None # for use with the notch filter
# Compute the number of epochs in "buffer_length"
n_win_test = int(np.floor((BUFFER_LENGTH - EPOCH_LENGTH) /
SHIFT_LENGTH + 1))
# Initialize the band power buffer (for plotting)
# bands will be ordered: [delta, theta, alpha, beta]
band_buffer = np.zeros((n_win_test, 4))
""" 3. GET DATA """
# The try/except structure allows to quit the while loop by aborting the
# script with <Ctrl-C>
print('Press Ctrl-C in the console to break the while loop.')
"""set time"""
#assinging the task time
print("""---
What hour do you have to stop in military time? (0-24)?
""")
taskh = int(input(""))
print("""What minute of the hour do you need to stop (1-59)?
""")
taskm = int(input(""))
print("---")
task = time(taskh,taskm)
try:
# The following loop acquires data, computes band powers, and calculates neurofeedback metrics based on those band powers
t = datetime.now()
while True:
""" 3.1 ACQUIRE DATA """
# Obtain EEG data from the LSL stream
eeg_data, timestamp = inlet.pull_chunk(
timeout=1, max_samples=int(SHIFT_LENGTH * fs))
# Only keep the channel we're interested in
ch_data = np.array(eeg_data)[:, INDEX_CHANNEL]
# Update EEG buffer with the new data
eeg_buffer, filter_state = utils.update_buffer(
eeg_buffer, ch_data, notch=True,
filter_state=filter_state)
""" 3.2 COMPUTE BAND POWERS """
# Get newest samples from the buffer
data_epoch = utils.get_last_data(eeg_buffer,
EPOCH_LENGTH * fs)
# Compute band powers
band_powers = utils.compute_band_powers(data_epoch, fs)
band_buffer, _ = utils.update_buffer(band_buffer,
np.asarray([band_powers]))
# Compute the average band powers for all epochs in buffer
# This helps to smooth out noise
smooth_band_powers = np.mean(band_buffer, axis=0)
# print('Delta: ', band_powers[Band.Delta], ' Theta: ', band_powers[Band.Theta],
# ' Alpha: ', band_powers[Band.Alpha], ' Beta: ', band_powers[Band.Beta])
""" 3.3 COMPUTE NEUROFEEDBACK METRICS """
# These metrics could also be used to drive brain-computer interfaces
# Alpha Protocol:
# Simple redout of alpha power, divided by delta waves in order to rule out noise
alpha_metric = smooth_band_powers[Band.Alpha] / \
smooth_band_powers[Band.Delta]
print('Alpha Relaxation: ', alpha_metric)
if alpha_metric > .5 and (datetime.now() - t).seconds > 3:
#assinging current time as a string to speak in standard 12hr format
time1 = datetime.now().strftime('%I').replace('0' , '') + ":" + datetime.now().strftime('%M %p')
speak = wincl.Dispatch("SAPI.SpVoice")
#assinging the current time in 24hr format
timeh = int(datetime.now().strftime('%H'))
timem = int(datetime.now().strftime('%M'))
#finding time differnce
diffh = taskh - timeh
diffm = taskm - timem
#adjusting for negative values
if diffm < 0:
diffm = diffm + 60
diffh = diffh - 1
if diffh < 0:
diffh = diffh + 24
#prints and says current time
print("---")
print("")
print("It is " + time1)
print("")
speak.Speak('It is ' + time1)
#printing and announcing time left
if diffh == 1 and diffm == 1:
print("You have " + str(diffh) + " hour and " + str(diffm) + " minute left")
speak.Speak("You have " + str(diffh) + " hour and " + str(diffm) + " minute left")
elif diffh == 1 and diffm > 1:
print("You have " + str(diffh) + " hour and " + str(diffm) + " minutes left")
speak.Speak("You have " + str(diffh) + " hour and " + str(diffm) + " minutes left")
elif diffh > 1 and diffm == 1:
print("You have " + str(diffh) + " hours and " + str(diffm) + " minute left")
speak.Speak("You have " + str(diffh) + " hours and " + str(diffm) + " minute left")
else:
print("You have " + str(diffh) + " hour and " + str(diffm) + " minutes left")
speak.Speak("You have " + str(diffh) + " hours and " + str(diffm) + " minutes left")
print("keep working, you got it")
speak.Speak("keep working, you got it")
break
# print(smooth_band_powers)
# Beta Protocol:
# Beta waves have been used as a measure of mental activity and concentration
# This beta over theta ratio is commonly used as neurofeedback for ADHD
# beta_metric = smooth_band_powers[Band.Beta] / \
# smooth_band_powers[Band.Theta]
# print('Beta Concentration: ', beta_metric)
# Alpha/Theta Protocol:
# This is another popular neurofeedback metric for stress reduction
# Higher theta over alpha is supposedly associated with reduced anxiety
# theta_metric = smooth_band_powers[Band.Theta] / \
# smooth_band_powers[Band.Alpha]
# print('Theta Relaxation: ', theta_metric)
except KeyboardInterrupt:
print('Closing!')