From 4ea918ae22e493e248868e1fbb369afb6e3754e8 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Wed, 2 Oct 2024 15:54:00 -0400 Subject: [PATCH] [wip] job multi-results as links inline output transitionMode conversion --- weaver/wps_restapi/jobs/utils.py | 100 +++++++++++++++++++------------ 1 file changed, 63 insertions(+), 37 deletions(-) diff --git a/weaver/wps_restapi/jobs/utils.py b/weaver/wps_restapi/jobs/utils.py index 01a4be7ab..ba4c349c6 100644 --- a/weaver/wps_restapi/jobs/utils.py +++ b/weaver/wps_restapi/jobs/utils.py @@ -340,6 +340,7 @@ def get_results( # pylint: disable=R1260 value_key=None, # type: Optional[str] schema=JobInputsOutputsSchema.OLD, # type: Optional[JobInputsOutputsSchemaType] link_references=False, # type: bool + convert_output_transmission=False, # type: bool ): # type: (...) -> Tuple[ExecutionResults, HeadersTupleType] """ Obtains the job results with extended full WPS output URL as applicable and according to configuration settings. @@ -353,6 +354,11 @@ def get_results( # pylint: disable=R1260 Selects which schema to employ for representing the output results (listing or mapping). :param link_references: If enabled, an output that was requested by reference instead of by value will be returned as ``Link`` header. + :param convert_output_transmission: + If disabled (default), data/link representation preserves original results as per their literal/complex type. + If enabled, an output that was requested as reference will be converted as an :term:`URL`, whereas + an output requested by value will be converted to its literal contents, both as needed according to + their original results literal/complex type. :returns: Tuple with: - List or mapping of all outputs each with minimally an ID and value under the requested key. @@ -392,19 +398,38 @@ def get_results( # pylint: disable=R1260 ) ): array = [value] # array of array such that it iterated as the array of literals directly - # Any other type of array implies complex data (bbox or file) + # Any other type of array implies complex data (bbox, collection, file, etc.) # They must be defined on their own with respective media-type/format details per item. else: array = value if isinstance(value, list) else [value] - for val_item in array: + res_multi = len(array) > 1 + for val_idx, val_item in enumerate(array): val_data = val_item if isinstance(val_item, dict) and isinstance(value, list): rtype = "href" if get_any_value(val_item, key=True, file=True, data=False) else "data" val_data = get_any_value(val_item, file=True, data=False) + if not isinstance(val_item, dict): + # use the representation that contains all metadata if possible, otherwise rely on literal data only + val_item = result if isinstance(result, dict) else {rtype: val_data} + out_key = rtype - out_mode = get_job_output_transmission(job, out_id, is_reference=(out_key == "href")) + is_ref = rtype == "href" + out_mode = get_job_output_transmission(job, out_id, is_reference=is_ref) as_ref = link_references and out_mode == ExecuteTransmissionMode.REFERENCE - if rtype == "href" and isinstance(val_data, str): + res_id = f"{out_id}{val_idx}" if res_multi else out_id + + # on-demand convertion to requested transmission mode + if convert_output_transmission: + res_hdr, val_data = generate_or_resolve_result(job, val_item, res_id, out_id, out_mode, settings) + if val_data is not None and is_ref: # data generated from reference + is_ref = as_ref = False + out_key = value_key or "data" # OGC schema overrides after as needed + elif val_data is None and not is_ref: # reference generated from data + is_ref = as_ref = True + out_key = "href" + val_data = res_hdr["Content-Location"] + + if is_ref and isinstance(val_data, str): # fix paths relative to instance endpoint, # but leave explicit links as is (eg: S3 bucket, remote HTTP, etc.) if val_data.startswith("/"): @@ -419,18 +444,17 @@ def get_results( # pylint: disable=R1260 output = {out_key: val_data} # required for the rest to be there, other fields optional - if rtype == "href": - val_fmt = val_item if isinstance(val_item, dict) else result - if "mimeType" not in val_fmt: - val_fmt["mimeType"] = get_format(val_data, default=ContentType.TEXT_PLAIN).mime_type + if is_ref: + if "mimeType" not in val_item: + val_item["mimeType"] = get_format(val_data, default=ContentType.TEXT_PLAIN).mime_type if ogc_api or not strict: - output["type"] = val_fmt["mimeType"] + output["type"] = val_item["mimeType"] if not ogc_api or not strict or as_ref: - output["format"] = {fmt_key: val_fmt["mimeType"]} + output["format"] = {fmt_key: val_item["mimeType"]} for field in ["encoding", "schema"]: if field in result: - output["format"][field] = val_fmt[field] - elif rtype != "href": + output["format"][field] = val_item[field] + elif not is_ref: dtype = result.get("dataType", any2wps_literal_datatype(val_data, is_value=True) or "string") if ogc_api: output["dataType"] = {"name": dtype} @@ -558,19 +582,23 @@ def get_job_results_response( raise_job_bad_status(job, container) # when 'response=document', ignore 'transmissionMode=value|reference', respect it when 'response=raw' + # resolution of 'transmissionMode' for document representation will be done by its own handler function # See: # - https://docs.ogc.org/is/18-062r2/18-062r2.html#_response_7 (/req/core/job-results-async-document) # - https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-document is_raw = get_job_return(job, results_contents, results_headers) == ExecuteResponse.RAW - # when multipart is needed (either requested explicitly or inferred), do not use references at this point - # this is to make multipart content generation simply by grouping everything under a single 'results' container + # when multipart is requested explicitly, do NOT use 'link_references' at this point + # this is to simplify multipart content generation by grouping everything under a single 'results' container is_accept_multipart = ( isinstance(job.accept_type, str) and any(ctype in job.accept_type for ctype in ContentType.ANY_MULTIPART) ) - results, refs = get_results(job, container, value_key="value", - schema=JobInputsOutputsSchema.OGC, # not strict to provide more format details - link_references=is_raw and not is_accept_multipart) + results, refs = get_results( + job, container, value_key="value", + schema=JobInputsOutputsSchema.OGC, # not strict to provide more format details + link_references=is_raw and not is_accept_multipart, + convert_output_transmission=is_raw and not is_accept_multipart, + ) headers = ResponseHeaders(headers or {}) headers.pop("Location", None) @@ -607,14 +635,6 @@ def get_job_results_response( headers.extend(refs) return HTTPOk(json=results_json, headers=headers) - if is_raw and not is_accept_multipart: - # FIXME: convert on-demand as per requested transmissionMode - # If "raw and not multipart" (ie: link_references=True), 'results' and 'refs' at this point would - # contain a mixture of the desired output transmissionMode and the available ones as per their - # original 'literal/complex' results, but they are not ALL converted to needed transmissionMode. - # Must convert before below empty-results check to return multi-link no-content response. - pass - if not results: # avoid schema validation error if all by reference # Status code 204 for empty body # see: @@ -675,7 +695,10 @@ def generate_or_resolve_result( :param output_id: Generic identifier of the output containing the result. :param output_mode: Desired output transmission mode. :param settings: Application settings to resolve locations. - :return: Resolved locations. + :return: + Resolved headers and data (as applicable) for the result. + If only returned by reference, ``None`` data is returned. An empty-data contents would be an empty string. + Therefore, the explicit check of ``None`` is important to identify a by-reference result. """ key = get_any_value(result, key=True) val = get_any_value(result) @@ -685,6 +708,8 @@ def generate_or_resolve_result( typ = None res_data = None c_length = None + is_val = key in ["value", "data"] + is_ref = key in ["href", "reference"] # NOTE: # work with local files (since we have them), to avoid unnecessary loopback request @@ -692,10 +717,11 @@ def generate_or_resolve_result( # FIXME: Handle S3 output storage. Should multipart response even be allowed in this case? - if key == "href": + if is_ref: url = val typ = result.get("type") or ContentType.APP_OCTET_STREAM loc = map_wps_output_location(val, settings, exists=True, url=False) + # FIXME: fails if output path is the "relative" results '/{jobID}/...' if not url: out_dir = get_wps_output_dir(settings) @@ -704,18 +730,18 @@ def generate_or_resolve_result( loc = os.path.join(out_dir, job_path) url = map_wps_output_location(loc, settings, exists=False, url=True) - if key == "value": + if is_val: res_data = io.StringIO() c_length = res_data.write(data2str(val)) typ = result.get("mediaType") or ContentType.TEXT_PLAIN - if key == "value" and output_mode == ExecuteTransmissionMode.REFERENCE: + if is_val and output_mode == ExecuteTransmissionMode.REFERENCE: if not os.path.isfile(loc): os.makedirs(os.path.dirname(loc), exist_ok=True) with open(loc, mode="w", encoding="utf-8") as out_file: out_file.write(val) - if key == "href" and output_mode == ExecuteTransmissionMode.VALUE: + if is_ref and output_mode == ExecuteTransmissionMode.VALUE: res_data = io.FileIO(loc, mode="rb") res_headers = get_href_headers( @@ -796,11 +822,11 @@ def make_result(result, result_id, output_id): return value out_results = {} - for res_id, res_val in results.items(): + for out_id, res_val in results.items(): if isinstance(res_val, list): res_data = [] for out_idx, item in enumerate(res_val): - out_id = f"{res_id}.{out_idx}" + res_id = f"{out_id}.{out_idx}" out_res = make_result(item, res_id, out_id) res_data.append(out_res) @@ -815,9 +841,9 @@ def make_result(result, result_id, output_id): ] else: - res_data = make_result(res_val, res_id, res_id) + res_data = make_result(res_val, out_id, out_id) - out_results[res_id] = res_data + out_results[out_id] = res_data return out_results @@ -839,7 +865,7 @@ def get_job_results_multipart(job, results, *, headers, container): def add_result_parts(result_parts): # type: (List[Tuple[str, str, ExecutionResultObject]]) -> MultiPartFieldsType - for res_id, out_id, result in result_parts: + for out_id, res_id, result in result_parts: if isinstance(result, list): sub_parts = [(out_id, f"{out_id}.{out_idx}", data) for out_idx, data in enumerate(result)] sub_parts = add_result_parts(sub_parts) @@ -851,7 +877,7 @@ def add_result_parts(result_parts): "Content-Location": sub_out_url, "Content-Disposition": f"attachment; name=\"{out_id}\"", } - yield out_id, (None, sub_multi, None, sub_headers) + yield res_id, (None, sub_multi, None, sub_headers) key = get_any_value(result, key=True) mode = get_job_output_transmission(job, out_id, is_reference=(key == "href")) @@ -859,7 +885,7 @@ def add_result_parts(result_parts): c_type = res_headers.get("Content-Type") c_loc = res_headers.get("Content-Location") c_fn = os.path.basename(c_loc) if c_loc else None - yield out_id, (c_fn, res_data, c_type, res_headers) + yield res_id, (c_fn, res_data, c_type, res_headers) results_parts = [(_res_id, _res_id, _res_val) for _res_id, _res_val in results.items()] results_parts = list(add_result_parts(results_parts))