Skip to content

Commit

Permalink
created two distinct files for utilities
Browse files Browse the repository at this point in the history
  • Loading branch information
LanderOtto committed Nov 22, 2023
1 parent 17b6c63 commit 789dcfe
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 129 deletions.
74 changes: 1 addition & 73 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,10 @@
import pytest_asyncio

from streamflow.core.context import StreamFlowContext
from streamflow.core.deployment import (
DeploymentConfig,
LOCAL_LOCATION,
Location,
)
from streamflow.core.persistence import PersistableEntity
from streamflow.main import build_context
from streamflow.persistence.loading_context import DefaultDatabaseLoadingContext
from tests.utils.get_instances import (
get_local_deployment_config,
get_docker_deployment_config,
get_kubernetes_deployment_config,
get_singularity_deployment_config,
get_ssh_deployment_config,
)
from tests.utils.deployment import get_deployment_config


def csvtype(choices):
Expand Down Expand Up @@ -84,67 +73,6 @@ def all_deployment_types():
return deployments_


async def get_deployment_config(
    _context: StreamFlowContext, deployment_t: str
) -> DeploymentConfig:
    """Return the DeploymentConfig matching `deployment_t`.

    Supported types: local, docker, kubernetes, singularity, ssh.
    Raises Exception for any other value.
    """
    # SSH configuration is built asynchronously and needs the context.
    if deployment_t == "ssh":
        return await get_ssh_deployment_config(_context)
    if deployment_t == "local":
        return get_local_deployment_config()
    if deployment_t == "docker":
        return get_docker_deployment_config()
    if deployment_t == "kubernetes":
        return get_kubernetes_deployment_config()
    if deployment_t == "singularity":
        return get_singularity_deployment_config()
    raise Exception(f"{deployment_t} deployment type not supported")


async def get_location(_context: StreamFlowContext, deployment_t: str) -> Location:
    """Return a Location on the deployment matching `deployment_t`.

    For the local deployment a fixed local Location is returned; for every
    other supported type the first location reported by the deployment's
    connector is used. Raises Exception for unknown types.
    """
    if deployment_t == "local":
        return Location(deployment=LOCAL_LOCATION, name=LOCAL_LOCATION)
    # deployment type -> (deployment/connector name, service or None)
    deployments = {
        "docker": ("alpine-docker", None),
        "kubernetes": ("alpine-kubernetes", "sf-test"),
        "singularity": ("alpine-singularity", None),
        "ssh": ("linuxserver-ssh", None),
    }
    if deployment_t not in deployments:
        raise Exception(f"{deployment_t} location type not supported")
    deployment_name, service = deployments[deployment_t]
    connector = _context.deployment_manager.get_connector(deployment_name)
    if service is None:
        locations = await connector.get_available_locations()
        return Location(deployment=deployment_name, name=next(iter(locations.keys())))
    locations = await connector.get_available_locations(service=service)
    return Location(
        deployment=deployment_name,
        service=service,
        name=next(iter(locations.keys())),
    )


def get_service(_context: StreamFlowContext, deployment_t: str) -> str | None:
    """Return the service name used by tests for `deployment_t`.

    Only the kubernetes deployment uses a service ("sf-test"); every other
    supported type has none. Raises Exception for unknown types.
    """
    services = {
        "local": None,
        "docker": None,
        "kubernetes": "sf-test",
        "singularity": None,
        "ssh": None,
    }
    if deployment_t not in services:
        raise Exception(f"{deployment_t} deployment type not supported")
    return services[deployment_t]


@pytest_asyncio.fixture(scope="module")
async def context(chosen_deployment_types) -> StreamFlowContext:
_context = build_context(
Expand Down
2 changes: 1 addition & 1 deletion tests/test_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
TerminationToken,
)
from tests.conftest import save_load_and_test
from tests.utils.get_instances import get_docker_deployment_config
from tests.utils.deployment import get_docker_deployment_config


@pytest.mark.asyncio
Expand Down
2 changes: 1 addition & 1 deletion tests/test_provenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
ListToken,
TerminationToken,
)
from tests.utils.get_instances import (
from tests.utils.workflow import (
create_workflow,
create_deploy_step,
create_schedule_step,
Expand Down
2 changes: 1 addition & 1 deletion tests/test_remotepath.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from streamflow.core.deployment import Connector, Location
from streamflow.data import remotepath
from streamflow.deployment.utils import get_path_processor
from tests.conftest import get_location
from tests.utils.deployment import get_location


@pytest_asyncio.fixture(scope="module")
Expand Down
7 changes: 5 additions & 2 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
from streamflow.core.scheduling import AvailableLocation, Hardware
from streamflow.core.workflow import Job, Status
from streamflow.deployment.connector import LocalConnector
from tests.utils.get_instances import get_docker_deployment_config
from tests.conftest import get_service, get_deployment_config
from tests.utils.deployment import (
get_docker_deployment_config,
get_service,
get_deployment_config,
)


@pytest_asyncio.fixture(scope="module")
Expand Down
2 changes: 1 addition & 1 deletion tests/test_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from streamflow.data import remotepath
from streamflow.deployment.connector import LocalConnector
from streamflow.deployment.utils import get_path_processor
from tests.conftest import get_location
from tests.utils.deployment import get_location


@pytest_asyncio.fixture(scope="module")
Expand Down
113 changes: 63 additions & 50 deletions tests/utils/get_instances.py → tests/utils/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,20 @@

import asyncio
import os
import posixpath
import tempfile
from jinja2 import Template
from typing import MutableSequence, cast

import asyncssh
import asyncssh.public_key
import pkg_resources

from streamflow.core import utils
from streamflow.core.config import BindingConfig
from streamflow.core.context import StreamFlowContext
from streamflow.core.deployment import Target, DeploymentConfig, LOCAL_LOCATION
from streamflow.core.workflow import Workflow, Port
from streamflow.workflow.port import ConnectorPort
from streamflow.workflow.step import DeployStep, ScheduleStep
from streamflow.core.deployment import (
DeploymentConfig,
LOCAL_LOCATION,
Location,
)


def get_docker_deployment_config():
Expand Down Expand Up @@ -107,47 +105,62 @@ def get_local_deployment_config():
)


def create_deploy_step(workflow, deployment_config=None):
    """Create a DeployStep in `workflow` for `deployment_config`.

    Falls back to the Docker deployment configuration when no config is given.
    """
    port = workflow.create_port(cls=ConnectorPort)
    config = deployment_config or get_docker_deployment_config()
    return workflow.create_step(
        cls=DeployStep,
        name=posixpath.join("__deploy__", config.name),
        deployment_config=config,
        connector_port=port,
    )


def create_schedule_step(workflow, deploy_step, binding_config=None):
if not binding_config:
binding_config = BindingConfig(
targets=[
Target(
deployment=deploy_step.deployment_config,
workdir=utils.random_name(),
)
]
async def get_deployment_config(
    _context: StreamFlowContext, deployment_t: str
) -> DeploymentConfig:
    """Return the DeploymentConfig matching `deployment_t`.

    Supported types: local, docker, kubernetes, singularity, ssh.
    Raises Exception for any other value.
    """
    # SSH configuration is built asynchronously and needs the context.
    if deployment_t == "ssh":
        return await get_ssh_deployment_config(_context)
    if deployment_t == "local":
        return get_local_deployment_config()
    if deployment_t == "docker":
        return get_docker_deployment_config()
    if deployment_t == "kubernetes":
        return get_kubernetes_deployment_config()
    if deployment_t == "singularity":
        return get_singularity_deployment_config()
    raise Exception(f"{deployment_t} deployment type not supported")


async def get_location(_context: StreamFlowContext, deployment_t: str) -> Location:
if deployment_t == "local":
return Location(deployment=LOCAL_LOCATION, name=LOCAL_LOCATION)
elif deployment_t == "docker":
connector = _context.deployment_manager.get_connector("alpine-docker")
locations = await connector.get_available_locations()
return Location(deployment="alpine-docker", name=next(iter(locations.keys())))
elif deployment_t == "kubernetes":
connector = _context.deployment_manager.get_connector("alpine-kubernetes")
locations = await connector.get_available_locations(service="sf-test")
return Location(
deployment="alpine-kubernetes",
service="sf-test",
name=next(iter(locations.keys())),
)
return workflow.create_step(
cls=ScheduleStep,
name=posixpath.join(utils.random_name(), "__schedule__"),
job_prefix="something",
connector_ports={
binding_config.targets[0].deployment.name: deploy_step.get_output_port()
},
binding_config=binding_config,
)


async def create_workflow(
context: StreamFlowContext, num_port: int = 2
) -> tuple[Workflow, tuple[Port]]:
workflow = Workflow(
context=context, type="cwl", name=utils.random_name(), config={}
)
ports = []
for _ in range(num_port):
ports.append(workflow.create_port())
await workflow.save(context)
return workflow, tuple(cast(MutableSequence[Port], ports))
elif deployment_t == "singularity":
connector = _context.deployment_manager.get_connector("alpine-singularity")
locations = await connector.get_available_locations()
return Location(
deployment="alpine-singularity", name=next(iter(locations.keys()))
)
elif deployment_t == "ssh":
connector = _context.deployment_manager.get_connector("linuxserver-ssh")
locations = await connector.get_available_locations()
return Location(deployment="linuxserver-ssh", name=next(iter(locations.keys())))
else:
raise Exception(f"{deployment_t} location type not supported")


def get_service(_context: StreamFlowContext, deployment_t: str) -> str | None:
    """Return the service name used by tests for `deployment_t`.

    Only the kubernetes deployment uses a service ("sf-test"); every other
    supported type has none. Raises Exception for unknown types.
    """
    services = {
        "local": None,
        "docker": None,
        "kubernetes": "sf-test",
        "singularity": None,
        "ssh": None,
    }
    if deployment_t not in services:
        raise Exception(f"{deployment_t} deployment type not supported")
    return services[deployment_t]
61 changes: 61 additions & 0 deletions tests/utils/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from __future__ import annotations

import posixpath
from typing import MutableSequence, cast, TYPE_CHECKING

from streamflow.core import utils
from streamflow.core.deployment import Target
from streamflow.core.config import BindingConfig
from streamflow.workflow.port import ConnectorPort
from streamflow.core.workflow import Workflow, Port
from streamflow.workflow.step import DeployStep, ScheduleStep
from tests.utils.deployment import get_docker_deployment_config

if TYPE_CHECKING:
from streamflow.core.context import StreamFlowContext


def create_deploy_step(workflow, deployment_config=None):
    """Create a DeployStep in `workflow` for `deployment_config`.

    Falls back to the Docker deployment configuration when no config is given.
    """
    port = workflow.create_port(cls=ConnectorPort)
    config = deployment_config or get_docker_deployment_config()
    return workflow.create_step(
        cls=DeployStep,
        name=posixpath.join("__deploy__", config.name),
        deployment_config=config,
        connector_port=port,
    )


def create_schedule_step(workflow, deploy_step, binding_config=None):
    """Create a ScheduleStep wired to `deploy_step`'s connector output port.

    When no binding configuration is supplied, a single target on the deploy
    step's deployment with a random working directory is used.
    """
    if not binding_config:
        default_target = Target(
            deployment=deploy_step.deployment_config,
            workdir=utils.random_name(),
        )
        binding_config = BindingConfig(targets=[default_target])
    deployment_name = binding_config.targets[0].deployment.name
    return workflow.create_step(
        cls=ScheduleStep,
        name=posixpath.join(utils.random_name(), "__schedule__"),
        job_prefix="something",
        connector_ports={deployment_name: deploy_step.get_output_port()},
        binding_config=binding_config,
    )


async def create_workflow(
    context: StreamFlowContext, num_port: int = 2
) -> tuple[Workflow, tuple[Port]]:
    """Create, persist, and return a randomly-named CWL workflow.

    The workflow is created with `num_port` fresh ports and saved to the
    given context before being returned together with those ports.
    """
    workflow = Workflow(
        context=context, type="cwl", name=utils.random_name(), config={}
    )
    ports = [workflow.create_port() for _ in range(num_port)]
    await workflow.save(context)
    return workflow, tuple(cast(MutableSequence[Port], ports))

0 comments on commit 789dcfe

Please sign in to comment.