From 3619a0ee77fdaa4197e6173485070cf8b8c5d967 Mon Sep 17 00:00:00 2001 From: Alberto Mulone Date: Sat, 6 Jan 2024 20:31:50 +0100 Subject: [PATCH] last try --- .github/workflows/ci-tests.yaml | 9 ++- docs/source/ext/database.rst | 110 ++++++++++++++++++++++++++++++-- 2 files changed, 114 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci-tests.yaml b/.github/workflows/ci-tests.yaml index 7e688fe59..17ebc3d37 100644 --- a/.github/workflows/ci-tests.yaml +++ b/.github/workflows/ci-tests.yaml @@ -147,10 +147,17 @@ jobs: python -m pip install -r docs/requirements.txt - name: "Build documentation and check for consistency" env: - CHECKSUM: "e1fe7e1bf81fd1ccc9d8271547213d17a47c2bbae588709b97ff072cb9ce2a01" + CHECKSUM: "f9d19e96da56a0460a248feb9e946f5f28ea4a8de1c32345e4e60e14f25beb96" run: | cd docs/ HASH="$(make checksum | tail -n1)" + find build/html/ \ + -not -name 'searchindex.js' \ + -not -name '*.woff' \ + -not -name '*.woff2' \ + -type f -print0 | \ + sort -zd | \ + xargs -r0 sha256sum echo "Docs checksum is ${HASH}" test "${HASH}" == "${CHECKSUM}" test-flux: diff --git a/docs/source/ext/database.rst b/docs/source/ext/database.rst index 08816fb80..35b5cf28f 100644 --- a/docs/source/ext/database.rst +++ b/docs/source/ext/database.rst @@ -23,9 +23,12 @@ StreamFlow relies on a persistent ``Database`` to store all the metadata regardi ) -> None: ... -Each ``PersistableEntity`` is identified by a unique numerical ``persistent_id`` related to the corresponding ``Database`` record. Two methods, ``save`` and ``load``, allow persisting the entity in the ``Database`` and retrieving it from the persistent record. Note that ``load`` is a class method, as it must construct a new instance. +Each ``PersistableEntity`` is identified by a unique numerical ``persistent_id`` related to the corresponding ``Database`` record. Two methods, ``save`` and ``load``, allow persisting the entity in the ``Database`` and retrieving it from the persistent record. 
Note that ``load`` is a class method, as it must construct a new instance; furthermore, it does not assign the ``persistent_id``. -The ``load`` method receives three input parameters: the current execution ``context``, the ``persistent_id`` of the instance that should be loaded, and a ``loading_context``. The latter keeps track of all the objects already loaded in the current transaction, serving as a cache to efficiently load nested entities and prevent deadlocks when dealing with circular references. +The ``load`` method receives three input parameters: the current execution ``context``, the ``persistent_id`` of the instance that should be loaded, and a ``loading_context`` (see :ref:`DatabaseLoadingContext `). + +Persistence +=========== The ``Database`` interface, defined in the ``streamflow.core.persistence`` module, contains all the methods to create, modify, and retrieve this metadata. Data deletion is unnecessary, as StreamFlow never removes existing records. Internally, the ``save`` and ``load`` methods call one or more of these methods to perform the desired operations. @@ -230,8 +233,9 @@ Each ``get_data`` method receives in input the identifier (commonly the ``persis The ``close`` method receives no input parameter and does not return anything. It frees stateful resources potentially allocated during the object’s lifetime, e.g., network or database connections. + Implementations -=============== +--------------- ====== ============================================ Type Class @@ -246,4 +250,102 @@ By default, StreamFlow uses a local ``SqliteDatabase`` instance for metadata per The database schema is structured as follows: .. literalinclude:: ../../../streamflow/persistence/schemas/sqlite.sql - :language: sql \ No newline at end of file + :language: sql + + +DatabaseLoadingContext +====================== +Workflow loading can be costly in terms of time and memory, and it is also tricky because deadlocks are possible. 
+The ``DatabaseLoadingContext`` interface allows defining classes that manage these problems. It is good practice to load objects through these classes instead of directly calling the entity ``load`` methods. + +.. code-block:: python + + @abstractmethod + def add_deployment(self, persistent_id: int, deployment: DeploymentConfig): + ... + + @abstractmethod + def add_filter(self, persistent_id: int, filter_config: FilterConfig): + ... + + @abstractmethod + def add_port(self, persistent_id: int, port: Port): + ... + + @abstractmethod + def add_step(self, persistent_id: int, step: Step): + ... + + @abstractmethod + def add_target(self, persistent_id: int, target: Target): + ... + + @abstractmethod + def add_token(self, persistent_id: int, token: Token): + ... + + @abstractmethod + def add_workflow(self, persistent_id: int, workflow: Workflow): + ... + + @abstractmethod + async def load_deployment(self, context: StreamFlowContext, persistent_id: int): + ... + + @abstractmethod + async def load_filter(self, context: StreamFlowContext, persistent_id: int): + ... + + @abstractmethod + async def load_port(self, context: StreamFlowContext, persistent_id: int): + ... + + @abstractmethod + async def load_step(self, context: StreamFlowContext, persistent_id: int): + ... + + @abstractmethod + async def load_target(self, context: StreamFlowContext, persistent_id: int): + ... + + @abstractmethod + async def load_token(self, context: StreamFlowContext, persistent_id: int): + ... + + @abstractmethod + async def load_workflow(self, context: StreamFlowContext, persistent_id: int): + ... 
+ + +Implementations +--------------- + +==================================================================== ============================================================= +Name Class +==================================================================== ============================================================= +:ref:`DefaultDatabaseLoadingContext ` streamflow.persistence.loading_context.DefaultDatabaseLoadingContext +:ref:`WorkflowLoader ` streamflow.persistence.loading_context.WorkflowLoader +==================================================================== ============================================================= + +DefaultDatabaseLoadingContext +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +The ``DefaultDatabaseLoadingContext`` keeps track of all the objects already loaded in the current transaction, serving as a cache to efficiently load nested entities and prevent deadlocks when dealing with circular references. +Furthermore, it is in charge of assigning the ``persistent_id`` when an entity is added to the cache with the ``add_deployment``, ``add_filter``, ``add_port``, ``add_step``, ``add_target``, ``add_token``, and ``add_workflow`` methods. + + +WorkflowLoader +^^^^^^^^^^^^^^ +The ``WorkflowLoader`` loads the steps and ports of an existing workflow into a new one. +This can be helpful for the Fault Tolerance and the Resume features (see :ref:`Fault Tolerance `). +The two workflows can share some entities, particularly read-only ones such as ``deployment`` and ``target``. Entities with an internal state, namely ``step``, ``port`` and ``workflow``, must instead be distinct instances. +This is done by loading the entity and either keeping its ``persistent_id``, in the case of a shared object, or creating a new ``persistent_id`` otherwise. +The ``WorkflowLoader`` extends the ``DefaultDatabaseLoadingContext`` class and overrides only the methods involving the ``step``, ``port`` and ``workflow`` entities. 
+The class has a ``workflow`` attribute, i.e., the new ``workflow`` instance, which the ``load_workflow`` method returns. +Unlike their parent methods, the ``add_step``, ``add_port`` and ``add_workflow`` methods do not set the ``persistent_id``. + +.. code-block:: python + + def __init__(self, workflow: Workflow): + super().__init__() + self.workflow: Workflow = workflow +
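The caching behaviour described in the documentation above can be illustrated with a minimal, self-contained sketch. It does not use StreamFlow: ``Entity``, ``ToyLoadingContext``, ``ToyCopyingContext``, ``add_entity``, ``load_entity`` and the in-memory ``records`` dictionary are all hypothetical names introduced for illustration only. The sketch assumes that a default context assigns the ``persistent_id`` when an entity is added to the cache, while a ``WorkflowLoader``-like subclass caches the entity without assigning the identifier.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Entity:
    """Hypothetical stand-in for a persistable entity (e.g. a step or port)."""

    name: str
    persistent_id: Optional[int] = None


class ToyLoadingContext:
    """Hypothetical context: caches entities and assigns persistent_id on add."""

    def __init__(self, records: Dict[int, str]):
        self._records = records  # fake database: persistent_id -> name
        self._cache: Dict[int, Entity] = {}
        self.db_reads = 0  # counts how many times the fake database is read

    def add_entity(self, persistent_id: int, entity: Entity) -> None:
        # The context, not the entity loader, assigns the identifier.
        entity.persistent_id = persistent_id
        self._cache[persistent_id] = entity

    async def load_entity(self, persistent_id: int) -> Entity:
        # Return the cached instance if this id was already loaded.
        if persistent_id in self._cache:
            return self._cache[persistent_id]
        self.db_reads += 1
        entity = Entity(name=self._records[persistent_id])
        self.add_entity(persistent_id, entity)
        return entity


class ToyCopyingContext(ToyLoadingContext):
    """Hypothetical WorkflowLoader-like variant: caches without assigning the id."""

    def add_entity(self, persistent_id: int, entity: Entity) -> None:
        # The entity is treated as a new object, so persistent_id stays unset.
        self._cache[persistent_id] = entity


async def main() -> None:
    ctx = ToyLoadingContext({1: "step-a"})
    first = await ctx.load_entity(1)
    second = await ctx.load_entity(1)
    print(first is second, ctx.db_reads, first.persistent_id)

    copy_ctx = ToyCopyingContext({1: "step-a"})
    copied = await copy_ctx.load_entity(1)
    print(copied.persistent_id)


if __name__ == "__main__":
    asyncio.run(main())
```

In this sketch, the second ``load_entity`` call returns the cached instance without touching the fake database, which is the property that lets nested entities and circular references be resolved without repeated loads. The subclass overrides only the ``add`` method, mirroring the way the documentation describes ``WorkflowLoader`` overriding a subset of its parent's methods.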