From a723476b0a4509826b50bb29cd653eae20a83837 Mon Sep 17 00:00:00 2001 From: Morg42 <43153739+Morg42@users.noreply.github.com> Date: Thu, 22 Aug 2024 15:01:03 +0200 Subject: [PATCH 1/2] sdp: add delivery queue to serial connection for better timing --- lib/model/sdp/connection.py | 70 +++++++++++++++++++++++++++++-------- 1 file changed, 55 insertions(+), 15 deletions(-) diff --git a/lib/model/sdp/connection.py b/lib/model/sdp/connection.py index a32be2298..24414424d 100644 --- a/lib/model/sdp/connection.py +++ b/lib/model/sdp/connection.py @@ -30,6 +30,7 @@ import requests import socket from threading import Lock, Thread +from queue import SimpleQueue from contextlib import contextmanager import json from importlib import import_module @@ -722,10 +723,6 @@ def _open(self): self._connection.open() self._is_connected = True self.logger.info(f'connected to {self._params[PLUGIN_ATTR_SERIAL_PORT]}') - self._connection_attempts = 0 - if self._params[PLUGIN_ATTR_CB_ON_CONNECT]: - self._params[PLUGIN_ATTR_CB_ON_CONNECT](self) - self._setup_listener() except (self.serial.SerialException, ValueError) as e: self.logger.error(f'error on connection to {self._params[PLUGIN_ATTR_SERIAL_PORT]}. Error was: {e}') self._connection_attempts = 0 @@ -734,6 +731,10 @@ def _open(self): self._lock.release() if self._is_connected: + self._connection_attempts = 0 + if self._params[PLUGIN_ATTR_CB_ON_CONNECT]: + self._params[PLUGIN_ATTR_CB_ON_CONNECT](self) + self._setup_listener() return True else: self.logger.debug(f'sleeping {self._params[PLUGIN_ATTR_CONN_CYCLE]} seconds before next connection attempt') @@ -837,9 +838,9 @@ def _send_bytes(self, packet): def _read_bytes(self, limit_response, clear_buffer=False): """ Try to read bytes from device, return read bytes - if limit_response is int > 0, try to read at least bytes - if limit_response is bytes() or bytearray(), try to read till receiving - if limit_response is 0, read until timeout (use with care...) + if limit_response is int > 0, try to read at least bytes (len mode) + if limit_response is bytes() or bytearray(), try to read till receiving (terminator mode) + if limit_response is 0, read until timeout (use with care...) (use on your own risk mode ;) ) :param limit_response: Number of bytes to read, b' for terminated read, 0 for unrestricted read (timeout) :return: read bytes @@ -871,7 +872,7 @@ def _read_bytes(self, limit_response, clear_buffer=False): with self._lock.acquire_timeout(self.__lock_timeout) as locked: if locked: - # don't wait for input indefinitely, stop after 3 * self._params[PLUGIN_ATTR_CONN_TIMEOUT] seconds + # don't wait for input indefinitely, stop after timeout_mult * self._params[PLUGIN_ATTR_CONN_TIMEOUT] seconds while time() <= starttime + self._timeout_mult * self._params[PLUGIN_ATTR_CONN_TIMEOUT]: readbyte = self._connection.read() self._lastbyte = readbyte @@ -926,7 +927,9 @@ class SDPConnectionSerialAsync(SDPConnectionSerial): The timeout needs to be set small enough not to block reading for too long. Recommended times are between 0.2 and 0.8 seconds. - The ``data_received_callback`` needs to be set or you won't get data. + The ``data_received_callback`` needs to be set or you won't get data. Due + to timing issues, read values are written into a queue and dispatched to + SDP by a separate worker thread. Callback syntax is: def connected_callback(by=None) @@ -937,7 +940,9 @@ def data_received_callback(by, message) def __init__(self, data_received_callback, name=None, **kwargs): # set additional class members self.__receive_thread = None + self.__queue_thread = None self._name = name if name else '' + self._queue = SimpleQueue() super().__init__(data_received_callback, name=name, **kwargs) @@ -946,6 +951,10 @@ def _setup_listener(self): return self._listener_active = True + self.__queue_thread = Thread(target=self.__queue_worker, name=self._name + '.SerialQueue') + self.__queue_thread.daemon = True + self.__queue_thread.start() + self.__receive_thread = Thread(target=self.__receive_thread_worker, name=self._name + '.Serial') self.__receive_thread.daemon = True self.__receive_thread.start() @@ -981,7 +990,7 @@ def __receive_thread_worker(self): # If we work in line mode (with a terminator) slice buffer into single chunks based on terminator if self._params[PLUGIN_ATTR_CONN_TERMINATOR]: __buffer += msg - while True: + while self._listener_active: # terminator = int means fixed size chunks if isinstance(self._params[PLUGIN_ATTR_CONN_TERMINATOR], int): i = self._params[PLUGIN_ATTR_CONN_TERMINATOR] @@ -995,18 +1004,49 @@ def __receive_thread_worker(self): i += len(self._params[PLUGIN_ATTR_CONN_TERMINATOR]) line = __buffer[:i] __buffer = __buffer[i:] - if self._data_received_callback: - self._data_received_callback(self, line if self._params[PLUGIN_ATTR_CONN_BINARY] else str(line, 'utf-8').strip()) - # If not in terminator mode just forward what we received + self._queue.put(line if self._params[PLUGIN_ATTR_CONN_BINARY] else str(line, 'utf-8').strip()) + # possibly deactivate in production? + self.logger.debug(f'put {line} in queue, queue size is {self._queue.qsize()}') + + else: + # forward what we received + self._queue.put(msg if self._params[PLUGIN_ATTR_CONN_BINARY] else str(msg, 'utf-8').strip()) + # possibly deactivate in production? + self.logger.debug(f'put {msg} in queue, queue size is {self._queue.qsize()}') if not self._listener_active: # socket shut down by self.close, no error self.logger.debug('serial connection shut down by call to close method') - return except Exception as e: if not self._listener_active: self.logger.debug(f'serial receive thread {self.__receive_thread.name} shutting down') - return else: self.logger.error(f'serial receive thread {self.__receive_thread.name} died with unexpected error: {e}') + + finally: + # clean up queue worker + + # make queue get() wait unblock by giving something to consume + self._queue.put(None) + + # close thread + try: + self.__queue_thread.join() + except Exception: + pass + + # delete thread as reuse is not possible (just to be sure) + self.__queue_thread = None + + def __queue_worker(self): + """ thread worker to get items from queue and pass them on to sdp """ + while self._listener_active: + item = self._queue.get() + # we could check for the callback outside the while loop, but the + # remote chance of the callback being set up "in operation" should + # not be dismissed... shame to that implementer, though! + # check also for listener_active as this is the "shutdown flag" -> + # don't send anything back if flag is unset + if self._data_received_callback and self._listener_active: + self._data_received_callback(self, item) From 699d7df37a4cb87bcfdee797380fafa76c22a331 Mon Sep 17 00:00:00 2001 From: Morg42 <43153739+Morg42@users.noreply.github.com> Date: Thu, 22 Aug 2024 17:36:24 +0200 Subject: [PATCH 2/2] connection.py aktualisieren --- lib/model/sdp/connection.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/model/sdp/connection.py b/lib/model/sdp/connection.py index 24414424d..979c7716b 100644 --- a/lib/model/sdp/connection.py +++ b/lib/model/sdp/connection.py @@ -1026,6 +1026,7 @@ def __receive_thread_worker(self): finally: # clean up queue worker + self._listener_active = False # make queue get() wait unblock by giving something to consume self._queue.put(None)