Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
LanderOtto committed Dec 1, 2023
1 parent 6815ae2 commit 05109b7
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 197 deletions.
2 changes: 1 addition & 1 deletion tests/test_change_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
are_equals,
object_to_dict,
)
from tests.utils.get_instances import (
from tests.utils.workflow import (
create_workflow,
create_schedule_step,
create_deploy_step,
Expand Down
2 changes: 1 addition & 1 deletion tests/test_cwl_change_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
set_workflow_in_combinator,
workflow_in_combinator_test,
)
from tests.utils.get_instances import create_workflow
from tests.utils.workflow import create_workflow


@pytest.mark.asyncio
Expand Down
2 changes: 1 addition & 1 deletion tests/test_cwl_provenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
put_tokens,
verify_dependency_tokens,
)
from tests.utils.get_instances import (
from tests.utils.workflow import (
create_workflow,
create_schedule_step,
create_deploy_step,
Expand Down
4 changes: 2 additions & 2 deletions tests/test_provenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ async def test_schedule_step(context: StreamFlowContext):
"""Test token provenance for ScheduleStep"""
workflow = (await create_workflow(context, num_port=0))[0]
deploy_step = create_deploy_step(workflow)
schedule_step = create_schedule_step(workflow, deploy_step)
schedule_step = create_schedule_step(workflow, [deploy_step])

await workflow.save(context)
executor = StreamFlowExecutor(workflow)
Expand Down Expand Up @@ -237,7 +237,7 @@ async def test_execute_step(context: StreamFlowContext):
context, num_port=3
)
deploy_step = create_deploy_step(workflow)
schedule_step = create_schedule_step(workflow, deploy_step)
schedule_step = create_schedule_step(workflow, [deploy_step])

in_port_name = "in-1"
out_port_name = "out-1"
Expand Down
190 changes: 0 additions & 190 deletions tests/utils/get_instances.py

This file was deleted.

41 changes: 39 additions & 2 deletions tests/utils/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
from streamflow.core import utils
from streamflow.core.deployment import Target
from streamflow.core.config import BindingConfig
from streamflow.workflow.combinator import (
DotProductCombinator,
CartesianProductCombinator,
LoopTerminationCombinator,
)
from streamflow.workflow.port import ConnectorPort
from streamflow.core.workflow import Workflow, Port
from streamflow.workflow.step import DeployStep, ScheduleStep
Expand All @@ -27,22 +32,29 @@ def create_deploy_step(workflow, deployment_config=None):
)


def create_schedule_step(workflow, deploy_step, binding_config=None):
def create_schedule_step(
workflow: Workflow,
deploy_steps: MutableSequence[DeployStep],
binding_config: BindingConfig = None,
):
# It is necessary to pass in the correct order biding_config.targets and deploy_steps for the mapping
if not binding_config:
binding_config = BindingConfig(
targets=[
Target(
deployment=deploy_step.deployment_config,
workdir=utils.random_name(),
)
for deploy_step in deploy_steps
]
)
return workflow.create_step(
cls=ScheduleStep,
name=posixpath.join(utils.random_name(), "__schedule__"),
job_prefix="something",
connector_ports={
binding_config.targets[0].deployment.name: deploy_step.get_output_port()
target.deployment.name: deploy_step.get_output_port()
for target, deploy_step in zip(binding_config.targets, deploy_steps)
},
binding_config=binding_config,
)
Expand All @@ -59,3 +71,28 @@ async def create_workflow(
ports.append(workflow.create_port())
await workflow.save(context)
return workflow, tuple(cast(MutableSequence[Port], ports))


def get_dot_combinator():
return DotProductCombinator(name=utils.random_name(), workflow=None)


def get_cartesian_product_combinator():
return CartesianProductCombinator(name=utils.random_name(), workflow=None)


def get_loop_terminator_combinator():
c = LoopTerminationCombinator(name=utils.random_name(), workflow=None)
c.add_output_item("test1")
c.add_output_item("test2")
return c


def get_nested_crossproduct():
combinator = DotProductCombinator(name=utils.random_name(), workflow=None)
c1 = CartesianProductCombinator(name=utils.random_name(), workflow=None)
c1.add_item("ext")
c1.add_item("inn")
items = c1.get_items(False)
combinator.add_combinator(c1, items)
return combinator

0 comments on commit 05109b7

Please sign in to comment.