Skip to content

Commit

Permalink
feat(timeout): include support to set tests timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Diogo Costa <[email protected]>
  • Loading branch information
Diogo21Costa committed Sep 24, 2024
1 parent 4f13941 commit c73f641
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 14 deletions.
68 changes: 54 additions & 14 deletions framework/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""
import subprocess
import threading
import time
import serial
import constants as cons

Expand All @@ -18,20 +19,29 @@ def __init__(self):
'c': "[TESTF-C]",
'py': "[TESTF-PY]",
'start': "[TESTF-C] START",
'end': "[TESTF-C] END"
'end': "[TESTF-C] END",
'success': "[TESTF-C] SUCCESS",
'failure': "[TESTF-C] FAILURE",
'exit': "[TESTF-C] EXIT"
}
self.logger_commands = {
f"{self.test_tags['c']} SEND_CHAR" : self.send_char,
f"{self.test_tags['c']} SET_TIMEOUT" : self.set_timeout,
}
self.log_level = {
'full' : self.echo_log_full,
'tf' : self.echo_log_tf,
'none' : self.echo_log_none
}

self.event_thread_finished = threading.Event()
self.event_stop_listener = threading.Event()
# create a list of threading events
# self.event_thread_finished = threading.Event()
# self.event_stop_listener = threading.Event()
self.serial_port = ""
self.list_events = {
'event_thread_finished': threading.Event(),
'event_stop_listener': threading.Event(),
'event_completed_test': threading.Event()
}

def send_char(self, command):
""""
Expand All @@ -43,6 +53,38 @@ def send_char(self, command):
else:
self.serial_port.write(command[2].encode('utf-8') + b"\r\n")

def set_timeout(self, command):
""""
Create a thread to count x ms before finishing the listener thread
"""
self.list_events['event_completed_test'].clear()
command = command.split()
if len(command) < 3:
return

timeout = int(command[2])
self.list_events['event_thread_finished'].clear()

def timeout_thread(timeout):
while timeout > 0:
if self.list_events['event_completed_test'].is_set():
return
time.sleep(1/1000)
timeout -= 1

self.list_events['event_stop_listener'].set()
print(cons.RED_TEXT +
"Timeout reached" +
cons.RESET_COLOR)

threading.Thread(target=timeout_thread, args=[timeout]).start()

def unset_timeout(self):
"""
Cancel the timeout
"""
self.list_events['event_completed_test'].set()

def echo_log_full(self, serial_results):
"""
Print each line in the serial results.
Expand All @@ -55,7 +97,7 @@ def echo_log_full(self, serial_results):
"""
for line in serial_results:
print(line, end="")
self.event_thread_finished.set()
self.list_events['event_thread_finished'].set()

def echo_log_tf(self, serial_results):
"""
Expand All @@ -79,13 +121,13 @@ def echo_log_tf(self, serial_results):
if is_tf_section:
print(line, end="")

self.event_thread_finished.set()
self.list_events['event_thread_finished'].set()

def echo_log_none(self):
"""
Do not print any serial results.
"""
self.event_thread_finished.set()
self.list_events['event_thread_finished'].set()

def connect_to_platform_port(self, ports_list, echo):
"""
Expand All @@ -107,8 +149,8 @@ def connect_to_platform_port(self, ports_list, echo):
for thread in threads:
thread.start()

self.event_thread_finished.wait()
self.event_stop_listener.set()
self.list_events['event_thread_finished'].wait()
self.list_events['event_stop_listener'].set()

for thread in threads:
thread.join()
Expand All @@ -126,7 +168,7 @@ def listener(self, ser_port, echo):
('#$#', '#')
]

while not self.event_stop_listener.is_set():
while not self.list_events['event_stop_listener'].is_set():
res = b""
res_log = []

Expand All @@ -137,17 +179,15 @@ def listener(self, ser_port, echo):
new_line = new_line.replace(old, new)
res_log.append(new_line)

if self.test_tags['c'] in new_line:
cons.TEST_RESULTS = new_line

if self.test_tags['c'] in new_line:
command = new_line.split()[0] + " " + new_line.split()[1]
if command in self.logger_commands:
self.logger_commands[command](new_line)
else:
cons.TEST_RESULTS = new_line
self.unset_timeout()

if self.event_stop_listener.is_set():
if self.list_events['event_stop_listener'].is_set():
break

for line in reversed(res_log):
Expand Down
1 change: 1 addition & 0 deletions src/inc/testf_commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@
#define COMMAND_START() PRINT_FUNCTION(TESTF_TAG "START" "\n")
#define COMMAND_END() PRINT_FUNCTION(TESTF_TAG "END" "\n")
#define COMMAND_SEND_CHAR(X) PRINT_FUNCTION(TESTF_TAG "SEND_CHAR " X "\n")
#define COMMAND_SEND_TIMEOUT(X) PRINT_FUNCTION(TESTF_TAG "SET_TIMEOUT " X "\n")

#endif // TESTF_COMMANDS_H

0 comments on commit c73f641

Please sign in to comment.