Skip to content

Commit

Permalink
add topic message to meta args
Browse files Browse the repository at this point in the history
  • Loading branch information
Samuel Laroche authored and Samuel Laroche committed Aug 7, 2023
1 parent dcb0b3e commit c83931b
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/saturn_engine/worker/pipeline_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ class PipelineMessage:
message: TopicMessage
meta_args: dict[str, t.Any] = dataclasses.field(default_factory=dict)

def __post_init__(self) -> None:
self.set_meta_arg(meta_type=TopicMessage, value=self.message)

@property
def id(self) -> str:
return self.message.id
Expand Down
25 changes: 25 additions & 0 deletions tests/core/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ def pipeline_kwargs(a: int, *, spy: Mock, **kwargs: t.Any) -> int:
return 1


def pipeline_topic_message(a: int, topic_message: TopicMessage, *, spy: Mock) -> int:
spy(a=a, topic_message=topic_message)
return 1


def test_pipeline_messages_execute() -> None:
spy = Mock()
c = C("c")
Expand Down Expand Up @@ -104,3 +109,23 @@ def test_pipeline_kwargs_execute() -> None:
a=1,
b=2,
)


def test_pipeline_topic_message_execute() -> None:
spy = Mock()
args: dict = {
"a": 1,
"spy": spy,
}
topic_message = TopicMessage(args=args)

msg = PipelineMessage(
info=PipelineInfo.from_pipeline(pipeline_topic_message),
message=topic_message,
)
assert msg.execute() == 1

spy.assert_called_once_with(
a=1,
topic_message=topic_message,
)

0 comments on commit c83931b

Please sign in to comment.