diff --git a/src/wic/api/__init__.py b/src/wic/api/__init__.py index 19585a29..580b2d6e 100644 --- a/src/wic/api/__init__.py +++ b/src/wic/api/__init__.py @@ -1,3 +1,3 @@ -from .pythonapi import Step, Workflow +from .pythonapi import Step, Workflow, configure_step, configure_workflow -__all__ = ["Step", "Workflow"] +__all__ = ["Step", "Workflow", "configure_step", "configure_workflow"] diff --git a/src/wic/api/pythonapi.py b/src/wic/api/pythonapi.py index cce9a4f9..2bd3becd 100644 --- a/src/wic/api/pythonapi.py +++ b/src/wic/api/pythonapi.py @@ -1,17 +1,24 @@ # pylint: disable=W1203 """CLT utilities.""" +import json import logging import subprocess from dataclasses import field from functools import singledispatch from pathlib import Path -from typing import Any, ClassVar, Optional, TypeVar, Union +from typing import Any, ClassVar, Generic, Optional, TypeVar, Union import cwl_utils.parser as cu_parser import yaml from cwl_utils.parser import CommandLineTool as CWLCommandLineTool from cwl_utils.parser import load_document_by_uri -from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator +from pydantic import ( # pylint: disable=E0611 + BaseModel, + ConfigDict, + Field, + PrivateAttr, + ValidationError, +) from pydantic.dataclasses import dataclass as pydantic_dataclass from wic.api._types import CWL_TYPES_DICT @@ -100,6 +107,18 @@ def _set_value( self.linked = True +InputType = TypeVar("InputType") + + +class _ConfiguredStepInput(BaseModel, Generic[InputType]): + """Input of Step used for configuration from JSON. + + This Generic Pydantic Model is used to configure inputs of + Steps when using a JSON configuration file. + """ + value: InputType + + def _default_dict() -> dict: return {} @@ -240,6 +259,8 @@ def __setattr__(self, __name: str, __value: Any) -> Any: except BaseException as exc: raise exc + elif isinstance(__value, _ConfiguredStepInput): # when configuring from JSON + self.inputs[index]._set_value(__value.value) else: if isinstance(__value, str) and _is_link(__value): self.inputs[index]._set_value(__value, check=False) @@ -281,9 +302,13 @@ def _yml(self) -> dict: } return d - def _save_cwl(self, path: Path) -> None: + def _save_cwl(self, path: Path, overwrite: bool) -> None: cwl_adapters = path.joinpath("cwl_adapters") cwl_adapters.mkdir(exist_ok=True, parents=True) + if not overwrite: + if cwl_adapters.joinpath(f"{self.clt_name}.cwl").exists(): + return + # does not exist: with open( cwl_adapters.joinpath(f"{self.clt_name}.cwl"), "w", @@ -292,6 +317,54 @@ def _save_cwl(self, path: Path) -> None: file.write(yaml.dump(self.yaml)) +def _load_config(config_path: StrPath) -> Any: + """Load configuration JSON file. + + Returns `dict[str, Any]`. + Type annotation is `Any` because `json.load` type annotation is `Any`. + """ + with open(config_path, "r", encoding="utf-8") as file: + cfg = json.load(file) + return cfg + + +def configure_step(clt_path: StrPath, config_path: StrPath) -> Step: + """Configure Step from JSON configuration file. + + This function takes a path to a CLT (Command Line Tool) file and a path to a JSON configuration file, + and configures a `Step` object based on the provided configuration. + + Args: + clt_path (StrPath): The path to the CLT file. + config_path (StrPath): The path to the JSON configuration file. + + + Returns: + Step: The configured `Step` object. + + """ + clt = Step(clt_path) + input_names = clt._input_names # pylint: disable=W0212 + config = config_path if isinstance(config_path, dict) else _load_config(config_path) + # dict with only the inputs that are in the clt + inp_dict = dict(filter(lambda x: x[0] in input_names, config.items())) + for inp_name, inp_value in inp_dict.items(): + if isinstance(inp_value, str) and _is_link(inp_value): # linked * or & inp + # do not perform validation for linked inputs + setattr(clt, inp_name, inp_value) + else: # not linked inp + # generic_input performs validation with Pydantic + inp_type = clt.inputs[input_names.index(inp_name)].inp_type + try: + # inp_type is a variable, mypy complains + generic_input = _ConfiguredStepInput[inp_type](value=inp_value) # type: ignore + except ValidationError as exc: + raise InvalidInputValueError(f"invalid value for {inp_name}") from exc + setattr(clt, inp_name, generic_input) + + return clt + + DATACLASS_CONFIG = ConfigDict(validate_assignment=True) @@ -299,7 +372,8 @@ def _save_cwl(self, path: Path) -> None: class Workflow: steps: list[Step] name: str - yml_path: Optional[Path] = field(default=None, init=False, repr=False) + path: Path = field(default=Path.cwd()) + yml_path: Optional[Path] = field(default=None, init=False, repr=False) # pylint: disable=E3701 def __post_init__(self) -> None: for s in self.steps: @@ -309,6 +383,7 @@ def __post_init__(self) -> None: raise InvalidStepError( f"{s.clt_name} is missing required inputs" ) from exc + self.path.joinpath(self.name).mkdir(parents=True, exist_ok=True) def append(self, step_: Step) -> None: """Append step to Workflow.""" @@ -328,7 +403,11 @@ def _save_yaml(self) -> None: with open(self.yml_path, "w", encoding="utf-8") as file: file.write(yaml.dump(self.yaml)) - def _save_all_cwl(self) -> None: + # copy to wf path, only informational, not used by WIC + with open(self.path.joinpath(self.name, f"{self.name}.yml"), "w", encoding="utf-8") as file: + file.write(yaml.dump(self.yaml)) + + def _save_all_cwl(self, overwrite: bool) -> None: """Save CWL files to cwl_adapters. This is necessary for WIC to compile the workflow. @@ -336,17 +415,24 @@ def _save_all_cwl(self) -> None: _WIC_PATH.mkdir(parents=True, exist_ok=True) for s in self.steps: try: - s._save_cwl(_WIC_PATH) # pylint: disable=W0212 + s._save_cwl(_WIC_PATH, overwrite) # pylint: disable=W0212 + except BaseException as exc: + raise exc + + # copy to wf path, only informational, not used by WIC + for s in self.steps: + try: + s._save_cwl(self.path.joinpath(self.name), overwrite) # pylint: disable=W0212 except BaseException as exc: raise exc - def compile(self, run_local: bool = False) -> Path: + def compile(self, run_local: bool = False, overwrite: bool = False) -> Path: """Compile Workflow using WIC. Returns path to compiled CWL Workflow. """ - self._save_all_cwl() + self._save_all_cwl(overwrite) self._save_yaml() logger.info(f"Compiling {self.name}") args = ["wic", "--yaml", f"{self.name}.yml"] @@ -363,12 +449,55 @@ def compile(self, run_local: bool = False) -> Path: ) except subprocess.CalledProcessError as exc: logger.error(exc.stderr) - logger.info(exc.stdout) - raise exc + logger.error(exc.stdout) + return None logger.info(proc.stdout) - return _WIC_PATH.joinpath("autogenerated", f"{self.name}.cwl") + # copy files to wf path + compiled_cwl_path = _WIC_PATH.joinpath("autogenerated", f"{self.name}.cwl") + self.path.joinpath(self.name, f"{self.name}.cwl").symlink_to(compiled_cwl_path) + return compiled_cwl_path - def run(self) -> None: + def run(self, overwrite: bool = False) -> None: """Run compiled workflow.""" logger.info(f"Running {self.name}") - self.compile(run_local=True) + self.compile(run_local=True, overwrite=overwrite) + + +def configure_workflow( # pylint: disable=R0913 + clt_list: list[StrPath], + config_list: list[StrPath], + name: str, + path: Union[str, Path] = Path.cwd(), + compile_workflow: bool = True, + run: bool = False, + overwrite: bool = False) -> Workflow: + """Configure Workflow from list of CLT and configuration files. + + This function takes a list of CLT (Command Line Tool) files and a list of configuration files + and creates a Workflow object. Each CLT file is paired with a corresponding configuration file + based on their order in the lists. The Workflow object is then returned. + + Args: + clt_list (list[StrPath]): A list of paths to CLT files. + config_list (list[StrPath]): A list of paths to configuration files. + name (str): The name of the Workflow. + path (Optional[StrPath]): The path where the Workflow will be created. Defaults to `pathlib.Path.cwd()`. + compile_workflow (bool): Flag indicating whether to compile the Workflow using WIC. Defaults to True. + run (bool): Flag indicating whether to run the Workflow after configuration using WIC. Defaults to False. + overwrite (bool): Flag indicating whether to overwrite existing CLT files in cwl_adapters. Defaults to False. + + Returns: + Workflow: The configured Workflow object. + + """ + steps = [configure_step(clt, config) for clt, config in zip(clt_list, config_list)] + path_ = Path(path) + # mypy incorrectly marks this init + # of Workflow (pydantic dataclass) as invalid + # having 'too many arguments' + workflow = Workflow(steps, name, path_) # type: ignore + if run: + workflow.run(overwrite=overwrite) + elif compile_workflow: + workflow.compile(overwrite=overwrite) + return workflow