Skip to content

Commit

Permalink
dead_letter_exchanges in RabbitMQ
Browse files Browse the repository at this point in the history
  • Loading branch information
Olivier Michaud committed Nov 6, 2023
1 parent 1397706 commit 2f67bfa
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 0 deletions.
43 changes: 43 additions & 0 deletions src/saturn_engine/worker/topics/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class Options:
log_above_size: t.Optional[int] = None
max_publish_concurrency: int = 8
max_retry: int | None = None
dead_letter_queue: str | None = None

class TopicServices:
rabbitmq: RabbitMQService
Expand Down Expand Up @@ -248,6 +249,12 @@ async def queue(self) -> aio_pika.abc.AbstractQueue:
arguments["x-max-length-bytes"] = self.options.max_length_bytes
arguments["x-overflow"] = self.options.overflow

if self.options.dead_letter_queue:
arguments[
"x-dead-letter-exchange"
] = f"exchange-{self.options.dead_letter_queue}"
arguments["x-dead-letter-routing-key"] = "default"

channel = await self.channel
queue = await channel.declare_queue(
self.options.queue_name,
Expand Down Expand Up @@ -308,3 +315,39 @@ def _deserialize(
return deserialized

return fromdict(json.loads(message.body.decode()), TopicMessage)


class RabbitMQDeadLetterTopic(RabbitMQTopic):
class TopicServices:
rabbitmq: RabbitMQService

def __init__(
self, options: RabbitMQTopic.Options, services: Services, **kwargs: object
) -> None:
super().__init__(options=options, services=services, **kwargs)
self.exchange_name = f"exchange-{self.options.queue_name}"

@cached_property
async def queue(self) -> aio_pika.abc.AbstractQueue:
arguments: dict[str, t.Any] = {}

if self.options.max_length:
arguments["x-max-length"] = self.options.max_length
if self.options.max_length_bytes:
arguments["x-max-length-bytes"] = self.options.max_length_bytes
arguments["x-overflow"] = self.options.overflow

channel = await self.channel
await channel.declare_exchange(
name=self.exchange_name,
type=aio_pika.abc.ExchangeType.DIRECT,
)
queue = await channel.declare_queue(
self.options.queue_name,
auto_delete=self.options.auto_delete,
durable=self.options.durable,
arguments=arguments,
)

await queue.bind(self.exchange_name, routing_key="default")
return queue
42 changes: 42 additions & 0 deletions tests/worker/topics/test_rabbitmq_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from saturn_engine.worker.services.rabbitmq import RabbitMQService
from saturn_engine.worker.topic import TopicClosedError
from saturn_engine.worker.topics import RabbitMQTopic
from saturn_engine.worker.topics.rabbitmq import RabbitMQDeadLetterTopic
from saturn_engine.worker.topics.rabbitmq import RabbitMQSerializer
from tests.utils.tcp_proxy import TcpProxy
from tests.worker.topics.conftest import RabbitMQTopicMaker
Expand Down Expand Up @@ -252,3 +253,44 @@ async def test_retry(
assert message.id == "1"

await topic.close()


@pytest.mark.asyncio
async def test_dead_letter_exchanges(
rabbitmq_topic_maker: t.Callable[..., Awaitable[RabbitMQTopic]]
) -> None:
topic = await rabbitmq_topic_maker(
RabbitMQTopic,
serializer=RabbitMQSerializer.PICKLE,
dead_letter_queue="dlx_queue",
)
dlx_topic = await rabbitmq_topic_maker(
RabbitMQDeadLetterTopic,
serializer=RabbitMQSerializer.PICKLE,
queue_name="dlx_queue",
)
await topic.ensure_queue()
await dlx_topic.ensure_queue()

messages = [
TopicMessage(id=MessageId("0"), args={"n": b"1", "time": utcnow()}),
]

for message in messages:
await topic.publish(message, wait=True)

# We make the message fail
async with alib.scoped_iter(topic.run()) as topic_iter:
context = await alib.anext(topic_iter)
with pytest.raises(ValueError):
async with context as message:
raise ValueError("Exception")

# We iter the dlx_topic, ensure the failed message
async with alib.scoped_iter(dlx_topic.run()) as dlx_topic_iter:
context = await alib.anext(dlx_topic_iter)
async with context as message:
assert message.id == "0"

await topic.close()
await dlx_topic.close()

0 comments on commit 2f67bfa

Please sign in to comment.