diff --git a/requirements.txt b/requirements.txt index 412333a53..cf82ca5df 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,8 +4,7 @@ antlr4-python3-runtime==4.13.2 asyncssh==2.18.0 bcrypt==4.2.1 cachetools==5.5.0 -cwltool==3.1.20240708091337 -cwl-utils==0.35 +cwl-utils @ git+https://github.com/common-workflow-language/cwl-utils.git@refs/pull/337/head importlib-metadata==8.5.0 Jinja2==3.1.4 jsonschema==4.23.0 diff --git a/streamflow/cwl/translator.py b/streamflow/cwl/translator.py index ad2732d2e..1e88eb1c3 100644 --- a/streamflow/cwl/translator.py +++ b/streamflow/cwl/translator.py @@ -50,6 +50,7 @@ CWLExpressionCommand, CWLForwardCommandTokenProcessor, CWLMapCommandTokenProcessor, + CWLObjectCommandTokenProcessor, ) from streamflow.cwl.hardware import CWLHardwareRequirement from streamflow.cwl.processor import ( @@ -68,8 +69,14 @@ CWLDockerTranslatorConfig, ) from streamflow.cwl.step import ( - CWLConditionalStep, CWLEmptyScatterConditionalStep, CWLInputInjectorStep, CWLLoopConditionalStep, - CWLLoopOutputAllStep, CWLLoopOutputLastStep, CWLScheduleStep, CWLTransferStep + CWLConditionalStep, + CWLEmptyScatterConditionalStep, + CWLInputInjectorStep, + CWLLoopConditionalStep, + CWLLoopOutputAllStep, + CWLLoopOutputLastStep, + CWLScheduleStep, + CWLTransferStep, ) from streamflow.cwl.transformer import ( AllNonNullTransformer, @@ -161,12 +168,14 @@ def _create_command( command.environment[env_entry.envName] = env_entry.envValue # Process inputs for input_port in cwl_element.inputs: - command.processors.append = _get_command_token_processor_from_input( - cwl_element=input_port, - port_type=input_port.type_, - input_name=utils.get_name("", cwl_name_prefix, input_port.id), - is_shell_command=command.is_shell_command, - schema_def_types=schema_def_types, + command.processors.append( + _get_command_token_processor_from_input( + cwl_element=input_port, + port_type=input_port.type_, + input_name=utils.get_name("", cwl_name_prefix, input_port.id), + is_shell_command=command.is_shell_command, + schema_def_types=schema_def_types, + ) ) return command @@ -175,8 +184,12 @@ def _create_command_output_processor_base( port_name: str, workflow: CWLWorkflow, port_target: Target | None, - port_type: Any, - cwl_element: cwl_utils.parser.CommandOutputParameter, + port_type: str | MutableSequence[str], + cwl_element: ( + cwl_utils.parser.CommandOutputParameter + | cwl_utils.parser.OutputRecordField + | cwl_utils.parser.ExpressionToolOutputParameter + ), context: MutableMapping[str, Any], optional: bool = False, ) -> CWLCommandOutputProcessor: @@ -202,7 +215,7 @@ def _create_command_output_processor_base( glob=( cwl_element.outputBinding.glob if getattr(cwl_element, "outputBinding", None) - else getattr(cwl_element, "path", None) + else None ), load_contents=_get_load_contents(cwl_element), load_listing=_get_load_listing(cwl_element, context), @@ -246,15 +259,26 @@ def _create_command_output_processor( port_name: str, workflow: CWLWorkflow, port_target: Target | None, - port_type: Any, - cwl_element: cwl_utils.parser.CommandOutputParameter, + port_type: ( + str + | cwl_utils.parser.OutputSchema + | MutableSequence[ + str, + cwl_utils.parser.OutputSchema, + ] + ), + cwl_element: ( + cwl_utils.parser.CommandOutputParameter + | cwl_utils.parser.OutputRecordField + | cwl_utils.parser.ExpressionToolOutputParameter + ), cwl_name_prefix: str, schema_def_types: MutableMapping[str, Any], context: MutableMapping[str, Any], optional: bool = False, ) -> CommandOutputProcessor: # Array type: -> MapCommandOutputProcessor - if isinstance(port_type, get_args(cwl_utils.parser.ArraySchema)): + if isinstance(port_type, get_args(cwl_utils.parser.OutputArraySchema)): return CWLMapCommandOutputProcessor( name=port_name, workflow=workflow, @@ -262,7 +286,7 @@ def _create_command_output_processor( port_name=port_name, workflow=workflow, port_target=port_target, - port_type=port_type.items, + port_type=cast(cwl_utils.parser.OutputArraySchema, port_type).items, cwl_element=cwl_element, cwl_name_prefix=cwl_name_prefix, schema_def_types=schema_def_types, @@ -271,11 +295,11 @@ def _create_command_output_processor( ), ) # Enum type: -> create command output processor - elif isinstance(port_type, get_args(cwl_utils.parser.EnumSchema)): + elif isinstance(port_type, get_args(cwl_utils.parser.OutputEnumSchema)): # Process InlineJavascriptRequirement requirements = context["hints"] | context["requirements"] expression_lib, full_js = _process_javascript_requirement(requirements) - if type_name := port_type.name: + if type_name := cast(cwl_utils.parser.OutputEnumSchema, port_type).name: if type_name.startswith("_:"): enum_prefix = cwl_name_prefix else: @@ -287,19 +311,19 @@ def _create_command_output_processor( name=port_name, workflow=workflow, target=port_target, - token_type=port_type.type_, + token_type=cast(cwl_utils.parser.OutputEnumSchema, port_type).type_, enum_symbols=[ posixpath.relpath( utils.get_name(posixpath.sep, posixpath.sep, s), enum_prefix ) - for s in port_type.symbols + for s in cast(cwl_utils.parser.OutputEnumSchema, port_type).symbols ], expression_lib=expression_lib, full_js=full_js, optional=optional, ) # Record type: -> ObjectCommandOutputProcessor - elif isinstance(port_type, get_args(cwl_utils.parser.RecordSchema)): + elif isinstance(port_type, get_args(cwl_utils.parser.OutputRecordSchema)): # Process InlineJavascriptRequirement requirements = context["hints"] | context["requirements"] expression_lib, full_js = _process_javascript_requirement(requirements) @@ -326,13 +350,17 @@ def _create_command_output_processor( schema_def_types=schema_def_types, context=context, ) - for port_type in port_type.fields + for port_type in cast( + cwl_utils.parser.OutputRecordSchema, port_type + ).fields }, expression_lib=expression_lib, full_js=full_js, - output_eval=cwl_element.outputBinding.outputEval - if getattr(cwl_element, "outputBinding", None) - else None, + output_eval=( + cwl_element.outputBinding.outputEval + if getattr(cwl_element, "outputBinding", None) + else None + ), ) elif isinstance(port_type, MutableSequence): optional = "null" in port_type @@ -807,7 +835,7 @@ def _get_command_token_processor( processor: CommandTokenProcessor | None = None, is_shell_command: bool = False, input_name: str | None = None, - token_type: str | None = None, + token_type: Any | None = None, ) -> CWLCommandTokenProcessor: # Normalize type (Python does not distinguish among all CWL number types) token_type = ( @@ -848,11 +876,11 @@ def _get_command_token_processor_from_input( is_shell_command: bool = False, schema_def_types: MutableMapping[str, Any] | None = None, ) -> CommandTokenProcessor | None: - token = None + processor = None command_line_binding = cwl_element.inputBinding # Array type: -> CWLMapCommandToken if isinstance(port_type, get_args(cwl_utils.parser.ArraySchema)): - token = CWLMapCommandTokenProcessor( + processor = CWLMapCommandTokenProcessor( name=input_name, processor=_get_command_token_processor_from_input( cwl_element=port_type, @@ -886,7 +914,7 @@ def _get_command_token_processor_from_input( is_shell_command=is_shell_command, schema_def_types=schema_def_types, ) - token = CWLObjectTokenProcessor( + processor = CWLObjectCommandTokenProcessor( name=input_name, processors=processors, ) @@ -926,30 +954,31 @@ def _get_command_token_processor_from_input( ) # Simple type with `inputBinding` specified -> CWLCommandToken if command_line_binding is not None: - if token is not None: + if processor is not None: # By default, do not escape composite command tokens if command_line_binding.shellQuote is None: command_line_binding.shellQuote = False is_shell_command = True - token = _get_command_token_processor( + processor = _get_command_token_processor( binding=command_line_binding, + processor=processor, is_shell_command=is_shell_command, input_name=input_name, ) else: - token = _get_command_token_processor( + processor = _get_command_token_processor( binding=command_line_binding, is_shell_command=is_shell_command, input_name=input_name, - token_type=port_type, + token_type=port_type if isinstance(port_type, str) else port_type.type_, ) - # Simple type without `inputBinding` specified -> token - else: - token = CWLForwardCommandTokenProcessor( + # Simple type without `inputBinding` specified -> CWLForwardCommandToken + elif processor is None: + processor = CWLForwardCommandTokenProcessor( name=input_name, - token_type=port_type, + token_type=port_type if isinstance(port_type, str) else port_type.type_, ) - return token + return processor def _get_hardware_requirement( @@ -1093,23 +1122,14 @@ def _get_secondary_files( secondary_files.append( SecondaryFile( pattern=sf.pattern, - required=sf.required - if sf.required is not None - else default_required, + required=( + sf.required if sf.required is not None else default_required + ), ) ) return secondary_files -def _get_workflow_step_input_type( - element_input: cwl_utils.parser.WorkflowStepInput, - workflow: cwl_utils.parser.Workflow, -): - if not element_input.source: - return "Any" - return cwl_utils.parser.utils.type_for_source(workflow, element_input.source) - - def _inject_value(value: Any): if isinstance(value, MutableSequence): return [_inject_value(v) for v in value] @@ -1167,12 +1187,6 @@ def _inject_value(value: Any): return value -def _is_instance(v: Any, v1_0_type: type, v1_1_type: type, v1_2_type: type): - return ( - isinstance(v, v1_0_type) or isinstance(v, v1_1_type) or isinstance(v, v1_2_type) - ) - - def _percolate_port(port_name: str, *args) -> Port | None: for arg in args: if port_name in arg: @@ -1424,11 +1438,7 @@ def _get_deploy_step(self, deployment_config: DeploymentConfig, workflow: Workfl def _get_input_port( self, workflow: Workflow, - cwl_element: ( - cwl_utils.parser.cwl_v1_0.Process - | cwl_utils.parser.cwl_v1_1.Process - | cwl_utils.parser.cwl_v1_2.Process - ), + cwl_element: cwl_utils.parser.Process, element_input: cwl_utils.parser.InputParameter, global_name: str, port_name: str, @@ -1502,31 +1512,32 @@ def _handle_default_port( def _handle_optional_input_variables( self, - cwl_element, - cwl_name_prefix, - default_ports, - name_prefix, - step_name, - workflow, + cwl_element: cwl_utils.parser.WorkflowStep, + inner_cwl_element: cwl_utils.parser.Process, + cwl_name_prefix: str, + default_ports: MutableMapping[str, Port], + name_prefix: str, + step_name: str, + workflow: CWLWorkflow, ): inner_input_ports, outer_input_ports = set(), set() # Get inner CWL object input names - for element_input in cwl_element.embedded_tool.tool["inputs"]: - inner_cwl_name_prefix = utils.get_inner_cwl_prefix( - cwl_name_prefix, name_prefix, cwl_element + for element_input in inner_cwl_element.inputs: + inner_cwl_name_prefix = utils.get_name( + name_prefix, cwl_name_prefix, cwl_element.id, preserve_cwl_prefix=True ) global_name = utils.get_name( - step_name, inner_cwl_name_prefix, element_input["id"] + step_name, inner_cwl_name_prefix, element_input.id ) port_name = posixpath.relpath(global_name, step_name) inner_input_ports.add(port_name) # Get WorkflowStep input names - for element_input in cwl_element.tool["inputs"]: + for element_input in cwl_element.in_: step_name = utils.get_name(name_prefix, cwl_name_prefix, cwl_element.id) cwl_step_name = utils.get_name( name_prefix, cwl_name_prefix, cwl_element.id, preserve_cwl_prefix=True ) - global_name = utils.get_name(step_name, cwl_step_name, element_input["id"]) + global_name = utils.get_name(step_name, cwl_step_name, element_input.id) port_name = posixpath.relpath(global_name, step_name) outer_input_ports.add(port_name) # Create a `DefaultTransformer` for each optional input @@ -1642,7 +1653,25 @@ def _recursive_translate( for hint in cwl_element.hints or []: if not isinstance(hint, MutableMapping): current_context["hints"][hint.class_] = hint + else: + # Fail if `cwltool:Loop` is defined as a hint + if hint["class"] == "cwltool:Loop": + raise WorkflowDefinitionException( + "The `cwltool:Loop` clause is valid only under requirements." + ) for requirement in cwl_element.requirements or []: + if requirement.class_ == "Loop": + if not isinstance(cwl_element, get_args(cwl_utils.parser.WorkflowStep)): + raise WorkflowDefinitionException( + "The `cwltool:Loop` clause is not compatible " + f"with the `{cwl_element.__class__.__name__}` class." + ) + if cwl_element.scatter is not None: + raise WorkflowDefinitionException( + "The `cwltool:Loop` clause is not compatible with the `when` directive.") + if cwl_element.when is not None: + raise WorkflowDefinitionException( + "The `cwltool:Loop` clause is not compatible with the `scatter` directive.") current_context["requirements"][requirement.class_] = requirement # In the root process, override requirements when provided in the input file if name_prefix == posixpath.sep: @@ -1700,9 +1729,7 @@ def _recursive_translate( def _translate_command_line_tool( self, workflow: CWLWorkflow, - cwl_element: ( - cwl_utils.parser.CommandLineTool | cwl_utils.parser.ExpressionTool - ), + cwl_element: cwl_utils.parser.CommandLineTool | cwl_utils.parser.ExpressionTool, context: MutableMapping[str, Any], name_prefix: str, cwl_name_prefix: str, @@ -1724,32 +1751,23 @@ def _translate_command_line_tool( if "NetworkAccess" in requirements else False ) - for target in binding_config.targets: - # If original target is local, use a container - if target.deployment.type == "local": - binding_config.targets.append( - _process_docker_requirement( - config_dir=os.path.dirname(self.context.config["path"]), - config=self.workflow_config.propagate( - path=PurePosixPath(name_prefix), - name="docker", - default=CWLDockerTranslatorConfig( - name="default", type="default", config={} - ), - ), - context=context, - docker_requirement=requirements["DockerRequirement"], - network_access=network_access, - target=target, - ) - ) - # Otherwise, throw a warning and skip the DockerRequirement conversion - else: - if logger.isEnabledFor(logging.WARN): - logger.warn( - f"Skipping DockerRequirement conversion for step `{name_prefix}` " - f"when executing on `{target.deployment.name}` deployment." - ) + binding_config.targets = [ + _process_docker_requirement( + config_dir=os.path.dirname(self.context.config["path"]), + config=self.workflow_config.propagate( + path=PurePosixPath(name_prefix), + name="docker", + default=CWLDockerTranslatorConfig( + name="default", type="default", config={} + ), + ), + context=context, + docker_requirement=requirements["DockerRequirement"], + network_access=network_access, + target=target, + ) + for target in binding_config.targets + ] # Create DeploySteps to initialise the execution environment deployments = {t.deployment.name: t.deployment for t in binding_config.targets} deploy_steps = { @@ -1989,13 +2007,19 @@ def _translate_workflow( global_name = utils.get_name( name_prefix, cwl_name_prefix, element_output.id ) + link_merge = element_output.linkMerge + pick_value = ( + None + if context["version"] in ["v1.0", "v1.1"] + else element_output.pickValue + ) # If outputSource element is a list, the output element can depend on multiple ports if isinstance(element_output.outputSource, MutableSequence): # If the list contains only one element and no `linkMerge` or `pickValue` are specified if ( len(element_output.outputSource) == 1 - and not getattr(element_output, "linkMerge", None) - and not getattr(element_output, "pickValue", None) + and link_merge is None + and pick_value is None ): # Treat it as a singleton source_name = utils.get_name( @@ -2031,8 +2055,8 @@ def _translate_workflow( workflow=workflow, ports=ports, output_port=self._get_source_port(workflow, global_name), - link_merge=getattr(element_output, "linkMerge", None), - pick_value=getattr(element_output, "pickValue", None), + link_merge=link_merge, + pick_value=pick_value, ) # Otherwise, the output element depends on a single output port else: @@ -2040,15 +2064,15 @@ def _translate_workflow( name_prefix, cwl_name_prefix, element_output.outputSource ) # If `pickValue` is specified, create a ListMergeCombinator - if getattr(element_output, "pickValue", None): + if pick_value is not None: source_port = self._get_source_port(workflow, source_name) _create_list_merger( name=global_name, workflow=workflow, ports={source_name: source_port}, output_port=self._get_source_port(workflow, global_name), - link_merge=getattr(element_output, "linkMerge", None), - pick_value=getattr(element_output, "pickValue", None), + link_merge=link_merge, + pick_value=pick_value, ) else: # If the output source is an input port, link the output to the input @@ -2085,12 +2109,6 @@ def _translate_workflow_step( cwl_step_name = utils.get_name( name_prefix, cwl_name_prefix, cwl_element.id, preserve_cwl_prefix=True ) - # Extract requirements - for hint in cwl_element.hints or []: - if not isinstance(hint, MutableMapping): - context["hints"][hint.class_] = hint - for requirement in cwl_element.requirements or []: - context["requirements"][requirement.class_] = requirement requirements = context["hints"] | context["requirements"] # Extract JavaScript requirements expression_lib, full_js = _process_javascript_requirement(requirements) @@ -2109,14 +2127,6 @@ def _translate_workflow_step( default_ports = {} value_from_transformers = {} input_dependencies = {} - self._handle_optional_input_variables( - cwl_element, - cwl_name_prefix, - default_ports, - name_prefix, - step_name, - workflow, - ) for element_input in cwl_element.in_: self._translate_workflow_step_input( workflow=workflow, @@ -2150,11 +2160,7 @@ def _translate_workflow_step( default_ports[default_name] = transformer.get_output_port() input_ports |= default_ports # Process loop inputs - element_requirements = { - h["class"]: h for h in (cwl_element.hints or []) - } | {r.class_: r for r in (cwl_element.requirements or [])} - if "http://commonwl.org/cwltool#Loop" in element_requirements: - loop_requirement = element_requirements["http://commonwl.org/cwltool#Loop"] + if "Loop" in requirements: # Build combinator loop_combinator = LoopCombinator( workflow=workflow, name=step_name + "-loop-combinator" @@ -2185,7 +2191,7 @@ def _translate_workflow_step( loop_conditional_step = workflow.create_step( cls=CWLLoopConditionalStep, name=step_name + "-loop-when", - expression=loop_requirement["loopWhen"], + expression=requirements["Loop"].loopWhen, expression_lib=expression_lib, full_js=full_js, ) @@ -2312,12 +2318,15 @@ def _translate_workflow_step( self.input_ports |= input_ports # Process condition conditional_step = None - if getattr(cwl_element, "when", None): + cwl_condition = ( + None if context["version"] in ["v1.0", "v1.1"] else cwl_element.when + ) + if cwl_condition is not None: # Create conditional step conditional_step = workflow.create_step( cls=CWLConditionalStep, name=step_name + "-when", - expression=cwl_element.when, + expression=cwl_condition, expression_lib=expression_lib, full_js=full_js, ) @@ -2416,14 +2425,13 @@ def _translate_workflow_step( port_name, external_output_ports[global_name] ) # Add skip ports if there is a condition - if getattr(cwl_element, "when", None): + if cwl_condition: cast(CWLConditionalStep, conditional_step).add_skip_port( port_name, internal_output_ports[global_name] ) # Process loop outputs - if "http://commonwl.org/cwltool#Loop" in element_requirements: - loop_requirement = element_requirements["http://commonwl.org/cwltool#Loop"] - output_method = loop_requirement.get("outputMethod", "last") + if "Loop" in requirements: + output_method = requirements["Loop"].outputMethod # Retrieve loop steps loop_conditional_step = cast( CWLLoopConditionalStep, workflow.steps[step_name + "-loop-when"] @@ -2484,9 +2492,7 @@ def _translate_workflow_step( loop_default_ports = {} loop_value_from_transformers = {} loop_input_dependencies = {} - for loop_input in loop_requirement.get("loop", []): - # Pre-process the `loopSource` field to avoid inconsistencies - loop_input["source"] = loop_input.get("loopSource", loop_input["id"]) + for loop_input in requirements["Loop"].loop or []: self._translate_workflow_step_input( workflow=workflow, context=context, @@ -2494,7 +2500,6 @@ def _translate_workflow_step( element_input=loop_input, name_prefix=name_prefix, cwl_name_prefix=cwl_name_prefix, - scatter_inputs=scatter_inputs, requirements=requirements, input_ports=loop_input_ports, default_ports=loop_default_ports, @@ -2548,13 +2553,13 @@ def _translate_workflow_step( port_name, combinator_step.get_input_port(port_name) ) # Add skip ports if there is a condition - if getattr(cwl_element, "when", None): + if cwl_condition: for element_output in cwl_element.out: global_name = utils.get_name(step_name, cwl_step_name, element_output) port_name = posixpath.relpath(global_name, step_name) skip_port = ( external_output_ports[global_name] - if "http://commonwl.org/cwltool#Loop" in element_requirements + if "Loop" in requirements else internal_output_ports[global_name] ) cast(CWLConditionalStep, conditional_step).add_skip_port( @@ -2563,14 +2568,17 @@ def _translate_workflow_step( # Update output ports with the internal ones self.output_ports |= internal_output_ports # Process inner element - cwl_step_name = utils.get_name( - name_prefix, cwl_name_prefix, cwl_element.id, preserve_cwl_prefix=True - ) run_command = cwl_element.run if cwl_utils.parser.is_process(run_command): run_command.cwlVersion = context["version"] cwl_utils.parser.utils.convert_stdstreams_to_files(run_command) if ":" in run_command.id.split("#")[-1]: + cwl_step_name = utils.get_name( + name_prefix, + cwl_name_prefix, + cwl_element.id, + preserve_cwl_prefix=True, + ) inner_cwl_name_prefix = ( step_name if context["version"] == "v1.0" @@ -2600,10 +2608,20 @@ def _translate_workflow_step( self._recursive_translate( workflow=workflow, cwl_element=run_command, - context=context, + context=context | {"requirements": {k: v for k, v in requirements.items() if k != "Loop"}}, name_prefix=step_name, cwl_name_prefix=inner_cwl_name_prefix, ) + # Handle optional input variables + self._handle_optional_input_variables( + cwl_element=cwl_element, + inner_cwl_element=run_command, + cwl_name_prefix=cwl_name_prefix, + default_ports=default_ports, + name_prefix=name_prefix, + step_name=step_name, + workflow=workflow, + ) # Update output ports with the external ones self.output_ports |= external_output_ports @@ -2625,6 +2643,12 @@ def _translate_workflow_step_input( ): # Extract custom types if present schema_def_types = _get_schema_def_types(requirements) + # Extract element source + element_source = ( + element_input.loopSource + if isinstance(element_input, cwl_utils.parser.cwl_v1_2.LoopInput) + else element_input.source + ) # Extract JavaScript requirements expression_lib, full_js = _process_javascript_requirement(requirements) # Extract names @@ -2683,23 +2707,32 @@ def _translate_workflow_step_input( expression_lib=expression_lib, ) input_dependencies[global_name] = set.union( - ({global_name} - if element_input.source is not None or element_input.default is not None - else set()), + ( + {global_name} + if element_source is not None or element_input.default is not None + else set() + ), {posixpath.join(step_name, d) for d in local_deps}, ) # If `source` entry is present, process output dependencies - if element_input.source is not None: + if element_source is not None: + link_merge = element_input.linkMerge + pick_value = ( + None + if context["version"] in ["v1.0", "v1.1"] + else element_input.pickValue + ) # If source element is a list, the input element can depend on multiple ports - if isinstance(element_input.source, MutableSequence): - # If the list contains only one element and no `linkMerge` is specified, treat it as a singleton + if isinstance(element_source, MutableSequence): + # If the list contains only one element and no `linkMerge` or `pickValue` + # are specified, treat it as a singleton if ( - len(element_input.source) == 1 - and not getattr(element_input, "linkMerge", None) - and not getattr(element_input, "pickValue", None) + len(element_source) == 1 + and link_merge is None + and pick_value is None ): source_name = utils.get_name( - name_prefix, cwl_name_prefix, element_input.source[0] + name_prefix, cwl_name_prefix, next(iter(element_source)) ) source_port = self._get_source_port(workflow, source_name) # If there is a default value, construct a default port block @@ -2719,7 +2752,7 @@ def _translate_workflow_step_input( # Otherwise, create a ListMergeCombinator else: if ( - len(element_input.source) > 1 + len(element_source) > 1 and "MultipleInputFeatureRequirement" not in requirements ): raise WorkflowDefinitionException( @@ -2728,7 +2761,7 @@ def _translate_workflow_step_input( ) source_names = [ utils.get_name(name_prefix, cwl_name_prefix, src) - for src in element_input.source + for src in element_source ] ports = { n: self._get_source_port(workflow, n) for n in source_names @@ -2739,15 +2772,15 @@ def _translate_workflow_step_input( + "-list-merge-combinator", workflow=workflow, ports=ports, - link_merge=getattr(element_input, "linkMerge", None), - pick_value=getattr(element_input, "pickValue", None), + link_merge=link_merge, + pick_value=pick_value, ) # Add ListMergeCombinator output port to the list of input ports for the current step input_ports[global_name] = list_merger.get_output_port() # Otherwise, the input element depends on a single output port else: source_name = utils.get_name( - name_prefix, cwl_name_prefix, cast(str, element_input.source) + name_prefix, cwl_name_prefix, cast(str, element_source) ) source_port = self._get_source_port(workflow, source_name) # If there is a default value, construct a default port block @@ -2786,8 +2819,8 @@ def translate(self) -> Workflow: context=self.context, config=self.workflow_config.config, name=self.name, - cwl_version=self.loading_context.metadata["cwlVersion"], - format_graph=self.loading_context.loader.graph, + cwl_version=self.cwl_definition.cwlVersion, + format_graph=self.cwl_definition.loadingOptions.graph, ) # Create context context = _create_context(version=self.cwl_definition.cwlVersion) diff --git a/streamflow/cwl/utils.py b/streamflow/cwl/utils.py index 94d2a4140..41da4be5b 100644 --- a/streamflow/cwl/utils.py +++ b/streamflow/cwl/utils.py @@ -1,6 +1,5 @@ from __future__ import annotations -import argparse import asyncio import logging import posixpath @@ -13,13 +12,7 @@ import cwl_utils.expression import cwl_utils.parser -import cwltool.context -import cwltool.load_tool -import cwltool.main -import cwltool.process -import cwltool.resolver -import cwltool.workflow -from cwltool.utils import CONTENT_LIMIT +from cwl_utils.parser.cwl_v1_2_utils import CONTENT_LIMIT from streamflow.core.context import StreamFlowContext from streamflow.core.data import DataLocation, DataType, FileType @@ -641,29 +634,6 @@ def get_path_from_token(token_value: MutableMapping[str, Any]) -> str | None: return location -def get_inner_cwl_prefix( - cwl_prefix: str, prefix: str, step: cwltool.workflow.WorkflowStep -) -> str: - cwl_step_name = get_name(prefix, cwl_prefix, step.id, preserve_cwl_prefix=True) - run_command = step.tool["run"] - if isinstance(run_command, MutableMapping): - if ":" in step.embedded_tool.tool["id"].split("#")[-1]: - return posixpath.join(cwl_step_name, "run") - else: - return get_name( - prefix, - cwl_prefix, - step.embedded_tool.tool["id"], - preserve_cwl_prefix=True, - ) - else: - return ( - get_name(posixpath.sep, posixpath.sep, step.embedded_tool.tool["id"]) - if "#" in step.embedded_tool.tool["id"] - else posixpath.sep - ) - - def get_token_class(token_value: Any) -> str | None: if isinstance(token_value, MutableMapping): return token_value.get("class", token_value.get("type")) @@ -689,43 +659,6 @@ def infer_type_from_token(token_value: Any) -> str: return "Any" -def load_cwl_inputs( - loading_context: cwltool.context.LoadingContext, - loadingOptions: cwl_utils.parser.LoadingOptions, - path: str, -) -> MutableMapping[str, Any]: - loader = cwltool.load_tool.default_loader(loading_context.fetcher_constructor) - loader.add_namespaces(loadingOptions.namespaces or {}) - cwl_inputs, _ = loader.resolve_ref( - path, checklinks=False, content_types=cwltool.CWL_CONTENT_TYPES - ) - - def expand_formats(p) -> None: - if "format" in p: - p["format"] = loader.expand_url(p["format"], "") - - cwltool.utils.visit_class(cwl_inputs, ("File",), expand_formats) - return cwl_inputs - - -def load_cwl_workflow( - path: str, -) -> tuple[cwltool.process.Process, cwltool.context.LoadingContext]: - loading_context = cwltool.context.LoadingContext() - loading_context.resolver = cwltool.resolver.tool_resolver - loading_context.loader = cwltool.load_tool.default_loader( - loading_context.fetcher_constructor - ) - loading_context, workflowobj, uri = cwltool.load_tool.fetch_document( - path, loading_context - ) - cwltool.main.setup_schema(argparse.Namespace(enable_ext=True), None) - loading_context, uri = cwltool.load_tool.resolve_and_validate_document( - loading_context, workflowobj, uri - ) - return cwltool.load_tool.make_tool(uri, loading_context), loading_context - - class LoadListing(Enum): no_listing = 0 shallow_listing = 1 diff --git a/streamflow/log_handler.py b/streamflow/log_handler.py index 1e964398e..f438cad33 100644 --- a/streamflow/log_handler.py +++ b/streamflow/log_handler.py @@ -122,5 +122,5 @@ def highlight(self, msg): ) defaultStreamHandler.setFormatter(formatter) logger.addHandler(defaultStreamHandler) -logger.setLevel(logging.INFO) +logger.setLevel(logging.DEBUG) logger.propagate = False diff --git a/streamflow/provenance/run_crate.py b/streamflow/provenance/run_crate.py index c44db0c54..60332f718 100644 --- a/streamflow/provenance/run_crate.py +++ b/streamflow/provenance/run_crate.py @@ -12,13 +12,11 @@ import uuid from abc import ABC, abstractmethod from collections.abc import MutableMapping, MutableSequence -from typing import Any, cast +from typing import Any, cast, get_args from zipfile import ZipFile -import cwltool.command_line_tool -import cwltool.context -import cwltool.process -import cwltool.workflow +import cwl_utils.parser +import cwl_utils.parser.utils from yattag import Doc import streamflow.core.utils @@ -78,6 +76,47 @@ def _get_action_status(status: Status) -> str: raise WorkflowProvenanceException(f"Action status {status.name} not supported.") +def _get_cwl_embedded_tool( + cwl_prefix: str, + prefix: str, + cwl_step: cwl_utils.parser.WorkflowStep, + step_name: str, + version: str, +) -> tuple[cwl_utils.parser.Process, str]: + run_command = cwl_step.run + if cwl_utils.parser.is_process(run_command): + run_command.cwlVersion = version + cwl_utils.parser.utils.convert_stdstreams_to_files(run_command) + if ":" in run_command.id.split("#")[-1]: + cwl_step_name = streamflow.cwl.utils.get_name( + prefix, cwl_prefix, cwl_step.id, preserve_cwl_prefix=True + ) + cwl_prefix = ( + step_name if version == "v1.0" else posixpath.join(cwl_step_name, "run") + ) + else: + cwl_prefix = streamflow.cwl.utils.get_name( + prefix, + cwl_prefix, + run_command.id, + preserve_cwl_prefix=True, + ) + else: + run_command = cwl_step.loadingOptions.fetcher.urljoin( + cwl_step.loadingOptions.fileuri, run_command + ) + run_command = cwl_utils.parser.load_document_by_uri( + run_command, loadingOptions=cwl_step.loadingOptions + ) + cwl_utils.parser.utils.convert_stdstreams_to_files(run_command) + cwl_prefix = ( + streamflow.cwl.utils.get_name(posixpath.sep, posixpath.sep, run_command.id) + if "#" in run_command.id + else posixpath.sep + ) + return run_command, cwl_prefix + + def _get_cwl_entity_id(entity_id: str) -> str: tokens = entity_id.split("#") if len(tokens) > 1: @@ -108,11 +147,8 @@ def _get_cwl_param(cwl_param: MutableMapping[str, Any]) -> MutableMapping[str, A def _get_cwl_programming_language( - loading_context: cwltool.context.LoadingContext, + version: str, ) -> MutableMapping[str, Any]: - version = loading_context.metadata[ - "http://commonwl.org/cwltool#original_cwlVersion" - ] return { "@id": "https://w3id.org/workflowhub/workflow-ro-crate#cwl", "@type": "ComputerLanguage", @@ -934,18 +970,15 @@ def __init__( raise WorkflowProvenanceException( "Cannot build a single workflow provenance for multiple workflow definitions." ) - ( - self.cwl_definition, - self.loading_context, - ) = streamflow.cwl.utils.load_cwl_workflow(list(paths)[0]) + self.cwl_definition = cwl_utils.parser.load_document_by_uri(next(iter(paths))) self.scatter_map: MutableMapping[str, MutableSequence[str]] = {} def _add_metadata( self, - cwl_object: cwltool.process.Process, + cwl_object: cwl_utils.parser.Process, jsonld_object: MutableMapping[str, Any], ) -> None: - for key, value in cwl_object.metadata.items(): + for key, value in cwl_object.loadingOptions.addl_metadata.items(): jsonld_object[ key.removeprefix("http://schema.org/").removeprefix( "https://schema.org/" @@ -985,10 +1018,10 @@ def _add_params( cwl_prefix: str, prefix: str, jsonld_element: MutableMapping[str, Any], - cwl_element: cwltool.process.Process, + cwl_element: cwl_utils.parser.Process, ) -> None: # Add inputs - for cwl_input in cwl_element.tool["inputs"]: + for cwl_input in cwl_element.inputs or []: jsonld_param = _get_cwl_param(cwl_input) self.graph[jsonld_param["@id"]] = jsonld_param jsonld_element.setdefault("input", []).append({"@id": jsonld_param["@id"]}) @@ -999,7 +1032,7 @@ def _add_params( ) self.register_input_port(port_name, jsonld_param) # Add outputs - for cwl_output in cwl_element.tool["outputs"]: + for cwl_output in cwl_element.outputs or []: jsonld_param = _get_cwl_param(cwl_output) self.graph[jsonld_param["@id"]] = jsonld_param jsonld_element.setdefault("output", []).append({"@id": jsonld_param["@id"]}) @@ -1064,25 +1097,33 @@ def _get_source( return {"@id": self.output_port_map[source_name]["@id"]} def _get_step( - self, cwl_prefix: str, prefix: str, cwl_step: cwltool.workflow.WorkflowStep + self, + cwl_prefix: str, + prefix: str, + cwl_step: cwl_utils.parser.WorkflowStep, + version: str, ) -> MutableMapping[str, Any]: jsonld_step = { "@id": _get_cwl_entity_id(cwl_step.id), "@type": "HowToStep", } step_name = streamflow.cwl.utils.get_name(prefix, cwl_prefix, cwl_step.id) - cwl_prefix = streamflow.cwl.utils.get_inner_cwl_prefix( - prefix, cwl_prefix, cwl_step + embedded_tool, cwl_prefix = _get_cwl_embedded_tool( + cwl_prefix=cwl_prefix, + prefix=prefix, + cwl_step=cwl_step, + step_name=step_name, + version=version, ) - if isinstance(cwl_step.embedded_tool, cwltool.workflow.Workflow): + if isinstance(embedded_tool, get_args(cwl_utils.parser.Workflow)): work_example = self._get_workflow( cwl_prefix=cwl_prefix, prefix=step_name, - cwl_workflow=cwl_step.embedded_tool, + cwl_workflow=embedded_tool, ) else: work_example = self._get_tool( - cwl_prefix=cwl_prefix, prefix=step_name, cwl_tool=cwl_step.embedded_tool + cwl_prefix=cwl_prefix, prefix=step_name, cwl_tool=embedded_tool ) self.register_step(step_name, jsonld_step) jsonld_step["workExample"] = {"@id": work_example["@id"]} @@ -1093,14 +1134,11 @@ def _get_tool( self, cwl_prefix: str, prefix: str, - cwl_tool: ( - cwltool.command_line_tool.CommandLineTool - | cwltool.command_line_tool.ExpressionTool - ), + cwl_tool: cwl_utils.parser.CommandLineTool | cwl_utils.parser.ExpressionTool, ) -> MutableMapping[str, Any]: # Create entity - entity_id = _get_cwl_entity_id(cwl_tool.tool["id"]) - path = cwl_tool.tool["id"].split("#")[0][7:] + entity_id = _get_cwl_entity_id(cwl_tool.id) + path = cwl_tool.id.split("#")[0][7:] if path not in self.files_map: self.graph["./"]["hasPart"].append({"@id": entity_id}) self.files_map[path] = os.path.basename(path) @@ -1113,8 +1151,8 @@ def _get_tool( "@type": ["SoftwareApplication", "File"], } # Add description - if "doc" in cwl_tool.tool: - jsonld_tool["description"] = cwl_tool.tool["doc"] + if cwl_tool.doc: + jsonld_tool["description"] = cwl_tool.doc # Add metadata self._add_metadata(cwl_tool, jsonld_tool) # Add inputs and outputs @@ -1122,21 +1160,21 @@ def _get_tool( return jsonld_tool def _get_workflow( - self, cwl_prefix: str, prefix: str, cwl_workflow: cwltool.workflow.Workflow + self, cwl_prefix: str, prefix: str, cwl_workflow: cwl_utils.parser.Workflow ) -> MutableMapping[str, Any]: # Create entity - entity_id = _get_cwl_entity_id(cwl_workflow.tool["id"]) - path = cwl_workflow.tool["id"].split("#")[0][7:] + entity_id = _get_cwl_entity_id(cwl_workflow.id) + path = cwl_workflow.id.split("#")[0][7:] jsonld_workflow = cast( dict[str, Any], _get_workflow_template(entity_id, entity_id.split("#")[-1]) ) | {"sha1": _file_checksum(path, hashlib.new("sha1", usedforsecurity=False))} - if (path := cwl_workflow.tool["id"].split("#")[0][7:]) not in self.files_map: + if (path := cwl_workflow.id.split("#")[0][7:]) not in self.files_map: self.graph["./"]["hasPart"].append({"@id": entity_id}) self.files_map[path] = os.path.basename(path) - jsonld_workflow["@type"].append("File") + cast(MutableSequence, jsonld_workflow["@type"]).append("File") # Add description - if "doc" in cwl_workflow.tool: - jsonld_workflow["description"] = cwl_workflow.tool["doc"] + if cwl_workflow.doc: + jsonld_workflow["description"] = cwl_workflow.doc # Add metadata self._add_metadata(cwl_workflow, jsonld_workflow) # Add inputs and outputs @@ -1145,7 +1183,13 @@ def _get_workflow( if len(cwl_workflow.steps) > 0: jsonld_workflow["step"] = [] jsonld_steps = [ - self._get_step(cwl_prefix, prefix, s) for s in cwl_workflow.steps + self._get_step( + cwl_prefix=cwl_prefix, + prefix=prefix, + cwl_step=s, + version=cwl_workflow.cwlVersion, + ) + for s in cwl_workflow.steps ] self._register_steps( cwl_prefix=cwl_prefix, @@ -1153,10 +1197,11 @@ def _get_workflow( jsonld_entity=jsonld_workflow, jsonld_steps=jsonld_steps, cwl_steps=cwl_workflow.steps, + version=cwl_workflow.cwlVersion, ) # Connect output sources workflow_inputs = [inp["@id"] for inp in jsonld_workflow.get("output", [])] - for cwl_output in cwl_workflow.tool.get("outputs", []): + for cwl_output in cwl_workflow.outputs or []: if source := cwl_output.get("outputSource"): connection = self._get_connection( cwl_prefix=cwl_prefix, @@ -1253,7 +1298,8 @@ def _register_steps( prefix: str, jsonld_entity: MutableMapping[str, Any], jsonld_steps: MutableSequence[MutableMapping[str, Any]], - cwl_steps: MutableSequence[cwltool.workflow.WorkflowStep], + cwl_steps: MutableSequence[cwl_utils.parser.WorkflowStep], + version: str, ): has_part = set() for cwl_step, jsonld_step in zip(cwl_steps, jsonld_steps): @@ -1261,8 +1307,12 @@ def _register_steps( cwl_step_name = streamflow.cwl.utils.get_name( prefix, cwl_prefix, cwl_step.id, preserve_cwl_prefix=True ) - inner_cwl_prefix = streamflow.cwl.utils.get_inner_cwl_prefix( - prefix, cwl_prefix, cwl_step + embedded_tool, inner_cwl_prefix = _get_cwl_embedded_tool( + cwl_prefix=cwl_prefix, + prefix=prefix, + step_name=step_name, + cwl_step=cwl_step, + version=version, ) # Register step jsonld_entity["step"].append({"@id": jsonld_step["@id"]}) @@ -1270,28 +1320,28 @@ def _register_steps( # Register workExample has_part.add(jsonld_step["workExample"]["@id"]) # Find scatter elements - if isinstance(cwl_step.tool.get("scatter"), str): + if isinstance(cwl_step.scatter, str): self.scatter_map[step_name] = [ streamflow.cwl.utils.get_name( - step_name, cwl_step_name, cwl_step.tool["scatter"] + step_name, cwl_step_name, cwl_step.scatter ) ] else: self.scatter_map[step_name] = [ streamflow.cwl.utils.get_name(step_name, cwl_step_name, n) - for n in cwl_step.tool.get("scatter", []) + for n in (cwl_step.scatter or []) ] # Connect sources workflow_inputs = [inp["@id"] for inp in jsonld_entity.get("input", [])] - for cwl_input in cwl_step.tool.get("inputs", []): - if source := cwl_input.get("source"): + for cwl_input in cwl_step.in_ or []: + if source := cwl_input.source: global_name = streamflow.cwl.utils.get_name( prefix, cwl_prefix, cwl_input["id"] ) port_name = posixpath.relpath(global_name, step_name) - for inner_input in cwl_step.embedded_tool.tool.get("inputs", []): + for inner_input in embedded_tool.inputs or []: inner_global_name = streamflow.cwl.utils.get_name( - step_name, inner_cwl_prefix, inner_input["id"] + step_name, inner_cwl_prefix, inner_input.id ) inner_port_name = posixpath.relpath( inner_global_name, step_name @@ -1301,7 +1351,7 @@ def _register_steps( cwl_prefix=cwl_prefix, prefix=prefix, source=source, - target_parameter=_get_cwl_entity_id(inner_input["id"]), + target_parameter=_get_cwl_entity_id(inner_input.id), workflow_inputs=workflow_inputs, ) self.graph[connection["@id"]] = connection @@ -1357,7 +1407,9 @@ async def get_main_entity(self) -> MutableMapping[str, Any]: path, hashlib.new("sha1", usedforsecurity=False) ) # Add programming language - programming_language = _get_cwl_programming_language(self.loading_context) + programming_language = _get_cwl_programming_language( + self.cwl_definition.cwlVersion + ) self.graph[programming_language["@id"]] = programming_language main_entity["programmingLanguage"] = {"@id": programming_language["@id"]} # Add metadata @@ -1366,12 +1418,14 @@ async def get_main_entity(self) -> MutableMapping[str, Any]: self._add_params(cwl_prefix, posixpath.sep, main_entity, self.cwl_definition) # Add steps if present if ( - isinstance(self.cwl_definition, cwltool.workflow.Workflow) + isinstance(self.cwl_definition, get_args(cwl_utils.parser.Workflow)) and len(self.cwl_definition.steps) > 0 ): main_entity["step"] = [] jsonld_steps = [ - self._get_step(cwl_prefix, posixpath.sep, s) + self._get_step( + cwl_prefix, posixpath.sep, s, self.cwl_definition.cwlVersion + ) for s in self.cwl_definition.steps ] self._register_steps( @@ -1380,6 +1434,7 @@ async def get_main_entity(self) -> MutableMapping[str, Any]: jsonld_entity=main_entity, jsonld_steps=jsonld_steps, cwl_steps=self.cwl_definition.steps, + version=self.cwl_definition.cwlVersion, ) # Connect output sources workflow_inputs = [inp["@id"] for inp in main_entity.get("output", [])] diff --git a/test-requirements.txt b/test-requirements.txt index d17cc3f53..47f288246 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,4 +1,5 @@ cwltest==2.5.20241122133319 +cwltool==3.1.20240708091337 pytest==8.3.4 pytest-asyncio==0.24.0 pytest-cov==6.0.0 diff --git a/tests/test_build_wf.py b/tests/test_build_wf.py index 140c14ceb..74ad6249c 100644 --- a/tests/test_build_wf.py +++ b/tests/test_build_wf.py @@ -208,7 +208,7 @@ async def test_execute_step(context: StreamFlowContext): port_target=None, port_type="string", cwl_element={}, - context={"hints": {}, "requirements": {}}, + context={"hints": {}, "requirements": {}, "version": "v1.2"}, ), ) step.add_input_port(in_port_name, in_port) @@ -350,7 +350,7 @@ async def test_workflow(context: StreamFlowContext): port_target=None, port_type="string", cwl_element={}, - context={"hints": {}, "requirements": {}}, + context={"hints": {}, "requirements": {}, "version": "v1.2"}, ), ) exec_step.add_input_port(in_port_name, in_port) diff --git a/tests/test_cwl_loop.py b/tests/test_cwl_loop.py index dddaa338c..07af3ef1a 100644 --- a/tests/test_cwl_loop.py +++ b/tests/test_cwl_loop.py @@ -6,13 +6,10 @@ import json from collections.abc import MutableSequence, MutableMapping -import pytest from cwltool.tests.util import get_data from streamflow.cwl.runner import main -pytestmark = pytest.mark.skip(reason="CWL extensions are not supported yet.") - def test_validate_loop() -> None: """Affirm that a loop workflow validates.""" diff --git a/tests/test_provenance.py b/tests/test_provenance.py index da8d5dc68..c40c447c3 100644 --- a/tests/test_provenance.py +++ b/tests/test_provenance.py @@ -259,8 +259,10 @@ async def test_execute_step(context: StreamFlowContext): workflow=cast(CWLWorkflow, workflow), port_target=None, port_type="string", - cwl_element=cwl_utils.parser.cwl_v1_2.CommandOutputParameter(type_="string"), - context={"hints": {}, "requirements": {}}, + cwl_element=cwl_utils.parser.cwl_v1_2.CommandOutputParameter( + type_="string" + ), + context={"hints": {}, "requirements": {}, "version": "v1.2"}, ), ) token_list = [Token(token_value)] diff --git a/tests/test_translator.py b/tests/test_translator.py index b79bbacc7..1ba25399f 100644 --- a/tests/test_translator.py +++ b/tests/test_translator.py @@ -6,21 +6,17 @@ from pathlib import PurePosixPath from typing import Any -import cwltool.context import pytest +from cwltool.tests.util import get_data from streamflow.config.config import WorkflowConfig from streamflow.config.validator import SfValidator -from streamflow.cwl.token import CWLFileToken -from streamflow.cwl.workflow import CWLWorkflow - from streamflow.core import utils from streamflow.core.context import StreamFlowContext from streamflow.cwl.runner import main - -from cwltool.tests.util import get_data - +from streamflow.cwl.token import CWLFileToken from streamflow.cwl.translator import CWLTranslator +from streamflow.cwl.workflow import CWLWorkflow from streamflow.deployment.utils import get_binding_config from streamflow.workflow.executor import StreamFlowExecutor from streamflow.workflow.token import TerminationToken @@ -126,8 +122,8 @@ async def test_inject_remote_input(context: StreamFlowContext) -> None: output_directory=tempfile.gettempdir(), cwl_definition=None, # cwltool.process.Process, cwl_inputs={"model": input_data}, + cwl_inputs_path=None, workflow_config=workflow_config, - loading_context=cwltool.context.LoadingContext(), ) workflow = CWLWorkflow( context=context,