diff --git a/tests/functional/test_wps_package.py b/tests/functional/test_wps_package.py
index 6c8d43ebb..d047cec16 100644
--- a/tests/functional/test_wps_package.py
+++ b/tests/functional/test_wps_package.py
@@ -7,16 +7,16 @@
 
 .. seealso::
     - :mod:`tests.processes.wps_package`.
 """
-import inspect
 import contextlib
 import copy
+import inspect
 import json
 import logging
 import os
+import re
 import shutil
 import tempfile
-from inspect import cleandoc
 from typing import TYPE_CHECKING
 
 import boto3
@@ -1957,7 +1957,7 @@ def test_execute_job_with_array_input(self):
                 "listing": [
                     {
                         "entryname": "script.py",
-                        "entry": cleandoc("""
+                        "entry": inspect.cleandoc("""
                             import json
                             import os
                             input = $(inputs)
@@ -2129,7 +2129,7 @@ def test_execute_job_with_inline_input_values(self):
                 "listing": [
                     {
                         "entryname": "script.py",
-                        "entry": cleandoc("""
+                        "entry": inspect.cleandoc("""
                             import json
                             import os
                             import ast
@@ -3558,6 +3558,16 @@ def remove_result_format(results):
             result.pop("format", None)
         return results
 
+    @staticmethod
+    def remove_result_multipart_variable(results):
+        # type: (str) -> str
+        """
+        Removes any variable headers from the multipart contents to simplify test comparison.
+        """
+        results = re.sub(r"Date: .*\r\n", "", results)
+        results = re.sub(r"Last-Modified: .*\r\n", "", results)
+        return results.strip()
+
     def test_execute_single_output_prefer_header_return_representation_literal(self):
         proc = "EchoResultsTester"
         p_id = self.fully_qualified_test_process_name(proc)
@@ -3596,7 +3606,7 @@ def test_execute_single_output_prefer_header_return_representation_literal(self)
         outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
         assert outputs.content_type.startswith(ContentType.APP_JSON)
         assert outputs.json == {
-            "output_data": "test",
+            "output_data": {"value": "test"},
         }
 
     def test_execute_single_output_prefer_header_return_representation_complex(self):
@@ -3968,7 +3978,7 @@ def test_execute_single_output_multipart_accept_data(self):
         # validate the results based on original execution request
         results = resp
         assert ContentType.MULTIPART_MIXED in results.content_type
-        boundary = parse_kvp(results.content_type)["boundary"][0]
+        boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0]
         output_json = json.dumps({"data": "test"}, separators=(",", ":"))
         results_body = inspect.cleandoc(f"""
            --{boundary}
@@ -3977,8 +3987,9 @@ def test_execute_single_output_multipart_accept_data(self):
 
            {output_json}
            --{boundary}--
-           """)
-        assert results.text == results_body
+           """).replace("\n", "\r\n")
+        results_text = self.remove_result_multipart_variable(results.text)
+        assert results_text == results_body
         outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
         assert outputs.content_type.startswith(ContentType.APP_JSON)
         assert outputs.json["outputs"] == {
@@ -4040,15 +4051,17 @@ def test_execute_single_output_multipart_accept_link(self):
         # validate the results based on original execution request
         results = resp
         assert ContentType.MULTIPART_MIXED in results.content_type
-        boundary = parse_kvp(results.content_type)["boundary"][0]
+        boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0]
         results_body = inspect.cleandoc(f"""
            --{boundary}
            Content-Type: {ContentType.APP_JSON}
            Content-ID: <output_json@{job_id}>
+           Content-Length: 0
            Content-Location: {out_url}/{job_id}/output_json/result.json
            --{boundary}--
-           """)
-        assert results.text == results_body
+           """).replace("\n", "\r\n")
+        results_text = self.remove_result_multipart_variable(results.text)
+        assert results_text == results_body
         outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
         assert outputs.content_type.startswith(ContentType.APP_JSON)
         assert outputs.json["outputs"] == {
@@ -4109,18 +4122,20 @@ def test_execute_single_output_multipart_accept_alt_format(self):
         # validate the results based on original execution request
         results = resp
         assert ContentType.MULTIPART_MIXED in results.content_type
-        boundary = parse_kvp(results.content_type)["boundary"][0]
+        boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0]
         output_json_as_yaml = yaml.safe_dump({"data": "test"})
         results_body = inspect.cleandoc(f"""
            --{boundary}
            Content-Type: {ContentType.APP_YAML}
            Content-ID: <output_json@{job_id}>
+           Content-Length: 12
 
            {output_json_as_yaml}
            --{boundary}--
-           """)
+           """).replace("\n", "\r\n")
+        results_text = self.remove_result_multipart_variable(results.text)
         assert results.content_type.startswith(ContentType.MULTIPART_MIXED)
-        assert results.text == results_body
+        assert results_text == results_body
         outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
         assert outputs.content_type.startswith(ContentType.APP_JSON)
         assert outputs.json["outputs"] == {
@@ -4192,21 +4207,24 @@ def test_execute_multi_output_multipart_accept(self, multipart_header):
         job_id = status["jobID"]
         out_url = get_wps_output_url(self.settings)
         results = self.app.get(f"/jobs/{job_id}/results")
-        boundary = parse_kvp(results.content_type)["boundary"][0]
+        boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0]
         results_body = inspect.cleandoc(f"""
            --{boundary}
            Content-Type: {ContentType.TEXT_PLAIN}
            Content-ID: <output_data@{job_id}>
+           Content-Length: 4
 
            test
            --{boundary}
            Content-Type: {ContentType.APP_JSON}
            Content-ID: <output_json@{job_id}>
+           Content-Length: 0
            Content-Location: {out_url}/{job_id}/output_json/result.json
            --{boundary}--
-           """)
+           """).replace("\n", "\r\n")
+        results_text = self.remove_result_multipart_variable(results.text)
         assert results.content_type.startswith(ContentType.MULTIPART_MIXED)
-        assert results.text == results_body
+        assert results_text == results_body
         outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
         assert outputs.content_type.startswith(ContentType.APP_JSON)
         assert outputs.json["outputs"] == {
@@ -4303,9 +4321,9 @@ def test_execute_multi_output_prefer_header_return_representation(self):
         self.deploy_process(body, process_id=p_id)
 
         exec_headers = {
-            "Prefer": f"return={ExecuteReturnPreference.REPRESENTATION}, respond-async"
+            "Prefer": f"return={ExecuteReturnPreference.REPRESENTATION}, respond-async",
+            "Content-Type": ContentType.APP_JSON,
         }
-        exec_headers.update(self.json_headers)
         exec_content = {
             "inputs": {
                 "message": "test"
@@ -4334,23 +4352,29 @@ def test_execute_multi_output_prefer_header_return_representation(self):
         job_id = status["jobID"]
         out_url = get_wps_output_url(self.settings)
         results = self.app.get(f"/jobs/{job_id}/results")
-        boundary = parse_kvp(results.content_type)["boundary"][0]
+        boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0]
         output_json = json.dumps({"data": "test"}, separators=(",", ":"))
         results_body = inspect.cleandoc(f"""
            --{boundary}
+           Content-Disposition: attachment; filename="output_data.txt" name="output_data"
            Content-Type: {ContentType.TEXT_PLAIN}
            Content-ID: <output_data@{job_id}>
+           Content-Length: 4
 
            test
            --{boundary}
+           Content-Disposition: attachment; filename="result.json" name="output_json"
            Content-Type: {ContentType.APP_JSON}
+           Content-Location: {out_url}/{job_id}/output_json/result.json
            Content-ID: <output_json@{job_id}>
+           Content-Length: 16
 
            {output_json}
            --{boundary}--
-           """)
+           """).replace("\n", "\r\n")
+        results_text = self.remove_result_multipart_variable(results.text)
         assert results.content_type.startswith(ContentType.MULTIPART_MIXED)
-        assert results.text == results_body
+        assert results_text == results_body
         outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
         assert outputs.content_type.startswith(ContentType.APP_JSON)
         assert outputs.json["outputs"] == {
@@ -4397,21 +4421,23 @@ def test_execute_multi_output_response_raw_value(self):
         job_id = status["jobID"]
         out_url = get_wps_output_url(self.settings)
         results = self.app.get(f"/jobs/{job_id}/results")
-        boundary = parse_kvp(results.content_type)["boundary"][0]
+        boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0]
         output_json = json.dumps({"data": "test"}, separators=(",", ":"))
         results_body = inspect.cleandoc(f"""
            --{boundary}
            Content-Type: {ContentType.TEXT_PLAIN}
            Content-ID: <output_data@{job_id}>
+           Content-Length: 4
 
            test
            --{boundary}
            Content-Type: {ContentType.APP_JSON}
            Content-ID: <output_json@{job_id}>
+           Content-Length: 16
 
            {output_json}
            --{boundary}--
-           """)
+           """).replace("\n", "\r\n")
         assert results.content_type.startswith(ContentType.MULTIPART_MIXED)
         assert results.text == results_body
         outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
@@ -4460,20 +4486,23 @@ def test_execute_multi_output_response_raw_reference(self):
         job_id = status["jobID"]
         out_url = get_wps_output_url(self.settings)
         results = self.app.get(f"/jobs/{job_id}/results")
-        boundary = parse_kvp(results.content_type)["boundary"][0]
+        boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0]
         results_body = inspect.cleandoc(f"""
            --{boundary}
            Content-Type: {ContentType.TEXT_PLAIN}
            Content-ID: <output_data@{job_id}>
+           Content-Length: 0
            Content-Location: {out_url}/{job_id}/output_data/result.txt
            --{boundary}
            Content-Type: {ContentType.APP_JSON}
            Content-ID: <output_json@{job_id}>
+           Content-Length: 0
            Content-Location: {out_url}/{job_id}/output_json/result.json
            --{boundary}--
-           """)
+           """).replace("\n", "\r\n")
+        results_text = self.remove_result_multipart_variable(results.text)
         assert results.content_type.startswith(ContentType.MULTIPART_MIXED)
-        assert results.text == results_body
+        assert results_text == results_body
         outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
         assert outputs.content_type.startswith(ContentType.APP_JSON)
         assert outputs.json["outputs"] == {
@@ -4521,31 +4550,37 @@ def test_execute_multi_output_response_raw_mixed(self):
         job_id = status["jobID"]
         out_url = get_wps_output_url(self.settings)
         results = self.app.get(f"/jobs/{job_id}/results")
-        boundary = parse_kvp(results.content_type)["boundary"][0]
+        boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0]
         output_json = json.dumps({"data": "test"}, separators=(",", ":"))
         results_body = inspect.cleandoc(f"""
            --{boundary}
            Content-Type: {ContentType.TEXT_PLAIN}
            Content-ID: <output_data@{job_id}>
+           Content-Length: 4
 
            test
            --{boundary}
            Content-Type: {ContentType.TEXT_PLAIN}
            Content-ID: <output_text@{job_id}>
+           Content-Length: 0
            Content-Location: {out_url}/{job_id}/output_text/result.txt
            --{boundary}
            Content-Type: {ContentType.APP_JSON}
            Content-ID: <output_json@{job_id}>
+           Content-Length: 16
 
            {output_json}
            --{boundary}--
-           """)
+           """).replace("\n", "\r\n")
+        results_text = self.remove_result_multipart_variable(results.text)
         assert results.content_type.startswith(ContentType.MULTIPART_MIXED)
-        assert results.text == results_body
+        assert results_text == results_body
         outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
         assert outputs.content_type.startswith(ContentType.APP_JSON)
         assert outputs.json["outputs"] == {
-            "output_data": "test",
+            "output_data": {
+                "value": "test"
+            },
             "output_text": {
                 "href": f"{out_url}/{job_id}/output_text/result.txt",
                 "type": ContentType.TEXT_PLAIN,
@@ -4608,7 +4643,9 @@ def test_execute_multi_output_prefer_header_return_minimal_defaults(self):
         outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
         assert outputs.content_type.startswith(ContentType.APP_JSON)
         assert outputs.json["outputs"] == {
-            "output_data": "test",
+            "output_data": {
+                "value": "test"
+            },
             "output_json": {
                 "href": f"{out_url}/{job_id}/output_json/result.json",
                 "type": ContentType.APP_JSON,
@@ -4680,7 +4717,9 @@ def test_execute_multi_output_prefer_header_return_minimal_override_transmission
         outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
         assert outputs.content_type.startswith(ContentType.APP_JSON)
         assert outputs.json["outputs"] == {
-            "output_data": "test",
+            "output_data": {
+                "value": "test"
+            },
             "output_json": {
                 "href": f"{out_url}/{job_id}/output_json/result.json",
                 "type": ContentType.APP_JSON,
@@ -4743,7 +4782,9 @@ def test_execute_multi_output_response_document_defaults(self):
         outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
         assert outputs.content_type.startswith(ContentType.APP_JSON)
         assert outputs.json["outputs"] == {
-            "output_data": "test",
+            "output_data": {
+                "value": "test"
+            },
             "output_json": {
                 "href": f"{out_url}/{job_id}/output_json/result.json",
                 "type": ContentType.APP_JSON,
@@ -4812,7 +4853,9 @@ def test_execute_multi_output_response_document_mixed(self):
         outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
         assert outputs.content_type.startswith(ContentType.APP_JSON)
         assert outputs.json["outputs"] == {
-            "output_data": "test",
+            "output_data": {
+                "value": "test"
+            },
             "output_json": {
                 "href": f"{out_url}/{job_id}/output_json/result.json",
                 "type": ContentType.APP_JSON,
diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py
index b0ae1465d..3cdb115f7 100644
--- a/weaver/processes/execution.py
+++ b/weaver/processes/execution.py
@@ -58,7 +58,7 @@
     load_pywps_config
 )
 from weaver.wps_restapi import swagger_definitions as sd
-from weaver.wps_restapi.jobs.utils import get_job_results_response, get_job_submission_response
+from weaver.wps_restapi.jobs.utils import get_job_results_response, get_job_submission_response, get_job_return
 from weaver.wps_restapi.processes.utils import resolve_process_tag
 
 LOGGER = logging.getLogger(__name__)
@@ -794,9 +794,9 @@ def submit_job_handler(payload,  # type: ProcessExecution
     # Prefer header not resolved with a valid value should still resume without error
     is_execute_async = mode != ExecuteMode.SYNC
     accept_type = validate_job_accept_header(headers, mode)
+    exec_resp = get_job_return(job=None, body=json_body, headers=headers)  # job 'none' since still doing 1st parsing
     get_header("prefer", headers, pop=True)  # don't care about value, just ensure removed with any header container
 
-    exec_resp = json_body.get("response")
     subscribers = map_job_subscribers(json_body, settings)
     job_inputs = json_body.get("inputs")
     job_outputs = json_body.get("outputs")
@@ -867,6 +867,8 @@ def validate_job_accept_header(headers, execution_mode):
     # anything always allowed in sync, since results returned directly
     if execution_mode == ExecuteMode.SYNC:
         return accept
+    if ContentType.ANY in accept:
+        return
     raise HTTPNotAcceptable(
         json=sd.ErrorJsonResponseBodySchema(schema_include=True).deserialize({
             "type": "NotAcceptable",
diff --git a/weaver/utils.py b/weaver/utils.py
index 89345babe..1764a42f0 100644
--- a/weaver/utils.py
+++ b/weaver/utils.py
@@ -33,6 +33,7 @@
 from beaker.container import MemoryNamespaceManager
 from beaker.exceptions import BeakerException
 from botocore.config import Config as S3Config
+from botocore.exceptions import ClientError, HTTPClientError
 from bs4 import BeautifulSoup
 from celery.app import Celery
 from mypy_boto3_s3.literals import RegionName
@@ -1193,7 +1194,9 @@ def get_href_headers(
     content_type=None,                      # type: Optional[str]
     content_disposition_type="attachment",  # type: Literal["attachment", "inline"]
     content_location=None,                  # type: Optional[str]
+    content_name=None,                      # type: Optional[str]
     content_id=None,                        # type: Optional[str]
+    missing_ok=False,                       # type: bool
     settings=None,                          # type: Optional[SettingsType]
     **option_kwargs,                        # type: Unpack[Union[SchemeOptions, RequestOptions]]
 ):                                          # type: (...) -> MetadataResult
@@ -1219,9 +1222,19 @@
     :param content_location:
        Override ``Content-Location`` to include in headers. Otherwise, defaults to the :paramref:`path`.
        Requires that :paramref:`location_headers` and :paramref:`content_headers` are enabled in each case.
+    :param content_name:
+       Optional ``name`` parameter to assign in the ``Content-Disposition`` header.
+       Requires that :paramref:`content_headers` and :paramref:`download_headers` are enabled.
     :param content_id:
        Optional ``Content-ID`` to include in the headers.
        Requires that :paramref:`content_headers` is enabled.
+       This should be a uniquely identifiable reference *across the server* (not just within a specific response),
+       which can be used for cross-referencing by ``{cid:<>}`` within and between multipart document contents.
+       For a generic ID or field name, employ :paramref:`content_name` instead.
+    :param missing_ok:
+       If the referenced resource does not exist (locally or remotely as applicable), and that content information
+       to describe it cannot be retrieved, either raise an error (default) or resume with the minimal information
+       details that could be resolved.
     :param settings: Application settings to pass down to relevant utility functions.
     :return: Headers for the reference.
     """
@@ -1229,6 +1242,9 @@
     if not any(href.startswith(proto) for proto in ["file", "http", "https", "s3"]):
         href = f"file://{os.path.abspath(path)}"
     f_enc = None
+    f_size = None
+    f_type = None
+    f_modified = None
 
     # handle directory
     if path.endswith("/"):
@@ -1245,30 +1261,39 @@
         options["http"].update(**configs)
 
     if path.startswith("s3://") or path.startswith("https://s3."):
-        s3_params = resolve_s3_http_options(**options["http"], **kwargs)
-        s3_region = options["s3"].pop("region_name", None)
-        s3_client = boto3.client("s3", region_name=s3_region, **s3_params)  # type: S3Client
-        s3_bucket, file_key = path[5:].split("/", 1)
-        s3_file = s3_client.head_object(Bucket=s3_bucket, Key=file_key)
-        f_type = content_type or s3_file["ResponseMetadata"]["HTTPHeaders"]["ContentType"]
-        f_size = s3_file["ResponseMetadata"]["HTTPHeaders"]["Size"]
-        f_modified = s3_file["ResponseMetadata"]["HTTPHeaders"]["LastModified"]
+        try:
+            s3_params = resolve_s3_http_options(**options["http"], **kwargs)
+            s3_region = options["s3"].pop("region_name", None)
+            s3_client = boto3.client("s3", region_name=s3_region, **s3_params)  # type: S3Client
+            s3_bucket, file_key = path[5:].split("/", 1)
+            s3_file = s3_client.head_object(Bucket=s3_bucket, Key=file_key)
+            f_type = content_type or s3_file["ResponseMetadata"]["HTTPHeaders"]["ContentType"]
+            f_size = s3_file["ResponseMetadata"]["HTTPHeaders"]["Size"]
+            f_modified = s3_file["ResponseMetadata"]["HTTPHeaders"]["LastModified"]
+        except (ClientError, HTTPClientError):
+            if not missing_ok:
+                raise
     elif path.startswith("http://") or path.startswith("https://"):
         resp = request_extra("HEAD", href, **options["http"])
-        if not resp.status_code == 200:
+        if resp.status_code != 200 and not missing_ok:
             raise ValueError(f"Could not obtain file reference metadata from [{href}]")
-        f_modified = resp.last_modified
-        f_type = content_type or resp.content_type
-        f_size = resp.content_length
-        f_enc = resp.content_encoding
+        if resp.status_code == 200:
+            f_modified = resp.last_modified
+            f_type = content_type or resp.content_type
+            f_size = resp.content_length
+            f_enc = resp.content_encoding
     else:
-        path = path.split("file://", 1)[-1]
-        stat = os.stat(path)
-        f_type = content_type
-        f_size = stat.st_size
-        f_modified = datetime.fromtimestamp(stat.st_mtime)
+        try:
+            path = path.split("file://", 1)[-1]
+            stat = os.stat(path)
+            f_type = content_type
+            f_size = stat.st_size
+            f_modified = datetime.fromtimestamp(stat.st_mtime)
+        except OSError:
+            if not missing_ok:
+                raise
 
     headers = {}
     if content_headers:
@@ -1278,32 +1303,33 @@
         if location_headers:
             headers["Content-Location"] = content_location or href
         c_type, c_enc = guess_file_contents(href)
-        if not f_type:
+        f_type = f_type or content_type  # in case of error, all above failed, use provided content-type if any
+        if not f_type:  # last resort, guess from file path
            if c_type == ContentType.APP_OCTET_STREAM:  # default
                f_ext = os.path.splitext(path)[-1]
                f_type = get_content_type(f_ext, charset="UTF-8", default=ContentType.APP_OCTET_STREAM)
            else:
                f_type = c_type
-        if not f_enc:
-            f_enc = c_enc
+        f_enc = f_enc or c_enc or ""
         headers.update({
             "Content-Type": f_type,
-            "Content-Encoding": f_enc or "",
-            "Content-Length": str(f_size),
+            "Content-Encoding": f_enc,
         })
+        if f_size is not None:
+            headers["Content-Length"] = str(f_size)
         if download_headers:
             if os.path.splitext(path)[-1] in ["", "."]:
                 f_ext = get_extension(f_type, dot=True)
                 path = f"{path}{f_ext}"
-            headers.update({
-                "Content-Disposition": f"{content_disposition_type}; filename=\"{os.path.basename(path)}\"",
-            })
+            content_disposition_params = f"filename=\"{os.path.basename(path)}\""
+            if content_name:
+                content_disposition_params += f"; name=\"{content_name}\""
+            headers["Content-Disposition"] = f"{content_disposition_type}; {content_disposition_params}"
     f_current = get_file_header_datetime(now())
-    f_modified = get_file_header_datetime(f_modified)
-    headers.update({
-        "Date": f_current,
-        "Last-Modified": f_modified,
-    })
+    headers["Date"] = f_current
+    if f_modified:
+        f_modified = get_file_header_datetime(f_modified)
+        headers["Last-Modified"] = f_modified
     return headers
diff --git a/weaver/wps_restapi/jobs/utils.py b/weaver/wps_restapi/jobs/utils.py
index 012fb9e7f..fd39a8ece 100644
--- a/weaver/wps_restapi/jobs/utils.py
+++ b/weaver/wps_restapi/jobs/utils.py
@@ -4,9 +4,6 @@
 import os
 import shutil
 from copy import deepcopy
-from email.message import MIMEPart
-from email.mime.multipart import MIMEMultipart
-from email.policy import HTTP as PolicyHTTP
 from typing import TYPE_CHECKING, cast
 
 import colander
@@ -64,7 +61,7 @@
 from weaver.wps_restapi.providers.utils import forbid_local_only
 
 if TYPE_CHECKING:
-    from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, Union
+    from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
 
     from weaver.execute import AnyExecuteResponse, AnyExecuteTransmissionMode
     from weaver.processes.constants import JobInputsOutputsSchemaType
@@ -83,7 +80,6 @@
         HeadersTupleType,
         HeadersType,
         JSON,
-        Path,
         PyramidRequest,
         SettingsType
     )
@@ -93,9 +89,9 @@
         # filename, data/io
         Tuple[Optional[str], AnyDataStream],
         # filename, data/io, content-type
-        Tuple[Optional[str], AnyDataStream, Optional[str], HeadersType],
+        Tuple[Optional[str], AnyDataStream, Optional[str]],
         # filename, data/io, content-type, headers
-        Tuple[Optional[str], AnyDataStream, str, HeadersType],
+        Tuple[Optional[str], AnyDataStream, Optional[str], HeadersType],
     ]
 
     MultiPartFieldsType = Sequence[Tuple[str, MultiPartFieldsParamsType]]
@@ -471,10 +467,14 @@ def get_results(  # pylint: disable=R1260
     return outputs, headers
 
 
-def get_job_return(job, body=None, headers=None):
-    # type: (Job, Optional[JSON], Optional[AnyHeadersContainer]) -> AnyExecuteResponse
+def get_job_return(job=None, body=None, headers=None):
+    # type: (Optional[Job], Optional[JSON], Optional[AnyHeadersContainer]) -> AnyExecuteResponse
     """
     Obtain the :term:`Job` result representation based on the resolution order of preferences and request parameters.
+
+    Body and header parameters are considered first, in case they provide 'overrides' for the active request.
+    Then, if the :paramref:`job` was already parsed from the original request, and contains pre-resolved return,
+    this format is employed. When doing the initial parsing, ``job=None`` MUST be used.
     """
     body = body or {}
     resp = ExecuteResponse.get(body.get("response"))
@@ -487,6 +487,8 @@ def get_job_return(job, body=None, headers=None):
         if pref == ExecuteReturnPreference.REPRESENTATION:
             return ExecuteResponse.RAW
 
+    if not job:
+        return ExecuteResponse.DOCUMENT
     return job.execution_response
 
 
@@ -595,6 +597,11 @@ def get_job_results_response(
                 "value": repr_json(exc.value),
             })
         )
+
+    # simplify data literals if qualified value representation is not needed
+    # use deserialized contents such that only the applicable fields remain
+    results_json = get_job_results_simplified(results_json)
+
     # note:
     #   Cannot add "links" field in response body because variable Output ID keys are directly at the root
     #   Possible conflict with an output that would be named "links".
@@ -642,6 +649,28 @@ def get_job_results_response(
     return resp
 
 
+def get_job_results_simplified(results):
+    # type: (ExecutionResults) -> ExecutionResults
+    """
+    Removes nested literal value definitions if qualified value representation is not needed.
+
+    Qualified value representation is not needed if no other field than ``value`` is provided with the literal data.
+    The simplification is applied for both literals on their own and nested array of literals.
+    """
+    out_results = {}
+    for res_id, res_val in results.items():
+        if isinstance(res_val, dict) and list(res_val) == ["value"]:
+            out_results[res_id] = res_val["value"]
+        elif isinstance(res_val, list):
+            out_results[res_id] = [
+                item["value"] if isinstance(item, dict) and list(item) == ["value"] else item
+                for item in res_val
+            ]
+        else:
+            out_results[res_id] = res_val
+    return out_results
+
+
 def generate_or_resolve_result(
     job,            # type: Job
     result,         # type: ExecutionResultObject
@@ -668,6 +697,7 @@ def generate_or_resolve_result(
     loc = None
     typ = None
     res_data = None
+    c_length = None
 
     # NOTE:
     #   work with local files (since we have them), to avoid unnecessary loopback request
@@ -678,18 +708,18 @@ def generate_or_resolve_result(
     if key == "href":
         url = val
         typ = result.get("type") or ContentType.APP_OCTET_STREAM
-        loc = map_wps_output_location(val, settings, exists=True, file_scheme=True, url=False)
+        loc = map_wps_output_location(val, settings, exists=True, url=False)
 
     if not url:
         out_dir = get_wps_output_dir(settings)
         out_name = f"{result_id}.txt"
         job_path = job.result_path(output_id=output_id, file_name=out_name)
         loc = os.path.join(out_dir, job_path)
-        url = map_wps_output_location(loc, settings, exists=True, url=True)
+        url = map_wps_output_location(loc, settings, exists=False, url=True)
 
     if key == "value":
         res_data = io.StringIO()
-        res_data.read(val)
+        c_length = res_data.write(val)
         typ = ContentType.TEXT_PLAIN
 
     if key == "value" and output_mode == ExecuteTransmissionMode.REFERENCE:
@@ -700,22 +730,25 @@ def generate_or_resolve_result(
 
     if key == "href" and output_mode == ExecuteTransmissionMode.VALUE:
         res_data = io.FileIO(loc, mode="rb")
-        # with open(loc, mode="rb") as out_file:
-        #     res_data = out_file.read()
 
     res_headers = get_href_headers(
         loc,
         download_headers=True,
+        missing_ok=True,  # only basic details if file does not exist
         content_headers=True,
         content_type=typ,
         content_id=cid,
+        content_name=result_id,
         content_location=url,  # rewrite back the original URL
         settings=settings,
     )
+    if output_mode == ExecuteTransmissionMode.VALUE and not res_headers.get("Content-Length") and c_length is not None:
+        res_headers["Content-Length"] = str(c_length)
     if output_mode == ExecuteTransmissionMode.REFERENCE:
         res_data = None
         res_headers["Content-Length"] = "0"
-
+    if not os.path.exists(loc):
+        res_headers.pop("Content-Location", None)
     return res_headers, res_data
@@ -725,7 +758,8 @@ def get_job_results_multipart(job, results, headers, container):
     Generates the :term:`Job` results multipart response from available or requested outputs.
 
     .. seealso::
-        Function :func:`get_results` should be used to avoid re-processing all output format combinations.
+        - Function :func:`get_results` should be used to avoid re-processing all output format combinations.
+        - Details of ``multipart`` (:rfc:`2046#section-5.1`) :term:`Media-Type` family.
 
     :param job:
     :param results: Pre-filtered and pre-processed results in a normalized format structure.
@@ -734,62 +768,36 @@ def get_job_results_multipart(job, results, headers, container):
     """
     settings = get_settings(container)
 
-    # class AnyMultipartEncoder(MultipartEncoder):
-    #     def __init__(self, fields, content_type=ContentType.MULTIPART_MIXED, **kwargs):
-    #         # type: (MultiPartFieldsType, str, **str) -> None
-    #         super().__init__(fields, **kwargs)
-    #         self._content_type = content_type
-    #
-    #     @property
-    #     def content_type(self):
-    #         return f"{self._content_type}; boundary=\"{self.boundary_value}\""
-
     def add_result_parts(result_parts):
         # type: (List[Tuple[str, str, ExecutionResultObject]]) -> MultiPartFieldsType
-        #### type: (List[Tuple[str, str, ExecutionResultObject]]) -> MIMEMultipart
-
-        ##multi = AnyMultipartEncoder("mixed", policy=PolicyHTTP)
        for res_id, out_id, result in result_parts:
            if isinstance(result, list):
                sub_parts = [(f"{out_id}.{i}", out_id, data) for i, data in enumerate(result)]
                sub_parts = add_result_parts(sub_parts)
-                sub_multi = MultipartEncoder(sub_parts)
+                sub_multi = MultipartEncoder(sub_parts, content_type=ContentType.MULTIPART_MIXED)
                sub_out_url = job.result_path(output_id=out_id)
                sub_headers = {
                    "Content-Type": sub_multi.content_type,
                    "Content-ID": f"<{out_id}@{job.id}>",
-                    "Content-Location": sub_out_url
+                    "Content-Location": sub_out_url,
+                    "Content-Disposition": f"attachment; name=\"{out_id}\"",
                }
                yield out_id, (None, sub_multi, None, sub_headers)
-                ###part = add_result_parts(sub_parts)
-                ###multi.attach(part)
-                ##continue
 
            key = get_any_value(result, key=True)
            mode = get_job_output_transmission(job, out_id, is_reference=(key == "href"))
            res_headers, res_data = generate_or_resolve_result(job, result, res_id, out_id, mode, settings)
-            yield out_id, (None, res_data, None, res_headers)
-
-            # part = MIMEPart(policy=PolicyHTTP)
-            # for hdr_key, hdr_val in res_headers.items():
-            #     if hdr_val:
-            #         part.add_header(hdr_key, hdr_val)
-            # if res_data:
-            #     part.set_payload(res_data)
-            ###multi.attach(part)
-        ##return multi
+            c_type = res_headers.get("Content-Type")
+            c_loc = res_headers.get("Content-Location")
+            c_fn = os.path.basename(c_loc) if c_loc else None
+            yield out_id, (c_fn, res_data, c_type, res_headers)
 
     results_parts = [(_res_id, _res_id, _res_val) for _res_id, _res_val in results.items()]
     results_parts = list(add_result_parts(results_parts))
-    #res_multi = AnyMultipartEncoder(results_parts)
-    res_multi = MultipartEncoder(results_parts)
-    ##resp_ctype = f"{res_multi.get_content_type()}; boundary=\"{res_multi.get_boundary()}\""
+    res_multi = MultipartEncoder(results_parts, content_type=ContentType.MULTIPART_MIXED)
     resp_headers = headers or {}
-    ##resp_headers.update({"Content-Type": resp_ctype})
     resp_headers.update({"Content-Type": res_multi.content_type})
     resp = HTTPOk(detail=f"Multipart Response for {job}", headers=resp_headers)
-    # drop generator contents that includes its own headers in the body, only keep nested parts
-    ###resp.body = res_multi.as_bytes(policy=PolicyHTTP).split(res_multi.policy.linesep.encode(), 3)[-1]
     resp.body = res_multi.read()
     return resp
diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py
index 8aa202222..fd1e09a6f 100644
--- a/weaver/wps_restapi/swagger_definitions.py
+++ b/weaver/wps_restapi/swagger_definitions.py
@@ -4174,8 +4174,7 @@ class Execute(ExecuteInputOutputs):
         validator=OneOf(ExecuteMode.values())
     )
     response = JobResponseOptionsEnum(
-        missing=drop,
-        default=ExecuteResponse.DOCUMENT,
+        missing=drop,  # no default to ensure 'Prefer' header vs 'response' body resolution order can be performed
         description=(
             "Indicates the desired representation format of the response. "
             f"(see for more details: {DOC_URL}/processes.html#execution-body)."
@@ -5831,22 +5830,46 @@ class JobOutputReference(ExtendedMappingSchema):
     format = FormatSelection(missing=drop)
 
 
-class JobOutputValue(OneOfKeywordSchema):
+class JobOutputArrayReference(ExtendedSequenceSchema):
+    item = JobOutputReference()
+
+
+class JobOutputQualifiedValueLiteral(Format):
+    value = AnyLiteralType()
+    mediaType = MediaType(missing=drop, example=ContentType.APP_JSON)  # override for optional, others already optional
+    format = FormatSelection(missing=drop)
+
+
+class JobOutputQualifiedDataLiteral(Format):
+    data = AnyLiteralType()
+    mediaType = MediaType(missing=drop, example=ContentType.APP_JSON)  # override for optional, others already optional
+    format = FormatSelection(missing=drop)
+
+
+class JobOutputLiteral(OneOfKeywordSchema):
     _one_of = [
-        JobOutputReference(tilte="JobOutputReference"),
-        AnyLiteralDataType(title="JobOutputLiteral")
+        # AnyLiteralType(),  # NOTE: purposely omit value inline, always embed in 'value' or 'data' for job outputs
+        JobOutputQualifiedDataLiteral(),
+        JobOutputQualifiedValueLiteral(),
     ]
 
 
-class JobOutput(AllOfKeywordSchema):
-    _all_of = [
-        OutputIdentifierType(),
-        JobOutputValue(),
+class JobOutputArrayLiteral(ExtendedSequenceSchema):
+    item = JobOutputLiteral()
+
+
+class JobOutputValueObject(OneOfKeywordSchema):
+    _one_of = [
+        JobOutputReference(),
+        JobOutputLiteral(),
+        # array possible since nested object under 'id'
+        JobOutputArrayReference(),
+        JobOutputArrayLiteral(),
     ]
 
 
 class JobOutputMap(ExtendedMappingSchema):
-    output_id = JobOutputValue(
+    output_id = JobOutputValueObject(
         variable="{output-id}", title="JobOutputData",
         description=(
             "Output data as literal value or file reference. "
@@ -5855,12 +5878,28 @@ class JobOutputMap(ExtendedMappingSchema):
     )
 
 
+class JobOutputFields(OneOfKeywordSchema):
+    _one_of = [
+        JobOutputReference(),
+        JobOutputQualifiedDataLiteral(),
+        JobOutputQualifiedValueLiteral(),
+    ]
+
+
+class JobOutputItem(AllOfKeywordSchema):
+    _all_of = [
+        OutputIdentifierType(),
+        JobOutputFields(),  # cannot be an array directly, since 'id' field needed in this representation
+    ]
+
+
 class JobOutputList(ExtendedSequenceSchema):
     title = "JobOutputList"
-    output = JobOutput(description="Job output result with specific keyword according to represented format.")
+    output = JobOutputItem(description="Job output result with specific keyword according to represented format.")
 
 
 class JobOutputs(OneOfKeywordSchema):
+    description = "Job outputs with many alternate representations according to the specified 'schema' query parameter."
     _one_of = [
         JobOutputMap(),
         JobOutputList(),