diff --git a/src/saturn_engine/worker/executors/queue.py b/src/saturn_engine/worker/executors/queue.py index 12d28f02..f7ac6272 100644 --- a/src/saturn_engine/worker/executors/queue.py +++ b/src/saturn_engine/worker/executors/queue.py @@ -54,7 +54,7 @@ async def run_queue(self) -> None: while self.is_running: processable = await self.poll() processable._context.callback(self.queue.task_done) - try: + with contextlib.suppress(Exception): async with processable._context: @self.services.s.hooks.message_executed.emit @@ -77,11 +77,8 @@ async def scope( raise return result + output = await scope(processable) try: - output = await scope(processable) - except Exception: # noqa: S110 - pass - else: self.consuming_tasks.create_task( self.services.s.hooks.pipeline_events_emitted.emit( PipelineEventsEmitted( @@ -97,8 +94,8 @@ async def scope( ), name=f"consume-output({processable})", ) - except Exception: - self.logger.exception("Failed to process queue item") + except Exception: + self.logger.exception("Error processing outputs") async def submit(self, processable: ExecutableMessage) -> None: # Get the lock to ensure we don't acquire resource if the submit queue