Skip to content

Commit

Permalink
Fix rabbitmq topic hanging forever on serialization error
Browse files Browse the repository at this point in the history
  • Loading branch information
isra17 committed Aug 10, 2023
1 parent ff5dcd0 commit 6b93dde
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
15 changes: 9 additions & 6 deletions src/saturn_engine/worker/topics/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ async def publish(

async with self._publish_lock.reserve() as reservation:
while True:
body = self._serialize(message)
try:
await self.ensure_queue() # Ensure the queue is created.
channel = await self.channel
Expand All @@ -128,7 +129,7 @@ async def publish(

await exchange.publish(
aio_pika.Message(
body=self._serialize(message),
body=body,
delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
content_type=self.options.serializer.content_type,
),
Expand Down Expand Up @@ -167,16 +168,18 @@ async def publish(
has_locked = reservation.locked() or not self._publish_lock.locked()
await reservation.acquire()
if has_locked:
await self.backoff_sleep(attempt)
if not await self.backoff_sleep(attempt):
raise
attempt += 1

return False

async def backoff_sleep(self, attempt: int) -> None:
retry_delay = self.FAILURE_RETRY_BACKOFFS[-1]
if attempt < len(self.FAILURE_RETRY_BACKOFFS):
retry_delay = self.FAILURE_RETRY_BACKOFFS[attempt]
async def backoff_sleep(self, attempt: int) -> bool:
if attempt >= len(self.FAILURE_RETRY_BACKOFFS):
return False
retry_delay = self.FAILURE_RETRY_BACKOFFS[attempt]
await asyncio.sleep(retry_delay.total_seconds())
return True

@asynccontextmanager
async def message_context(
Expand Down
12 changes: 12 additions & 0 deletions tests/worker/topics/test_rabbitmq_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
from collections.abc import Awaitable
from datetime import datetime
from datetime import timedelta

import asyncstdlib as alib
Expand Down Expand Up @@ -177,3 +178,14 @@ async def test_closed_rabbitmq_topic(
await topic.close()
with pytest.raises(TopicClosedError):
await topic.publish(TopicMessage(id=MessageId("0"), args={"n": 0}), wait=True)


@pytest.mark.asyncio
async def test_rabbitmq_topic_serialization_error(
rabbitmq_topic_maker: t.Callable[..., Awaitable[RabbitMQTopic]]
) -> None:
topic = await rabbitmq_topic_maker(RabbitMQTopic)
with pytest.raises(TypeError):
await topic.publish(
TopicMessage(id=MessageId("0"), args={"n": datetime.now()}), wait=True
)

0 comments on commit 6b93dde

Please sign in to comment.