diff --git a/CHANGES.rst b/CHANGES.rst index df920a2b4..6363450d6 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -23,6 +23,10 @@ Changes: have to provide the value explicitly, or update the deployed `Process` definition afterwards with the relevant ``PUT`` request. Since ``public`` will now be used by default, the `CLI` will not automatically inject the value in the payload anymore when omitted. +- Remove attribute ``WpsProcessInterface.stage_output_id_nested`` and enforce the behavior of nesting output by ID + under corresponding directories for all remote `Process` execution when resolving `CWL` `Workflow` steps. This + ensures a more consistent file and directory resolution between steps of different nature (`CWL`, `WPS`, `OGC` based) + using multiple combinations of ``glob`` patterns and expected media-types. Fixes: ------ @@ -38,6 +42,10 @@ Fixes: Links will only be listed within the returned ``processSummary`` to respect the `OGC API - Processes` schema. - Fix `CLI` not removing embedded ``links`` in ``processSummary`` from ``deploy`` operation response when ``-nL``/``--no-links`` option is specified. +- Fix `CWL` definitions combining nested ``enum`` types as ``["null", , {type: array, items: ]`` without an + explicit ``name`` or ``SchemaDefRequirement`` causing failing ``schema_salad`` resolution under ``cwltool``. A patch + is applied for the moment to inject a temporary ``name`` to let the `CWL` engine succeed schema validation (relates + to `common-workflow-language/cwltool#1908 `_). .. _changes_4.31.0: diff --git a/setup.cfg b/setup.cfg index 4658d498a..a3e913928 100644 --- a/setup.cfg +++ b/setup.cfg @@ -86,7 +86,7 @@ skip = *.egg*,build,env,src,venv,reports,node_modules [bandit] skips = B101,B320,B410 -exclude = *.egg-info,build,dist,env,tests,./tests,test_* +exclude = *.egg-info,./build,./dist,./env,./tests,test_* targets = . [flake8] diff --git a/tests/functional/application-packages/DockerCopyNestedOutDir/deploy.yml b/tests/functional/application-packages/DockerCopyNestedOutDir/deploy.yml index 0aa59f1a9..6f3f884c7 100644 --- a/tests/functional/application-packages/DockerCopyNestedOutDir/deploy.yml +++ b/tests/functional/application-packages/DockerCopyNestedOutDir/deploy.yml @@ -10,6 +10,7 @@ processDescription: - mimeType: text/plain default: true minOccurs: 1 + maxOccurs: "unbounded" outputs: - id: output_files formats: diff --git a/tests/functional/application-packages/DockerCopyNestedOutDir/package.cwl b/tests/functional/application-packages/DockerCopyNestedOutDir/package.cwl index e38f54517..78f3cd0f1 100644 --- a/tests/functional/application-packages/DockerCopyNestedOutDir/package.cwl +++ b/tests/functional/application-packages/DockerCopyNestedOutDir/package.cwl @@ -32,6 +32,7 @@ inputs: position: 1 outputs: output_files: + # NOTE: always one, but using array to allow chaining itself any amount of times type: type: array items: File diff --git a/tests/functional/application-packages/Finch_EnsembleGridPointWetdays/deploy.yml b/tests/functional/application-packages/Finch_EnsembleGridPointWetdays/deploy.yml new file mode 100644 index 000000000..56a5a12e2 --- /dev/null +++ b/tests/functional/application-packages/Finch_EnsembleGridPointWetdays/deploy.yml @@ -0,0 +1,13 @@ +processDescription: + process: + id: Finch_EnsembleGridPointWetdays + jobControlOptions: + - async-execute + outputTransmission: + - reference +executionUnit: + # note: This does not work by itself! The test suite injects the file dynamically. + - href: "tests/functional/application-packages/Finch_EnsembleGridPointWetdays/package.cwl" + # note: alternative for WPS (applied in tests), inspired from: + # https://finch.crim.ca/wps?service=WPS&request=DescribeProcess&version=1.0.0&identifier=ensemble_grid_point_wetdays +deploymentProfileName: "http://www.opengis.net/profiles/eoc/dockerizedApplication" diff --git a/tests/functional/application-packages/Finch_EnsembleGridPointWetdays/describe.xml b/tests/functional/application-packages/Finch_EnsembleGridPointWetdays/describe.xml new file mode 100644 index 000000000..23b87868f --- /dev/null +++ b/tests/functional/application-packages/Finch_EnsembleGridPointWetdays/describe.xml @@ -0,0 +1,424 @@ + + + + + + ensemble_grid_point_wetdays + Number of wet days + The number of days with daily precipitation at or above a given threshold. + + + lat + Latitude + Latitude coordinate. Accepts a comma separated list of floats for multiple grid cells. + + string + + + + lon + Longitude + Longitude coordinate. Accepts a comma separated list of floats for multiple grid cells. + + string + + + + start_date + Initial date + Initial date for temporal subsetting. Can be expressed as year (%Y), year-month (%Y-%m) or year-month-day(%Y-%m-%d). Defaults to first day in file. + + string + + + + end_date + Final date + Final date for temporal subsetting. Can be expressed as year (%Y), year-month (%Y-%m) or year-month-day(%Y-%m-%d). Defaults to last day in file. + + string + + + + ensemble_percentiles + Ensemble percentiles + Ensemble percentiles to calculate for input climate simulations. Accepts a comma separated list of integers. An empty string will disable the ensemble reduction and the output will have all members along the 'realization' dimension, using the input filenames as coordinates. + + string + 10,50,90 + + + + average + Perform spatial average. + Whether to average over spatial dimensions or not. Averaging is done before the ensemble percentiles. + + boolean + False + + + + dataset + Dataset name + Name of the dataset from which to get netcdf files for inputs. + + string + + candcs-u5 + bccaqv2 + candcs-u6 + humidex-daily + + candcs-u5 + + + + scenario + Emission Scenario + Emission scenario (RCPs or SSPs, depending on the dataset) + + string + + ssp245 + ssp585 + rcp85 + rcp26 + rcp45 + ssp126 + + + + + models + Models to include in ensemble + When calculating the ensemble, include only these models. Allowed values depend on the dataset chosen. By default, all models are used ('all'), taking the first realization of each. Special sub-lists are also available :candcs-u5: ['24models', 'pcic12'], bccaqv2: ['24models', 'pcic12'], candcs-u6: ['26models'], humidex-daily: ['humidex_models'] + + string + + all + BNU-ESM + CCSM4 + CESM1-CAM5 + CNRM-CM5 + CSIRO-Mk3-6-0 + CanESM2 + FGOALS-g2 + GFDL-CM3 + GFDL-ESM2G + GFDL-ESM2M + HadGEM2-AO + HadGEM2-ES + IPSL-CM5A-LR + IPSL-CM5A-MR + MIROC-ESM-CHEM + MIROC-ESM + MIROC5 + MPI-ESM-LR + MPI-ESM-MR + MRI-CGCM3 + NorESM1-M + NorESM1-ME + bcc-csm1-1-m + bcc-csm1-1 + + ACCESS-CM2 + ACCESS-ESM1-5 + BCC-CSM2-MR + CMCC-ESM2 + CNRM-CM6-1 + CNRM-ESM2-1 + CanESM5 + EC-Earth3 + EC-Earth3-Veg + FGOALS-g3 + GFDL-ESM4 + HadGEM3-GC31-LL + INM-CM4-8 + INM-CM5-0 + IPSL-CM6A-LR + KACE-1-0-G + KIOST-ESM + MIROC-ES2L + MIROC6 + MPI-ESM1-2-HR + MPI-ESM1-2-LR + MRI-ESM2-0 + NorESM2-LM + NorESM2-MM + TaiESM1 + UKESM1-0-LL + + GISS-E2-1-G + EC-Earth3-Veg-LR + + 24models + pcic12 + 26models + humidex_models + + all + + + + thresh + Thresh + Precipitation value over which a day is considered wet. + + string + 1.0 mm/day + + + + freq + Frequency + Resampling frequency. + + string + + YS + MS + QS-DEC + AS-JUL + + YS + + + + op + Op + Comparison operation. Default: ">=". + + string + + gt + >= + > + ge + + >= + + + + month + Select by month + Months of the year over which to compute indicator. + + integer + + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10 + 11 + 12 + + + + + season + Select by season + Climatological season over which to compute indicator. + + string + + DJF + MAM + JJA + SON + + + + + check_missing + Missing value handling method + Method used to determine which aggregations should be considered missing. + + string + + any + wmo + pct + at_least_n + skip + from_context + + any + + + + missing_options + Missing method parameters + JSON representation of dictionary of missing method parameters. + + + + application/json + + + + + application/json + + + + + + cf_compliance + Strictness level for CF-compliance input checks. + Whether to log, warn or raise when inputs have non-CF-compliant attributes. + + string + + log + warn + raise + + warn + + + + data_validation + Strictness level for data validation input checks. + Whether to log, warn or raise when inputs fail data validation checks. + + string + + log + warn + raise + + raise + + + + output_name + Name of the output + Prefix of the output filename, defaults to the dataset name and the identifier of the process. + + string + + + + output_format + Output format choice + Choose in which format you want to receive the result. CSV actually means a zip file of two csv files. + + string + + netcdf + csv + + netcdf + + + + csv_precision + Number of decimal places to round to in the CSV output. + Only valid if output_format is CSV. If not set, all decimal places of a 64 bit floating precision number are printed. If negative, rounds before the decimal point. + + integer + + + + + + output + Result + The format depends on the 'output_format' input parameter. + + + + application/x-netcdf + base64 + + + + + application/x-netcdf + base64 + + + application/zip + base64 + + + + + + output_log + Logging information + Collected logs during process run. + + + + text/plain + + + + + text/plain + + + + + + + diff --git a/tests/functional/application-packages/Finch_EnsembleGridPointWetdays/execute.yml b/tests/functional/application-packages/Finch_EnsembleGridPointWetdays/execute.yml new file mode 100644 index 000000000..486c6ffc9 --- /dev/null +++ b/tests/functional/application-packages/Finch_EnsembleGridPointWetdays/execute.yml @@ -0,0 +1,13 @@ +lat: "45.35629610945964" +lon: "-73.98748912005094" +start_date: "1950" +end_date: "1960" +ensemble_percentiles: "" +dataset: "candcs-u6" +scenario: "ssp126" +models: "26models" +freq: "YS" +data_validation: "warn" +output_format: "csv" +csv_precision: 0 +thresh: "15 mm\/day" diff --git a/tests/functional/application-packages/Finch_EnsembleGridPointWetdays/package.cwl b/tests/functional/application-packages/Finch_EnsembleGridPointWetdays/package.cwl new file mode 100644 index 000000000..0c983d97f --- /dev/null +++ b/tests/functional/application-packages/Finch_EnsembleGridPointWetdays/package.cwl @@ -0,0 +1,312 @@ +# NOTE: +# Inspired from (but not an exact equivalent): +# https://finch.crim.ca/wps?service=WPS&request=DescribeProcess&version=1.0.0&identifier=ensemble_grid_point_wetdays +# Outputs are modified to only collect stdout, since we don't have the expected outputs produced by the real process. +# The 'baseCommand' is also added to produce this stdout output. +# All remaining arguments are identical. +cwlVersion: v1.0 +class: CommandLineTool +requirements: + InlineJavascriptRequirement: {} +# NOTE: replaced by 'baseCommand' +#hints: +# WPS1Requirement: +# provider: https://finch.crim.ca/wps +# process: ensemble_grid_point_wetdays +baseCommand: echo +inputs: +- id: lat + type: + - string + - type: array + items: string +- id: lon + type: + - string + - type: array + items: string +- id: start_date + type: + - 'null' + - string +- id: end_date + type: + - 'null' + - string +- id: ensemble_percentiles + type: + - 'null' + - string + default: 10,50,90 +- id: average + type: + - 'null' + - boolean + default: false +- id: dataset + type: + - 'null' + - type: enum + symbols: + - humidex-daily + - candcs-u5 + - candcs-u6 + - bccaqv2 + default: candcs-u5 +# WARNING: +# Following definition combining 'enum' and its corresponding nested definition in 'array' caused a +# schema-salad name resolution error. This CWL is used particularly to validate this *valid* type resolution. +# see https://github.com/common-workflow-language/cwltool/issues/1908 +- id: scenario + type: + - 'null' + - type: enum + symbols: + - ssp126 + - rcp85 + - rcp45 + - rcp26 + - ssp585 + - ssp245 + - type: array + items: + type: enum + symbols: + - ssp126 + - rcp85 + - rcp45 + - rcp26 + - ssp585 + - ssp245 +- id: models + type: + - 'null' + - type: enum + symbols: + - KACE-1-0-G + - CCSM4 + - MIROC5 + - EC-Earth3-Veg + - TaiESM1 + - GFDL-ESM4 + - GFDL-CM3 + - CanESM5 + - HadGEM3-GC31-LL + - INM-CM4-8 + - IPSL-CM5A-MR + - EC-Earth3 + - GFDL-ESM2G + - humidex_models + - GFDL-ESM2M + - MIROC-ESM + - CSIRO-Mk3-6-0 + - MPI-ESM-LR + - NorESM1-M + - CNRM-CM5 + - all + - GISS-E2-1-G + - 24models + - MPI-ESM1-2-HR + - CNRM-ESM2-1 + - CNRM-CM6-1 + - CanESM2 + - FGOALS-g3 + - NorESM1-ME + - IPSL-CM6A-LR + - CMCC-ESM2 + - pcic12 + - EC-Earth3-Veg-LR + - ACCESS-ESM1-5 + - MRI-CGCM3 + - MIROC-ESM-CHEM + - NorESM2-MM + - bcc-csm1-1-m + - BNU-ESM + - UKESM1-0-LL + - CESM1-CAM5 + - MIROC-ES2L + - MRI-ESM2-0 + - HadGEM2-ES + - MIROC6 + - MPI-ESM-MR + - INM-CM5-0 + - bcc-csm1-1 + - BCC-CSM2-MR + - ACCESS-CM2 + - NorESM2-LM + - IPSL-CM5A-LR + - FGOALS-g2 + - HadGEM2-AO + - 26models + - MPI-ESM1-2-LR + - KIOST-ESM + - type: array + items: + type: enum + symbols: + - KACE-1-0-G + - CCSM4 + - MIROC5 + - EC-Earth3-Veg + - TaiESM1 + - GFDL-ESM4 + - GFDL-CM3 + - CanESM5 + - HadGEM3-GC31-LL + - INM-CM4-8 + - IPSL-CM5A-MR + - EC-Earth3 + - GFDL-ESM2G + - humidex_models + - GFDL-ESM2M + - MIROC-ESM + - CSIRO-Mk3-6-0 + - MPI-ESM-LR + - NorESM1-M + - CNRM-CM5 + - all + - GISS-E2-1-G + - 24models + - MPI-ESM1-2-HR + - CNRM-ESM2-1 + - CNRM-CM6-1 + - CanESM2 + - FGOALS-g3 + - NorESM1-ME + - IPSL-CM6A-LR + - CMCC-ESM2 + - pcic12 + - EC-Earth3-Veg-LR + - ACCESS-ESM1-5 + - MRI-CGCM3 + - MIROC-ESM-CHEM + - NorESM2-MM + - bcc-csm1-1-m + - BNU-ESM + - UKESM1-0-LL + - CESM1-CAM5 + - MIROC-ES2L + - MRI-ESM2-0 + - HadGEM2-ES + - MIROC6 + - MPI-ESM-MR + - INM-CM5-0 + - bcc-csm1-1 + - BCC-CSM2-MR + - ACCESS-CM2 + - NorESM2-LM + - IPSL-CM5A-LR + - FGOALS-g2 + - HadGEM2-AO + - 26models + - MPI-ESM1-2-LR + - KIOST-ESM + default: all +- id: thresh + type: + - 'null' + - string + default: 1.0 mm/day +- id: freq + type: + - 'null' + - type: enum + symbols: + - YS + - QS-DEC + - AS-JUL + - MS + default: YS +- id: op + type: + - 'null' + - type: enum + symbols: + - '>=' + - '>' + - gt + - ge + default: '>=' +- id: month + type: + - 'null' + - int + - type: array + items: int + inputBinding: + valueFrom: "\n ${\n const values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];\n if (Array.isArray(self)) {\n \n if (self.every(item => values.includes(item))) {\n return self;\n }\n else {\n throw \"invalid value(s) in [\" + self + \"] are not all allowed values from [\" + values + \"]\";\n }\n \n }\n else {\n \n if (values.includes(self)) {\n return self;\n }\n else {\n throw \"invalid value \" + self + \" is not an allowed value from [\" + values + \"]\";\n }\n \n }\n }\n " +- id: season + type: + - 'null' + - type: enum + symbols: + - SON + - MAM + - JJA + - DJF +- id: check_missing + type: + - 'null' + - type: enum + symbols: + - pct + - at_least_n + - wmo + - skip + - from_context + - any + default: any +- id: missing_options + type: + - 'null' + - File + format: iana:application/json +- id: cf_compliance + type: + - 'null' + - type: enum + symbols: + - raise + - log + - warn + default: warn +- id: data_validation + type: + - 'null' + - type: enum + symbols: + - raise + - log + - warn + default: raise +- id: output_name + type: + - 'null' + - string +- id: output_format + type: + - 'null' + - type: enum + symbols: + - csv + - netcdf + default: netcdf +- id: csv_precision + type: + - 'null' + - int +# NOTE: +# Following structure is permitted in standard CWL, but not supported in Weaver. +# Must use the equivalent 'long form' in the meantime. +#outputs: +#- id: output +# type: stdout +outputs: + - id: output + type: File + outputBinding: + glob: "stdout.log" +stdout: stdout.log +$namespaces: + iana: https://www.iana.org/assignments/media-types/ + edam: http://edamontology.org/ diff --git a/tests/functional/application-packages/Finch_EnsembleGridPointWetdays/status.xml b/tests/functional/application-packages/Finch_EnsembleGridPointWetdays/status.xml new file mode 100644 index 000000000..504776f1e --- /dev/null +++ b/tests/functional/application-packages/Finch_EnsembleGridPointWetdays/status.xml @@ -0,0 +1,41 @@ + + + + {PROCESS_ID} + {PROCESS_ID} + {PROCESS_ID} + + + PyWPS Process {PROCESS_ID} finished + + + + + output + output + + + + output_log + output_log + + + + diff --git a/tests/functional/test_workflow.py b/tests/functional/test_workflow.py index 453ff5bfd..87cfc3fe4 100644 --- a/tests/functional/test_workflow.py +++ b/tests/functional/test_workflow.py @@ -434,6 +434,19 @@ def clean_test_processes_iter_after(cls, process_info): def clean_test_processes(cls, allowed_codes=frozenset([HTTPOk.code, HTTPNotFound.code])): for process_info in cls.test_processes_info.values(): cls.clean_test_processes_iter_before(process_info) + + # if any job is still pending in the database, it can cause process delete to fail (forbidden, in use) + path = f"/processes/{process_info.id}/jobs" + resp = cls.request("GET", path, + headers=cls.headers, cookies=cls.cookies, + ignore_errors=True, log_enabled=False) + cls.assert_response(resp, allowed_codes, message="Failed cleanup of test processes jobs!") + for job in resp.json.get("jobs", []): + cls.request("DELETE", f"{path}/{job}", + headers=cls.headers, cookies=cls.cookies, + ignore_errors=True, log_enabled=False) + + # then clean the actual process path = f"/processes/{process_info.id}" resp = cls.request("DELETE", path, headers=cls.headers, cookies=cls.cookies, diff --git a/tests/functional/test_wps_package.py b/tests/functional/test_wps_package.py index 589411db7..d0036eab6 100644 --- a/tests/functional/test_wps_package.py +++ b/tests/functional/test_wps_package.py @@ -37,6 +37,7 @@ mocked_file_server, mocked_http_file, mocked_reference_test_file, + mocked_remote_server_requests_wps1, mocked_sub_requests, mocked_wps_output, setup_aws_s3_bucket @@ -62,12 +63,14 @@ ) from weaver.processes.types import ProcessType from weaver.status import Status -from weaver.utils import fetch_file, get_any_value, load_file +from weaver.utils import fetch_file, get_any_value, get_path_kvp, load_file from weaver.wps.utils import get_wps_output_dir, get_wps_output_url, map_wps_output_location if TYPE_CHECKING: from typing import List + from responses import RequestsMock + from weaver.typedefs import CWL_AnyRequirements, CWL_RequirementsDict, JSON, Number EDAM_PLAIN = f"{EDAM_NAMESPACE}:{EDAM_MAPPING[ContentType.TEXT_PLAIN]}" @@ -2955,12 +2958,18 @@ def test_deploy_literal_and_complex_io_from_wps_xml_reference(self): assert "default" not in pkg["outputs"][0] assert pkg["outputs"][0]["format"] == OGC_NETCDF assert pkg["outputs"][0]["type"] == "File" - assert pkg["outputs"][0]["outputBinding"]["glob"] == "output_netcdf/*.nc" + # NOTE: + # not using "glob: /*." anymore in **generated** CWL for remote WPS + # the package definition will consider the outputs as if generated relatively + # to the URL endpoint where the process runs + # it is only during *Workflow Steps* (when each result is staged locally) that output ID dir nesting + # is applied to resolve potential conflict/over-matching of files by globs is applied for local file-system. + assert pkg["outputs"][0]["outputBinding"]["glob"] == "*.nc" # output_netcdf/*.nc assert pkg["outputs"][1]["id"] == "output_log" assert "default" not in pkg["outputs"][1] assert pkg["outputs"][1]["format"] == EDAM_PLAIN assert pkg["outputs"][1]["type"] == "File" - assert pkg["outputs"][1]["outputBinding"]["glob"] == "output_log/*.*" + assert pkg["outputs"][1]["outputBinding"]["glob"] == "*.*" # "output_log/*.*" # process description I/O validation assert len(proc["inputs"]) == 2 @@ -3121,6 +3130,128 @@ def test_deploy_enum_array_and_multi_format_inputs_from_wps_xml_reference(self): def test_deploy_multi_outputs_file_from_wps_xml_reference(self): raise NotImplementedError + def test_execute_cwl_enum_schema_combined_type_single_array_from_cwl(self): + """ + Test that validates successful reuse of :term:`CWL` ``Enum`` within a list of types. + + .. code-block:: yaml + + input: + type: + - "null" + - type: enum + symbols: [A, B, C] + - type: array + items: + type: enum + symbols: [A, B, C] + + When the above definition is applied, :mod:`cwltool` and its underlying :mod:`schema_salad` utilities often + resulted in failed schema validation due to the reused :term:`CWL` ``Enum`` being detected as "*conflicting*" + by ``name`` auto-generated when parsing the tool definition. + + .. seealso:: + :func:`test_execute_cwl_enum_schema_combined_type_single_array_from_wps` + """ + proc = "Finch_EnsembleGridPointWetdays" + body = self.retrieve_payload(proc, "deploy", local=True) + pkg = self.retrieve_payload(proc, "package", local=True) + body["executionUnit"] = [{"unit": pkg}] + body["processDescription"]["process"]["id"] = self._testMethodName + self.deploy_process(body, describe_schema=ProcessSchema.OGC) + + data = self.retrieve_payload(proc, "execute", local=True) + exec_body = { + "mode": ExecuteMode.ASYNC, + "response": ExecuteResponse.DOCUMENT, + "inputs": data, + } + with contextlib.ExitStack() as stack: + for mock_exec in mocked_execute_celery(): + stack.enter_context(mock_exec) + proc_url = f"/processes/{self._testMethodName}/jobs" + resp = mocked_sub_requests(self.app, "post_json", proc_url, timeout=5, + data=exec_body, headers=self.json_headers, only_local=True) + assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}" + + status_url = resp.json["location"] + results = self.monitor_job(status_url) + + assert results + + @mocked_remote_server_requests_wps1([ + resources.TEST_REMOTE_SERVER_URL, + resources.TEST_REMOTE_SERVER_WPS1_GETCAP_XML, + { + "Finch_EnsembleGridPointWetdays": os.path.join( + resources.FUNCTIONAL_APP_PKG, + "Finch_EnsembleGridPointWetdays/describe.xml" + ) + }, + ]) + def test_execute_cwl_enum_schema_combined_type_single_array_from_wps(self, mock_responses): + # type: (RequestsMock) -> None + """ + Test that validates successful reuse of :term:`CWL` ``Enum`` within a list of types. + + In this case, the :term:`CWL` ``Enum`` combining single-value reference and array of ``Enum`` should be + automatically generated from the corresponding :term:`WPS` I/O descriptions. + + .. seealso:: + :func:`test_execute_cwl_enum_schema_combined_type_single_array_from_cwl` + """ + proc = "Finch_EnsembleGridPointWetdays" + body = self.retrieve_payload(proc, "deploy", local=True) + wps = get_path_kvp( + resources.TEST_REMOTE_SERVER_URL, + service="WPS", + request="DescribeProcess", + identifier=proc, + version="1.0.0" + ) + body["executionUnit"] = [{"href": wps}] + body["processDescription"]["process"]["id"] = self._testMethodName + self.deploy_process(body, describe_schema=ProcessSchema.OGC) + + data = self.retrieve_payload(proc, "execute", local=True) + exec_body = { + "mode": ExecuteMode.ASYNC, + "response": ExecuteResponse.DOCUMENT, + "inputs": data, + } + status_path = os.path.join(resources.FUNCTIONAL_APP_PKG, "Finch_EnsembleGridPointWetdays/status.xml") + status_url = f"{resources.TEST_REMOTE_SERVER_URL}/status.xml" + output_log_url = f"{resources.TEST_REMOTE_SERVER_URL}/output.txt" + output_zip_url = f"{resources.TEST_REMOTE_SERVER_URL}/output.zip" + with open(status_path, mode="r", encoding="utf-8") as status_file: + status_body = status_file.read().format( + TEST_SERVER_URL=resources.TEST_REMOTE_SERVER_URL, + PROCESS_ID=proc, + LOCATION_XML=status_url, + OUTPUT_FILE_URL=output_zip_url, + OUTPUT_LOG_FILE_URL=output_log_url, + ) + + with contextlib.ExitStack() as stack: + for mock_exec in mocked_execute_celery(): + stack.enter_context(mock_exec) + + # mock responses expected by "remote" WPS-1 Execute request and relevant documents + mock_responses.add("POST", resources.TEST_REMOTE_SERVER_URL, body=status_body, headers=self.xml_headers) + mock_responses.add("GET", status_url, body=status_body, headers=self.xml_headers) + mock_responses.add("GET", output_log_url, body="log", headers={"Content-Type": ContentType.TEXT_PLAIN}) + mock_responses.add("GET", output_zip_url, body="zip", headers={"Content-Type": ContentType.APP_ZIP}) + + proc_url = f"/processes/{self._testMethodName}/jobs" + resp = mocked_sub_requests(self.app, "post_json", proc_url, timeout=5, + data=exec_body, headers=self.json_headers, only_local=True) + assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}" + + status_url = resp.json["location"] + results = self.monitor_job(status_url) + + assert results + @pytest.mark.functional class WpsPackageAppWithS3BucketTest(WpsConfigBase, ResourcesUtil): diff --git a/tests/functional/test_wps_provider.py b/tests/functional/test_wps_provider.py index 252880299..143404517 100644 --- a/tests/functional/test_wps_provider.py +++ b/tests/functional/test_wps_provider.py @@ -209,7 +209,6 @@ def test_register_describe_execute_ncdump(self, mock_responses): OUTPUT_FILE=output_url, ) - xml_headers = {"Content-Type": ContentType.TEXT_XML} ncdump_data = "Fake NetCDF Data" with contextlib.ExitStack() as stack_exec: # mock direct execution bypassing celery @@ -217,8 +216,8 @@ def test_register_describe_execute_ncdump(self, mock_responses): stack_exec.enter_context(mock_exec) # mock responses expected by "remote" WPS-1 Execute request and relevant documents mock_responses.add("GET", exec_file, body=ncdump_data, headers={"Content-Type": ContentType.APP_NETCDF}) - mock_responses.add("POST", resources.TEST_REMOTE_SERVER_URL, body=status, headers=xml_headers) - mock_responses.add("GET", status_url, body=status, headers=xml_headers) + mock_responses.add("POST", resources.TEST_REMOTE_SERVER_URL, body=status, headers=self.xml_headers) + mock_responses.add("GET", status_url, body=status, headers=self.xml_headers) mock_responses.add("GET", output_url, body=ncdump_data, headers={"Content-Type": ContentType.TEXT_PLAIN}) # add reference to specific provider execute class to validate it was called diff --git a/tests/functional/utils.py b/tests/functional/utils.py index e77657401..c1be24ed0 100644 --- a/tests/functional/utils.py +++ b/tests/functional/utils.py @@ -306,6 +306,7 @@ def assert_equal_with_jobs_diffs(self, @pytest.mark.functional class WpsConfigBase(unittest.TestCase): json_headers = {"Accept": ContentType.APP_JSON, "Content-Type": ContentType.APP_JSON} + xml_headers = {"Content-Type": ContentType.TEXT_XML} monitor_timeout = 30 monitor_interval = 1 settings = {} # type: SettingsType diff --git a/tests/processes/test_convert.py b/tests/processes/test_convert.py index 0f8f1601a..0bf40cf8b 100644 --- a/tests/processes/test_convert.py +++ b/tests/processes/test_convert.py @@ -1,8 +1,6 @@ """ Unit tests of functions within :mod:`weaver.processes.convert`. """ -import copy - # pylint: disable=R1729 # ignore non-generator representation employed for displaying test log results import json @@ -25,7 +23,15 @@ from tests import resources from tests.utils import MockedResponse, assert_equal_any_order, mocked_remote_server_requests_wps1 from weaver.exceptions import PackageTypeError -from weaver.formats import IANA_NAMESPACE_DEFINITION, OGC_MAPPING, OGC_NAMESPACE_DEFINITION, ContentType +from weaver.formats import ( + EDAM_MAPPING, + EDAM_NAMESPACE, + IANA_NAMESPACE, + IANA_NAMESPACE_DEFINITION, + OGC_MAPPING, + OGC_NAMESPACE_DEFINITION, + ContentType +) from weaver.processes.constants import ( CWL_REQUIREMENT_APP_OGC_API, CWL_REQUIREMENT_APP_WPS1, @@ -40,6 +46,7 @@ ProcessSchema ) from weaver.processes.convert import _are_different_and_set # noqa: W0212 +from weaver.processes.convert import _convert_any2cwl_io_complex # noqa: W0212 from weaver.processes.convert import _get_cwl_js_value_from # noqa: W0212 from weaver.processes.convert import ( DEFAULT_FORMAT, @@ -132,6 +139,99 @@ def test_are_different_and_set_single_null(): assert _are_different_and_set(null, item) is False +@pytest.mark.parametrize( + ["wps_io", "cwl_io_expect"], + [ + ( + { + "id": "output", + "formats": [ + {"mimeType": ContentType.APP_JSON, "encoding": None, "default": True}, + ] + }, + { + "id": "output", + "type": "File", + "format": f"{IANA_NAMESPACE}:{ContentType.APP_JSON}", + "outputBinding": { + "glob": "*.json" + } + } + ), + ( + { + "id": "output", + "formats": [ + {"mimeType": ContentType.TEXT_PLAIN, "encoding": None, "default": True}, + ] + }, + { + "id": "output", + "type": "File", + "format": f"{EDAM_NAMESPACE}:{EDAM_MAPPING[ContentType.TEXT_PLAIN]}", + "outputBinding": { + "glob": "*.*" # *.txt replaced by *.* since anything can be text/plain + } + } + ), + ( + { + "id": "output", + "formats": [ + {"mimeType": ContentType.TEXT_PLAIN, "encoding": None, "default": True}, + {"mimeType": ContentType.APP_JSON, "encoding": None, "default": True}, + ] + }, + { + "id": "output", + "type": "File", + "outputBinding": { + "glob": "*.*" # *.txt replaced by *.* since anything can be text/plain, including JSON + } + } + ), + ( + { + "id": "output", + "formats": [ + {"mimeType": ContentType.APP_XML, "encoding": "base64", "default": True}, + {"mimeType": ContentType.APP_XML, "encoding": None, "default": True}, + ] + }, + { + "id": "output", + "type": "File", + "format": f"{IANA_NAMESPACE}:{ContentType.APP_XML}", + "outputBinding": { + "glob": "*.xml" + } + } + ), + ( + { + "id": "output", + "formats": [ + {"mimeType": ContentType.APP_NETCDF, "encoding": "base64", "default": True}, + {"mimeType": ContentType.APP_ZIP, "encoding": "base64", "default": False} + ] + }, + { + "id": "output", + "type": "File", + # no "format" since more than one, CWL does not support many + "outputBinding": { + "glob": ["*.nc", "*.zip"] + } + } + ) + ] +) +def test_convert_any2cwl_io_complex(wps_io, cwl_io_expect): + cwl_io = {"id": wps_io["id"]} + _convert_any2cwl_io_complex(cwl_io, {}, wps_io, IO_OUTPUT) + assert cwl_io == cwl_io_expect + + def test_any2cwl_io_from_wps(): fmt = Format(ContentType.APP_NETCDF) wps_io = ComplexInput("test", "", supported_formats=[fmt], data_format=fmt) @@ -781,7 +881,7 @@ def test_get_cwl_io_type_unmodified(io_info, io_def): .. seealso:: - https://github.com/crim-ca/weaver/pull/546 """ - io_copy = copy.deepcopy(io_info) + io_copy = deepcopy(io_info) io_res = get_cwl_io_type(io_info) assert io_res == io_def assert io_info == io_copy, "Argument I/O information should not be modified from parsing." @@ -1829,8 +1929,12 @@ def test_ogcapi2cwl_process_without_extra(): "in-file": {"type": "File", "format": f"iana:{ContentType.APP_JSON}"}, }, "outputs": { - "output": {"type": "File", "format": "ogc:geotiff", - "outputBinding": {"glob": "output/*.tiff"}}, + "output": { + "type": "File", "format": "ogc:geotiff", + "outputBinding": { + "glob": "*.tiff" # "output/*.tiff" only during Workflow step execution + } + }, }, "$namespaces": cwl_ns } diff --git a/tests/processes/test_wps_package.py b/tests/processes/test_wps_package.py index a8ad1a9aa..a83a3a799 100644 --- a/tests/processes/test_wps_package.py +++ b/tests/processes/test_wps_package.py @@ -7,17 +7,22 @@ import contextlib import copy import io +import json import logging import os import re import shutil import sys import tempfile +import warnings from typing import TYPE_CHECKING import cwltool.process import mock import pytest +from _pytest.outcomes import Failed +from cwltool.errors import WorkflowException +from cwltool.factory import Factory as CWLFactory from tests.utils import assert_equal_any_order from weaver.datatype import Process @@ -486,3 +491,71 @@ def test_cwl_extension_requirements_no_error(): f"Error message must contain all following items: {valid_msg}. " f"Some items were missing in: \n{message}" ) + + +def test_cwl_enum_schema_name_patched(): + """ + Ensure that :term:`CWL` ``Enum`` contains a ``name`` to avoid false-positive conflicting schemas. + + When an ``Enum`` is reused multiple times to define an I/O, omitting the ``name`` makes the duplicate definition + to be considered a conflict, since :mod:`cwltool` will automatically apply an auto-generated ``name`` for that + schema. + + .. seealso:: + - https://github.com/common-workflow-language/cwltool/issues/1908 + - :meth:`weaver.processes.wps_package.WpsPackage.update_cwl_schema_names` + """ + test_symbols = [str(i) for i in range(100)] + cwl_input_without_name = { + "type": [ + "null", + { + "type": "enum", + "symbols": test_symbols, + }, + { + "type": "array", + "items": { + "type": "enum", + "symbols": test_symbols, + }, + }, + ] + } + cwl_without_name = { + "cwlVersion": "v1.2", + "class": "CommandLineTool", + "baseCommand": "echo", + "inputs": { + "test": cwl_input_without_name, + }, + "outputs": { + "output": {"type": "stdout"}, + } + } # type: CWL + + factory = CWLFactory() + with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8") as tmp_file: + json.dump(cwl_without_name, tmp_file) + tmp_file.flush() + try: + with pytest.raises(WorkflowException): + tool = factory.make(f"file://{tmp_file.name}") + tool(test=test_symbols[0]) + except Failed: + # WARNING: + # CWL tool schema-salad validator seems to inconsistently raise in some situations and not others (?) + # (see https://github.com/common-workflow-language/cwltool/issues/1908) + # Ignore if it raises since it is not breaking for our test and implementation. + warnings.warn("CWL nested enums without 'name' did not raise, but not breaking...") + + # our implementation that eventually gets called goes through 'update_cwl_schema_names', that one MUST NOT raise + pkg = WpsPackage(package=cwl_without_name, identifier="test", title="test") + pkg.update_cwl_schema_names() + with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8") as tmp_file: + json.dump(pkg.package, tmp_file) + tmp_file.flush() + tool = factory.make(f"file://{tmp_file.name}") + tool(test=None) + tool(test=test_symbols[0]) + tool(test=[test_symbols[0]]) diff --git a/tests/resources/__init__.py b/tests/resources/__init__.py index 00c6968cf..5e4357b10 100644 --- a/tests/resources/__init__.py +++ b/tests/resources/__init__.py @@ -11,6 +11,7 @@ RESOURCES_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "")) EXAMPLES_PATH = os.path.join(WEAVER_MODULE_DIR, "wps_restapi/examples") +FUNCTIONAL_APP_PKG = os.path.abspath(os.path.join(RESOURCES_PATH, "../functional/application-packages")) GET_CAPABILITIES_TEMPLATE_URL = "{}?service=WPS&request=GetCapabilities&version=1.0.0" DESCRIBE_PROCESS_TEMPLATE_URL = "{}?service=WPS&request=DescribeProcess&identifier={}&version=1.0.0" diff --git a/weaver/processes/convert.py b/weaver/processes/convert.py index 9016e0cb0..360201116 100644 --- a/weaver/processes/convert.py +++ b/weaver/processes/convert.py @@ -122,14 +122,14 @@ AnySettingsContainer, AnyValueType, CWL, - CWL_Input_Type, CWL_IO_ComplexType, CWL_IO_DataType, CWL_IO_EnumSymbols, CWL_IO_FileValue, CWL_IO_LiteralType, + CWL_IO_Type, CWL_IO_Value, - CWL_Output_Type, + CWL_SchemaNames, ExecutionInputs, ExecutionInputsList, ExecutionInputsMap, @@ -176,7 +176,6 @@ "supported_formats": NotRequired[List[JSON_Format]], }, total=False) JSON_IO_ListOrMap = Union[List[JSON], Dict[str, Union[JSON, str]]] - CWL_IO_Type = Union[CWL_Input_Type, CWL_Output_Type] PKG_IO_Type = Union[JSON_IO_Type, WPS_IO_Type] ANY_IO_Type = Union[CWL_IO_Type, JSON_IO_Type, WPS_IO_Type, OWS_IO_Type] ANY_Format_Type = Union[Dict[str, Optional[str]], Format] @@ -514,6 +513,10 @@ def _convert_any2cwl_io_complex(cwl_io, cwl_ns, wps_io, io_select): """ Converts the :term:`WPS`-like I/O definition and defines them inplace into the :term:`CWL` containers. + .. seealso:: + See :meth:`weaver.processes.wps_process_base.WpsProcessInterface.stage_results` which closely interacts + with the produced ``outputBinding.glob`` patterns generated here. Methodology should align between them. + :param cwl_io: Basic :term:`CWL` I/O container (only ID needed) where to write conversion results. :param cwl_ns: Namespaces to gradually update when encountering new format Media-Type definitions. :param wps_io: Original :term:`WPS`-like I/O to be converted. @@ -521,7 +524,7 @@ def _convert_any2cwl_io_complex(cwl_io, cwl_ns, wps_io, io_select): :return: Nothing. Changed inplace. """ cwl_io_fmt = None - cwl_io_ext = ContentType.ANY + cwl_io_ext = get_extension(ContentType.ANY) cwl_io["type"] = PACKAGE_FILE_TYPE cwl_id = cwl_io["id"] @@ -529,58 +532,78 @@ def _convert_any2cwl_io_complex(cwl_io, cwl_ns, wps_io, io_select): # outputs are allowed to define only one 'applied' format for field in WPS_FIELD_FORMAT: fmt = get_field(wps_io, field, search_variations=True) - if isinstance(fmt, dict): + if not fmt: + continue + if isinstance(fmt, (list, tuple)) and len(fmt) == 1: + fmt = fmt[0] + if not isinstance(fmt, (list, tuple)): # could be 'dict', 'Format' or any other 'object' holder cwl_io_ref, cwl_io_fmt, cwl_io_ext = _get_cwl_fmt_details(fmt) if cwl_io_ref and cwl_io_fmt: cwl_ns.update(cwl_io_ref) break if isinstance(fmt, (list, tuple)): - if len(fmt) == 1: - cwl_io_ref, cwl_io_fmt, cwl_io_ext = _get_cwl_fmt_details(fmt[0]) - if cwl_io_ref and cwl_io_fmt: - cwl_ns.update(cwl_io_ref) - break - if io_select == IO_OUTPUT and len(fmt) > 1: - break # don't use any format because we cannot enforce one cwl_ns_multi = {} - cwl_fmt_multi = [] + cwl_fmt_multi = {} # use dict as ordered set + cwl_ext_multi = {} # use dict as ordered set for fmt_i in fmt: # FIXME: (?) # when multiple formats are specified, but at least one schema/namespace reference can't be found, # we must drop all since that unknown format is still allowed but cannot be validated # avoid potential validation error if that format was the one provided during execute... # (see: https://github.com/crim-ca/weaver/issues/50) - cwl_io_ref_i, cwl_io_fmt_i, _ = _get_cwl_fmt_details(fmt_i) - if cwl_io_ref_i and cwl_io_fmt_i: + cwl_io_ref_i, cwl_io_fmt_i, cwl_io_ext = _get_cwl_fmt_details(fmt_i) + if cwl_io_ref_i and cwl_io_fmt_i: # if any known format was resolved cwl_ns_multi.update(cwl_io_ref_i) - cwl_fmt_multi.append(cwl_io_fmt_i) + cwl_fmt_multi.update({cwl_io_fmt_i: None}) + cwl_ext_multi.update({cwl_io_ext: None}) else: # reset all since at least one format could not be mapped to an official schema cwl_ns_multi = {} cwl_fmt_multi = None break cwl_io_fmt = cwl_fmt_multi # all formats or none of them + cwl_io_ext = cwl_ext_multi cwl_ns.update(cwl_ns_multi) break + + cwl_io_ext = [cwl_io_ext] if isinstance(cwl_io_ext, str) else list(cwl_io_ext) if cwl_io_fmt: - cwl_io["format"] = cwl_io_fmt - # for backward compatibility with deployed processes, consider text/plan as 'any' for glob pattern - cwl_io_txt = get_extension(ContentType.TEXT_PLAIN) - if cwl_io_ext == cwl_io_txt: - cwl_io_any = get_extension(ContentType.ANY) - LOGGER.warning("Replacing '%s' [%s] to generic '%s' [%s] glob pattern. " - "More explicit format could be considered for %s '%s'.", - ContentType.TEXT_PLAIN, cwl_io_txt, ContentType.ANY, cwl_io_any, io_select, cwl_id) - cwl_io_ext = cwl_io_any + # don't use any format if more than one because we cannot enforce multiple formats + # ('format' must be string: https://www.commonwl.org/v1.2/CommandLineTool.html#File) + if not isinstance(cwl_io_fmt, str) and len(cwl_io_fmt) == 1: + cwl_io["format"] = list(cwl_io_fmt)[0] + if isinstance(cwl_io_fmt, str): + cwl_io["format"] = cwl_io_fmt + if io_select == IO_OUTPUT: + # for backward compatibility with deployed processes, consider text/plan as 'any' for glob pattern + cwl_io_txt = get_extension(ContentType.TEXT_PLAIN) + if cwl_io_txt in cwl_io_ext: + cwl_io_any = get_extension(ContentType.ANY) + LOGGER.warning("Replacing '%s' [%s] to generic '%s' [%s] glob pattern from resolved formats %s. " + "More explicit format media-type should be considered for %s '%s'.", + ContentType.TEXT_PLAIN, cwl_io_txt, cwl_io_ext, + ContentType.ANY, cwl_io_any, io_select, cwl_id) + cwl_io_ext = [cwl_io_any] + # Method 'weaver.processes.wps_process_base.WpsProcessInterface.stage_results' uses the produced glob - # pattern below of generated output definitions from WPS items that don't offer any hint about the expected - # file naming format or specification. - # Require that the file is nested in a directory named as the output ID (to isolate against conflict by - # other outputs) and has the expected extension based on the file format/schema/media-type. - # Any character can be employed for the file name within the sub-dir as generated by the remote process. + # pattern(s) below of generated output definitions from WPS items that don't offer any hint about the + # expected file naming format or specification (because we cannot guess what will be produced as output + # from the remote process definitions alone). We can only provide expected extension based on the file + # format/schema/media-type of the output definition. + # To avoid potential naming clashes or conflicting matching from generic patterns when CWL tries to resolve + # paths, that staging operation stage outputs and adjust each glob pattern under a directory named by the + # respective output ID. + # However, it is very important **NOT** to add the output ID directory nesting approach here, otherwise it + # will confuse the staging process between Workflow steps, since it won't be able to distinguish whether the + # nesting was already applied by Weaver (here), or provided by an user-provided CWL Application Package, since + # WPS-based. OGC-based, CWL-based, (or any future implementation) can be combined within a same Workflow. + cwl_glob = [ + f"*{ext}" if ext != "/" else "./" # handle special case of "extension" for 'Directory' type + for ext in cwl_io_ext + ] cwl_io["outputBinding"] = { - "glob": f"{cwl_id}/*{cwl_io_ext}" + "glob": cwl_glob[0] if len(cwl_glob) == 1 else cwl_glob } @@ -1175,6 +1198,34 @@ def get_cwl_io_type_name(io_type): return io_type +def resolve_cwl_io_type_schema(io_info, cwl_schema_names=None): + # type: (CWL_IO_Type, Optional[CWL_SchemaNames]) -> CWL_IO_Type + """ + Reverse :term:`CWL` schema references by name back to their full :term:`CWL` I/O definition. + + .. seealso:: + - :meth:`weaver.processes.wps_package.WpsPackage.make_inputs` + - :meth:`weaver.processes.wps_package.WpsPackage.update_cwl_schema_names` + """ + if not isinstance(io_info, dict) or not cwl_schema_names: + return get_cwl_io_type_name(io_info) + io_type = io_info.get("type") + io_item = io_info.get("items") + if io_type == PACKAGE_ARRAY_BASE and isinstance(io_item, str): + io_info = io_info.copy() # avoid undoing CWL tool parsing/resolution + io_name = get_cwl_io_type_name(io_item) # avoid mapping back to File/Directory records in CWL schema names + if io_name in cwl_schema_names: + io_name = cwl_schema_names[io_item]._props + io_info["items"] = io_name + elif isinstance(io_type, str): + io_info = io_info.copy() # avoid undoing CWL tool parsing/resolution + io_name = get_cwl_io_type_name(io_type) # avoid mapping back to File/Directory records in CWL schema names + if io_name in cwl_schema_names: + io_name = cwl_schema_names[io_type]._props + io_info["type"] = io_name + return io_info + + @dataclass class CWLIODefinition(object): """ @@ -1272,8 +1323,8 @@ def __iter__(self): """ -def get_cwl_io_type(io_info, strict=True): - # type: (CWL_IO_Type, bool) -> CWLIODefinition +def get_cwl_io_type(io_info, strict=True, cwl_schema_names=None): + # type: (CWL_IO_Type, bool, Optional[CWL_SchemaNames]) -> CWLIODefinition """ Obtains the basic type of the CWL input and identity if it is optional. @@ -1290,6 +1341,7 @@ def get_cwl_io_type(io_info, strict=True): :param io_info: :term:`CWL` definition to parse. :param strict: Indicates if only pure :term:`CWL` definition is allowed, or allow implicit data-type conversions. + :param cwl_schema_names: Mapping of CWL type schema references to resolve in long form if used in a definition. :return: tuple of guessed base type and flag indicating if it can be null (optional input). """ io_type = get_cwl_io_type_name(io_info["type"]) @@ -1315,7 +1367,7 @@ def get_cwl_io_type(io_info, strict=True): io_type_many = set() io_base_type = None for i, typ in enumerate(io_type, start=int(is_null)): - typ = get_cwl_io_type_name(typ) + typ = resolve_cwl_io_type_schema(typ, cwl_schema_names) io_name = io_info["name"] sub_type = {"type": typ, "name": f"{io_name}[{i}]"} # type: CWL_IO_Type array_io_def = parse_cwl_array_type(sub_type, strict=strict) diff --git a/weaver/processes/wps1_process.py b/weaver/processes/wps1_process.py index e5eb9d93a..49562b1a0 100644 --- a/weaver/processes/wps1_process.py +++ b/weaver/processes/wps1_process.py @@ -62,7 +62,6 @@ def __init__(self, # following are defined after 'prepare' step self.wps_provider = None # type: Optional[WebProcessingService] self.wps_process = None # type: Optional[ProcessOWS] - self.stage_output_id_nested = True super(Wps1Process, self).__init__( request, lambda _message, _progress, _status, *args, **kwargs: update_status( diff --git a/weaver/processes/wps_package.py b/weaver/processes/wps_package.py index 0ba7b9d74..5f790b41d 100644 --- a/weaver/processes/wps_package.py +++ b/weaver/processes/wps_package.py @@ -143,25 +143,21 @@ from weaver.datatype import Authentication, Job from weaver.processes.constants import IO_Select_Type - from weaver.processes.convert import ( - ANY_IO_Type, - CWL_Input_Type, - JSON_IO_Type, - PKG_IO_Type, - WPS_Input_Type, - WPS_Output_Type - ) + from weaver.processes.convert import ANY_IO_Type, JSON_IO_Type, PKG_IO_Type, WPS_Input_Type, WPS_Output_Type from weaver.status import AnyStatusType from weaver.typedefs import ( AnyHeadersContainer, AnyValueType, CWL, CWL_AnyRequirements, + CWL_Input_Type, CWL_IO_ComplexType, + CWL_IO_Type, CWL_Requirement, CWL_RequirementNames, CWL_RequirementsDict, CWL_Results, + CWL_SchemaNames, CWL_ToolPathObject, CWL_WorkflowStepPackage, CWL_WorkflowStepPackageMap, @@ -1542,6 +1538,48 @@ def update_requirements(self): if self.package.get("baseCommand") == "python": self.package["baseCommand"] = os.path.join(active_python_path, "python") + def update_cwl_schema_names(self): + # type: () -> None + """ + Detect duplicate :term:`CWL` schema types not referred by name to provide one and avoid resolution failure. + + Doing this resolution avoids reused definitions being considered as "conflicts" because of missing ``name``. + To avoid introducing a real conflict, names are injected only under corresponding :term:`CWL` I/O by ID. + The most common type of definition resolve this way is when :term:`CWL` ``Enum`` is reused for single and + array-based definitions simultaneously. + + .. seealso:: + - :func:`weaver.processes.convert.resolve_cwl_io_type_schema` + - :meth:`weaver.processes.wps_package.WpsPackage.make_inputs` + + .. fixme:: + Workaround for https://github.com/common-workflow-language/cwltool/issues/1908. + """ + for io_select in ["inputs", "outputs"]: + if isinstance(self.package[io_select], dict): + io_items = self.package[io_select] # type: Dict[str, CWL_IO_Type] + else: + io_items = {item["id"]: item for item in self.package[io_select]} # type: Dict[str, CWL_IO_Type] + for io_name, io_def in io_items.items(): + if isinstance(io_def["type"], list): + item_enum = None + array_enum = None + for io_item in io_def["type"]: + if not isinstance(io_item, dict): + continue + if io_item.get("type") == "enum": + item_enum = io_item + continue + if io_item.get("type") != "array": + continue + if not isinstance(io_item.get("items", {}), dict): + continue + if io_item["items"].get("type") == "enum": + array_enum = io_item["items"] + # only apply the name reference if not already provided (eg: explicit name defined in original CWL) + if item_enum and array_enum and item_enum == array_enum and "name" not in item_enum: + item_enum["name"] = array_enum["name"] = f"{io_name}{uuid.uuid4()}" + def update_effective_user(self): # type: () -> None """ @@ -1732,6 +1770,7 @@ def _handler(self, request, response): self.update_effective_user() self.update_requirements() + self.update_cwl_schema_names() runtime_params = self.setup_runtime() self.logger.debug("Using cwltool.RuntimeContext args:\n%s", json.dumps(runtime_params, indent=2)) @@ -1767,7 +1806,8 @@ def _handler(self, request, response): eoimage_data_sources, accept_mime_types, settings=self.settings) - cwl_inputs = self.make_inputs(request.inputs, cwl_inputs_info) + cwl_schema_refs = package_inst.t.names.names + cwl_inputs = self.make_inputs(request.inputs, cwl_inputs_info, cwl_schema_refs) self.update_status("Convert package inputs done.", PACKAGE_PROGRESS_CONVERT_INPUT, Status.RUNNING) except PackageException as exc: raise self.exception_message(type(exc), None, str(exc)) # re-raise as is, but with extra log entry @@ -1838,6 +1878,7 @@ def must_fetch(self, input_ref, input_type): def make_inputs(self, wps_inputs, # type: Dict[str, Deque[WPS_Input_Type]] cwl_inputs_info, # type: Dict[str, CWL_Input_Type] + cwl_schema_names, # type: CWL_SchemaNames ): # type: (...) -> Dict[str, ValueType] """ Converts :term:`WPS` input values to corresponding :term:`CWL` input values for processing by the package. @@ -1848,6 +1889,7 @@ def make_inputs(self, :param wps_inputs: Actual :term:`WPS` inputs parsed from execution request. :param cwl_inputs_info: Expected CWL input definitions for mapping. + :param cwl_schema_names: Mapping of CWL type schema references to resolve 'type: ' if used in a definition. :return: :term:`CWL` input values. """ cwl_inputs = {} @@ -1859,7 +1901,7 @@ def make_inputs(self, # process single occurrences input_i = input_occurs[0] # handle as reference/data - io_def = get_cwl_io_type(cwl_inputs_info[input_id]) + io_def = get_cwl_io_type(cwl_inputs_info[input_id], cwl_schema_names=cwl_schema_names) if isinstance(input_i, ComplexInput) or io_def.type in PACKAGE_COMPLEX_TYPES: # extend array data that allow max_occur > 1 # drop invalid inputs returned as None diff --git a/weaver/processes/wps_process_base.py b/weaver/processes/wps_process_base.py index 92f5b4bf3..1b7e3dab7 100644 --- a/weaver/processes/wps_process_base.py +++ b/weaver/processes/wps_process_base.py @@ -96,7 +96,6 @@ def __init__(self, request, update_status): self.settings = get_settings() self.update_status = update_status # type: UpdateStatusPartialFunction self.temp_staging = set() - self.stage_output_id_nested = False def execute(self, workflow_inputs, out_dir, expected_outputs): # type: (CWL_RuntimeInputsMap, str, CWL_ExpectedOutputs) -> None @@ -329,18 +328,10 @@ def stage_results(self, results, expected_outputs, out_dir): We cannot rely on specific file names to be mapped, since glob can match many (eg: ``"*.txt"``). .. seealso:: - Function :func:`weaver.processes.convert.any2cwl_io` defines a generic glob pattern using the output ID - and expected file extension based on Content-Type format. Since the remote :term:`WPS` :term:`Process` - doesn't necessarily produces file names with the output ID as expected to find them (could be anything), + Function :func:`weaver.processes.convert._convert_any2cwl_io_complex` defines a generic glob pattern from + the expected file extension based on Content-Type format. Since the remote :term:`WPS` :term:`Process` + doesn't necessarily produce file names with the output ID as expected to find them (could be anything), staging must patch locations to let :term:`CWL` runtime resolve the files according to glob definitions. - - .. warning:: - Only remote :term:`Provider` implementations (which auto-generate a pseudo :term:`CWL` to map components) - that produce outputs with inconsistent file names as described above should set attribute - :attr:`WpsProcessInterface.stage_output_id_nested` accordingly. For :term:`Process` that directly provide - an actual :term:`CWL` :term:`Application Package` definition (e.g.: Docker application), auto-mapping - of glob patterns should be avoided, as it is expected that the :term:`CWL` contains real mapping to be - respected for correct execution and retrieval of outputs from the application. """ for result in results: res_id = get_any_id(result) @@ -351,10 +342,7 @@ def stage_results(self, results, expected_outputs, out_dir): result_values = get_any_value(result) if not isinstance(result_values, list): result_values = [result_values] - if self.stage_output_id_nested: - cwl_out_dir = "/".join([out_dir.rstrip("/"), res_id]) - else: - cwl_out_dir = out_dir.rstrip("/") + cwl_out_dir = "/".join([out_dir.rstrip("/"), res_id]) os.makedirs(cwl_out_dir, mode=0o700, exist_ok=True) for value in result_values: src_name = value.split("/")[-1] diff --git a/weaver/processes/wps_workflow.py b/weaver/processes/wps_workflow.py index efd21b299..6478288e5 100644 --- a/weaver/processes/wps_workflow.py +++ b/weaver/processes/wps_workflow.py @@ -186,12 +186,32 @@ def collect_output( To let :term:`CWL` :term:`Workflow` inter-steps mapping work as intended, we must remap the locations ignoring any nested dirs where the modified *outputBindings* definition will be able to match as if each step :term:`Process` outputs were generated locally. + + .. note:: + Because the staging operation following remote :term:`Process` execution nests each output under a directory + name matching respective output IDs, globs must be update with that modified nested directory as well. + + .. seealso:: + :meth:`weaver.processes.wps_process_base.WpsProcessInterface.stage_results` """ if "outputBinding" in schema and "glob" in schema["outputBinding"]: - # in case of Directory collection with '/', use '.' because cwltool replaces it by the outdir glob = schema["outputBinding"]["glob"] - glob = os.path.split(glob)[-1] or "." - schema["outputBinding"]["glob"] = glob + glob_list = isinstance(glob, list) + glob = glob if isinstance(glob, list) else [glob] + out_id = schema["id"].rsplit("#", 1)[-1] + glob_spec = [] + for glob_item in glob: + if glob_item.startswith(outdir): + # CWL allows outputBinding to have relative or absolute starting with outdir. + # Anything else should be forbidden by the validator. + # (see ``glob`` under https://www.commonwl.org/v1.2/CommandLineTool.html#CommandOutputBinding) + # glob = outdir -> '.', which is identical to what CWL '//.' expects for a dir entry + glob_item = os.path.relpath(glob_item, outdir) + # if the glob had additional directory nesting, we must remove them, because the staging result + # operation would have brought output file/dir back under the respective dir named by output ID + glob_item = os.path.split(glob_item)[-1] or "." + glob_spec.append(os.path.join(out_id, glob_item)) + schema["outputBinding"]["glob"] = glob_spec if glob_list else glob_spec[0] output = super(WpsWorkflow, self).collect_output( schema, builder, @@ -199,7 +219,7 @@ def collect_output( fs_access, compute_checksum=compute_checksum, ) - return output or {} + return output class WpsWorkflowJob(CommandLineJob): @@ -234,7 +254,7 @@ def __init__(self, for glob in glob_spec if glob_list else [glob_spec]: # in case of Directory collection with '/', use '.' because cwltool replaces it by the outdir out_glob = glob.split("/")[-1] or "." - out_glob = f"{output_id}/{out_glob}" if self.wps_process.stage_output_id_nested else out_glob + out_glob = f"{output_id}/{out_glob}" out_globs.add(out_glob) self.expected_outputs[output_id] = out_globs if glob_list else list(out_globs)[0] diff --git a/weaver/typedefs.py b/weaver/typedefs.py index e4ed88825..b1ad7d987 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -167,6 +167,13 @@ }, total=False) CWL_Inputs = Union[List[CWL_Input_Type], Dict[str, CWL_Input_Type]] CWL_Outputs = Union[List[CWL_Output_Type], Dict[str, CWL_Output_Type]] + CWL_IO_Type = Union[CWL_Input_Type, CWL_Output_Type] + + class CWL_SchemaName(Protocol): + name: str + _props: CWL_IO_Type + + CWL_SchemaNames = Dict[str, CWL_SchemaName] # 'requirements' includes 'hints' CWL_Requirement = TypedDict("CWL_Requirement", { diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index fb4b4adca..57acf52f2 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -4619,16 +4619,28 @@ class CWLTypeSymbols(ExtendedSequenceSchema): symbol = CWLTypeSymbolValues() -class CWLTypeArray(ExtendedMappingSchema): - type = ExtendedSchemaNode(String(), example=PACKAGE_ARRAY_BASE, validator=OneOf([PACKAGE_ARRAY_BASE])) - items = CWLTypeString(title="CWLTypeArrayItems", validator=OneOf(PACKAGE_ARRAY_ITEMS)) - - class CWLTypeEnum(ExtendedMappingSchema): type = ExtendedSchemaNode(String(), example=PACKAGE_ENUM_BASE, validator=OneOf(PACKAGE_CUSTOM_TYPES)) symbols = CWLTypeSymbols(summary="Allowed values composing the enum.") +class CWLTypeArrayItemObject(ExtendedMappingSchema): + type = CWLTypeString(validator=OneOf(PACKAGE_ARRAY_ITEMS - PACKAGE_CUSTOM_TYPES)) + + +class CWLTypeArrayItems(OneOfKeywordSchema): + _one_of = [ + CWLTypeString(title="CWLTypeArrayItemsString", validator=OneOf(PACKAGE_ARRAY_ITEMS)), + CWLTypeEnum(summary="CWL type as enum of values."), + CWLTypeArrayItemObject(summary="CWL type in nested object definition."), + ] + + +class CWLTypeArray(ExtendedMappingSchema): + type = ExtendedSchemaNode(String(), example=PACKAGE_ARRAY_BASE, validator=OneOf([PACKAGE_ARRAY_BASE])) + items = CWLTypeArrayItems() + + class CWLTypeBase(OneOfKeywordSchema): _one_of = [ CWLTypeString(summary="CWL type as literal value."),