Skip to content

Commit

Permalink
Merge pull request #677 from Morg42/sdp-2
Browse files Browse the repository at this point in the history
sdp: add delivery queue to serial connection for better timing
  • Loading branch information
Morg42 authored Aug 23, 2024
2 parents cc0644a + 699d7df commit b6a17c4
Showing 1 changed file with 56 additions and 15 deletions.
71 changes: 56 additions & 15 deletions lib/model/sdp/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import requests
import socket
from threading import Lock, Thread
from queue import SimpleQueue
from contextlib import contextmanager
import json
from importlib import import_module
Expand Down Expand Up @@ -722,10 +723,6 @@ def _open(self):
self._connection.open()
self._is_connected = True
self.logger.info(f'connected to {self._params[PLUGIN_ATTR_SERIAL_PORT]}')
self._connection_attempts = 0
if self._params[PLUGIN_ATTR_CB_ON_CONNECT]:
self._params[PLUGIN_ATTR_CB_ON_CONNECT](self)
self._setup_listener()
except (self.serial.SerialException, ValueError) as e:
self.logger.error(f'error on connection to {self._params[PLUGIN_ATTR_SERIAL_PORT]}. Error was: {e}')
self._connection_attempts = 0
Expand All @@ -734,6 +731,10 @@ def _open(self):
self._lock.release()

if self._is_connected:
self._connection_attempts = 0
if self._params[PLUGIN_ATTR_CB_ON_CONNECT]:
self._params[PLUGIN_ATTR_CB_ON_CONNECT](self)
self._setup_listener()
return True
else:
self.logger.debug(f'sleeping {self._params[PLUGIN_ATTR_CONN_CYCLE]} seconds before next connection attempt')
Expand Down Expand Up @@ -837,9 +838,9 @@ def _send_bytes(self, packet):
def _read_bytes(self, limit_response, clear_buffer=False):
"""
Try to read bytes from device, return read bytes
if limit_response is int > 0, try to read at least <limit_response> bytes
if limit_response is bytes() or bytearray(), try to read till receiving <limit_response>
if limit_response is 0, read until timeout (use with care...)
if limit_response is int > 0, try to read at least <limit_response> bytes (len mode)
if limit_response is bytes() or bytearray(), try to read till receiving <limit_response> (terminator mode)
if limit_response is 0, read until timeout (use with care...) (use on your own risk mode ;) )
:param limit_response: Number of bytes to read, b'<terminator> for terminated read, 0 for unrestricted read (timeout)
:return: read bytes
Expand Down Expand Up @@ -871,7 +872,7 @@ def _read_bytes(self, limit_response, clear_buffer=False):
with self._lock.acquire_timeout(self.__lock_timeout) as locked:

if locked:
# don't wait for input indefinitely, stop after 3 * self._params[PLUGIN_ATTR_CONN_TIMEOUT] seconds
# don't wait for input indefinitely, stop after timeout_mult * self._params[PLUGIN_ATTR_CONN_TIMEOUT] seconds
while time() <= starttime + self._timeout_mult * self._params[PLUGIN_ATTR_CONN_TIMEOUT]:
readbyte = self._connection.read()
self._lastbyte = readbyte
Expand Down Expand Up @@ -926,7 +927,9 @@ class SDPConnectionSerialAsync(SDPConnectionSerial):
The timeout needs to be set small enough not to block reading for too long.
Recommended times are between 0.2 and 0.8 seconds.
The ``data_received_callback`` needs to be set or you won't get data.
The ``data_received_callback`` needs to be set or you won't get data. Due
to timing issues, read values are written into a queue and dispatched to
SDP by a separate worker thread.
Callback syntax is:
def connected_callback(by=None)
Expand All @@ -937,7 +940,9 @@ def data_received_callback(by, message)
def __init__(self, data_received_callback, name=None, **kwargs):
# set additional class members
self.__receive_thread = None
self.__queue_thread = None
self._name = name if name else ''
self._queue = SimpleQueue()

super().__init__(data_received_callback, name=name, **kwargs)

Expand All @@ -946,6 +951,10 @@ def _setup_listener(self):
return

self._listener_active = True
self.__queue_thread = Thread(target=self.__queue_worker, name=self._name + '.SerialQueue')
self.__queue_thread.daemon = True
self.__queue_thread.start()

self.__receive_thread = Thread(target=self.__receive_thread_worker, name=self._name + '.Serial')
self.__receive_thread.daemon = True
self.__receive_thread.start()
Expand Down Expand Up @@ -981,7 +990,7 @@ def __receive_thread_worker(self):
# If we work in line mode (with a terminator) slice buffer into single chunks based on terminator
if self._params[PLUGIN_ATTR_CONN_TERMINATOR]:
__buffer += msg
while True:
while self._listener_active:
# terminator = int means fixed size chunks
if isinstance(self._params[PLUGIN_ATTR_CONN_TERMINATOR], int):
i = self._params[PLUGIN_ATTR_CONN_TERMINATOR]
Expand All @@ -995,18 +1004,50 @@ def __receive_thread_worker(self):
i += len(self._params[PLUGIN_ATTR_CONN_TERMINATOR])
line = __buffer[:i]
__buffer = __buffer[i:]
if self._data_received_callback:
self._data_received_callback(self, line if self._params[PLUGIN_ATTR_CONN_BINARY] else str(line, 'utf-8').strip())
# If not in terminator mode just forward what we received
self._queue.put(line if self._params[PLUGIN_ATTR_CONN_BINARY] else str(line, 'utf-8').strip())
# possibly deactivate in production?
self.logger.debug(f'put {line} in queue, queue size is {self._queue.qsize()}')

else:
# forward what we received
self._queue.put(msg if self._params[PLUGIN_ATTR_CONN_BINARY] else str(msg, 'utf-8').strip())
# possibly deactivate in production?
self.logger.debug(f'put {msg} in queue, queue size is {self._queue.qsize()}')

if not self._listener_active:
# socket shut down by self.close, no error
self.logger.debug('serial connection shut down by call to close method')
return

except Exception as e:
if not self._listener_active:
self.logger.debug(f'serial receive thread {self.__receive_thread.name} shutting down')
return
else:
self.logger.error(f'serial receive thread {self.__receive_thread.name} died with unexpected error: {e}')

finally:
# clean up queue worker
self._listener_active = False

# make queue get() wait unblock by giving something to consume
self._queue.put(None)

# close thread
try:
self.__queue_thread.join()
except Exception:
pass

# delete thread as reuse is not possible (just to be sure)
self.__queue_thread = None

def __queue_worker(self):
""" thread worker to get items from queue and pass them on to sdp """
while self._listener_active:
item = self._queue.get()
# we could check for the callback outside the while loop, but the
# remote chance of the callback being set up "in operation" should
# not be dismissed... shame to that implementer, though!
# check also for listener_active as this is the "shutdown flag" ->
# don't send anything back if flag is unset
if self._data_received_callback and self._listener_active:
self._data_received_callback(self, item)

0 comments on commit b6a17c4

Please sign in to comment.