moved methods in a dedicated file
LanderOtto committed Nov 22, 2023
1 parent fdca153 commit 17b6c63
Showing 6 changed files with 170 additions and 169 deletions.
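This commit extracts test helper factories from tests/conftest.py and tests/test_provenance.py into a new tests/utils/get_instances.py module, and the tests now import them from there. Only four of the six changed files are expanded in this view; the new module itself is not shown. A hedged sketch of its likely skeleton, inferred purely from the deleted helpers and the updated import statements below (signatures only, bodies elided):

from __future__ import annotations

from streamflow.core.config import BindingConfig
from streamflow.core.context import StreamFlowContext
from streamflow.core.deployment import DeploymentConfig
from streamflow.core.workflow import Port, Step, Workflow

# Deployment-config factories previously defined in tests/conftest.py
def get_local_deployment_config() -> DeploymentConfig: ...
def get_docker_deployment_config() -> DeploymentConfig: ...
def get_kubernetes_deployment_config() -> DeploymentConfig: ...
def get_singularity_deployment_config() -> DeploymentConfig: ...
async def get_ssh_deployment_config(_context: StreamFlowContext) -> DeploymentConfig: ...

# Workflow/step factories previously defined in tests/test_provenance.py
def create_deploy_step(workflow: Workflow, deployment_config: DeploymentConfig | None = None) -> Step: ...
def create_schedule_step(workflow: Workflow, deploy_step: Step, binding_config: BindingConfig | None = None) -> Step: ...
async def create_workflow(context: StreamFlowContext, num_port: int = 2) -> tuple[Workflow, tuple[Port]]: ...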
117 changes: 8 additions & 109 deletions tests/conftest.py
@@ -2,31 +2,29 @@

import argparse
import asyncio
import os
import platform
import tempfile
from asyncio.locks import Lock
from typing import Collection

import asyncssh.public_key
import pkg_resources
import pytest
import pytest_asyncio
from jinja2 import Template

from streamflow.core import utils
from streamflow.core.config import Config
from streamflow.core.context import StreamFlowContext
from streamflow.core.deployment import (
DeploymentConfig,
LOCAL_LOCATION,
Location,
Target,
)
from streamflow.core.persistence import PersistableEntity
from streamflow.core.workflow import Port, Step, Token, Workflow
from streamflow.main import build_context
from streamflow.persistence.loading_context import DefaultDatabaseLoadingContext
from tests.utils.get_instances import (
get_local_deployment_config,
get_docker_deployment_config,
get_kubernetes_deployment_config,
get_singularity_deployment_config,
get_ssh_deployment_config,
)


def csvtype(choices):
@@ -147,93 +145,6 @@ def get_service(_context: StreamFlowContext, deployment_t: str) -> str | None:
raise Exception(f"{deployment_t} deployment type not supported")


def get_docker_deployment_config():
return DeploymentConfig(
name="alpine-docker",
type="docker",
config={"image": "alpine:3.16.2"},
external=False,
lazy=False,
)


def get_kubernetes_deployment_config():
with open(pkg_resources.resource_filename(__name__, "pod.jinja2")) as t:
template = Template(t.read())
with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
template.stream(name=utils.random_name()).dump(f.name)
return DeploymentConfig(
name="alpine-kubernetes",
type="kubernetes",
config={"files": [f.name]},
external=False,
lazy=False,
)


def get_singularity_deployment_config():
return DeploymentConfig(
name="alpine-singularity",
type="singularity",
config={"image": "docker://alpine:3.16.2"},
external=False,
lazy=False,
)


async def get_ssh_deployment_config(_context: StreamFlowContext):
skey = asyncssh.public_key.generate_private_key(
alg_name="ssh-rsa",
comment="streamflow-test",
key_size=4096,
)
public_key = skey.export_public_key().decode("utf-8")
with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
skey.write_private_key(f.name)
docker_config = DeploymentConfig(
name="linuxserver-ssh-docker",
type="docker",
config={
"image": "lscr.io/linuxserver/openssh-server",
"env": [f"PUBLIC_KEY={public_key}"],
"init": False,
"publish": ["2222:2222"],
},
external=False,
lazy=False,
)
await _context.deployment_manager.deploy(docker_config)
await asyncio.sleep(5)
return DeploymentConfig(
name="linuxserver-ssh",
type="ssh",
config={
"nodes": [
{
"checkHostKey": False,
"hostname": "127.0.0.1:2222",
"sshKey": f.name,
"username": "linuxserver.io",
}
],
"maxConcurrentSessions": 10,
},
external=False,
lazy=False,
)


def get_local_deployment_config():
return DeploymentConfig(
name=LOCAL_LOCATION,
type="local",
config={},
external=True,
lazy=False,
workdir=os.path.realpath(tempfile.gettempdir()),
)


@pytest_asyncio.fixture(scope="module")
async def context(chosen_deployment_types) -> StreamFlowContext:
_context = build_context(
@@ -338,17 +249,5 @@ async def save_load_and_test(elem: PersistableEntity, context):
# create a new DefaultDatabaseLoadingContext so that the objects are fetched from the database
# (instead of reusing the references saved in the attributes)
loading_context = DefaultDatabaseLoadingContext()
loaded = None
if isinstance(elem, Step):
loaded = await loading_context.load_step(context, elem.persistent_id)
elif isinstance(elem, Port):
loaded = await loading_context.load_port(context, elem.persistent_id)
elif isinstance(elem, Token):
loaded = await loading_context.load_token(context, elem.persistent_id)
elif isinstance(elem, Workflow):
loaded = await loading_context.load_workflow(context, elem.persistent_id)
elif isinstance(elem, Target):
loaded = await loading_context.load_target(context, elem.persistent_id)
elif isinstance(elem, Config):
loaded = await loading_context.load_deployment(context, elem.persistent_id)
loaded = await type(elem).load(context, elem.persistent_id, loading_context)
assert are_equals(elem, loaded)
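The six-branch isinstance ladder collapses into a single polymorphic call: every PersistableEntity subclass exposes a load classmethod with the same signature, so type(elem) resolves to the right loader automatically. A minimal sketch of the contract this relies on (simplified, hypothetical body — not the actual StreamFlow implementation):

class PersistableEntity:
    persistent_id: int

    @classmethod
    async def load(cls, context, persistent_id, loading_context):
        # Concrete subclasses (Step, Port, Token, Workflow, Target, Config)
        # override this to rebuild an instance from its database row.
        raise NotImplementedError

# Dispatch then needs no isinstance checks:
# loaded = await type(elem).load(context, elem.persistent_id, loading_context)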
3 changes: 2 additions & 1 deletion tests/test_persistence.py
@@ -30,7 +30,8 @@
ObjectToken,
TerminationToken,
)
from tests.conftest import get_docker_deployment_config, save_load_and_test
from tests.conftest import save_load_and_test
from tests.utils.get_instances import get_docker_deployment_config


@pytest.mark.asyncio
59 changes: 5 additions & 54 deletions tests/test_provenance.py
@@ -1,15 +1,12 @@
from __future__ import annotations

import asyncio
import posixpath
from typing import Any, MutableMapping, MutableSequence, cast

import pytest

from streamflow.core import utils
from streamflow.core.config import BindingConfig
from streamflow.core.context import StreamFlowContext
from streamflow.core.deployment import Target
from streamflow.core.persistence import DatabaseLoadingContext
from streamflow.core.workflow import Port, Status, Step, Token, Workflow
from streamflow.cwl.command import CWLCommand, CWLCommandToken
@@ -22,22 +19,23 @@
LoopTerminationCombinator,
)
from streamflow.workflow.executor import StreamFlowExecutor
from streamflow.workflow.port import ConnectorPort
from streamflow.workflow.step import (
CombinatorStep,
DeployStep,
ExecuteStep,
GatherStep,
LoopCombinatorStep,
ScatterStep,
ScheduleStep,
)
from streamflow.workflow.token import (
IterationTerminationToken,
ListToken,
TerminationToken,
)
from tests.conftest import get_docker_deployment_config
from tests.utils.get_instances import (
create_workflow,
create_deploy_step,
create_schedule_step,
)


def _contains_id(id_: int, token_list: MutableSequence[Token]) -> bool:
@@ -81,52 +79,6 @@ async def _load_dependers(
)


def create_deploy_step(workflow, deployment_config=None):
connector_port = workflow.create_port(cls=ConnectorPort)
if not deployment_config:
deployment_config = get_docker_deployment_config()
return workflow.create_step(
cls=DeployStep,
name=posixpath.join("__deploy__", deployment_config.name),
deployment_config=deployment_config,
connector_port=connector_port,
)


def create_schedule_step(workflow, deploy_step, binding_config=None):
if not binding_config:
binding_config = BindingConfig(
targets=[
Target(
deployment=deploy_step.deployment_config,
workdir=utils.random_name(),
)
]
)
return workflow.create_step(
cls=ScheduleStep,
name=posixpath.join(utils.random_name(), "__schedule__"),
job_prefix="something",
connector_ports={
binding_config.targets[0].deployment.name: deploy_step.get_output_port()
},
binding_config=binding_config,
)


async def create_workflow(
context: StreamFlowContext, num_port: int = 2
) -> tuple[Workflow, tuple[Port]]:
workflow = Workflow(
context=context, type="cwl", name=utils.random_name(), config={}
)
ports = []
for _ in range(num_port):
ports.append(workflow.create_port())
await workflow.save(context)
return workflow, tuple(cast(MutableSequence[Port], ports))


async def general_test(
context: StreamFlowContext,
workflow: Workflow,
Expand All @@ -137,7 +89,6 @@ async def general_test(
token_list: MutableSequence[Token],
port_name: str = "test",
) -> Step:
""" """
step = workflow.create_step(cls=step_cls, **kwargs_step)
step.add_input_port(port_name, in_port)
step.add_output_port(port_name, out_port)
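The three factories above move unchanged into tests/utils/get_instances.py. For reference, a typical composition in the provenance tests — a hedged sketch with a hypothetical test body, assuming the module-scoped context fixture defined in tests/conftest.py:

import pytest

from tests.utils.get_instances import (
    create_deploy_step,
    create_schedule_step,
    create_workflow,
)

@pytest.mark.asyncio
async def test_create_steps(context):
    # Build a saved workflow with two ports, then chain the factory steps.
    workflow, (in_port, out_port) = await create_workflow(context, num_port=2)
    deploy_step = create_deploy_step(workflow)  # defaults to the Docker config
    schedule_step = create_schedule_step(workflow, deploy_step)
    assert schedule_step.name.endswith("__schedule__")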
7 changes: 2 additions & 5 deletions tests/test_scheduler.py
@@ -19,11 +19,8 @@
from streamflow.core.scheduling import AvailableLocation, Hardware
from streamflow.core.workflow import Job, Status
from streamflow.deployment.connector import LocalConnector
from tests.conftest import (
get_deployment_config,
get_docker_deployment_config,
get_service,
)
from tests.utils.get_instances import get_docker_deployment_config
from tests.conftest import get_service, get_deployment_config


@pytest_asyncio.fixture(scope="module")