Skip to content
This repository has been archived by the owner on Oct 26, 2023. It is now read-only.

Commit

Permalink
Retry on failed BTLE operations (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
cybe authored Feb 5, 2021
1 parent d29bc1e commit 5fd21c4
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 16 deletions.
6 changes: 5 additions & 1 deletion config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ manager:
topic: homeassistant/status
payload: online
command_timeout: 35 # Timeout for worker operations. Can be removed if the default of 35 seconds is sufficient.
command_retries: 0 # Number of retries for worker commands. Default is 0. Might not be supported for all workers.
update_retries: 0 # Number of retries for worker updates. Default is 0. Might not be supported for all workers.
workers:
mysensors:
command_timeout: 35 # Optional override of globally set command_timeout.
command_retries: 0 # Optional override of globally set command_retries.
update_retries: 0 # Optional override of globally set update_retries.
args:
port: /dev/ttyUSB0
baudrate: 9600
Expand Down Expand Up @@ -159,4 +163,4 @@ manager:
topic_prefix: blinds
per_device_timeout: 40
topic_subscription: blinds/+/+/+
update_interval: 300
update_interval: 300
2 changes: 2 additions & 0 deletions const.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
DEFAULT_COMMAND_TIMEOUT = 35 # In seconds
DEFAULT_PER_DEVICE_TIMEOUT = 8 # In seconds
DEFAULT_COMMAND_RETRIES = 0
DEFAULT_UPDATE_RETRIES = 0
2 changes: 1 addition & 1 deletion logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def suppress_update_failures(suppress):


def log_exception(logger, message, *args, **kwargs):
if not ("suppress" in kwargs and kwargs.pop("suppress") and SUPPRESSION_ENABLED):
if not (kwargs.pop('suppress', False) and SUPPRESSION_ENABLED):
if logger.isEnabledFor(logging.DEBUG):
logger.exception(message, *args, **kwargs)
elif logger.isEnabledFor(logging.WARNING):
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ paho-mqtt
pyyaml
interruptingcow
apscheduler
tenacity
38 changes: 37 additions & 1 deletion workers/base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
import logger

import functools
import logging

import tenacity

_LOGGER = logger.get(__name__)


class BaseWorker:
def __init__(self, command_timeout, global_topic_prefix, **kwargs):
def __init__(self, command_timeout, command_retries, update_retries, global_topic_prefix, **kwargs):
self.command_timeout = command_timeout
self.command_retries = command_retries
self.update_retries = update_retries
self.global_topic_prefix = global_topic_prefix
for arg, value in kwargs.items():
setattr(self, arg, value)
Expand Down Expand Up @@ -81,3 +91,29 @@ def log_unspecified_exception(self, named_logger, dev_name, exception):
type(exception).__name__,
suppress=True,
)

def retry(_func=None, *, retries=0, exception_type=Exception):
def log_retry(retry_state):
_LOGGER.info(
'Call to %s failed the %s time (%s). Retrying in %s seconds',
'.'.join((retry_state.fn.__module__, retry_state.fn.__name__)),
retry_state.attempt_number,
type(retry_state.outcome.exception()).__name__,
'{:.2f}'.format(getattr(retry_state.next_action, 'sleep')))

def decorator_retry(func):
@functools.wraps(func)
def wrapped_retry(*args, **kwargs):
retryer = tenacity.Retrying(
wait=tenacity.wait_random(1, 3),
retry=tenacity.retry_if_exception_type(exception_type),
stop=tenacity.stop_after_attempt(retries+1),
reraise=True,
before_sleep=log_retry)
return retryer(func, *args, **kwargs)
return wrapped_retry

if _func:
return decorator_retry(_func)
else:
return decorator_retry
4 changes: 2 additions & 2 deletions workers/miflora.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from mqtt import MqttMessage, MqttConfigMessage

from interruptingcow import timeout
from workers.base import BaseWorker
from workers.base import BaseWorker, retry
import logger

REQUIREMENTS = [
Expand Down Expand Up @@ -111,7 +111,7 @@ def status_update(self):

try:
with timeout(self.per_device_timeout, exception=DeviceTimeoutError):
yield self.update_device_state(name, data["poller"])
yield retry(self.update_device_state, retries=self.update_retries, exception_type=BluetoothBackendException)(name, data["poller"])
except BluetoothBackendException as e:
logger.log_exception(
_LOGGER,
Expand Down
4 changes: 2 additions & 2 deletions workers/mithermometer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from mqtt import MqttMessage, MqttConfigMessage
from interruptingcow import timeout

from workers.base import BaseWorker
from workers.base import BaseWorker, retry
import logger

REQUIREMENTS = ["mithermometer==0.1.4", "bluepy"]
Expand Down Expand Up @@ -76,7 +76,7 @@ def status_update(self):

try:
with timeout(self.per_device_timeout, exception=DeviceTimeoutError):
yield self.update_device_state(name, data["poller"])
yield retry(self.update_device_state, retries=self.update_retries, exception_type=BluetoothBackendException)(name, data["poller"])
except BluetoothBackendException as e:
logger.log_exception(
_LOGGER,
Expand Down
14 changes: 7 additions & 7 deletions workers/thermostat.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from mqtt import MqttMessage, MqttConfigMessage

from workers.base import BaseWorker
from workers.base import BaseWorker, retry
import logger

REQUIREMENTS = ["python-eq3bt==0.1.11"]
Expand Down Expand Up @@ -192,7 +192,7 @@ def status_update(self):
_LOGGER.debug("Updating %s device '%s' (%s)", repr(self), name, data["mac"])
thermostat = data["thermostat"]
try:
thermostat.update()
retry(thermostat.update, retries=self.update_retries, exception_type=btle.BTLEException)()
except btle.BTLEException as e:
logger.log_exception(
_LOGGER,
Expand All @@ -204,7 +204,7 @@ def status_update(self):
suppress=True,
)
else:
yield self.present_device_state(name, thermostat)
yield retry(self.present_device_state, retries=self.update_retries, exception_type=btle.BTLEException)(name, thermostat)

def on_command(self, topic, value):
from bluepy import btle
Expand Down Expand Up @@ -268,11 +268,11 @@ def on_command(self, topic, value):
try:
if method == "preset":
if value == HOLD_COMFORT:
thermostat.activate_comfort()
retry(thermostat.activate_comfort, retries=self.command_retries, exception_type=btle.BTLEException)()
else:
thermostat.activate_eco()
retry(thermostat.activate_eco, retries=self.command_retries, exception_type=btle.BTLEException)()
else:
setattr(thermostat, method, value)
retry(setattr, retries=self.command_retries, exception_type=btle.BTLEException)(thermostat, method, value)
except btle.BTLEException as e:
logger.log_exception(
_LOGGER,
Expand All @@ -286,7 +286,7 @@ def on_command(self, topic, value):
)
return []

return self.present_device_state(device_name, thermostat)
return retry(self.present_device_state, retries=self.command_retries, exception_type=btle.BTLEException)(device_name, thermostat)

def present_device_state(self, name, thermostat):
from eq3bt import Mode
Expand Down
12 changes: 10 additions & 2 deletions workers_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from interruptingcow import timeout
from pytz import utc

from const import DEFAULT_COMMAND_TIMEOUT
from const import DEFAULT_COMMAND_TIMEOUT, DEFAULT_COMMAND_RETRIES, DEFAULT_UPDATE_RETRIES
from exceptions import WorkerTimeoutError
from workers_queue import _WORKERS_QUEUE
import logger
Expand Down Expand Up @@ -65,6 +65,8 @@ def __init__(self, config):
self._daemons = []
self._config = config
self._command_timeout = config.get("command_timeout", DEFAULT_COMMAND_TIMEOUT)
self._command_retries = config.get("command_retries", DEFAULT_COMMAND_RETRIES)
self._update_retries = config.get("update_retries", DEFAULT_UPDATE_RETRIES)

def register_workers(self, global_topic_prefix):
for (worker_name, worker_config) in self._config["workers"].items():
Expand All @@ -74,8 +76,14 @@ def register_workers(self, global_topic_prefix):
command_timeout = worker_config.get(
"command_timeout", self._command_timeout
)
command_retries = worker_config.get(
"command_retries", self._command_retries
)
update_retries = worker_config.get(
"update_retries", self._update_retries
)
worker_obj = klass(
command_timeout, global_topic_prefix, **worker_config["args"]
command_timeout, command_retries, update_retries, global_topic_prefix, **worker_config["args"]
)

if "sensor_config" in self._config and hasattr(worker_obj, "config"):
Expand Down

4 comments on commit 5fd21c4

@mak-42
Copy link

@mak-42 mak-42 commented on 5fd21c4 Feb 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit broke BlescanmultiWorker. It still waits for 3 arguments. The result:

workers_manager.py", line 86, in register_workers
command_timeout, command_retries, update_retries, global_topic_prefix, **worker_config["args"]
TypeError: init() takes 3 positional arguments but 5 were given

@zewelor
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cybe Could you take a look ?

@cybe
Copy link
Contributor Author

@cybe cybe commented on 5fd21c4 Feb 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On it.

@cybe
Copy link
Contributor Author

@cybe cybe commented on 5fd21c4 Feb 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constructor-arguments were hardcoded in BlescanmultiWorker. This fixes it: #217

Please sign in to comment.