Skip to content

Commit

Permalink
[wip] refector job results to resolve representation after extracting…
Browse files Browse the repository at this point in the history
… their definitions
  • Loading branch information
fmigneault committed Oct 4, 2024
1 parent dc749de commit 1ef76d3
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 120 deletions.
9 changes: 5 additions & 4 deletions tests/functional/test_wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -3673,6 +3673,7 @@ def test_execute_single_output_prefer_header_return_representation_complex(self)
job_id = status["jobID"]
out_url = get_wps_output_url(self.settings)
results = self.app.get(f"/jobs/{job_id}/results")
assert results.status_code == 200, f"Failed with: [{results.status_code}]\nReason:\n{resp.text}"
assert results.content_type.startswith(ContentType.APP_JSON)
assert results.text == "{\"data\":\"test\"}"
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
Expand Down Expand Up @@ -4783,10 +4784,9 @@ def test_execute_multi_output_prefer_header_return_representation(self):
out_url = get_wps_output_url(self.settings)
results = self.app.get(f"/jobs/{job_id}/results")
boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0]
output_json = repr_json({"data": "test"}, indent=None, separators=(",", ":"), force_string=True)
results_body = self.fix_result_multipart_indent(f"""
--{boundary}
Content-Disposition: attachment; name="output_data"; filename="output_data.txt"
Content-Disposition: attachment; name="output_data"
Content-Type: {ContentType.TEXT_PLAIN}
Content-ID: <output_data@{job_id}>
Content-Length: 4
Expand All @@ -4799,7 +4799,7 @@ def test_execute_multi_output_prefer_header_return_representation(self):
Content-ID: <output_json@{job_id}>
Content-Length: 16
{output_json}
{{"data":"test"}}
--{boundary}--
""")
results_text = self.remove_result_multipart_variable(results.text)
Expand Down Expand Up @@ -4873,8 +4873,9 @@ def test_execute_multi_output_response_raw_value(self):
{output_json}
--{boundary}--
""")
results_text = self.remove_result_multipart_variable(results.text)
assert results.content_type.startswith(ContentType.MULTIPART_MIXED)
assert results.text == results_body
assert results_text == results_body
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json["outputs"] == {
Expand Down
38 changes: 36 additions & 2 deletions weaver/datatype.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@

from weaver import xml_util
from weaver.exceptions import ProcessInstanceError, ServiceParsingError
from weaver.execute import ExecuteControlOption, ExecuteMode, ExecuteResponse, ExecuteTransmissionMode
from weaver.execute import (
ExecuteControlOption,
ExecuteMode,
ExecuteResponse,
ExecuteReturnPreference,
ExecuteTransmissionMode
)
from weaver.formats import AcceptLanguage, ContentType, OutputFormat, repr_json
from weaver.processes.constants import (
CWL_NAMESPACE_WEAVER_ID,
Expand Down Expand Up @@ -81,7 +87,13 @@

from owslib.wps import WebProcessingService

from weaver.execute import AnyExecuteControlOption, AnyExecuteMode, AnyExecuteResponse, AnyExecuteTransmissionMode
from weaver.execute import (
AnyExecuteControlOption,
AnyExecuteMode,
AnyExecuteResponse,
AnyExecuteReturnPreference,
AnyExecuteTransmissionMode
)
from weaver.processes.constants import ProcessSchemaType
from weaver.processes.types import AnyProcessType
from weaver.quotation.status import AnyQuoteStatus
Expand Down Expand Up @@ -1080,6 +1092,27 @@ def execution_response(self, response):
raise ValueError(f"Invalid value for '{self.__name__}.execution_response'. Must be one of {resp}")
self["execution_response"] = exec_resp

@property
def execution_return(self):
# type: () -> AnyExecuteReturnPreference
ret = self.setdefault("execution_return", ExecuteReturnPreference.MINIMAL) # almost equivalent to 'document'
if ret not in ExecuteReturnPreference.values():
ret = ExecuteReturnPreference.MINIMAL
self["execution_return"] = ret
return ret

@execution_return.setter
def execution_return(self, return_preference):
# type: (Optional[Union[AnyExecuteReturnPreference, str]]) -> None
if return_preference is None:
exec_ret = ExecuteReturnPreference.MINIMAL
else:
exec_ret = ExecuteReturnPreference.get(return_preference)
if exec_ret not in ExecuteReturnPreference:
return_prefs = list(ExecuteReturnPreference.values())
raise ValueError(f"Invalid value for '{self.__name__}.execution_return'. Must be one of {return_prefs}")
self["execution_return"] = exec_ret

@property
def is_local(self):
# type: () -> bool
Expand Down Expand Up @@ -1509,6 +1542,7 @@ def params(self):
"status_message": self.status_message,
"status_location": self.status_location,
"execution_response": self.execution_response,
"execution_return": self.execution_return,
"execution_mode": self.execution_mode,
"is_workflow": self.is_workflow,
"created": self.created,
Expand Down
6 changes: 3 additions & 3 deletions weaver/processes/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ def submit_job_handler(payload, # type: ProcessExecution
# Prefer header not resolved with a valid value should still resume without error
is_execute_async = mode != ExecuteMode.SYNC
accept_type = validate_job_accept_header(headers, mode)
exec_resp = get_job_return(job=None, body=json_body, headers=headers) # job 'none' since still doing 1st parsing
exec_resp, exec_return = get_job_return(job=None, body=json_body, headers=headers) # job 'None' since still parsing
get_header("prefer", headers, pop=True) # don't care about value, just ensure removed with any header container

subscribers = map_job_subscribers(json_body, settings)
Expand All @@ -803,8 +803,8 @@ def submit_job_handler(payload, # type: ProcessExecution
store = db.get_store(StoreJobs) # type: StoreJobs
job = store.save_job(task_id=Status.ACCEPTED, process=process, service=provider_id,
inputs=job_inputs, outputs=job_outputs, is_workflow=is_workflow, is_local=is_local,
execute_async=is_execute_async, execute_response=exec_resp, custom_tags=tags, user_id=user,
access=visibility, context=context, subscribers=subscribers,
execute_async=is_execute_async, execute_response=exec_resp, execute_return=exec_return,
custom_tags=tags, user_id=user, access=visibility, context=context, subscribers=subscribers,
accept_type=accept_type, accept_language=language)
job.save_log(logger=LOGGER, message="Job task submitted for execution.", status=Status.ACCEPTED, progress=0)
job = store.update_job(job)
Expand Down
3 changes: 2 additions & 1 deletion weaver/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pywps import Process as ProcessWPS

from weaver.datatype import Bill, Job, Process, Quote, Service, VaultFile
from weaver.execute import AnyExecuteResponse
from weaver.execute import AnyExecuteResponse, AnyExecuteReturnPreference
from weaver.sort import AnySortType
from weaver.status import AnyStatusSearch
from weaver.typedefs import (
Expand Down Expand Up @@ -176,6 +176,7 @@ def save_job(self,
is_local=False, # type: bool
execute_async=True, # type: bool
execute_response=None, # type: Optional[AnyExecuteResponse]
execute_return=None, # type: Optional[AnyExecuteReturnPreference]
custom_tags=None, # type: Optional[List[str]]
user_id=None, # type: Optional[int]
access=None, # type: Optional[AnyVisibility]
Expand Down
4 changes: 3 additions & 1 deletion weaver/store/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@

from pymongo.collection import Collection

from weaver.execute import AnyExecuteResponse
from weaver.execute import AnyExecuteResponse, AnyExecuteReturnPreference
from weaver.processes.types import AnyProcessType
from weaver.sort import AnySortType
from weaver.status import AnyStatusSearch
Expand Down Expand Up @@ -792,6 +792,7 @@ def save_job(self,
is_local=False, # type: bool
execute_async=True, # type: bool
execute_response=None, # type: Optional[AnyExecuteResponse]
execute_return=None, # type: Optional[AnyExecuteReturnPreference]
custom_tags=None, # type: Optional[List[str]]
user_id=None, # type: Optional[int]
access=None, # type: Optional[AnyVisibility]
Expand Down Expand Up @@ -830,6 +831,7 @@ def save_job(self,
"status": map_status(Status.ACCEPTED),
"execute_async": execute_async,
"execution_response": execute_response,
"execution_return": execute_return,
"is_workflow": is_workflow,
"is_local": is_local,
"created": created if created else now(),
Expand Down
Loading

0 comments on commit 1ef76d3

Please sign in to comment.