Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug/lint fixes #45

Merged
merged 10 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ name: Step-time Biofeedback Project

on:
push:
branches:
branches:
- main
- feature/*
- bug/*
pull_request:
branches:
branches:
- main
- feature/*
- bug/*
workflow_dispatch:

env:
Expand Down Expand Up @@ -45,7 +49,7 @@ jobs:
run: |
source env/bin/activate
cd ${{github.workspace}}/backend
pytest -s websocketUnitTest.py
pytest -s websocket_unit_test.py


lint:
Expand All @@ -71,7 +75,7 @@ jobs:
run: |
cd ${{github.workspace}}/backend
pylint *.py || true

- name: Lint JavaScript files
run: |
cd ${{github.workspace}}/frontend
Expand Down
11 changes: 5 additions & 6 deletions backend/Step_Time_Calculation.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,15 @@ def calculate_step_time(force_data, threshold=20.0, moving_avg_factor=2):
step_time = time - step_start_time # Calculate step duration
step_times.append(step_time)
step_start_time = None # Reset for the next step

if moving_avg_factor < 2:
return step_times

for i in range(len(step_times)):
if i < moving_avg_factor - 1:
step_time_moving_averages.append(step_times[i])
step_time_moving_averages.append(step_times[i])
else:
moving_average = np.mean(step_times[i - moving_avg_factor + 1:i+1])
step_time_moving_averages.append(moving_average)

return step_time_moving_averages
moving_average = np.mean(step_times[i - moving_avg_factor + 1:i+1])
step_time_moving_averages.append(moving_average)

return step_time_moving_averages
23 changes: 12 additions & 11 deletions backend/connect_to_qtm.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from pylsl import StreamInlet, resolve_stream
"""
Connect to QTM to allow stream of data to come in
"""

import time
from pylsl import StreamInlet, resolve_stream

def stream_data (stream_type= 'Force', sleep_time= 0.1):
"""
Expand All @@ -9,22 +13,19 @@ def stream_data (stream_type= 'Force', sleep_time= 0.1):
"""
streams = resolve_stream('type', 'Force')

if not streams:
if not streams:
print(f"No stream of type '{stream_type}' found.")
return

# create an inlet to read from the stream
return

inlet= StreamInlet(streams[0])

# pull and log samples
try:
while True:
try:
while True:
sample, timestamp = inlet.pull_sample()
print(f"Timestamp: {timestamp}, Sample: {sample}")
time.sleep(sleep_time)
except KeyboardInterrupt:
print("Streaming stopped by user.")
print("Streaming stopped by user.")

if __name__ == "__main__":
stream_data()

28 changes: 17 additions & 11 deletions backend/main.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,45 @@
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
"""
Main file responsable for connecting webscokets to backend
"""
import asyncio
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from target_zone_estimation import handle_data_streaming

app = FastAPI()
connectedClients = set()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""
Function opens websocket and test situation and connectivity with frontend
"""
print("Waiting for Connection")
connectedClients.add(websocket)

try:
await websocket.accept()
print("Connection accepted")

while True:
try:
await handle_data_streaming(websocket)
# data = await websocket.receive_text()
# print(f"Received from client: {data}")
# for client in connectedClients:
# await client.send_text(f"Message Received: {data}")
data = await websocket.receive_text()
print(f"Received from client: {data}")
for client in connectedClients:
await client.send_text(f"Message Received: {data}")
except WebSocketDisconnect:
print("Client disconnected")
break
except asyncio.TimeoutError:
print("Timeout during communication")
break
except OSError as e:
print(f"Network error during communication: {e}")
except OSError as error_message:
print(f"Network error during communication: {error_message}")
break
except Exception as e:
print(f"Error occurred during communication: {e}")
except ValueError as error_message:
print(f"Error occurred during communication: {error_message}")
break
finally:
connectedClients.remove(websocket)
await websocket.close()
print("Connection closed")
print("Connection closed")
2 changes: 2 additions & 0 deletions backend/pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[pytest]
asyncio_default_fixture_loop_scope = function
28 changes: 14 additions & 14 deletions backend/target_zone_estimation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
from Step_Time_Calculation import calculate_step_time

real_force_data = [
(0.00000, 159.548187),
(0.00093, 159.304047),
(0.01296, 10.327484),
(0.01620, 159.327484),
(0.02000, 5.106781),
(0.03000, 158.862640),
(0.04000, 4.862640),
(0.05000, 160.000000),
(0.06000, 6.000000),
(0.00000, 159.548187),
(0.00093, 159.304047),
(0.01296, 10.327484),
(0.01620, 159.327484),
(0.02000, 5.106781),
(0.03000, 158.862640),
(0.04000, 4.862640),
(0.05000, 160.000000),
(0.06000, 6.000000),
]

threshold = 20.0
Expand All @@ -23,26 +23,26 @@ async def handle_data_streaming(websocket):
for force_data in real_force_data:
try:
print(f"Force Data: {force_data}") # Debug print to inspect the input data

# Accumulate force data over time
accumulated_data.append(force_data)

# Only process once we have multiple data points
if len(accumulated_data) > 1:
step_times = calculate_step_time(accumulated_data, threshold)
print(f"Calculated step times: {step_times}")

# Estimate the target zone based on step times
target_zone = estimate_target_zone(step_times)
message = {
"step_times": step_times,
"target_zone": target_zone
}
await websocket.send_text(json.dumps(message))
await asyncio.sleep(1)
await asyncio.sleep(1)
await asyncio.sleep(1)
except Exception as e:
print(f"Error occurred during data handling: {e}")
except Exception as error:
print(f"Error occurred during data handling: {error}")

def estimate_target_zone(step_times):
"""Estimate target zones based on step times."""
Expand Down
32 changes: 15 additions & 17 deletions backend/test_step_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,25 @@
from Step_Time_Calculation import calculate_step_time
from target_zone_estimation import estimate_target_zone

THRESHOLD = 20.0
THRESHOLD = 20.0

class TestStepTimeAndTargetZone(unittest.TestCase):

def test_no_steps_below_threshold(self):
"""Test that all values below threshold yield no step times."""
force_data = [(0.0, 0), (0.1, 0), (0.2, 10), (0.3, 15)]
step_times = calculate_step_time(force_data, THRESHOLD)
self.assertEqual(step_times, [])
self.assertEqual(step_times, [])

def test_step_time_calculation(self):
"""Test that the step times are calculated correctly with given data."""
force_data = [
(0.0, 0), (0.1, 25), (0.2, 30), (0.3, 0),
(0.4, 0), (0.5, 25), (0.6, 30), (0.7, 0)
(0.0, 0), (0.1, 25), (0.2, 30), (0.3, 0),
(0.4, 0), (0.5, 25), (0.6, 30), (0.7, 0)
]
step_times = calculate_step_time(force_data, THRESHOLD)
self.assertAlmostEqual(step_times[0], 0.2, places=2)
self.assertAlmostEqual(step_times[1], 0.2, places=2)
self.assertAlmostEqual(step_times[1], 0.2, places=2)

def test_moving_average_step_times(self):
"""Test that the calculated step times match expected moving averages."""
Expand All @@ -29,8 +29,8 @@ def test_moving_average_step_times(self):
(0.4, 0), (0.5, 25), (0.6, 30), (0.7, 0)
]
step_times = calculate_step_time(force_data, THRESHOLD)
self.assertAlmostEqual(step_times[0], 0.2, places=2)
self.assertAlmostEqual(step_times[1], 0.2, places=2)
self.assertAlmostEqual(step_times[0], 0.2, places=2)
self.assertAlmostEqual(step_times[1], 0.2, places=2)

def test_estimate_target_zone_with_steps(self):
"""Test target zone estimation with valid step times."""
Expand All @@ -51,21 +51,19 @@ def test_no_movement_target_zone(self):
def test_real_data_step_time_calculation(self):
"""Test step time calculation with real data."""
real_force_data = [
(0.00000, 159.548187),
(0.00093, 159.304047),
(0.01296, 158.327484),
(0.01620, 158.327484),
(0.05370, 157.106781),
(0.05972, 156.862640)
(0.00000, 159.548187),
(0.00093, 159.304047),
(0.01296, 158.327484),
(0.01620, 158.327484),
(0.05370, 157.106781),
(0.05972, 156.862640)
]



step_times = calculate_step_time(real_force_data, THRESHOLD)


expected_step_times = [] # Assuming no steps are calculated with this data

self.assertEqual(step_times, expected_step_times)

if __name__ == '__main__':
unittest.main()
unittest.main()
32 changes: 18 additions & 14 deletions backend/websocketConnect.py → backend/websocket_connect.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,43 @@
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
"""
This modual is for testing how the websockets connect from the frontend to backend
"""

import asyncio
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

biostepFeedback = FastAPI()

@biostepFeedback.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""
Awaits for the message from the frontend to unsure connection is there
"""
print("Waiting for Connection")

try:
await websocket.accept()
print("Connection accepted")
except asyncio.TimeoutError:
print("Connection timed out")
return
except OSError as e:
print(f"Network error occurred: {e}")
except OSError as error:
print(f"Network error occurred: {error}")
return
except ValueError as error:
print(f"Value error occurred: {error}")
return
except Exception as e:
print(f"Error Occurred: {e}")
return

while True:
try:
data = await websocket.receive_text()
await websocket.send_text()
data = await websocket.receive_text()
await websocket.send_text(data)
print(f"Received and sent back: {data}")
except WebSocketDisconnect:
print("Client disconnected")
break
except asyncio.TimeoutError:
print("Timeout during communication")
break
except OSError as e:
print(f"Network error during communication: {e}")
break
except Exception as e:
print(f"Error occurred during communication: {e}")
except OSError as error:
print(f"Network error during communication: {error}")
break
Loading
Loading