Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RabbitMQ requeue with additional nack metadata #2239

Merged
merged 2 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,31 @@ controlled by the `failure-strategy` channel setting:
- `reject` - this strategy marks the RabbitMQ message as rejected
(default). The processing continues with the next message.

- `requeue` - this strategy marks the RabbitMQ message as rejected
with requeue flag to true. The processing continues with the next message,
but the requeued message will be redelivered to the consumer.

When using `dead-letter-queue`, it is also possible to change some
metadata of the record that is sent to the dead letter topic. To do
that, use the `Message.nack(Throwable, Metadata)` method:

The RabbitMQ reject `requeue` flag can be controlled on different failure strategies
using the {{ javadoc('io.smallrye.reactive.messaging.rabbitmq.RabbitMQRejectMetadata') }}.
To do that, use the `Message.nack(Throwable, Metadata)` method by including the
`RabbitMQRejectMetadata` metadata with `requeue` to `true`.

``` java
{{ insert('rabbitmq/inbound/RabbitMQRejectMetadataExample.java', 'code') }}
```

!!!warning "Experimental"
`RabbitMQFailureHandler` is experimental and APIs are subject to change in the future

In addition, you can also provide your own failure strategy. To provide a failure strategy implement a bean exposing the interface
{{ javadoc('io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler') }}, qualified with a `@Identifier`. Set the name of the bean
as the `failure-strategy` channel setting.
In addition, you can also provide your own failure strategy.
To provide a failure strategy implement a bean exposing the interface
{{ javadoc('io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler') }},
qualified with a `@Identifier`.
Set the name of the bean as the `failure-strategy` channel setting.


## Configuration Reference
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package rabbitmq.inbound;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.reactive.messaging.rabbitmq.RabbitMQRejectMetadata;

@ApplicationScoped
public class RabbitMQRejectMetadataExample {

// <code>
@Incoming("in")
public CompletionStage<Void> consume(Message<String> message) {
return message.nack(new Exception("Failed!"), Metadata.of(
new RabbitMQRejectMetadata(true)));
}
// </code>

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ public Uni<Void> getAck(final long deliveryTag) {
return client.basicAck(deliveryTag, false);
}

public Function<Throwable, CompletionStage<Void>> getNack(final long deliveryTag, final boolean requeue) {
return t -> client.basicNack(deliveryTag, false, requeue).subscribeAsCompletionStage();
public Function<Throwable, Uni<Void>> getNack(final long deliveryTag, final boolean requeue) {
return t -> client.basicNack(deliveryTag, false, requeue);
}

public Vertx getVertx() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> message, Cont
}

@Override
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> message, Context context, Throwable reason) {
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> message, Metadata metadata, Context context,
Throwable reason) {
return CompletableFuture.completedFuture(null);
}
}
Expand Down Expand Up @@ -112,7 +113,7 @@ public CompletionStage<Void> nack(Throwable reason, Metadata metadata) {
// We must switch to the context having created the message.
// This context is passed when this instance of message is created.
// It's more a Vert.x RabbitMQ client issue which should ensure calling `not accepted` on the right context.
return onNack.handle(this, context, reason);
return onNack.handle(this, metadata, context, reason);
} finally {
// Ensure ack/nack are only called once
onAck = AlreadyAcknowledgedHandler.INSTANCE;
Expand All @@ -134,7 +135,24 @@ public void acknowledgeMessage() {
* @param reason the cause of the rejection, which must not be null
*/
public void rejectMessage(Throwable reason) {
holder.getNack(this.deliveryTag, false).apply(reason);
this.rejectMessage(reason, false);
holder.getNack(this.deliveryTag, false).apply(reason).subscribeAsCompletionStage();
}

/**
* Rejects the message by nack'ing it.
* <p>
* This will either discard the message for good, requeue (if requeue=true is set)
* or (if a DLQ has been set up) send it to the DLQ.
* <p>
* Please note that requeue is potentially dangerous as it can lead to
* very high load if all consumers reject and requeue a message repeatedly.
*
* @param reason the cause of the rejection, which must not be null
* @param requeue the requeue flag
*/
public void rejectMessage(Throwable reason, boolean requeue) {
holder.getNack(this.deliveryTag, requeue).apply(reason).subscribeAsCompletionStage();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
@ConnectorAttribute(name = "dead-letter-dlx-routing-key", direction = INCOMING, description = "If specified, a dead letter routing key to assign to the DLQ. Relevant only if auto-bind-dlq is true", type = "string")

// Message consumer
@ConnectorAttribute(name = "failure-strategy", direction = INCOMING, description = "The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are `fail`, `accept`, `reject` (default) or name of a bean", type = "string", defaultValue = "reject")
@ConnectorAttribute(name = "failure-strategy", direction = INCOMING, description = "The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are `fail`, `accept`, `reject` (default), `requeue` or name of a bean", type = "string", defaultValue = "reject")
@ConnectorAttribute(name = "broadcast", direction = INCOMING, description = "Whether the received RabbitMQ messages must be dispatched to multiple _subscribers_", type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "auto-acknowledgement", direction = INCOMING, description = "Whether the received RabbitMQ messages must be acknowledged when received; if true then delivery constitutes acknowledgement", type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "keep-most-recent", direction = INCOMING, description = "Whether to discard old messages instead of recent ones", type = "boolean", defaultValue = "false")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.smallrye.reactive.messaging.rabbitmq;

import org.eclipse.microprofile.reactive.messaging.Metadata;

/**
* Additional nack metadata that specifies flags for the 'basic.reject' operation.
*
* @see {@link IncomingRabbitMQMessage#nack(Throwable, Metadata)}
*/
public class RabbitMQRejectMetadata {

private final boolean requeue;

/**
* Constructor.
*
* @param requeue requeue the message
*/
public RabbitMQRejectMetadata(boolean requeue) {
this.requeue = requeue;
}

/**
* If requeue is true, the server will attempt to requeue the message.
* If requeue is false or the requeue attempt fails the messages are discarded or dead-lettered.
*
* @return requeue the message
*/
public boolean isRequeue() {
return requeue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
Expand Down Expand Up @@ -39,7 +41,8 @@ public RabbitMQAccept(String channel) {
}

@Override
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> msg, Context context, Throwable reason) {
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> msg, Metadata metadata, Context context,
Throwable reason) {
// We mark the message as rejected and fail.
log.nackedAcceptMessage(channel);
log.fullIgnoredFailure(reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.*;
import io.vertx.mutiny.core.Context;

/**
Expand Down Expand Up @@ -41,10 +41,14 @@ public RabbitMQFailStop(RabbitMQConnector connector, String channel) {
}

@Override
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> msg, Context context, Throwable reason) {
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> msg, Metadata metadata, Context context,
Throwable reason) {
// We mark the message as rejected and fail.
log.nackedFailMessage(channel);
connector.reportIncomingFailure(channel, reason);
return ConnectionHolder.runOnContextAndReportFailure(context, reason, msg, (m) -> m.rejectMessage(reason));
boolean requeue = Optional.ofNullable(metadata)
.flatMap(md -> md.get(RabbitMQRejectMetadata.class))
.map(RabbitMQRejectMetadata::isRequeue).orElse(false);
return ConnectionHolder.runOnContextAndReportFailure(context, reason, msg, (m) -> m.rejectMessage(reason, requeue));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
Expand All @@ -20,8 +22,8 @@ public interface RabbitMQFailureHandler {
interface Strategy {
String FAIL = "fail";
String ACCEPT = "accept";
String RELEASE = "release";
String REJECT = "reject";
String REQUEUE = "requeue";
}

/**
Expand All @@ -37,11 +39,12 @@ RabbitMQFailureHandler create(
* Handle message failure.
*
* @param message the failed message
* @param metadata additional nack metadata, may be {@code null}
* @param context the {@link Context} in which the handling should be done
* @param reason the reason for the failure
* @param <V> message body type
* @return a {@link CompletionStage}
*/
<V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> message, Context context, Throwable reason);
<V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> message, Metadata metadata, Context context, Throwable reason);

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.*;
import io.vertx.mutiny.core.Context;

public class RabbitMQReject implements RabbitMQFailureHandler {
Expand All @@ -36,10 +36,14 @@ public RabbitMQReject(String channel) {
}

@Override
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> msg, Context context, Throwable reason) {
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> msg, Metadata metadata, Context context,
Throwable reason) {
// We mark the message as rejected and fail.
log.nackedIgnoreMessage(channel);
log.fullIgnoredFailure(reason);
return ConnectionHolder.runOnContext(context, msg, m -> m.rejectMessage(reason));
boolean requeue = Optional.ofNullable(metadata)
.flatMap(md -> md.get(RabbitMQRejectMetadata.class))
.map(RabbitMQRejectMetadata::isRequeue).orElse(false);
return ConnectionHolder.runOnContext(context, msg, m -> m.rejectMessage(reason, requeue));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.smallrye.reactive.messaging.rabbitmq.fault;

import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQRejectMetadata;
import io.vertx.mutiny.core.Context;

public class RabbitMQRequeue implements RabbitMQFailureHandler {
private final String channel;

@ApplicationScoped
@Identifier(Strategy.REQUEUE)
public static class Factory implements RabbitMQFailureHandler.Factory {

@Override
public RabbitMQFailureHandler create(RabbitMQConnectorIncomingConfiguration config, RabbitMQConnector connector) {
return new RabbitMQRequeue(config.getChannel());
}
}

/**
* Constructor.
*
* @param channel the channel
*/
public RabbitMQRequeue(String channel) {
this.channel = channel;
}

@Override
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> msg, Metadata metadata, Context context,
Throwable reason) {
// We mark the message as requeued and fail.
log.nackedIgnoreMessage(channel);
log.fullIgnoredFailure(reason);
boolean requeue = Optional.ofNullable(metadata)
.flatMap(md -> md.get(RabbitMQRejectMetadata.class))
.map(RabbitMQRejectMetadata::isRequeue).orElse(true);
return ConnectionHolder.runOnContext(context, msg, m -> m.rejectMessage(reason, requeue));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.junit.jupiter.api.Test;

import com.rabbitmq.client.AMQP.BasicProperties;
Expand All @@ -32,7 +33,8 @@ public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> message, Cont

RabbitMQFailureHandler doNothingNack = new RabbitMQFailureHandler() {
@Override
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> message, Context context, Throwable reason) {
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> message, Metadata metadata, Context context,
Throwable reason) {
return CompletableFuture.completedFuture(null);
}
};
Expand Down
Loading
Loading