
Commit

last try
LanderOtto committed Jan 6, 2024
1 parent d4bea87 commit 3619a0e
Showing 2 changed files with 114 additions and 5 deletions.
9 changes: 8 additions & 1 deletion .github/workflows/ci-tests.yaml
@@ -147,10 +147,17 @@ jobs:
python -m pip install -r docs/requirements.txt
- name: "Build documentation and check for consistency"
env:
CHECKSUM: "e1fe7e1bf81fd1ccc9d8271547213d17a47c2bbae588709b97ff072cb9ce2a01"
CHECKSUM: "f9d19e96da56a0460a248feb9e946f5f28ea4a8de1c32345e4e60e14f25beb96"
run: |
cd docs/
HASH="$(make checksum | tail -n1)"
find build/html/ \
-not -name 'searchindex.js' \
-not -name '*.woff' \
-not -name '*.woff2' \
-type f -print0 | \
sort -zd | \
xargs -r0 sha256sum
echo "Docs checksum is ${HASH}"
test "${HASH}" == "${CHECKSUM}"
test-flux:
110 changes: 106 additions & 4 deletions docs/source/ext/database.rst
@@ -23,9 +23,12 @@ StreamFlow relies on a persistent ``Database`` to store all the metadata regardi
) -> None:
...
Each ``PersistableEntity`` is identified by a unique numerical ``persistent_id`` related to the corresponding ``Database`` record. Two methods, ``save`` and ``load``, allow persisting the entity in the ``Database`` and retrieving it from the persistent record. Note that ``load`` is a class method, as it must construct a new instance; furthermore, it does not assign the ``persistent_id``.

The ``load`` method receives three input parameters: the current execution ``context``, the ``persistent_id`` of the instance that should be loaded, and a ``loading_context`` (see :ref:`DatabaseLoadingContext <DatabaseLoadingContext>`).
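
To make this contract more tangible, here is a minimal, self-contained sketch that is *not* StreamFlow code: the toy classes below only mirror the roles of ``Database`` and ``PersistableEntity`` described in this section, and the real methods additionally receive the execution context and the loading context.

.. code-block:: python

    # Toy illustration of the contract above (not StreamFlow's actual classes):
    # ``save`` assigns the ``persistent_id``, while the ``load`` class method
    # rebuilds the instance from the stored record and leaves it unset.
    class ToyDatabase:
        def __init__(self) -> None:
            self._records: dict[int, dict] = {}
            self._next_id = 0

        def add_record(self, record: dict) -> int:
            self._next_id += 1
            self._records[self._next_id] = record
            return self._next_id

        def get_record(self, persistent_id: int) -> dict:
            return self._records[persistent_id]


    class ToyEntity:
        def __init__(self, name: str) -> None:
            self.persistent_id: int | None = None
            self.name = name

        async def save(self, database: ToyDatabase) -> None:
            self.persistent_id = database.add_record({"name": self.name})

        @classmethod
        async def load(cls, database: ToyDatabase, persistent_id: int) -> "ToyEntity":
            # A new instance is constructed; assigning the ``persistent_id`` is
            # left to the caller (in StreamFlow, to the loading context).
            return cls(name=database.get_record(persistent_id)["name"])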

Persistence
===========

The ``Database`` interface, defined in the ``streamflow.core.persistence`` module, contains all the methods to create, modify, and retrieve this metadata. Data deletion is unnecessary, as StreamFlow never removes existing records. Internally, the ``save`` and ``load`` methods call one or more of these methods to perform the desired operations.

@@ -230,8 +233,9 @@ Each ``get_data`` method receives in input the identifier (commonly the ``persis

The ``close`` method receives no input parameters and returns nothing. It frees stateful resources potentially allocated during the object's lifetime, e.g., network or database connections.


Implementations
---------------

====== ============================================
Type Class
@@ -246,4 +250,102 @@ By default, StreamFlow uses a local ``SqliteDatabase`` instance for metadata per
The database schema is structured as follows:

.. literalinclude:: ../../../streamflow/persistence/schemas/sqlite.sql
:language: sql
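
Because the default backend is a plain SQLite file, the stored metadata can also be inspected offline with standard tools. The snippet below is a hypothetical example: the database path and the ``workflow`` table with its ``name`` and ``status`` columns are assumptions based on the schema above, so adjust them to your installation and to the actual ``sqlite.sql`` definitions.

.. code-block:: python

    import sqlite3
    from pathlib import Path

    # Assumed location of the StreamFlow metadata database; point this to the
    # sqlite file actually used by your installation.
    db_path = Path.home() / ".streamflow" / "sqlite.db"

    with sqlite3.connect(db_path) as connection:
        connection.row_factory = sqlite3.Row
        # Table and column names are taken from the included schema and may
        # differ across StreamFlow versions.
        for row in connection.execute("SELECT name, status FROM workflow"):
            print(dict(row))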


DatabaseLoadingContext
======================
Workflow loading can be costly in terms of time and memory, and it is also tricky, since circular references can lead to deadlocks.
The ``DatabaseLoadingContext`` interface allows defining classes that manage these problems. As a good practice, entities should be loaded through these classes instead of calling the entity ``load`` methods directly, as shown in the sketch after the interface listing below.

.. code-block:: python

    @abstractmethod
    def add_deployment(self, persistent_id: int, deployment: DeploymentConfig):
        ...
    @abstractmethod
    def add_filter(self, persistent_id: int, filter_config: FilterConfig):
        ...
    @abstractmethod
    def add_port(self, persistent_id: int, port: Port):
        ...
    @abstractmethod
    def add_step(self, persistent_id: int, step: Step):
        ...
    @abstractmethod
    def add_target(self, persistent_id: int, target: Target):
        ...
    @abstractmethod
    def add_token(self, persistent_id: int, token: Token):
        ...
    @abstractmethod
    def add_workflow(self, persistent_id: int, workflow: Workflow):
        ...
    @abstractmethod
    async def load_deployment(self, context: StreamFlowContext, persistent_id: int):
        ...
    @abstractmethod
    async def load_filter(self, context: StreamFlowContext, persistent_id: int):
        ...
    @abstractmethod
    async def load_port(self, context: StreamFlowContext, persistent_id: int):
        ...
    @abstractmethod
    async def load_step(self, context: StreamFlowContext, persistent_id: int):
        ...
    @abstractmethod
    async def load_target(self, context: StreamFlowContext, persistent_id: int):
        ...
    @abstractmethod
    async def load_token(self, context: StreamFlowContext, persistent_id: int):
        ...
    @abstractmethod
    async def load_workflow(self, context: StreamFlowContext, persistent_id: int):
        ...
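
For example, a routine that needs a ``Step`` and one of its ``Port`` objects should request both through the loading context, so that entities already loaded in the current transaction are reused and circular references do not trigger repeated loads. The following sketch is illustrative only: the import paths are assumed from StreamFlow's package layout and the identifiers are invented.

.. code-block:: python

    # Illustrative sketch (assumed import paths, invented identifiers): load
    # related entities through the loading context instead of calling the
    # entity ``load`` methods directly, so that cached instances are reused.
    from streamflow.core.context import StreamFlowContext
    from streamflow.core.persistence import DatabaseLoadingContext


    async def load_step_with_port(
        context: StreamFlowContext,
        loading_context: DatabaseLoadingContext,
        step_id: int,
        port_id: int,
    ):
        step = await loading_context.load_step(context, step_id)
        # If loading the step already caused this port to be loaded, the cached
        # instance is returned here instead of a new one built from scratch.
        port = await loading_context.load_port(context, port_id)
        return step, port
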
Implementations
---------------

==================================================================== =====================================================================
Name                                                                 Class
==================================================================== =====================================================================
:ref:`DefaultDatabaseLoadingContext <DefaultDatabaseLoadingContext>` streamflow.persistence.loading_context.DefaultDatabaseLoadingContext
:ref:`WorkflowLoader <WorkflowLoader>`                               streamflow.persistence.loading_context.WorkflowLoader
==================================================================== =====================================================================

DefaultDatabaseLoadingContext
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The ``DefaultDatabaseLoadingContext`` keeps track of all the objects already loaded in the current transaction, serving as a cache to efficiently load nested entities and prevent deadlocks when dealing with circular references.
Furthermore, it is in charge of assigning the ``persistent_id`` when an entity is added to the cache through the ``add_deployment``, ``add_filter``, ``add_port``, ``add_step``, ``add_target``, ``add_token``, and ``add_workflow`` methods.
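
As a small, hypothetical illustration of this behaviour (import paths are assumed, the identifier is invented, and ``Port`` stands for any persistable entity): the ``load`` call leaves the ``persistent_id`` unset, while registering the entity in the loading context assigns it.

.. code-block:: python

    # Hypothetical illustration: ``load`` does not set the ``persistent_id``;
    # the ``add_*`` methods of ``DefaultDatabaseLoadingContext`` do.
    from streamflow.core.context import StreamFlowContext
    from streamflow.core.workflow import Port
    from streamflow.persistence.loading_context import DefaultDatabaseLoadingContext


    async def illustrate(context: StreamFlowContext) -> None:
        loading_context = DefaultDatabaseLoadingContext()
        port = await Port.load(context, 4, loading_context)  # persistent_id still unset
        loading_context.add_port(4, port)                    # now assigned by the context
        assert port.persistent_id == 4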


WorkflowLoader
^^^^^^^^^^^^^^
The ``WorkflowLoader`` allows loading the steps and ports of an existing workflow into a new one.
This feature is helpful for the fault tolerance and resume features (see :ref:`Fault Tolerance <Fault tolerance>`).
The two workflows can share some entities, in particular those accessed only for reading, such as ``deployment`` and ``target``. Conversely, entities with an internal state, i.e., ``step``, ``port``, and ``workflow``, must be distinct instances.
This is achieved by loading each entity and keeping its ``persistent_id`` in the case of a shared object, or creating a new ``persistent_id`` otherwise.
The ``WorkflowLoader`` extends the ``DefaultDatabaseLoadingContext`` class and overrides only the methods involving the ``step``, ``port``, and ``workflow`` entities.
The class has a ``workflow`` attribute, i.e., the new ``workflow`` instance, which the ``load_workflow`` method returns.
Moreover, unlike their parent methods, the ``add_step``, ``add_port``, and ``add_workflow`` methods do not set the ``persistent_id``.

.. code-block:: python

    def __init__(self, workflow: Workflow):
        super().__init__()
        self.workflow: Workflow = workflow
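
A hypothetical usage sketch follows (import paths are assumed from StreamFlow's package layout, identifiers are invented, and error handling is omitted): the steps of a previously persisted workflow are re-created inside a freshly built one, e.g., to resume a failed execution.

.. code-block:: python

    # Hypothetical sketch: re-create persisted steps (and their ports) inside a
    # new workflow through a ``WorkflowLoader``.
    from streamflow.core.context import StreamFlowContext
    from streamflow.core.workflow import Step, Workflow
    from streamflow.persistence.loading_context import WorkflowLoader


    async def reload_steps(
        context: StreamFlowContext, new_workflow: Workflow, step_ids: list[int]
    ) -> list[Step]:
        loader = WorkflowLoader(workflow=new_workflow)
        # The loaded steps belong to ``new_workflow`` and do not keep the original
        # ``persistent_id``, so saving them later creates fresh records, while
        # read-only entities such as deployments and targets remain shared.
        return [await loader.load_step(context, step_id) for step_id in step_ids]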
