Skip to content

Commit

Permalink
smartplugin: Implemented put_command_to_run_queue() and get_command_f…
Browse files Browse the repository at this point in the history
…rom_run_queue()
  • Loading branch information
msinn committed May 21, 2024
1 parent f2e55ff commit 789731b
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 10 deletions.
34 changes: 34 additions & 0 deletions doc/user/source/referenz/plugins/asyncio_support.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@

.. index:: Referenz; Asyncio in Plugins
.. index:: Asyncio in Plugins
.. index:: Plugins; Asyncio in Plugins

.. role:: redsup
.. role:: bluesup
.. role:: greensup
.. role:: blacksup



asyncio Unterstützung in Plugins :redsup:`Neu`
==============================================

Python Packages zur Ansteuerung von Peripherie werden zunehmend unter Verwendung von asyncio erstellt. Um diese
Packages in Plugins nutzen zu können, muss das jeweilige Plugin asyncio unterstützen. Die Implementierung hiervon
ist nicht tirvial und die ersten Plugins die asyncio implementiert haben, tun das sehr unterschiedlich und unter
Nutzung der Low-Level Funktionen von asyncio (was ein erhebliches Error-Handling im Plugin voraussetzt).

Um die Nutzung von asyncio zu vereinfachen und in den Plugins zu standardisieren, unterstützt SmartHomeNG ab
v1.11 die Nutzung von asyncio in Plugins durch eine Erweiterung der SmartPlugin Klasse um einige Methoden.

|
Methoden zur Unterstützung von asyncio
--------------------------------------

.. autoclass:: lib.model.smartplugin.SmartPlugin
:members: asyncio_state, start_asyncio, stop_asyncio, run_asyncio_coro, list_asyncio_tasks
:undoc-members:
:show-inheritance:
:member-order: bysource

1 change: 1 addition & 0 deletions doc/user/source/referenz/plugins/plugins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Die möglichen Einträge für die Definition der Metadaten der Plugins sind hier
:glob:
:titlesonly:

asyncio_support
plugin_typen/plugin_typen
plugin_metadata
plugin_user_doc
Expand Down
67 changes: 57 additions & 10 deletions lib/model/smartplugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,7 @@ def scheduler_get(self, name):
_asyncio_loop = None # eventloop of the plugin
_asyncio_state = 'unused'
_used_plugin_coro = None # plugin coro used when calling start_asyncio (to be able to used by a generic 'restart asyncio' method
run_queue = None # queue to send commends to the main-coro/plugin-coro
_run_queue = None # queue to send commends to the main-coro/plugin-coro

def asyncio_state(self) -> str:
"""
Expand Down Expand Up @@ -1072,13 +1072,9 @@ def stop_asyncio(self) -> None:
"""
self.logger.info("Shutting down asyncio loop and thread...")
if self._asyncio_loop is not None:
try:
# Send termination command to plugin_coro to stop the plugin
asyncio.run_coroutine_threadsafe(self.run_queue.put('STOP'), self._asyncio_loop)
except Exception as e:
self.logger.notice(f"stop_asyncio: Exception '{e}' in run_queue.put ({self._asyncio_loop=})")
time.sleep(3)

self.put_command_to_run_queue('STOP')
time.sleep(3)
try:
self.pluginThread.join()
self.logger.debug("_asyncio_loop_thread of plugin stopped")
Expand Down Expand Up @@ -1113,7 +1109,7 @@ async def _asyncio_main(self, plugin_coro):
task = asyncio.current_task()
task.set_name('MainTask')
# Create queue to send termination command to plugin_coro when the plugin should be stopped
self.run_queue = asyncio.Queue()
self._run_queue = asyncio.Queue()

# Create the main task of the plugin and await it
self.task = asyncio.create_task(plugin_coro, name='PluginTask')
Expand Down Expand Up @@ -1151,9 +1147,60 @@ def run_asyncio_coro(self, coro, return_exeption=False):
self.logger.exception(f"run_asyncio_coro: Exception {ex} ({coro=}, loop={self._asyncio_loop})")
return result

def put_command_to_run_queue(self, command: str) -> None:
"""
Put an entry to the run-queue
"""
if self._asyncio_loop is not None:
self.logger.info(f"Writing command '{command}' to run-queue")
try:
# Send a command to plugin_coro
asyncio.run_coroutine_threadsafe(self._run_queue.put(command), self._asyncio_loop)
except Exception as e:
self.logger.warning(f"put_command_to_run_queue: Exception '{e}' in _run_queue.put ({self._asyncio_loop=})")
time.sleep(3)
else:
self.logger.warning(f"put_command_to_run_queue: Cannot write command '{command}' to run-queue, no active event-loop")
return

async def get_command_from_run_queue(self) -> str:
"""
Get an entry from the run-queue
At the moment this is used, to block the plugin_coro until the plugin should be stopped.
When the plugin should be stopped, a string 'STOP' is written into the queue and the plugin_coro can check for
the string 'STOP' and terminate itself.
:return: First command from the queue
"""
queue_item = await self._run_queue.get()
return queue_item

async def wait_for_asyncio_termination(self) -> None:
"""
Wait for the command to stop the plugin_coro
This is used to block the plugin_coro until the plugin should be stopped.
When the plugin should be stopped, a string 'STOP' is written into the queue
:return:
"""
queue_command = ''
while queue_command != 'STOP':
queue_command = await self._run_queue.get()
if queue_command != 'STOP':
# put command back to queue?
await asyncio.sleep(0.1)

return

async def list_asyncio_tasks(self):
"""
Log al list of the tasks that are in the eventloop
Log a list of the tasks that are in the eventloop
The intention of this method is to support the plugin development/debugging. It can be called
from the executor plugin or from the evaql-syntax-chacker of the admin gui
"""
self.logger.notice("list_asyncio_tasks: Task list")
tasks = asyncio.all_tasks()
Expand Down

0 comments on commit 789731b

Please sign in to comment.