diff --git a/.github/workflows/ci-tests.yaml b/.github/workflows/ci-tests.yaml index e40d65bed..cd615f53f 100644 --- a/.github/workflows/ci-tests.yaml +++ b/.github/workflows/ci-tests.yaml @@ -36,7 +36,7 @@ jobs: - commit: "6397014050177074c9ccd0d771577f7fa9f728a3" exclude: "docker_entrypoint,stdin_shorcut,inplace_update_on_file_content" version: "v1.1" - - commit: "ad6f77c648ae9f0eaa2fd53ba7032b0523e12de8" + - commit: "f79146f6b59884ecbe6445ab3cb2b8a374b0ee64" exclude: "docker_entrypoint,modify_file_content" version: "v1.2" steps: diff --git a/requirements.txt b/requirements.txt index 3833cc38f..c1984e6d1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,7 @@ asyncssh==2.13.0 bcrypt==4.0.1 cachetools==5.3.0 cwltool==3.1.20230127121939 -cwl-utils==0.22 +cwl-utils==0.23 importlib_metadata==6.0.0 Jinja2==3.1.2 jsonref==1.1.0 diff --git a/streamflow/cwl/main.py b/streamflow/cwl/main.py index 0b275a1dd..829271469 100644 --- a/streamflow/cwl/main.py +++ b/streamflow/cwl/main.py @@ -3,6 +3,7 @@ import logging import os +import cwl_utils.parser import cwltool.context import cwltool.load_tool import cwltool.loghandler @@ -12,7 +13,7 @@ from streamflow.config.config import WorkflowConfig from streamflow.core.context import StreamFlowContext from streamflow.cwl.translator import CWLTranslator -from streamflow.cwl.utils import load_cwl_inputs, load_cwl_workflow +from streamflow.cwl.utils import load_cwl_inputs from streamflow.log_handler import logger from streamflow.workflow.executor import StreamFlowExecutor @@ -43,9 +44,12 @@ async def main( # noinspection PyProtectedMember cwltool.loghandler._logger.setLevel(logging.WARN) # Load CWL workflow definition - cwl_definition, loading_context = load_cwl_workflow(cwl_args[0]) + cwl_definition = cwl_utils.parser.load_document_by_uri(cwl_args[0]) if len(cwl_args) == 2: - cwl_inputs = load_cwl_inputs(loading_context, cwl_definition, cwl_args[1]) + loading_context = cwltool.context.LoadingContext() + cwl_inputs = load_cwl_inputs( + loading_context, cwl_definition.loadingOptions, cwl_args[1] + ) else: cwl_inputs = {} # Transpile CWL workflow to the StreamFlow representation @@ -58,7 +62,6 @@ async def main( cwl_definition=cwl_definition, cwl_inputs=cwl_inputs, workflow_config=workflow_config, - loading_context=loading_context, ) if logger.isEnabledFor(logging.INFO): logger.info("Building workflow execution plan") diff --git a/streamflow/cwl/translator.py b/streamflow/cwl/translator.py index 8b58e13a6..f1093b8bc 100644 --- a/streamflow/cwl/translator.py +++ b/streamflow/cwl/translator.py @@ -1,6 +1,7 @@ from __future__ import annotations import copy +import importlib import logging import os import posixpath @@ -14,15 +15,13 @@ MutableMapping, MutableSequence, cast, + get_args, ) -import cwltool.command_line_tool -import cwltool.context -import cwltool.docker_id -import cwltool.process -import cwltool.workflow +import cwl_utils.parser +import cwl_utils.parser.utils from rdflib import Graph -from ruamel.yaml.comments import CommentedSeq +from schema_salad.exceptions import ValidationException from streamflow.config.config import WorkflowConfig from streamflow.core.config import BindingConfig @@ -109,59 +108,60 @@ def _create_command( - cwl_element: cwltool.command_line_tool.CommandLineTool, + cwl_element: cwl_utils.parser.CommandLineTool, cwl_name_prefix: str, schema_def_types: MutableMapping[str, Any], context: MutableMapping[str, Any], step: Step, ) -> CWLCommand: - command = CWLCommand(step) - # Process InitialWorkDirRequirement requirements = {**context["hints"], **context["requirements"]} + # Process ShellCommandRequirement + is_shell_command = "ShellCommandRequirement" in requirements + # Create command + command = CWLCommand( + step=step, + base_command=( + cwl_element.baseCommand + if isinstance(cwl_element.baseCommand, MutableSequence) + else ( + [cwl_element.baseCommand] if cwl_element.baseCommand is not None else [] + ) + ), + command_tokens=[ + _get_command_token(binding=a, is_shell_command=is_shell_command) + for a in (cwl_element.arguments or []) + ], + success_codes=cwl_element.successCodes, + failure_codes=(cwl_element.permanentFailCodes or []).extend( + cwl_element.permanentFailCodes or [] + ) + or None, + is_shell_command=is_shell_command, + step_stdin=cwl_element.stdin, + step_stdout=cwl_element.stdout, + step_stderr=cwl_element.stderr, + ) + # Process InitialWorkDirRequirement if "InitialWorkDirRequirement" in requirements: - command.initial_work_dir = requirements["InitialWorkDirRequirement"]["listing"] + command.initial_work_dir = _inject_value( + requirements["InitialWorkDirRequirement"].listing + ) command.absolute_initial_workdir_allowed = ( "DockerRequirement" in context["requirements"] ) # Process InplaceUpdateRequirement if "InplaceUpdateRequirement" in requirements: - command.inplace_update = requirements["InplaceUpdateRequirement"][ - "inplaceUpdate" - ] + command.inplace_update = requirements["InplaceUpdateRequirement"].inplaceUpdate # Process EnvVarRequirement if "EnvVarRequirement" in requirements: - for env_entry in requirements["EnvVarRequirement"]["envDef"]: - command.environment[env_entry["envName"]] = env_entry["envValue"] - # Process ShellCommandRequirement - if "ShellCommandRequirement" in requirements: - command.is_shell_command = True - # Process success and failure codes - if "successCodes" in cwl_element.tool: - command.success_codes = cwl_element.tool["successCodes"] - if "permanentFailCodes" or "temporaryFailCodes" in cwl_element.tool: - command.failure_codes = cwl_element.tool.get("permanentFailCodes", []) - command.failure_codes.extend(cwl_element.tool.get("temporaryFailCodes", [])) - # Process baseCommand - if "baseCommand" in cwl_element.tool: - if isinstance(cwl_element.tool["baseCommand"], CommentedSeq): - for command_token in cwl_element.tool["baseCommand"]: - command.base_command.append(command_token) - else: - command.base_command.append(cwl_element.tool["baseCommand"]) - # Process arguments - if "arguments" in cwl_element.tool: - for argument in cwl_element.tool["arguments"]: - command.command_tokens.append( - _get_command_token( - binding=argument, is_shell_command=command.is_shell_command - ) - ) + for env_entry in requirements["EnvVarRequirement"].envDef: + command.environment[env_entry.envName] = env_entry.envValue # Process inputs - for input_port in cwl_element.tool["inputs"]: + for input_port in cwl_element.inputs: command_token = _get_command_token_from_input( cwl_element=input_port, - port_type=input_port["type"], - input_name=utils.get_name("", cwl_name_prefix, input_port["id"]), + port_type=input_port.type, + input_name=utils.get_name("", cwl_name_prefix, input_port.id), is_shell_command=command.is_shell_command, schema_def_types=schema_def_types, ) @@ -175,103 +175,93 @@ def _create_command_output_processor( workflow: Workflow, port_target: Target | None, port_type: Any, - cwl_element: MutableMapping[str, Any], + cwl_element: cwl_utils.parser.CommandOutputParameter, cwl_name_prefix: str, schema_def_types: MutableMapping[str, Any], context: MutableMapping[str, Any], optional: bool = False, ) -> CommandOutputProcessor: - if isinstance(port_type, MutableMapping): - if "type" in port_type: - # Array type: -> MapCommandOutputProcessor - if port_type["type"] == "array": - return CWLMapCommandOutputProcessor( - name=port_name, + # Array type: -> MapCommandOutputProcessor + if isinstance(port_type, get_args(cwl_utils.parser.ArraySchema)): + return CWLMapCommandOutputProcessor( + name=port_name, + workflow=workflow, + processor=_create_command_output_processor( + port_name=port_name, + workflow=workflow, + port_target=port_target, + port_type=port_type.items, + cwl_element=cwl_element, + cwl_name_prefix=cwl_name_prefix, + schema_def_types=schema_def_types, + context=context, + optional=optional, + ), + ) + # Enum type: -> create command output processor + elif isinstance(port_type, get_args(cwl_utils.parser.EnumSchema)): + # Process InlineJavascriptRequirement + requirements = {**context["hints"], **context["requirements"]} + expression_lib, full_js = _process_javascript_requirement(requirements) + if type_name := port_type.name: + if type_name.startswith("_:"): + enum_prefix = cwl_name_prefix + else: + enum_prefix = utils.get_name(posixpath.sep, posixpath.sep, type_name) + else: + enum_prefix = cwl_name_prefix + # Return OutputProcessor + return CWLCommandOutputProcessor( + name=port_name, + workflow=workflow, + target=port_target, + token_type=port_type.type, + enum_symbols=[ + posixpath.relpath( + utils.get_name(posixpath.sep, posixpath.sep, s), enum_prefix + ) + for s in port_type.symbols + ], + expression_lib=expression_lib, + full_js=full_js, + optional=optional, + ) + # Record type: -> ObjectCommandOutputProcessor + elif isinstance(port_type, get_args(cwl_utils.parser.RecordSchema)): + # Process InlineJavascriptRequirement + requirements = {**context["hints"], **context["requirements"]} + expression_lib, full_js = _process_javascript_requirement(requirements) + # Create processor + if (type_name := getattr(port_type, "name", port_name)).startswith("_:"): + type_name = port_name + record_name_prefix = utils.get_name(posixpath.sep, posixpath.sep, type_name) + return CWLObjectCommandOutputProcessor( + name=port_name, + workflow=workflow, + processors={ + utils.get_name( + "", record_name_prefix, port_type.name + ): _create_command_output_processor( + port_name=port_name, + port_target=port_target, workflow=workflow, - processor=_create_command_output_processor( - port_name=port_name, - workflow=workflow, - port_target=port_target, - port_type=port_type["items"], - cwl_element=cwl_element, - cwl_name_prefix=cwl_name_prefix, - schema_def_types=schema_def_types, - context=context, - optional=optional, + port_type=port_type.type, + cwl_element=port_type, + cwl_name_prefix=posixpath.join( + record_name_prefix, + utils.get_name("", record_name_prefix, port_type.name), ), + schema_def_types=schema_def_types, + context=context, ) - # Enum type: -> create command output processor - elif port_type["type"] == "enum": - # Process InlineJavascriptRequirement - requirements = {**context["hints"], **context["requirements"]} - expression_lib, full_js = _process_javascript_requirement(requirements) - enum_prefix = ( - utils.get_name(posixpath.sep, posixpath.sep, port_type["name"]) - if "name" in port_type - else cwl_name_prefix - ) - # Return OutputProcessor - return CWLCommandOutputProcessor( - name=port_name, - workflow=workflow, - target=port_target, - token_type=port_type["type"], - enum_symbols=[ - posixpath.relpath( - utils.get_name(posixpath.sep, posixpath.sep, s), enum_prefix - ) - for s in port_type["symbols"] - ], - expression_lib=expression_lib, - full_js=full_js, - optional=optional, - ) - # Record type: -> ObjectCommandOutputProcessor - elif port_type["type"] == "record": - # Process InlineJavascriptRequirement - requirements = {**context["hints"], **context["requirements"]} - expression_lib, full_js = _process_javascript_requirement(requirements) - # Create processor - record_name_prefix = utils.get_name( - posixpath.sep, posixpath.sep, port_type.get("name", port_name) - ) - return CWLObjectCommandOutputProcessor( - name=port_name, - workflow=workflow, - processors={ - utils.get_name( - "", record_name_prefix, port_type["name"] - ): _create_command_output_processor( - port_name=port_name, - port_target=port_target, - workflow=workflow, - port_type=port_type["type"], - cwl_element=port_type, - cwl_name_prefix=posixpath.join( - record_name_prefix, - utils.get_name( - "", record_name_prefix, port_type["name"] - ), - ), - schema_def_types=schema_def_types, - context=context, - ) - for port_type in port_type["fields"] - }, - expression_lib=expression_lib, - full_js=full_js, - output_eval=cwl_element.get("outputBinding", {}).get("outputEval"), - ) - # Unknown type -> raise Exception - else: - raise WorkflowDefinitionException( - f"Unsupported type {port_type['type']} for port {port_name}." - ) - # Untyped object -> not supported - else: - raise WorkflowDefinitionException( - "Unsupported dictionary type without explicit `type` key" - ) + for port_type in port_type.fields + }, + expression_lib=expression_lib, + full_js=full_js, + output_eval=cwl_element.outputBinding.outputEval + if getattr(cwl_element, "outputBinding", None) + else None, + ) elif isinstance(port_type, MutableSequence): optional = "null" in port_type types = [t for t in filter(lambda x: x != "null", port_type)] @@ -334,22 +324,26 @@ def _create_command_output_processor( target=port_target, token_type=port_type, expression_lib=expression_lib, - file_format=cwl_element.get("format"), + file_format=getattr(cwl_element, "format", None), full_js=full_js, - glob=cwl_element.get("outputBinding", {}).get( - "glob", cwl_element.get("path") - ), - load_contents=cwl_element.get( - "loadContents", - cwl_element.get("outputBinding", {}).get("loadContents", False), + glob=( + cwl_element.outputBinding.glob + if getattr(cwl_element, "outputBinding", None) + else getattr(cwl_element, "path", None) ), + load_contents=_get_load_contents(cwl_element), load_listing=_get_load_listing(cwl_element, context), optional=optional, - output_eval=cwl_element.get("outputBinding", {}).get("outputEval"), + output_eval=( + cwl_element.outputBinding.outputEval + if getattr(cwl_element, "outputBinding", None) + else None + ), secondary_files=_get_secondary_files( - cwl_element.get("secondaryFiles"), default_required=False + cwl_element=getattr(cwl_element, "secondaryFiles", None), + default_required=False, ), - streamable=cwl_element.get("streamable", False), + streamable=getattr(cwl_element, "streamable", None), ) else: # Normalize port type (Python does not distinguish among all CWL number types) @@ -367,21 +361,28 @@ def _create_command_output_processor( token_type=port_type, expression_lib=expression_lib, full_js=full_js, - glob=cwl_element.get("outputBinding", {}).get( - "glob", cwl_element.get("path") - ), - load_contents=cwl_element.get( - "loadContents", - cwl_element.get("outputBinding", {}).get("loadContents", False), - ), + glob=cwl_element.outputBinding.glob + if getattr(cwl_element, "outputBinding", None) + else None, + load_contents=_get_load_contents(cwl_element), load_listing=_get_load_listing(cwl_element, context), optional=optional, - output_eval=cwl_element.get("outputBinding", {}).get("outputEval"), + output_eval=( + cwl_element.outputBinding.outputEval + if getattr(cwl_element, "outputBinding", None) + else None + ), ) -def _create_context() -> MutableMapping[str, Any]: - return {"default": {}, "requirements": {}, "hints": {}} +def _create_context(version: str) -> MutableMapping[str, Any]: + return { + "default": {}, + "elements": {}, + "requirements": {}, + "hints": {}, + "version": version, + } def _create_list_merger( @@ -479,7 +480,11 @@ def _create_token_processor( port_name: str, workflow: Workflow, port_type: Any, - cwl_element: MutableMapping[str, Any], + cwl_element: ( + cwl_utils.parser.InputParameter + | cwl_utils.parser.OutputParameter + | cwl_utils.parser.WorkflowStepInput + ), cwl_name_prefix: str, schema_def_types: MutableMapping[str, Any], format_graph: Graph, @@ -490,96 +495,84 @@ def _create_token_processor( force_deep_listing: bool = False, only_propagate_secondary_files: bool = True, ) -> TokenProcessor: - if isinstance(port_type, MutableMapping): - if "type" in port_type: - # Array type: -> MapTokenProcessor - if port_type["type"] == "array": - return CWLMapTokenProcessor( - name=port_name, + # Array type: -> MapTokenProcessor + if isinstance(port_type, get_args(cwl_utils.parser.ArraySchema)): + return CWLMapTokenProcessor( + name=port_name, + workflow=workflow, + processor=_create_token_processor( + port_name=port_name, + workflow=workflow, + port_type=port_type.items, + cwl_element=cwl_element, + cwl_name_prefix=cwl_name_prefix, + schema_def_types=schema_def_types, + format_graph=format_graph, + context=context, + optional=optional, + check_type=check_type, + force_deep_listing=force_deep_listing, + only_propagate_secondary_files=only_propagate_secondary_files, + ), + ) + # Enum type: -> create output processor + elif isinstance(port_type, get_args(cwl_utils.parser.EnumSchema)): + # Process InlineJavascriptRequirement + requirements = {**context["hints"], **context["requirements"]} + expression_lib, full_js = _process_javascript_requirement(requirements) + if type_name := port_type.name: + if type_name.startswith("_:"): + enum_prefix = cwl_name_prefix + else: + enum_prefix = utils.get_name(posixpath.sep, posixpath.sep, type_name) + else: + enum_prefix = cwl_name_prefix + # Return TokenProcessor + return CWLTokenProcessor( + name=port_name, + workflow=workflow, + token_type=port_type.type, + check_type=check_type, + enum_symbols=[ + posixpath.relpath( + utils.get_name(posixpath.sep, posixpath.sep, s), enum_prefix + ) + for s in port_type.symbols + ], + expression_lib=expression_lib, + full_js=full_js, + optional=optional, + ) + # Record type: -> ObjectTokenProcessor + elif isinstance(port_type, get_args(cwl_utils.parser.RecordSchema)): + if (type_name := getattr(port_type, "name", port_name)).startswith("_:"): + type_name = port_name + record_name_prefix = utils.get_name(posixpath.sep, posixpath.sep, type_name) + return CWLObjectTokenProcessor( + name=port_name, + workflow=workflow, + processors={ + utils.get_name( + "", record_name_prefix, port_type.name + ): _create_token_processor( + port_name=port_name, workflow=workflow, - processor=_create_token_processor( - port_name=port_name, - workflow=workflow, - port_type=port_type["items"], - cwl_element=cwl_element, - cwl_name_prefix=cwl_name_prefix, - schema_def_types=schema_def_types, - format_graph=format_graph, - context=context, - optional=optional, - check_type=check_type, - force_deep_listing=force_deep_listing, - only_propagate_secondary_files=only_propagate_secondary_files, + port_type=port_type.type, + cwl_element=port_type, + cwl_name_prefix=posixpath.join( + record_name_prefix, + utils.get_name("", record_name_prefix, port_type.name), ), - ) - # Enum type: -> create output processor - elif port_type["type"] == "enum": - # Process InlineJavascriptRequirement - requirements = {**context["hints"], **context["requirements"]} - expression_lib, full_js = _process_javascript_requirement(requirements) - enum_prefix = ( - utils.get_name(posixpath.sep, posixpath.sep, port_type["name"]) - if "name" in port_type - else cwl_name_prefix - ) - # Return TokenProcessor - return CWLTokenProcessor( - name=port_name, - workflow=workflow, - token_type=port_type["type"], + schema_def_types=schema_def_types, + format_graph=format_graph, + context=context, check_type=check_type, - enum_symbols=[ - posixpath.relpath( - utils.get_name(posixpath.sep, posixpath.sep, s), enum_prefix - ) - for s in port_type["symbols"] - ], - expression_lib=expression_lib, - full_js=full_js, - optional=optional, - ) - # Record type: -> ObjectTokenProcessor - elif port_type["type"] == "record": - record_name_prefix = utils.get_name( - posixpath.sep, posixpath.sep, port_type.get("name", port_name) - ) - return CWLObjectTokenProcessor( - name=port_name, - workflow=workflow, - processors={ - utils.get_name( - "", record_name_prefix, port_type["name"] - ): _create_token_processor( - port_name=port_name, - workflow=workflow, - port_type=port_type["type"], - cwl_element=port_type, - cwl_name_prefix=posixpath.join( - record_name_prefix, - utils.get_name( - "", record_name_prefix, port_type["name"] - ), - ), - schema_def_types=schema_def_types, - format_graph=format_graph, - context=context, - check_type=check_type, - force_deep_listing=force_deep_listing, - only_propagate_secondary_files=only_propagate_secondary_files, - ) - for port_type in port_type["fields"] - }, - ) - # Unknown type -> raise Exception - else: - raise WorkflowDefinitionException( - f"Unsupported type {port_type['type']} for port {port_name}." + force_deep_listing=force_deep_listing, + only_propagate_secondary_files=only_propagate_secondary_files, ) - # Untyped object -> not supported - else: - raise WorkflowDefinitionException( - "Unsupported dictionary type without explicit `type` key" - ) + for port_type in port_type.fields + }, + ) elif isinstance(port_type, MutableSequence): optional = "null" in port_type types = [t for t in filter(lambda x: x != "null", port_type)] @@ -651,13 +644,10 @@ def _create_token_processor( token_type=port_type, check_type=check_type, expression_lib=expression_lib, - file_format=cwl_element.get("format"), + file_format=getattr(cwl_element, "format", None), format_graph=format_graph, full_js=full_js, - load_contents=cwl_element.get( - "loadContents", - cwl_element.get("inputBinding", {}).get("loadContents"), - ), + load_contents=_get_load_contents(cwl_element), load_listing=( LoadListing.deep_listing if force_deep_listing @@ -666,10 +656,10 @@ def _create_token_processor( only_propagate_secondary_files=only_propagate_secondary_files, optional=optional, secondary_files=_get_secondary_files( - cwl_element=cwl_element.get("secondaryFiles"), + cwl_element=getattr(cwl_element, "secondaryFiles", None), default_required=default_required_sf, ), - streamable=cwl_element.get("streamable", False), + streamable=getattr(cwl_element, "streamable", None), ) else: # Normalize port type (Python does not distinguish among all CWL number types) @@ -687,10 +677,7 @@ def _create_token_processor( check_type=check_type, expression_lib=expression_lib, full_js=full_js, - load_contents=cwl_element.get( - "loadContents", - cwl_element.get("inputBinding", {}).get("loadContents"), - ), + load_contents=_get_load_contents(cwl_element), load_listing=( LoadListing.deep_listing if force_deep_listing @@ -704,7 +691,7 @@ def _create_token_transformer( name: str, port_name: str, workflow: Workflow, - cwl_element: MutableMapping[str, Any], + cwl_element: cwl_utils.parser.InputParameter, cwl_name_prefix: str, schema_def_types: MutableMapping[str, Any], format_graph: Graph, @@ -719,7 +706,7 @@ def _create_token_transformer( processor=_create_token_processor( port_name=port_name, workflow=workflow, - port_type=cwl_element["type"], + port_type=cwl_element.type, cwl_element=cwl_element, cwl_name_prefix=cwl_name_prefix, schema_def_types=schema_def_types, @@ -735,6 +722,7 @@ def _create_token_transformer( def _get_command_token( binding: Any, + value_from: Any | None = None, is_shell_command: bool = False, input_name: str | None = None, token_type: str | None = None, @@ -747,13 +735,13 @@ def _get_command_token( if token_type == "float" # nosec else token_type ) - if isinstance(binding, MutableMapping): - item_separator = binding.get("itemSeparator", None) - position = binding.get("position", 0) - prefix = binding.get("prefix", None) - separate = binding.get("separate", True) - shell_quote = binding.get("shellQuote", True) - value = binding["valueFrom"] if "valueFrom" in binding else None + if isinstance(binding, get_args(cwl_utils.parser.CommandLineBinding)): + item_separator = binding.itemSeparator + position = binding.position if binding.position is not None else 0 + prefix = binding.prefix + separate = binding.separate if binding.separate is not None else True + shell_quote = binding.shellQuote if binding.shellQuote is not None else True + value = value_from or binding.valueFrom return CWLCommandToken( name=input_name, value=value, @@ -777,64 +765,53 @@ def _get_command_token_from_input( schema_def_types: MutableMapping[str, Any] | None = None, ): token = None - command_line_binding = cwl_element.get("inputBinding", None) - if isinstance(port_type, MutableMapping): - if "type" in port_type: - # Array type: -> CWLMapCommandToken - if port_type["type"] == "array": - token = _get_command_token_from_input( - cwl_element=port_type, - port_type=port_type["items"], - input_name=input_name, - is_shell_command=is_shell_command, - schema_def_types=schema_def_types, - ) - if token is not None: - token = CWLMapCommandToken( - name=input_name, value=token, is_shell_command=is_shell_command - ) - # Enum type: -> substitute the type with string and reprocess - elif port_type["type"] == "enum": - return _get_command_token_from_input( - cwl_element=cwl_element, - port_type="string", - input_name=input_name, + command_line_binding = cwl_element.inputBinding + # Array type: -> CWLMapCommandToken + if isinstance(port_type, get_args(cwl_utils.parser.ArraySchema)): + token = _get_command_token_from_input( + cwl_element=port_type, + port_type=port_type.items, + input_name=input_name, + is_shell_command=is_shell_command, + schema_def_types=schema_def_types, + ) + if token is not None: + token = CWLMapCommandToken( + name=input_name, value=token, is_shell_command=is_shell_command + ) + # Enum type: -> substitute the type with string and reprocess + if isinstance(port_type, get_args(cwl_utils.parser.EnumSchema)): + return _get_command_token_from_input( + cwl_element=cwl_element, + port_type="string", + input_name=input_name, + is_shell_command=is_shell_command, + schema_def_types=schema_def_types, + ) + # Object type: -> CWLObjectCommandToken + if isinstance(port_type, get_args(cwl_utils.parser.RecordSchema)): + if (type_name := getattr(port_type, "name", input_name)).startswith("_:"): + type_name = input_name + record_name_prefix = utils.get_name(posixpath.sep, posixpath.sep, type_name) + command_tokens = {} + for el in port_type.fields: + key = utils.get_name("", record_name_prefix, el.name) + if ( + el_token := _get_command_token_from_input( + cwl_element=el, + port_type=el.type, + input_name=key, is_shell_command=is_shell_command, schema_def_types=schema_def_types, ) - # Object type: -> CWLObjectCommandToken - elif port_type["type"] == "record": - record_name_prefix = utils.get_name( - posixpath.sep, posixpath.sep, port_type.get("name", input_name) - ) - command_tokens = {} - for el in port_type["fields"]: - key = utils.get_name("", record_name_prefix, el["name"]) - el_token = _get_command_token_from_input( - cwl_element=el, - port_type=el["type"], - input_name=key, - is_shell_command=is_shell_command, - schema_def_types=schema_def_types, - ) - if el_token is not None: - command_tokens[key] = el_token - if command_tokens: - token = CWLObjectCommandToken( - name=input_name, - value=command_tokens, - is_shell_command=True, - shell_quote=False, - ) - # Unknown type -> raise Exception - else: - raise WorkflowDefinitionException( - f"Unsupported type {port_type['type']} for port {input_name}." - ) - # Untyped object -> not supported - else: - raise WorkflowDefinitionException( - "Unsupported dictionary type without explicit `type` key" + ) is not None: + command_tokens[key] = el_token + if command_tokens: + token = CWLObjectCommandToken( + name=input_name, + value=command_tokens, + is_shell_command=True, + shell_quote=False, ) elif isinstance(port_type, MutableSequence): types = [t for t in filter(lambda x: x != "null", port_type)] @@ -881,16 +858,18 @@ def _get_command_token_from_input( if command_line_binding is not None: if token is not None: # By default, do not escape composite command tokens - if "shellQuote" not in command_line_binding: - command_line_binding["shellQuote"] = False + if command_line_binding.shellQuote is None: + command_line_binding.shellQuote = False return _get_command_token( - binding={**command_line_binding, **{"valueFrom": token}}, + binding=command_line_binding, + value_from=token, is_shell_command=True, input_name=input_name, ) else: return _get_command_token( - binding={**command_line_binding, **{"valueFrom": token}}, + binding=command_line_binding, + value_from=token, is_shell_command=is_shell_command, input_name=input_name, ) @@ -917,41 +896,153 @@ def _get_hardware_requirement( ) if "ResourceRequirement" in requirements: resource_requirement = requirements["ResourceRequirement"] - hardware_requirement.cores = resource_requirement.get( - "coresMin", resource_requirement.get("coresMax", hardware_requirement.cores) + hardware_requirement.cores = ( + resource_requirement.coresMin + if resource_requirement.coresMin is not None + else ( + resource_requirement.coresMax + if resource_requirement.coresMax is not None + else hardware_requirement.cores + ) ) - hardware_requirement.memory = resource_requirement.get( - "ramMin", resource_requirement.get("ramMax", hardware_requirement.memory) + hardware_requirement.memory = ( + resource_requirement.ramMin + if resource_requirement.ramMin is not None + else ( + resource_requirement.ramMax + if resource_requirement.ramMax is not None + else hardware_requirement.memory + ) ) - hardware_requirement.tmpdir = resource_requirement.get( - "tmpdirMin", - resource_requirement.get("tmpdirMax", hardware_requirement.tmpdir), + hardware_requirement.tmpdir = ( + resource_requirement.tmpdirMin + if resource_requirement.tmpdirMin is not None + else ( + resource_requirement.tmpdirMax + if resource_requirement.tmpdirMax is not None + else hardware_requirement.tmpdir + ) ) - hardware_requirement.outdir = resource_requirement.get( - "outdirMin", - resource_requirement.get("outdirMax", hardware_requirement.outdir), + hardware_requirement.outdir = ( + resource_requirement.outdirMin + if resource_requirement.outdirMin is not None + else ( + resource_requirement.outdirMax + if resource_requirement.outdirMax is not None + else hardware_requirement.outdir + ) ) return hardware_requirement +def _get_hint_object( + cwl_version: str, + hint_map: MutableMapping[str, Any], + loadingOptions: cwl_utils.parser.LoadingOptions, +): + mod = importlib.import_module( + "cwl_utils.parser.cwl_" + cwl_version.replace(".", "_") + ) + class_ = getattr(mod, hint_map["class"], None) + if class_: + hint = getattr(mod, hint_map["class"])( + loadingOptions=loadingOptions, + **{k: v for k, v in hint_map.items() if k != "class"}, + ) + if isinstance(hint, cwl_utils.parser.cwl_v1_0.EnvVarRequirement): + if isinstance(hint.envDef, MutableSequence): + hint.envDef = [ + cwl_utils.parser.cwl_v1_0.EnvironmentDef( + envName=ed["envName"], envValue=ed["envValue"] + ) + for ed in hint.envDef + ] + elif isinstance(hint, MutableMapping): + hint.envDef = [ + cwl_utils.parser.cwl_v1_0.EnvironmentDef(envName=k, envValue=v) + for k, v in hint.items() + ] + elif isinstance(hint, cwl_utils.parser.cwl_v1_1.EnvVarRequirement): + if isinstance(hint.envDef, MutableSequence): + hint.envDef = [ + cwl_utils.parser.cwl_v1_1.EnvironmentDef( + envName=ed["envName"], envValue=ed["envValue"] + ) + for ed in hint.envDef + ] + elif isinstance(hint, MutableMapping): + hint.envDef = [ + cwl_utils.parser.cwl_v1_1.EnvironmentDef(envName=k, envValue=v) + for k, v in hint.items() + ] + elif isinstance(hint, cwl_utils.parser.cwl_v1_2.EnvVarRequirement): + if isinstance(hint.envDef, MutableSequence): + hint.envDef = [ + cwl_utils.parser.cwl_v1_1.EnvironmentDef( + envName=ed["envName"], envValue=ed["envValue"] + ) + for ed in hint.envDef + ] + elif isinstance(hint, MutableMapping): + hint.envDef = [ + cwl_utils.parser.cwl_v1_2.EnvironmentDef(envName=k, envValue=v) + for k, v in hint.items() + ] + return hint + else: + return None + + +def _get_load_contents( + port_description: ( + cwl_utils.parser.InputParameter + | cwl_utils.parser.OutputParameter + | cwl_utils.parser.WorkflowStepInput + ), +): + if getattr(port_description, "loadContents", None) is not None: + return port_description.loadContents + elif ( + getattr(port_description, "inputBinding", None) + and port_description.inputBinding.loadContents is not None + ): + return port_description.inputBinding.loadContents + elif ( + getattr(port_description, "outputBinding", None) + and port_description.outputBinding.loadContents is not None + ): + return port_description.outputBinding.loadContents + else: + return None + + def _get_load_listing( - port_description: MutableMapping[str, Any], context: MutableMapping[str, Any] + port_description: ( + cwl_utils.parser.InputParameter + | cwl_utils.parser.OutputParameter + | cwl_utils.parser.WorkflowStepInput + ), + context: MutableMapping[str, Any], ) -> LoadListing: requirements = {**context["hints"], **context["requirements"]} - if "loadListing" in port_description: - return LoadListing[port_description["loadListing"]] + if getattr(port_description, "loadListing", None): + return LoadListing[port_description.loadListing] elif ( - "outputBinding" in port_description - and "loadListing" in port_description["outputBinding"] + getattr(port_description, "outputBinding", None) + and getattr(port_description.outputBinding, "loadListing", None) is not None ): - return LoadListing[port_description["outputBinding"]["loadListing"]] + return LoadListing[port_description.outputBinding.loadListing] elif ( "LoadListingRequirement" in requirements - and "loadListing" in requirements["LoadListingRequirement"] + and requirements["LoadListingRequirement"].loadListing ): - return LoadListing[requirements["LoadListingRequirement"]["loadListing"]] + return LoadListing[requirements["LoadListingRequirement"].loadListing] else: - return LoadListing.no_listing + return ( + LoadListing.deep_listing + if context["version"] == "v1.0" + else LoadListing.no_listing + ) def _get_path(element_id: str) -> str: @@ -967,7 +1058,7 @@ def _get_schema_def_types( requirements: MutableMapping[str, Any] ) -> MutableMapping[str, Any]: return ( - {sd["name"]: sd for sd in requirements["SchemaDefRequirement"]["types"]} + {sd.name: sd for sd in requirements["SchemaDefRequirement"].types} if "SchemaDefRequirement" in requirements else {} ) @@ -978,27 +1069,92 @@ def _get_secondary_files( ) -> MutableSequence[SecondaryFile]: if not cwl_element: return [] - elif isinstance(cwl_element, MutableSequence): - return [ - SecondaryFile( - pattern=sf["pattern"], - required=sf.get("required") - if sf.get("required") is not None - else default_required, - ) - for sf in cwl_element - ] - elif isinstance(cwl_element, MutableMapping): - return [ - SecondaryFile( - pattern=cwl_element["pattern"], - required=cwl_element.get("required") - if cwl_element.get("required") is not None - else default_required, + secondary_files = [] + for sf in cwl_element: + if isinstance(sf, str): + secondary_files.append(SecondaryFile(pattern=sf, required=default_required)) + elif isinstance(sf, get_args(cwl_utils.parser.SecondaryFileSchema)): + secondary_files.append( + SecondaryFile( + pattern=sf.pattern, + required=sf.required + if sf.required is not None + else default_required, + ) ) - ] + return secondary_files + + +def _get_workflow_step_input_type( + element_input: cwl_utils.parser.WorkflowStepInput, + workflow: cwl_utils.parser.Workflow, +): + if not element_input.source: + return "Any" + return cwl_utils.parser.utils.type_for_source(workflow, element_input.source) + + +def _inject_value(value: Any): + if isinstance(value, MutableSequence): + return [_inject_value(v) for v in value] + elif isinstance(value, MutableMapping): + if utils.get_token_class(value) is not None: + return value + else: + return {k: _inject_value(v) for k, v in value.items()} + elif isinstance(value, get_args(cwl_utils.parser.File)): + dict_value = {"class": value.class_} + if value.basename is not None: + dict_value["basename"] = value.basename + if value.checksum is not None: + dict_value["checksum"] = value.checksum + if value.contents is not None: + dict_value["contents"] = value.contents + if value.dirname is not None: + dict_value["dirname"] = value.dirname + if value.format is not None: + dict_value["format"] = value.format + if value.location is not None: + dict_value["location"] = value.location + if value.nameext is not None: + dict_value["nameext"] = value.nameext + if value.nameroot is not None: + dict_value["nameroot"] = value.nameroot + if value.path is not None: + dict_value["path"] = value.path + if value.secondaryFiles is not None: + dict_value["secondaryFiles"] = [ + _inject_value(sf) for sf in value.secondaryFiles + ] + if value.size is not None: + dict_value["size"] = value.size + return dict_value + elif isinstance(value, get_args(cwl_utils.parser.Directory)): + dict_value = {"class": value.class_} + if value.basename is not None: + dict_value["basename"] = value.basename + if value.listing is not None: + dict_value["listing"] = [_inject_value(sf) for sf in value.listing] + if value.location is not None: + dict_value["location"] = value.location + if value.path is not None: + dict_value["path"] = value.path + return dict_value + elif isinstance(value, get_args(cwl_utils.parser.Dirent)): + dict_value = {"entry": value.entry} + if value.entryname is not None: + dict_value["entryname"] = value.entryname + if value.writable is not None: + dict_value["writable"] = value.writable + return dict_value else: - return [] + return value + + +def _is_instance(v: Any, v1_0_type: type, v1_1_type: type, v1_2_type: type): + return ( + isinstance(v, v1_0_type) or isinstance(v, v1_1_type) or isinstance(v, v1_2_type) + ) def _percolate_port(port_name: str, *args) -> Port | None: @@ -1012,24 +1168,25 @@ def _percolate_port(port_name: str, *args) -> Port | None: return None -def _process_docker_image(docker_requirement: MutableMapping[str, Any]) -> str: +def _process_docker_image( + docker_requirement: cwl_utils.parser.DockerRequirement, +) -> str: # Retrieve image - if "dockerPull" in docker_requirement: - image_name = docker_requirement["dockerPull"] - elif "dockerImageId" in docker_requirement: - image_name = docker_requirement["dockerImageId"] + if docker_requirement.dockerPull is not None: + return docker_requirement.dockerPull + elif docker_requirement.dockerImageId is not None: + return docker_requirement.dockerImageId else: raise WorkflowDefinitionException( "DockerRequirements without `dockerPull` or `dockerImageId` are not supported yet" ) - return image_name def _process_docker_requirement( name: str, target: Target, context: MutableMapping[str, Any], - docker_requirement: MutableMapping[str, Any], + docker_requirement: cwl_utils.parser.DockerRequirement, network_access: bool, ) -> Target: image_name = _process_docker_image(docker_requirement=docker_requirement) @@ -1041,8 +1198,8 @@ def _process_docker_requirement( "volume": [f"{target.workdir}:{target.workdir}"], } # Manage dockerOutputDirectory directive - if "dockerOutputDirectory" in docker_requirement: - docker_config["workdir"] = docker_requirement["dockerOutputDirectory"] + if docker_requirement.dockerOutputDirectory is not None: + docker_config["workdir"] = docker_requirement.dockerOutputDirectory context["output_directory"] = docker_config["workdir"] local_dir = os.path.join(tempfile.gettempdir(), "streamflow", random_name()) os.makedirs(local_dir, exist_ok=True) @@ -1106,9 +1263,9 @@ def _process_javascript_requirement( full_js = False if "InlineJavascriptRequirement" in requirements: full_js = True - if "expressionLib" in requirements["InlineJavascriptRequirement"]: + if requirements["InlineJavascriptRequirement"].expressionLib is not None: expression_lib = [] - for lib in requirements["InlineJavascriptRequirement"]["expressionLib"]: + for lib in requirements["InlineJavascriptRequirement"].expressionLib: expression_lib.append(lib) return expression_lib, full_js @@ -1187,21 +1344,27 @@ def __init__( context: StreamFlowContext, name: str, output_directory: str, - cwl_definition: cwltool.process.Process, + cwl_definition: ( + cwl_utils.parser.CommandLineTool + | cwl_utils.parser.ExpressionTool + | cwl_utils.parser.Workflow + ), cwl_inputs: MutableMapping[str, Any], workflow_config: WorkflowConfig, - loading_context: cwltool.context.LoadingContext, ): self.context: StreamFlowContext = context self.name: str = name self.output_directory: str = output_directory - self.cwl_definition: cwltool.process.Process = cwl_definition + self.cwl_definition: ( + cwl_utils.parser.CommandLineTool + | cwl_utils.parser.ExpressionTool + | cwl_utils.parser.Workflow + ) = cwl_definition self.cwl_inputs: MutableMapping[str, Any] = cwl_inputs self.default_map: MutableMapping[str, Any] = {} self.deployment_map: MutableMapping[str, DeployStep] = {} self.gather_map: MutableMapping[str, str] = {} self.input_ports: MutableMapping[str, Port] = {} - self.loading_context: cwltool.context.LoadingContext = loading_context self.output_ports: MutableMapping[str, str | Port] = {} self.scatter: MutableMapping[str, Any] = {} self.workflow_config: WorkflowConfig = workflow_config @@ -1220,8 +1383,12 @@ def _get_deploy_step( def _get_input_port( self, workflow: Workflow, - cwl_element: cwltool.workflow.Process, - element_input: MutableMapping[str, Any], + cwl_element: ( + cwl_utils.parser.cwl_v1_0.Process + | cwl_utils.parser.cwl_v1_1.Process + | cwl_utils.parser.cwl_v1_2.Process + ), + element_input: cwl_utils.parser.InputParameter, global_name: str, port_name: str, ) -> Port: @@ -1230,11 +1397,11 @@ def _get_input_port( self.input_ports[global_name] = workflow.create_port() input_port = self.input_ports[global_name] # If there is a default value, construct a default port block - if "default" in element_input: + if element_input.default is not None: # Insert default port transformer_suffix = ( "-wf-default-transformer" - if isinstance(cwl_element, cwltool.workflow.Workflow) + if isinstance(cwl_element, get_args(cwl_utils.parser.Workflow)) else "-cmd-default-transformer" ) input_port = self._handle_default_port( @@ -1243,7 +1410,7 @@ def _get_input_port( transformer_suffix=transformer_suffix, port=input_port, workflow=workflow, - value=element_input["default"], + value=element_input.default, ) # Return port return input_port @@ -1256,7 +1423,9 @@ def _get_source_port(self, workflow: Workflow, source_name: str) -> Port: else: return self.input_ports[source_name] - def _get_binding_config(self, name: str, target_type: str) -> BindingConfig: + def _get_binding_config( + self, name: str, target_type: str, default_workdir: str | None = None + ) -> BindingConfig: path = PurePosixPath(name) config = self.workflow_config.propagate(path, target_type) if config is not None: @@ -1305,7 +1474,7 @@ def _get_binding_config(self, name: str, target_type: str) -> BindingConfig: ) return BindingConfig(targets=targets, filters=config.get("filters")) else: - return BindingConfig(targets=[LocalTarget()]) + return BindingConfig(targets=[LocalTarget(workdir=default_workdir)]) def _handle_default_port( self, @@ -1317,7 +1486,7 @@ def _handle_default_port( value: Any, ) -> Port: # Check output directory - path = _get_path(self.cwl_definition.tool["id"]) + path = _get_path(self.cwl_definition.id) # Build default port default_port = workflow.create_port() self._inject_input( @@ -1351,7 +1520,7 @@ def _inject_input( value: Any, ) -> None: # Retrieve a local DeployStep - binding_config = self._get_binding_config(global_name, "port") + binding_config = self._get_binding_config(global_name, "port", output_directory) target = binding_config.targets[0] deploy_step = self._get_deploy_step(target.deployment, workflow) # Remap path if target's workdir is defined @@ -1369,9 +1538,9 @@ def _inject_input( name=posixpath.join(f"{global_name}-injector", "__schedule__"), job_prefix=f"{global_name}-injector", connector_ports={target.deployment.name: deploy_step.get_output_port()}, - input_directory=target.workdir or output_directory, - output_directory=target.workdir or output_directory, - tmp_directory=target.workdir or output_directory, + input_directory=target.workdir, + output_directory=target.workdir, + tmp_directory=target.workdir, binding_config=binding_config, ) # Create a CWLInputInjector step to process the input @@ -1382,7 +1551,7 @@ def _inject_input( ) # Create an input port and inject values input_port = workflow.create_port() - input_port.put(Token(value=value)) + input_port.put(Token(value=_inject_value(value))) input_port.put(TerminationToken()) # Connect input and output ports to the injector step injector_step.add_input_port(port_name, input_port) @@ -1397,7 +1566,7 @@ def _inject_inputs(self, workflow: Workflow): # Compute suffix default_suffix = ( "-wf-default-transformer" - if isinstance(self.cwl_definition, cwltool.workflow.Workflow) + if isinstance(self.cwl_definition, get_args(cwl_utils.parser.Workflow)) else "-cmd-default-transformer" ) # Process externally provided inputs @@ -1428,26 +1597,41 @@ def _inject_inputs(self, workflow: Workflow): def _recursive_translate( self, workflow: Workflow, - cwl_element: cwltool.process.Process, + cwl_element: ( + cwl_utils.parser.CommandLineTool + | cwl_utils.parser.ExpressionTool + | cwl_utils.parser.Workflow + | cwl_utils.parser.WorkflowStep + ), context: MutableMapping[str, Any], name_prefix: str, cwl_name_prefix: str, ): # Update context current_context = copy.deepcopy(context) - for hint in cwl_element.hints: - current_context["hints"][hint["class"]] = hint - for requirement in cwl_element.requirements: - current_context["requirements"][requirement["class"]] = requirement + for hint in cwl_element.hints or []: + if not isinstance(hint, MutableMapping): + current_context["hints"][hint.class_] = hint + for requirement in cwl_element.requirements or []: + current_context["requirements"][requirement.class_] = requirement # In the root process, override requirements when provided in the input file if name_prefix == posixpath.sep: req_string = "https://w3id.org/cwl/cwl#requirements" if req_string in self.cwl_inputs: - current_context["requirements"] = { - req["class"]: req for req in self.cwl_inputs[req_string] - } + if context["version"] == "v1.0": + raise WorkflowDefinitionException( + "`cwl:requirements` in the input object is not part of CWL v1.0." + ) + else: + for requirement in self.cwl_inputs[req_string]: + if requirement := _get_hint_object( + context["version"], requirement, cwl_element.loadingOptions + ): + current_context["requirements"][ + requirement.class_ + ] = requirement # Dispatch element - if isinstance(cwl_element, cwltool.workflow.Workflow): + if isinstance(cwl_element, get_args(cwl_utils.parser.Workflow)): self._translate_workflow( workflow=workflow, cwl_element=cwl_element, @@ -1455,7 +1639,7 @@ def _recursive_translate( name_prefix=name_prefix, cwl_name_prefix=cwl_name_prefix, ) - elif isinstance(cwl_element, cwltool.workflow.WorkflowStep): + elif isinstance(cwl_element, get_args(cwl_utils.parser.WorkflowStep)): self._translate_workflow_step( workflow=workflow, cwl_element=cwl_element, @@ -1463,7 +1647,7 @@ def _recursive_translate( name_prefix=name_prefix, cwl_name_prefix=cwl_name_prefix, ) - elif isinstance(cwl_element, cwltool.command_line_tool.CommandLineTool): + elif isinstance(cwl_element, get_args(cwl_utils.parser.CommandLineTool)): self._translate_command_line_tool( workflow=workflow, cwl_element=cwl_element, @@ -1471,7 +1655,7 @@ def _recursive_translate( name_prefix=name_prefix, cwl_name_prefix=cwl_name_prefix, ) - elif isinstance(cwl_element, cwltool.command_line_tool.ExpressionTool): + elif isinstance(cwl_element, get_args(cwl_utils.parser.ExpressionTool)): self._translate_command_line_tool( workflow=workflow, cwl_element=cwl_element, @@ -1490,13 +1674,13 @@ def _translate_command_line_tool( self, workflow: Workflow, cwl_element: ( - cwltool.command_line_tool.CommandLineTool - | cwltool.command_line_tool.ExpressionTool + cwl_utils.parser.CommandLineTool | cwl_utils.parser.ExpressionTool ), context: MutableMapping[str, Any], name_prefix: str, cwl_name_prefix: str, ): + context["elements"][cwl_element.id] = cwl_element if logger.isEnabledFor(logging.DEBUG): logger.debug(f"Translating {cwl_element.__class__.__name__} {name_prefix}") # Extract custom types if present @@ -1509,7 +1693,7 @@ def _translate_command_line_tool( # Process DockerRequirement if "DockerRequirement" in requirements: network_access = ( - requirements["NetworkAccess"]["networkAccess"] + requirements["NetworkAccess"].networkAccess if "NetworkAccess" in requirements else False ) @@ -1552,7 +1736,7 @@ def _translate_command_line_tool( }, binding_config=binding_config, hardware_requirement=_get_hardware_requirement( - cwl_version=self.loading_context.metadata["cwlVersion"], + cwl_version=context["version"], requirements=requirements, expression_lib=expression_lib, full_js=full_js, @@ -1566,10 +1750,8 @@ def _translate_command_line_tool( # Process inputs input_ports = {} token_transformers = [] - for element_input in cwl_element.tool["inputs"]: - global_name = utils.get_name( - name_prefix, cwl_name_prefix, element_input["id"] - ) + for element_input in cwl_element.inputs: + global_name = utils.get_name(name_prefix, cwl_name_prefix, element_input.id) port_name = posixpath.relpath(global_name, name_prefix) # Retrieve or create input port input_port = self._get_input_port( @@ -1587,7 +1769,7 @@ def _translate_command_line_tool( cwl_element=element_input, cwl_name_prefix=posixpath.join(cwl_name_prefix, port_name), schema_def_types=schema_def_types, - format_graph=self.loading_context.loader.graph, + format_graph=cwl_element.loadingOptions.graph, context=context, only_propagate_secondary_files=(name_prefix != "/"), ) @@ -1611,9 +1793,9 @@ def _translate_command_line_tool( for token_transformer in token_transformers: token_transformer.add_input_port(port_name, input_port) # Process outputs - for element_output in cwl_element.tool["outputs"]: + for element_output in cwl_element.outputs: global_name = utils.get_name( - name_prefix, cwl_name_prefix, element_output["id"] + name_prefix, cwl_name_prefix, element_output.id ) port_name = posixpath.relpath(global_name, name_prefix) # Retrieve or create output port @@ -1641,14 +1823,14 @@ def _translate_command_line_tool( port_name=port_name, workflow=workflow, port_target=port_target, - port_type=element_output["type"], + port_type=element_output.type, cwl_element=element_output, cwl_name_prefix=posixpath.join(cwl_name_prefix, port_name), schema_def_types=schema_def_types, context=context, ), ) - if isinstance(cwl_element, cwltool.command_line_tool.CommandLineTool): + if isinstance(cwl_element, get_args(cwl_utils.parser.CommandLineTool)): # Process command step.command = _create_command( cwl_element=cwl_element, @@ -1659,31 +1841,26 @@ def _translate_command_line_tool( ) # Process ToolTimeLimit if "ToolTimeLimit" in requirements: - step.command.time_limit = requirements["ToolTimeLimit"]["timelimit"] - elif isinstance(cwl_element, cwltool.command_line_tool.ExpressionTool): - if "expression" in cwl_element.tool: - step.command = CWLExpressionCommand( - step, cwl_element.tool["expression"] - ) + step.command.time_limit = requirements["ToolTimeLimit"].timelimit + elif isinstance(cwl_element, get_args(cwl_utils.parser.ExpressionTool)): + step.command = CWLExpressionCommand(step, cwl_element.expression) # Add JS requirements step.command.expression_lib = expression_lib step.command.full_js = full_js - # Process streams - if "stdin" in cwl_element.tool: - step.command.stdin = cwl_element.tool["stdin"] - if "stdout" in cwl_element.tool: - step.command.stdout = cwl_element.tool["stdout"] - if "stderr" in cwl_element.tool: - step.command.stderr = cwl_element.tool["stderr"] def _translate_workflow( self, workflow: Workflow, - cwl_element: cwltool.workflow.Workflow, + cwl_element: cwl_utils.parser.Workflow, context: MutableMapping[str, Any], name_prefix: str, cwl_name_prefix: str, ): + try: + cwl_utils.parser.utils.static_checker(cwl_element) + except ValidationException as ve: + raise WorkflowDefinitionException from ve + context["elements"][cwl_element.id] = cwl_element step_name = name_prefix if logger.isEnabledFor(logging.DEBUG): logger.debug(f"Translating Workflow {step_name}") @@ -1696,10 +1873,8 @@ def _translate_workflow( input_ports = {} token_transformers = {} input_dependencies = {} - for element_input in cwl_element.tool["inputs"]: - global_name = utils.get_name( - step_name, cwl_name_prefix, element_input["id"] - ) + for element_input in cwl_element.inputs: + global_name = utils.get_name(step_name, cwl_name_prefix, element_input.id) port_name = posixpath.relpath(global_name, step_name) # Retrieve or create input port input_ports[global_name] = self._get_input_port( @@ -1717,30 +1892,31 @@ def _translate_workflow( cwl_element=element_input, cwl_name_prefix=posixpath.join(cwl_name_prefix, port_name), schema_def_types=schema_def_types, - format_graph=self.loading_context.loader.graph, + format_graph=cwl_element.loadingOptions.graph, context=context, only_propagate_secondary_files=(name_prefix != "/"), ) # Process dependencies local_deps = resolve_dependencies( - expression=element_input.get("format"), + expression=element_input.format, full_js=full_js, expression_lib=expression_lib, ) - if "secondaryFiles" in element_input: - for secondary_file in element_input["secondaryFiles"]: - local_deps.update( - resolve_dependencies( - expression=secondary_file.get("pattern"), - full_js=full_js, - expression_lib=expression_lib, - ), - resolve_dependencies( - expression=secondary_file.get("required"), - full_js=full_js, - expression_lib=expression_lib, - ), - ) + for secondary_file in _get_secondary_files( + element_input.secondaryFiles, True + ): + local_deps.update( + resolve_dependencies( + expression=secondary_file.pattern, + full_js=full_js, + expression_lib=expression_lib, + ), + resolve_dependencies( + expression=secondary_file.required, + full_js=full_js, + expression_lib=expression_lib, + ), + ) input_dependencies[global_name] = set.union( {global_name}, {posixpath.join(step_name, d) for d in local_deps} ) @@ -1755,21 +1931,21 @@ def _translate_workflow( for input_name in token_transformers: self.input_ports[input_name] = input_ports[input_name] # Process outputs - for element_output in cwl_element.tool["outputs"]: + for element_output in cwl_element.outputs: global_name = utils.get_name( - name_prefix, cwl_name_prefix, element_output["id"] + name_prefix, cwl_name_prefix, element_output.id ) # If outputSource element is a list, the output element can depend on multiple ports - if isinstance(element_output["outputSource"], MutableSequence): + if isinstance(element_output.outputSource, MutableSequence): # If the list contains only one element and no `linkMerge` or `pickValue` are specified if ( - len(element_output["outputSource"]) == 1 - and "linkMerge" not in element_output - and "pickValue" not in element_output + len(element_output.outputSource) == 1 + and not getattr(element_output, "linkMerge", None) + and not getattr(element_output, "pickValue", None) ): # Treat it as a singleton source_name = utils.get_name( - name_prefix, cwl_name_prefix, element_output["outputSource"][0] + name_prefix, cwl_name_prefix, element_output.outputSource[0] ) # If the output source is an input port, link the output to the input if source_name in self.input_ports: @@ -1782,7 +1958,7 @@ def _translate_workflow( # Otherwise, create a ListMergeCombinator else: if ( - len(element_output["outputSource"]) > 1 + len(element_output.outputSource) > 1 and "MultipleInputFeatureRequirement" not in requirements ): raise WorkflowDefinitionException( @@ -1791,7 +1967,7 @@ def _translate_workflow( ) source_names = [ utils.get_name(name_prefix, cwl_name_prefix, src) - for src in element_output["outputSource"] + for src in element_output.outputSource ] ports = { n: self._get_source_port(workflow, n) for n in source_names @@ -1801,24 +1977,24 @@ def _translate_workflow( workflow=workflow, ports=ports, output_port=self._get_source_port(workflow, global_name), - link_merge=element_output.get("linkMerge"), - pick_value=element_output.get("pickValue"), + link_merge=getattr(element_output, "linkMerge", None), + pick_value=getattr(element_output, "pickValue", None), ) # Otherwise, the output element depends on a single output port else: source_name = utils.get_name( - name_prefix, cwl_name_prefix, element_output["outputSource"] + name_prefix, cwl_name_prefix, element_output.outputSource ) # If `pickValue` is specified, create a ListMergeCombinator - if "pickValue" in element_output: + if getattr(element_output, "pickValue", None): source_port = self._get_source_port(workflow, source_name) _create_list_merger( name=global_name, workflow=workflow, ports={source_name: source_port}, output_port=self._get_source_port(workflow, global_name), - link_merge=element_output.get("linkMerge"), - pick_value=element_output.get("pickValue"), + link_merge=getattr(element_output, "linkMerge", None), + pick_value=getattr(element_output, "pickValue", None), ) else: # If the output source is an input port, link the output to the input @@ -1842,12 +2018,13 @@ def _translate_workflow( def _translate_workflow_step( self, workflow: Workflow, - cwl_element: cwltool.workflow.WorkflowStep, + cwl_element: cwl_utils.parser.WorkflowStep, context: MutableMapping[str, Any], name_prefix: str, cwl_name_prefix: str, ): # Process content + context["elements"][cwl_element.id] = cwl_element step_name = utils.get_name(name_prefix, cwl_name_prefix, cwl_element.id) if logger.isEnabledFor(logging.DEBUG): logger.debug(f"Translating WorkflowStep {step_name}") @@ -1855,31 +2032,30 @@ def _translate_workflow_step( name_prefix, cwl_name_prefix, cwl_element.id, preserve_cwl_prefix=True ) # Extract requirements - for hint in cwl_element.embedded_tool.hints: - context["hints"][hint["class"]] = hint - for requirement in cwl_element.embedded_tool.requirements: - context["requirements"][requirement["class"]] = requirement + for hint in cwl_element.hints or []: + if not isinstance(hint, MutableMapping): + context["hints"][hint.class_] = hint + for requirement in cwl_element.requirements or []: + context["requirements"][requirement.class_] = requirement requirements = {**context["hints"], **context["requirements"]} # Extract JavaScript requirements expression_lib, full_js = _process_javascript_requirement(requirements) # Find scatter elements - if isinstance(cwl_element.tool.get("scatter"), str): + if isinstance(cwl_element.scatter, str): scatter_inputs = [ - utils.get_name( - step_name, cwl_step_name, cwl_element.tool.get("scatter") - ) + utils.get_name(step_name, cwl_step_name, cwl_element.scatter) ] else: scatter_inputs = [ utils.get_name(step_name, cwl_step_name, n) - for n in cwl_element.tool.get("scatter", []) + for n in cwl_element.scatter or [] ] # Process inputs input_ports = {} default_ports = {} value_from_transformers = {} input_dependencies = {} - for element_input in cwl_element.tool["inputs"]: + for element_input in cwl_element.in_: self._translate_workflow_step_input( workflow=workflow, context=context, @@ -1914,8 +2090,8 @@ def _translate_workflow_step( input_ports = {**input_ports, **default_ports} # Process loop inputs element_requirements = { - **{h["class"]: h for h in cwl_element.embedded_tool.hints}, - **{r["class"]: r for r in cwl_element.embedded_tool.requirements}, + **{h["class"]: h for h in (cwl_element.hints or [])}, + **{r.class_: r for r in (cwl_element.requirements or [])}, } if "http://commonwl.org/cwltool#Loop" in element_requirements: loop_requirement = element_requirements["http://commonwl.org/cwltool#Loop"] @@ -1964,7 +2140,7 @@ def _translate_workflow_step( port_name, input_ports[global_name] ) # Retrieve scatter method (default to dotproduct) - scatter_method = cwl_element.tool.get("scatterMethod", "dotproduct") + scatter_method = cwl_element.scatterMethod or "dotproduct" # If there are scatter inputs if scatter_inputs: # If any scatter input is null, propagate an empty array on the output ports @@ -2043,12 +2219,12 @@ def _translate_workflow_step( self.input_ports = {**self.input_ports, **input_ports} # Process condition conditional_step = None - if "when" in cwl_element.tool: + if getattr(cwl_element, "when", None): # Create conditional step conditional_step = workflow.create_step( cls=CWLConditionalStep, name=step_name + "-when", - expression=cwl_element.tool["when"], + expression=cwl_element.when, expression_lib=expression_lib, full_js=full_js, ) @@ -2065,8 +2241,8 @@ def _translate_workflow_step( # Process outputs external_output_ports = {} internal_output_ports = {} - for element_output in cwl_element.tool["outputs"]: - global_name = utils.get_name(step_name, cwl_step_name, element_output["id"]) + for element_output in cwl_element.out: + global_name = utils.get_name(step_name, cwl_step_name, element_output) port_name = posixpath.relpath(global_name, step_name) # Retrieve or create output port if global_name not in self.output_ports: @@ -2122,7 +2298,7 @@ def _translate_workflow_step( port_name, external_output_ports[global_name] ) # Add skip ports if there is a condition - if "when" in cwl_element.tool: + if getattr(cwl_element, "when", None): cast(CWLConditionalStep, conditional_step).add_skip_port( port_name, internal_output_ports[global_name] ) @@ -2252,11 +2428,9 @@ def _translate_workflow_step( port_name, combinator_step.get_input_port(port_name) ) # Add skip ports if there is a condition - if "when" in cwl_element.tool: - for element_output in cwl_element.tool["outputs"]: - global_name = utils.get_name( - step_name, cwl_step_name, element_output["id"] - ) + if getattr(cwl_element, "when", None): + for element_output in cwl_element.out: + global_name = utils.get_name(step_name, cwl_step_name, element_output) port_name = posixpath.relpath(global_name, step_name) skip_port = ( external_output_ports[global_name] @@ -2269,12 +2443,43 @@ def _translate_workflow_step( # Update output ports with the internal ones self.output_ports = {**self.output_ports, **internal_output_ports} # Process inner element - inner_cwl_name_prefix = utils.get_inner_cwl_prefix( - cwl_name_prefix, name_prefix, cwl_element + cwl_step_name = utils.get_name( + name_prefix, cwl_name_prefix, cwl_element.id, preserve_cwl_prefix=True ) + run_command = cwl_element.run + if cwl_utils.parser.is_process(run_command): + run_command.cwlVersion = context["version"] + cwl_utils.parser.utils.convert_stdstreams_to_files(run_command) + if ":" in run_command.id.split("#")[-1]: + inner_cwl_name_prefix = ( + step_name + if context["version"] == "v1.0" + else posixpath.join(cwl_step_name, "run") + ) + else: + inner_cwl_name_prefix = utils.get_name( + name_prefix, + cwl_name_prefix, + run_command.id, + preserve_cwl_prefix=True, + ) + else: + run_command = cwl_element.loadingOptions.fetcher.urljoin( + cwl_element.loadingOptions.fileuri, run_command + ) + run_command = cwl_utils.parser.load_document_by_uri( + run_command, loadingOptions=cwl_element.loadingOptions + ) + cwl_utils.parser.utils.convert_stdstreams_to_files(run_command) + inner_cwl_name_prefix = ( + utils.get_name(posixpath.sep, posixpath.sep, run_command.id) + if "#" in run_command.id + else posixpath.sep + ) + context = {**context, **{"version": run_command.cwlVersion}} self._recursive_translate( workflow=workflow, - cwl_element=cwl_element.embedded_tool, + cwl_element=run_command, context=context, name_prefix=step_name, cwl_name_prefix=inner_cwl_name_prefix, @@ -2287,7 +2492,7 @@ def _translate_workflow_step_input( workflow: Workflow, context: MutableMapping[str, Any], element_id: str, - element_input: MutableMapping[str, Any], + element_input: cwl_utils.parser.WorkflowStepInput, name_prefix: str, cwl_name_prefix: str, scatter_inputs: MutableSequence[str], @@ -2308,72 +2513,71 @@ def _translate_workflow_step_input( cwl_step_name = utils.get_name( name_prefix, cwl_name_prefix, element_id, preserve_cwl_prefix=True ) - global_name = utils.get_name(step_name, cwl_step_name, element_input["id"]) + global_name = utils.get_name(step_name, cwl_step_name, element_input.id) port_name = posixpath.relpath(global_name, step_name) - # Adjust type to handle scatter - if global_name in scatter_inputs: - element_input = { - **element_input, - **{"type": element_input["type"]["items"]}, - } # If element contains `valueFrom` directive - if "valueFrom" in element_input: - # Check if StepInputExpressionRequirement is specified - if "StepInputExpressionRequirement" not in requirements: - raise WorkflowDefinitionException( - "Workflow step contains valueFrom but StepInputExpressionRequirement not in requirements" - ) + if element_input.valueFrom: + prefix, suffix = element_id.split("#") + if "/" in suffix: + workflow_id = "#".join([prefix, "/".join(suffix.split("/")[:-1])]) + else: + workflow_id = prefix # Create a ValueFromTransformer + port_type = _get_workflow_step_input_type( + element_input, context["elements"][workflow_id] + ) + if global_name in scatter_inputs: + port_type = port_type.items value_from_transformers[global_name] = workflow.create_step( cls=value_from_transformer_cls, name=global_name + inner_steps_prefix + "-value-from-transformer", processor=_create_token_processor( port_name=port_name, workflow=workflow, - port_type=element_input.get("type", "Any"), + port_type=port_type, cwl_element=element_input, cwl_name_prefix=posixpath.join(cwl_step_name, port_name), schema_def_types=schema_def_types, - format_graph=self.loading_context.loader.graph, + format_graph=element_input.loadingOptions.graph, context=context, check_type=False, ), port_name=port_name, expression_lib=expression_lib, full_js=full_js, - value_from=element_input["valueFrom"], + value_from=element_input.valueFrom, ) value_from_transformers[global_name].add_output_port( port_name, workflow.create_port() ) # Retrieve dependencies local_deps = resolve_dependencies( - expression=element_input.get("valueFrom"), + expression=element_input.valueFrom, full_js=full_js, expression_lib=expression_lib, ) input_dependencies[global_name] = set.union( {global_name} - if "source" in element_input or "default" in element_input + if element_input.source is not None or element_input.default is not None else set(), {posixpath.join(step_name, d) for d in local_deps}, ) or {global_name} # If `source` entry is present, process output dependencies - if "source" in element_input: + if element_input.source is not None: # If source element is a list, the input element can depend on multiple ports - if isinstance(element_input["source"], MutableSequence): + if isinstance(element_input.source, MutableSequence): # If the list contains only one element and no `linkMerge` is specified, treat it as a singleton if ( - len(element_input["source"]) == 1 - and "linkMerge" not in element_input - and "pickValue" not in element_input + len(element_input.source) == 1 + and not getattr(element_input, "linkMerge", None) + and not getattr(element_input, "pickValue", None) ): source_name = utils.get_name( - name_prefix, cwl_name_prefix, element_input["source"][0] + name_prefix, cwl_name_prefix, element_input.source[0] ) source_port = self._get_source_port(workflow, source_name) # If there is a default value, construct a default port block - if "default" in element_input: + if element_input.default is not None: # Insert default port source_port = self._handle_default_port( global_name=global_name, @@ -2382,14 +2586,14 @@ def _translate_workflow_step_input( + "-step-default-transformer", port=source_port, workflow=workflow, - value=element_input["default"], + value=element_input.default, ) # Add source port to the list of input ports for the current step input_ports[global_name] = source_port # Otherwise, create a ListMergeCombinator else: if ( - len(element_input["source"]) > 1 + len(element_input.source) > 1 and "MultipleInputFeatureRequirement" not in requirements ): raise WorkflowDefinitionException( @@ -2398,7 +2602,7 @@ def _translate_workflow_step_input( ) source_names = [ utils.get_name(name_prefix, cwl_name_prefix, src) - for src in element_input["source"] + for src in element_input.source ] ports = { n: self._get_source_port(workflow, n) for n in source_names @@ -2409,19 +2613,19 @@ def _translate_workflow_step_input( + "-list-merge-combinator", workflow=workflow, ports=ports, - link_merge=element_input.get("linkMerge"), - pick_value=element_input.get("pickValue"), + link_merge=getattr(element_input, "linkMerge", None), + pick_value=getattr(element_input, "pickValue", None), ) # Add ListMergeCombinator output port to the list of input ports for the current step input_ports[global_name] = list_merger.get_output_port() # Otherwise, the input element depends on a single output port else: source_name = utils.get_name( - name_prefix, cwl_name_prefix, element_input["source"] + name_prefix, cwl_name_prefix, cast(str, element_input.source) ) source_port = self._get_source_port(workflow, source_name) # If there is a default value, construct a default port block - if "default" in element_input: + if element_input.default is not None: # Insert default port source_port = self._handle_default_port( global_name=global_name, @@ -2430,25 +2634,28 @@ def _translate_workflow_step_input( + "-step-default-transformer", port=source_port, workflow=workflow, - value=element_input["default"], + value=element_input.default, ) # Add source port to the list of input ports for the current step input_ports[global_name] = source_port # Otherwise, search for default values - elif "default" in element_input: + elif element_input.default is not None: default_ports[global_name] = self._handle_default_port( global_name=global_name, port_name=port_name, transformer_suffix=inner_steps_prefix + "-step-default-transformer", port=None, workflow=workflow, - value=element_input["default"], + value=element_input.default, ) # Otherwise, inject a synthetic port into the workflow else: input_ports[global_name] = workflow.create_port() def translate(self) -> Workflow: + # Parse streams + cwl_utils.parser.utils.convert_stdstreams_to_files(self.cwl_definition) + # Create workflow workflow = Workflow( context=self.context, type="cwl", @@ -2456,16 +2663,16 @@ def translate(self) -> Workflow: name=self.name, ) # Create context - context = _create_context() + context = _create_context(version=self.cwl_definition.cwlVersion) # Compute root prefix - workflow_id = self.cwl_definition.tool["id"] + workflow_id = self.cwl_definition.id cwl_root_prefix = ( utils.get_name(posixpath.sep, posixpath.sep, workflow_id) if "#" in workflow_id else posixpath.sep ) # Register data locations for config files - path = _get_path(self.cwl_definition.tool["id"]) + path = _get_path(self.cwl_definition.id) self.context.data_manager.register_path( location=Location(deployment=LOCAL_LOCATION, name=LOCAL_LOCATION), path=path, @@ -2489,15 +2696,16 @@ def translate(self) -> Workflow: # Inject initial inputs self._inject_inputs(workflow) # Extract requirements - for hint in self.cwl_definition.hints: - context["hints"][hint["class"]] = hint - for requirement in self.cwl_definition.requirements: - context["requirements"][requirement["class"]] = requirement + for hint in self.cwl_definition.hints or []: + if not isinstance(hint, MutableMapping): + context["hints"][hint.class_] = hint + for requirement in self.cwl_definition.requirements or []: + context["requirements"][requirement.class_] = requirement requirements = {**context["hints"], **context["requirements"]} # Extract workflow outputs cwl_elements = { - utils.get_name("/", cwl_root_prefix, element["id"]): element - for element in self.cwl_definition.tool.get("outputs", []) + utils.get_name("/", cwl_root_prefix, element.id): element + for element in self.cwl_definition.outputs or [] } for output_name in self.output_ports: if output_name.lstrip(posixpath.sep).count(posixpath.sep) == 0: @@ -2512,7 +2720,7 @@ def translate(self) -> Workflow: ) # Search for dependencies in format expression format_deps = resolve_dependencies( - expression=cwl_elements[output_name].get("format"), + expression=cwl_elements[output_name].format, full_js=full_js, expression_lib=expression_lib, ) @@ -2524,11 +2732,11 @@ def translate(self) -> Workflow: processor=_create_token_processor( port_name=port_name, workflow=workflow, - port_type=cwl_elements[output_name]["type"], + port_type=cwl_elements[output_name].type, cwl_element=cwl_elements[output_name], cwl_name_prefix=posixpath.join(cwl_root_prefix, port_name), schema_def_types=_get_schema_def_types(requirements), - format_graph=self.loading_context.loader.graph, + format_graph=cwl_elements[output_name].loadingOptions.graph, context=context, check_type=False, default_required_sf=False, diff --git a/streamflow/cwl/utils.py b/streamflow/cwl/utils.py index d0f6f47a3..4641369e8 100644 --- a/streamflow/cwl/utils.py +++ b/streamflow/cwl/utils.py @@ -15,6 +15,7 @@ ) import cwl_utils.expression +import cwl_utils.parser import cwltool.context import cwltool.load_tool import cwltool.main @@ -679,11 +680,11 @@ def infer_type_from_token(token_value: Any) -> str: def load_cwl_inputs( loading_context: cwltool.context.LoadingContext, - cwl_process: cwltool.process.Process, + loadingOptions: cwl_utils.parser.LoadingOptions, path: str, ) -> MutableMapping[str, Any]: loader = cwltool.load_tool.default_loader(loading_context.fetcher_constructor) - loader.add_namespaces(cwl_process.metadata.get("$namespaces", {})) + loader.add_namespaces(loadingOptions.namespaces or {}) cwl_inputs, _ = loader.resolve_ref( path, checklinks=False, content_types=cwltool.CWL_CONTENT_TYPES ) diff --git a/streamflow/deployment/connector/occam.py b/streamflow/deployment/connector/occam.py index 42624d868..4d2cba90e 100644 --- a/streamflow/deployment/connector/occam.py +++ b/streamflow/deployment/connector/occam.py @@ -315,7 +315,7 @@ async def deploy(self, external: bool) -> None: await super().deploy(external) if not external: deploy_tasks = [] - for (name, service) in self.env_description.items(): + for name, service in self.env_description.items(): nodes = service.get("nodes", ["node22"]) for node in nodes: deploy_tasks.append( diff --git a/streamflow/persistence/sqlite.py b/streamflow/persistence/sqlite.py index 6e65b7a65..494ec221d 100644 --- a/streamflow/persistence/sqlite.py +++ b/streamflow/persistence/sqlite.py @@ -65,7 +65,10 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): async def close(self): if self._connection: + async with self as db: + await db.commit() await self._connection.close() + self._connection = None class SqliteDatabase(CachedDatabase): @@ -81,9 +84,8 @@ def __init__(self, context: StreamFlowContext, connection: str, timeout: int = 2 ) async def close(self): - async with self.connection as db: - await db.commit() await self.connection.close() + self.connection = None @classmethod def get_schema(cls): diff --git a/streamflow/workflow/token.py b/streamflow/workflow/token.py index 726750de7..21b141d27 100644 --- a/streamflow/workflow/token.py +++ b/streamflow/workflow/token.py @@ -7,7 +7,7 @@ from streamflow.core.context import StreamFlowContext from streamflow.core.persistence import DatabaseLoadingContext -from streamflow.core.workflow import Token, Job +from streamflow.core.workflow import Job, Token class IterationTerminationToken(Token): diff --git a/tests/cwl-conformance/streamflow.yml b/tests/cwl-conformance/streamflow.yml index 1d8af5e55..b42d4ea91 100644 --- a/tests/cwl-conformance/streamflow.yml +++ b/tests/cwl-conformance/streamflow.yml @@ -2,4 +2,4 @@ version: v1.0 database: type: default config: - connection: ":memory:" \ No newline at end of file + connection: ":memory:" diff --git a/tests/test_cwl_loop.py b/tests/test_cwl_loop.py index 57ab6b2ff..1fe2afa97 100644 --- a/tests/test_cwl_loop.py +++ b/tests/test_cwl_loop.py @@ -6,10 +6,13 @@ import json from typing import MutableMapping, MutableSequence +import pytest from cwltool.tests.util import get_data from streamflow.cwl.runner import main +pytestmark = pytest.mark.skip(reason="CWL extensions are not supported yet.") + def test_validate_loop() -> None: """Affirm that a loop workflow validates."""