From 144839f1bdf4d2dd6260df8282eb51396f9dffb5 Mon Sep 17 00:00:00 2001 From: zewelor Date: Fri, 6 Sep 2019 09:16:09 +0200 Subject: [PATCH] Black reformat (#100) --- const.py | 2 +- exceptions.py | 4 +- gateway.py | 78 +++-- logger.py | 52 +-- mqtt.py | 306 +++++++++-------- utils.py | 3 +- workers/base.py | 52 +-- workers/blescanmulti.py | 188 ++++++----- workers/ibbq.py | 330 +++++++++--------- workers/linakdesk.py | 49 ++- workers/miflora.py | 195 ++++++----- workers/miscale.py | 94 +++--- workers/mithermometer.py | 142 ++++---- workers/mysensors.py | 28 +- workers/smartgadget.py | 153 +++++---- workers/switchbot.py | 147 ++++---- workers/thermostat.py | 497 +++++++++++++++++----------- workers/toothbrush.py | 103 ++++-- workers/toothbrush_homeassistant.py | 273 ++++++++------- workers_manager.py | 347 +++++++++++-------- workers_queue.py | 1 - 21 files changed, 1745 insertions(+), 1299 deletions(-) diff --git a/const.py b/const.py index a40e86b..9154974 100644 --- a/const.py +++ b/const.py @@ -1,2 +1,2 @@ DEFAULT_COMMAND_TIMEOUT = 35 # In seconds -PER_DEVICE_TIMEOUT = 6 # In seconds +PER_DEVICE_TIMEOUT = 6 # In seconds diff --git a/exceptions.py b/exceptions.py index 21d59fc..c7f0d75 100644 --- a/exceptions.py +++ b/exceptions.py @@ -1,6 +1,6 @@ class WorkerTimeoutError(Exception): - pass + pass class DeviceTimeoutError(Exception): - pass + pass diff --git a/gateway.py b/gateway.py index c2a4141..1ab0ff3 100755 --- a/gateway.py +++ b/gateway.py @@ -5,10 +5,11 @@ from exceptions import WorkerTimeoutError if sys.version_info < (3, 5): - print("To use this script you need python 3.5 or newer! got %s" % sys.version_info) - sys.exit(1) + print("To use this script you need python 3.5 or newer! got %s" % sys.version_info) + sys.exit(1) import logger + logger.setup() import logging @@ -23,41 +24,68 @@ parser = argparse.ArgumentParser() group = parser.add_mutually_exclusive_group() -group.add_argument('-d', '--debug', action='store_true', default=False, help='Set logging to output debug information') -group.add_argument('-q', '--quiet', action='store_true', default=False, help='Set logging to just output warnings and errors') -parser.add_argument('-s', '--suppress-update-failures', dest='suppress', action='store_true', default=False, help='Suppress any errors regarding failed device updates') +group.add_argument( + "-d", + "--debug", + action="store_true", + default=False, + help="Set logging to output debug information", +) +group.add_argument( + "-q", + "--quiet", + action="store_true", + default=False, + help="Set logging to just output warnings and errors", +) +parser.add_argument( + "-s", + "--suppress-update-failures", + dest="suppress", + action="store_true", + default=False, + help="Suppress any errors regarding failed device updates", +) parsed = parser.parse_args() _LOGGER = logger.get() if parsed.quiet: - _LOGGER.setLevel(logging.WARNING) + _LOGGER.setLevel(logging.WARNING) elif parsed.debug: - _LOGGER.setLevel(logging.DEBUG) - logger.enable_debug_formatter() + _LOGGER.setLevel(logging.DEBUG) + logger.enable_debug_formatter() else: - _LOGGER.setLevel(logging.INFO) + _LOGGER.setLevel(logging.INFO) logger.suppress_update_failures(parsed.suppress) -_LOGGER.info('Starting') +_LOGGER.info("Starting") -global_topic_prefix = settings['mqtt'].get('topic_prefix') +global_topic_prefix = settings["mqtt"].get("topic_prefix") -mqtt = MqttClient(settings['mqtt']) -manager = WorkersManager(settings['manager']) +mqtt = MqttClient(settings["mqtt"]) +manager = WorkersManager(settings["manager"]) manager.register_workers(global_topic_prefix).start(mqtt) running = True while running: - try: - mqtt.publish(_WORKERS_QUEUE.get(timeout=10).execute()) - except queue.Empty: # Allow for SIGINT processing - pass - except WorkerTimeoutError as e: - logger.log_exception(_LOGGER, str(e) if str(e) else 'Timeout while executing worker command', suppress=True) - except (KeyboardInterrupt, SystemExit): - running = False - _LOGGER.info('Finish current jobs and shut down. If you need force exit use kill') - except Exception as e: - logger.log_exception(_LOGGER, "Fatal error while executing worker command: %s", type(e).__name__) - raise e + try: + mqtt.publish(_WORKERS_QUEUE.get(timeout=10).execute()) + except queue.Empty: # Allow for SIGINT processing + pass + except WorkerTimeoutError as e: + logger.log_exception( + _LOGGER, + str(e) if str(e) else "Timeout while executing worker command", + suppress=True, + ) + except (KeyboardInterrupt, SystemExit): + running = False + _LOGGER.info( + "Finish current jobs and shut down. If you need force exit use kill" + ) + except Exception as e: + logger.log_exception( + _LOGGER, "Fatal error while executing worker command: %s", type(e).__name__ + ) + raise e diff --git a/logger.py b/logger.py index a3ba2bf..2d3e6ff 100644 --- a/logger.py +++ b/logger.py @@ -2,49 +2,51 @@ import logging.config import yaml -APP_ROOT = 'bt-mqtt-gw' +APP_ROOT = "bt-mqtt-gw" SUPPRESSION_ENABLED = False def setup(): - with open('logger.yaml', 'rt') as f: - config = yaml.safe_load(f.read()) - logging.config.dictConfig(config) + with open("logger.yaml", "rt") as f: + config = yaml.safe_load(f.read()) + logging.config.dictConfig(config) def get(name=None): - if name: - logger_name = '{}.{}'.format(APP_ROOT, name) - else: - logger_name = APP_ROOT - return logging.getLogger(logger_name) + if name: + logger_name = "{}.{}".format(APP_ROOT, name) + else: + logger_name = APP_ROOT + return logging.getLogger(logger_name) def enable_debug_formatter(): - logging.getLogger().handlers[0].setFormatter(logging.getLogger('dummy_debug').handlers[0].formatter) + logging.getLogger().handlers[0].setFormatter( + logging.getLogger("dummy_debug").handlers[0].formatter + ) def reset(): - app_level = get().getEffectiveLevel() + app_level = get().getEffectiveLevel() - root = logging.getLogger() - map(root.removeHandler, root.handlers[:]) - map(root.removeFilter, root.filters[:]) + root = logging.getLogger() + map(root.removeHandler, root.handlers[:]) + map(root.removeFilter, root.filters[:]) - setup() - get().setLevel(app_level) - if app_level <= logging.DEBUG: - enable_debug_formatter() + setup() + get().setLevel(app_level) + if app_level <= logging.DEBUG: + enable_debug_formatter() def suppress_update_failures(suppress): - global SUPPRESSION_ENABLED - SUPPRESSION_ENABLED = suppress + global SUPPRESSION_ENABLED + SUPPRESSION_ENABLED = suppress def log_exception(logger, message, *args, **kwargs): - if not ('suppress' in kwargs and kwargs.pop('suppress') and SUPPRESSION_ENABLED): - if logger.isEnabledFor(logging.DEBUG): - logger.exception(message, *args, **kwargs) - elif logger.isEnabledFor(logging.WARNING): - logger.warning(message, *args, **kwargs) + if not ("suppress" in kwargs and kwargs.pop("suppress") and SUPPRESSION_ENABLED): + if logger.isEnabledFor(logging.DEBUG): + logger.exception(message, *args, **kwargs) + elif logger.isEnabledFor(logging.WARNING): + logger.warning(message, *args, **kwargs) diff --git a/mqtt.py b/mqtt.py index 37d8685..3982327 100644 --- a/mqtt.py +++ b/mqtt.py @@ -3,167 +3,187 @@ import paho.mqtt.client as mqtt import logger -LWT_ONLINE = 'online' -LWT_OFFLINE = 'offline' +LWT_ONLINE = "online" +LWT_OFFLINE = "offline" _LOGGER = logger.get(__name__) -class MqttClient: - - def __init__(self, config): - self._config = config - self._mqttc = mqtt.Client(client_id=self.client_id, - clean_session=False, - userdata={'global_topic_prefix': self.topic_prefix}) - - if self.username and self.password: - self.mqttc.username_pw_set(self.username, self.password) - - if self.ca_cert: - cert_reqs = mqtt.ssl.CERT_REQUIRED if self.ca_verify else mqtt.ssl.CERT_NONE - self.mqttc.tls_set(self.ca_cert, cert_reqs = cert_reqs) - self.mqttc.tls_insecure_set(not self.ca_verify) - - if self.availability_topic: - topic = self._format_topic(self.availability_topic) - _LOGGER.debug("Setting LWT to: %s" % topic) - self.mqttc.will_set(topic, payload=LWT_OFFLINE, retain=True) - - def publish(self, messages): - if not messages: - return - - for m in messages: - if m.use_global_prefix: - topic = self._format_topic(m.topic) - else: - topic = m.topic - self.mqttc.publish(topic, m.payload, retain=m.retain) - - @property - def client_id(self): - return self._config['client_id'] if 'client_id' in self._config else 'bt-mqtt-gateway' - - @property - def hostname(self): - return self._config['host'] - - @property - def port(self): - return self._config['port'] if 'port' in self._config else 1883 - - @property - def username(self): - return self._config['username'] if 'username' in self._config else None - - @property - def password(self): - return self._config['password'] if 'password' in self._config else None - - @property - def ca_cert(self): - return self._config['ca_cert'] if 'ca_cert' in self._config else None - @property - def ca_verify(self): - if 'ca_verify' in self._config: - # Constrain config input to boolean value - if self._config['ca_verify']: - return True - else: - return False - else: - return True - - @property - def topic_prefix(self): - return self._config['topic_prefix'] if 'topic_prefix' in self._config else None - - @property - def availability_topic(self): - return self._config['availability_topic'] if 'availability_topic' in self._config else None - - @property - def mqttc(self): - return self._mqttc - - def on_connect(self, client, userdata, flags, rc): - if self.availability_topic: - self.publish([MqttMessage(topic=self.availability_topic, payload=LWT_ONLINE, retain=True)]) - - def callbacks_subscription(self, callbacks): - self.mqttc.on_connect = self.on_connect - - self.mqttc.connect(self.hostname, port=self.port) - - for topic, callback in callbacks: - topic = self._format_topic(topic) - _LOGGER.debug("Subscribing to: %s" % topic) - self.mqttc.message_callback_add(topic, callback) - self.mqttc.subscribe(topic) - - self.mqttc.loop_start() - - def __del__(self): - if self.availability_topic: - self.publish([MqttMessage(topic=self.availability_topic, payload=LWT_OFFLINE, retain=True)]) - - def _format_topic(self, topic): - return "{}/{}".format(self.topic_prefix, topic) if self.topic_prefix else topic +class MqttClient: + def __init__(self, config): + self._config = config + self._mqttc = mqtt.Client( + client_id=self.client_id, + clean_session=False, + userdata={"global_topic_prefix": self.topic_prefix}, + ) + + if self.username and self.password: + self.mqttc.username_pw_set(self.username, self.password) + + if self.ca_cert: + cert_reqs = mqtt.ssl.CERT_REQUIRED if self.ca_verify else mqtt.ssl.CERT_NONE + self.mqttc.tls_set(self.ca_cert, cert_reqs=cert_reqs) + self.mqttc.tls_insecure_set(not self.ca_verify) + + if self.availability_topic: + topic = self._format_topic(self.availability_topic) + _LOGGER.debug("Setting LWT to: %s" % topic) + self.mqttc.will_set(topic, payload=LWT_OFFLINE, retain=True) + + def publish(self, messages): + if not messages: + return + + for m in messages: + if m.use_global_prefix: + topic = self._format_topic(m.topic) + else: + topic = m.topic + self.mqttc.publish(topic, m.payload, retain=m.retain) + + @property + def client_id(self): + return ( + self._config["client_id"] + if "client_id" in self._config + else "bt-mqtt-gateway" + ) + + @property + def hostname(self): + return self._config["host"] + + @property + def port(self): + return self._config["port"] if "port" in self._config else 1883 + + @property + def username(self): + return self._config["username"] if "username" in self._config else None + + @property + def password(self): + return self._config["password"] if "password" in self._config else None + + @property + def ca_cert(self): + return self._config["ca_cert"] if "ca_cert" in self._config else None + + @property + def ca_verify(self): + if "ca_verify" in self._config: + # Constrain config input to boolean value + if self._config["ca_verify"]: + return True + else: + return False + else: + return True + + @property + def topic_prefix(self): + return self._config["topic_prefix"] if "topic_prefix" in self._config else None + + @property + def availability_topic(self): + return ( + self._config["availability_topic"] + if "availability_topic" in self._config + else None + ) + + @property + def mqttc(self): + return self._mqttc + + def on_connect(self, client, userdata, flags, rc): + if self.availability_topic: + self.publish( + [ + MqttMessage( + topic=self.availability_topic, payload=LWT_ONLINE, retain=True + ) + ] + ) + + def callbacks_subscription(self, callbacks): + self.mqttc.on_connect = self.on_connect + + self.mqttc.connect(self.hostname, port=self.port) + + for topic, callback in callbacks: + topic = self._format_topic(topic) + _LOGGER.debug("Subscribing to: %s" % topic) + self.mqttc.message_callback_add(topic, callback) + self.mqttc.subscribe(topic) + + self.mqttc.loop_start() + + def __del__(self): + if self.availability_topic: + self.publish( + [ + MqttMessage( + topic=self.availability_topic, payload=LWT_OFFLINE, retain=True + ) + ] + ) + + def _format_topic(self, topic): + return "{}/{}".format(self.topic_prefix, topic) if self.topic_prefix else topic class MqttMessage: - use_global_prefix = True + use_global_prefix = True - def __init__(self, topic=None, payload=None, retain=False): - self._topic = topic - self._payload = payload - self._retain = retain + def __init__(self, topic=None, payload=None, retain=False): + self._topic = topic + self._payload = payload + self._retain = retain - @property - def topic(self): - return self._topic + @property + def topic(self): + return self._topic - @topic.setter - def topic(self, new_topic): - self._topic = new_topic + @topic.setter + def topic(self, new_topic): + self._topic = new_topic - @property - def payload(self): - return self._payload + @property + def payload(self): + return self._payload - @property - def retain(self): - return self._retain + @property + def retain(self): + return self._retain - @property - def as_dict(self): - return { - 'topic': self.topic, - 'payload': self.payload - } + @property + def as_dict(self): + return {"topic": self.topic, "payload": self.payload} - def __repr__(self): - return self.as_dict.__str__() + def __repr__(self): + return self.as_dict.__str__() - def __str__(self): - return self.__repr__() + def __str__(self): + return self.__repr__() class MqttConfigMessage(MqttMessage): - SENSOR = 'sensor' - CLIMATE = 'climate' - BINARY_SENSOR = 'binary_sensor' - - use_global_prefix = False + SENSOR = "sensor" + CLIMATE = "climate" + BINARY_SENSOR = "binary_sensor" - def __init__(self, component, name, payload=None, retain=False): - super().__init__("{}/{}/config".format(component, name), json.dumps(payload), retain) + use_global_prefix = False - @property - def retain(self): - return self._retain + def __init__(self, component, name, payload=None, retain=False): + super().__init__( + "{}/{}/config".format(component, name), json.dumps(payload), retain + ) - @retain.setter - def retain(self, new_retain): - self._retain = new_retain + @property + def retain(self): + return self._retain + @retain.setter + def retain(self, new_retain): + self._retain = new_retain diff --git a/utils.py b/utils.py index 0696b41..7c5bd99 100644 --- a/utils.py +++ b/utils.py @@ -1,5 +1,4 @@ - -true_statement = ('y', 'yes', 'on', '1', 'true', 't', ) +true_statement = ("y", "yes", "on", "1", "true", "t") def booleanize(value) -> bool: diff --git a/workers/base.py b/workers/base.py index de4e495..21ff095 100644 --- a/workers/base.py +++ b/workers/base.py @@ -1,33 +1,35 @@ class BaseWorker: - def __init__(self, command_timeout, global_topic_prefix, **kwargs): - self.command_timeout = command_timeout - self.global_topic_prefix = global_topic_prefix - for arg, value in kwargs.items(): - setattr(self, arg, value) - self._setup() + def __init__(self, command_timeout, global_topic_prefix, **kwargs): + self.command_timeout = command_timeout + self.global_topic_prefix = global_topic_prefix + for arg, value in kwargs.items(): + setattr(self, arg, value) + self._setup() - def _setup(self): - return + def _setup(self): + return - def format_discovery_topic(self, mac, *sensor_args): - node_id = mac.replace(':', '-') - object_id = '_'.join([repr(self), *sensor_args]) - return '{}/{}'.format(node_id, object_id) + def format_discovery_topic(self, mac, *sensor_args): + node_id = mac.replace(":", "-") + object_id = "_".join([repr(self), *sensor_args]) + return "{}/{}".format(node_id, object_id) - def format_discovery_id(self, mac, *sensor_args): - return 'bt-mqtt-gateway/{}'.format(self.format_discovery_topic(mac, *sensor_args)) + def format_discovery_id(self, mac, *sensor_args): + return "bt-mqtt-gateway/{}".format( + self.format_discovery_topic(mac, *sensor_args) + ) - def format_discovery_name(self, *sensor_args): - return '_'.join([repr(self), *sensor_args]) + def format_discovery_name(self, *sensor_args): + return "_".join([repr(self), *sensor_args]) - def format_topic(self, *topic_args): - return '/'.join([self.topic_prefix, *topic_args]) + def format_topic(self, *topic_args): + return "/".join([self.topic_prefix, *topic_args]) - def format_prefixed_topic(self, *topic_args): - topic = self.format_topic(*topic_args) - if self.global_topic_prefix: - return '{}/{}'.format(self.global_topic_prefix, topic) - return topic + def format_prefixed_topic(self, *topic_args): + topic = self.format_topic(*topic_args) + if self.global_topic_prefix: + return "{}/{}".format(self.global_topic_prefix, topic) + return topic - def __repr__(self): - return self.__module__.split(".")[-1] + def __repr__(self): + return self.__module__.split(".")[-1] diff --git a/workers/blescanmulti.py b/workers/blescanmulti.py index c6605b6..cbaef29 100644 --- a/workers/blescanmulti.py +++ b/workers/blescanmulti.py @@ -7,100 +7,116 @@ from utils import booleanize import logger -REQUIREMENTS = ['bluepy'] +REQUIREMENTS = ["bluepy"] _LOGGER = logger.get(__name__) - class ScanDelegate(DefaultDelegate): - def __init__(self): - DefaultDelegate.__init__(self) + def __init__(self): + DefaultDelegate.__init__(self) - def handleDiscovery(self, dev, isNewDev, isNewData): - if isNewDev: - _LOGGER.debug("Discovered new device: %s" % dev.addr) + def handleDiscovery(self, dev, isNewDev, isNewData): + if isNewDev: + _LOGGER.debug("Discovered new device: %s" % dev.addr) class BleDeviceStatus: - def __init__(self, worker, mac: str, name: str, available: bool = False, last_status_time: float = None, - message_sent: bool = True): - if last_status_time is None: - last_status_time = time.time() - - self.worker = worker # type: BlescanmultiWorker - self.mac = mac.lower() - self.name = name - self.available = available - self.last_status_time = last_status_time - self.message_sent = message_sent - - def set_status(self, available): - if available != self.available: - self.available = available - self.last_status_time = time.time() - self.message_sent = False - - def _timeout(self): - if self.available: - return self.worker.available_timeout - else: - return self.worker.unavailable_timeout - - def has_time_elapsed(self): - elapsed = time.time() - self.last_status_time - return elapsed > self._timeout() - - def payload(self): - if self.available: - return self.worker.available_payload - else: - return self.worker.unavailable_payload - - def generate_messages(self, device): - messages = [] - if not self.message_sent and self.has_time_elapsed(): - self.message_sent = True - messages.append( - MqttMessage(topic=self.worker.format_topic('presence/{}'.format(self.name)), payload=self.payload()) - ) - if self.available: - messages.append( - MqttMessage(topic=self.worker.format_topic('presence/{}/rssi'.format(self.name)), payload=device.rssi) - ) - return messages + def __init__( + self, + worker, + mac: str, + name: str, + available: bool = False, + last_status_time: float = None, + message_sent: bool = True, + ): + if last_status_time is None: + last_status_time = time.time() + + self.worker = worker # type: BlescanmultiWorker + self.mac = mac.lower() + self.name = name + self.available = available + self.last_status_time = last_status_time + self.message_sent = message_sent + + def set_status(self, available): + if available != self.available: + self.available = available + self.last_status_time = time.time() + self.message_sent = False + + def _timeout(self): + if self.available: + return self.worker.available_timeout + else: + return self.worker.unavailable_timeout + + def has_time_elapsed(self): + elapsed = time.time() - self.last_status_time + return elapsed > self._timeout() + + def payload(self): + if self.available: + return self.worker.available_payload + else: + return self.worker.unavailable_payload + + def generate_messages(self, device): + messages = [] + if not self.message_sent and self.has_time_elapsed(): + self.message_sent = True + messages.append( + MqttMessage( + topic=self.worker.format_topic("presence/{}".format(self.name)), + payload=self.payload(), + ) + ) + if self.available: + messages.append( + MqttMessage( + topic=self.worker.format_topic( + "presence/{}/rssi".format(self.name) + ), + payload=device.rssi, + ) + ) + return messages class BlescanmultiWorker(BaseWorker): - # Default values - devices = {} - # Payload that should be send when device is available - available_payload = 'home' # type: str - # Payload that should be send when device is unavailable - unavailable_payload = 'not_home' # type: str - # After what time (in seconds) we should inform that device is available (default: 0 seconds) - available_timeout = 0 # type: float - # After what time (in seconds) we should inform that device is unavailable (default: 60 seconds) - unavailable_timeout = 60 # type: float - scan_timeout = 10. # type: float - scan_passive = True # type: str or bool - - def __init__(self, command_timeout, global_topic_prefix, **kwargs): - super(BlescanmultiWorker, self).__init__(command_timeout, global_topic_prefix, **kwargs) - self.scanner = Scanner().withDelegate(ScanDelegate()) - self.last_status = [ - BleDeviceStatus(self, mac, name) for name, mac in self.devices.items() - ] - - def status_update(self): - devices = self.scanner.scan(float(self.scan_timeout), passive=booleanize(self.scan_passive)) - mac_addresses = { - device.addr: device for device in devices - } - ret = [] - - for status in self.last_status: - device = mac_addresses.get(status.mac, None) - status.set_status(device is not None) - ret += status.generate_messages(device) - - return ret + # Default values + devices = {} + # Payload that should be send when device is available + available_payload = "home" # type: str + # Payload that should be send when device is unavailable + unavailable_payload = "not_home" # type: str + # After what time (in seconds) we should inform that device is available (default: 0 seconds) + available_timeout = 0 # type: float + # After what time (in seconds) we should inform that device is unavailable (default: 60 seconds) + unavailable_timeout = 60 # type: float + scan_timeout = 10.0 # type: float + scan_passive = True # type: str or bool + + def __init__(self, command_timeout, global_topic_prefix, **kwargs): + super(BlescanmultiWorker, self).__init__( + command_timeout, global_topic_prefix, **kwargs + ) + self.scanner = Scanner().withDelegate(ScanDelegate()) + self.last_status = [ + BleDeviceStatus(self, mac, name) for name, mac in self.devices.items() + ] + + def status_update(self): + devices = self.scanner.scan( + float(self.scan_timeout), passive=booleanize(self.scan_passive) + ) + mac_addresses = {device.addr: device for device in devices} + ret = [] + + for status in self.last_status: + device = mac_addresses.get(status.mac, None) + status.set_status(device is not None) + ret += status.generate_messages(device) + + return ret diff --git a/workers/ibbq.py b/workers/ibbq.py index fe52721..82cbdf6 100644 --- a/workers/ibbq.py +++ b/workers/ibbq.py @@ -11,166 +11,194 @@ from workers.base import BaseWorker import logger import json + _LOGGER = logger.get(__name__) -REQUIREMENTS = ['bluepy'] +REQUIREMENTS = ["bluepy"] class IbbqWorker(BaseWorker): - def _setup(self): - _LOGGER.info("Adding %d %s devices", len(self.devices), repr(self)) - for name, mac in self.devices.items(): - _LOGGER.info("Adding %s device '%s' (%s)", repr(self), name, mac) - self.devices[name] = ibbqThermometer(mac, timeout=self.command_timeout) - - def format_static_topic(self, *args): - return '/'.join([self.topic_prefix, *args]) - - def __repr__(self): - return self.__module__.split(".")[-1] - - def status_update(self): - for name, ibbq in self.devices.items(): - ret = dict() - value=list() - if not ibbq.connected: - ibbq.device = ibbq.connect() - ibbq.subscribe() - bat, value = None, value - else: - bat, value = ibbq.update() - n = 0 - ret['available'] = ibbq.connected - ret['battery_level'] = bat - for i in value: - n += 1 - ret['Temp{}'.format(n)] = i - return([MqttMessage(topic=self.format_static_topic(name), payload = json.dumps(ret))]) - - -class ibbqThermometer(): - SettingResult = "fff1" - AccountAndVerify = 'fff2' - RealTimeData = 'fff4' - SettingData = 'fff5' - Notify = b'\x01\x00' - realTimeDataEnable = bytearray([0x0B, 0x01, 0x00, 0x00, 0x00, 0x00]) - batteryLevel = bytearray([0x08, 0x24, 0x00, 0x00, 0x00, 0x00]) - KEY = bytearray([0x21, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01, - 0xb8, 0x22, 0x00, 0x00, 0x00, 0x00, 0x00]) - - - def getBattery(self): - self.Setting_uuid.write(self.batteryLevel) - - def connect(self, timeout=5): - try: - device =(btle.Peripheral(self.mac)) - _LOGGER.debug("%s connected ", self.mac) - return device - except btle.BTLEDisconnectError as er: - _LOGGER.debug("failed connect %s", er) - - def __init__(self, mac, timeout=5): - self.cnt = 0 - self.batteryPct = 0 - self.timeout = timeout - self.mac = mac - self.values = list() - self.device = self.connect() - self.offline = 0 - if not self.device: - return - self.device = self.subscribe() - - @property - def connected(self): - return(bool(self.device)) - - def subscribe(self, timeout=5): - if self.device is None: - return - try: - services = self.device.getServices() - for service in services: - if "fff0" not in str(service.uuid): - continue - for schar in service.getCharacteristics(): - if self.AccountAndVerify in str(schar.uuid): - self.account_uuid = schar - if self.RealTimeData in str(schar.uuid): - self.RT_uuid = schar - if self.SettingData in str(schar.uuid): - self.Setting_uuid = schar - if self.SettingResult in str(schar.uuid): - self.SettingResult_uuid = schar - - self.account_uuid.write(self.KEY) - _LOGGER.info("Authenticated %s", self.mac) - self.RT_uuid.getDescriptors() - self.device.writeCharacteristic(self.RT_uuid.getHandle() + 1, self.Notify) - self.device.writeCharacteristic(self.SettingResult_uuid.getHandle() +1, self.Notify) - self.getBattery() - self.Setting_uuid.write(self.realTimeDataEnable) - self.device.withDelegate(MyDelegate(self)) - _LOGGER.info("Subscribed %s", self.mac) - self.offline = 0 - except btle.BTLEException as ex: - _LOGGER.info("failed %s %s", self.mac, ex) - self.device = None - _LOGGER.info("unsubscribe") - return self.device - - def update(self): - if not self.connected: - return list() - self.values = list() - self.cnt += 1 - try: - if self.cnt >5: + def _setup(self): + _LOGGER.info("Adding %d %s devices", len(self.devices), repr(self)) + for name, mac in self.devices.items(): + _LOGGER.info("Adding %s device '%s' (%s)", repr(self), name, mac) + self.devices[name] = ibbqThermometer(mac, timeout=self.command_timeout) + + def format_static_topic(self, *args): + return "/".join([self.topic_prefix, *args]) + + def __repr__(self): + return self.__module__.split(".")[-1] + + def status_update(self): + for name, ibbq in self.devices.items(): + ret = dict() + value = list() + if not ibbq.connected: + ibbq.device = ibbq.connect() + ibbq.subscribe() + bat, value = None, value + else: + bat, value = ibbq.update() + n = 0 + ret["available"] = ibbq.connected + ret["battery_level"] = bat + for i in value: + n += 1 + ret["Temp{}".format(n)] = i + return [ + MqttMessage( + topic=self.format_static_topic(name), payload=json.dumps(ret) + ) + ] + + +class ibbqThermometer: + SettingResult = "fff1" + AccountAndVerify = "fff2" + RealTimeData = "fff4" + SettingData = "fff5" + Notify = b"\x01\x00" + realTimeDataEnable = bytearray([0x0B, 0x01, 0x00, 0x00, 0x00, 0x00]) + batteryLevel = bytearray([0x08, 0x24, 0x00, 0x00, 0x00, 0x00]) + KEY = bytearray( + [ + 0x21, + 0x07, + 0x06, + 0x05, + 0x04, + 0x03, + 0x02, + 0x01, + 0xB8, + 0x22, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + ] + ) + + def getBattery(self): + self.Setting_uuid.write(self.batteryLevel) + + def connect(self, timeout=5): + try: + device = btle.Peripheral(self.mac) + _LOGGER.debug("%s connected ", self.mac) + return device + except btle.BTLEDisconnectError as er: + _LOGGER.debug("failed connect %s", er) + + def __init__(self, mac, timeout=5): self.cnt = 0 - self.getBattery() - while self.device.waitForNotifications(1): - pass - if self.values: + self.batteryPct = 0 + self.timeout = timeout + self.mac = mac + self.values = list() + self.device = self.connect() self.offline = 0 - else: - _LOGGER.debug("%s is silent", self.mac) - if self.offline > 3: - try: - self.device.disconnect() - except btle.BTLEInternalError as e: + if not self.device: + return + self.device = self.subscribe() + + @property + def connected(self): + return bool(self.device) + + def subscribe(self, timeout=5): + if self.device is None: + return + try: + services = self.device.getServices() + for service in services: + if "fff0" not in str(service.uuid): + continue + for schar in service.getCharacteristics(): + if self.AccountAndVerify in str(schar.uuid): + self.account_uuid = schar + if self.RealTimeData in str(schar.uuid): + self.RT_uuid = schar + if self.SettingData in str(schar.uuid): + self.Setting_uuid = schar + if self.SettingResult in str(schar.uuid): + self.SettingResult_uuid = schar + + self.account_uuid.write(self.KEY) + _LOGGER.info("Authenticated %s", self.mac) + self.RT_uuid.getDescriptors() + self.device.writeCharacteristic(self.RT_uuid.getHandle() + 1, self.Notify) + self.device.writeCharacteristic( + self.SettingResult_uuid.getHandle() + 1, self.Notify + ) + self.getBattery() + self.Setting_uuid.write(self.realTimeDataEnable) + self.device.withDelegate(MyDelegate(self)) + _LOGGER.info("Subscribed %s", self.mac) + self.offline = 0 + except btle.BTLEException as ex: + _LOGGER.info("failed %s %s", self.mac, ex) + self.device = None + _LOGGER.info("unsubscribe") + return self.device + + def update(self): + if not self.connected: + return list() + self.values = list() + self.cnt += 1 + try: + if self.cnt > 5: + self.cnt = 0 + self.getBattery() + while self.device.waitForNotifications(1): + pass + if self.values: + self.offline = 0 + else: + _LOGGER.debug("%s is silent", self.mac) + if self.offline > 3: + try: + self.device.disconnect() + except btle.BTLEInternalError as e: + _LOGGER.debug("%s", e) + self.device = None + _LOGGER.debug("%s reconnect", self.mac) + else: + self.offline += 1 + except btle.BTLEDisconnectError as e: _LOGGER.debug("%s", e) - self.device = None - _LOGGER.debug("%s reconnect", self.mac) - else: - self.offline += 1 - except btle.BTLEDisconnectError as e: - _LOGGER.debug("%s", e) - self.device = None - finally: - return(self.batteryPct, self.values) + self.device = None + finally: + return (self.batteryPct, self.values) + class MyDelegate(btle.DefaultDelegate): + def __init__(self, caller): + btle.DefaultDelegate.__init__(self) + self.caller = caller + _LOGGER.debug("init mydelegate") + + def handleNotification(self, cHandle, data): + batMin = 0.95 + batMax = 1.5 + result = list() + # safe = data + if cHandle == 37: + if data[0] == 0x24: + currentV = struct.unpack(" 0: + v, data = data[0:2], data[2:] + result.append(struct.unpack(" 0): - v, data = data[0:2], data[2:] - result.append(struct.unpack(' 0: - rssi = device.rssi - presence = 1 - state = bytes_[5] - pressure = bytes_[6] - time = bytes_[7]*60 + bytes_[8] - mode = bytes_[9] - sector = bytes_[10] - - attributes = { - "rssi": rssi, - "pressure": pressure, - "time": time, - "mode": self.get_mode(mode), - "sector": self.get_sector(sector) - } - presence_value = "online" if presence == 1 else "offline" - - ret.append(MqttMessage(topic=self.format_topic(key+"/presence"), payload=presence_value)) - ret.append(MqttMessage(topic=self.format_topic(key+"/state"), payload=self.get_state(state))) - ret.append(MqttMessage(topic=self.format_topic(key+"/attributes"), payload=json.dumps(attributes))) - - autoconf_data = self.get_autoconf_data(key, item['name']) - if autoconf_data != False: - ret.append(MqttMessage(topic=self.autodiscovery_prefix+"/sensor/"+self.topic_prefix+"_"+key+"/config", payload=json.dumps(autoconf_data), retain=True)) - - yield ret + def _setup(self): + self.autoconfCache = {} + + def searchmac(self, devices, mac): + for dev in devices: + if dev.addr == mac.lower(): + return dev + return None + + def get_autoconf_data(self, key, name): + if key in self.autoconfCache: + return False + else: + self.autoconfCache[key] = True + return { + "platform": "mqtt", + "name": name, + "state_topic": self.topic_prefix + "/" + key + "/state", + "availability_topic": self.topic_prefix + "/" + key + "/presence", + "json_attributes_topic": self.topic_prefix + "/" + key + "/attributes", + "icon": "mdi:tooth-outline", + } + + def get_state(self, item): + if item in BRUSHSTATES: + return BRUSHSTATES[item] + else: + return BRUSHSTATES[0] + + def get_mode(self, item): + if item in BRUSHMODES: + return BRUSHMODES[item] + else: + return BRUSHMODES[255] + + def get_sector(self, item): + if item in BRUSHSECTORS: + return BRUSHSECTORS[item] + else: + return BRUSHSECTORS[255] + + def status_update(self): + scanner = Scanner().withDelegate(ScanDelegate()) + devices = scanner.scan(5.0) + ret = [] + + for key, item in self.devices.items(): + device = self.searchmac(devices, item["mac"]) + + rssi = 0 + presence = 0 + state = 0 + pressure = 0 + time = 0 + mode = 255 + sector = 255 + + if device is not None: + bytes_ = bytearray.fromhex(device.getValueText(255)) + _LOGGER.debug("text: %s" % device.getValueText(255)) + + if bytes_[5] > 0: + rssi = device.rssi + presence = 1 + state = bytes_[5] + pressure = bytes_[6] + time = bytes_[7] * 60 + bytes_[8] + mode = bytes_[9] + sector = bytes_[10] + + attributes = { + "rssi": rssi, + "pressure": pressure, + "time": time, + "mode": self.get_mode(mode), + "sector": self.get_sector(sector), + } + presence_value = "online" if presence == 1 else "offline" + + ret.append( + MqttMessage( + topic=self.format_topic(key + "/presence"), payload=presence_value + ) + ) + ret.append( + MqttMessage( + topic=self.format_topic(key + "/state"), + payload=self.get_state(state), + ) + ) + ret.append( + MqttMessage( + topic=self.format_topic(key + "/attributes"), + payload=json.dumps(attributes), + ) + ) + + autoconf_data = self.get_autoconf_data(key, item["name"]) + if autoconf_data != False: + ret.append( + MqttMessage( + topic=self.autodiscovery_prefix + + "/sensor/" + + self.topic_prefix + + "_" + + key + + "/config", + payload=json.dumps(autoconf_data), + retain=True, + ) + ) + + yield ret diff --git a/workers_manager.py b/workers_manager.py index 95819b4..ba1958d 100644 --- a/workers_manager.py +++ b/workers_manager.py @@ -13,153 +13,214 @@ import logger from pip import __version__ as pip_version -if int(pip_version.split('.')[0]) >= 10: - from pip._internal import main as pip_main + +if int(pip_version.split(".")[0]) >= 10: + from pip._internal import main as pip_main else: - from pip import main as pip_main + from pip import main as pip_main _LOGGER = logger.get(__name__) + class WorkersManager: - class Command: - def __init__(self, callback, timeout, args=(), options=dict()): - self._callback = callback - self._timeout = timeout - self._args = args - self._options = options - self._source = '{}.{}'.format(callback.__self__.__class__.__name__ if hasattr(callback, '__self__') else callback.__module__, callback.__name__) - - def execute(self): - messages = [] - - try: - with timeout(self._timeout, exception=WorkerTimeoutError('Execution of command {} timed out after {} seconds'.format(self._source, self._timeout))): - if inspect.isgeneratorfunction(self._callback): - for message in self._callback(*self._args): - messages += message - else: - messages = self._callback(*self._args) - except WorkerTimeoutError as e: - if messages: - logger.log_exception(_LOGGER, "%s, sending only partial update", e, suppress=True) - else: - raise e - - _LOGGER.debug('Execution result of command %s: %s', self._source, messages) - return messages - - def __init__(self, config): - self._mqtt_callbacks = [] - self._config_commands = [] - self._update_commands = [] - self._scheduler = BackgroundScheduler(timezone=utc) - self._daemons = [] - self._config = config - self._command_timeout = config.get('command_timeout', DEFAULT_COMMAND_TIMEOUT) - - def register_workers(self, global_topic_prefix): - for (worker_name, worker_config) in self._config['workers'].items(): - module_obj = importlib.import_module("workers.%s" % worker_name) - klass = getattr(module_obj, "%sWorker" % worker_name.title()) - - if module_obj.REQUIREMENTS is not None: - self._pip_install_helper(module_obj.REQUIREMENTS) - - command_timeout = worker_config.get('command_timeout', self._command_timeout) - worker_obj = klass(command_timeout, global_topic_prefix, **worker_config['args']) - - if 'sensor_config' in self._config and hasattr(worker_obj, 'config'): - _LOGGER.debug("Added %s config with a %d seconds timeout", repr(worker_obj), 2) - command = self.Command(worker_obj.config, 2, []) - self._config_commands.append(command) - - if hasattr(worker_obj, 'status_update'): - _LOGGER.debug("Added %s worker with %d seconds interval and a %d seconds timeout", repr(worker_obj), worker_config['update_interval'], worker_obj.command_timeout) - command = self.Command(worker_obj.status_update, worker_obj.command_timeout, []) - self._update_commands.append(command) - - if 'update_interval' in worker_config: - job_id = '{}_interval_job'.format(worker_name) - interval_job = self._scheduler.add_job( - partial(self._queue_command, command), 'interval', - seconds=worker_config['update_interval'], id=job_id - ) - self._mqtt_callbacks.append(( - worker_obj.format_topic('update_interval'), - partial(self._update_interval_wrapper, command, job_id) - )) - elif hasattr(worker_obj, 'run'): - _LOGGER.debug("Registered %s as daemon", repr(worker_obj)) - self._daemons.append(worker_obj) - else: - raise "%s cannot be initialized, it has to define run or status_update method" % worker_name - - if 'topic_subscription' in worker_config: - self._mqtt_callbacks.append(( - worker_config['topic_subscription'], - partial(self._on_command_wrapper, worker_obj) - )) - - if 'topic_subscription' in self._config: - for (callback_name, options) in self._config['topic_subscription'].items(): - self._mqtt_callbacks.append(( - options['topic'], - lambda client, _ , c: self._queue_if_matching_payload(self.Command(getattr(self, callback_name), self._command_timeout), c.payload, options['payload'])) + class Command: + def __init__(self, callback, timeout, args=(), options=dict()): + self._callback = callback + self._timeout = timeout + self._args = args + self._options = options + self._source = "{}.{}".format( + callback.__self__.__class__.__name__ + if hasattr(callback, "__self__") + else callback.__module__, + callback.__name__, + ) + + def execute(self): + messages = [] + + try: + with timeout( + self._timeout, + exception=WorkerTimeoutError( + "Execution of command {} timed out after {} seconds".format( + self._source, self._timeout + ) + ), + ): + if inspect.isgeneratorfunction(self._callback): + for message in self._callback(*self._args): + messages += message + else: + messages = self._callback(*self._args) + except WorkerTimeoutError as e: + if messages: + logger.log_exception( + _LOGGER, "%s, sending only partial update", e, suppress=True + ) + else: + raise e + + _LOGGER.debug("Execution result of command %s: %s", self._source, messages) + return messages + + def __init__(self, config): + self._mqtt_callbacks = [] + self._config_commands = [] + self._update_commands = [] + self._scheduler = BackgroundScheduler(timezone=utc) + self._daemons = [] + self._config = config + self._command_timeout = config.get("command_timeout", DEFAULT_COMMAND_TIMEOUT) + + def register_workers(self, global_topic_prefix): + for (worker_name, worker_config) in self._config["workers"].items(): + module_obj = importlib.import_module("workers.%s" % worker_name) + klass = getattr(module_obj, "%sWorker" % worker_name.title()) + + if module_obj.REQUIREMENTS is not None: + self._pip_install_helper(module_obj.REQUIREMENTS) + + command_timeout = worker_config.get( + "command_timeout", self._command_timeout + ) + worker_obj = klass( + command_timeout, global_topic_prefix, **worker_config["args"] + ) + + if "sensor_config" in self._config and hasattr(worker_obj, "config"): + _LOGGER.debug( + "Added %s config with a %d seconds timeout", repr(worker_obj), 2 + ) + command = self.Command(worker_obj.config, 2, []) + self._config_commands.append(command) + + if hasattr(worker_obj, "status_update"): + _LOGGER.debug( + "Added %s worker with %d seconds interval and a %d seconds timeout", + repr(worker_obj), + worker_config["update_interval"], + worker_obj.command_timeout, + ) + command = self.Command( + worker_obj.status_update, worker_obj.command_timeout, [] + ) + self._update_commands.append(command) + + if "update_interval" in worker_config: + job_id = "{}_interval_job".format(worker_name) + interval_job = self._scheduler.add_job( + partial(self._queue_command, command), + "interval", + seconds=worker_config["update_interval"], + id=job_id, + ) + self._mqtt_callbacks.append( + ( + worker_obj.format_topic("update_interval"), + partial(self._update_interval_wrapper, command, job_id), + ) + ) + elif hasattr(worker_obj, "run"): + _LOGGER.debug("Registered %s as daemon", repr(worker_obj)) + self._daemons.append(worker_obj) + else: + raise "%s cannot be initialized, it has to define run or status_update method" % worker_name + + if "topic_subscription" in worker_config: + self._mqtt_callbacks.append( + ( + worker_config["topic_subscription"], + partial(self._on_command_wrapper, worker_obj), + ) + ) + + if "topic_subscription" in self._config: + for (callback_name, options) in self._config["topic_subscription"].items(): + self._mqtt_callbacks.append( + ( + options["topic"], + lambda client, _, c: self._queue_if_matching_payload( + self.Command( + getattr(self, callback_name), self._command_timeout + ), + c.payload, + options["payload"], + ), + ) + ) + + return self + + def start(self, mqtt): + mqtt.callbacks_subscription(self._mqtt_callbacks) + + if "sensor_config" in self._config: + self._publish_config(mqtt) + + self._scheduler.start() + self.update_all() + for daemon in self._daemons: + threading.Thread(target=daemon.run, args=[mqtt], daemon=True).start() + + def _queue_if_matching_payload(self, command, payload, expected_payload): + if payload.decode("utf-8") == expected_payload: + self._queue_command(command) + + def update_all(self): + _LOGGER.debug("Updating all workers") + for command in self._update_commands: + self._queue_command(command) + + @staticmethod + def _queue_command(command): + _WORKERS_QUEUE.put(command) + + @staticmethod + def _pip_install_helper(package_names): + for package in package_names: + pip_main(["install", "-q", package]) + logger.reset() + + def _update_interval_wrapper(self, command, job_id, client, userdata, c): + _LOGGER.info("Recieved updated interval for %s with: %s", c.topic, c.payload) + try: + new_interval = int(c.payload) + self._scheduler.remove_job(job_id) + self._scheduler.add_job( + partial(self._queue_command, command), + "interval", + seconds=new_interval, + id=job_id, + ) + except ValueError: + logger.log_exception( + _LOGGER, "Ignoring invalid new interval: %s", c.payload + ) + + def _on_command_wrapper(self, worker_obj, client, userdata, c): + _LOGGER.debug( + "Received command for %s on %s: %s", repr(worker_obj), c.topic, c.payload + ) + global_topic_prefix = userdata["global_topic_prefix"] + topic = ( + c.topic[len(global_topic_prefix + "/") :] + if global_topic_prefix is not None + else c.topic + ) + self._queue_command( + self.Command( + worker_obj.on_command, worker_obj.command_timeout, [topic, c.payload] + ) ) - return self - - def start(self, mqtt): - mqtt.callbacks_subscription(self._mqtt_callbacks) - - if 'sensor_config' in self._config: - self._publish_config(mqtt) - - self._scheduler.start() - self.update_all() - for daemon in self._daemons: - threading.Thread(target=daemon.run, args=[mqtt], daemon=True).start() - - def _queue_if_matching_payload(self, command, payload, expected_payload): - if payload.decode('utf-8') == expected_payload: - self._queue_command(command) - - def update_all(self): - _LOGGER.debug("Updating all workers") - for command in self._update_commands: - self._queue_command(command) - - @staticmethod - def _queue_command(command): - _WORKERS_QUEUE.put(command) - - @staticmethod - def _pip_install_helper(package_names): - for package in package_names: - pip_main(['install', '-q', package]) - logger.reset() - - def _update_interval_wrapper(self, command, job_id, client, userdata, c): - _LOGGER.info("Recieved updated interval for %s with: %s", c.topic, c.payload) - try: - new_interval = int(c.payload) - self._scheduler.remove_job(job_id) - self._scheduler.add_job( - partial(self._queue_command, command), 'interval', - seconds=new_interval, id=job_id) - except ValueError: - logger.log_exception(_LOGGER, 'Ignoring invalid new interval: %s', c.payload) - - def _on_command_wrapper(self, worker_obj, client, userdata, c): - _LOGGER.debug("Received command for %s on %s: %s", repr(worker_obj), c.topic, c.payload) - global_topic_prefix = userdata['global_topic_prefix'] - topic = c.topic[len(global_topic_prefix+'/'):] if global_topic_prefix is not None else c.topic - self._queue_command(self.Command(worker_obj.on_command, worker_obj.command_timeout, [topic, c.payload])) - - def _publish_config(self, mqtt): - for command in self._config_commands: - messages = command.execute() - for msg in messages: - msg.topic = "{}/{}".format(self._config['sensor_config'].get('topic', 'homeassistant'), msg.topic) - msg.retain = self._config['sensor_config'].get('retain', True) - mqtt.publish(messages) + def _publish_config(self, mqtt): + for command in self._config_commands: + messages = command.execute() + for msg in messages: + msg.topic = "{}/{}".format( + self._config["sensor_config"].get("topic", "homeassistant"), + msg.topic, + ) + msg.retain = self._config["sensor_config"].get("retain", True) + mqtt.publish(messages) diff --git a/workers_queue.py b/workers_queue.py index cb58542..d5320b0 100644 --- a/workers_queue.py +++ b/workers_queue.py @@ -1,4 +1,3 @@ from queue import Queue _WORKERS_QUEUE = Queue() -