Skip to content

Commit

Permalink
Bubble-up unhandled error to the message context
Browse files Browse the repository at this point in the history
  • Loading branch information
isra17 committed Sep 21, 2023
1 parent b6861f4 commit f4f45d3
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
8 changes: 8 additions & 0 deletions src/saturn_engine/worker/executors/queue.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import typing as t

import asyncio
import contextlib
import datetime
Expand Down Expand Up @@ -99,6 +101,9 @@ async def scope(
# Transfer the message context to the results
# processing scope.
context=processable._context.pop_all(),
context_error=error
if error and not error.handled
else None,
)
)
if error:
Expand All @@ -110,6 +115,7 @@ async def process_results(
xmsg: ExecutableMessage,
results: PipelineResults,
context: contextlib.AsyncExitStack,
context_error: t.Optional[HandledError],
) -> None:
@self.services.s.hooks.results_processed.emit
async def scope(msg: ResultsProcessed) -> None:
Expand All @@ -129,6 +135,8 @@ async def scope(msg: ResultsProcessed) -> None:
results=results,
)
)
if context_error:
context_error.reraise()

async def submit(self, processable: ExecutableMessage) -> None:
# Get the lock to ensure we don't acquire resource if the submit queue
Expand Down
15 changes: 8 additions & 7 deletions tests/worker/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import asyncio
from functools import partial
from unittest.mock import AsyncMock

import pytest

Expand Down Expand Up @@ -308,12 +309,10 @@ async def test_executor_error_handler_unhandled(
xmsg = fake_executable_maker_with_output(
output=output_topics,
)
exc_infos = []

async def collect_exit(*args: t.Any) -> None:
exc_infos.append(args)

xmsg._executing_context.push_async_exit(collect_exit)
mock = AsyncMock()
xmsg._executing_context.push_async_exit(mock.executing)
xmsg._context.push_async_exit(mock.context)

# Execute our failing message
async with event_loop.until_idle():
Expand All @@ -323,8 +322,10 @@ async def collect_exit(*args: t.Any) -> None:
assert output_queue.qsize() == 1

# The exception raised to the hooks and contexts is the original one.
assert len(exc_infos) == 1
e = exc_infos[0][1]
mock.executing.__aexit__.assert_awaited_once()
mock.context.__aexit__.assert_awaited_once()
e = mock.executing.__aexit__.call_args[0][2]
assert e is mock.context.__aexit__.call_args[0][2]
assert repr(e) == "Exception('TEST_EXCEPTION')"
assert repr(e.__cause__) == "ValueError('CAUSE')"
assert repr(e.__cause__.__context__) == "ValueError('CAUSE_CONTEXT')"
Expand Down

0 comments on commit f4f45d3

Please sign in to comment.