From c83931bbddfb2d84fb26c34efa87d516c9e7a3a5 Mon Sep 17 00:00:00 2001 From: Samuel Laroche Date: Mon, 7 Aug 2023 15:29:06 -0400 Subject: [PATCH] add topic message to meta args --- src/saturn_engine/worker/pipeline_message.py | 3 +++ tests/core/test_pipeline.py | 25 ++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/src/saturn_engine/worker/pipeline_message.py b/src/saturn_engine/worker/pipeline_message.py index 23264dae..e86f6607 100644 --- a/src/saturn_engine/worker/pipeline_message.py +++ b/src/saturn_engine/worker/pipeline_message.py @@ -16,6 +16,9 @@ class PipelineMessage: message: TopicMessage meta_args: dict[str, t.Any] = dataclasses.field(default_factory=dict) + def __post_init__(self) -> None: + self.set_meta_arg(meta_type=TopicMessage, value=self.message) + @property def id(self) -> str: return self.message.id diff --git a/tests/core/test_pipeline.py b/tests/core/test_pipeline.py index 781235df..8ce6ba6a 100644 --- a/tests/core/test_pipeline.py +++ b/tests/core/test_pipeline.py @@ -59,6 +59,11 @@ def pipeline_kwargs(a: int, *, spy: Mock, **kwargs: t.Any) -> int: return 1 +def pipeline_topic_message(a: int, topic_message: TopicMessage, *, spy: Mock) -> int: + spy(a=a, topic_message=topic_message) + return 1 + + def test_pipeline_messages_execute() -> None: spy = Mock() c = C("c") @@ -104,3 +109,23 @@ def test_pipeline_kwargs_execute() -> None: a=1, b=2, ) + + +def test_pipeline_topic_message_execute() -> None: + spy = Mock() + args: dict = { + "a": 1, + "spy": spy, + } + topic_message = TopicMessage(args=args) + + msg = PipelineMessage( + info=PipelineInfo.from_pipeline(pipeline_topic_message), + message=topic_message, + ) + assert msg.execute() == 1 + + spy.assert_called_once_with( + a=1, + topic_message=topic_message, + )