Added possibility to load ports and steps in a different workflow #274

Merged
69 commits, merged on Jan 13, 2024 (changes shown from all commits)

Commits
34b923c
added parameter change_wf in the load methods
LanderOtto Nov 6, 2023
9d0bdd2
fix Config persistence
LanderOtto Nov 6, 2023
29052b7
refactor from deployment to config
LanderOtto Nov 6, 2023
a25a63e
fix: config is read-only so workflows can share the obj with same id
LanderOtto Nov 7, 2023
f24eacb
minor fix
LanderOtto Nov 7, 2023
3f3a10e
moved some general-purpose methods in a utils file
LanderOtto Nov 7, 2023
e986840
added tests for change_wf
LanderOtto Nov 7, 2023
2163be6
added new asserts
LanderOtto Nov 7, 2023
eb57f50
fix loading of TokenProcessor in ValueFromTransformer
LanderOtto Nov 7, 2023
5a669d3
fix k8s template path
LanderOtto Nov 7, 2023
fc86b92
test on other Steps
LanderOtto Nov 7, 2023
2206776
minor fix skip ports loading
LanderOtto Nov 7, 2023
49224fa
fix comments
LanderOtto Nov 7, 2023
014b8da
minor fix
LanderOtto Nov 8, 2023
b01ed16
fix annotation
LanderOtto Nov 8, 2023
38f6412
moved methods and template
LanderOtto Nov 8, 2023
5f90117
merge
LanderOtto Dec 1, 2023
465eaaa
fix tests
LanderOtto Dec 1, 2023
d2a7f0e
minor fix
LanderOtto Dec 3, 2023
e4dd0d2
renamed parameter name
LanderOtto Jan 3, 2024
006fed4
fix
LanderOtto Jan 3, 2024
6c0a93e
changed method signature from public to private
LanderOtto Jan 3, 2024
bf5715e
change parameter type
LanderOtto Jan 4, 2024
1029bb4
moved change_workflow control into load_workflow method
LanderOtto Jan 5, 2024
ed3a69c
fixed type error
LanderOtto Jan 5, 2024
2a2d286
removed add_step and add_port methods
LanderOtto Jan 5, 2024
53407c8
moved logic of the _get_port method into load_port method
LanderOtto Jan 5, 2024
b60d21e
fix
LanderOtto Jan 5, 2024
0f6d912
added tests
LanderOtto Jan 5, 2024
52dd913
used pytest parametrize
LanderOtto Jan 5, 2024
2208421
made code more readable
LanderOtto Jan 5, 2024
a61d2cc
changed get_dependencies method name to load_dependencies
LanderOtto Jan 5, 2024
e3b03e7
created a new class
LanderOtto Jan 6, 2024
f6e0d3f
made code more readable
LanderOtto Jan 6, 2024
af2ecff
fix
LanderOtto Jan 6, 2024
b3f77c0
removed is_std_loading method
LanderOtto Jan 6, 2024
dfd2819
renamed test files
LanderOtto Jan 6, 2024
1e950ae
moved persistent_id assignment inside DatabaseLoadingContext
LanderOtto Jan 6, 2024
05973a4
fixed signatures
LanderOtto Jan 6, 2024
69ad637
fix
LanderOtto Jan 6, 2024
cfb39ea
added doc
LanderOtto Jan 6, 2024
17711c8
change doc
LanderOtto Jan 6, 2024
078eaeb
checksum
LanderOtto Jan 6, 2024
69e7ce6
fix
LanderOtto Jan 6, 2024
4c9622d
fix
LanderOtto Jan 6, 2024
47a9679
fix sphinx error
LanderOtto Jan 6, 2024
7341a2f
fix sphinx error ... again
LanderOtto Jan 6, 2024
9925950
removed duplicate name
LanderOtto Jan 6, 2024
4207324
test
LanderOtto Jan 6, 2024
6188f4b
test 2
LanderOtto Jan 6, 2024
a24d838
fix 2
LanderOtto Jan 6, 2024
d4bea87
test undo
LanderOtto Jan 6, 2024
3619a0e
last try
LanderOtto Jan 6, 2024
86382e1
test
LanderOtto Jan 6, 2024
7ff6124
resolved checksum
LanderOtto Jan 6, 2024
0b80a59
renamed WorkflowLoader class into WorkflowBuilder
LanderOtto Jan 7, 2024
e038532
fix doc
LanderOtto Jan 7, 2024
848d208
fix empty method notation
LanderOtto Jan 7, 2024
f63fac8
workflow load v1
LanderOtto Jan 8, 2024
a399f2c
workflow load v2
LanderOtto Jan 8, 2024
0031e26
reordered the tests alphabetically
LanderOtto Jan 8, 2024
4c7382c
doc
LanderOtto Jan 8, 2024
f12696b
doc
LanderOtto Jan 8, 2024
29d2e22
removed load_entire_workflow parameter
LanderOtto Jan 9, 2024
dbe62e1
doc
LanderOtto Jan 9, 2024
4aed45e
removed WorkflowBuilder logic in the step load method
LanderOtto Jan 9, 2024
79d5c3f
removed load_dependencies
LanderOtto Jan 11, 2024
b1ac866
doc
LanderOtto Jan 11, 2024
2fde06b
doc
LanderOtto Jan 13, 2024
3 changes: 2 additions & 1 deletion .github/workflows/ci-tests.yaml
@@ -147,10 +147,11 @@ jobs:
python -m pip install -r docs/requirements.txt
- name: "Build documentation and check for consistency"
env:
CHECKSUM: "caaa02b3477bf264a667c1684176ecf3f5e03e6c916d3107299c33670506239c"
CHECKSUM: "fc9bdd01ef90f0b24d019da7683aa528af10119ef54d0a13cb16ec7adaa04242"
run: |
cd docs/
HASH="$(make checksum | tail -n1)"
echo "Docs checksum is ${HASH}"
test "${HASH}" == "${CHECKSUM}"
test-flux:
runs-on: ubuntu-22.04
83 changes: 81 additions & 2 deletions docs/source/ext/database.rst
@@ -25,7 +25,10 @@ StreamFlow relies on a persistent ``Database`` to store all the metadata regardi

Each ``PersistableEntity`` is identified by a unique numerical ``persistent_id`` related to the corresponding ``Database`` record. Two methods, ``save`` and ``load``, allow persisting the entity in the ``Database`` and retrieving it from the persistent record. Note that ``load`` is a class method, as it must construct a new instance.

The ``load`` method receives three input parameters: the current execution ``context``, the ``persistent_id`` of the instance that should be loaded, and a ``loading_context``. The latter keeps track of all the objects already loaded in the current transaction, serving as a cache to efficiently load nested entities and prevent deadlocks when dealing with circular references.
The ``load`` method receives three input parameters: the current execution ``context``, the ``persistent_id`` of the instance that should be loaded, and a ``loading_context`` (see :ref:`DatabaseLoadingContext <DatabaseLoadingContext>`). Note that the ``load`` method should not directly assign the ``persistent_id`` to the new entity, as this responsibility belongs to the :ref:`DatabaseLoadingContext <DatabaseLoadingContext>` class.
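
A minimal sketch of this contract, modeled on the ``Port`` loading code changed in this PR (``ExamplePort`` and its constructor arguments are illustrative assumptions, not part of the actual API):

.. code-block:: python

    class ExamplePort(Port):
        @classmethod
        async def load(
            cls,
            context: StreamFlowContext,
            persistent_id: int,
            loading_context: DatabaseLoadingContext,
        ):
            row = await context.database.get_port(persistent_id)
            port = cls(
                workflow=await loading_context.load_workflow(context, row["workflow"]),
                name=row["name"],
            )
            # Deliberately no `port.persistent_id = persistent_id` here:
            # the DatabaseLoadingContext assigns it upon registration
            loading_context.add_port(persistent_id, port)
            return port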

Persistence
===========

The ``Database`` interface, defined in the ``streamflow.core.persistence`` module, contains all the methods to create, modify, and retrieve this metadata. Data deletion is unnecessary, as StreamFlow never removes existing records. Internally, the ``save`` and ``load`` methods call one or more of these methods to perform the desired operations.
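
For instance, a hedged sketch of fetching a raw step record through one of these accessors (the ``get_step`` accessor and the row keys appear in the diffs below; the function name is illustrative):

.. code-block:: python

    async def show_step(context: StreamFlowContext, step_id: int) -> None:
        row = await context.database.get_step(step_id)
        print(row["name"], row["type"], row["status"])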

@@ -230,8 +233,9 @@ Each ``get_data`` method receives in input the identifier (commonly the ``persis

The ``close`` method receives no input parameter and does not return anything. It frees stateful resources potentially allocated during the object’s lifetime, e.g., network or database connections.


Implementations
===============
---------------

====== ============================================
Type Class
@@ -247,3 +251,78 @@ The database schema is structured as follows:

.. literalinclude:: ../../../streamflow/persistence/schemas/sqlite.sql
:language: sql


DatabaseLoadingContext
======================
Workflow loading is a delicate operation: if not managed properly, it can be costly in both time and memory, and it can lead to deadlocks in the presence of circular references.
The ``DatabaseLoadingContext`` interface defines classes in charge of managing these aspects. Users should always rely on these classes to load entities, instead of calling the ``load`` methods of ``PersistableEntity`` instances directly.

.. code-block:: python

def add_deployment(self, persistent_id: int, deployment: DeploymentConfig):
...

def add_filter(self, persistent_id: int, filter_config: FilterConfig):
...

def add_port(self, persistent_id: int, port: Port):
...

def add_step(self, persistent_id: int, step: Step):
...

def add_target(self, persistent_id: int, target: Target):
...

def add_token(self, persistent_id: int, token: Token):
...

def add_workflow(self, persistent_id: int, workflow: Workflow):
...

async def load_deployment(self, context: StreamFlowContext, persistent_id: int):
...

async def load_filter(self, context: StreamFlowContext, persistent_id: int):
...

async def load_port(self, context: StreamFlowContext, persistent_id: int):
...

async def load_step(self, context: StreamFlowContext, persistent_id: int):
...

async def load_target(self, context: StreamFlowContext, persistent_id: int):
...

async def load_token(self, context: StreamFlowContext, persistent_id: int):
...

async def load_workflow(self, context: StreamFlowContext, persistent_id: int):
...
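
For example (a hedged usage sketch; ``context`` and ``port_id`` are assumed to be an existing ``StreamFlowContext`` and a valid ``persistent_id``):

.. code-block:: python

    # Preferred: the loading context caches the entity, so repeated or
    # circular lookups return the same object instead of recursing
    port = await loading_context.load_port(context, port_id)

    # Discouraged: calling load() directly bypasses the cache, losing the
    # protection against duplicated loads and circular references
    port = await Port.load(context, port_id, loading_context)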


Implementations
---------------

==================================================================== =====================================================================
Name                                                                 Class
==================================================================== =====================================================================
:ref:`DefaultDatabaseLoadingContext <DefaultDatabaseLoadingContext>` streamflow.persistence.loading_context.DefaultDatabaseLoadingContext
:ref:`WorkflowBuilder <WorkflowBuilder>`                             streamflow.persistence.loading_context.WorkflowBuilder
==================================================================== =====================================================================

DefaultDatabaseLoadingContext
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The ``DefaultDatabaseLoadingContext`` keeps track of all the objects already loaded in the current transaction, serving as a cache to efficiently load nested entities and prevent deadlocks when dealing with circular references.
Furthermore, it is in charge of assigning the ``persistent_id`` when an entity is added to the cache through an ``add_*`` method.
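
A hedged usage sketch, assuming an existing ``context`` and a valid ``step_id``:

.. code-block:: python

    loading_context = DefaultDatabaseLoadingContext()
    step = await loading_context.load_step(context, step_id)
    # The second call is served from the cache: same object, no new query
    assert step is await loading_context.load_step(context, step_id)
    # The persistent_id was assigned by add_step() during loading
    assert step.persistent_id == step_id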

WorkflowBuilder
^^^^^^^^^^^^^^^
The ``WorkflowBuilder`` class loads the steps and ports of an existing workflow from a ``Database`` and inserts them into a new workflow object, received as a constructor argument. It extends the ``DefaultDatabaseLoadingContext`` class, overriding only the methods that involve ``step``, ``port``, and ``workflow`` entities. In particular, the ``add_*`` methods for these entities must not set the ``persistent_id``, as they deal with a newly created workflow, and the ``load_*`` methods must reset the internal state of their entities to the initial value (e.g., reset the status to ``Status.WAITING`` and clear the ``terminated`` flag).

The ``load_workflow`` method must behave in two different ways, depending on whether it is called directly by the user or from the internal logic of another entity's ``load`` method. In the former case, it should load all the entities related to the original workflow, identified by the ``persistent_id`` argument, into the new one. In the latter case, it should simply return the new workflow entity being built.

Other entities, such as ``deployment`` and ``target`` objects, can be safely shared between the old and the new workflows, as their internal state does not need to be modified. Therefore, they can be loaded following the common path implemented in the ``DefaultDatabaseLoadingContext`` class.
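
A minimal sketch of the intended usage, assuming the semantics described above (the ``Workflow`` constructor arguments mirror the loading code in ``streamflow/core/workflow.py``; the ``type`` value, the workflow name, and ``old_workflow_id`` are illustrative):

.. code-block:: python

    new_workflow = Workflow(
        context=context, type="cwl", config={}, name="cloned-workflow"
    )
    builder = WorkflowBuilder(new_workflow)
    # Called directly, load_workflow copies every step and port of the
    # persisted workflow into new_workflow and returns the new object
    loaded = await builder.load_workflow(context, old_workflow_id)
    assert loaded is new_workflow
    # The clone is a fresh entity: no persistent_id is carried over
    assert new_workflow.persistent_id is None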

3 changes: 0 additions & 3 deletions streamflow/core/deployment.py
@@ -206,7 +206,6 @@ async def load(
lazy=row["lazy"],
workdir=row["workdir"],
)
obj.persistent_id = persistent_id
loading_context.add_deployment(persistent_id, obj)
return obj

@@ -276,7 +275,6 @@ async def load(
row = await context.database.get_target(persistent_id)
type = cast(Type[Target], utils.get_class_from_name(row["type"]))
obj = await type._load(context, row, loading_context)
obj.persistent_id = persistent_id
loading_context.add_target(persistent_id, obj)
return obj

@@ -338,7 +336,6 @@ async def load(
type=row["type"],
config=json.loads(row["config"]),
)
obj.persistent_id = persistent_id
loading_context.add_filter(persistent_id, obj)
return obj

1 change: 0 additions & 1 deletion streamflow/core/utils.py
@@ -15,7 +15,6 @@
MutableSequence,
TYPE_CHECKING,
)

from streamflow.core.exception import WorkflowExecutionException

if TYPE_CHECKING:
6 changes: 1 addition & 5 deletions streamflow/core/workflow.py
@@ -310,7 +310,6 @@ async def load(
row = await context.database.get_port(persistent_id)
type = cast(Type[Port], utils.get_class_from_name(row["type"]))
port = await type._load(context, row, loading_context)
port.persistent_id = persistent_id
loading_context.add_port(persistent_id, port)
return port

@@ -432,14 +431,14 @@ async def load(
row = await context.database.get_step(persistent_id)
type = cast(Type[Step], utils.get_class_from_name(row["type"]))
step = await type._load(context, row, loading_context)
step.persistent_id = persistent_id
step.status = Status(row["status"])
step.terminated = step.status in [
Status.COMPLETED,
Status.FAILED,
Status.SKIPPED,
]
input_deps = await context.database.get_input_ports(persistent_id)
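# Registering the step before loading its ports lets circular
# step-port references resolve through the loading-context cache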
loading_context.add_step(persistent_id, step)
input_ports = await asyncio.gather(
*(
asyncio.create_task(loading_context.load_port(context, d["port"]))
@@ -457,7 +456,6 @@ async def load(
step.output_ports = {
d["name"]: p.name for d, p in zip(output_deps, output_ports)
}
loading_context.add_step(persistent_id, step)
return step

@abstractmethod
@@ -549,7 +547,6 @@ async def load(
row = await context.database.get_token(persistent_id)
type = cast(Type[Token], utils.get_class_from_name(row["type"]))
token = await type._load(context, row, loading_context)
token.persistent_id = persistent_id
loading_context.add_token(persistent_id, token)
return token

@@ -676,7 +673,6 @@ async def load(
workflow = cls(
context=context, type=row["type"], config=params["config"], name=row["name"]
)
workflow.persistent_id = row["id"]
loading_context.add_workflow(persistent_id, workflow)
rows = await context.database.get_workflow_ports(persistent_id)
workflow.ports = {
8 changes: 5 additions & 3 deletions streamflow/cwl/processor.py
@@ -162,8 +162,8 @@ async def _load(
format_graph=(
format_graph.parse(data=row["format_graph"])
if row["format_graph"] is not None
else format_graph
),
else None
), # todo: fix multiple instances
full_js=row["full_js"],
load_contents=row["load_contents"],
load_listing=LoadListing(row["load_listing"])
@@ -300,7 +300,9 @@ async def _save_additional_params(self, context: StreamFlowContext):
"expression_lib": self.expression_lib,
"file_format": self.file_format,
"format_graph": (
self.format_graph.serialize() if self.format_graph else None
self.format_graph.serialize()
if self.format_graph is not None
else None
),
"full_js": self.full_js,
"load_contents": self.load_contents,
59 changes: 58 additions & 1 deletion streamflow/persistence/loading_context.py
@@ -1,9 +1,10 @@
from __future__ import annotations
from typing import MutableMapping

from streamflow.core.context import StreamFlowContext
from streamflow.core.deployment import DeploymentConfig, Target, FilterConfig
from streamflow.core.persistence import DatabaseLoadingContext
from streamflow.core.workflow import Port, Step, Token, Workflow
from streamflow.core.workflow import Port, Step, Token, Workflow, Status


class DefaultDatabaseLoadingContext(DatabaseLoadingContext):
@@ -18,24 +19,31 @@ def __init__(self):
self._workflows: MutableMapping[int, Workflow] = {}

def add_deployment(self, persistent_id: int, deployment: DeploymentConfig):
deployment.persistent_id = persistent_id
self._deployment_configs[persistent_id] = deployment

def add_filter(self, persistent_id: int, filter_config: FilterConfig):
filter_config.persistent_id = persistent_id
self._filter_configs[persistent_id] = filter_config

def add_port(self, persistent_id: int, port: Port):
port.persistent_id = persistent_id
self._ports[persistent_id] = port

def add_step(self, persistent_id: int, step: Step):
step.persistent_id = persistent_id
self._steps[persistent_id] = step

def add_target(self, persistent_id: int, target: Target):
target.persistent_id = persistent_id
self._targets[persistent_id] = target

def add_token(self, persistent_id: int, token: Token):
token.persistent_id = persistent_id
self._tokens[persistent_id] = token

def add_workflow(self, persistent_id: int, workflow: Workflow):
workflow.persistent_id = persistent_id
self._workflows[persistent_id] = workflow

async def load_deployment(self, context: StreamFlowContext, persistent_id: int):
@@ -72,3 +80,52 @@ async def load_workflow(self, context: StreamFlowContext, persistent_id: int):
return self._workflows.get(persistent_id) or await Workflow.load(
context, persistent_id, self
)


class WorkflowBuilder(DefaultDatabaseLoadingContext):
def __init__(self, workflow: Workflow):
super().__init__()
self.workflow: Workflow = workflow

def add_port(self, persistent_id: int, port: Port):
self._ports[persistent_id] = port

def add_step(self, persistent_id: int, step: Step):
self._steps[persistent_id] = step

def add_workflow(self, persistent_id: int, workflow: Workflow):
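# Map the persisted workflow's id to the new workflow under construction,
# so that nested load_* calls resolve to the new instance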
self._workflows[persistent_id] = self.workflow

async def load_step(self, context: StreamFlowContext, persistent_id: int):
if persistent_id in self._steps.keys():
return self._steps[persistent_id]
else:
step_row = await context.database.get_step(persistent_id)
if (step := self.workflow.steps.get(step_row["name"])) is None:
# If the step is not available in the new workflow, a new one must be created
self.add_workflow(step_row["workflow"], self.workflow)
step = await Step.load(context, persistent_id, self)

# restore initial step state
step.status = Status.WAITING
step.terminated = False

self.workflow.steps[step.name] = step
return step

async def load_port(self, context: StreamFlowContext, persistent_id: int):
if persistent_id in self._ports.keys():
return self._ports[persistent_id]
else:
port_row = await context.database.get_port(persistent_id)
if (port := self.workflow.ports.get(port_row["name"])) is None:
# If the port is not available in the new workflow, a new one must be created
self.add_workflow(port_row["workflow"], self.workflow)
port = await Port.load(context, persistent_id, self)
self.workflow.ports[port.name] = port
return port

async def load_workflow(self, context: StreamFlowContext, persistent_id: int):
if persistent_id not in self._workflows.keys():
await Workflow.load(context, persistent_id, self)
return self.workflow