Skip to content

Commit

Permalink
shared_lock_max_reservations in rabbitMq topic options
Browse files Browse the repository at this point in the history
  • Loading branch information
Olivier Michaud authored and infherny committed Sep 29, 2023
1 parent f49f4f4 commit 1f20091
Showing 1 changed file with 4 additions and 1 deletion.
5 changes: 4 additions & 1 deletion src/saturn_engine/worker/topics/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class Options:
prefetch_count: t.Optional[int] = None
serializer: RabbitMQSerializer = RabbitMQSerializer.JSON
log_above_size: t.Optional[int] = None
max_publish_concurrency: int = 8

class TopicServices:
rabbitmq: RabbitMQService
Expand All @@ -80,7 +81,9 @@ def __init__(self, options: Options, services: Services, **kwargs: object) -> No
self.exit_stack = contextlib.AsyncExitStack()
self.is_closed = False
self._queue: t.Optional[aio_pika.abc.AbstractQueue] = None
self._publish_lock = SharedLock(max_reservations=8)
self._publish_lock = SharedLock(
max_reservations=options.max_publish_concurrency
)

async def run(self) -> AsyncGenerator[t.AsyncContextManager[TopicMessage], None]:
if self.is_closed:
Expand Down

0 comments on commit 1f20091

Please sign in to comment.