diff --git a/framework/connection.py b/framework/connection.py index c73fac0..618c249 100644 --- a/framework/connection.py +++ b/framework/connection.py @@ -5,6 +5,7 @@ """ import subprocess import threading +import time import serial import constants as cons @@ -18,20 +19,29 @@ def __init__(self): 'c': "[TESTF-C]", 'py': "[TESTF-PY]", 'start': "[TESTF-C] START", - 'end': "[TESTF-C] END" + 'end': "[TESTF-C] END", + 'success': "[TESTF-C] SUCCESS", + 'failure': "[TESTF-C] FAILURE", + 'exit': "[TESTF-C] EXIT" } self.logger_commands = { f"{self.test_tags['c']} SEND_CHAR" : self.send_char, + f"{self.test_tags['c']} SET_TIMEOUT" : self.set_timeout, } self.log_level = { 'full' : self.echo_log_full, 'tf' : self.echo_log_tf, 'none' : self.echo_log_none } - - self.event_thread_finished = threading.Event() - self.event_stop_listener = threading.Event() + # create a list of threading events + # self.event_thread_finished = threading.Event() + # self.event_stop_listener = threading.Event() self.serial_port = "" + self.list_events = { + 'event_thread_finished': threading.Event(), + 'event_stop_listener': threading.Event(), + 'event_completed_test': threading.Event() + } def send_char(self, command): """" @@ -43,6 +53,38 @@ def send_char(self, command): else: self.serial_port.write(command[2].encode('utf-8') + b"\r\n") + def set_timeout(self, command): + """" + Create a thread to count x ms before finishing the listener thread + """ + self.list_events['event_completed_test'].clear() + command = command.split() + if len(command) < 3: + return + + timeout = int(command[2]) + self.list_events['event_thread_finished'].clear() + + def timeout_thread(timeout): + while timeout > 0: + if self.list_events['event_completed_test'].is_set(): + return + time.sleep(1/1000) + timeout -= 1 + + self.list_events['event_stop_listener'].set() + print(cons.RED_TEXT + + "Timeout reached" + + cons.RESET_COLOR) + + threading.Thread(target=timeout_thread, args=[timeout]).start() + + def unset_timeout(self): + """ + Cancel the timeout + """ + self.list_events['event_completed_test'].set() + def echo_log_full(self, serial_results): """ Print each line in the serial results. @@ -55,7 +97,7 @@ def echo_log_full(self, serial_results): """ for line in serial_results: print(line, end="") - self.event_thread_finished.set() + self.list_events['event_thread_finished'].set() def echo_log_tf(self, serial_results): """ @@ -79,13 +121,13 @@ def echo_log_tf(self, serial_results): if is_tf_section: print(line, end="") - self.event_thread_finished.set() + self.list_events['event_thread_finished'].set() def echo_log_none(self): """ Do not print any serial results. """ - self.event_thread_finished.set() + self.list_events['event_thread_finished'].set() def connect_to_platform_port(self, ports_list, echo): """ @@ -107,8 +149,8 @@ def connect_to_platform_port(self, ports_list, echo): for thread in threads: thread.start() - self.event_thread_finished.wait() - self.event_stop_listener.set() + self.list_events['event_thread_finished'].wait() + self.list_events['event_stop_listener'].set() for thread in threads: thread.join() @@ -126,7 +168,7 @@ def listener(self, ser_port, echo): ('#$#', '#') ] - while not self.event_stop_listener.is_set(): + while not self.list_events['event_stop_listener'].is_set(): res = b"" res_log = [] @@ -137,17 +179,15 @@ def listener(self, ser_port, echo): new_line = new_line.replace(old, new) res_log.append(new_line) - if self.test_tags['c'] in new_line: - cons.TEST_RESULTS = new_line - if self.test_tags['c'] in new_line: command = new_line.split()[0] + " " + new_line.split()[1] if command in self.logger_commands: self.logger_commands[command](new_line) else: cons.TEST_RESULTS = new_line + self.unset_timeout() - if self.event_stop_listener.is_set(): + if self.list_events['event_stop_listener'].is_set(): break for line in reversed(res_log): diff --git a/src/inc/testf_commands.h b/src/inc/testf_commands.h index 7c77bd3..316853b 100644 --- a/src/inc/testf_commands.h +++ b/src/inc/testf_commands.h @@ -15,5 +15,6 @@ #define COMMAND_START() PRINT_FUNCTION(TESTF_TAG "START" "\n") #define COMMAND_END() PRINT_FUNCTION(TESTF_TAG "END" "\n") #define COMMAND_SEND_CHAR(X) PRINT_FUNCTION(TESTF_TAG "SEND_CHAR " X "\n") +#define COMMAND_SEND_TIMEOUT(X) PRINT_FUNCTION(TESTF_TAG "SET_TIMEOUT " X "\n") #endif // TESTF_COMMANDS_H \ No newline at end of file