
Refactor RabbitMQ failure handler #2087

Merged
1 commit merged into smallrye:main on Jul 4, 2023

Conversation

scrocquesel
Contributor

#2085

The factory interface is designed just to make the actual handler work without any change.

Collaborator

@ozangunalp ozangunalp left a comment


@scrocquesel
Contributor Author

scrocquesel commented Feb 20, 2023

I'm trying to build a reconsume-later strategy to test the APIs exposed by the connector. I'm really new to reactive streams, and I struggle to emit delayed messages to the subscriber returned by the connector's getSubscriber method.

In the following code, I don't know why the Multi subscribe method expects a ? super Message while the connector provides a ? extends Message. Should the connector expose another method to return a more appropriate subscriber? Am I misusing the Multi API?

public class RabbitMQReconsumeLater implements RabbitMQFailureHandler {
    private MultiEmitter<? super Message<?>> emitter;

    public RabbitMQReconsumeLater(RabbitMQConnector connector, RabbitMQConnectorIncomingConfiguration config) {

        // create an outgoing config for a delayed channel from the incoming channel configuration
        final Config outgoingConfig = ...;

        // create a subscriber to push messages to
        final Subscriber<? extends Message<?>> processor = connector.getSubscriber(outgoingConfig);

        Multi<Message<?>> multi = Multi.createFrom().<Message<?>> emitter(e -> emitter = e);
        multi.subscribe(processor); // The method subscribe(Flow.Subscriber<? super Message<?>>) in the type Flow.Publisher<Message<?>> is not applicable for the arguments (Flow.Subscriber<capture#4-of ? extends Message<?>>)
    }

    @Override
    public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> msg, Context context, Throwable reason) {
        // new outgoing message with TTL derived from msg
        Message<V> message = ...;

        emitter.emit(message);

        // return a CompletionStage tied to the acknowledgement of the copied message
        return ...;
    }
}
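
For reference, the compile error is the usual Java generics variance rule: Flow.Publisher<T>.subscribe accepts a Flow.Subscriber<? super T>, while getSubscriber returns a subscriber typed ? extends Message<?> (assuming the java.util.concurrent.Flow subscriber type from the error message). Reusing the names from the snippet above, one possible workaround is an unchecked cast, assuming the connector's subscriber really accepts any Message<?> (a sketch, not necessarily the intended usage):

    // subscribe() needs a Flow.Subscriber<? super Message<?>>; if the connector's subscriber
    // consumes any Message<?>, an unchecked cast bridges the wildcard mismatch
    @SuppressWarnings("unchecked")
    Flow.Subscriber<Message<?>> sink = (Flow.Subscriber<Message<?>>) connector.getSubscriber(outgoingConfig);
    multi.subscribe(sink);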

@ozangunalp
Collaborator

@scrocquesel

I wouldn't have thought of using the connector to create another outgoing channel stream. The channel will be present on the health check; other than that, I don't think there'll be a negative impact.
It can be challenging to construct a Config object to pass to the getSubscriber method; you'd need to override some properties of the incoming channel config, like channel-name.

For the Emitter, you can create a MutinyEmitterImpl and call getPublisher and subscribe to it using the subscriber you created earlier. To create a MutinyEmitterImpl you'd need an EmitterConfiguration implementation mostly for specifying the overflow strategy.

That being said I think in RabbitMQ one can configure the dead-letter-exchange and dead-letter-routing-key to implement redelivery.

@scrocquesel
Contributor Author

@ozangunalp

> I wouldn't have thought of using the connector to create another outgoing channel stream. The channel will be present on the health check; other than that, I don't think there'll be a negative impact. It can be challenging to construct a Config object to pass to the getSubscriber method; you'd need to override some properties of the incoming channel config, like channel-name.

That's right. Basically, I would create an outgoing channel based on the incoming configuration. That's what the Kafka dead-letter handler does, but there is less coupling between the connector/connector config and the reactive client wrapper. For the health check, I guess the client should keep the channel open so it could be monitored in some way, maybe by aggregating the status of both RabbitMQ channels into the same connector channel status.

> For the Emitter, you can create a MutinyEmitterImpl and call getPublisher and subscribe to it using the subscriber you created earlier. To create a MutinyEmitterImpl you'd need an EmitterConfiguration implementation mostly for specifying the overflow strategy.

I have seen that pattern in some tests and tried it. As far as I remember, I ran into the same type mismatch (super/extends). I ended up splitting getSubscriber to get a RabbitMQMessageSender directly. I still have to address reporting of the channel status.

> That being said I think in RabbitMQ one can configure the dead-letter-exchange and dead-letter-routing-key to implement redelivery.

Yes, and you can configure a queue TTL on the DLQ to get the message back to the original queue, but then:

  • you can't have both a DLQ and a reconsume-later queue.
  • you can't have a per-message TTL.

The idea is to duplicate the message, send it with new metadata, and ack the old one when the new one is received.
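
A minimal sketch of that ack chaining with the MicroProfile Messaging Message API (metadata copying and the per-message TTL are elided, and the helper method name is made up):

    // The copy's ack callback acks the original, so the original is only acknowledged
    // once the copy has been accepted by the delayed/outgoing channel.
    <V> void reconsumeLater(IncomingRabbitMQMessage<V> original, MultiEmitter<? super Message<?>> emitter) {
        Message<V> copy = Message.of(original.getPayload(), () -> original.ack());
        emitter.emit(copy);
    }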

@ozangunalp
Collaborator

@scrocquesel If you'd like to work on it and contribute to the project, I've continued on your idea to make it work: ozangunalp@ee54305

Please feel free to continue on it to implement your use case.

@scrocquesel
Contributor Author

> @scrocquesel If you'd like to work on it and contribute to the project, I've continued on your idea to make it work: ozangunalp@ee54305

> Please feel free to continue on it to implement your use case.

Thanks, I'll definitely have a look.

@scrocquesel
Contributor Author

@ozangunalp I have added a working reconsume-later strategy. I tried not to expose much of the RabbitMQConnector methods publicly, but I needed a way to create the reconsume queue.

Also, I'm not sure what the overflow buffer size should be. In my understanding, the upstream is waiting for the original message to be acked/nacked, so the reconsume emitter should not have to deal with more than the upstream buffer size, which is max-incoming-internal-queue-size.

@codecov-commenter

codecov-commenter commented Mar 6, 2023

Codecov Report

Merging #2087 (acfc352) into main (a2643dc) will increase coverage by 0.05%.
The diff coverage is 65.00%.

❗ Current head acfc352 differs from pull request most recent head b18fc1f. Consider uploading reports for the commit b18fc1f to get more accurate results

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##               main    #2087      +/-   ##
============================================
+ Coverage     77.74%   77.79%   +0.05%     
- Complexity     3698     3735      +37     
============================================
  Files           302      311       +9     
  Lines         12311    12443     +132     
  Branches       1574     1579       +5     
============================================
+ Hits           9571     9680     +109     
- Misses         2021     2041      +20     
- Partials        719      722       +3     
Impacted Files Coverage Δ
...reactive/messaging/rabbitmq/RabbitMQConnector.java 87.16% <46.15%> (-2.03%) ⬇️
...ctive/messaging/rabbitmq/fault/RabbitMQAccept.java 100.00% <100.00%> (ø)
...ive/messaging/rabbitmq/fault/RabbitMQFailStop.java 100.00% <100.00%> (ø)
...ctive/messaging/rabbitmq/fault/RabbitMQReject.java 100.00% <100.00%> (ø)

... and 21 files with indirect coverage changes

@scrocquesel
Contributor Author

> @ozangunalp I have added a working reconsume-later strategy. I tried not to expose much of the RabbitMQConnector methods publicly, but I needed a way to create the reconsume queue.

> Also, I'm not sure what the overflow buffer size should be. In my understanding, the upstream is waiting for the original message to be acked/nacked, so the reconsume emitter should not have to deal with more than the upstream buffer size, which is max-incoming-internal-queue-size.

@ozangunalp Any progress on this one? I rebased onto the latest main. It's missing the docs for the reconsume strategy, but I want to be sure it could be part of the lib.

@ozangunalp
Collaborator

@scrocquesel Thank you very much for this, I am sorry I couldn't respond earlier.
I'm still trying to understand how this is different from only setting up a dead-letter-exchange. I see that you've also added this, but I fail to grasp how both work together. Would you mind adding a couple of dedicated test cases for this code?

Maybe the reconsume-later feature can be implemented only by setting up a dead-letter-exchange and a check in the failure strategy for the retry count? I'll try to set up some test cases on my side to see if we can do only that.

And if setting up a retry TTL per message is not that crucial, it wouldn't justify the complexity we are adding with this PR.


@scrocquesel
Contributor Author

> @scrocquesel Thank you very much for this, I am sorry I couldn't respond earlier. I'm still trying to understand how this is different from only setting up a dead-letter-exchange. I see that you've also added this, but I fail to grasp how both work together. Would you mind adding a couple of dedicated test cases for this code?

> Maybe the reconsume-later feature can be implemented only by setting up a dead-letter-exchange and a check in the failure strategy for the retry count? I'll try to set up some test cases on my side to see if we can do only that.

> And if setting up a retry TTL per message is not that crucial, it wouldn't justify the complexity we are adding with this PR.

A common pattern involves configuring a waiting queue with a time-to-live (TTL) and a dead-letter queue (DLQ) that targets the original queue. If a message is published to a queue with a TTL, it will expire after that duration. When a message expires in a queue, RabbitMQ will route it to its corresponding DLQ, if one exists (thereby returning it to the original queue), or delete it.

The failure strategy involves publishing a copy of the message to the waiting queue and acknowledging the original message to discard it. Later, the copy message will expire and will be reprocessed from the main queue.
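
For illustration, a minimal sketch of that topology with the plain RabbitMQ Java client (the queue names and the 30-second TTL are made up; the connector would declare the equivalent through its own configuration):

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.util.HashMap;
    import java.util.Map;

    public class WaitQueueTopology {
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {

                // the original work queue, consumed by the application
                channel.queueDeclare("orders", true, false, false, null);

                // the waiting queue: no consumer; messages expire after 30s and are
                // dead-lettered back to the original queue through the default exchange
                Map<String, Object> waitArgs = new HashMap<>();
                waitArgs.put("x-message-ttl", 30_000);
                waitArgs.put("x-dead-letter-exchange", "");
                waitArgs.put("x-dead-letter-routing-key", "orders");
                channel.queueDeclare("orders.wait", true, false, false, waitArgs);

                // the failure strategy publishes a copy of the failed message here and acks
                // the original; the copy re-enters "orders" when the TTL expires
                channel.basicPublish("", "orders.wait", null, "payload".getBytes());
            }
        }
    }

Note that RabbitMQ only dead-letters expired messages from the head of a queue, which is why this pattern relies on a fixed queue-level TTL rather than per-message expiration.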

Only one DLQ can be assigned to a queue. Thus, if a message is "nacked" to the DLQ as a retry mechanism, a real DLQ cannot exist. This leads to two cons:

  • when the retry count is exhausted, we can only ack the message and discard it.
  • it should now be possible for users to implement their own failure strategy based on the exception and choose either to reconsume the message later or to nack it directly to the DLQ/discard it using the existing set of failure strategies.

I agree about the complexity. While the strategy itself is simple, the code required to create a queue is not very reusable. Refactoring may be necessary to better share this code without resorting to the reconsume-strategy trick with the config map.

@scrocquesel
Contributor Author

scrocquesel commented Jun 23, 2023

@ozangunalp I reverted the branch to only refactor the failure handlers with CDI (and add a missing setting for the delivery limit). All in all, with #2182, it'll allow me to achieve what I want: retry later on specific exceptions + move poison messages to a queue without a consumer.
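
As a rough illustration of the CDI-based shape (the Factory contract and the create(...) parameters below are assumptions made for this sketch, not necessarily the interface merged here), a custom strategy could be registered along these lines:

    import io.smallrye.common.annotation.Identifier;
    import jakarta.enterprise.context.ApplicationScoped;

    // Hypothetical custom failure-handler factory discovered through CDI by its identifier.
    // The Factory interface and create(...) signature are assumptions for this sketch.
    @ApplicationScoped
    @Identifier("retry-later")
    public class RetryLaterFactory implements RabbitMQFailureHandler.Factory {

        @Override
        public RabbitMQFailureHandler create(RabbitMQConnectorIncomingConfiguration config,
                                             RabbitMQConnector connector) {
            return new RetryLaterHandler(config, connector); // hypothetical handler implementation
        }
    }

A channel would then select it by name, e.g. something like mp.messaging.incoming.<channel>.failure-strategy=retry-later (again an assumption, mirroring how the built-in accept/fail/reject strategies are chosen).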

@ozangunalp
Collaborator

@scrocquesel In my earlier comment #2087 (comment) I mentioned this possibility and tested it out, and luckily, before I had time to create a PR, @andr3adc actually did it :)

Thank you very much for your contribution!

@ozangunalp ozangunalp merged commit 308c6bf into smallrye:main Jul 4, 2023
3 checks passed
@ozangunalp ozangunalp added this to the 4.9.0 milestone Jul 28, 2023