Skip to content

Commit

Permalink
Merge pull request #23 from agittins/dev
Browse files Browse the repository at this point in the history
  • Loading branch information
agittins authored Aug 25, 2023
2 parents 15d23b5 + f3fe815 commit 648d0dd
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 102 deletions.
263 changes: 161 additions & 102 deletions custom_components/bermuda/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import asyncio
import logging
from datetime import datetime
from datetime import timedelta
from typing import Final

Expand All @@ -23,6 +24,7 @@
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from homeassistant.util import slugify
from homeassistant.util.dt import get_age
from homeassistant.util.dt import monotonic_time_coarse
from homeassistant.util.dt import now

Expand All @@ -31,6 +33,8 @@
from .const import STARTUP_MESSAGE
from .entity import BermudaEntity

# from typing import Any

# from .const import CONF_PASSWORD
# from .const import CONF_USERNAME

Expand Down Expand Up @@ -115,23 +119,57 @@ class BermudaDeviceScanner(dict):
def __init__(
self, device_address: str, scandata: BluetoothScannerDevice, area_id: str
):
self.name = scandata.scanner.name
self.area_id = area_id
self.adapter = scandata.scanner.adapter
self.source = scandata.scanner.source
self.stamp = scandata.scanner._discovered_device_timestamps[device_address]
self.rssi = scandata.advertisement.rssi
self.rssi_distance = rssi_to_metres(self.rssi)
self.adverts = scandata.advertisement.service_data.items()
self.name: str = scandata.scanner.name
self.area_id: str = area_id
self.adapter: str = scandata.scanner.adapter
self.source: str = scandata.scanner.source
self.rssi: float = scandata.advertisement.rssi
self.rssi_distance: float = rssi_to_metres(self.rssi)
self.adverts: dict[str, bytes] = scandata.advertisement.service_data.items()

self.stamp: float = None
# Only remote scanners log timestamps here (local usb adaptors do not),
# so check if the dict is there at all first...
if "_discovered_device_timestamps" in vars(scandata.scanner):
# Found a remote scanner which has timestamp history...

# FIXME:
# pylint: disable-next=protected-access
stamps = scandata.scanner._discovered_device_timestamps

# In this dict all MAC address keys are upper-cased
uppermac = device_address.upper()
if uppermac in stamps:
self.stamp = stamps[uppermac]
else:
# This shouldn't happen, as we shouldn't have got a record
# of this scanner if it hadn't seen this device.
_LOGGER.error(
"Scanner %s has no stamp for %s - very odd.",
self.source,
device_address,
)
self.stamp = 0
else:
# Not a bluetooth_proxy device / remote scanner.
# FIXME: Work out how to handle a bluetooth adaptor's reports.
# Options are:
# (a) find a timestamp somehwere
# (b) if we are doing updates as ads come in, use now()-some_safety_offset.
#
# For now we'll need to ignore it, since the advert might be very stale
# and would give false positives. It's a FIXME for when we receive adverts
# directly instead of periodically trawling the bluetooth manager's history.
self.stamp = 0

def to_dict(self):
"""Convert class to serialisable dict for dump_devices"""
out = {}
for var, val in vars(self).items():
if var == "adverts":
val = {}
for ad, thebytes in self.adverts:
val[ad] = thebytes.hex()
for uuid, thebytes in self.adverts:
val[uuid] = thebytes.hex()
out[var] = val
return out

Expand All @@ -149,24 +187,41 @@ class BermudaDevice(dict):

def __init__(self):
"""Initial (empty) data"""
self.address = None
self.unique_id = None # mac address formatted.
self.name = None
self.local_name = None
self.prefname = None # "preferred" name - ideally local_name
self.area_id = None
self.area_name = None
self.area_distance = None # how far this dev is from that area
self.location = None # home or not_home
self.manufacturer = None
self.connectable = False
self.is_scanner = False
self.entry_id = None # used for scanner devices
self.send_tracker_see = False # Create/update device_tracker entity
self.create_sensor = False # Create/update a sensor for this device
self.last_seen = 0 # stamp from most recent scanner spotting
self.address: str = None
self.unique_id: str = None # mac address formatted.
self.name: str = None
self.local_name: str = None
self.prefname: str = None # "preferred" name - ideally local_name
self.area_id: str = None
self.area_name: str = None
self.area_distance: float = None # how far this dev is from that area
self.zone: str = None # home or not_home
self.manufacturer: str = None
self.connectable: bool = False
self.is_scanner: bool = False
self.entry_id: str = None # used for scanner devices
self.send_tracker_see: bool = False # Create/update device_tracker entity
self.create_sensor: bool = False # Create/update a sensor for this device
self.last_seen: float = (
0 # stamp from most recent scanner spotting. MONOTONIC_TIME
)
self.scanners: dict[str, BermudaDeviceScanner] = {}

def add_scanner(
self, scanner_device: BermudaDevice, discoveryinfo: BluetoothScannerDevice
):
"""Add/Replace a scanner entry on this device, indicating a received advertisement"""
self.scanners[
format_mac(scanner_device.address)
] = newscanner = BermudaDeviceScanner(
self.address,
discoveryinfo, # the entire BluetoothScannerDevice struct
scanner_device.area_id,
)
# Let's see if we should update our last_seen based on this...
if self.last_seen < newscanner.stamp:
self.last_seen = newscanner.stamp

def to_dict(self):
"""Convert class to serialisable dict for dump_devices"""
out = {}
Expand Down Expand Up @@ -239,6 +294,22 @@ def __init__(

super().__init__(hass, _LOGGER, name=DOMAIN, update_interval=SCAN_INTERVAL)

def _get_device(self, address: str) -> BermudaDevice:
"""Search for a device entry based on mac address"""
mac = format_mac(address)
if mac in self.devices:
return self.devices[mac]
return None

def _get_or_create_device(self, address: str) -> BermudaDevice:
device = self._get_device(address)
if device is None:
mac = format_mac(address)
self.devices[mac] = device = BermudaDevice()
device.address = mac
device.unique_id = mac
return device

async def _async_update_data(self):
"""Update data on known devices.
Expand All @@ -253,16 +324,11 @@ async def _async_update_data(self):
# scanner entries for any given device.

# Get/Create a device entry
if service_info.address not in self.devices:
# Initialise an empty device
self.devices[service_info.address] = BermudaDevice()
device: BermudaDevice = self.devices[service_info.address]
device = self._get_or_create_device(service_info.address)

# We probably don't need to do all of this every time, but we
# want to catch any changes, eg when the system learns the local
# name etc.
device.address = service_info.address
device.unique_id = format_mac(service_info.address)
device.name = service_info.device.name
device.local_name = service_info.advertisement.local_name
device.manufacturer = service_info.manufacturer
Expand All @@ -280,42 +346,33 @@ async def _async_update_data(self):
device.prefname = "bermuda_" + slugify(service_info.address)

# Work through the scanner entries...
for discovered in bluetooth.async_scanner_devices_by_address(
matched_scanners = bluetooth.async_scanner_devices_by_address(
self.hass, service_info.address, False
):
if discovered.scanner.source not in self.devices:
self._refresh_scanners()

# Only remote scanners log timestamps (so not local usb adaptors),
# so check if the dict is there at all first...
if "_discovered_device_timestamps" in vars(discovered.scanner):
# FIXME: Find a method or request one be added for this
# pylint: disable-next=protected-access
stamps = discovered.scanner._discovered_device_timestamps
scanner_stamp = stamps[service_info.address]
if device.last_seen < scanner_stamp:
device.last_seen = scanner_stamp
else:
# FIXME: Work out how to handle a bluetooth adaptors reports.
# Options are: (a) find a timestamp somehwere (b) if we are
# doing updates as ads come in, use now()-some_safety_offset.
# For now, pretend it's as young as the previous update...
if device.last_seen < (
MONOTONIC_TIME() - SCAN_INTERVAL.total_seconds()
):
device.last_seen = (
MONOTONIC_TIME() - SCAN_INTERVAL.total_seconds()
)

# Just replace the scanner entries outright...
device.scanners[discovered.scanner.source] = BermudaDeviceScanner(
device.address,
discovered,
self.devices[discovered.scanner.source].area_id,
)
)
for discovered in matched_scanners:
scanner_device = self._get_device(discovered.scanner.source)
if scanner_device is None:
# The receiver doesn't have a device entry yet, let's refresh
# all of them in this batch...
self._refresh_scanners(matched_scanners)
scanner_device = self._get_device(discovered.scanner.source)

if scanner_device is None:
# Highly unusual. If we can't find an entry for the scanner
# maybe it's from an integration that's not yet loaded, or
# perhaps it's an unexpected type that we don't know how to
# find.
_LOGGER.error(
"Failed to find config for scanner %s, this is probably a bug.",
discovered.scanner.source,
)
continue

# Replace the scanner entry on the current device
device.add_scanner(scanner_device, discovered)

# FIXME: This should be configurable...
if device.address in [
if device.address.upper() in [
"EE:E8:37:9F:6B:54", # infinitime, main watch
"C7:B8:C6:B0:27:11", # pinetime, devwatch
"A4:C1:38:C8:58:91", # bthome thermo, with reed switch
Expand All @@ -331,55 +388,54 @@ async def _async_update_data(self):

# end of async update

# async def _create_or_update_sensor(self, device):
# if self.async_sensor_add_entities is not None:
# #await self.async_sensor_add_entities([BermudaSensor, self.config_entry])
# NotImplemented

async def _send_device_tracker_see(self, device):
async def _send_device_tracker_see(self, device: BermudaDevice):
"""Send "see" event to the legacy device_tracker integration.
If the device is not yet in known_devices.yaml it will get added.
Note that device_tracker can *only* support [home|not_home].
It does support Zones (not via the service though?), but Zones
are only EXTERNAL to the home, not the same as "Area"s.
Note that device_tracker can *only* support [home|not_home],
because device_tracker only deals with "Zones" not "Areas".
I'm not implementing device_tracker proper because I don't grok it
well enough yet. And to be honest this is probably all we need
since it doesn't support Areas anyway.
Simply calling the "see" service is the simplest way to
get this done, but if we need more control (eg, specifying
the source (gps|router|etc)) we might need to hook and implement
it specifically. This is probably all we need right now though:
TODO: Allow user to configure what name to use for the device_tracker.
"""

# Check if the device has been seen recently
rightnow = MONOTONIC_TIME()
if rightnow - device.last_seen > self.timeout_not_home:
location_name = "not_home"
if MONOTONIC_TIME() - self.timeout_not_home < device.last_seen:
device.zone = "home"
else:
location_name = "home"

# If mac is set, dt will:
# slugify the hostname (if set) or mac, and use that as the dev_id.
# Else:
# will slugify dev_id
# So, we will not set mac, but use bermuda_[mac] as dev_id and prefname
# for host_name.
device.zone = "not_home"

# If mac is set, device_tracker will override our dev_id
# with slugified (hostname OR mac). We don't want that
# since we want dev_id (the key in known_devices.yaml) to
# be stable, predictable and identifyably ours.
#
# So, we will not set mac, but use bermuda_[mac] as dev_id
# and prefname or user-supplied name for host_name.
await self.hass.services.async_call(
domain="device_tracker",
service="see",
service_data={
"dev_id": "bermuda_" + slugify(device.address),
# 'mac': device.address,
"host_name": device.prefname,
"location_name": location_name,
"location_name": device.zone,
},
)

def dt_mono_to_datetime(self, stamp):
def dt_mono_to_datetime(self, stamp) -> datetime:
"""Given a monotonic timestamp, convert to datetime object"""
age = MONOTONIC_TIME() - stamp
return now() - timedelta(seconds=age)

def dt_mono_to_age(self, stamp) -> str:
"""Convert monotonic timestamp to age (eg: "6 seconds ago")"""
return get_age(self.dt_mono_to_datetime(stamp))

def _refresh_areas_by_min_distance(self):
"""Set area for ALL devices based on closest beacon"""
for device in self.devices.values():
Expand All @@ -402,31 +458,34 @@ def _refresh_area_by_min_distance(self, device: BermudaDevice):
if closest_scanner is not None:
# We found a winner
device.area_id = closest_scanner.area_id
areas = self.ar.async_get_area(device.area_id).name # potentially a list.
areas = self.ar.async_get_area(device.area_id).name # potentially a list?!
if len(areas) == 1:
device.area_name = areas[0]
else:
# none or a list, perhaps...
device.area_name = areas

device.area_distance = closest_scanner.rssi_distance
else:
# Not close to any scanners!
device.area_id = None
device.area_name = None
device.area_distance = None

def _refresh_scanners(self, address=None):
def _refresh_scanners(self, scanners: list[BluetoothScannerDevice]):
"""Refresh our local list of scanners (BLE Proxies)"""
# FIXME: Really? This can't possibly be a sensible nesting of loops.
for dev_entry in self.hass.data["device_registry"].devices.data.values():
if len(dev_entry.connections) > 0:
addresses = set()
for scanner in scanners:
addresses.add(scanner.scanner.source.upper())
if len(addresses) > 0:
# FIXME: Really? This can't possibly be a sensible nesting of loops.
# should probably look at the API. Anyway, we are checking any devices
# that have a "mac" or "bluetooth" connection,
for dev_entry in self.hass.data["device_registry"].devices.data.values():
for dev_connection in dev_entry.connections:
if dev_connection[0] == "mac":
if address is None or address == dev_connection[1]:
found_address = dev_connection[1]
self.devices[found_address] = BermudaDevice()
scandev = self.devices[found_address]
scandev.address = found_address
if dev_connection[0] in ["mac", "bluetooth"]:
found_address = dev_connection[1].upper()
if found_address in addresses:
scandev = self._get_or_create_device(found_address)
scandev.area_id = dev_entry.area_id
scandev.entry_id = dev_entry.id
if dev_entry.name_by_user is not None:
Expand Down
Loading

0 comments on commit 648d0dd

Please sign in to comment.