diff --git a/custom_components/bermuda/__init__.py b/custom_components/bermuda/__init__.py index fc0dd74..ca0abab 100644 --- a/custom_components/bermuda/__init__.py +++ b/custom_components/bermuda/__init__.py @@ -8,6 +8,7 @@ import asyncio import logging +from datetime import datetime from datetime import timedelta from typing import Final @@ -23,6 +24,7 @@ from homeassistant.helpers.device_registry import format_mac from homeassistant.helpers.update_coordinator import DataUpdateCoordinator from homeassistant.util import slugify +from homeassistant.util.dt import get_age from homeassistant.util.dt import monotonic_time_coarse from homeassistant.util.dt import now @@ -31,6 +33,8 @@ from .const import STARTUP_MESSAGE from .entity import BermudaEntity +# from typing import Any + # from .const import CONF_PASSWORD # from .const import CONF_USERNAME @@ -115,14 +119,48 @@ class BermudaDeviceScanner(dict): def __init__( self, device_address: str, scandata: BluetoothScannerDevice, area_id: str ): - self.name = scandata.scanner.name - self.area_id = area_id - self.adapter = scandata.scanner.adapter - self.source = scandata.scanner.source - self.stamp = scandata.scanner._discovered_device_timestamps[device_address] - self.rssi = scandata.advertisement.rssi - self.rssi_distance = rssi_to_metres(self.rssi) - self.adverts = scandata.advertisement.service_data.items() + self.name: str = scandata.scanner.name + self.area_id: str = area_id + self.adapter: str = scandata.scanner.adapter + self.source: str = scandata.scanner.source + self.rssi: float = scandata.advertisement.rssi + self.rssi_distance: float = rssi_to_metres(self.rssi) + self.adverts: dict[str, bytes] = scandata.advertisement.service_data.items() + + self.stamp: float = None + # Only remote scanners log timestamps here (local usb adaptors do not), + # so check if the dict is there at all first... + if "_discovered_device_timestamps" in vars(scandata.scanner): + # Found a remote scanner which has timestamp history... + + # FIXME: + # pylint: disable-next=protected-access + stamps = scandata.scanner._discovered_device_timestamps + + # In this dict all MAC address keys are upper-cased + uppermac = device_address.upper() + if uppermac in stamps: + self.stamp = stamps[uppermac] + else: + # This shouldn't happen, as we shouldn't have got a record + # of this scanner if it hadn't seen this device. + _LOGGER.error( + "Scanner %s has no stamp for %s - very odd.", + self.source, + device_address, + ) + self.stamp = 0 + else: + # Not a bluetooth_proxy device / remote scanner. + # FIXME: Work out how to handle a bluetooth adaptor's reports. + # Options are: + # (a) find a timestamp somehwere + # (b) if we are doing updates as ads come in, use now()-some_safety_offset. + # + # For now we'll need to ignore it, since the advert might be very stale + # and would give false positives. It's a FIXME for when we receive adverts + # directly instead of periodically trawling the bluetooth manager's history. + self.stamp = 0 def to_dict(self): """Convert class to serialisable dict for dump_devices""" @@ -130,8 +168,8 @@ def to_dict(self): for var, val in vars(self).items(): if var == "adverts": val = {} - for ad, thebytes in self.adverts: - val[ad] = thebytes.hex() + for uuid, thebytes in self.adverts: + val[uuid] = thebytes.hex() out[var] = val return out @@ -149,24 +187,41 @@ class BermudaDevice(dict): def __init__(self): """Initial (empty) data""" - self.address = None - self.unique_id = None # mac address formatted. - self.name = None - self.local_name = None - self.prefname = None # "preferred" name - ideally local_name - self.area_id = None - self.area_name = None - self.area_distance = None # how far this dev is from that area - self.location = None # home or not_home - self.manufacturer = None - self.connectable = False - self.is_scanner = False - self.entry_id = None # used for scanner devices - self.send_tracker_see = False # Create/update device_tracker entity - self.create_sensor = False # Create/update a sensor for this device - self.last_seen = 0 # stamp from most recent scanner spotting + self.address: str = None + self.unique_id: str = None # mac address formatted. + self.name: str = None + self.local_name: str = None + self.prefname: str = None # "preferred" name - ideally local_name + self.area_id: str = None + self.area_name: str = None + self.area_distance: float = None # how far this dev is from that area + self.zone: str = None # home or not_home + self.manufacturer: str = None + self.connectable: bool = False + self.is_scanner: bool = False + self.entry_id: str = None # used for scanner devices + self.send_tracker_see: bool = False # Create/update device_tracker entity + self.create_sensor: bool = False # Create/update a sensor for this device + self.last_seen: float = ( + 0 # stamp from most recent scanner spotting. MONOTONIC_TIME + ) self.scanners: dict[str, BermudaDeviceScanner] = {} + def add_scanner( + self, scanner_device: BermudaDevice, discoveryinfo: BluetoothScannerDevice + ): + """Add/Replace a scanner entry on this device, indicating a received advertisement""" + self.scanners[ + format_mac(scanner_device.address) + ] = newscanner = BermudaDeviceScanner( + self.address, + discoveryinfo, # the entire BluetoothScannerDevice struct + scanner_device.area_id, + ) + # Let's see if we should update our last_seen based on this... + if self.last_seen < newscanner.stamp: + self.last_seen = newscanner.stamp + def to_dict(self): """Convert class to serialisable dict for dump_devices""" out = {} @@ -239,6 +294,22 @@ def __init__( super().__init__(hass, _LOGGER, name=DOMAIN, update_interval=SCAN_INTERVAL) + def _get_device(self, address: str) -> BermudaDevice: + """Search for a device entry based on mac address""" + mac = format_mac(address) + if mac in self.devices: + return self.devices[mac] + return None + + def _get_or_create_device(self, address: str) -> BermudaDevice: + device = self._get_device(address) + if device is None: + mac = format_mac(address) + self.devices[mac] = device = BermudaDevice() + device.address = mac + device.unique_id = mac + return device + async def _async_update_data(self): """Update data on known devices. @@ -253,16 +324,11 @@ async def _async_update_data(self): # scanner entries for any given device. # Get/Create a device entry - if service_info.address not in self.devices: - # Initialise an empty device - self.devices[service_info.address] = BermudaDevice() - device: BermudaDevice = self.devices[service_info.address] + device = self._get_or_create_device(service_info.address) # We probably don't need to do all of this every time, but we # want to catch any changes, eg when the system learns the local # name etc. - device.address = service_info.address - device.unique_id = format_mac(service_info.address) device.name = service_info.device.name device.local_name = service_info.advertisement.local_name device.manufacturer = service_info.manufacturer @@ -280,42 +346,33 @@ async def _async_update_data(self): device.prefname = "bermuda_" + slugify(service_info.address) # Work through the scanner entries... - for discovered in bluetooth.async_scanner_devices_by_address( + matched_scanners = bluetooth.async_scanner_devices_by_address( self.hass, service_info.address, False - ): - if discovered.scanner.source not in self.devices: - self._refresh_scanners() - - # Only remote scanners log timestamps (so not local usb adaptors), - # so check if the dict is there at all first... - if "_discovered_device_timestamps" in vars(discovered.scanner): - # FIXME: Find a method or request one be added for this - # pylint: disable-next=protected-access - stamps = discovered.scanner._discovered_device_timestamps - scanner_stamp = stamps[service_info.address] - if device.last_seen < scanner_stamp: - device.last_seen = scanner_stamp - else: - # FIXME: Work out how to handle a bluetooth adaptors reports. - # Options are: (a) find a timestamp somehwere (b) if we are - # doing updates as ads come in, use now()-some_safety_offset. - # For now, pretend it's as young as the previous update... - if device.last_seen < ( - MONOTONIC_TIME() - SCAN_INTERVAL.total_seconds() - ): - device.last_seen = ( - MONOTONIC_TIME() - SCAN_INTERVAL.total_seconds() - ) - - # Just replace the scanner entries outright... - device.scanners[discovered.scanner.source] = BermudaDeviceScanner( - device.address, - discovered, - self.devices[discovered.scanner.source].area_id, - ) + ) + for discovered in matched_scanners: + scanner_device = self._get_device(discovered.scanner.source) + if scanner_device is None: + # The receiver doesn't have a device entry yet, let's refresh + # all of them in this batch... + self._refresh_scanners(matched_scanners) + scanner_device = self._get_device(discovered.scanner.source) + + if scanner_device is None: + # Highly unusual. If we can't find an entry for the scanner + # maybe it's from an integration that's not yet loaded, or + # perhaps it's an unexpected type that we don't know how to + # find. + _LOGGER.error( + "Failed to find config for scanner %s, this is probably a bug.", + discovered.scanner.source, + ) + continue + + # Replace the scanner entry on the current device + device.add_scanner(scanner_device, discovered) # FIXME: This should be configurable... - if device.address in [ + if device.address.upper() in [ "EE:E8:37:9F:6B:54", # infinitime, main watch "C7:B8:C6:B0:27:11", # pinetime, devwatch "A4:C1:38:C8:58:91", # bthome thermo, with reed switch @@ -331,39 +388,34 @@ async def _async_update_data(self): # end of async update - # async def _create_or_update_sensor(self, device): - # if self.async_sensor_add_entities is not None: - # #await self.async_sensor_add_entities([BermudaSensor, self.config_entry]) - # NotImplemented - - async def _send_device_tracker_see(self, device): + async def _send_device_tracker_see(self, device: BermudaDevice): """Send "see" event to the legacy device_tracker integration. If the device is not yet in known_devices.yaml it will get added. - Note that device_tracker can *only* support [home|not_home]. - It does support Zones (not via the service though?), but Zones - are only EXTERNAL to the home, not the same as "Area"s. + Note that device_tracker can *only* support [home|not_home], + because device_tracker only deals with "Zones" not "Areas". - I'm not implementing device_tracker proper because I don't grok it - well enough yet. And to be honest this is probably all we need - since it doesn't support Areas anyway. + Simply calling the "see" service is the simplest way to + get this done, but if we need more control (eg, specifying + the source (gps|router|etc)) we might need to hook and implement + it specifically. This is probably all we need right now though: TODO: Allow user to configure what name to use for the device_tracker. """ # Check if the device has been seen recently - rightnow = MONOTONIC_TIME() - if rightnow - device.last_seen > self.timeout_not_home: - location_name = "not_home" + if MONOTONIC_TIME() - self.timeout_not_home < device.last_seen: + device.zone = "home" else: - location_name = "home" - - # If mac is set, dt will: - # slugify the hostname (if set) or mac, and use that as the dev_id. - # Else: - # will slugify dev_id - # So, we will not set mac, but use bermuda_[mac] as dev_id and prefname - # for host_name. + device.zone = "not_home" + + # If mac is set, device_tracker will override our dev_id + # with slugified (hostname OR mac). We don't want that + # since we want dev_id (the key in known_devices.yaml) to + # be stable, predictable and identifyably ours. + # + # So, we will not set mac, but use bermuda_[mac] as dev_id + # and prefname or user-supplied name for host_name. await self.hass.services.async_call( domain="device_tracker", service="see", @@ -371,15 +423,19 @@ async def _send_device_tracker_see(self, device): "dev_id": "bermuda_" + slugify(device.address), # 'mac': device.address, "host_name": device.prefname, - "location_name": location_name, + "location_name": device.zone, }, ) - def dt_mono_to_datetime(self, stamp): + def dt_mono_to_datetime(self, stamp) -> datetime: """Given a monotonic timestamp, convert to datetime object""" age = MONOTONIC_TIME() - stamp return now() - timedelta(seconds=age) + def dt_mono_to_age(self, stamp) -> str: + """Convert monotonic timestamp to age (eg: "6 seconds ago")""" + return get_age(self.dt_mono_to_datetime(stamp)) + def _refresh_areas_by_min_distance(self): """Set area for ALL devices based on closest beacon""" for device in self.devices.values(): @@ -402,12 +458,12 @@ def _refresh_area_by_min_distance(self, device: BermudaDevice): if closest_scanner is not None: # We found a winner device.area_id = closest_scanner.area_id - areas = self.ar.async_get_area(device.area_id).name # potentially a list. + areas = self.ar.async_get_area(device.area_id).name # potentially a list?! if len(areas) == 1: device.area_name = areas[0] else: + # none or a list, perhaps... device.area_name = areas - device.area_distance = closest_scanner.rssi_distance else: # Not close to any scanners! @@ -415,18 +471,21 @@ def _refresh_area_by_min_distance(self, device: BermudaDevice): device.area_name = None device.area_distance = None - def _refresh_scanners(self, address=None): + def _refresh_scanners(self, scanners: list[BluetoothScannerDevice]): """Refresh our local list of scanners (BLE Proxies)""" - # FIXME: Really? This can't possibly be a sensible nesting of loops. - for dev_entry in self.hass.data["device_registry"].devices.data.values(): - if len(dev_entry.connections) > 0: + addresses = set() + for scanner in scanners: + addresses.add(scanner.scanner.source.upper()) + if len(addresses) > 0: + # FIXME: Really? This can't possibly be a sensible nesting of loops. + # should probably look at the API. Anyway, we are checking any devices + # that have a "mac" or "bluetooth" connection, + for dev_entry in self.hass.data["device_registry"].devices.data.values(): for dev_connection in dev_entry.connections: - if dev_connection[0] == "mac": - if address is None or address == dev_connection[1]: - found_address = dev_connection[1] - self.devices[found_address] = BermudaDevice() - scandev = self.devices[found_address] - scandev.address = found_address + if dev_connection[0] in ["mac", "bluetooth"]: + found_address = dev_connection[1].upper() + if found_address in addresses: + scandev = self._get_or_create_device(found_address) scandev.area_id = dev_entry.area_id scandev.entry_id = dev_entry.id if dev_entry.name_by_user is not None: diff --git a/scripts/develop b/scripts/develop index 89eda50..d7c4c2d 100755 --- a/scripts/develop +++ b/scripts/develop @@ -4,6 +4,14 @@ set -e cd "$(dirname "$0")/.." +if [[ ! -f venv/bin/activate ]]; then + echo "ERROR: No venv, make sure you run setup first!" + exit 1 +fi + +source venv/bin/activate + + # Create config dir if not present if [[ ! -d "${PWD}/config" ]]; then mkdir -p "${PWD}/config" diff --git a/scripts/lint b/scripts/lint index 4ffd2fd..04d0641 100755 --- a/scripts/lint +++ b/scripts/lint @@ -4,6 +4,8 @@ set -e cd "$(dirname "$0")/.." +source venv/bin/activate + #ruff check . --fix pre-commit run --all-files --show-diff-on-failure --color=always diff --git a/scripts/setup b/scripts/setup index e9d3353..3909a2d 100755 --- a/scripts/setup +++ b/scripts/setup @@ -4,6 +4,10 @@ set -e cd "$(dirname "$0")/.." +# Create a virtual environment +python3 -m venv venv +source venv/bin/activate + # Seems to fix broken aiohttp wheel building in 3.11: python3 -m pip install --upgrade setuptools wheel # Fix urllib3 issue https://github.com/home-assistant/core/issues/95192