Skip to content

Commit

Permalink
[wip] job multi-results as links inline output transitionMode conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Oct 2, 2024
1 parent 2abe870 commit 4ea918a
Showing 1 changed file with 63 additions and 37 deletions.
100 changes: 63 additions & 37 deletions weaver/wps_restapi/jobs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ def get_results( # pylint: disable=R1260
value_key=None, # type: Optional[str]
schema=JobInputsOutputsSchema.OLD, # type: Optional[JobInputsOutputsSchemaType]
link_references=False, # type: bool
convert_output_transmission=False, # type: bool
): # type: (...) -> Tuple[ExecutionResults, HeadersTupleType]
"""
Obtains the job results with extended full WPS output URL as applicable and according to configuration settings.
Expand All @@ -353,6 +354,11 @@ def get_results( # pylint: disable=R1260
Selects which schema to employ for representing the output results (listing or mapping).
:param link_references:
If enabled, an output that was requested by reference instead of by value will be returned as ``Link`` header.
:param convert_output_transmission:
If disabled (default), data/link representation preserves original results as per their literal/complex type.
If enabled, an output that was requested as reference will be converted as an :term:`URL`, whereas
an output requested by value will be converted to its literal contents, both as needed according to
their original results literal/complex type.
:returns:
Tuple with:
- List or mapping of all outputs each with minimally an ID and value under the requested key.
Expand Down Expand Up @@ -392,19 +398,38 @@ def get_results( # pylint: disable=R1260
)
):
array = [value] # array of array such that it iterated as the array of literals directly
# Any other type of array implies complex data (bbox or file)
# Any other type of array implies complex data (bbox, collection, file, etc.)
# They must be defined on their own with respective media-type/format details per item.
else:
array = value if isinstance(value, list) else [value]
for val_item in array:
res_multi = len(array) > 1
for val_idx, val_item in enumerate(array):
val_data = val_item
if isinstance(val_item, dict) and isinstance(value, list):
rtype = "href" if get_any_value(val_item, key=True, file=True, data=False) else "data"
val_data = get_any_value(val_item, file=True, data=False)
if not isinstance(val_item, dict):
# use the representation that contains all metadata if possible, otherwise rely on literal data only
val_item = result if isinstance(result, dict) else {rtype: val_data}

out_key = rtype
out_mode = get_job_output_transmission(job, out_id, is_reference=(out_key == "href"))
is_ref = rtype == "href"
out_mode = get_job_output_transmission(job, out_id, is_reference=is_ref)
as_ref = link_references and out_mode == ExecuteTransmissionMode.REFERENCE
if rtype == "href" and isinstance(val_data, str):
res_id = f"{out_id}{val_idx}" if res_multi else out_id

# on-demand convertion to requested transmission mode
if convert_output_transmission:
res_hdr, val_data = generate_or_resolve_result(job, val_item, res_id, out_id, out_mode, settings)
if val_data is not None and is_ref: # data generated from reference
is_ref = as_ref = False
out_key = value_key or "data" # OGC schema overrides after as needed
elif val_data is None and not is_ref: # reference generated from data
is_ref = as_ref = True
out_key = "href"
val_data = res_hdr["Content-Location"]

if is_ref and isinstance(val_data, str):
# fix paths relative to instance endpoint,
# but leave explicit links as is (eg: S3 bucket, remote HTTP, etc.)
if val_data.startswith("/"):
Expand All @@ -419,18 +444,17 @@ def get_results( # pylint: disable=R1260
output = {out_key: val_data}

# required for the rest to be there, other fields optional
if rtype == "href":
val_fmt = val_item if isinstance(val_item, dict) else result
if "mimeType" not in val_fmt:
val_fmt["mimeType"] = get_format(val_data, default=ContentType.TEXT_PLAIN).mime_type
if is_ref:
if "mimeType" not in val_item:
val_item["mimeType"] = get_format(val_data, default=ContentType.TEXT_PLAIN).mime_type
if ogc_api or not strict:
output["type"] = val_fmt["mimeType"]
output["type"] = val_item["mimeType"]
if not ogc_api or not strict or as_ref:
output["format"] = {fmt_key: val_fmt["mimeType"]}
output["format"] = {fmt_key: val_item["mimeType"]}
for field in ["encoding", "schema"]:
if field in result:
output["format"][field] = val_fmt[field]
elif rtype != "href":
output["format"][field] = val_item[field]
elif not is_ref:
dtype = result.get("dataType", any2wps_literal_datatype(val_data, is_value=True) or "string")
if ogc_api:
output["dataType"] = {"name": dtype}
Expand Down Expand Up @@ -558,19 +582,23 @@ def get_job_results_response(
raise_job_bad_status(job, container)

# when 'response=document', ignore 'transmissionMode=value|reference', respect it when 'response=raw'
# resolution of 'transmissionMode' for document representation will be done by its own handler function
# See:
# - https://docs.ogc.org/is/18-062r2/18-062r2.html#_response_7 (/req/core/job-results-async-document)
# - https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-document
is_raw = get_job_return(job, results_contents, results_headers) == ExecuteResponse.RAW
# when multipart is needed (either requested explicitly or inferred), do not use references at this point
# this is to make multipart content generation simply by grouping everything under a single 'results' container
# when multipart is requested explicitly, do NOT use 'link_references' at this point
# this is to simplify multipart content generation by grouping everything under a single 'results' container
is_accept_multipart = (
isinstance(job.accept_type, str) and
any(ctype in job.accept_type for ctype in ContentType.ANY_MULTIPART)
)
results, refs = get_results(job, container, value_key="value",
schema=JobInputsOutputsSchema.OGC, # not strict to provide more format details
link_references=is_raw and not is_accept_multipart)
results, refs = get_results(
job, container, value_key="value",
schema=JobInputsOutputsSchema.OGC, # not strict to provide more format details
link_references=is_raw and not is_accept_multipart,
convert_output_transmission=is_raw and not is_accept_multipart,
)

headers = ResponseHeaders(headers or {})
headers.pop("Location", None)
Expand Down Expand Up @@ -607,14 +635,6 @@ def get_job_results_response(
headers.extend(refs)
return HTTPOk(json=results_json, headers=headers)

if is_raw and not is_accept_multipart:
# FIXME: convert on-demand as per requested transmissionMode
# If "raw and not multipart" (ie: link_references=True), 'results' and 'refs' at this point would
# contain a mixture of the desired output transmissionMode and the available ones as per their
# original 'literal/complex' results, but they are not ALL converted to needed transmissionMode.
# Must convert before below empty-results check to return multi-link no-content response.
pass

if not results: # avoid schema validation error if all by reference
# Status code 204 for empty body
# see:
Expand Down Expand Up @@ -675,7 +695,10 @@ def generate_or_resolve_result(
:param output_id: Generic identifier of the output containing the result.
:param output_mode: Desired output transmission mode.
:param settings: Application settings to resolve locations.
:return: Resolved locations.
:return:
Resolved headers and data (as applicable) for the result.
If only returned by reference, ``None`` data is returned. An empty-data contents would be an empty string.
Therefore, the explicit check of ``None`` is important to identify a by-reference result.
"""
key = get_any_value(result, key=True)
val = get_any_value(result)
Expand All @@ -685,17 +708,20 @@ def generate_or_resolve_result(
typ = None
res_data = None
c_length = None
is_val = key in ["value", "data"]
is_ref = key in ["href", "reference"]

# NOTE:
# work with local files (since we have them), to avoid unnecessary loopback request
# then, rewrite the locations after generating their headers to obtain the final result URL

# FIXME: Handle S3 output storage. Should multipart response even be allowed in this case?

if key == "href":
if is_ref:
url = val
typ = result.get("type") or ContentType.APP_OCTET_STREAM
loc = map_wps_output_location(val, settings, exists=True, url=False)
# FIXME: fails if output path is the "relative" results '/{jobID}/...'

if not url:
out_dir = get_wps_output_dir(settings)
Expand All @@ -704,18 +730,18 @@ def generate_or_resolve_result(
loc = os.path.join(out_dir, job_path)
url = map_wps_output_location(loc, settings, exists=False, url=True)

if key == "value":
if is_val:
res_data = io.StringIO()
c_length = res_data.write(data2str(val))
typ = result.get("mediaType") or ContentType.TEXT_PLAIN

if key == "value" and output_mode == ExecuteTransmissionMode.REFERENCE:
if is_val and output_mode == ExecuteTransmissionMode.REFERENCE:
if not os.path.isfile(loc):
os.makedirs(os.path.dirname(loc), exist_ok=True)
with open(loc, mode="w", encoding="utf-8") as out_file:
out_file.write(val)

if key == "href" and output_mode == ExecuteTransmissionMode.VALUE:
if is_ref and output_mode == ExecuteTransmissionMode.VALUE:
res_data = io.FileIO(loc, mode="rb")

res_headers = get_href_headers(
Expand Down Expand Up @@ -796,11 +822,11 @@ def make_result(result, result_id, output_id):
return value

out_results = {}
for res_id, res_val in results.items():
for out_id, res_val in results.items():
if isinstance(res_val, list):
res_data = []
for out_idx, item in enumerate(res_val):
out_id = f"{res_id}.{out_idx}"
res_id = f"{out_id}.{out_idx}"
out_res = make_result(item, res_id, out_id)
res_data.append(out_res)

Expand All @@ -815,9 +841,9 @@ def make_result(result, result_id, output_id):
]

else:
res_data = make_result(res_val, res_id, res_id)
res_data = make_result(res_val, out_id, out_id)

out_results[res_id] = res_data
out_results[out_id] = res_data
return out_results


Expand All @@ -839,7 +865,7 @@ def get_job_results_multipart(job, results, *, headers, container):

def add_result_parts(result_parts):
# type: (List[Tuple[str, str, ExecutionResultObject]]) -> MultiPartFieldsType
for res_id, out_id, result in result_parts:
for out_id, res_id, result in result_parts:
if isinstance(result, list):
sub_parts = [(out_id, f"{out_id}.{out_idx}", data) for out_idx, data in enumerate(result)]
sub_parts = add_result_parts(sub_parts)
Expand All @@ -851,15 +877,15 @@ def add_result_parts(result_parts):
"Content-Location": sub_out_url,
"Content-Disposition": f"attachment; name=\"{out_id}\"",
}
yield out_id, (None, sub_multi, None, sub_headers)
yield res_id, (None, sub_multi, None, sub_headers)

key = get_any_value(result, key=True)
mode = get_job_output_transmission(job, out_id, is_reference=(key == "href"))
res_headers, res_data = generate_or_resolve_result(job, result, res_id, out_id, mode, settings)
c_type = res_headers.get("Content-Type")
c_loc = res_headers.get("Content-Location")
c_fn = os.path.basename(c_loc) if c_loc else None
yield out_id, (c_fn, res_data, c_type, res_headers)
yield res_id, (c_fn, res_data, c_type, res_headers)

results_parts = [(_res_id, _res_id, _res_val) for _res_id, _res_val in results.items()]
results_parts = list(add_result_parts(results_parts))
Expand Down

0 comments on commit 4ea918a

Please sign in to comment.