Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement lifespan #25

Merged
merged 7 commits into from
Aug 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 116 additions & 2 deletions aioclock/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,20 @@
Another way to modulize your code is to use `Group` which is kinda the same idea as router in web frameworks.
"""

from __future__ import annotations

import asyncio
import sys
from functools import wraps
from typing import Any, Callable, Optional, TypeVar, Union
from typing import (
Any,
AsyncContextManager,
Callable,
ContextManager,
Optional,
TypeVar,
Union,
)

import anyio

Expand All @@ -17,6 +27,11 @@
else:
from typing import ParamSpec

if sys.version_info < (3, 11):
from typing_extensions import assert_never
else:
from typing import assert_never

from asyncer import asyncify
from fast_depends import inject

Expand Down Expand Up @@ -58,14 +73,93 @@ async def main():
asyncio.run(app.serve())
```

## Lifespan

You can define this startup and shutdown logic using the lifespan parameter of the AioClock instance.
It should be as an AsyncContextManager which get AioClock application as arguement.
You can find the example below.

Example:
```python
import asyncio
from contextlib import asynccontextmanager

from aioclock import AioClock

ML_MODEL = [] # just some imaginary component that needs to be started and stopped


@asynccontextmanager
async def lifespan(app: AioClock):
ML_MODEL.append(2)
print("UP!")
yield app
ML_MODEL.clear()
print("DOWN!")


app = AioClock(lifespan=lifespan)


if __name__ == "__main__":
asyncio.run(app.serve())
```

Here we are simulating the expensive startup operation of loading the model by putting the (fake)
model function in the dictionary with machine learning models before the yield.
This code will be executed before the application starts operationg, during the startup.

And then, right after the yield, we unload the model.
This code will be executed after the application finishes handling requests, right before the shutdown.
This could, for example, release resources like memory, a GPU or some database connection.

It would also happen when you're stopping your application gracefully, for example, when you're shutting down your container.

Lifespan could also be synchronus context manager. Check the example below.


Example:
```python
from contextlib import contextmanager

from aioclock import AioClock

ML_MODEL = []

@contextmanager
def lifespan_sync(sync_app: AioClock):
ML_MODEL.append(2)
print("UP!")
yield sync_app
ML_MODEL.clear()
print("DOWN!")

sync_app = AioClock(lifespan=lifespan_sync)

if __name__ == "__main__":
asyncio.run(app.serve())
```

"""

def __init__(self, limiter: Optional[anyio.CapacityLimiter] = None):
def __init__(
self,
*,
lifespan: Optional[
Callable[[AioClock], AsyncContextManager[AioClock] | ContextManager[AioClock]]
] = None,
limiter: Optional[anyio.CapacityLimiter] = None,
):
"""
Initialize AioClock instance.
No parameters are needed.

Attributes:
lifespan:
A context manager that will be used to handle the startup and shutdown of the application.
If not provided, the application will run without any startup and shutdown logic.
To understand it better, check the examples and documentation above.

limiter:
Anyio CapacityLimiter. capacity limiter to use to limit the total amount of threads running
Limiter that will be used to limit the number of tasks that are running at the same time.
Expand All @@ -76,6 +170,7 @@ def __init__(self, limiter: Optional[anyio.CapacityLimiter] = None):
self._groups: list[Group] = []
self._app_tasks: list[Task] = []
self._limiter = limiter
self.lifespan = lifespan

_groups: list[Group]
"""List of groups that will be run by AioClock."""
Expand Down Expand Up @@ -220,6 +315,25 @@ async def serve(self) -> None:
group = Group()
group._tasks = self._app_tasks
self.include_group(group)

if self.lifespan is None:
await self._run_tasks()
return

ctx = self.lifespan(self)

if isinstance(ctx, AsyncContextManager):
async with ctx:
await self._run_tasks()

elif isinstance(ctx, ContextManager):
with ctx:
await self._run_tasks()

else:
assert_never(ctx)

async def _run_tasks(self) -> None:
try:
await asyncio.gather(
*(task.run() for task in self._get_startup_task()), return_exceptions=False
Expand Down
7 changes: 7 additions & 0 deletions aioclock/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from croniter import croniter
from dateutil.relativedelta import relativedelta
from pydantic import BaseModel, Field, PositiveInt, model_validator
from typing_extensions import deprecated

from aioclock.custom_types import PositiveNumber, Triggers

Expand Down Expand Up @@ -236,6 +237,9 @@ async def get_waiting_time_till_next_trigger(self):
return None


@deprecated(
"Use `lifespan` instead of using Triggers for startup/shutdown events. This feature be removed in version 1.0.0"
)
class OnStartUp(LoopController[Literal[Triggers.ON_START_UP]]):
"""Just like Once, but it triggers the event only once, when the application starts up.

Expand Down Expand Up @@ -263,6 +267,9 @@ async def get_waiting_time_till_next_trigger(self):
return None


@deprecated(
"Use `lifespan` instead of using Triggers for startup/shutdown events. This feature be removed in version 1.0.0"
)
class OnShutDown(LoopController[Literal[Triggers.ON_SHUT_DOWN]]):
"""Just like Once, but it triggers the event only once, when the application shuts down.

Expand Down
15 changes: 11 additions & 4 deletions docs/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ Conditions or events that initiate task execution. It includes
- **Every**: Repeats a task at regular intervals.
- **At**: Executes a task at a specified time.
- **Once**: Runs a task a single time.
- **OnStartUp**: Runs when the application starts.
- **OnShutDown**: Executes during application shutdown.
- **OnStartUp**: Runs when the application starts. (DEPRECATED in favor of lifespan)
- **OnShutDown**: Executes during application shutdown. (DEPRECATED in favor of lifespan)
- **Forever**: Continuously runs a task in an infinite loop.
- **Cron**: Uses cron syntax for scheduling.
- **OrTrigger**: Initiate with a list of triggers, and executes when at least one of the included triggers activate.
Expand All @@ -43,7 +43,11 @@ The engine that monitors and executes tasks according to their triggers, ensurin

The entry point that starts the AioClock application. It initiates the task runner, which monitors and executes tasks based on their triggers.

### 8. Callable
### 8. Lifespan

A context manager that will be used to handle the startup and shutdown of the application. It is used inside AioClock application.

### 9. Callable

A function or method associated with a task, executed when the task's trigger condition is met.

Expand All @@ -53,14 +57,17 @@ A function or method associated with a task, executed when the task's trigger co

![Ownership Diagram](images/ownership-diagram.png)

The diagram outlines the core structure of an AioClock application. It shows how the application organizes tasks using dependency injection and logical grouping. Tasks are defined with specific triggers and callables, making them modular and easy to manage. The flow between components, like including groups or using decorators, highlights the framework's flexibility in an asynchronous environment. Overall, the architecture is designed for clarity, promoting clean, organized code while allowing for scalable task management.
In AioClock, tasks are managed through clear ownership within groups, using dependency injection. Groups encapsulate related tasks, each with specific triggers and callables. The include_group() function integrates these groups into the AioClock application, while standalone tasks are managed with decorators like @aioclock.task. This structure ensures that tasks are organized, maintainable, and easy to scale, with each component having a defined responsibility within the application.
The dependency injection system in AioClock allows you to override a callable with another through the application interface, facilitating testing. For example, instead of returning a session from a PostgreSQL database with get_session, you can override it to use get_sqlite_session, which provides a SQLite session instead. This flexibility makes it easier to swap out components for testing or other purposes without changing the core logic.

### Aioclock LifeCycle

![Aioclock LifeCycle](images/lifecycle-diagram.png)

This diagram shows the lifecycle of an AioClock application. It starts with the app.serve() call, which gathers all tasks and groups. The application then checks for startup tasks, runs them, and proceeds to other tasks. If a shutdown task is detected, it's executed before the application gracefully exits. If no shutdown tasks are present, the application simply exits. The diagram ensures a clear, step-by-step process for task management within the AioClock framework, ensuring tasks are executed in the proper order.

P.S: Since startup and shutdown task are deprcated, lifespan has same side effect as them, with extra benefit of having them with a shared memory state. Please reffer to lifespan API Documentation to understand it better with examples.

### Task Runner Execution Flow

![Task Runner Execution Flow](images/task-runner-diagram.png)
Expand Down
29 changes: 22 additions & 7 deletions tests/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@ def exec_example(example: str) -> None:
raise Exception(f"Failed to execute example:\n\n{example}")


def process_object(name: str, obj: object, module_name: str, library_name: str) -> None:
"""Process a single object, extracting and executing code examples."""
docstring = inspect.getdoc(obj)
if docstring:
examples = extract_code_blocks(docstring)
for example in examples:
exec_example(example)

# Recursively process class methods only if they belong to the same library
if inspect.isclass(obj):
for method_name, method_obj in inspect.getmembers(obj, inspect.isfunction):
if method_obj.__module__ and method_obj.__module__.startswith(library_name):
process_object(f"{name}.{method_name}", method_obj, module_name, library_name)


def process_module(module_path: str) -> None:
"""Process a single Python module."""
module_name = os.path.splitext(os.path.basename(module_path))[0]
Expand All @@ -53,13 +68,13 @@ def process_module(module_path: str) -> None:
sys.modules[module_name] = module
spec.loader.exec_module(module)

for name, obj in inspect.getmembers(module):
if inspect.isfunction(obj) or inspect.isclass(obj):
docstring = inspect.getdoc(obj)
if docstring:
examples = extract_code_blocks(docstring)
for example in examples:
exec_example(example)
library_name = "aioclock"

for name, obj in inspect.getmembers(
module, lambda o: inspect.isfunction(o) or inspect.isclass(o)
):
if obj.__module__ and obj.__module__.startswith(library_name):
process_object(name, obj, module_name, library_name)


def process_markdown(markdown_path: str) -> None:
Expand Down
65 changes: 65 additions & 0 deletions tests/test_lifespan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from contextlib import asynccontextmanager, contextmanager

import pytest

from aioclock import AioClock
from aioclock.triggers import Once

ML_MODEL_ASYNC = []
RAN_ONCE_TASK_ASYNC = False


@asynccontextmanager
async def lifespan(app: AioClock):
ML_MODEL_ASYNC.append(2)
yield app
ML_MODEL_ASYNC.clear()


app = AioClock(lifespan=lifespan)


@app.task(trigger=Once())
async def main():
assert len(ML_MODEL_ASYNC) == 1
global RAN_ONCE_TASK_ASYNC
RAN_ONCE_TASK_ASYNC = True


@pytest.mark.asyncio
async def test_lifespan_e2e_async():
assert len(ML_MODEL_ASYNC) == 0
assert RAN_ONCE_TASK_ASYNC is False
await app.serve() # asserts are in the task
assert len(ML_MODEL_ASYNC) == 0 # clean up done
assert RAN_ONCE_TASK_ASYNC is True # task ran


ML_MODEL_SYNC = [] # just some imaginary component that needs to be started and stopped
RAN_ONCE_TASK_SYNC = False


@contextmanager
def lifespan_sync(sync_app: AioClock):
ML_MODEL_SYNC.append(2)
yield sync_app
ML_MODEL_SYNC.clear()


sync_app = AioClock(lifespan=lifespan_sync)


@sync_app.task(trigger=Once())
def sync_main():
assert len(ML_MODEL_SYNC) == 1
global RAN_ONCE_TASK_SYNC
RAN_ONCE_TASK_SYNC = True


@pytest.mark.asyncio
async def test_lifespan_e2e_sync():
assert len(ML_MODEL_SYNC) == 0
assert RAN_ONCE_TASK_SYNC is False
await sync_app.serve() # asserts are in the task
assert len(ML_MODEL_SYNC) == 0 # clean up done
assert RAN_ONCE_TASK_SYNC is True # task ran
Loading