Skip to content

Commit

Permalink
fix multipart generation - WIP tests results checks
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Sep 25, 2024
1 parent 1d87f58 commit fc75d93
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 130 deletions.
113 changes: 78 additions & 35 deletions tests/functional/test_wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
.. seealso::
- :mod:`tests.processes.wps_package`.
"""
import inspect

import contextlib
import copy
import inspect
import json
import logging
import os
import re
import shutil
import tempfile
from inspect import cleandoc
from typing import TYPE_CHECKING

import boto3
Expand Down Expand Up @@ -1957,7 +1957,7 @@ def test_execute_job_with_array_input(self):
"listing": [
{
"entryname": "script.py",
"entry": cleandoc("""
"entry": inspect.cleandoc("""
import json
import os
input = $(inputs)
Expand Down Expand Up @@ -2129,7 +2129,7 @@ def test_execute_job_with_inline_input_values(self):
"listing": [
{
"entryname": "script.py",
"entry": cleandoc("""
"entry": inspect.cleandoc("""
import json
import os
import ast
Expand Down Expand Up @@ -3558,6 +3558,16 @@ def remove_result_format(results):
result.pop("format", None)
return results

@staticmethod
def remove_result_multipart_variable(results):
    # type: (str) -> str
    """
    Removes any variable headers from the multipart contents to simplify test comparison.

    Every CRLF-terminated ``Date`` and ``Last-Modified`` header line is dropped, after which
    leading and trailing whitespace of the remaining text is stripped.

    :param results: Raw multipart text, using ``\\r\\n`` line endings.
    :returns: The same text without the ``Date``/``Last-Modified`` header lines and without surrounding whitespace.
    """
    # Anchor on the start of a line (MULTILINE) so that only real header lines are removed.
    # An unanchored pattern would also clip the tail of headers whose name merely ends with
    # "Date" (e.g. "X-Expiry-Date: ...") or body content that happens to contain "Date: ".
    results = re.sub(r"^(?:Date|Last-Modified): .*\r\n", "", results, flags=re.MULTILINE)
    return results.strip()

def test_execute_single_output_prefer_header_return_representation_literal(self):
proc = "EchoResultsTester"
p_id = self.fully_qualified_test_process_name(proc)
Expand Down Expand Up @@ -3596,7 +3606,7 @@ def test_execute_single_output_prefer_header_return_representation_literal(self)
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json == {
"output_data": "test",
"output_data": {"value": "test"},
}

def test_execute_single_output_prefer_header_return_representation_complex(self):
Expand Down Expand Up @@ -3968,7 +3978,7 @@ def test_execute_single_output_multipart_accept_data(self):
# validate the results based on original execution request
results = resp
assert ContentType.MULTIPART_MIXED in results.content_type
boundary = parse_kvp(results.content_type)["boundary"][0]
boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0]
output_json = json.dumps({"data": "test"}, separators=(",", ":"))
results_body = inspect.cleandoc(f"""
--{boundary}
Expand All @@ -3977,8 +3987,9 @@ def test_execute_single_output_multipart_accept_data(self):
{output_json}
--{boundary}--
""")
assert results.text == results_body
""").replace("\n", "\r\n")
results_text = self.remove_result_multipart_variable(results.text)
assert results_text == results_body
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json["outputs"] == {
Expand Down Expand Up @@ -4040,15 +4051,17 @@ def test_execute_single_output_multipart_accept_link(self):
# validate the results based on original execution request
results = resp
assert ContentType.MULTIPART_MIXED in results.content_type
boundary = parse_kvp(results.content_type)["boundary"][0]
boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0]
results_body = inspect.cleandoc(f"""
--{boundary}
Content-Type: {ContentType.APP_JSON}
Content-ID: <output_json@{job_id}>
Content-Length: 0
Content-Location: {out_url}/{job_id}/output_json/result.json
--{boundary}--
""")
assert results.text == results_body
""").replace("\n", "\r\n")
results_text = self.remove_result_multipart_variable(results.text)
assert results_text == results_body
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json["outputs"] == {
Expand Down Expand Up @@ -4109,18 +4122,20 @@ def test_execute_single_output_multipart_accept_alt_format(self):
# validate the results based on original execution request
results = resp
assert ContentType.MULTIPART_MIXED in results.content_type
boundary = parse_kvp(results.content_type)["boundary"][0]
boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0]
output_json_as_yaml = yaml.safe_dump({"data": "test"})
results_body = inspect.cleandoc(f"""
--{boundary}
Content-Type: {ContentType.APP_YAML}
Content-ID: <output_json@{job_id}>
Content-Length: 12
{output_json_as_yaml}
--{boundary}--
""")
""").replace("\n", "\r\n")
results_text = self.remove_result_multipart_variable(results.text)
assert results.content_type.startswith(ContentType.MULTIPART_MIXED)
assert results.text == results_body
assert results_text == results_body
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json["outputs"] == {
Expand Down Expand Up @@ -4192,21 +4207,24 @@ def test_execute_multi_output_multipart_accept(self, multipart_header):
job_id = status["jobID"]
out_url = get_wps_output_url(self.settings)
results = self.app.get(f"/jobs/{job_id}/results")
boundary = parse_kvp(results.content_type)["boundary"][0]
boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0]
results_body = inspect.cleandoc(f"""
--{boundary}
Content-Type: {ContentType.TEXT_PLAIN}
Content-ID: <output_data@{job_id}>
Content-Length: 4
test
--{boundary}
Content-Type: {ContentType.APP_JSON}
Content-ID: <output_json@{job_id}>
Content-Length: 0
Content-Location: {out_url}/{job_id}/output_json/result.json
--{boundary}--
""")
""").replace("\n", "\r\n")
results_text = self.remove_result_multipart_variable(results.text)
assert results.content_type.startswith(ContentType.MULTIPART_MIXED)
assert results.text == results_body
assert results_text == results_body
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json["outputs"] == {
Expand Down Expand Up @@ -4303,9 +4321,9 @@ def test_execute_multi_output_prefer_header_return_representation(self):
self.deploy_process(body, process_id=p_id)

exec_headers = {
"Prefer": f"return={ExecuteReturnPreference.REPRESENTATION}, respond-async"
"Prefer": f"return={ExecuteReturnPreference.REPRESENTATION}, respond-async",
"Content-Type": ContentType.APP_JSON,
}
exec_headers.update(self.json_headers)
exec_content = {
"inputs": {
"message": "test"
Expand Down Expand Up @@ -4334,23 +4352,29 @@ def test_execute_multi_output_prefer_header_return_representation(self):
job_id = status["jobID"]
out_url = get_wps_output_url(self.settings)
results = self.app.get(f"/jobs/{job_id}/results")
boundary = parse_kvp(results.content_type)["boundary"][0]
boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0]
output_json = json.dumps({"data": "test"}, separators=(",", ":"))
results_body = inspect.cleandoc(f"""
--{boundary}
Content-Disposition: attachment; filename="output_data.txt" name="output_data"
Content-Type: {ContentType.TEXT_PLAIN}
Content-ID: <output_data@{job_id}>
Content-Length: 4
test
--{boundary}
Content-Disposition: attachment; filename="result.json" name="output_json"
Content-Type: {ContentType.APP_JSON}
Content-Location: {out_url}/{job_id}/output_json/result.json
Content-ID: <output_json@{job_id}>
Content-Length: 16
{output_json}
--{boundary}--
""")
""").replace("\n", "\r\n")
results_text = self.remove_result_multipart_variable(results.text)
assert results.content_type.startswith(ContentType.MULTIPART_MIXED)
assert results.text == results_body
assert results_text == results_body
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json["outputs"] == {
Expand Down Expand Up @@ -4397,21 +4421,23 @@ def test_execute_multi_output_response_raw_value(self):
job_id = status["jobID"]
out_url = get_wps_output_url(self.settings)
results = self.app.get(f"/jobs/{job_id}/results")
boundary = parse_kvp(results.content_type)["boundary"][0]
boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0]
output_json = json.dumps({"data": "test"}, separators=(",", ":"))
results_body = inspect.cleandoc(f"""
--{boundary}
Content-Type: {ContentType.TEXT_PLAIN}
Content-ID: <output_data@{job_id}>
Content-Length: 4
test
--{boundary}
Content-Type: {ContentType.APP_JSON}
Content-ID: <output_json@{job_id}>
Content-Length: 16
{output_json}
--{boundary}--
""")
""").replace("\n", "\r\n")
assert results.content_type.startswith(ContentType.MULTIPART_MIXED)
assert results.text == results_body
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
Expand Down Expand Up @@ -4460,20 +4486,23 @@ def test_execute_multi_output_response_raw_reference(self):
job_id = status["jobID"]
out_url = get_wps_output_url(self.settings)
results = self.app.get(f"/jobs/{job_id}/results")
boundary = parse_kvp(results.content_type)["boundary"][0]
boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0]
results_body = inspect.cleandoc(f"""
--{boundary}
Content-Type: {ContentType.TEXT_PLAIN}
Content-ID: <output_data@{job_id}>
Content-Length: 0
Content-Location: {out_url}/{job_id}/output_data/result.txt
--{boundary}
Content-Type: {ContentType.APP_JSON}
Content-ID: <output_json@{job_id}>
Content-Length: 0
Content-Location: {out_url}/{job_id}/output_json/result.json
--{boundary}--
""")
""").replace("\n", "\r\n")
results_text = self.remove_result_multipart_variable(results.text)
assert results.content_type.startswith(ContentType.MULTIPART_MIXED)
assert results.text == results_body
assert results_text == results_body
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json["outputs"] == {
Expand Down Expand Up @@ -4521,31 +4550,37 @@ def test_execute_multi_output_response_raw_mixed(self):
job_id = status["jobID"]
out_url = get_wps_output_url(self.settings)
results = self.app.get(f"/jobs/{job_id}/results")
boundary = parse_kvp(results.content_type)["boundary"][0]
boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0]
output_json = json.dumps({"data": "test"}, separators=(",", ":"))
results_body = inspect.cleandoc(f"""
--{boundary}
Content-Type: {ContentType.TEXT_PLAIN}
Content-ID: <output_data@{job_id}>
Content-Length: 4
test
--{boundary}
Content-Type: {ContentType.TEXT_PLAIN}
Content-ID: <output_text@{job_id}>
Content-Length: 0
Content-Location: {out_url}/{job_id}/output_text/result.txt
--{boundary}
Content-Type: {ContentType.APP_JSON}
Content-ID: <output_json@{job_id}>
Content-Length: 16
{output_json}
--{boundary}--
""")
""").replace("\n", "\r\n")
results_text = self.remove_result_multipart_variable(results.text)
assert results.content_type.startswith(ContentType.MULTIPART_MIXED)
assert results.text == results_body
assert results_text == results_body
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json["outputs"] == {
"output_data": "test",
"output_data": {
"value": "test"
},
"output_text": {
"href": f"{out_url}/{job_id}/output_text/result.txt",
"type": ContentType.TEXT_PLAIN,
Expand Down Expand Up @@ -4608,7 +4643,9 @@ def test_execute_multi_output_prefer_header_return_minimal_defaults(self):
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json["outputs"] == {
"output_data": "test",
"output_data": {
"value": "test"
},
"output_json": {
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
Expand Down Expand Up @@ -4680,7 +4717,9 @@ def test_execute_multi_output_prefer_header_return_minimal_override_transmission
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json["outputs"] == {
"output_data": "test",
"output_data": {
"value": "test"
},
"output_json": {
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
Expand Down Expand Up @@ -4743,7 +4782,9 @@ def test_execute_multi_output_response_document_defaults(self):
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json["outputs"] == {
"output_data": "test",
"output_data": {
"value": "test"
},
"output_json": {
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
Expand Down Expand Up @@ -4812,7 +4853,9 @@ def test_execute_multi_output_response_document_mixed(self):
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json["outputs"] == {
"output_data": "test",
"output_data": {
"value": "test"
},
"output_json": {
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
Expand Down
6 changes: 4 additions & 2 deletions weaver/processes/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
load_pywps_config
)
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.jobs.utils import get_job_results_response, get_job_submission_response
from weaver.wps_restapi.jobs.utils import get_job_results_response, get_job_submission_response, get_job_return
from weaver.wps_restapi.processes.utils import resolve_process_tag

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -794,9 +794,9 @@ def submit_job_handler(payload, # type: ProcessExecution
# Prefer header not resolved with a valid value should still resume without error
is_execute_async = mode != ExecuteMode.SYNC
accept_type = validate_job_accept_header(headers, mode)
exec_resp = get_job_return(job=None, body=json_body, headers=headers) # job 'none' since still doing 1st parsing
get_header("prefer", headers, pop=True) # don't care about value, just ensure removed with any header container

exec_resp = json_body.get("response")
subscribers = map_job_subscribers(json_body, settings)
job_inputs = json_body.get("inputs")
job_outputs = json_body.get("outputs")
Expand Down Expand Up @@ -867,6 +867,8 @@ def validate_job_accept_header(headers, execution_mode):
# anything always allowed in sync, since results returned directly
if execution_mode == ExecuteMode.SYNC:
return accept
if ContentType.ANY in accept:
return
raise HTTPNotAcceptable(
json=sd.ErrorJsonResponseBodySchema(schema_include=True).deserialize({
"type": "NotAcceptable",
Expand Down
Loading

0 comments on commit fc75d93

Please sign in to comment.