Skip to content

Commit

Permalink
Merge pull request #638 from crim-ca/add-cwl-step-input-expr
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault authored May 9, 2024
2 parents 07698b1 + 4790f66 commit 213e8e3
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 29 deletions.
2 changes: 1 addition & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Changes

Changes:
--------
- No change.
- Add `CWL` ``StepInputExpressionRequirement`` support.

Fixes:
------
Expand Down
7 changes: 7 additions & 0 deletions tests/functional/application-packages/WorkflowEcho/deploy.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
processDescription:
process:
id: WorkflowEcho
executionUnit:
# note: This does not work by itself! The test suite injects the file dynamically.
- test: "tests/functional/application-packages/WorkflowEcho/package.cwl"
deploymentProfileName: "http://www.opengis.net/profiles/eoc/workflow"
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
inputs:
message: test-workflow-echo
14 changes: 10 additions & 4 deletions tests/functional/application-packages/WorkflowEcho/package.cwl
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
#!/usr/bin/env cwl-runner
cwlVersion: v1.0
cwlVersion: v1.2
class: Workflow
doc: Workflow that simply calls the echo process twice in a chain.
doc: Workflow that calls the echo process in a chain to propagate the input value to the output.
inputs:
message: string
outputs:
output:
type: File
outputSource: echo2/output
requirements:
# required for the 'valueFrom' in the step
# (see https://www.commonwl.org/v1.0/Workflow.html#WorkflowStepInput)
StepInputExpressionRequirement: {}
steps:
echo1:
run: Echo.cwl
Expand All @@ -19,8 +23,10 @@ steps:
run: Echo.cwl
in:
# temp input to pass result 'File', then load contents for 'string' type
echo_file: echo1/output
echo_file:
source: echo1/output
loadContents: true
message:
valueFrom: $(inputs.echo_file.contents)
valueFrom: "$(inputs.echo_file.contents)"
out:
- output
90 changes: 66 additions & 24 deletions tests/functional/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@
from weaver.config import WeaverConfiguration
from weaver.execute import ExecuteResponse, ExecuteTransmissionMode
from weaver.formats import ContentType
from weaver.processes.constants import CWL_REQUIREMENT_STEP_INPUT_EXPRESSION
from weaver.processes.utils import get_process_information
from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory
from weaver.utils import fetch_file, generate_diff, get_any_id, get_weaver_url, make_dirs, now, request_extra
from weaver.visibility import Visibility
from weaver.wps.utils import map_wps_output_location
from weaver.wps_restapi.utils import get_wps_restapi_base_url

if TYPE_CHECKING:
Expand All @@ -55,6 +57,7 @@
AnyRequestMethod,
AnyResponseType,
CookiesType,
CWL,
ExecutionResults,
HeadersType,
ProcessDeployment,
Expand All @@ -72,9 +75,19 @@ class WorkflowProcesses(enum.Enum):
They will be loaded by :class:`WorkflowTestRunnerBase` derived classes in alphabetical order.
All atomic :term:`Application Package` will be loaded before :term:`Workflow` definitions.
"""

# https://github.com/crim-ca/testbed14/tree/master/application-packages
APP_STACKER = "Stacker"
APP_SFS = "SFS"
APP_FLOOD_DETECTION = "FloodDetection"
WORKFLOW_CUSTOM = "CustomWorkflow"
WORKFLOW_FLOOD_DETECTION = "WorkflowFloodDetection"
WORKFLOW_STACKER_SFS = "Workflow"
WORKFLOW_SC = "WorkflowSimpleChain"
WORKFLOW_S2P = "WorkflowS2ProbaV"

# local in 'tests/functional/application-packages'
APP_ECHO = "Echo"
APP_ICE_DAYS = "Finch_IceDays"
APP_SUBSET_BBOX = "ColibriFlyingpigeon_SubsetBbox"
APP_SUBSET_ESGF = "SubsetESGF"
Expand All @@ -87,13 +100,9 @@ class WorkflowProcesses(enum.Enum):
APP_DIRECTORY_MERGING_PROCESS = "DirectoryMergingProcess"
APP_WPS1_DOCKER_NETCDF_2_TEXT = "WPS1DockerNetCDF2Text"
APP_WPS1_JSON_ARRAY_2_NETCDF = "WPS1JsonArray2NetCDF"
WORKFLOW_STACKER_SFS = "Workflow"
WORKFLOW_SC = "WorkflowSimpleChain"
WORKFLOW_S2P = "WorkflowS2ProbaV"
WORKFLOW_CHAIN_COPY = "WorkflowChainCopy"
WORKFLOW_CUSTOM = "CustomWorkflow"
WORKFLOW_DIRECTORY_LISTING = "WorkflowDirectoryListing"
WORKFLOW_FLOOD_DETECTION = "WorkflowFloodDetection"
WORKFLOW_ECHO = "WorkflowEcho"
WORKFLOW_SUBSET_ICE_DAYS = "WorkflowSubsetIceDays"
WORKFLOW_SUBSET_PICKER = "WorkflowSubsetPicker"
WORKFLOW_SUBSET_LLNL_SUBSET_CRIM = "WorkflowSubsetLLNL_SubsetCRIM"
Expand All @@ -112,16 +121,18 @@ class ProcessInfo(object):
"""

def __init__(self,
process_id, # type: Union[str, WorkflowProcesses]
test_id=None, # type: Optional[str]
deploy_payload=None, # type: Optional[ProcessDeployment]
execute_payload=None, # type: Optional[ProcessExecution]
): # type: (...) -> None
self.pid = WorkflowProcesses(process_id) # type: WorkflowProcesses
self.id = self.pid.value # type: Optional[str] # noqa
self.test_id = test_id # type: Optional[str]
self.deploy_payload = deploy_payload # type: Optional[ProcessDeployment]
self.execute_payload = execute_payload # type: Optional[ProcessExecution]
process_id, # type: Union[str, WorkflowProcesses]
test_id=None, # type: Optional[str]
deploy_payload=None, # type: Optional[ProcessDeployment]
execute_payload=None, # type: Optional[ProcessExecution]
application_package=None, # type: Optional[CWL]
): # type: (...) -> None
self.pid = WorkflowProcesses(process_id) # type: WorkflowProcesses
self.id = self.pid.value # type: Optional[str] # noqa
self.test_id = test_id # type: Optional[str]
self.deploy_payload = deploy_payload # type: Optional[ProcessDeployment]
self.execute_payload = execute_payload # type: Optional[ProcessExecution]
self.application_package = application_package # type: Optional[CWL]


@pytest.mark.functional
Expand Down Expand Up @@ -520,7 +531,7 @@ def retrieve_process_info(cls, process_id):
1. Look in the local ``tests.functional`` directory.
2. Look in remote repository:
https://github.com/crim-ca/testbed14
i.e.: directory ``TB14`` under in
i.e.: directory ``TB14`` under
https://github.com/crim-ca/application-packages/tree/master/OGC
3. Look in remote repository directory:
https://github.com/crim-ca/application-packages/tree/master/OGC/TB16
Expand All @@ -537,17 +548,21 @@ def retrieve_process_info(cls, process_id):
- ``Execute_<PROCESS_ID>.[json|yaml|yml]``
- ``<PROCESS_ID>.[cwl|json|yaml|yml]`` (package)
2. Contents defined within a sub0directory named ``<PROCESS_ID>`` with either the previous names or simply:
2. Contents defined within a sub-directory named ``<PROCESS_ID>`` with either the previous names or simply:
- ``deploy.[json|yaml|yml]``
- ``execute.[json|yaml|yml]``
- ``package.[cwl|json|yaml|yml]``
For each group of content definitions, Deploy and Execute contents are mandatory.
The package file can be omitted if it is already explicitly embedded within the Deploy contents.
For each group of content definitions, "deploy" and "execute" contents are mandatory.
The "package" file can be omitted if it is already explicitly embedded within the "deploy" contents.
.. note::
Only when references are local (tests), the package can be referred by relative ``tests/...`` path
within the Deploy content ``executionUnit`` using ``test`` key instead of ``unit`` or ``href``.
within the "deploy" content's ``executionUnit`` using ``test`` key instead of ``unit`` or ``href``.
In such case, the "package" contents will be loaded and injected dynamically as ``unit`` in the "deploy"
body at the relevant location. Alternatively to the ``test`` key, ``href`` can also be used, but will be
loaded only if using the ``tests/..`` relative path. This is to ensure cross-support with other test
definitions using this format outside of "workflow" use cases.
:param process_id: identifier of the process to retrieve contents.
:return: found content definitions.
Expand All @@ -559,18 +574,23 @@ def retrieve_process_info(cls, process_id):
execute_payload = cls.retrieve_payload(pid, "execute")

# replace derived reference (local only, remote must use the full 'href' references)
test_app_pkg = deploy_payload.get("executionUnit", [{}])[0].pop("test", None)
test_exec_unit = deploy_payload.get("executionUnit", [{}])[0]
unit_app_pkg = None
test_app_pkg = test_exec_unit.pop("test", None)
if test_app_pkg:
unit_app_pkg = cls.retrieve_payload(pid, "package")
deploy_payload["executionUnit"][0]["unit"] = unit_app_pkg
elif "href" in test_exec_unit and str(test_exec_unit["href"]).startswith("tests/"):
unit_app_pkg = cls.retrieve_payload(pid, "package", local=True, location=test_exec_unit["href"])
if unit_app_pkg:
deploy_payload["executionUnit"][0] = {"unit": unit_app_pkg}

# Apply collection swapping
for swap in cls.swap_data_collection():
for i in execute_payload["inputs"]:
if "data" in i and i["data"] == swap[0]:
i["data"] = swap[1]

return ProcessInfo(process_id, test_process_id, deploy_payload, execute_payload)
return ProcessInfo(process_id, test_process_id, deploy_payload, execute_payload, unit_app_pkg)

@classmethod
def assert_response(cls, response, status=None, message=""):
Expand Down Expand Up @@ -938,12 +958,14 @@ class WorkflowTestCase(WorkflowTestRunnerBase):
WorkflowProcesses.APP_DIRECTORY_LISTING_PROCESS,
WorkflowProcesses.APP_DIRECTORY_MERGING_PROCESS,
WorkflowProcesses.APP_DOCKER_STAGE_IMAGES,
WorkflowProcesses.APP_ECHO,
WorkflowProcesses.APP_WPS1_DOCKER_NETCDF_2_TEXT,
WorkflowProcesses.APP_WPS1_JSON_ARRAY_2_NETCDF,
}
WEAVER_TEST_WORKFLOW_SET = {
WorkflowProcesses.WORKFLOW_CHAIN_COPY,
WorkflowProcesses.WORKFLOW_DIRECTORY_LISTING,
WorkflowProcesses.WORKFLOW_ECHO,
WorkflowProcesses.WORKFLOW_STAGE_COPY_IMAGES,
WorkflowProcesses.WORKFLOW_REST_SCATTER_COPY_NETCDF,
WorkflowProcesses.WORKFLOW_REST_SELECT_COPY_NETCDF,
Expand Down Expand Up @@ -1281,8 +1303,28 @@ def mock_tmp_input(requests_mock):
f"\n{self.logger_separator_calls}"
)
# check that all expected files made it through the listing/directory input/output chaining between steps
output_files = "\n".join(os.path.join(*line.rsplit("/", 2)[-2:]) for line in output_lines)
output_files = "\n".join(os.path.join(*line.rsplit("/", 2)[-2:]) for line in output_lines) # type: ignore
expect_files = "\n".join(os.path.join("output_dir", os.path.split(file)[-1]) for file in expect_http_files)
self.assert_test(lambda: output_files == expect_files,
message="Workflow output file expected to contain single file with raw string listing of "
"input files chained from generated output directory listing of the first step.")

def test_workflow_echo_step_expression(self):
"""
Validate that a Workflow using 'StepInputExpressionRequirement' is supported.
"""
pkg = self.test_processes_info[WorkflowProcesses.WORKFLOW_ECHO].application_package
assert CWL_REQUIREMENT_STEP_INPUT_EXPRESSION in pkg["requirements"], "missing requirement for test"
result = self.workflow_runner(
WorkflowProcesses.WORKFLOW_ECHO,
[WorkflowProcesses.APP_ECHO],
log_full_trace=True,
)
assert "output" in result
path = map_wps_output_location(result["output"]["href"], container=self.settings)
with open(path, mode="r", encoding="utf-8") as out_file:
data = out_file.read()
out = data.strip() # ignore newlines added by the echo steps, good enough to test the operations worked
assert out == "test-workflow-echo", (
f"Should match the input value from 'execute' body of '{WorkflowProcesses.WORKFLOW_ECHO}'"
)
4 changes: 4 additions & 0 deletions weaver/processes/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class OpenSearchField(Constants):
CWL_RequirementProcessGeneratorType = Literal["ProcessGenerator"]
CWL_RequirementResourceType = Literal["ResourceRequirement"]
CWL_RequirementScatterFeatureType = Literal["ScatterFeatureRequirement"]
CWL_RequirementStepInputExpressionType = Literal["StepInputExpressionRequirement"]
CWL_RequirementSecretsType = Literal["cwltool:Secrets"]
CWL_RequirementToolTimeLimitType = Literal["ToolTimeLimit"]
CWL_RequirementWorkReuseType = Literal["WorkReuse"]
Expand Down Expand Up @@ -170,6 +171,7 @@ class OpenSearchField(Constants):
CWL_REQUIREMENT_PROCESS_GENERATOR = get_args(CWL_RequirementProcessGeneratorType)[0]
CWL_REQUIREMENT_RESOURCE = get_args(CWL_RequirementResourceType)[0]
CWL_REQUIREMENT_SCATTER = get_args(CWL_RequirementScatterFeatureType)[0]
CWL_REQUIREMENT_STEP_INPUT_EXPRESSION = get_args(CWL_RequirementStepInputExpressionType)[0]
CWL_REQUIREMENT_SECRETS = get_args(CWL_RequirementSecretsType)[0]
CWL_REQUIREMENT_TIME_LIMIT = get_args(CWL_RequirementToolTimeLimitType)[0]
# default is to reuse, employed to explicitly disable
Expand All @@ -187,6 +189,7 @@ class OpenSearchField(Constants):
# CWL_REQUIREMENT_PROCESS_GENERATOR, # explicitly unsupported, works against Weaver's behavior
CWL_REQUIREMENT_RESOURCE, # FIXME: perform pre-check on job submit? (https://github.com/crim-ca/weaver/issues/138)
CWL_REQUIREMENT_SCATTER,
CWL_REQUIREMENT_STEP_INPUT_EXPRESSION,
# CWL_REQUIREMENT_SECRETS, # FIXME: support CWL Secrets (https://github.com/crim-ca/weaver/issues/511)
CWL_REQUIREMENT_TIME_LIMIT,
CWL_REQUIREMENT_WORK_REUSE, # allow it, but makes sense only for Workflow steps if cwltool handles it by itself
Expand Down Expand Up @@ -297,6 +300,7 @@ class JobInputsOutputsSchema(Constants):
CWL_RequirementNetworkAccessType,
CWL_RequirementResourceType,
CWL_RequirementScatterFeatureType,
CWL_RequirementStepInputExpressionType,
CWL_RequirementSecretsType,
CWL_RequirementToolTimeLimitType,
CWL_RequirementWorkReuseType,
Expand Down
22 changes: 22 additions & 0 deletions weaver/wps_restapi/swagger_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
CWL_REQUIREMENT_NETWORK_ACCESS,
CWL_REQUIREMENT_RESOURCE,
CWL_REQUIREMENT_SCATTER,
CWL_REQUIREMENT_STEP_INPUT_EXPRESSION,
CWL_REQUIREMENT_TIME_LIMIT,
CWL_REQUIREMENT_WORK_REUSE,
OAS_COMPLEX_TYPES,
Expand Down Expand Up @@ -4505,6 +4506,23 @@ class ScatterFeatureRequirementClass(ScatterFeatureRequirementSpecification):
_class = RequirementClass(example=CWL_REQUIREMENT_SCATTER, validator=OneOf([CWL_REQUIREMENT_SCATTER]))


class StepInputExpressionSpecification(StrictMappingSchema):
description = inspect.cleandoc(f"""
Indicate that the workflow platform must support the 'valueFrom' field of {CWL_WORKFLOW_URL}#WorkflowStepInput.
""")


class StepInputExpressionRequirementMap(ExtendedMappingSchema):
req = StepInputExpressionSpecification(name=CWL_REQUIREMENT_STEP_INPUT_EXPRESSION)


class StepInputExpressionRequirementClass(StepInputExpressionSpecification):
_class = RequirementClass(
example=CWL_REQUIREMENT_STEP_INPUT_EXPRESSION,
validator=OneOf([CWL_REQUIREMENT_STEP_INPUT_EXPRESSION]),
)


class TimeLimitValue(OneOfKeywordSchema):
_one_of = [
ExtendedSchemaNode(Float(), validator=Range(min=0.0)),
Expand Down Expand Up @@ -4664,6 +4682,7 @@ class CWLRequirementsMap(AnyOfKeywordSchema):
NetworkAccessRequirementMap(missing=drop),
ResourceRequirementMap(missing=drop),
ScatterFeatureRequirementMap(missing=drop),
StepInputExpressionRequirementMap(missing=drop),
ToolTimeLimitRequirementMap(missing=drop),
WorkReuseRequirementMap(missing=drop),
UnknownRequirementMap(missing=drop), # allows anything, must be last
Expand All @@ -4684,6 +4703,7 @@ class CWLRequirementsItem(OneOfKeywordSchema):
NetworkAccessRequirementClass(missing=drop),
ResourceRequirementClass(missing=drop),
ScatterFeatureRequirementClass(missing=drop),
StepInputExpressionRequirementClass(missing=drop),
ToolTimeLimitRequirementClass(missing=drop),
WorkReuseRequirementClass(missing=drop),
UnknownRequirementClass(missing=drop), # allows anything, must be last
Expand Down Expand Up @@ -4714,6 +4734,7 @@ class CWLHintsMap(AnyOfKeywordSchema, PermissiveMappingSchema):
NetworkAccessRequirementMap(missing=drop),
ResourceRequirementMap(missing=drop),
ScatterFeatureRequirementMap(missing=drop),
StepInputExpressionRequirementMap(missing=drop),
ToolTimeLimitRequirementMap(missing=drop),
WorkReuseRequirementMap(missing=drop),
ESGF_CWT_RequirementMap(missing=drop),
Expand All @@ -4739,6 +4760,7 @@ class CWLHintsItem(OneOfKeywordSchema, PermissiveMappingSchema):
NetworkAccessRequirementClass(missing=drop),
ResourceRequirementClass(missing=drop),
ScatterFeatureRequirementClass(missing=drop),
StepInputExpressionRequirementClass(missing=drop),
ToolTimeLimitRequirementClass(missing=drop),
WorkReuseRequirementClass(missing=drop),
ESGF_CWT_RequirementClass(missing=drop),
Expand Down

0 comments on commit 213e8e3

Please sign in to comment.