Skip to content

Commit

Permalink
♻️ (tests): Functional/System - refactor in different files
Browse files Browse the repository at this point in the history
  • Loading branch information
ladislas committed Oct 29, 2024
1 parent 60ae005 commit 30c45f4
Show file tree
Hide file tree
Showing 6 changed files with 565 additions and 430 deletions.
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mbed-cli>=1.10.0,<2.0
pyserial>=3,<=3.4
pyserial>=3,<=3.5
intelhex>=2.3.0,<3.0.0
prettytable>=2.0,<3.0
imgtool>=2.0.0,<3.0.0
colorama
128 changes: 128 additions & 0 deletions tools/modules/flash_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Leka - LekaOS
# Copyright 2024 APF France handicap
# SPDX-License-Identifier: Apache-2.0

"""
Utility functions for flashing the OS binary to the device.
"""

import logging
from pathlib import Path
import subprocess
import sys
import shutil
import time

from colorama import Fore, Style


def check_external_tools():
"""
Ensure that required external tools are available.
Raises:
SystemExit: If any required tool is missing.
"""
required_tools = ["openocd", "st-flash"]
missing_tools = [tool for tool in required_tools if not shutil.which(tool)]
if missing_tools:
logging.error(f"Missing required tools: {', '.join(missing_tools)}")
sys.exit(1)


def flash_os_and_reset(os_bin_path: Path) -> bool:
"""
Flash the OS binary and reset the device.
Args:
os_bin_path (Path): Path to the OS binary.
Returns:
bool: True if both flashing and resetting succeed; False otherwise.
"""
try:
flash_os(str(os_bin_path))
return True
except SystemExit:
return False


def flash_os(os_bin_path: str):
"""
Flash the OS binary to the device using OpenOCD.
Args:
os_bin_path (str): Path to the OS binary file.
Raises:
SystemExit: If flashing fails.
"""
print(f"Flashing {os_bin_path}...")
cmd_flash = (
f"openocd -f interface/stlink.cfg "
f"-c 'transport select hla_swd' "
f"-f target/stm32f7x.cfg "
f"-c 'program {os_bin_path} 0x08000000' "
f"-c exit"
)
print(cmd_flash)
flash = subprocess.run(cmd_flash, shell=True, check=False)
if flash.returncode != 0:
print(f"Flashing {os_bin_path}... {Fore.RED}{Style.RESET_ALL}")
sys.exit(1)
print(f"Flashing {os_bin_path}... {Fore.GREEN}{Style.RESET_ALL}")

time.sleep(1)

print("Resetting robot...")
cmd_reset = (
"openocd -f interface/stlink.cfg "
"-c 'transport select hla_swd' "
"-f target/stm32f7x.cfg "
"-c init -c 'reset run' "
"-c exit"
)
print(cmd_reset)
reset = subprocess.run(cmd_reset, shell=True, check=False)
if reset.returncode != 0:
print(f"Resetting robot... {Fore.RED}{Style.RESET_ALL}")
sys.exit(1)
print(f"Resetting robot... {Fore.GREEN}{Style.RESET_ALL}")

time.sleep(1)


def erase_flash():
"""
Erase the flash memory of the device using st-flash.
Raises:
SystemExit: If erasing fails.
"""
print("Erasing flash...")
cmd_erase = "st-flash --connect-under-reset --reset erase"
ret = subprocess.run(cmd_erase, shell=True)
if ret.returncode != 0:
print(f"Erasing flash... {Fore.RED}{Style.RESET_ALL}")
sys.exit(1)
print(f"Erasing flash... {Fore.GREEN}{Style.RESET_ALL}")


def print_end_success(message: str):
"""
Print a success message in cyan with a checkmark.
Args:
message (str): The message to print.
"""
print(f"{Fore.CYAN}{message}... ✅{Style.RESET_ALL}")


def print_end_failure(message: str):
"""
Print a failure message in red with a cross mark.
Args:
message (str): The message to print.
"""
print(f"{Fore.RED}{message}... ❌{Style.RESET_ALL}")
28 changes: 28 additions & 0 deletions tools/modules/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Leka - LekaOS
# Copyright 2024 APF France handicap
# SPDX-License-Identifier: Apache-2.0


"""
Configure logging settings based on verbosity level.
"""


import logging
import sys


def configure_logging(verbose: bool):
"""
Configure logging settings based on verbosity level.
Args:
verbose (bool): If True, set logging level to DEBUG; otherwise, INFO.
"""
logging.basicConfig(
level=logging.DEBUG if verbose else logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
if verbose:
logging.debug("Verbose logging enabled.")
139 changes: 139 additions & 0 deletions tools/modules/serial_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Leka - LekaOS
# Copyright 2024 APF France handicap
# SPDX-License-Identifier: Apache-2.0


"""
Utility functions for serial communication.
"""


import glob
import sys
from time import sleep
from typing import List, Optional
import serial
from colorama import Fore, Style

SERIAL_TIMEOUT = 0.1 # seconds


def connect_serial(port_pattern: str) -> serial.Serial:
"""
Connect to the first serial port matching the given pattern.
Args:
port_pattern (str): Glob pattern to match serial ports.
Returns:
serial.Serial: An open serial connection.
Raises:
SystemExit: If unable to connect to the serial port.
"""
ports = glob.glob(port_pattern)
serial_port = ports[0] if ports else port_pattern

try:
com = serial.Serial(serial_port, 115200, timeout=SERIAL_TIMEOUT)
print(f"Connected to {com.name}... {Fore.GREEN}{Style.RESET_ALL}")
return com
except serial.serialutil.SerialException as error:
print(
f"Error connecting to {serial_port}: {error}... {Fore.RED}{Style.RESET_ALL}"
)
sys.exit(1)


def reset_buffer(com: serial.Serial):
"""
Reset the serial input and output buffers and send a break signal.
Args:
com (serial.Serial): The serial connection to reset.
"""
print("Resetting COM buffer...")
try:
com.reset_input_buffer()
com.reset_output_buffer()
com.send_break(duration=1)
sleep(1)
print(f"Resetting COM buffer... {Fore.GREEN}{Style.RESET_ALL}")
except serial.SerialException as e:
print(f"Error resetting COM buffer: {e}... {Fore.RED}{Style.RESET_ALL}")
sys.exit(1)


def read_output_serial(com: serial.Serial) -> str:
"""
Read a line from the serial connection.
Args:
com (serial.Serial): The serial connection to read from.
Returns:
str: Decoded line from serial output.
"""
try:
data = com.readline().decode("utf-8").strip()
return data
except serial.SerialException as e:
print(f"Serial read error: {e}")
return ""


def wait_for_response(com: serial.Serial, response_timeout: float) -> Optional[str]:
"""
Wait for a response from the device within the specified timeout.
Args:
com (serial.Serial): The serial connection to read from.
response_timeout (float): Timeout in seconds.
Returns:
Optional[str]: The received data if any; otherwise, None.
"""
retries = int(response_timeout / 0.1)
for _ in range(retries):
sleep(0.1)
data = read_output_serial(com)
if data:
return data
return None


def wait_for_system_to_sleep(com: serial.Serial, duration: int) -> List[str]:
"""
Wait for the system to run for a specified duration, collecting relevant serial data.
Args:
com (serial.Serial): The serial connection to read from.
duration (int): Duration in seconds to wait.
Returns:
List[str]: Collected lines containing 'watchdog'.
"""
print("Waiting for system to run...")
data = []
for second in range(duration):
if com.in_waiting > 0:
print(f"{Fore.GREEN}{Style.RESET_ALL}", end="", flush=True)
lines = com.readlines()
for line in lines:
decoded_line = line.decode("utf-8", errors="replace").rstrip()
data.append(decoded_line)
else:
print("•", end="", flush=True)

if (second + 1) % 60 == 0 and (second + 1) != duration:
print()

sleep(1)

print()
filtered_data = [line for line in data if "watchdog" in line][-10:]
print("\n".join(filtered_data))
print(
f"Waiting for system to run for {duration} seconds... {Fore.GREEN}{Style.RESET_ALL}"
)
return filtered_data
Loading

0 comments on commit 30c45f4

Please sign in to comment.