-
Notifications
You must be signed in to change notification settings - Fork 177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor RabbitMQ failure handler #2087
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Could you add a short section in http://smallrye.io/smallrye-reactive-messaging/4.3.0/rabbitmq/receiving-messages-from-rabbitmq/#failure-management ?
I'm trying to build a reconsume later strategy to test the apis exposed by the connector. I'm really new to reactive stream and I struggle to emit delayed message to a subscriber returned by the connector getSubscriber method. In the following code, I don't know why the multi subscribe method expect a public class RabbitMQReconsumeLater implements RabbitMQFailureHandler {
private MultiEmitter<? super Message<?>> emitter;
public RabbitMQReconsumeLater(RabbitMQConnector connector, RabbitMQConnectorIncomingConfiguration config) {
// create outgoing config for a delayed channel from the incoming channel configuration
final Config config = ...;
// create a subscriber to push message to
final Subscriber<? extends Message<?>> processor = connector.getSubscriber(config);
Multi<Message<?>> multi = Multi.createFrom().<Message<?>> emitter(e -> emitter = e);
multi.subscribe(processor); // The method subscribe(Flow.Subscriber<? super Message<?>>) in the type Flow.Publisher<Message<?>> is not applicable for the arguments (Flow.Subscriber<capture#4-of ? extends Message<?>>)
}
@Override
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> msg, Context context, Throwable reason) {
// new outgoing message with ttl from msg
Message<V> message = ...;
emitter.emit(message);
} |
I wouldn't have thought of using the connector to create another outgoing channel stream. The channel will be present on the health check, other than that I don't think there'll be a negative impact. For the Emitter, you can create a That being said I think in RabbitMQ one can configure the |
That's right. Basicly, I would create an outgoing channel based on the incoming configuration. That's what the Kafka dead letter handler do but there is less coupling between the connector/connector config and the reactive client wrapper. For healthcheck, I guess the client should keep the channel opened and thus it could be monitored in some way. Maybe aggregating the status of both rabbitmq channels to the same connector channel status.
I have seen that pattern in some test but and try it. As far as I remember, I fall on the same mismatch type (super/extends). I end up splitting getSubscriber to get a
Yes and you can configure a queue TTL on the DLQ to have the message back to original queue, but then:
The idea is to duplicate the message and send it with new metadata and ack the old one when the new one is received. |
@scrocquesel If you'd like to work on it and contribute to the project I've continued on your idea to make it work: ozangunalp@ee54305 Please feel free to continue on it to implement your use case. |
Thanks, I'll definitely have a look. |
@ozangunalp I have added a working reconsume later strategy. I tried not to expose publicly much of RabbitMQConnector method but I was in need of a way to create the reconsume queue. Also, I'm not sure of what the overflow buffer size should be. In my understanding, the upstream is waiting for the original message to be ack/nack. So the reconsumer emitter should not have to deal with more than the upstream buffer size which is |
Codecov Report
Additional details and impacted files@@ Coverage Diff @@
## main #2087 +/- ##
============================================
+ Coverage 77.74% 77.79% +0.05%
- Complexity 3698 3735 +37
============================================
Files 302 311 +9
Lines 12311 12443 +132
Branches 1574 1579 +5
============================================
+ Hits 9571 9680 +109
- Misses 2021 2041 +20
- Partials 719 722 +3
|
e4659ed
to
cfb5c5b
Compare
@ozangunalp Any progress on this one ? I rebased to latest main. It miss the doc for reconsume strategy but want to be sure you it could be part of the lib. |
@scrocquesel Thank you very much for this, I am sorry I couldn't respond earlier. Maybe the reconsume-later feature can be implemented only by setting up a And if setting up a retry ttl per message is not that crucial, it wouldn't justify the complexity we are adding with this PR. |
1 similar comment
@scrocquesel Thank you very much for this, I am sorry I couldn't respond earlier. Maybe the reconsume-later feature can be implemented only by setting up a And if setting up a retry ttl per message is not that crucial, it wouldn't justify the complexity we are adding with this PR. |
A common pattern involves configuring a waiting queue with a time-to-live (TTL) and a dead-letter queue (DLQ) that targets the original queue. If a message is published to a queue with a TTL, it will expire after that duration. When a message expires in a queue, RabbitMQ will route it to its corresponding DLQ, if one exists (thereby returning it to the original queue), or delete it. The failure strategy involves publishing a copy of the message to the waiting queue and acknowledging the original message to discard it. Later, the copy message will expire and will be reprocessed from the main queue. Only one DLQ can be assigned to a queue. Thus, if a message is "nacked" to the DLQ as a retry mechanism, a real DLQ cannot exist. This lead to two cons:
I agree for the complexity. While the strategy itself is simple, the code required to create a queue is not very reusable. Refactoring may be necessary to better share this code without resorting to the reconsume strategy trick with the config map. |
cfb5c5b
to
c352d63
Compare
c352d63
to
b18fc1f
Compare
@ozangunalp I reverted the branch to only refactor failure handlers with CDI (and add a missing settings for delivery limit). All in all with #2182 , it'll allow me to achieve what I want: Retry latter on specific exception + move poison message in a queue without consumer. |
@scrocquesel In my earlier comment #2087 (comment) I mentioned this possibility and tested it out, and hopefully before I had time to create a PR @andr3adc actually did it :) Thank you very much for your contribution! |
#2085
Factory interface is designed just to make the actual handler works without any change.