Skip to content

Commit

Permalink
[wip] generalize job result value/href resolution for single/multi/ar…
Browse files Browse the repository at this point in the history
…ray results
  • Loading branch information
fmigneault committed Sep 23, 2024
1 parent fddb3b3 commit d5d3a21
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 50 deletions.
26 changes: 26 additions & 0 deletions weaver/datatype.py
Original file line number Diff line number Diff line change
Expand Up @@ -1352,6 +1352,32 @@ def results_url(self, container=None):
# type: (Optional[AnySettingsContainer]) -> str
return self.job_url(container=container, extra_path="/results")

def result_path(self, job_id=None, output_id=None, file_name=None):
# type: (Optional[AnyUUID], Optional[str], Optional[str]) -> str
"""
Obtains a *relative* result path, according to requested parameters and the :term:`Job` definition.
The generated path will automatically apply the relative job context if defined.
:param job_id: Override ID to employ for the job path. Otherwise, uses the usually job UUID by default.
This should be used for cases where the ID is "not yet" established by the job, or that an alternate
location based on a UUID established by another source must be employed.
:param output_id:
Output ID to refer to in the path. If omitted, the path prefix will stop at the job ID fragment.
:param file_name:
Output file name and extension to apply to the path. If omitted, the path prefix will stop at the output ID.
:return: Resolved *relative* result path.
"""
result_job_id = str(job_id or self.id)
result_job_path = os.path.join(self.context, result_job_id) if self.context else result_job_id
if not output_id:
return result_job_path
result_job_path = os.path.join(result_job_path, output_id)
if not file_name:
return result_job_path
result_job_path = os.path.join(result_job_path, file_name)
return result_job_path

def links(self, container=None, self_link=None):
# type: (Optional[AnySettingsContainer], Optional[str]) -> List[Link]
"""
Expand Down
2 changes: 1 addition & 1 deletion weaver/processes/wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -2727,7 +2727,7 @@ def make_location_storage(self, storage_type, location_type):
)

output_job_id = str(self.response.uuid)
output_prefix = os.path.join(self.job.context, output_job_id) if self.job.context else output_job_id
output_prefix = self.job.result_path(job_id=output_job_id)
# pylint: disable=attribute-defined-outside-init # references to nested storage dynamically created
if storage_type == STORE_TYPE.S3:
storage.prefix = output_prefix
Expand Down
1 change: 1 addition & 0 deletions weaver/typedefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ class CWL_SchemaName(Protocol):
AnyRegistryContainer = AnyContainer
AnyDatabaseContainer = AnyContainer

AnyData = Union[str, bytes, bytearray]
CookiesType = Dict[str, str]
HeadersType = Dict[str, str]
CookiesTupleType = List[Tuple[str, str]]
Expand Down
20 changes: 18 additions & 2 deletions weaver/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
MetadataResult = TypedDict("MetadataResult", {
"Date": str,
"Last-Modified": str,
"Content-ID": NotRequired[str],
"Content-Type": NotRequired[str],
"Content-Length": NotRequired[str],
"Content-Location": NotRequired[str],
Expand Down Expand Up @@ -1191,6 +1192,8 @@ def get_href_headers(
content_headers=False, # type: bool
content_type=None, # type: Optional[str]
content_disposition_type="attachment", # type: Literal["attachment", "inline"]
content_location=None, # type: Optional[str]
content_id=None, # type: Optional[str]
settings=None, # type: Optional[SettingsType]
**option_kwargs, # type: Unpack[Union[SchemeOptions, RequestOptions]]
): # type: (...) -> MetadataResult
Expand All @@ -1209,8 +1212,16 @@ def get_href_headers(
Explicit ``Content-Type`` to provide.
Otherwise, use default guessed by file system (often ``application/octet-stream``).
If the reference is a directory, this parameter is ignored and ``application/directory`` will be enforced.
Requires that :paramref:`content_headers` is enabled.
:param content_disposition_type:
Whether ``inline`` or ``attachment`` should be used, when enabled by :paramref:`download_headers`.
Whether ``inline`` or ``attachment`` should be used.
Requires that :paramref:`content_headers` and :paramref:`download_headers` are enabled.
:param content_location:
Override ``Content-Location`` to include in headers. Otherwise, defaults to the :paramref:`path`.
Requires that :paramref:`location_headers` and :paramref:`content_headers` are enabled in each case.
:param content_id:
Optional ``Content-ID`` to include in the headers.
Requires that :paramref:`content_headers` is enabled.
:param settings: Application settings to pass down to relevant utility functions.
:return: Headers for the reference.
"""
Expand Down Expand Up @@ -1259,8 +1270,13 @@ def get_href_headers(
f_size = stat.st_size
f_modified = datetime.fromtimestamp(stat.st_mtime)

headers = {"Content-Location": href} if location_headers else {}
headers = {}
if content_headers:
content_id = content_id.strip("<>") if isinstance(content_id, str) else ""
if content_id:
headers["Content-ID"] = f"<{content_id}>"
if location_headers:
headers["Content-Location"] = content_location or href
c_type, c_enc = guess_file_contents(href)
if not f_type:
if c_type == ContentType.APP_OCTET_STREAM: # default
Expand Down
143 changes: 96 additions & 47 deletions weaver/wps_restapi/jobs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
)
from pyramid.response import FileResponse
from pyramid_celery import celery_app
from requests.structures import CaseInsensitiveDict
from webob.headers import ResponseHeaders

from weaver.database import get_db
Expand Down Expand Up @@ -66,6 +65,7 @@
from weaver.execute import AnyExecuteResponse, AnyExecuteTransmissionMode
from weaver.processes.constants import JobInputsOutputsSchemaType
from weaver.typedefs import (
AnyData,
AnyHeadersContainer,
AnyRequestType,
AnyResponseType,
Expand All @@ -77,7 +77,9 @@
ExecutionResults,
ExecutionResultValue,
HeadersTupleType,
HeadersType,
JSON,
Path,
PyramidRequest,
SettingsType
)
Expand Down Expand Up @@ -473,15 +475,20 @@ def get_job_return(job, body=None, headers=None):
return job.execution_response


def get_job_output_transmission(job, output_id):
# type: (Job, str) -> Optional[AnyExecuteTransmissionMode]
def get_job_output_transmission(job, output_id, is_reference):
# type: (Job, str, bool) -> AnyExecuteTransmissionMode
"""
Obtain the requested :term:`Job` output ``transmissionMode``.
"""
outputs = job.outputs or {}
out = outputs.get(output_id) or {}
mode = out.get("transmissionMode")
return mode
# because mode can be omitted, resolve their default explicitly
if not mode and is_reference:
return ExecuteTransmissionMode.REFERENCE
if not mode and not is_reference:
return ExecuteTransmissionMode.VALUE
return cast("AnyExecuteTransmissionMode", mode)


def get_job_results_response(
Expand Down Expand Up @@ -620,6 +627,80 @@ def get_job_results_response(
return resp


def generate_or_resolve_result(
job, # type: Job
result, # type: ExecutionResultObject
result_id, # type: str
output_id, # type: str
output_mode, # type: ExecuteTransmissionMode
settings, # type: SettingsType
): # type: (...) -> Tuple[HeadersType, AnyData]
"""
Obtains the local file path and the corresponding :term:`URL` reference for a given result, generating it as needed.
:param job: Job with results details.
:param result: The specific output value or reference (could be an item index within an array of a given output).
:param result_id: Specific identifier of the result, including any array index as applicable.
:param output_id: Generic identifier of the output containing the result.
:param output_mode: Desired output transmission mode.
:param settings: Application settings to resolve locations.
:return: Resolved locations.
"""
key = get_any_value(result, key=True)
val = get_any_value(result)
cid = f"{result_id}@{job.id}"
url = None
loc = None
typ = None
res_data = None

# NOTE:
# work with local files (since we have them), to avoid unnecessary loopback request
# then, rewrite the locations after generating their headers to obtain the final result URL

# FIXME: Handle S3 output storage. Should multipart response even be allowed in this case?

if key == "href":
url = val
typ = result.get("type") or ContentType.APP_OCTET_STREAM
loc = map_wps_output_location(val, settings, exists=True, file_scheme=True, url=False)

if not url:
out_dir = get_wps_output_dir(settings)
out_name = f"{result_id}.txt"
job_path = job.result_path(output_id=output_id, file_name=out_name)
loc = os.path.join(out_dir, job_path)
url = map_wps_output_location(loc, settings, exists=True, url=True)

if key == "value":
res_data = val
typ = ContentType.TEXT_PLAIN

if key == "value" and output_mode == ExecuteTransmissionMode.REFERENCE:
if not os.path.isfile(loc):
os.makedirs(os.path.dirname(loc), exist_ok=True)
with open(loc, mode="w", encoding="utf-8") as out_file:
out_file.write(val)

if key == "href" and output_mode == ExecuteTransmissionMode.VALUE:
with open(loc, mode="rb") as out_file:
res_data = out_file.read()

if output_mode == ExecuteTransmissionMode.REFERENCE:
res_data = ""

res_headers = get_href_headers(
loc,
download_headers=True,
content_headers=True,
content_type=typ,
content_id=cid,
content_location=url, # rewrite back the original URL
settings=settings,
)
return res_headers, res_data


def get_job_results_multipart(job, results, container):
# type: (Job, ExecutionResults, AnySettingsContainer) -> HTTPOk
"""
Expand All @@ -630,67 +711,35 @@ def get_job_results_multipart(job, results, container):
:param job:
:param results: Pre-filtered and pre-processed results in a normalized format structure.
:param container: Application settings to resolve locations.
"""
settings = get_settings(container)

def add_result_parts(result_parts):
# type: (List[Tuple[str, str, ExecutionResultObject]]) -> MIMEMultipart

multi = MIMEMultipart("mixed")
for res_id, result in result_parts.items():
for res_id, out_id, result in result_parts:
if isinstance(result, list):
sub_parts = {f"{res_id}.{i}": data for i, data in enumerate(result)}
sub_parts = [(f"{out_id}.{i}", out_id, data) for i, data in enumerate(result)]
part = add_result_parts(sub_parts)
multi.attach(part)
continue

key = get_any_value(result, key=True)
val = get_any_value(result)
mode = get_job_output_transmission(job, res_id)
# FIXME: adjust output based on transmissionMode rather than href/value key
if key == "value":
url = None
if mode == ExecuteTransmissionMode.REFERENCE:
url = None # FIXME: write file
else:
mode = ExecuteTransmissionMode.VALUE # in case unspecified, default "auto"
if key == "href":
url = val
if mode == ExecuteTransmissionMode.VALUE:
val = None # FIXME: read file
else:
mode = ExecuteTransmissionMode.REFERENCE # in case unspecified, default "auto"

# NOTE: work with local files (since we have them), to avoid unnecessary loopback request
# FIXME: Handle S3 output storage. Should multipart response even be allowed in this case?
if key == "href":
typ = result.get("type") or ContentType.APP_OCTET_STREAM
loc = map_wps_output_location(url, settings, exists=True, file_scheme=True, url=False)
res_headers = get_href_headers(
loc,
download_headers=True,
content_headers=True,
content_type=typ,
settings=settings,
)
res_headers["Content-Location"] = url # rewrite back the original URL
else:
typ = ContentType.TEXT_PLAIN
name = f"{res_id}.txt"
res_headers = get_href_headers(
name,
download_headers=True,
content_headers=True,
content_type=typ,
settings=settings,
)
mode = get_job_output_transmission(job, out_id, is_reference=(key == "href"))
res_headers, res_data = generate_or_resolve_result(job, result, res_id, out_id, mode, settings)

part = MIMEPart()
for hdr_key, hdr_val in res_headers.items():
part.add_header(hdr_key, hdr_val)
part.set_payload(val)
if res_data:
part.set_payload(res_data)
multi.attach(part)
return multi

res_multi = add_result_parts(results)
results_parts = [(_res_id, _res_id, _res_val) for _res_id, _res_val in results.items()]
res_multi = add_result_parts(results_parts)
resp = HTTPOk(
detail=f"Multipart Response for {job}",
headers={"Content-Type": res_multi.get_content_type()},
Expand Down

0 comments on commit d5d3a21

Please sign in to comment.