Skip to content

Commit

Permalink
RabbitMq requeue failure-strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Sep 25, 2023
1 parent 103e1a2 commit 15702ef
Show file tree
Hide file tree
Showing 13 changed files with 190 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,31 @@ controlled by the `failure-strategy` channel setting:
- `reject` - this strategy marks the RabbitMQ message as rejected
(default). The processing continues with the next message.

- `requeue` - this strategy marks the RabbitMQ message as rejected
with requeue flag to true. The processing continues with the next message,
but the requeued message will be redelivered to the consumer.

When using `dead-letter-queue`, it is also possible to change some
metadata of the record that is sent to the dead letter topic. To do
that, use the `Message.nack(Throwable, Metadata)` method:

The RabbitMQ reject `requeue` flag can be controlled on different failure strategies
using the {{ javadoc('io.smallrye.reactive.messaging.rabbitmq.RabbitMQRejectMetadata') }}.
To do that, use the `Message.nack(Throwable, Metadata)` method by including the
`RabbitMQRejectMetadata` metadata with `requeue` to `true`.

``` java
{{ insert('rabbitmq/inbound/RabbitMQRejectMetadataExample.java', 'code') }}
```

!!!warning "Experimental"
`RabbitMQFailureHandler` is experimental and APIs are subject to change in the future

In addition, you can also provide your own failure strategy. To provide a failure strategy implement a bean exposing the interface
{{ javadoc('io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler') }}, qualified with a `@Identifier`. Set the name of the bean
as the `failure-strategy` channel setting.
In addition, you can also provide your own failure strategy.
To provide a failure strategy implement a bean exposing the interface
{{ javadoc('io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler') }},
qualified with a `@Identifier`.
Set the name of the bean as the `failure-strategy` channel setting.


## Configuration Reference
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package rabbitmq.inbound;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.reactive.messaging.rabbitmq.RabbitMQRejectMetadata;

@ApplicationScoped
public class RabbitMQRejectMetadataExample {

// <code>
@Incoming("in")
public CompletionStage<Void> consume(Message<String> message) {
return message.nack(new Exception("Failed!"), Metadata.of(
new RabbitMQRejectMetadata(true)));
}
// </code>

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ public Uni<Void> getAck(final long deliveryTag) {
return client.basicAck(deliveryTag, false);
}

public Function<Throwable, CompletionStage<Void>> getNack(final long deliveryTag, final boolean requeue) {
return t -> client.basicNack(deliveryTag, false, requeue).subscribeAsCompletionStage();
public Function<Throwable, Uni<Void>> getNack(final long deliveryTag, final boolean requeue) {
return t -> client.basicNack(deliveryTag, false, requeue);
}

public Vertx getVertx() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> message, Cont
}

@Override
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> message, Metadata metadata, Context context, Throwable reason) {
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> message, Metadata metadata, Context context,
Throwable reason) {
return CompletableFuture.completedFuture(null);
}
}
Expand Down Expand Up @@ -127,23 +128,31 @@ public void acknowledgeMessage() {
holder.getAck(this.deliveryTag).subscribeAsCompletionStage();
}

/**
* Rejects the message by nack'ing with requeue=false; this will either discard the message for good or
* (if a DLQ has been set up) send it to the DLQ.
*
* @param reason the cause of the rejection, which must not be null
*/
public void rejectMessage(Throwable reason) {
this.rejectMessage(reason, false);
holder.getNack(this.deliveryTag, false).apply(reason).subscribeAsCompletionStage();
}

/**
* Rejects the message by nack'ing it.
* <p>
* This will either discard the message for good, requeue (if {@link RabbitMQRejectMetadata#isRequeue()} is set)
* This will either discard the message for good, requeue (if requeue=true is set)
* or (if a DLQ has been set up) send it to the DLQ.
* <p>
* Please note that requeue is potentially dangerous as it can lead to
* very high load if all consumers reject and requeue a message repeatedly.
*
* @param reason the cause of the rejection, which must not be null
* @param metadata additional nack metadata, may be {@code null}
* @param requeue the requeue flag
*/
public void rejectMessage(Throwable reason, Metadata metadata) {
Optional<RabbitMQRejectMetadata> rejectMetadata =
Optional.ofNullable(metadata).flatMap(md -> md.get(RabbitMQRejectMetadata.class));
boolean requeue = rejectMetadata.map(RabbitMQRejectMetadata::isRequeue).orElse(false);
holder.getNack(this.deliveryTag, requeue).apply(reason);
public void rejectMessage(Throwable reason, boolean requeue) {
holder.getNack(this.deliveryTag, requeue).apply(reason).subscribeAsCompletionStage();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
@ConnectorAttribute(name = "dead-letter-dlx-routing-key", direction = INCOMING, description = "If specified, a dead letter routing key to assign to the DLQ. Relevant only if auto-bind-dlq is true", type = "string")

// Message consumer
@ConnectorAttribute(name = "failure-strategy", direction = INCOMING, description = "The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are `fail`, `accept`, `reject` (default) or name of a bean", type = "string", defaultValue = "reject")
@ConnectorAttribute(name = "failure-strategy", direction = INCOMING, description = "The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are `fail`, `accept`, `reject` (default), `requeue` or name of a bean", type = "string", defaultValue = "reject")
@ConnectorAttribute(name = "broadcast", direction = INCOMING, description = "Whether the received RabbitMQ messages must be dispatched to multiple _subscribers_", type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "auto-acknowledgement", direction = INCOMING, description = "Whether the received RabbitMQ messages must be acknowledged when received; if true then delivery constitutes acknowledgement", type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "keep-most-recent", direction = INCOMING, description = "Whether to discard old messages instead of recent ones", type = "boolean", defaultValue = "false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.vertx.mutiny.core.Context;
import org.eclipse.microprofile.reactive.messaging.Metadata;

/**
* A {@link RabbitMQFailureHandler} that in effect treats the nack as an ack.
Expand Down Expand Up @@ -40,7 +41,8 @@ public RabbitMQAccept(String channel) {
}

@Override
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> msg, Metadata metadata, Context context, Throwable reason) {
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> msg, Metadata metadata, Context context,
Throwable reason) {
// We mark the message as rejected and fail.
log.nackedAcceptMessage(channel);
log.fullIgnoredFailure(reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

import io.smallrye.reactive.messaging.rabbitmq.*;
import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.rabbitmq.*;
import io.vertx.mutiny.core.Context;
import org.eclipse.microprofile.reactive.messaging.Metadata;

/**
* A {@link RabbitMQFailureHandler} that rejects the message and reports a failure.
Expand Down Expand Up @@ -39,10 +41,14 @@ public RabbitMQFailStop(RabbitMQConnector connector, String channel) {
}

@Override
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> msg, Metadata metadata, Context context, Throwable reason) {
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> msg, Metadata metadata, Context context,
Throwable reason) {
// We mark the message as rejected and fail.
log.nackedFailMessage(channel);
connector.reportIncomingFailure(channel, reason);
return ConnectionHolder.runOnContextAndReportFailure(context, reason, msg, (m) -> m.rejectMessage(reason, metadata));
boolean requeue = Optional.ofNullable(metadata)
.flatMap(md -> md.get(RabbitMQRejectMetadata.class))
.map(RabbitMQRejectMetadata::isRequeue).orElse(false);
return ConnectionHolder.runOnContextAndReportFailure(context, reason, msg, (m) -> m.rejectMessage(reason, requeue));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.vertx.mutiny.core.Context;
import org.eclipse.microprofile.reactive.messaging.Metadata;

/**
* Implemented to provide message failure strategies.
Expand All @@ -21,8 +22,8 @@ public interface RabbitMQFailureHandler {
interface Strategy {
String FAIL = "fail";
String ACCEPT = "accept";
String RELEASE = "release";
String REJECT = "reject";
String REQUEUE = "requeue";
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

import io.smallrye.reactive.messaging.rabbitmq.*;
import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.rabbitmq.*;
import io.vertx.mutiny.core.Context;
import org.eclipse.microprofile.reactive.messaging.Metadata;

public class RabbitMQReject implements RabbitMQFailureHandler {
private final String channel;
Expand All @@ -34,10 +36,14 @@ public RabbitMQReject(String channel) {
}

@Override
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> msg, Metadata metadata, Context context, Throwable reason) {
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> msg, Metadata metadata, Context context,
Throwable reason) {
// We mark the message as rejected and fail.
log.nackedIgnoreMessage(channel);
log.fullIgnoredFailure(reason);
return ConnectionHolder.runOnContext(context, msg, m -> m.rejectMessage(reason, metadata));
boolean requeue = Optional.ofNullable(metadata)
.flatMap(md -> md.get(RabbitMQRejectMetadata.class))
.map(RabbitMQRejectMetadata::isRequeue).orElse(false);
return ConnectionHolder.runOnContext(context, msg, m -> m.rejectMessage(reason, requeue));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.smallrye.reactive.messaging.rabbitmq.fault;

import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQRejectMetadata;
import io.vertx.mutiny.core.Context;

public class RabbitMQRequeue implements RabbitMQFailureHandler {
private final String channel;

@ApplicationScoped
@Identifier(Strategy.REQUEUE)
public static class Factory implements RabbitMQFailureHandler.Factory {

@Override
public RabbitMQFailureHandler create(RabbitMQConnectorIncomingConfiguration config, RabbitMQConnector connector) {
return new RabbitMQRequeue(config.getChannel());
}
}

/**
* Constructor.
*
* @param channel the channel
*/
public RabbitMQRequeue(String channel) {
this.channel = channel;
}

@Override
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> msg, Metadata metadata, Context context,
Throwable reason) {
// We mark the message as requeued and fail.
log.nackedIgnoreMessage(channel);
log.fullIgnoredFailure(reason);
boolean requeue = Optional.ofNullable(metadata)
.flatMap(md -> md.get(RabbitMQRejectMetadata.class))
.map(RabbitMQRejectMetadata::isRequeue).orElse(true);
return ConnectionHolder.runOnContext(context, msg, m -> m.rejectMessage(reason, requeue));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> message, Cont

RabbitMQFailureHandler doNothingNack = new RabbitMQFailureHandler() {
@Override
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> message, Metadata metadata, Context context, Throwable reason) {
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> message, Metadata metadata, Context context,
Throwable reason) {
return CompletableFuture.completedFuture(null);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.reactive.messaging.spi.ConnectorLiteral;
import org.jboss.weld.environment.se.Weld;
Expand All @@ -20,6 +19,7 @@

import io.netty.handler.codec.http.HttpHeaderValues;
import io.smallrye.config.SmallRyeConfigProviderResolver;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
Expand Down Expand Up @@ -636,7 +636,8 @@ void testNackWithRejectAndRequeue() {
.put("mp.messaging.incoming.data.exchange.name", exchangeName)
.put("mp.messaging.incoming.data.exchange.durable", false)
.put("mp.messaging.incoming.data.queue.name", queueName)
.put("mp.messaging.incoming.data.queue.durable", false)
.put("mp.messaging.incoming.data.queue.x-queue-type", "quorum")
.put("mp.messaging.incoming.data.queue.x-delivery-limit", 2)
.put("mp.messaging.incoming.data.queue.routing-keys", routingKey)
.put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME)
.put("mp.messaging.incoming.data.host", host)
Expand Down Expand Up @@ -664,22 +665,35 @@ void testNackWithRejectAndRequeue() {

container = weld.initialize();
await().until(() -> isRabbitMQConnectorAvailable(container));
RequeueFirstDeliveryBean bean = container.getBeanManager().createInstance().select(RequeueFirstDeliveryBean.class).get();
RequeueFirstDeliveryBean bean = container.getBeanManager().createInstance().select(RequeueFirstDeliveryBean.class)
.get();

await().until(() -> isRabbitMQConnectorAvailable(container));

List<Integer> list = bean.getResults();
assertThat(list).isEmpty();

List<Integer> redelivered = bean.getRedelivered();
assertThat(redelivered).isEmpty();

List<Integer> dlqList = bean.getDlqResults();
assertThat(dlqList).isEmpty();

AtomicInteger counter = new AtomicInteger();
usage.produceTenIntegers(exchangeName, queueName, routingKey, counter::getAndIncrement);

await().atMost(1, TimeUnit.MINUTES).until(() -> list.size() >= 10);
assertThat(list).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
assertThat(dlqList).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> {
assertThat(list)
.hasSizeGreaterThanOrEqualTo(30)
.containsExactlyInAnyOrder(
1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
assertThat(redelivered).containsExactly(
1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
assertThat(dlqList).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
});
}

}
Loading

0 comments on commit 15702ef

Please sign in to comment.