Skip to content
This repository has been archived by the owner on Oct 26, 2023. It is now read-only.

Feature: On demand device polling #48

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ See [Wiki](https://github.com/zewelor/bt-mqtt-gateway/wiki) for more information
* Highly extensible via custom workers
* Data publication via MQTT
* Configurable topic and payload
* Support for on-demand device polling
* MQTT authentication support
* Systemd service
* Reliable and intuitive
Expand Down Expand Up @@ -100,6 +101,18 @@ Use mosquitto_sub to print all messages
mosquitto_sub -h localhost -d -t # command also help for me to test MQTT messages
```

**On-Demand Polling**
To poll a device on demand (in addition to it's `update_interval`), publish an empty message at the `/poll` topic of the device.
I.E:
```
# Will poll the scale
mosquitto_pub -h localhost -t 'miscale/poll' -m ''
# Will poll all MiFlora devices
mosquitto_pub -h localhost -t 'miflora/poll' -m ''
# Will poll the specified MiFlora device
mosquitto_pub -h localhost -t 'miflora/herbs/poll' -m ''
```

## Custom worker development


Expand Down
4 changes: 3 additions & 1 deletion workers/blescanmulti.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ def searchmac(self, devices, mac):

return None

def status_update(self):
def status_update(self, poll_device=None):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it should be generalized to something like: device_filter=None , which will act as filter for self.devices, then that if and continue wont be needed inside each loop. When self.devices will be a getter method ( see other comment ) it could take array to filter devices and return only filtered subset.

scanner = Scanner().withDelegate(ScanDelegate())
devices = scanner.scan(10.0)
ret = []

for name, mac in self.devices.items():
if poll_device and name != poll_device:
continue
device = self.searchmac(devices, mac)
if device is None:
ret.append(MqttMessage(topic=self.format_topic('presence/'+name), payload="0"))
Expand Down
5 changes: 3 additions & 2 deletions workers/miflora.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ def _setup(self):
for name, mac in self.devices.items():
self.devices[name] = MiFloraPoller(mac, BluepyBackend, cache_timeout=0)

def status_update(self):
def status_update(self, poll_device=None):
ret = []
for name, poller in self.devices.items():
if poll_device and name != poll_device:
continue
try:
ret += self.update_device_state(name, poller)
except RuntimeError:
Expand All @@ -26,7 +28,6 @@ def status_update(self):
return ret

@timeout(8.0)

def update_device_state(self, name, poller):

ret = []
Expand Down
4 changes: 3 additions & 1 deletion workers/mithermometer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ def _setup(self):
for name, mac in self.devices.items():
self.devices[name] = MiThermometerPoller(mac, BluepyBackend)

def status_update(self):
def status_update(self, poll_device=None):
ret = []
for name, poller in self.devices.items():
if poll_device and name != poll_device:
continue
try:
ret += self.update_device_state(name, poller)
except RuntimeError:
Expand Down
4 changes: 3 additions & 1 deletion workers/thermostat.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ def _setup(self):

self._modes_mapper = self.ModesMapper()

def status_update(self):
def status_update(self, poll_device=None):
from bluepy import btle

ret = []
for name, thermostat in self.devices.items():
if poll_device and name != poll_device:
continue
try:
ret += self.update_device_state(name, thermostat)
except (RuntimeError, btle.BTLEException):
Expand Down
37 changes: 37 additions & 0 deletions workers_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ def register_workers(self, config):
partial(self._queue_command, command), 'interval',
seconds=worker_config['update_interval'],
)

_LOGGER.debug("Adding on-demand polling for %s" % (worker_name))
poll_topics = []
poll_topics.append(worker_obj.format_topic('+', 'poll'))
poll_topics.append(worker_obj.format_topic('poll'))
for topic in poll_topics:
self._mqtt_callbacks.append((
topic,
partial(self._poll_wrapper, worker_obj)
))
elif hasattr(worker_obj, 'run'):
_LOGGER.debug("Registered: %s as daemon" % (worker_name))
self._daemons.append(worker_obj)
Expand Down Expand Up @@ -100,6 +110,33 @@ def _pip_install_helper(package_names):
for package in package_names:
pip_main(['install', '-q', package])

def _poll_wrapper(self, worker_obj, client, userdata, c):
if c.payload.decode('utf-8') != '':
return
poll_device_supported = 'poll_device' in worker_obj.status_update.__code__.co_varnames
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could also benefit from some devices getter. When there is None / not defined / only one device ( ? ) return from worker_obj.devices then it does not support

topic_prefixes = []
topic_prefixes.append(userdata.get('global_topic_prefix', ''))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use some topic helpers, to be sure it wont break after some refactoring.

def _format_topic(self, topic):

def format_topic(self, *args):

topic_prefixes.append(getattr(worker_obj, 'topic_prefix', ''))
topic_prefixes.append('poll')
device_name = str(c.topic)
for suffix in topic_prefixes:
if suffix:
device_name = str(device_name).replace(suffix, '').replace('/', '')
device_name = device_name if device_name else None
if not poll_device_supported and device_name:
_LOGGER.debug("This worker does not support polling specific devices."
" Ignoring: %s", c.topic)
return
elif not poll_device_supported:
poll_args = []
elif device_name not in getattr(worker_obj, 'devices', []):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some getter for devices and use it, instead of accessing variable directly.

_LOGGER.debug("Device not found. Ignoring: %s", c.topic)
return
else:
poll_args = [device_name]
_LOGGER.debug("Poll initiated for %s", c.topic)
self._queue_command(self.Command(worker_obj.status_update, poll_args))

def _on_command_wrapper(self, worker_obj, client, userdata, c):
_LOGGER.debug("on command wrapper for with %s: %s", c.topic, c.payload)
global_topic_prefix = userdata['global_topic_prefix']
Expand Down