Skip to content

Commit

Permalink
feat: add compile_step/workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
camilovelezr committed Feb 29, 2024
1 parent 94d86c4 commit 0c318f8
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 6 deletions.
4 changes: 2 additions & 2 deletions src/wic/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .pythonapi import Step, Workflow
from .pythonapi import Step, Workflow, configure_step, configure_workflow

__all__ = ["Step", "Workflow"]
__all__ = ["Step", "Workflow", "configure_step", "configure_workflow"]
117 changes: 113 additions & 4 deletions src/wic/api/pythonapi.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
# pylint: disable=W1203
"""CLT utilities."""
import json
import logging
import subprocess
from dataclasses import field
from functools import singledispatch
from pathlib import Path
from typing import Any, ClassVar, Optional, TypeVar, Union
from typing import Any, ClassVar, Generic, Optional, TypeVar, Union

import cwl_utils.parser as cu_parser
import yaml
from cwl_utils.parser import CommandLineTool as CWLCommandLineTool
from cwl_utils.parser import load_document_by_uri
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator
from pydantic import ( # pylint: disable=E0611
BaseModel,
ConfigDict,
Field,
PrivateAttr,
ValidationError,
)
from pydantic.dataclasses import dataclass as pydantic_dataclass

from wic.api._types import CWL_TYPES_DICT
Expand Down Expand Up @@ -100,6 +107,18 @@ def _set_value(
self.linked = True


InputType = TypeVar("InputType")


class _ConfiguredStepInput(BaseModel, Generic[InputType]):
"""Input of Step used for configuration from JSON.
This Generic Pydantic Model is used to configure inputs of
Steps when using a JSON configuration file.
"""
value: InputType


def _default_dict() -> dict:
return {}

Expand Down Expand Up @@ -240,6 +259,8 @@ def __setattr__(self, __name: str, __value: Any) -> Any:
except BaseException as exc:
raise exc

elif isinstance(__value, _ConfiguredStepInput): # when configuring from JSON
self.inputs[index]._set_value(__value.value)
else:
if isinstance(__value, str) and _is_link(__value):
self.inputs[index]._set_value(__value, check=False)
Expand Down Expand Up @@ -296,6 +317,54 @@ def _save_cwl(self, path: Path, overwrite: bool) -> None:
file.write(yaml.dump(self.yaml))


def _load_config(config_path: StrPath) -> Any:
"""Load configuration JSON file.
Returns `dict[str, Any]`.
Type annotation is `Any` because `json.load` type annotation is `Any`.
"""
with open(config_path, "r", encoding="utf-8") as file:
cfg = json.load(file)
return cfg


def configure_step(clt_path: StrPath, config_path: StrPath) -> Step:
"""Configure Step from JSON configuration file.
This function takes a path to a CLT (Command Line Tool) file and a path to a JSON configuration file,
and configures a `Step` object based on the provided configuration.
Args:
clt_path (StrPath): The path to the CLT file.
config_path (StrPath): The path to the JSON configuration file.
Returns:
Step: The configured `Step` object.
"""
clt = Step(clt_path)
input_names = clt._input_names # pylint: disable=W0212
config = config_path if isinstance(config_path, dict) else _load_config(config_path)
# dict with only the inputs that are in the clt
inp_dict = dict(filter(lambda x: x[0] in input_names, config.items()))
for inp_name, inp_value in inp_dict.items():
if isinstance(inp_value, str) and _is_link(inp_value): # linked * or & inp
# do not perform validation for linked inputs
setattr(clt, inp_name, inp_value)
else: # not linked inp
# generic_input performs validation with Pydantic
inp_type = clt.inputs[input_names.index(inp_name)].inp_type
try:
# inp_type is a variable, mypy complains
generic_input = _ConfiguredStepInput[inp_type](value=inp_value) # type: ignore
except ValidationError as exc:
raise InvalidInputValueError(f"invalid value for {inp_name}") from exc
setattr(clt, inp_name, generic_input)

return clt


DATACLASS_CONFIG = ConfigDict(validate_assignment=True)


Expand Down Expand Up @@ -380,8 +449,8 @@ def compile(self, run_local: bool = False, overwrite: bool = False) -> Path:
)
except subprocess.CalledProcessError as exc:
logger.error(exc.stderr)
logger.info(exc.stdout)
raise exc
logger.error(exc.stdout)
return None
logger.info(proc.stdout)
# copy files to wf path
compiled_cwl_path = _WIC_PATH.joinpath("autogenerated", f"{self.name}.cwl")
Expand All @@ -392,3 +461,43 @@ def run(self, overwrite: bool = False) -> None:
"""Run compiled workflow."""
logger.info(f"Running {self.name}")
self.compile(run_local=True, overwrite=overwrite)


def configure_workflow( # pylint: disable=R0913
clt_list: list[StrPath],
config_list: list[StrPath],
name: str,
path: Union[str, Path] = Path.cwd(),
compile_workflow: bool = True,
run: bool = False,
overwrite: bool = False) -> Workflow:
"""Configure Workflow from list of CLT and configuration files.
This function takes a list of CLT (Command Line Tool) files and a list of configuration files
and creates a Workflow object. Each CLT file is paired with a corresponding configuration file
based on their order in the lists. The Workflow object is then returned.
Args:
clt_list (list[StrPath]): A list of paths to CLT files.
config_list (list[StrPath]): A list of paths to configuration files.
name (str): The name of the Workflow.
path (Optional[StrPath]): The path where the Workflow will be created. Defaults to `pathlib.Path.cwd()`.
compile_workflow (bool): Flag indicating whether to compile the Workflow using WIC. Defaults to True.
run (bool): Flag indicating whether to run the Workflow after configuration using WIC. Defaults to False.
overwrite (bool): Flag indicating whether to overwrite existing CLT files in cwl_adapters. Defaults to False.
Returns:
Workflow: The configured Workflow object.
"""
steps = [configure_step(clt, config) for clt, config in zip(clt_list, config_list)]
path_ = Path(path)
# mypy incorrectly marks this init
# of Workflow (pydantic dataclass) as invalid
# having 'too many arguments'
workflow = Workflow(steps, name, path_) # type: ignore
if run:
workflow.run(overwrite=overwrite)
elif compile_workflow:
workflow.compile(overwrite=overwrite)
return workflow

0 comments on commit 0c318f8

Please sign in to comment.