Skip to content

Commit

Permalink
[wip] resolving workflow step outputBinding.glob pattern chaining fro…
Browse files Browse the repository at this point in the history
…m CWL/WPS outputs
  • Loading branch information
fmigneault committed Sep 21, 2023
1 parent e654184 commit 80f322f
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ processDescription:
- mimeType: text/plain
default: true
minOccurs: 1
maxOccurs: "unbounded"
outputs:
- id: output_files
formats:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ inputs:
position: 1
outputs:
output_files:
# NOTE: always one, but using array to allow chaining itself any amount of times
type:
type: array
items: File
Expand Down
3 changes: 1 addition & 2 deletions tests/processes/test_wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
.. seealso::
- :mod:`tests.functional.wps_package`.
"""
import warnings

import contextlib
import copy
import io
Expand All @@ -16,6 +14,7 @@
import shutil
import sys
import tempfile
import warnings
from typing import TYPE_CHECKING

import cwltool.process
Expand Down
25 changes: 19 additions & 6 deletions weaver/processes/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,10 @@ def _convert_any2cwl_io_complex(cwl_io, cwl_ns, wps_io, io_select):
"""
Converts the :term:`WPS`-like I/O definition and defines them inplace into the :term:`CWL` containers.
.. seealso::
See :meth:`weaver.processes.wps_process_base.WpsProcessInterface.stage_results` which thightly interacts
with the produced ``outputBinding.glob`` patterns generated here. Methodology should align between them.
:param cwl_io: Basic :term:`CWL` I/O container (only ID needed) where to write conversion results.
:param cwl_ns: Namespaces to gradually update when encountering new format Media-Type definitions.
:param wps_io: Original :term:`WPS`-like I/O to be converted.
Expand Down Expand Up @@ -583,12 +587,21 @@ def _convert_any2cwl_io_complex(cwl_io, cwl_ns, wps_io, io_select):
cwl_io_ext = [cwl_io_any]

# Method 'weaver.processes.wps_process_base.WpsProcessInterface.stage_results' uses the produced glob
# pattern below of generated output definitions from WPS items that don't offer any hint about the expected
# file naming format or specification.
# Require that the file is nested in a directory named as the output ID (to isolate against conflict by
# other outputs) and has the expected extension based on the file format/schema/media-type.
# Any character can be employed for the file name within the sub-dir as generated by the remote process.
cwl_glob = [f"{cwl_id}/*{ext}" for ext in cwl_io_ext]
# pattern(s) below of generated output definitions from WPS items that don't offer any hint about the
# expected file naming format or specification (because we cannot guess what will be produced as output
# from the remote process definitions alone). We can only provide expected extension based on the file
# format/schema/media-type of the output definition.
# To avoid potential naming clashes or conflicting matching from generic patterns when CWL tries to resolve
# paths, that staging operation stage outputs and adjust each glob pattern under a directory named by the
# respective output ID.
# However, it is very important **NOT** to add the output ID directory nesting approach here, otherwise it
# will confuse the staging process between Workflow steps, since it won't be able to distinguish whether the
# nesting was already applied by Weaver (here), or provided by an user-provided CWL Application Package, since
# WPS-based. OGC-based, CWL-based, (or any future implementation) can be combined within a same Workflow.
cwl_glob = [
f"*{ext}" if ext != "/" else "./" # handle special case of "extension" for 'Directory' type
for ext in cwl_io_ext
]
cwl_io["outputBinding"] = {
"glob": cwl_glob[0] if len(cwl_glob) == 1 else cwl_glob
}
Expand Down
1 change: 0 additions & 1 deletion weaver/processes/wps1_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ def __init__(self,
_message, _progress, _status, self.provider, *args, **kwargs
)
)
self.stage_output_id_nested = True # set after __init__ to avoid reset by base class

def format_inputs(self, workflow_inputs):
# type: (CWL_RuntimeInputList) -> OWS_InputDataValues
Expand Down
6 changes: 3 additions & 3 deletions weaver/processes/wps_process_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def __init__(self, request, update_status):
self.settings = get_settings()
self.update_status = update_status # type: UpdateStatusPartialFunction
self.temp_staging = set()
self.stage_output_id_nested = False
self.stage_output_id_nested = True # FIXME: deprecate everywhere (force: True) to unify procedure

def execute(self, workflow_inputs, out_dir, expected_outputs):
# type: (CWL_RuntimeInputsMap, str, CWL_ExpectedOutputs) -> None
Expand Down Expand Up @@ -329,8 +329,8 @@ def stage_results(self, results, expected_outputs, out_dir):
We cannot rely on specific file names to be mapped, since glob can match many (eg: ``"*.txt"``).
.. seealso::
Function :func:`weaver.processes.convert.any2cwl_io` defines a generic glob pattern using the output ID
and expected file extension based on Content-Type format. Since the remote :term:`WPS` :term:`Process`
Function :func:`weaver.processes.convert._convert_any2cwl_io_complex` defines a generic glob pattern from
the expected file extension based on Content-Type format. Since the remote :term:`WPS` :term:`Process`
doesn't necessarily produce file names with the output ID as expected to find them (could be anything),
staging must patch locations to let :term:`CWL` runtime resolve the files according to glob definitions.
Expand Down
20 changes: 14 additions & 6 deletions weaver/processes/wps_workflow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import collections.abc
import logging
import os
import tempfile
from functools import partial
from typing import TYPE_CHECKING, cast # these are actually used in the code
Expand Down Expand Up @@ -186,19 +187,26 @@ def collect_output(
ignoring any nested dirs where the modified *outputBindings* definition will be able to match as if each
step :term:`Process` outputs were generated locally.
"""
# if "outputBinding" in schema and "glob" in schema["outputBinding"]:
# # in case of Directory collection with '<dir>/', use '.' because cwltool replaces it by the outdir
# glob = schema["outputBinding"]["glob"]
# glob = os.path.split(glob)[-1] or "."
# schema["outputBinding"]["glob"] = glob
if "outputBinding" in schema and "glob" in schema["outputBinding"]:
glob = schema["outputBinding"]["glob"]
glob_list = isinstance(glob, list)
glob = glob if isinstance(glob, list) else [glob]
out_id = schema["id"].rsplit("#", 1)[-1]
glob_spec = []
for glob_item in glob:
if glob_item.startswith(outdir):
# if equal -> '.', which is identical to what CWL '<dir>/.' expects for a dir entry
glob_item = os.path.relpath(glob_item, outdir)
glob_spec.append(os.path.join(out_id, glob_item))
schema["outputBinding"]["glob"] = glob_spec if glob_list else glob_spec[0]
output = super(WpsWorkflow, self).collect_output(
schema,
builder,
outdir,
fs_access,
compute_checksum=compute_checksum,
)
return output or {}
return output


class WpsWorkflowJob(CommandLineJob):
Expand Down

0 comments on commit 80f322f

Please sign in to comment.