diff --git a/example/definitions/simple.yaml b/example/definitions/simple.yaml index 20774f34..48116608 100644 --- a/example/definitions/simple.yaml +++ b/example/definitions/simple.yaml @@ -114,11 +114,10 @@ spec: - topic: stdout error::ZeroDivisionError: - topic: logging-error - - error_handler: - set_handled: false - republish: - channel: default - max_retry: 5 + - set_handled: false + - republish: + channel: default + max_retry: 1 pipeline: name: example.pipelines.echo_with_error diff --git a/example/tests/pipeline_tests.yaml b/example/tests/pipeline_tests.yaml index 85922b3e..17ae78ca 100644 --- a/example/tests/pipeline_tests.yaml +++ b/example/tests/pipeline_tests.yaml @@ -73,6 +73,10 @@ spec: module: 'example.pipelines' message: division by zero traceback: __any__ + - channel: default + args: + api_key: {key: test, name: test-key-1} + message: test-hello resources: [] --- apiVersion: saturn.flared.io/v1alpha1 diff --git a/src/saturn_engine/worker/job.py b/src/saturn_engine/worker/job.py index 4fb4867b..724fbba9 100644 --- a/src/saturn_engine/worker/job.py +++ b/src/saturn_engine/worker/job.py @@ -50,11 +50,6 @@ async def item_to_topic(self, item_ctx: Item) -> t.AsyncIterator[TopicMessage]: try: async with item_ctx as item: yield item.as_topic_message() - except Exception: - self.logger.exception( - "Failed to process item", - extra={"data": {"message": {"id": item_ctx.id}}}, - ) finally: self._set_item_done(item_ctx) diff --git a/tests/worker/test_job.py b/tests/worker/test_job.py index bb9f7e19..b31e76d4 100644 --- a/tests/worker/test_job.py +++ b/tests/worker/test_job.py @@ -111,7 +111,8 @@ def fail() -> None: # |0|1|2|3|4|5|6| # -> |.|.|R|.|.|R|.| # Nothing commited. - await xmsg_ctxs[2].aclose() + with pytest.raises(ValueError): + await xmsg_ctxs[2].aclose() await xmsg_ctxs[5].aclose() assert job_state_store.job_state(job_id).cursor is None