Skip to content

Commit

Permalink
Ensure excpetion make it to the message context
Browse files Browse the repository at this point in the history
  • Loading branch information
isra17 committed Jul 31, 2023
1 parent 699587f commit 39d2ecc
Showing 1 changed file with 4 additions and 7 deletions.
11 changes: 4 additions & 7 deletions src/saturn_engine/worker/executors/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async def run_queue(self) -> None:
while self.is_running:
processable = await self.poll()
processable._context.callback(self.queue.task_done)
try:
with contextlib.suppress(Exception):
async with processable._context:

@self.services.s.hooks.message_executed.emit
Expand All @@ -77,11 +77,8 @@ async def scope(
raise
return result

output = await scope(processable)
try:
output = await scope(processable)
except Exception: # noqa: S110
pass
else:
self.consuming_tasks.create_task(
self.services.s.hooks.pipeline_events_emitted.emit(
PipelineEventsEmitted(
Expand All @@ -97,8 +94,8 @@ async def scope(
),
name=f"consume-output({processable})",
)
except Exception:
self.logger.exception("Failed to process queue item")
except Exception:
self.logger.exception("Error processing outputs")

async def submit(self, processable: ExecutableMessage) -> None:
# Get the lock to ensure we don't acquire resource if the submit queue
Expand Down

0 comments on commit 39d2ecc

Please sign in to comment.