Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add configure_step/workflow #190

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/wic/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .pythonapi import Step, Workflow
from .pythonapi import Step, Workflow, configure_step, configure_workflow

__all__ = ["Step", "Workflow"]
__all__ = ["Step", "Workflow", "configure_step", "configure_workflow"]
155 changes: 142 additions & 13 deletions src/wic/api/pythonapi.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
# pylint: disable=W1203
"""CLT utilities."""
import json
import logging
import subprocess
from dataclasses import field
from functools import singledispatch
from pathlib import Path
from typing import Any, ClassVar, Optional, TypeVar, Union
from typing import Any, ClassVar, Generic, Optional, TypeVar, Union

import cwl_utils.parser as cu_parser
import yaml
from cwl_utils.parser import CommandLineTool as CWLCommandLineTool
from cwl_utils.parser import load_document_by_uri
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator
from pydantic import ( # pylint: disable=E0611
BaseModel,
ConfigDict,
Field,
PrivateAttr,
ValidationError,
)
from pydantic.dataclasses import dataclass as pydantic_dataclass

from wic.api._types import CWL_TYPES_DICT
Expand Down Expand Up @@ -100,6 +107,18 @@ def _set_value(
self.linked = True


InputType = TypeVar("InputType")


class _ConfiguredStepInput(BaseModel, Generic[InputType]):
"""Input of Step used for configuration from JSON.

This Generic Pydantic Model is used to configure inputs of
Steps when using a JSON configuration file.
"""
value: InputType


def _default_dict() -> dict:
return {}

Expand Down Expand Up @@ -240,6 +259,8 @@ def __setattr__(self, __name: str, __value: Any) -> Any:
except BaseException as exc:
raise exc

elif isinstance(__value, _ConfiguredStepInput): # when configuring from JSON
self.inputs[index]._set_value(__value.value)
else:
if isinstance(__value, str) and _is_link(__value):
self.inputs[index]._set_value(__value, check=False)
Expand Down Expand Up @@ -281,9 +302,13 @@ def _yml(self) -> dict:
}
return d

def _save_cwl(self, path: Path) -> None:
def _save_cwl(self, path: Path, overwrite: bool) -> None:
cwl_adapters = path.joinpath("cwl_adapters")
cwl_adapters.mkdir(exist_ok=True, parents=True)
if not overwrite:
if cwl_adapters.joinpath(f"{self.clt_name}.cwl").exists():
return
# does not exist:
with open(
cwl_adapters.joinpath(f"{self.clt_name}.cwl"),
"w",
Expand All @@ -292,14 +317,63 @@ def _save_cwl(self, path: Path) -> None:
file.write(yaml.dump(self.yaml))


def _load_config(config_path: StrPath) -> Any:
"""Load configuration JSON file.

Returns `dict[str, Any]`.
Type annotation is `Any` because `json.load` type annotation is `Any`.
"""
with open(config_path, "r", encoding="utf-8") as file:
cfg = json.load(file)
return cfg


def configure_step(clt_path: StrPath, config_path: StrPath) -> Step:
"""Configure Step from JSON configuration file.

This function takes a path to a CLT (Command Line Tool) file and a path to a JSON configuration file,
and configures a `Step` object based on the provided configuration.

Args:
clt_path (StrPath): The path to the CLT file.
config_path (StrPath): The path to the JSON configuration file.


Returns:
Step: The configured `Step` object.

"""
clt = Step(clt_path)
input_names = clt._input_names # pylint: disable=W0212
config = config_path if isinstance(config_path, dict) else _load_config(config_path)
# dict with only the inputs that are in the clt
inp_dict = dict(filter(lambda x: x[0] in input_names, config.items()))
for inp_name, inp_value in inp_dict.items():
if isinstance(inp_value, str) and _is_link(inp_value): # linked * or & inp
# do not perform validation for linked inputs
setattr(clt, inp_name, inp_value)
else: # not linked inp
# generic_input performs validation with Pydantic
inp_type = clt.inputs[input_names.index(inp_name)].inp_type
try:
# inp_type is a variable, mypy complains
generic_input = _ConfiguredStepInput[inp_type](value=inp_value) # type: ignore
except ValidationError as exc:
raise InvalidInputValueError(f"invalid value for {inp_name}") from exc
setattr(clt, inp_name, generic_input)

return clt


DATACLASS_CONFIG = ConfigDict(validate_assignment=True)


@pydantic_dataclass(config=DATACLASS_CONFIG)
class Workflow:
steps: list[Step]
name: str
yml_path: Optional[Path] = field(default=None, init=False, repr=False)
path: Path = field(default=Path.cwd())
yml_path: Optional[Path] = field(default=None, init=False, repr=False) # pylint: disable=E3701

def __post_init__(self) -> None:
for s in self.steps:
Expand All @@ -309,6 +383,7 @@ def __post_init__(self) -> None:
raise InvalidStepError(
f"{s.clt_name} is missing required inputs"
) from exc
self.path.joinpath(self.name).mkdir(parents=True, exist_ok=True)

def append(self, step_: Step) -> None:
"""Append step to Workflow."""
Expand All @@ -328,25 +403,36 @@ def _save_yaml(self) -> None:
with open(self.yml_path, "w", encoding="utf-8") as file:
file.write(yaml.dump(self.yaml))

def _save_all_cwl(self) -> None:
# copy to wf path, only informational, not used by WIC
with open(self.path.joinpath(self.name, f"{self.name}.yml"), "w", encoding="utf-8") as file:
file.write(yaml.dump(self.yaml))

def _save_all_cwl(self, overwrite: bool) -> None:
"""Save CWL files to cwl_adapters.

This is necessary for WIC to compile the workflow.
"""
_WIC_PATH.mkdir(parents=True, exist_ok=True)
for s in self.steps:
try:
s._save_cwl(_WIC_PATH) # pylint: disable=W0212
s._save_cwl(_WIC_PATH, overwrite) # pylint: disable=W0212
except BaseException as exc:
raise exc

# copy to wf path, only informational, not used by WIC
for s in self.steps:
try:
s._save_cwl(self.path.joinpath(self.name), overwrite) # pylint: disable=W0212
except BaseException as exc:
raise exc

def compile(self, run_local: bool = False) -> Path:
def compile(self, run_local: bool = False, overwrite: bool = False) -> Path:
"""Compile Workflow using WIC.

Returns path to compiled CWL Workflow.
"""

self._save_all_cwl()
self._save_all_cwl(overwrite)
self._save_yaml()
logger.info(f"Compiling {self.name}")
args = ["wic", "--yaml", f"{self.name}.yml"]
Expand All @@ -363,12 +449,55 @@ def compile(self, run_local: bool = False) -> Path:
)
except subprocess.CalledProcessError as exc:
logger.error(exc.stderr)
logger.info(exc.stdout)
raise exc
logger.error(exc.stdout)
return None
logger.info(proc.stdout)
return _WIC_PATH.joinpath("autogenerated", f"{self.name}.cwl")
# copy files to wf path
compiled_cwl_path = _WIC_PATH.joinpath("autogenerated", f"{self.name}.cwl")
self.path.joinpath(self.name, f"{self.name}.cwl").symlink_to(compiled_cwl_path)
return compiled_cwl_path

def run(self) -> None:
def run(self, overwrite: bool = False) -> None:
"""Run compiled workflow."""
logger.info(f"Running {self.name}")
self.compile(run_local=True)
self.compile(run_local=True, overwrite=overwrite)


def configure_workflow( # pylint: disable=R0913
clt_list: list[StrPath],
config_list: list[StrPath],
name: str,
path: Union[str, Path] = Path.cwd(),
compile_workflow: bool = True,
run: bool = False,
overwrite: bool = False) -> Workflow:
"""Configure Workflow from list of CLT and configuration files.

This function takes a list of CLT (Command Line Tool) files and a list of configuration files
and creates a Workflow object. Each CLT file is paired with a corresponding configuration file
based on their order in the lists. The Workflow object is then returned.

Args:
clt_list (list[StrPath]): A list of paths to CLT files.
config_list (list[StrPath]): A list of paths to configuration files.
name (str): The name of the Workflow.
path (Optional[StrPath]): The path where the Workflow will be created. Defaults to `pathlib.Path.cwd()`.
compile_workflow (bool): Flag indicating whether to compile the Workflow using WIC. Defaults to True.
run (bool): Flag indicating whether to run the Workflow after configuration using WIC. Defaults to False.
overwrite (bool): Flag indicating whether to overwrite existing CLT files in cwl_adapters. Defaults to False.

Returns:
Workflow: The configured Workflow object.

"""
steps = [configure_step(clt, config) for clt, config in zip(clt_list, config_list)]
path_ = Path(path)
# mypy incorrectly marks this init
# of Workflow (pydantic dataclass) as invalid
# having 'too many arguments'
workflow = Workflow(steps, name, path_) # type: ignore
if run:
workflow.run(overwrite=overwrite)
elif compile_workflow:
workflow.compile(overwrite=overwrite)
return workflow
Loading