Skip to content

Commit

Permalink
Add timeout on publish
Browse files Browse the repository at this point in the history
  • Loading branch information
isra17 committed Aug 14, 2024
1 parent 5d5ed28 commit 2c79613
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions src/saturn_engine/worker/topics/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class RabbitMQTopic(Topic):
"""A queue that consume message from RabbitMQ"""

RETRY_PUBLISH_DELAY = timedelta(seconds=10)
PUBLISH_TIMEOUT = timedelta(seconds=30)
FAILURE_RETRY_BACKOFFS = [timedelta(seconds=s) for s in (0, 5, 10, 20, 30, 60)]

@dataclasses.dataclass
Expand Down Expand Up @@ -180,14 +181,18 @@ async def publish(
try:
await self.ensure_queue() # Ensure the queue is created.
exchange = await self.exchange
await exchange.publish(
aio_pika.Message(
body=body,
delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
content_type=self.options.serializer.content_type,
expiration=message.expire_after,
await asyncio.wait_for(
exchange.publish(
aio_pika.Message(
body=body,
delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
content_type=self.options.serializer.content_type,
expiration=message.expire_after,
),
routing_key=self.options.routing_key
or self.options.queue_name,
),
routing_key=self.options.routing_key or self.options.queue_name,
timeout=self.PUBLISH_TIMEOUT.total_seconds(),
)
self.publish_bytes_counter.add(len(body), {"topic": self.name})
return True
Expand Down

0 comments on commit 2c79613

Please sign in to comment.