Skip to content

Commit

Permalink
[libshortfin] Wire Python asyncio to IREE concurrency. (nod-ai#125)
Browse files Browse the repository at this point in the history
* Add user APIs to start workers on the System.
* Rework System destruction to have an explicit Shutdown phase. Use this
from Python in an atexit handler to explicitly shutdown any system
instances live at exit (needed to avoid various deadlocks and unsavory
things).
* Adds a specialized PyWorker with overrides to do Python accounting
(set up event loop on the thread, GIL maintenance).
* Add Worker call and call_threadsafe APIs.
* Create an AbstractEventLoop that is sufficient to launch coroutines on
a worker.
* Clean up various init and shutdown order issues to make use resilient.
* Skating towards having lightweight Process and queues to drive device
work, letting them oversubscribe worker threads as needed.

The same underlying APIs should be able to be made to work for C++
coroutines but that will be a separate effort.

---------

Co-authored-by: Ben Vanik <[email protected]>
  • Loading branch information
stellaraccident and benvanik authored Aug 16, 2024
1 parent 7ecf106 commit 6e5998f
Show file tree
Hide file tree
Showing 25 changed files with 1,170 additions and 165 deletions.
74 changes: 74 additions & 0 deletions libshortfin/bindings/python/_shortfin/asyncio_bridge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Copyright 2024 Advanced Micro Devices, Inc
#
# Licensed under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

import asyncio
from collections.abc import Callable
from contextvars import Context
from typing_extensions import Unpack

from . import lib as sfl


class PyWorkerEventLoop(asyncio.AbstractEventLoop):
def __init__(self, worker: sfl.local.Worker):
self._worker = worker

def get_debug(self):
# Requirement of asyncio.
return False

def create_task(self, coro):
return asyncio.Task(coro, loop=self)

def create_future(self):
return asyncio.Future(loop=self)

def time(self) -> float:
return self._worker._now() / 1e9

def call_soon_threadsafe(self, callback, *args, context=None) -> asyncio.Handle:
def on_worker():
asyncio.set_event_loop(self)
return callback(*args)

self._worker.call_threadsafe(on_worker)
# TODO: Return future.

def call_soon(self, callback, *args, context=None) -> asyncio.Handle:
handle = _Handle(callback, args, self, context)
self._worker.call(handle._sf_maybe_run)
return handle

def call_later(
self, delay: float, callback, *args, context=None
) -> asyncio.TimerHandle:
w = self._worker
deadline = w._delay_to_deadline_ns(delay)
handle = _TimerHandle(deadline / 1e9, callback, args, self, context)
w.delay_call(deadline, handle._sf_maybe_run)
return handle

def call_exception_handler(self, context) -> None:
# TODO: Should route this to the central exception handler.
raise RuntimeError(f"Async exception on {self._worker}: {context}")

def _timer_handle_cancelled(self, handle):
# We don't do anything special: just skip it if it comes up.
pass


class _Handle(asyncio.Handle):
def _sf_maybe_run(self):
if self.cancelled():
return
self._run()


class _TimerHandle(asyncio.TimerHandle):
def _sf_maybe_run(self):
if self.cancelled():
return
self._run()
Loading

0 comments on commit 6e5998f

Please sign in to comment.