diff --git a/gateway.py b/gateway.py index 431f4d3..619338c 100755 --- a/gateway.py +++ b/gateway.py @@ -1,31 +1,42 @@ #!/usr/bin/env python3 -import logging import sys if sys.version_info < (3, 5): print("To use this script you need python 3.5 or newer! got %s" % sys.version_info) sys.exit(1) -from logger import _LOGGER +import logger +logger.setup() + +import logging import argparse +import queue + from workers_queue import _WORKERS_QUEUE from config import settings from mqtt import MqttClient from workers_manager import WorkersManager + parser = argparse.ArgumentParser() group = parser.add_mutually_exclusive_group() -group.add_argument('-d', '--debug', action='store_true', default=False) -group.add_argument('-q', '--quiet', action='store_true', default=False) +group.add_argument('-d', '--debug', action='store_true', default=False, help='Set logging to output debug information') +group.add_argument('-q', '--quiet', action='store_true', default=False, help='Set logging to just output warnings and errors') +parser.add_argument('-s', '--suppress-update-failures', dest='suppress', action='store_true', default=False, help='Suppress any errors regarding failed device updates') parsed = parser.parse_args() -if parsed.debug: +_LOGGER = logger.get() +if parsed.quiet: + _LOGGER.setLevel(logging.WARNING) +elif parsed.debug: _LOGGER.setLevel(logging.DEBUG) + logger.enable_debug_formatter() else: _LOGGER.setLevel(logging.INFO) +logger.suppress_update_failures(parsed.suppress) -_LOGGER.debug('Starting') +_LOGGER.info('Starting') mqtt = MqttClient(settings['mqtt']) manager = WorkersManager() @@ -33,16 +44,16 @@ running = True -try: - while running: - try: - mqtt.publish(_WORKERS_QUEUE.get(block=True).execute()) - except (KeyboardInterrupt, SystemExit): - raise - except Exception as e: - if not parsed.quiet: - _LOGGER.exception(e) -except KeyboardInterrupt: - running = False - _LOGGER.info('Exiting allowing jobs to finish. If you need force exit use kill') - +while running: + try: + mqtt.publish(_WORKERS_QUEUE.get(timeout=10).execute()) + except queue.Empty: # Allow for SIGINT processing + pass + except TimeoutError: + logger.log_exception(_LOGGER, "Timeout while executing worker command", suppress=True) + except (KeyboardInterrupt, SystemExit): + running = False + _LOGGER.info('Finish current jobs and shut down. If you need force exit use kill') + except Exception as e: + logger.log_exception(_LOGGER, "Fatal error while executing worker command: %s", type(e).__name__) + raise e diff --git a/logger.py b/logger.py index f8054dd..a3ba2bf 100644 --- a/logger.py +++ b/logger.py @@ -1,6 +1,50 @@ import logging +import logging.config +import yaml -import sys +APP_ROOT = 'bt-mqtt-gw' +SUPPRESSION_ENABLED = False -_LOGGER = logging.getLogger('bt-mqtt-gw') -_LOGGER.addHandler(logging.StreamHandler(sys.stdout)) + +def setup(): + with open('logger.yaml', 'rt') as f: + config = yaml.safe_load(f.read()) + logging.config.dictConfig(config) + + +def get(name=None): + if name: + logger_name = '{}.{}'.format(APP_ROOT, name) + else: + logger_name = APP_ROOT + return logging.getLogger(logger_name) + + +def enable_debug_formatter(): + logging.getLogger().handlers[0].setFormatter(logging.getLogger('dummy_debug').handlers[0].formatter) + + +def reset(): + app_level = get().getEffectiveLevel() + + root = logging.getLogger() + map(root.removeHandler, root.handlers[:]) + map(root.removeFilter, root.filters[:]) + + setup() + get().setLevel(app_level) + if app_level <= logging.DEBUG: + enable_debug_formatter() + + +def suppress_update_failures(suppress): + global SUPPRESSION_ENABLED + SUPPRESSION_ENABLED = suppress + + +def log_exception(logger, message, *args, **kwargs): + if not ('suppress' in kwargs and kwargs.pop('suppress') and SUPPRESSION_ENABLED): + if logger.isEnabledFor(logging.DEBUG): + logger.exception(message, *args, **kwargs) + elif logger.isEnabledFor(logging.WARNING): + logger.warning(message, *args, **kwargs) diff --git a/logger.yaml b/logger.yaml new file mode 100644 index 0000000..0469232 --- /dev/null +++ b/logger.yaml @@ -0,0 +1,29 @@ +version: 1 +disable_existing_loggers: True + +formatters: + default: + format: '%(asctime)s %(message)s' + datefmt: '%X' + minimal: + format: '%(message)s' + debug: + format: '%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)d:%(funcName)s - %(message)s' + +handlers: + console: + class: logging.StreamHandler + formatter: default + stream: ext://sys.stdout + dummy_debug: + class: logging.NullHandler + formatter: debug + +loggers: + bt-mqtt-gw: + level: INFO + dummy_debug: + handlers: [dummy_debug] + +root: + handlers: [console] diff --git a/mqtt.py b/mqtt.py index 4c8b3dd..fd4d2cb 100644 --- a/mqtt.py +++ b/mqtt.py @@ -1,8 +1,9 @@ import paho.mqtt.client as mqtt -from logger import _LOGGER +import logger LWT_ONLINE = 'online' LWT_OFFLINE = 'offline' +_LOGGER = logger.get(__name__) class MqttClient: diff --git a/workers/base.py b/workers/base.py index e88f5a8..222ee94 100644 --- a/workers/base.py +++ b/workers/base.py @@ -9,3 +9,6 @@ def _setup(self): def format_topic(self, *args): return '/'.join([self.topic_prefix, *args]) + + def __repr__(self): + return self.__module__.split(".")[-1] diff --git a/workers/blescanmulti.py b/workers/blescanmulti.py index 612e9df..dbb0c23 100644 --- a/workers/blescanmulti.py +++ b/workers/blescanmulti.py @@ -1,12 +1,15 @@ import time -from interruptingcow import timeout + from bluepy.btle import Scanner, DefaultDelegate from mqtt import MqttMessage -from utils import booleanize + from workers.base import BaseWorker -from logger import _LOGGER +from utils import booleanize +import logger REQUIREMENTS = ['bluepy'] +_LOGGER = logger.get(__name__) + class ScanDelegate(DefaultDelegate): diff --git a/workers/linakdesk.py b/workers/linakdesk.py index a133fb5..21a2cc3 100644 --- a/workers/linakdesk.py +++ b/workers/linakdesk.py @@ -15,7 +15,7 @@ def status_update(self): return [MqttMessage(topic=self.format_topic('height/cm'), payload=self._get_height())] def _get_height(self): - with timeout(20): + with timeout(20, exception=TimeoutError): self.desk.read_dpg_data() return self.desk.current_height_with_offset.cm diff --git a/workers/miflora.py b/workers/miflora.py index a85e9d4..5aaf531 100644 --- a/workers/miflora.py +++ b/workers/miflora.py @@ -1,29 +1,37 @@ -from interruptingcow import timeout +import logging + from mqtt import MqttMessage + from workers.base import BaseWorker +import logger REQUIREMENTS = ['git+https://github.com/open-homeautomation/miflora.git@84f39432082796412d05b754c948499a1ad710e7#egg=miflora', 'bluepy'] - monitoredAttrs = ["temperature", "moisture", "light", "conductivity", "battery"] +_LOGGER = logger.get(__name__) + class MifloraWorker(BaseWorker): def _setup(self): from miflora.miflora_poller import MiFloraPoller from btlewrap.bluepy import BluepyBackend + _LOGGER.info("Adding %d %s devices", len(self.devices), repr(self)) for name, mac in self.devices.items(): + _LOGGER.debug("Adding %s device '%s' (%s)", repr(self), name, mac) self.devices[name] = MiFloraPoller(mac, BluepyBackend) def status_update(self): + from btlewrap.base import BluetoothBackendException + _LOGGER.info("Updating %d %s devices", len(self.devices), repr(self)) ret = [] for name, poller in self.devices.items(): + _LOGGER.debug("Updating %s device '%s' (%s)", repr(self), name, poller._mac) try: ret += self.update_device_state(name, poller) - except RuntimeError: - pass + except BluetoothBackendException as e: + logger.log_exception(_LOGGER, "Error during update of %s device '%s' (%s): %s", repr(self), name, poller._mac, type(e).__name__, suppress=True) return ret - @timeout(8.0) def update_device_state(self, name, poller): ret = [] poller.clear_cache() diff --git a/workers/miscale.py b/workers/miscale.py index 15ff526..72017d9 100644 --- a/workers/miscale.py +++ b/workers/miscale.py @@ -21,7 +21,7 @@ def _get_weight(self): scanner = btle.Scanner().withDelegate(scan_processor) scanner.scan(5, passive=True) - with timeout(5): + with timeout(5, exception=TimeoutError): while scan_processor.weight is None: time.sleep(1) return scan_processor.weight diff --git a/workers/mithermometer.py b/workers/mithermometer.py index 90b9c9f..981c16a 100644 --- a/workers/mithermometer.py +++ b/workers/mithermometer.py @@ -1,31 +1,37 @@ -from interruptingcow import timeout +import logging + from mqtt import MqttMessage + from workers.base import BaseWorker +import logger REQUIREMENTS = ['mithermometer==0.1.2'] - monitoredAttrs = ["temperature", "humidity", "battery"] +_LOGGER = logger.get(__name__) + class MithermometerWorker(BaseWorker): def _setup(self): from mithermometer.mithermometer_poller import MiThermometerPoller from btlewrap.bluepy import BluepyBackend - + _LOGGER.info("Adding %d %s devices", len(self.devices), repr(self)) for name, mac in self.devices.items(): + _LOGGER.debug("Adding %s device '%s' (%s)", repr(self), name, mac) self.devices[name] = MiThermometerPoller(mac, BluepyBackend) def status_update(self): + from btlewrap.base import BluetoothBackendException + _LOGGER.info("Updating %d %s devices", len(self.devices), repr(self)) ret = [] for name, poller in self.devices.items(): + _LOGGER.debug("Updating %s device '%s' (%s)", repr(self), name, poller._mac) try: ret += self.update_device_state(name, poller) - except RuntimeError: - pass - + except BluetoothBackendException as e: + logger.log_exception(_LOGGER, "Error during update of %s device '%s' (%s): %s", repr(self), name, poller._mac, type(e).__name__, suppress=True) return ret - @timeout(8.0) def update_device_state(self, name, poller): ret = [] poller.clear_cache() diff --git a/workers/mysensors.py b/workers/mysensors.py index 799b05b..0f3ff45 100644 --- a/workers/mysensors.py +++ b/workers/mysensors.py @@ -1,9 +1,11 @@ +import serial from mqtt import MqttMessage + from workers.base import BaseWorker -from logger import _LOGGER -import serial +import logger REQUIREMENTS = ['pyserial'] +_LOGGER = logger.get(__name__) class MysensorsWorker(BaseWorker): diff --git a/workers/thermostat.py b/workers/thermostat.py index 8fde016..0f03e39 100644 --- a/workers/thermostat.py +++ b/workers/thermostat.py @@ -1,12 +1,14 @@ from builtins import staticmethod +import logging -from interruptingcow import timeout from mqtt import MqttMessage + from workers.base import BaseWorker +import logger REQUIREMENTS = ['python-eq3bt'] - monitoredAttrs = ["low_battery", "valve_state", "target_temperature", "window_open", "locked"] +_LOGGER = logger.get(__name__) STATE_AWAY = 'away' STATE_ECO = 'eco' @@ -15,6 +17,7 @@ STATE_ON = 'on' STATE_OFF = 'off' + class ThermostatWorker(BaseWorker): class ModesMapper(): def __init__(self): @@ -57,7 +60,9 @@ def on_off_to_mode(on_off): def _setup(self): from eq3bt import Thermostat + _LOGGER.info("Adding %d %s devices", len(self.devices), repr(self)) for name, mac in self.devices.items(): + _LOGGER.debug("Adding %s device '%s' (%s)", repr(self), name, mac) self.devices[name] = Thermostat(mac) self._modes_mapper = self.ModesMapper() @@ -66,15 +71,17 @@ def status_update(self): from bluepy import btle ret = [] + _LOGGER.info("Updating %d %s devices", len(self.devices), repr(self)) for name, thermostat in self.devices.items(): + _LOGGER.debug("Updating %s device '%s' (%s)", repr(self), name, thermostat._conn._mac) try: ret += self.update_device_state(name, thermostat) - except (RuntimeError, btle.BTLEException): - pass - + except btle.BTLEException as e: + logger.log_exception(_LOGGER, "Error during update of %s device '%s' (%s): %s", repr(self), name, thermostat._conn._mac, type(e).__name__, suppress=True) return ret def on_command(self, topic, value): + from bluepy import btle _, device_name, method, _ = topic.split('/') thermostat = self.devices[device_name] @@ -91,10 +98,19 @@ def on_command(self, topic, value): elif method == "target_temperature": value = float(value) - setattr(thermostat, method, value) - return self.update_device_state(device_name, thermostat) + _LOGGER.info("Setting %s to %s on %s device '%s' (%s)", method, value, repr(self), device_name, thermostat._conn._mac) + try: + setattr(thermostat, method, value) + except btle.BTLEException as e: + logger.log_exception(_LOGGER, "Error setting %s to %s on %s device '%s' (%s): %s", method, value, repr(self), device_name, thermostat._conn._mac, type(e).__name__) + return [] + + try: + return self.update_device_state(device_name, thermostat) + except btle.BTLEException as e: + logger.log_exception(_LOGGER, "Error during update of %s device '%s' (%s): %s", repr(self), device_name, thermostat._conn._mac, type(e).__name__, suppress=True) + return [] - @timeout(8.0) def update_device_state(self, name, thermostat): thermostat.update() diff --git a/workers/toothbrush.py b/workers/toothbrush.py index 00f13df..2029b6e 100644 --- a/workers/toothbrush.py +++ b/workers/toothbrush.py @@ -1,11 +1,14 @@ import time -from interruptingcow import timeout + from bluepy.btle import Scanner, DefaultDelegate from mqtt import MqttMessage + from workers.base import BaseWorker -from logger import _LOGGER +import logger REQUIREMENTS = ['bluepy'] +_LOGGER = logger.get(__name__) + class ScanDelegate(DefaultDelegate): def __init__(self): diff --git a/workers_manager.py b/workers_manager.py index 8cf8fa7..b168ae5 100644 --- a/workers_manager.py +++ b/workers_manager.py @@ -1,5 +1,14 @@ import importlib import threading +from functools import partial +import logging + +from apscheduler.schedulers.background import BackgroundScheduler +from interruptingcow import timeout +from pytz import utc + +from workers_queue import _WORKERS_QUEUE +import logger from pip import __version__ as pip_version if int(pip_version.split('.')[0]) >= 10: @@ -7,12 +16,8 @@ else: from pip import main as pip_main -from apscheduler.schedulers.background import BackgroundScheduler -from interruptingcow import timeout -from functools import partial -from logger import _LOGGER -from workers_queue import _WORKERS_QUEUE -from pytz import utc +_LOGGER = logger.get(__name__) + class WorkersManager: class Command: @@ -23,10 +28,10 @@ def __init__(self, callback, args=(), options=dict()): def execute(self): messages = [] - with timeout(35): + with timeout(35, exception=TimeoutError): messages = self._callback(*self._args) - _LOGGER.debug(messages) + _LOGGER.debug("Command execution result: %s", messages) return messages def __init__(self): @@ -46,7 +51,7 @@ def register_workers(self, config): worker_obj = klass(**worker_config['args']) if hasattr(worker_obj, 'status_update'): - _LOGGER.debug("Added: %s with %d seconds interval" % (worker_name, worker_config['update_interval'])) + _LOGGER.debug("Added %s worker with %d seconds interval", repr(worker_obj), worker_config['update_interval']) command = self.Command(worker_obj.status_update, []) self._update_commands.append(command) @@ -61,7 +66,7 @@ def register_workers(self, config): partial(self._update_interval_wrapper, command, job_id) )) elif hasattr(worker_obj, 'run'): - _LOGGER.debug("Registered: %s as daemon" % (worker_name)) + _LOGGER.debug("Registered %s as daemon", repr(worker_obj)) self._daemons.append(worker_obj) else: raise "%s cannot be initialized, it has to define run or status_update method" % worker_name @@ -105,21 +110,21 @@ def _queue_command(command): def _pip_install_helper(package_names): for package in package_names: pip_main(['install', '-q', package]) + logger.reset() def _update_interval_wrapper(self, command, job_id, client, userdata, c): - _LOGGER.debug("Recieved updated interval for %s with: %s", c.topic, c.payload) + _LOGGER.info("Recieved updated interval for %s with: %s", c.topic, c.payload) try: new_interval = int(c.payload) self._scheduler.remove_job(job_id) self._scheduler.add_job( - partial(self._queue_command, command), 'interval', - seconds=new_interval, id=job_id - ) + partial(self._queue_command, command), 'interval', + seconds=new_interval, id=job_id) except ValueError: - _LOGGER.info("New interval invalid, recieved: %s", c.payload) + logger.log_exception(_LOGGER, 'Ignoring invalid new interval: %s', c.payload) def _on_command_wrapper(self, worker_obj, client, userdata, c): - _LOGGER.debug("on command wrapper for with %s: %s", c.topic, c.payload) + _LOGGER.debug("Received command for %s on %s: %s", repr(worker_obj), c.topic, c.payload) global_topic_prefix = userdata['global_topic_prefix'] topic = c.topic[len(global_topic_prefix+'/'):] if global_topic_prefix is not None else c.topic self._queue_command(self.Command(worker_obj.on_command, [topic, c.payload]))