-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
dead_letter_exchanges in RabbitMQ #372
Conversation
rabbitmq: RabbitMQService | ||
|
||
def __init__( | ||
self, options: RabbitMQTopic.Options, services: Services, **kwargs: object |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we inherit from RabbitMQTopic.Options
we have the deat_letter_queue parameters which is not supported here... Maybe we should add a warning if the param is set.
2f67bfa
to
f50fc4f
Compare
@isra17 wdyt |
f50fc4f
to
a429131
Compare
@@ -308,3 +315,39 @@ def _deserialize( | |||
return deserialized | |||
|
|||
return fromdict(json.loads(message.body.decode()), TopicMessage) | |||
|
|||
|
|||
class RabbitMQDeadLetterTopic(RabbitMQTopic): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: DeadLetterRabbitMQTopic
@@ -72,6 +72,7 @@ class Options: | |||
log_above_size: t.Optional[int] = None | |||
max_publish_concurrency: int = 8 | |||
max_retry: int | None = None | |||
dead_letter_queue: str | None = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest two changes that would cover everything we need:
- Add a
queue_arguments: dict
, that we can just pass as-is todeclare_queue
. This way we won't have to open a new PR each time we want something new. - Add an
exchange_name: Optional[str]
that we can use to declare and bind an exchange is notNone
?
This way we don't need to create a new class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might as well make it exchange: Optional[ExchangeOptions]
so we support the few exchange parameters and can do fanout with this PR as well.
@dataclass
class ExchangeOptions:
name: str
type: ExchangeType
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But we still need to knew its a dead letter queue for the arguments no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(All the options: https://aio-pika.readthedocs.io/en/latest/apidoc.html#aio_pika.Channel.declare_exchange)
Seems like durable
we need to set True
a429131
to
02c773a
Compare
arguments: dict[str, t.Any] = {} | ||
arguments: dict[str, t.Any] = self.options.arguments | ||
|
||
# Backward compatible arguments | ||
if self.options.max_length: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RM these
@@ -10,7 +10,6 @@ | |||
from .null import NullTopic | |||
from .periodic import PeriodicTopic | |||
from .rabbitmq import RabbitMQTopic | |||
from .static import StaticTopic |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
02c773a
to
24a9408
Compare
No description provided.