From 52720e267d53d384498e9ddeb3ee61b4c5191bcf Mon Sep 17 00:00:00 2001 From: Francis Charette-Migneault Date: Thu, 18 Aug 2022 23:52:57 -0400 Subject: [PATCH 1/8] =?UTF-8?q?[WIP]=C2=A0workflows=20ScatterFeatureRequir?= =?UTF-8?q?ement=20(relates=20to=20#105)=20+=20func=20workflow=20tests=20(?= =?UTF-8?q?relates=20to=20#11)=20+=20OGC=20Media-Types=20format=20for=20CW?= =?UTF-8?q?L?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGES.rst | 7 ++- .../DockerNetCDF2Text/package.cwl | 3 ++ tests/functional/test_workflow.py | 47 +++++++++++++++++-- tests/wps_restapi/test_processes.py | 34 +++++++++++++- weaver/formats.py | 24 +++++++--- weaver/processes/builtin/jsonarray2netcdf.cwl | 4 +- weaver/processes/builtin/jsonarray2netcdf.py | 2 +- weaver/processes/constants.py | 2 +- 8 files changed, 105 insertions(+), 18 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 0f3d31251..a10e23008 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -12,11 +12,14 @@ Changes Changes: -------- -- No change. +- Support `CWL` definition for ``ScatterFeatureRequirement`` + (resolves `#105 `_). +- Add `OGC` Media-Type ontology for ``File`` format references within `CWL` definition. +- Adjust ``builtin`` process ``jsonarray2netcdf`` (version ``2.0``) to employ `OGC` Media-Type for NetCDF. Fixes: ------ -- No change. +- Fix implementation of various functional test cases for `Workflow` execution. .. _changes_4.23.0: diff --git a/tests/functional/application-packages/DockerNetCDF2Text/package.cwl b/tests/functional/application-packages/DockerNetCDF2Text/package.cwl index 6781fd9df..6c48aeb81 100644 --- a/tests/functional/application-packages/DockerNetCDF2Text/package.cwl +++ b/tests/functional/application-packages/DockerNetCDF2Text/package.cwl @@ -22,8 +22,11 @@ inputs: type: File inputBinding: position: 1 + format: ogc:netcdf outputs: output_txt: type: File outputBinding: glob: "*.txt" +$namespaces: + ogc: "http://www.opengis.net/def/media-type/ogc/1.0/" diff --git a/tests/functional/test_workflow.py b/tests/functional/test_workflow.py index e61ce93f6..11dff2e06 100644 --- a/tests/functional/test_workflow.py +++ b/tests/functional/test_workflow.py @@ -970,9 +970,9 @@ def mock_tmp_input(requests_mock): "workflow of different process types.") # FIXME: implement + re-enable 'CWL_REQUIREMENT_SCATTER' - @pytest.mark.xfail( - reason="ScatterFeatureRequirement not yet supported (https://github.com/crim-ca/weaver/issues/105)" - ) + #@pytest.mark.xfail( + # reason="ScatterFeatureRequirement not yet supported (https://github.com/crim-ca/weaver/issues/105)" + #) def test_workflow_mixed_rest_builtin_wps1_docker_scatter_requirements(self): """ Test the use of multiple applications of different :term:`Process` type in a :term:`Workflow`. @@ -981,6 +981,44 @@ def test_workflow_mixed_rest_builtin_wps1_docker_scatter_requirements(self): 1. Convert JSON array of NetCDF references to corresponding NetCDF files (process registered with ``WPS1Requirement`` using WPS-1 interface of builtin ``jsonarray2netcdf``). 2. Convert NetCDF file to raw text data dumps (using scattered applications per-file). + + .. seealso:: + Inverse :term:`WPS-1` / :term:`OGC API - Processes` process references from + :meth:`test_workflow_mixed_wps1_builtin_rest_docker_scatter_requirements`. + """ + + with contextlib.ExitStack() as stack: + tmp_host = "https://mocked-file-server.com" # must match in 'Execute_WorkflowScatterCopyNestedOutDir.json' + tmp_dir = stack.enter_context(tempfile.TemporaryDirectory()) + nc_refs = [] + for i in range(3): + nc_name = f"test-file-{i}.nc" + nc_refs.append(os.path.join(tmp_host, nc_name)) + with open(os.path.join(tmp_dir, nc_name), mode="w", encoding="utf-8") as tmp_file: + tmp_file.write(f"DUMMY NETCDF DATA #{i}") + with open(os.path.join(tmp_dir, "netcdf-array.json"), mode="w", encoding="utf-8") as tmp_file: + json.dump(nc_refs, tmp_file) # must match execution body + + def mock_tmp_input(requests_mock): + mocked_file_server(tmp_dir, tmp_host, self.settings, requests_mock=requests_mock) + + self.workflow_runner(WorkflowProcesses.WORKFLOW_WPS1_SCATTER_COPY_NETCDF, + [WorkflowProcesses.APP_DOCKER_NETCDF_2_TEXT, # required for reference by WPS below + WorkflowProcesses.APP_WPS1_DOCKER_NETCDF_2_TEXT], + log_full_trace=True, requests_mock_callback=mock_tmp_input) + + def test_workflow_mixed_wps1_builtin_rest_docker_scatter_requirements(self): + """ + Test the use of multiple applications of different :term:`Process` type in a :term:`Workflow`. + + Steps: + 1. Convert JSON array of NetCDF references to corresponding NetCDF files + (process registered with ``WPS1Requirement`` using WPS-1 interface of builtin ``jsonarray2netcdf``). + 2. Convert NetCDF file to raw text data dumps (using scattered applications per-file). + + .. seealso:: + Inverse :term:`WPS-1` / :term:`OGC API - Processes` process references from + :meth:`test_workflow_mixed_rest_builtin_wps1_docker_scatter_requirements`. """ with contextlib.ExitStack() as stack: @@ -999,7 +1037,8 @@ def mock_tmp_input(requests_mock): mocked_file_server(tmp_dir, tmp_host, self.settings, requests_mock=requests_mock) self.workflow_runner(WorkflowProcesses.WORKFLOW_REST_SCATTER_COPY_NETCDF, - [WorkflowProcesses.APP_WPS1_DOCKER_NETCDF_2_TEXT], + [WorkflowProcesses.APP_WPS1_JSON_ARRAY_2_NETCDF, # no need to register its builtin ref + WorkflowProcesses.APP_DOCKER_NETCDF_2_TEXT], log_full_trace=True, requests_mock_callback=mock_tmp_input) def test_workflow_docker_applications(self): diff --git a/tests/wps_restapi/test_processes.py b/tests/wps_restapi/test_processes.py index d703ab15b..d8aa85676 100644 --- a/tests/wps_restapi/test_processes.py +++ b/tests/wps_restapi/test_processes.py @@ -1260,9 +1260,39 @@ def test_deploy_process_WPS1_GetCapabilities_executionUnit(self): self.deploy_process_make_visible_and_fetch_deployed(body, resources.TEST_REMOTE_SERVER_WPS1_PROCESS_ID) # FIXME: implement - @pytest.mark.skip(reason="not implemented") + @pytest.mark.skip(reason="not implemented - experimental") def test_deploy_process_WPS3_DescribeProcess_href(self): - raise NotImplementedError + path = f"{self.url}/processes/jsonarray2netcdf" # use builtin, re-deploy as "remote process" + p_id = "new-test-wps3" + body = { + "processDescription": {"process": {"id": p_id}}, + "executionUnit": [{"href": path}], + } + desc = self.deploy_process_make_visible_and_fetch_deployed(body, p_id, assert_io=False) + assert desc["deploymentProfile"] == "http://www.opengis.net/profiles/eoc/ogcapiApplication" + + # process description should have been generated with relevant I/O + proc = desc["process"] + assert proc["id"] == p_id + assert proc["inputs"] == [] + assert proc["outputs"] == [{ + "id": "output", + "title": "output", + "schema": {"type": "string", "contentMediaType": "text/plain"}, + "formats": [{"default": True, "mediaType": "text/plain"}] + }] + + # package should have been generated with corresponding I/O from "remote process" + ref = self.get_application_package("jsonarray2netcdf") + pkg = self.get_application_package(p_id) + # add the missing remote reference to the local definition to compare them + ref["hints"] = { # expected to be defined in + "OGCAPIRequirement": { # FIXME: implement, aka 'Wps3Process' dispatched step + "process": "jsonarray2netcdf", + "provider": self.url + } + } + assert pkg == ref # FIXME: implement @pytest.mark.skip(reason="not implemented") diff --git a/weaver/formats.py b/weaver/formats.py index a3cf8ca77..3e314a21c 100644 --- a/weaver/formats.py +++ b/weaver/formats.py @@ -64,6 +64,7 @@ class ContentType(Constants): APP_YAML = "application/x-yaml" APP_ZIP = "application/zip" IMAGE_GEOTIFF = "image/tiff; subtype=geotiff" + IMAGE_OGC_GEOTIFF = "mage/tiff; application=geotiff" IMAGE_JPEG = "image/jpeg" IMAGE_GIF = "image/gif" IMAGE_PNG = "image/png" @@ -348,23 +349,32 @@ class SchemaRole(Constants): ContentType.IMAGE_JPEG: "format_3579", ContentType.APP_HDF5: "format_3590", ContentType.APP_JSON: "format_3464", - ContentType.APP_NETCDF: "format_3650", ContentType.APP_YAML: "format_3750", ContentType.TEXT_PLAIN: "format_1964", } # Official links to be employed in definitions must be formed as: -# http://www.opengis.net/def/glossary/... +# http://www.opengis.net/def/... # But they should be redirected to full definitions as: -# https://defs.opengis.net/vocprez/object?uri=http://www.opengis.net/def/glossary/... +# https://defs.opengis.net/vocprez/object?uri=http://www.opengis.net/def/... +# See common locations: +# https://www.opengis.net/def/media-type OPENGIS_NAMESPACE = "opengis" OPENGIS_NAMESPACE_URL = "http://www.opengis.net/" OPENGIS_NAMESPACE_DEFINITION = {OPENGIS_NAMESPACE: OPENGIS_NAMESPACE_URL} -OPENGIS_MAPPING = { - ContentType.IMAGE_GEOTIFF: "def/glossary/term/Geotiff" +OPENGIS_MAPPING = {} +# shorthand notation directly scoped under OGC Media-Types to allow: 'ogc:' +OGC_NAMESPACE = "ogc" +OGC_NAMESPACE_URL = f"{OPENGIS_NAMESPACE_URL}def/media-type/ogc/1.0/" +OGC_NAMESPACE_DEFINITION = {OGC_NAMESPACE: OGC_NAMESPACE_URL} +OGC_MAPPING = { + ContentType.IMAGE_GEOTIFF: "geotiff", + ContentType.IMAGE_OGC_GEOTIFF: "geotiff", + ContentType.APP_NETCDF: "netcdf", } FORMAT_NAMESPACE_DEFINITIONS = { **IANA_NAMESPACE_DEFINITION, **EDAM_NAMESPACE_DEFINITION, + **OGC_NAMESPACE_DEFINITION, **OPENGIS_NAMESPACE_DEFINITION } FORMAT_NAMESPACES = frozenset(FORMAT_NAMESPACE_DEFINITIONS) @@ -537,6 +547,8 @@ def _search_explicit_mappings(_mime_type): return _make_if_ref(IANA_NAMESPACE_DEFINITION, IANA_NAMESPACE, IANA_MAPPING[_mime_type]) if _mime_type in EDAM_MAPPING: # prefer real reference if available return _make_if_ref(EDAM_NAMESPACE_DEFINITION, EDAM_NAMESPACE, EDAM_MAPPING[_mime_type]) + if _mime_type in OGC_MAPPING: # prefer real reference if available + return _make_if_ref(OGC_NAMESPACE_DEFINITION, OGC_NAMESPACE, OGC_MAPPING[_mime_type]) if _mime_type in OPENGIS_MAPPING: # prefer real reference if available return _make_if_ref(OPENGIS_NAMESPACE_DEFINITION, OPENGIS_NAMESPACE, OPENGIS_MAPPING[_mime_type]) return None @@ -651,7 +663,7 @@ def clean_mime_type_format(mime_type, suffix_subtype=False, strip_parameters=Fal mime_type = mime_type.replace(v + ":", "") break search = True - for _map in [EDAM_MAPPING, OPENGIS_MAPPING]: + for _map in [EDAM_MAPPING, OGC_MAPPING, OPENGIS_MAPPING]: if not search: break for v in _map.values(): diff --git a/weaver/processes/builtin/jsonarray2netcdf.cwl b/weaver/processes/builtin/jsonarray2netcdf.cwl index a4ad3308f..fb334df23 100644 --- a/weaver/processes/builtin/jsonarray2netcdf.cwl +++ b/weaver/processes/builtin/jsonarray2netcdf.cwl @@ -16,7 +16,7 @@ inputs: prefix: "-i" outputs: output: - format: edam:format_3650 + format: ogc:netcdf type: type: array items: File @@ -24,4 +24,4 @@ outputs: glob: "*.nc" $namespaces: iana: "https://www.iana.org/assignments/media-types/" - edam: "http://edamontology.org/" + ogc: "http://www.opengis.net/def/media-type/ogc/1.0/" diff --git a/weaver/processes/builtin/jsonarray2netcdf.py b/weaver/processes/builtin/jsonarray2netcdf.py index b46fffdfc..bd1c15c18 100644 --- a/weaver/processes/builtin/jsonarray2netcdf.py +++ b/weaver/processes/builtin/jsonarray2netcdf.py @@ -28,7 +28,7 @@ LOGGER.setLevel(logging.INFO) # process details -__version__ = "1.3" +__version__ = "2.0" __title__ = "JSON array to NetCDF" __abstract__ = __doc__ # NOTE: '__doc__' is fetched directly, this is mostly to be informative diff --git a/weaver/processes/constants.py b/weaver/processes/constants.py index b89a373a1..e7cbe9b25 100644 --- a/weaver/processes/constants.py +++ b/weaver/processes/constants.py @@ -81,7 +81,7 @@ class OpenSearchField(Constants): CWL_REQUIREMENT_ENV_VAR, CWL_REQUIREMENT_INIT_WORKDIR, CWL_REQUIREMENT_RESOURCE, # FIXME: perform pre-check on job submit? (https://github.com/crim-ca/weaver/issues/138) - # CWL_REQUIREMENT_SCATTER, # FIXME: see workflow test + fix https://github.com/crim-ca/weaver/issues/105 + CWL_REQUIREMENT_SCATTER, ]) """ Set of :term:`CWL` requirements that corresponds to extra functionalities not completely defining From de18c13020d626c43fe8117804cb8c5586bf17c5 Mon Sep 17 00:00:00 2001 From: Francis Charette-Migneault Date: Mon, 22 Aug 2022 20:53:17 -0400 Subject: [PATCH 2/8] fix test EDAM-NetCDF to OGC-NetCDF --- CHANGES.rst | 2 ++ tests/functional/test_wps_package.py | 8 +++-- tests/processes/test_convert.py | 38 ++++++++++----------- tests/test_formats.py | 51 +++++++++++++++++++++------- 4 files changed, 65 insertions(+), 34 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index a10e23008..9f5593c32 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -15,6 +15,8 @@ Changes: - Support `CWL` definition for ``ScatterFeatureRequirement`` (resolves `#105 `_). - Add `OGC` Media-Type ontology for ``File`` format references within `CWL` definition. +- Replace `EDAM` NetCDF format reference by `OGC` NetCDF Media-Type with expected ontology definitions by processes. + For backward compatibility, corresponding `EDAM` references will be converted to `OGC` Media-Type whenever possible. - Adjust ``builtin`` process ``jsonarray2netcdf`` (version ``2.0``) to employ `OGC` Media-Type for NetCDF. Fixes: diff --git a/tests/functional/test_wps_package.py b/tests/functional/test_wps_package.py index 271d57e54..3253910b4 100644 --- a/tests/functional/test_wps_package.py +++ b/tests/functional/test_wps_package.py @@ -41,6 +41,8 @@ EDAM_MAPPING, EDAM_NAMESPACE, IANA_NAMESPACE, + OGC_MAPPING, + OGC_NAMESPACE, AcceptLanguage, ContentType, get_cwl_file_format @@ -57,7 +59,7 @@ from weaver.typedefs import JSON EDAM_PLAIN = EDAM_NAMESPACE + ":" + EDAM_MAPPING[ContentType.TEXT_PLAIN] -EDAM_NETCDF = EDAM_NAMESPACE + ":" + EDAM_MAPPING[ContentType.APP_NETCDF] +OGC_NETCDF = OGC_NAMESPACE + ":" + OGC_MAPPING[ContentType.APP_NETCDF] # note: x-tar cannot be mapped during CWL format resolution (not official schema), # it remains explicit tar definition in WPS context IANA_TAR = IANA_NAMESPACE + ":" + ContentType.APP_TAR # noqa # pylint: disable=unused-variable @@ -2497,7 +2499,7 @@ def test_deploy_literal_and_complex_io_from_wps_xml_reference(self): assert isinstance(pkg["inputs"], list) assert pkg["inputs"][0]["id"] == "tasmax" assert "default" not in pkg["inputs"][0] - assert pkg["inputs"][0]["format"] == EDAM_NETCDF + assert pkg["inputs"][0]["format"] == OGC_NETCDF assert isinstance(pkg["inputs"][0]["type"], list), "since minOccurs=1, single value non-array must be allowed" assert len(pkg["inputs"][0]["type"]) == 2, "single type and array type of same base" assert pkg["inputs"][0]["type"][0] == "File", "since minOccurs=1, should be type directly" @@ -2515,7 +2517,7 @@ def test_deploy_literal_and_complex_io_from_wps_xml_reference(self): assert isinstance(pkg["outputs"], list) assert pkg["outputs"][0]["id"] == "output_netcdf" assert "default" not in pkg["outputs"][0] - assert pkg["outputs"][0]["format"] == EDAM_NETCDF + assert pkg["outputs"][0]["format"] == OGC_NETCDF assert pkg["outputs"][0]["type"] == "File" assert pkg["outputs"][0]["outputBinding"]["glob"] == "output_netcdf/*.nc" assert pkg["outputs"][1]["id"] == "output_log" diff --git a/tests/processes/test_convert.py b/tests/processes/test_convert.py index 39ef502db..d1004042a 100644 --- a/tests/processes/test_convert.py +++ b/tests/processes/test_convert.py @@ -15,7 +15,7 @@ from pywps.validator.mode import MODE from weaver.exceptions import PackageTypeError -from weaver.formats import EDAM_MAPPING, EDAM_NAMESPACE_DEFINITION, ContentType +from weaver.formats import OGC_MAPPING, OGC_NAMESPACE_DEFINITION, ContentType from weaver.processes.constants import WPS_INPUT, WPS_LITERAL, WPS_OUTPUT, ProcessSchema from weaver.processes.convert import _are_different_and_set # noqa: W0212 from weaver.processes.convert import ( @@ -102,9 +102,9 @@ def test_any2cwl_io_from_wps(): assert cwl_io == { "id": "test", "type": "File", - "format": f"edam:{EDAM_MAPPING[ContentType.APP_NETCDF]}" + "format": f"ogc:{OGC_MAPPING[ContentType.APP_NETCDF]}" } - assert cwl_ns == EDAM_NAMESPACE_DEFINITION + assert cwl_ns == OGC_NAMESPACE_DEFINITION # retry by manually injecting the type to validate that # pre-resolved type can also be converted directly from object @@ -115,10 +115,10 @@ def test_any2cwl_io_from_wps(): assert cwl_io == { "id": "test", "type": "File", - "format": f"edam:{EDAM_MAPPING[ContentType.APP_NETCDF]}", + "format": f"ogc:{OGC_MAPPING[ContentType.APP_NETCDF]}", "default": None, } - assert cwl_ns == EDAM_NAMESPACE_DEFINITION + assert cwl_ns == OGC_NAMESPACE_DEFINITION wps_io.min_occurs = 10 wps_io.max_occurs = 20 @@ -127,10 +127,10 @@ def test_any2cwl_io_from_wps(): assert cwl_io == { "id": "test", "type": {"type": "array", "items": "File"}, - "format": f"edam:{EDAM_MAPPING[ContentType.APP_NETCDF]}", + "format": f"ogc:{OGC_MAPPING[ContentType.APP_NETCDF]}", "default": None, } - assert cwl_ns == EDAM_NAMESPACE_DEFINITION + assert cwl_ns == OGC_NAMESPACE_DEFINITION class MockElementXML(dict): @@ -161,10 +161,10 @@ def test_any2cwl_io_from_ows(): assert cwl_io == { "id": "test", "type": "File", - "format": f"edam:{EDAM_MAPPING[ContentType.APP_NETCDF]}", + "format": f"ogc:{OGC_MAPPING[ContentType.APP_NETCDF]}", "default": None, } - assert cwl_ns == EDAM_NAMESPACE_DEFINITION + assert cwl_ns == OGC_NAMESPACE_DEFINITION ows_io = OWSInput(MockElementXML({})) # skip parsing from XML, inject corresponding results directly ows_io.identifier = "test" @@ -177,10 +177,10 @@ def test_any2cwl_io_from_ows(): assert cwl_io == { "id": "test", "type": {"type": "array", "items": "File"}, - "format": f"edam:{EDAM_MAPPING[ContentType.APP_NETCDF]}", + "format": f"ogc:{OGC_MAPPING[ContentType.APP_NETCDF]}", "default": None, } - assert cwl_ns == EDAM_NAMESPACE_DEFINITION + assert cwl_ns == OGC_NAMESPACE_DEFINITION def test_any2cwl_io_from_json(): @@ -196,9 +196,9 @@ def test_any2cwl_io_from_json(): assert cwl_io == { "id": "test", "type": "File", - "format": f"edam:{EDAM_MAPPING[ContentType.APP_NETCDF]}" + "format": f"ogc:{OGC_MAPPING[ContentType.APP_NETCDF]}" } - assert cwl_ns == EDAM_NAMESPACE_DEFINITION + assert cwl_ns == OGC_NAMESPACE_DEFINITION json_io["minOccurs"] = 10 json_io["maxOccurs"] = 20 @@ -207,9 +207,9 @@ def test_any2cwl_io_from_json(): assert cwl_io == { "id": "test", "type": {"type": "array", "items": "File"}, - "format": f"edam:{EDAM_MAPPING[ContentType.APP_NETCDF]}", + "format": f"ogc:{OGC_MAPPING[ContentType.APP_NETCDF]}", } - assert cwl_ns == EDAM_NAMESPACE_DEFINITION + assert cwl_ns == OGC_NAMESPACE_DEFINITION def test_any2cwl_io_from_oas(): @@ -225,9 +225,9 @@ def test_any2cwl_io_from_oas(): assert cwl_io == { "id": "test", "type": "File", - "format": f"edam:{EDAM_MAPPING[ContentType.APP_NETCDF]}" + "format": f"ogc:{OGC_MAPPING[ContentType.APP_NETCDF]}" } - assert cwl_ns == EDAM_NAMESPACE_DEFINITION + assert cwl_ns == OGC_NAMESPACE_DEFINITION json_io["minOccurs"] = 10 json_io["maxOccurs"] = 20 @@ -236,9 +236,9 @@ def test_any2cwl_io_from_oas(): assert cwl_io == { "id": "test", "type": {"type": "array", "items": "File"}, - "format": f"edam:{EDAM_MAPPING[ContentType.APP_NETCDF]}", + "format": f"ogc:{OGC_MAPPING[ContentType.APP_NETCDF]}", } - assert cwl_ns == EDAM_NAMESPACE_DEFINITION + assert cwl_ns == OGC_NAMESPACE_DEFINITION def test_json2wps_datatype(): diff --git a/tests/test_formats.py b/tests/test_formats.py index 9fc3aa198..eb24d2b11 100644 --- a/tests/test_formats.py +++ b/tests/test_formats.py @@ -1,6 +1,7 @@ import os import mock +import pytest from pyramid.httpexceptions import HTTPOk, HTTPRequestTimeout from pyramid.response import Response from pywps.inout.formats import Format @@ -69,8 +70,13 @@ def test_get_format_default_no_extension(): def test_get_cwl_file_format_tuple(): - tested = set(f.FORMAT_NAMESPACES) - for mime_type in [f.ContentType.APP_JSON, f.ContentType.APP_NETCDF, f.ContentType.IMAGE_GEOTIFF]: + untested = set(f.FORMAT_NAMESPACES) + tests = [ + f.ContentType.APP_JSON, + f.ContentType.APP_NETCDF, + f.ContentType.APP_HDF5, + ] + for mime_type in tests: res = f.get_cwl_file_format(mime_type, make_reference=False) assert isinstance(res, tuple) and len(res) == 2 ns, fmt = res @@ -79,24 +85,34 @@ def test_get_cwl_file_format_tuple(): assert list(ns.values())[0].startswith("http") ns_name = list(ns.keys())[0] assert fmt.startswith(f"{ns_name}:") - tested.remove(ns_name) - assert len(tested) == 0, "test did not evaluate every namespace variation" + untested.remove(ns_name) + for ns in list(untested): + ns_map_name = f"{ns.upper()}_MAPPING" + ns_map = getattr(f, ns_map_name, None) + if ns_map is not None and len(ns_map) == 0: + untested.remove(ns) # ignore empty mappings + assert len(untested) == 0, "test did not evaluate every namespace variation" def test_get_cwl_file_format_reference(): - tested = set(f.FORMAT_NAMESPACES) + untested = set(f.FORMAT_NAMESPACES) tests = [ (f.IANA_NAMESPACE_DEFINITION, f.ContentType.APP_JSON), - (f.EDAM_NAMESPACE_DEFINITION, f.ContentType.APP_NETCDF), - (f.OPENGIS_NAMESPACE_DEFINITION, f.ContentType.IMAGE_GEOTIFF), + (f.EDAM_NAMESPACE_DEFINITION, f.ContentType.APP_HDF5), + (f.OGC_NAMESPACE_DEFINITION, f.ContentType.IMAGE_OGC_GEOTIFF), ] for ns, mime_type in tests: res = f.get_cwl_file_format(mime_type, make_reference=True) ns_name, ns_url = list(ns.items())[0] assert isinstance(res, str) - assert res.startswith(ns_url) - tested.remove(ns_name) - assert len(tested) == 0, "test did not evaluate every namespace variation" + assert res.startswith(ns_url), f"[{res}] does not start with [{ns_url}]" + untested.remove(ns_name) + for ns in list(untested): + ns_map_name = f"{ns.upper()}_MAPPING" + ns_map = getattr(f, ns_map_name, None) + if ns_map is not None and len(ns_map) == 0: + untested.remove(ns) # ignore empty mappings + assert len(untested) == 0, "test did not evaluate every namespace variation" def test_get_cwl_file_format_unknown(): @@ -196,16 +212,27 @@ def test_clean_mime_type_format_edam(): assert res_type == mime_type # application/x-type +@pytest.mark.skipif(condition=not f.OPENGIS_MAPPING, reason="No OpenGIS format mappings defined to test") def test_clean_mime_type_format_opengis(): mime_type, fmt = list(f.OPENGIS_MAPPING.items())[0] - gis_fmt = f"{f.OPENGIS_NAMESPACE}:{fmt}" # "edam:format_####" + gis_fmt = f"{f.OPENGIS_NAMESPACE}:{fmt}" # "opengis:####" res_type = f.clean_mime_type_format(gis_fmt) assert res_type == mime_type - gis_fmt = os.path.join(list(f.OPENGIS_NAMESPACE_DEFINITION.values())[0], fmt) # "edam-url/format_####" + gis_fmt = os.path.join(list(f.OPENGIS_NAMESPACE_DEFINITION.values())[0], fmt) res_type = f.clean_mime_type_format(gis_fmt) assert res_type == mime_type # application/x-type +def test_clean_mime_type_format_ogc(): + mime_type, fmt = list(f.OGC_MAPPING.items())[0] + ogc_fmt = f"{f.OGC_NAMESPACE}:{fmt}" # "ogc:####" + res_type = f.clean_mime_type_format(ogc_fmt) + assert res_type == mime_type + ogc_fmt = os.path.join(list(f.OGC_NAMESPACE_DEFINITION.values())[0], fmt) + res_type = f.clean_mime_type_format(ogc_fmt) + assert res_type == mime_type # application/x-type + + def test_clean_mime_type_format_io_remove_extra_parameters(): test_input_formats = [ (f.ContentType.APP_JSON, f.ContentType.APP_JSON), From 41e7c566eec8aff97e82141e6b218fda568a4c42 Mon Sep 17 00:00:00 2001 From: Francis Charette-Migneault Date: Wed, 14 Sep 2022 01:10:20 -0400 Subject: [PATCH 3/8] working CWL ScatterFeatureRequirement with dispatched WPS-1/OGC-API processes (fixes #105) --- CHANGES.rst | 5 +- .../WorkflowWPS1ScatterCopyNetCDF/package.cwl | 6 +- tests/functional/test_workflow.py | 23 ++++--- weaver/processes/builtin/jsonarray2netcdf.py | 16 ++--- weaver/processes/wps3_process.py | 8 +-- weaver/processes/wps_package.py | 62 ++++++++++++++----- weaver/typedefs.py | 6 +- 7 files changed, 87 insertions(+), 39 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 9f5593c32..fd41792dd 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -12,8 +12,9 @@ Changes Changes: -------- -- Support `CWL` definition for ``ScatterFeatureRequirement`` - (resolves `#105 `_). +- Support `CWL` definition for ``ScatterFeatureRequirement`` for `Workflow` parallel step distribution of an + input array (resolves `#105 `_). +- Add formatter and better logging details when executing ``builtin`` `Process` ``jsonarray2netcdf``. - Add `OGC` Media-Type ontology for ``File`` format references within `CWL` definition. - Replace `EDAM` NetCDF format reference by `OGC` NetCDF Media-Type with expected ontology definitions by processes. For backward compatibility, corresponding `EDAM` references will be converted to `OGC` Media-Type whenever possible. diff --git a/tests/functional/application-packages/WorkflowWPS1ScatterCopyNetCDF/package.cwl b/tests/functional/application-packages/WorkflowWPS1ScatterCopyNetCDF/package.cwl index 0ec0210a1..2c9cb06fa 100644 --- a/tests/functional/application-packages/WorkflowWPS1ScatterCopyNetCDF/package.cwl +++ b/tests/functional/application-packages/WorkflowWPS1ScatterCopyNetCDF/package.cwl @@ -3,7 +3,9 @@ class: Workflow requirements: ScatterFeatureRequirement: {} inputs: - input_json: File + input_json: + type: File + format: "iana:application/json" outputs: output: type: @@ -24,3 +26,5 @@ steps: input_nc: parse/output out: - output_txt +$namespaces: + iana: "https://www.iana.org/assignments/media-types/" diff --git a/tests/functional/test_workflow.py b/tests/functional/test_workflow.py index 11dff2e06..7e561e167 100644 --- a/tests/functional/test_workflow.py +++ b/tests/functional/test_workflow.py @@ -940,7 +940,8 @@ def test_workflow_mixed_rest_builtin_wps1_docker_select_requirements(self): nc_refs = [] for i in range(3): nc_name = f"test-file-{i}.nc" - nc_refs.append(os.path.join("file://" + tmp_dir, nc_name)) + nc_path = os.path.join(tmp_dir, nc_name) + nc_refs.append(f"file://{nc_path}") with open(os.path.join(tmp_dir, nc_name), mode="w", encoding="utf-8") as tmp_file: tmp_file.write(f"DUMMY NETCDF DATA #{i}") with open(os.path.join(tmp_dir, "netcdf-array.json"), mode="w", encoding="utf-8") as tmp_file: @@ -969,10 +970,6 @@ def mock_tmp_input(requests_mock): message="Workflow output data should have made it through the " "workflow of different process types.") - # FIXME: implement + re-enable 'CWL_REQUIREMENT_SCATTER' - #@pytest.mark.xfail( - # reason="ScatterFeatureRequirement not yet supported (https://github.com/crim-ca/weaver/issues/105)" - #) def test_workflow_mixed_rest_builtin_wps1_docker_scatter_requirements(self): """ Test the use of multiple applications of different :term:`Process` type in a :term:`Workflow`. @@ -982,18 +979,23 @@ def test_workflow_mixed_rest_builtin_wps1_docker_scatter_requirements(self): (process registered with ``WPS1Requirement`` using WPS-1 interface of builtin ``jsonarray2netcdf``). 2. Convert NetCDF file to raw text data dumps (using scattered applications per-file). + .. note:: + Because ``jsonarray2netcdf`` is running in subprocess instantiated by :mod:`cwltool`, file-server + location cannot be mocked by the test suite. Employ local test paths as if they where already fetched. + .. seealso:: Inverse :term:`WPS-1` / :term:`OGC API - Processes` process references from :meth:`test_workflow_mixed_wps1_builtin_rest_docker_scatter_requirements`. """ with contextlib.ExitStack() as stack: - tmp_host = "https://mocked-file-server.com" # must match in 'Execute_WorkflowScatterCopyNestedOutDir.json' + tmp_host = "https://mocked-file-server.com" # must match in 'WorkflowWPS1ScatterCopyNetCDF/execute.yml' tmp_dir = stack.enter_context(tempfile.TemporaryDirectory()) nc_refs = [] for i in range(3): nc_name = f"test-file-{i}.nc" - nc_refs.append(os.path.join(tmp_host, nc_name)) + nc_path = os.path.join(tmp_dir, nc_name) + nc_refs.append(f"file://{nc_path}") with open(os.path.join(tmp_dir, nc_name), mode="w", encoding="utf-8") as tmp_file: tmp_file.write(f"DUMMY NETCDF DATA #{i}") with open(os.path.join(tmp_dir, "netcdf-array.json"), mode="w", encoding="utf-8") as tmp_file: @@ -1001,6 +1003,7 @@ def test_workflow_mixed_rest_builtin_wps1_docker_scatter_requirements(self): def mock_tmp_input(requests_mock): mocked_file_server(tmp_dir, tmp_host, self.settings, requests_mock=requests_mock) + mocked_wps_output(self.settings, requests_mock=requests_mock) self.workflow_runner(WorkflowProcesses.WORKFLOW_WPS1_SCATTER_COPY_NETCDF, [WorkflowProcesses.APP_DOCKER_NETCDF_2_TEXT, # required for reference by WPS below @@ -1016,13 +1019,17 @@ def test_workflow_mixed_wps1_builtin_rest_docker_scatter_requirements(self): (process registered with ``WPS1Requirement`` using WPS-1 interface of builtin ``jsonarray2netcdf``). 2. Convert NetCDF file to raw text data dumps (using scattered applications per-file). + .. note:: + Because ``jsonarray2netcdf`` is running in subprocess instantiated by :mod:`cwltool`, file-server + location cannot be mocked by the test suite. Employ local test paths as if they where already fetched. + .. seealso:: Inverse :term:`WPS-1` / :term:`OGC API - Processes` process references from :meth:`test_workflow_mixed_rest_builtin_wps1_docker_scatter_requirements`. """ with contextlib.ExitStack() as stack: - tmp_host = "https://mocked-file-server.com" # must match in 'Execute_WorkflowScatterCopyNestedOutDir.json' + tmp_host = "https://mocked-file-server.com" # must match in 'WorkflowRESTScatterCopyNetCDF/execute.yml' tmp_dir = stack.enter_context(tempfile.TemporaryDirectory()) nc_refs = [] for i in range(3): diff --git a/weaver/processes/builtin/jsonarray2netcdf.py b/weaver/processes/builtin/jsonarray2netcdf.py index bd1c15c18..0b1e565c9 100644 --- a/weaver/processes/builtin/jsonarray2netcdf.py +++ b/weaver/processes/builtin/jsonarray2netcdf.py @@ -23,12 +23,14 @@ PACKAGE_NAME = os.path.split(os.path.splitext(__file__)[0])[-1] # setup logger since it is not run from the main 'weaver' app -LOGGER = logging.getLogger(__name__) -LOGGER.addHandler(logging.StreamHandler(sys.stdout)) -LOGGER.setLevel(logging.INFO) +LOGGER = logging.getLogger(PACKAGE_NAME) +_handler = logging.StreamHandler(sys.stdout) # noqa +_handler.setFormatter(logging.Formatter(fmt="[%(name)s] %(levelname)-8s %(message)s")) +LOGGER.addHandler(_handler) +LOGGER.setLevel(logging.DEBUG) # process details -__version__ = "2.0" +__version__ = "2.1" __title__ = "JSON array to NetCDF" __abstract__ = __doc__ # NOTE: '__doc__' is fetched directly, this is mostly to be informative @@ -52,10 +54,10 @@ def j2n(json_reference, output_dir): LOGGER.debug("Parsing JSON file references.") for file_url in json_content: LOGGER.debug("Fetching NetCDF reference from JSON file: [%s]", file_url) - fetch_file(file_url, output_dir, timeout=10, retry=3) + fetched_nc = fetch_file(file_url, output_dir, timeout=10, retry=3) + LOGGER.debug("Fetched NetCDF output location: [%s]", fetched_nc) except Exception as exc: - # log only debug for tracking, re-raise and actual error wil be logged by top process monitor - LOGGER.debug("Process '%s' raised an exception: [%s]", PACKAGE_NAME, exc) + LOGGER.error("Process '%s' raised an exception: [%s]", PACKAGE_NAME, exc) raise LOGGER.info("Process '%s' execution completed.", PACKAGE_NAME) diff --git a/weaver/processes/wps3_process.py b/weaver/processes/wps3_process.py index 49fa8e285..075fc37ea 100644 --- a/weaver/processes/wps3_process.py +++ b/weaver/processes/wps3_process.py @@ -63,7 +63,7 @@ class Wps3RemoteJobProgress(WpsRemoteJobProgress): class Wps3Process(WpsProcessInterface): def __init__(self, step_payload, # type: JSON - joborder, # type: CWL_RuntimeInputsMap + job_order, # type: CWL_RuntimeInputsMap process, # type: str request, # type: WorkerRequest update_status, # type: UpdateStatusPartialFunction @@ -74,10 +74,10 @@ def __init__(self, _message, _progress, _status, self.provider or "local", *args, **kwargs ) ) - self.provider, self.url, self.deploy_body = self.resolve_data_source(step_payload, joborder) + self.provider, self.url, self.deploy_body = self.resolve_data_source(step_payload, job_order) self.process = process - def resolve_data_source(self, step_payload, joborder): + def resolve_data_source(self, step_payload, job_order): # type: (CWL, CWL_RuntimeInputsMap) -> Tuple[str, str, JSON] try: # Presume that all EOImage given as input can be resolved to the same ADES @@ -86,7 +86,7 @@ def resolve_data_source(self, step_payload, joborder): data_url = "" # data_source will be set to the default ADES if no EOImages (anything but `None`) if eodata_inputs: step_payload = opensearch.alter_payload_after_query(step_payload) - value = joborder[eodata_inputs[0]] + value = job_order[eodata_inputs[0]] if isinstance(value, list): value = value[0] # Use the first value to determine the data source data_url = value["location"] diff --git a/weaver/processes/wps_package.py b/weaver/processes/wps_package.py index 735311339..bfc7e051f 100644 --- a/weaver/processes/wps_package.py +++ b/weaver/processes/wps_package.py @@ -61,6 +61,7 @@ CWL_REQUIREMENT_APP_WPS1, CWL_REQUIREMENT_ENV_VAR, CWL_REQUIREMENT_RESOURCE, + CWL_REQUIREMENT_SCATTER, CWL_REQUIREMENTS_SUPPORTED, WPS_INPUT, WPS_OUTPUT @@ -137,6 +138,7 @@ CWL_RequirementsList, CWL_Results, CWL_ToolPathObjectType, + CWL_WorkflowStepPackage, CWL_WorkflowStepPackageMap, CWL_WorkflowStepReference, JSON, @@ -672,7 +674,7 @@ def get_application_requirement(package, search=None, default=None, validate=Tru hints = package.get("hints", {}) all_hints = _get_package_requirements_as_class_list(reqs) + _get_package_requirements_as_class_list(hints) if search: - app_hints = list(filter(lambda h: h == search, all_hints)) + app_hints = list(filter(lambda h: h["class"] == search, all_hints)) else: app_hints = list(filter(lambda h: any(h["class"].endswith(t) for t in CWL_REQUIREMENT_APP_TYPES), all_hints)) if len(app_hints) > 1: @@ -1800,7 +1802,34 @@ def make_tool(self, toolpath_object, loading_context): from weaver.processes.wps_workflow import default_make_tool return default_make_tool(toolpath_object, loading_context, self.get_job_process_definition) - def get_job_process_definition(self, jobname, joborder, tool): # noqa: E811 + def get_workflow_step_package(self, job_name): + # type: (str) -> CWL_WorkflowStepPackage + """ + Resolve the step :term:`CWL` definition under a :term:`Workflow`. + """ + try: + step_details = self.step_packages[job_name] + except KeyError: # Perform check directly first in case a step was called literally as '_' + # In case of Workflow with scattering, job name might be suffixed with an index + req = get_application_requirement(self.package, CWL_REQUIREMENT_SCATTER, default={}, validate=False) + if not (req and "_" in job_name): + raise + job_name, job_index = job_name.rsplit("_", 1) + if not job_index.isnumeric(): + raise + LOGGER.debug("Resolved step name with index from scattering: [%s](%s)", job_name, job_index) + step_details = self.step_packages[job_name] + if "scatter" not in self.package["steps"][job_name]: + self.log_message( + self.job.status, + f"Expected scatter feature to match resolved name [{job_name}]({job_index}) " + "but no scatter specification was found in that step's package.", + level=logging.ERROR, + ) + raise + return step_details + + def get_job_process_definition(self, job_name, job_order, tool): # noqa: E811 # type: (str, JSON, CWL) -> WpsPackage """ Obtain the execution job definition for the given process (:term:`Workflow` step implementation). @@ -1811,48 +1840,49 @@ def get_job_process_definition(self, jobname, joborder, tool): # noqa: E811 It must return a :class:`weaver.processes.wps_process.WpsProcess` instance configured with the proper :term:`CWL` package definition, :term:`ADES` target and cookies to access it (if protected). - :param jobname: The workflow step or the package id that must be launched on an ADES :class:`string` - :param joborder: The params for the job :class:`dict {input_name: input_value}` - input_value is one of `input_object` or `array [input_object]` - input_object is one of `string` or `dict {class: File, location: string}` - in our case input are expected to be File object + :param job_name: The workflow step or the package id that must be launched on an ADES :class:`string` + :param job_order: The params for the job :class:`dict {input_name: input_value}` + input_value is one of `input_object` or `array [input_object]` + input_object is one of `string` or `dict {class: File, location: string}` + in our case input are expected to be File object :param tool: Whole `CWL` config including hints requirement (see: :py:data:`weaver.processes.constants.CWL_REQUIREMENT_APP_TYPES`) """ - if jobname == self.package_id: + if job_name == self.package_id: # A step is the package itself only for non-workflow package being executed on the EMS # default action requires ADES dispatching but hints can indicate also WPS1 or ESGF-CWT provider step_package_type = self.package_type step_payload = self.payload step_package = self.package step_process = self.package_id - jobtype = "package" + job_type = "package" else: # Here we got a step part of a workflow (self is the workflow package) - step_details = self.step_packages[jobname] + step_details = self.get_workflow_step_package(job_name) step_process = step_details["id"] step_package = step_details["package"] step_package_type = _get_package_type(step_package) step_payload = {} # defer until package requirement resolve to avoid unnecessary fetch - jobtype = "step" + job_type = "step" # Progress made with steps presumes that they are done sequentially and have the same progress weight start_step_progress = self.map_step_progress(len(self.step_launched), max(1, len(self.step_packages))) end_step_progress = self.map_step_progress(len(self.step_launched) + 1, max(1, len(self.step_packages))) - self.step_launched.append(jobname) - self.update_status(f"Preparing to launch {jobtype} {jobname}.", start_step_progress, Status.RUNNING) + self.step_launched.append(job_name) + self.update_status(f"Preparing to launch {job_type} {job_name}.", start_step_progress, Status.RUNNING) def _update_status_dispatch(_message, _progress, _status, _provider, *_, error=None, **__): # type: (str, Number, AnyStatusType, str, Any, Optional[Exception], Any) -> None if LOGGER.isEnabledFor(logging.DEBUG) and (_ or __): LOGGER.debug("Received additional unhandled args/kwargs to dispatched update status: %s, %s", _, __) self.step_update_status( - _message, _progress, start_step_progress, end_step_progress, jobname, _provider, _status, error=error + _message, _progress, start_step_progress, end_step_progress, job_name, _provider, _status, error=error ) def _get_wps1_params(_requirement): + # type: (CWL_AnyRequirements) -> CWL_Requirement _wps_params = {} required_params = ["provider", "process"] for _param in required_params: @@ -1869,7 +1899,7 @@ def _get_wps1_params(_requirement): req_class = ProcessType.WORKFLOW req_source = "tool class" - if jobtype == "step" and not any( + if job_type == "step" and not any( req_class.endswith(req) for req in [CWL_REQUIREMENT_APP_WPS1, CWL_REQUIREMENT_APP_ESGF_CWT] ): LOGGER.debug("Retrieve WPS-3 process payload for potential Data Source definitions to resolve.") @@ -1900,7 +1930,7 @@ def _get_wps1_params(_requirement): self.logger.info("WPS-3 Package resolved from %s: %s", req_source, req_class) from weaver.processes.wps3_process import Wps3Process return Wps3Process(step_payload=step_payload, - joborder=joborder, + job_order=job_order, process=step_process, request=self.request, update_status=_update_status_dispatch) diff --git a/weaver/typedefs.py b/weaver/typedefs.py index 685cc5434..5470d970f 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -152,7 +152,11 @@ CWL_Outputs = Union[List[CWL_Output_Type], Dict[str, CWL_Output_Type]] # 'requirements' includes 'hints' - CWL_Requirement = TypedDict("CWL_Requirement", {"class": CWL_RequirementNames}, total=False) # type: ignore + CWL_Requirement = TypedDict("CWL_Requirement", { + "class": CWL_RequirementNames, # type: ignore + "provider": NotRequired[str], + "process": NotRequired[str], + }, total=False) CWL_RequirementsDict = Dict[CWL_RequirementNames, Dict[str, str]] # {'': {: }} CWL_RequirementsList = List[CWL_Requirement] # [{'class': , : }] CWL_AnyRequirements = Union[CWL_RequirementsDict, CWL_RequirementsList] From f9476fdea4dec474c8215d86f0221e6ab1813d3f Mon Sep 17 00:00:00 2001 From: Francis Charette-Migneault Date: Wed, 14 Sep 2022 01:25:10 -0400 Subject: [PATCH 4/8] ignore lint complexity of nested defs --- weaver/formats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weaver/formats.py b/weaver/formats.py index 3e314a21c..b62890a0d 100644 --- a/weaver/formats.py +++ b/weaver/formats.py @@ -490,7 +490,7 @@ def add_content_type_charset(content_type, charset): return content_type -def get_cwl_file_format(mime_type, make_reference=False, must_exist=True, allow_synonym=True): +def get_cwl_file_format(mime_type, make_reference=False, must_exist=True, allow_synonym=True): # pylint: disable=R1260 # type: (str, bool, bool, bool) -> Union[Tuple[Optional[JSON], Optional[str]], Optional[str]] """ Obtains the extended schema reference from the media-type identifier. From b7b948c0dedcf95d0304a7b786bfe6098b64d488 Mon Sep 17 00:00:00 2001 From: Francis Charette-Migneault Date: Wed, 14 Sep 2022 01:28:52 -0400 Subject: [PATCH 5/8] fix workflow test wps-outputs mock --- tests/functional/test_workflow.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/functional/test_workflow.py b/tests/functional/test_workflow.py index 7e561e167..f35950490 100644 --- a/tests/functional/test_workflow.py +++ b/tests/functional/test_workflow.py @@ -949,6 +949,7 @@ def test_workflow_mixed_rest_builtin_wps1_docker_select_requirements(self): def mock_tmp_input(requests_mock): mocked_file_server(tmp_dir, tmp_host, self.settings, requests_mock=requests_mock) + mocked_wps_output(self.settings, requests_mock=requests_mock) results = self.workflow_runner(WorkflowProcesses.WORKFLOW_REST_SELECT_COPY_NETCDF, [WorkflowProcesses.APP_DOCKER_NETCDF_2_TEXT, # indirectly needed by WPS-1 From 99a8512fd03e9f3d92d10753e1e2b10407c802db Mon Sep 17 00:00:00 2001 From: Francis Charette-Migneault Date: Wed, 14 Sep 2022 01:46:48 -0400 Subject: [PATCH 6/8] fix uniquename step retrieval when shared across workflows --- .../WPS1JsonArray2NetCDF/deploy.yml | 5 ++++ .../WorkflowRESTScatterCopyNetCDF/package.cwl | 7 +++++- .../WorkflowWPS1ScatterCopyNetCDF/package.cwl | 1 + .../WorkflowWPS1SelectCopyNetCDF/package.cwl | 1 + tests/functional/test_workflow.py | 4 +++- weaver/processes/wps_package.py | 14 ++++------- weaver/processes/wps_workflow.py | 24 +++++++++---------- 7 files changed, 32 insertions(+), 24 deletions(-) diff --git a/tests/functional/application-packages/WPS1JsonArray2NetCDF/deploy.yml b/tests/functional/application-packages/WPS1JsonArray2NetCDF/deploy.yml index 26b88e14f..a30464f58 100644 --- a/tests/functional/application-packages/WPS1JsonArray2NetCDF/deploy.yml +++ b/tests/functional/application-packages/WPS1JsonArray2NetCDF/deploy.yml @@ -28,9 +28,14 @@ executionUnit: inputs: input: type: File + format: "iana:application/json" outputs: output: type: File[] + format: "ogc:netcdf" outputBinding: glob: "*.nc" + $namespaces: + iana: "https://www.iana.org/assignments/media-types/" + ogc: "http://www.opengis.net/def/media-type/ogc/1.0/" deploymentProfileName: http://www.opengis.net/profiles/eoc/wpsApplication diff --git a/tests/functional/application-packages/WorkflowRESTScatterCopyNetCDF/package.cwl b/tests/functional/application-packages/WorkflowRESTScatterCopyNetCDF/package.cwl index 4f19df629..08c0a3451 100644 --- a/tests/functional/application-packages/WorkflowRESTScatterCopyNetCDF/package.cwl +++ b/tests/functional/application-packages/WorkflowRESTScatterCopyNetCDF/package.cwl @@ -1,9 +1,12 @@ +#!/usr/bin/env cwl-runner cwlVersion: v1.0 class: Workflow requirements: ScatterFeatureRequirement: {} inputs: - input_json: File + input_json: + type: File + format: "iana:application/json" outputs: output: type: @@ -25,3 +28,5 @@ steps: input_nc: parse/output out: - output_txt +$namespaces: + iana: "https://www.iana.org/assignments/media-types/" diff --git a/tests/functional/application-packages/WorkflowWPS1ScatterCopyNetCDF/package.cwl b/tests/functional/application-packages/WorkflowWPS1ScatterCopyNetCDF/package.cwl index 2c9cb06fa..5c3e84aef 100644 --- a/tests/functional/application-packages/WorkflowWPS1ScatterCopyNetCDF/package.cwl +++ b/tests/functional/application-packages/WorkflowWPS1ScatterCopyNetCDF/package.cwl @@ -1,3 +1,4 @@ +#!/usr/bin/env cwl-runner cwlVersion: v1.0 class: Workflow requirements: diff --git a/tests/functional/application-packages/WorkflowWPS1SelectCopyNetCDF/package.cwl b/tests/functional/application-packages/WorkflowWPS1SelectCopyNetCDF/package.cwl index b9c9c81aa..3a8ca695e 100644 --- a/tests/functional/application-packages/WorkflowWPS1SelectCopyNetCDF/package.cwl +++ b/tests/functional/application-packages/WorkflowWPS1SelectCopyNetCDF/package.cwl @@ -1,3 +1,4 @@ +#!/usr/bin/env cwl-runner cwlVersion: v1.0 class: Workflow inputs: diff --git a/tests/functional/test_workflow.py b/tests/functional/test_workflow.py index f35950490..979d7ec20 100644 --- a/tests/functional/test_workflow.py +++ b/tests/functional/test_workflow.py @@ -1035,7 +1035,8 @@ def test_workflow_mixed_wps1_builtin_rest_docker_scatter_requirements(self): nc_refs = [] for i in range(3): nc_name = f"test-file-{i}.nc" - nc_refs.append(os.path.join(tmp_host, nc_name)) + nc_path = os.path.join(tmp_dir, nc_name) + nc_refs.append(f"file://{nc_path}") with open(os.path.join(tmp_dir, nc_name), mode="w", encoding="utf-8") as tmp_file: tmp_file.write(f"DUMMY NETCDF DATA #{i}") with open(os.path.join(tmp_dir, "netcdf-array.json"), mode="w", encoding="utf-8") as tmp_file: @@ -1043,6 +1044,7 @@ def test_workflow_mixed_wps1_builtin_rest_docker_scatter_requirements(self): def mock_tmp_input(requests_mock): mocked_file_server(tmp_dir, tmp_host, self.settings, requests_mock=requests_mock) + mocked_wps_output(self.settings, requests_mock=requests_mock) self.workflow_runner(WorkflowProcesses.WORKFLOW_REST_SCATTER_COPY_NETCDF, [WorkflowProcesses.APP_WPS1_JSON_ARRAY_2_NETCDF, # no need to register its builtin ref diff --git a/weaver/processes/wps_package.py b/weaver/processes/wps_package.py index bfc7e051f..ee86f7309 100644 --- a/weaver/processes/wps_package.py +++ b/weaver/processes/wps_package.py @@ -1811,22 +1811,16 @@ def get_workflow_step_package(self, job_name): step_details = self.step_packages[job_name] except KeyError: # Perform check directly first in case a step was called literally as '_' # In case of Workflow with scattering, job name might be suffixed with an index - req = get_application_requirement(self.package, CWL_REQUIREMENT_SCATTER, default={}, validate=False) - if not (req and "_" in job_name): + # Also, to avoid ambiguous references of Workflow steps running in parallel (distinct jobs), + # unique keys are generated for matching step names, since their sub-CWL might differ. + # (see 'cwltool.process.uniquename') + if "_" not in job_name: raise job_name, job_index = job_name.rsplit("_", 1) if not job_index.isnumeric(): raise LOGGER.debug("Resolved step name with index from scattering: [%s](%s)", job_name, job_index) step_details = self.step_packages[job_name] - if "scatter" not in self.package["steps"][job_name]: - self.log_message( - self.job.status, - f"Expected scatter feature to match resolved name [{job_name}]({job_index}) " - "but no scatter specification was found in that step's package.", - level=logging.ERROR, - ) - raise return step_details def get_job_process_definition(self, job_name, job_order, tool): # noqa: E811 diff --git a/weaver/processes/wps_workflow.py b/weaver/processes/wps_workflow.py index 42dc71d44..729800d8f 100644 --- a/weaver/processes/wps_workflow.py +++ b/weaver/processes/wps_workflow.py @@ -139,14 +139,14 @@ def __init__(self, toolpath_object, loading_context, get_job_process_definition) # pylint: disable=W0221,W0237 # naming using python like arguments def job(self, - joborder, # type: Dict[Text, AnyValueType] + job_order, # type: Dict[Text, AnyValueType] output_callbacks, # type: Callable[[Any, Any], Any] runtime_context, # type: RuntimeContext ): # type: (...) -> Generator[Union[JobBase, CallbackJob], None, None] """ Workflow job generator. - :param joborder: inputs of the job submission + :param job_order: inputs of the job submission :param output_callbacks: method to fetch step outputs and corresponding step details :param runtime_context: configs about execution environment :return: @@ -155,18 +155,18 @@ def job(self, if self.metadata["cwlVersion"] == "v1.0": require_prefix = "http://commonwl.org/cwltool#" - jobname = uniquename(runtime_context.name or shortname(self.tool.get("id", "job"))) + job_name = uniquename(runtime_context.name or shortname(self.tool.get("id", "job"))) # outdir must be served by the EMS because downstream step will need access to upstream steps output weaver_out_dir = get_wps_output_dir(get_settings()) runtime_context.outdir = tempfile.mkdtemp( prefix=getdefault(runtime_context.tmp_outdir_prefix, DEFAULT_TMP_PREFIX), dir=weaver_out_dir) - builder = self._init_job(joborder, runtime_context) + builder = self._init_job(job_order, runtime_context) - # `jobname` is the step name and `joborder` is the actual step inputs - wps_workflow_job = WpsWorkflowJob(builder, builder.job, self.requirements, self.hints, jobname, - self.get_job_process_definition(jobname, joborder, self.tool), + # `job_name` is the step name and `job_order` is the actual step inputs + wps_workflow_job = WpsWorkflowJob(builder, builder.job, self.requirements, self.hints, job_name, + self.get_job_process_definition(job_name, job_order, self.tool), self.tool["outputs"]) wps_workflow_job.prov_obj = self.prov_obj wps_workflow_job.successCodes = self.tool.get("successCodes") @@ -194,7 +194,7 @@ def job(self, wps_workflow_job.collect_outputs = partial( self.collect_output_ports, self.tool["outputs"], builder, compute_checksum=getdefault(runtime_context.compute_checksum, True), - jobname=jobname, + job_name=job_name, readers=readers) wps_workflow_job.output_callback = output_callbacks @@ -205,7 +205,7 @@ def collect_output_ports(self, builder, # type: Builder outdir, # type: Text compute_checksum=True, # type: bool - jobname="", # type: Text + job_name="", # type: Text readers=None # type: Dict[Text, Any] ): # type: (...) -> OutputPorts ret = {} # type: OutputPorts @@ -252,7 +252,7 @@ def make_workflow_exception(msg): finally: if builder.mutation_manager and readers: for reader in readers.values(): - builder.mutation_manager.release_reader(jobname, reader) + builder.mutation_manager.release_reader(job_name, reader) def collect_output(self, schema, # type: Dict[Text, Any] @@ -436,14 +436,14 @@ def collect_output(self, class WpsWorkflowJob(JobBase): def __init__(self, builder, # type: Builder - joborder, # type: Dict[Text, Union[Dict[Text, Any], List, Text, None]] + job_order, # type: Dict[Text, Union[Dict[Text, Any], List, Text, None]] requirements, # type: List[Dict[Text, Text]] hints, # type: List[Dict[Text, Text]] name, # type: Text wps_process, # type: WpsProcessInterface expected_outputs, # type: List[CWL_Output_Type] ): # type: (...) -> None - super(WpsWorkflowJob, self).__init__(builder, joborder, None, requirements, hints, name) + super(WpsWorkflowJob, self).__init__(builder, job_order, None, requirements, hints, name) self.wps_process = wps_process self.expected_outputs = {} # type: CWL_ExpectedOutputs # {id: file-pattern} for output in expected_outputs: From acdc93bcc344d394fe2206d6db2eae40022549b1 Mon Sep 17 00:00:00 2001 From: Francis Charette-Migneault Date: Wed, 14 Sep 2022 10:26:56 -0400 Subject: [PATCH 7/8] fix lint unused import --- weaver/processes/wps_package.py | 1 - 1 file changed, 1 deletion(-) diff --git a/weaver/processes/wps_package.py b/weaver/processes/wps_package.py index ee86f7309..9b472f701 100644 --- a/weaver/processes/wps_package.py +++ b/weaver/processes/wps_package.py @@ -61,7 +61,6 @@ CWL_REQUIREMENT_APP_WPS1, CWL_REQUIREMENT_ENV_VAR, CWL_REQUIREMENT_RESOURCE, - CWL_REQUIREMENT_SCATTER, CWL_REQUIREMENTS_SUPPORTED, WPS_INPUT, WPS_OUTPUT From a463c794c9fdb38edb11ef74d8b1ef8120aef9e2 Mon Sep 17 00:00:00 2001 From: Francis Charette-Migneault Date: Wed, 14 Sep 2022 11:13:26 -0400 Subject: [PATCH 8/8] ignore coverage of logging calls with self reference --- setup.cfg | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/setup.cfg b/setup.cfg index 16da8174e..5307dc942 100644 --- a/setup.cfg +++ b/setup.cfg @@ -131,5 +131,11 @@ exclude_lines = LOGGER.error LOGGER.exception LOGGER.log + self.logger.debug + self.logger.info + self.logger.warning + self.logger.error + self.logger.exception + self.logger.log @overload if not result.success: