diff --git a/aioclock/app.py b/aioclock/app.py
index 527621a..6a4b0b7 100644
--- a/aioclock/app.py
+++ b/aioclock/app.py
@@ -5,10 +5,20 @@
 Another way to modulize your code is to use `Group`
 which is kinda the same idea as router in web frameworks.
 """
+from __future__ import annotations
+
 import asyncio
 import sys
 from functools import wraps
-from typing import Any, Callable, Optional, TypeVar, Union
+from typing import (
+    Any,
+    AsyncContextManager,
+    Callable,
+    ContextManager,
+    Optional,
+    TypeVar,
+    Union,
+)
 
 import anyio
 
@@ -17,6 +27,11 @@
 else:
     from typing import ParamSpec
 
+if sys.version_info < (3, 11):
+    from typing_extensions import assert_never
+else:
+    from typing import assert_never
+
 from asyncer import asyncify
 from fast_depends import inject
 
@@ -58,14 +73,93 @@ async def main():
         asyncio.run(app.serve())
     ```
 
+    ## Lifespan
+
+    You can define startup and shutdown logic using the `lifespan` parameter of the AioClock instance.
+    It should be an async (or sync) context manager that receives the AioClock application as its argument.
+    You can find an example below.
+
+    Example:
+    ```python
+    import asyncio
+    from contextlib import asynccontextmanager
+
+    from aioclock import AioClock
+
+    ML_MODEL = []  # just some imaginary component that needs to be started and stopped
+
+
+    @asynccontextmanager
+    async def lifespan(app: AioClock):
+        ML_MODEL.append(2)
+        print("UP!")
+        yield app
+        ML_MODEL.clear()
+        print("DOWN!")
+
+
+    app = AioClock(lifespan=lifespan)
+
+
+    if __name__ == "__main__":
+        asyncio.run(app.serve())
+    ```
+
+    Here we simulate the expensive startup operation of loading the model by appending the (fake)
+    model to the ML_MODEL list before the yield.
+    This code is executed before the application starts operating, during startup.
+
+    Then, right after the yield, we unload the model.
+    This code is executed after the application has finished running its tasks, right before shutdown.
+    This could, for example, release resources like memory, a GPU or a database connection.
+
+    It also runs when you stop your application gracefully, for example when you shut down your container.
+
+    The lifespan can also be a synchronous context manager. Check the example below.
+
+
+    Example:
+    ```python
+    import asyncio
+    from contextlib import contextmanager
+
+    from aioclock import AioClock
+
+    ML_MODEL = []
+
+    @contextmanager
+    def lifespan_sync(sync_app: AioClock):
+        ML_MODEL.append(2)
+        print("UP!")
+        yield sync_app
+        ML_MODEL.clear()
+        print("DOWN!")
+
+    sync_app = AioClock(lifespan=lifespan_sync)
+
+    if __name__ == "__main__":
+        asyncio.run(sync_app.serve())
+    ```
     """
 
-    def __init__(self, limiter: Optional[anyio.CapacityLimiter] = None):
+    def __init__(
+        self,
+        *,
+        lifespan: Optional[
+            Callable[[AioClock], AsyncContextManager[AioClock] | ContextManager[AioClock]]
+        ] = None,
+        limiter: Optional[anyio.CapacityLimiter] = None,
+    ):
         """
         Initialize AioClock instance.
-        No parameters are needed.
+        All parameters are optional and keyword-only.
 
         Attributes:
+            lifespan:
+                A context manager used to handle the startup and shutdown of the application.
+                If not provided, the application runs without any startup and shutdown logic.
+                To understand it better, check the examples and documentation above.
+
             limiter: Anyio CapacityLimiter.
                 capacity limiter to use to limit the total amount of threads running
                 Limiter that will be used to limit the number of tasks that are running at the same time.
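+
+                Example (a sketch: the value 5 is illustrative, and it assumes an anyio
+                version that allows creating a CapacityLimiter outside a running event loop):
+
+                ```python
+                import anyio
+
+                from aioclock import AioClock
+
+                # at most 5 synchronous tasks run in worker threads at the same time
+                app = AioClock(limiter=anyio.CapacityLimiter(5))
+                ```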
@@ -76,6 +170,7 @@ def __init__(self, limiter: Optional[anyio.CapacityLimiter] = None):
         self._groups: list[Group] = []
         self._app_tasks: list[Task] = []
         self._limiter = limiter
+        self.lifespan = lifespan
 
     _groups: list[Group]
     """List of groups that will be run by AioClock."""
@@ -220,6 +315,25 @@ async def serve(self) -> None:
         group = Group()
         group._tasks = self._app_tasks
         self.include_group(group)
+
+        if self.lifespan is None:
+            await self._run_tasks()
+            return
+
+        ctx = self.lifespan(self)
+
+        if isinstance(ctx, AsyncContextManager):
+            async with ctx:
+                await self._run_tasks()
+
+        elif isinstance(ctx, ContextManager):
+            with ctx:
+                await self._run_tasks()
+
+        else:
+            assert_never(ctx)
+
+    async def _run_tasks(self) -> None:
         try:
             await asyncio.gather(
                 *(task.run() for task in self._get_startup_task()), return_exceptions=False
diff --git a/aioclock/triggers.py b/aioclock/triggers.py
index e566bc2..28c1cd1 100644
--- a/aioclock/triggers.py
+++ b/aioclock/triggers.py
@@ -22,6 +22,7 @@
 from croniter import croniter
 from dateutil.relativedelta import relativedelta
 from pydantic import BaseModel, Field, PositiveInt, model_validator
+from typing_extensions import deprecated
 
 from aioclock.custom_types import PositiveNumber, Triggers
 
@@ -236,6 +237,9 @@ async def get_waiting_time_till_next_trigger(self):
         return None
 
 
+@deprecated(
+    "Use `lifespan` instead of using Triggers for startup/shutdown events. This feature will be removed in version 1.0.0."
+)
 class OnStartUp(LoopController[Literal[Triggers.ON_START_UP]]):
     """Just like Once, but it triggers the event only once, when the application starts up.
 
@@ -263,6 +267,9 @@ async def get_waiting_time_till_next_trigger(self):
         return None
 
 
+@deprecated(
+    "Use `lifespan` instead of using Triggers for startup/shutdown events. This feature will be removed in version 1.0.0."
+)
 class OnShutDown(LoopController[Literal[Triggers.ON_SHUT_DOWN]]):
     """Just like Once, but it triggers the event only once, when the application shuts down.
 
diff --git a/docs/overview.md b/docs/overview.md
index 920b4a2..e45f53a 100644
--- a/docs/overview.md
+++ b/docs/overview.md
@@ -25,8 +25,8 @@ Conditions or events that initiate task execution. It includes
 - **Every**: Repeats a task at regular intervals.
 - **At**: Executes a task at a specified time.
 - **Once**: Runs a task a single time.
-- **OnStartUp**: Runs when the application starts.
-- **OnShutDown**: Executes during application shutdown.
+- **OnStartUp**: Runs when the application starts. (DEPRECATED in favor of lifespan)
+- **OnShutDown**: Executes during application shutdown. (DEPRECATED in favor of lifespan)
 - **Forever**: Continuously runs a task in an infinite loop.
 - **Cron**: Uses cron syntax for scheduling.
 - **OrTrigger**: Initiate with a list of triggers, and executes when at least one of the included triggers activate.
@@ -43,7 +43,11 @@ The engine that monitors and executes tasks according to their triggers, ensurin
 
 The entry point that starts the AioClock application. It initiates the task runner, which monitors and executes tasks based on their triggers.
 
-### 8. Callable
+### 8. Lifespan
+
+A context manager used to handle the startup and shutdown of the application. It is used inside the AioClock application.
+
+### 9. Callable
 
 A function or method associated with a task, executed when the task's trigger condition is met.
 
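+For example, a minimal application that wires a callable to a trigger, with a lifespan handling startup and shutdown (a sketch; the task and names are illustrative):
+
+```python
+import asyncio
+from contextlib import asynccontextmanager
+
+from aioclock import AioClock
+from aioclock.triggers import Once
+
+
+@asynccontextmanager
+async def lifespan(app: AioClock):
+    print("starting up")    # runs before any task
+    yield app
+    print("shutting down")  # runs after the tasks have finished
+
+
+app = AioClock(lifespan=lifespan)
+
+
+@app.task(trigger=Once())
+async def hello():
+    # the callable: executed when its trigger (here, Once) fires
+    print("hello from aioclock")
+
+
+if __name__ == "__main__":
+    asyncio.run(app.serve())
+```
+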
@@ -53,7 +57,8 @@ A function or method associated with a task, executed when the task's trigger co
 
 ![Ownership Diagram](images/ownership-diagram.png)
 
-The diagram outlines the core structure of an AioClock application. It shows how the application organizes tasks using dependency injection and logical grouping. Tasks are defined with specific triggers and callables, making them modular and easy to manage. The flow between components, like including groups or using decorators, highlights the framework's flexibility in an asynchronous environment. Overall, the architecture is designed for clarity, promoting clean, organized code while allowing for scalable task management.
+In AioClock, tasks are managed through clear ownership within groups, using dependency injection. Groups encapsulate related tasks, each with specific triggers and callables. The include_group() function integrates these groups into the AioClock application, while standalone tasks are managed with decorators like @aioclock.task. This structure ensures that tasks are organized, maintainable, and easy to scale, with each component having a defined responsibility within the application.
+The dependency injection system in AioClock allows you to override a callable with another through the application interface, facilitating testing. For example, instead of returning a session from a PostgreSQL database with get_session, you can override it to use get_sqlite_session, which provides a SQLite session instead. This flexibility makes it easier to swap out components for testing or other purposes without changing the core logic.
 
 ### Aioclock LifeCycle
 
@@ -61,6 +66,8 @@
 
 This diagram shows the lifecycle of an AioClock application. It starts with the app.serve() call, which gathers all tasks and groups. The application then checks for startup tasks, runs them, and proceeds to other tasks. If a shutdown task is detected, it's executed before the application gracefully exits. If no shutdown tasks are present, the application simply exits. The diagram ensures a clear, step-by-step process for task management within the AioClock framework, ensuring tasks are executed in the proper order.
 
+P.S.: Since the startup and shutdown triggers are deprecated, use a lifespan instead; it has the same effect, with the extra benefit of a shared memory state between the startup and shutdown logic. Please refer to the lifespan API documentation to understand it better with examples.
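+
+For instance, logic that previously used the `OnStartUp` and `OnShutDown` triggers can be expressed as a single lifespan that shares state between both phases (a sketch; the names are illustrative):
+
+```python
+from contextlib import asynccontextmanager
+
+from aioclock import AioClock
+
+STATE = {}  # shared between the startup and shutdown phases
+
+
+@asynccontextmanager
+async def lifespan(app: AioClock):
+    STATE["db"] = "connected"  # previously an OnStartUp task
+    yield app
+    STATE.clear()              # previously an OnShutDown task
+
+
+app = AioClock(lifespan=lifespan)
+```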
+
 ### Task Runner Execution Flow
 
 ![Task Runner Execution Flow](images/task-runner-diagram.png)
diff --git a/tests/test_examples.py b/tests/test_examples.py
index 247803c..39636dd 100644
--- a/tests/test_examples.py
+++ b/tests/test_examples.py
@@ -42,6 +42,21 @@ def exec_example(example: str) -> None:
         raise Exception(f"Failed to execute example:\n\n{example}")
 
 
+def process_object(name: str, obj: object, module_name: str, library_name: str) -> None:
+    """Process a single object, extracting and executing code examples."""
+    docstring = inspect.getdoc(obj)
+    if docstring:
+        examples = extract_code_blocks(docstring)
+        for example in examples:
+            exec_example(example)
+
+    # Recursively process class methods only if they belong to the same library
+    if inspect.isclass(obj):
+        for method_name, method_obj in inspect.getmembers(obj, inspect.isfunction):
+            if method_obj.__module__ and method_obj.__module__.startswith(library_name):
+                process_object(f"{name}.{method_name}", method_obj, module_name, library_name)
+
+
 def process_module(module_path: str) -> None:
     """Process a single Python module."""
     module_name = os.path.splitext(os.path.basename(module_path))[0]
@@ -53,13 +68,13 @@ def process_module(module_path: str) -> None:
     sys.modules[module_name] = module
     spec.loader.exec_module(module)
 
-    for name, obj in inspect.getmembers(module):
-        if inspect.isfunction(obj) or inspect.isclass(obj):
-            docstring = inspect.getdoc(obj)
-            if docstring:
-                examples = extract_code_blocks(docstring)
-                for example in examples:
-                    exec_example(example)
+    library_name = "aioclock"
+
+    for name, obj in inspect.getmembers(
+        module, lambda o: inspect.isfunction(o) or inspect.isclass(o)
+    ):
+        if obj.__module__ and obj.__module__.startswith(library_name):
+            process_object(name, obj, module_name, library_name)
 
 
 def process_markdown(markdown_path: str) -> None:
diff --git a/tests/test_lifespan.py b/tests/test_lifespan.py
new file mode 100644
index 0000000..015be63
--- /dev/null
+++ b/tests/test_lifespan.py
@@ -0,0 +1,65 @@
+from contextlib import asynccontextmanager, contextmanager
+
+import pytest
+
+from aioclock import AioClock
+from aioclock.triggers import Once
+
+ML_MODEL_ASYNC = []
+RAN_ONCE_TASK_ASYNC = False
+
+
+@asynccontextmanager
+async def lifespan(app: AioClock):
+    ML_MODEL_ASYNC.append(2)
+    yield app
+    ML_MODEL_ASYNC.clear()
+
+
+app = AioClock(lifespan=lifespan)
+
+
+@app.task(trigger=Once())
+async def main():
+    assert len(ML_MODEL_ASYNC) == 1
+    global RAN_ONCE_TASK_ASYNC
+    RAN_ONCE_TASK_ASYNC = True
+
+
+@pytest.mark.asyncio
+async def test_lifespan_e2e_async():
+    assert len(ML_MODEL_ASYNC) == 0
+    assert RAN_ONCE_TASK_ASYNC is False
+    await app.serve()  # asserts are in the task
+    assert len(ML_MODEL_ASYNC) == 0  # clean up done
+    assert RAN_ONCE_TASK_ASYNC is True  # task ran
+
+
+ML_MODEL_SYNC = []  # just some imaginary component that needs to be started and stopped
+RAN_ONCE_TASK_SYNC = False
+
+
+@contextmanager
+def lifespan_sync(sync_app: AioClock):
+    ML_MODEL_SYNC.append(2)
+    yield sync_app
+    ML_MODEL_SYNC.clear()
+
+
+sync_app = AioClock(lifespan=lifespan_sync)
+
+
+@sync_app.task(trigger=Once())
+def sync_main():
+    assert len(ML_MODEL_SYNC) == 1
+    global RAN_ONCE_TASK_SYNC
+    RAN_ONCE_TASK_SYNC = True
+
+
+@pytest.mark.asyncio
+async def test_lifespan_e2e_sync():
+    assert len(ML_MODEL_SYNC) == 0
+    assert RAN_ONCE_TASK_SYNC is False
+    await sync_app.serve()  # asserts are in the task
+    assert len(ML_MODEL_SYNC) == 0  # clean up done
+    assert RAN_ONCE_TASK_SYNC is True  # task ran
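+
+
+# Extra check, building on the tests above (a sketch): serve() invokes the lifespan
+# with the application itself, i.e. `self.lifespan(self)`, so the context manager
+# should receive the very AioClock instance it was registered on.
+LIFESPAN_RECEIVED_APP = []
+
+
+@asynccontextmanager
+async def recording_lifespan(app: AioClock):
+    LIFESPAN_RECEIVED_APP.append(app)
+    yield app
+
+
+recording_app = AioClock(lifespan=recording_lifespan)
+
+
+@recording_app.task(trigger=Once())
+async def recording_main():
+    pass
+
+
+@pytest.mark.asyncio
+async def test_lifespan_receives_app_instance():
+    await recording_app.serve()
+    assert LIFESPAN_RECEIVED_APP == [recording_app]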