From 103e1a2b748f6ec6980f6a64c799b11899c0f6df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20K=C3=B6tter?= Date: Tue, 1 Aug 2023 12:59:39 +0200 Subject: [PATCH 1/2] RabbitMQ requeue with additional nack metadata --- .../rabbitmq/IncomingRabbitMQMessage.java | 21 ++++-- .../rabbitmq/RabbitMQRejectMetadata.java | 32 ++++++++ .../rabbitmq/fault/RabbitMQAccept.java | 3 +- .../rabbitmq/fault/RabbitMQFailStop.java | 10 +-- .../fault/RabbitMQFailureHandler.java | 4 +- .../rabbitmq/fault/RabbitMQReject.java | 10 +-- .../rabbitmq/IncomingRabbitMQMessageTest.java | 3 +- .../messaging/rabbitmq/RabbitMQTest.java | 62 +++++++++++++++ .../rabbitmq/RequeueFirstDeliveryBean.java | 75 +++++++++++++++++++ 9 files changed, 199 insertions(+), 21 deletions(-) create mode 100644 smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQRejectMetadata.java create mode 100644 smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RequeueFirstDeliveryBean.java diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java index 13024b3d53..95055841ac 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java @@ -44,7 +44,7 @@ public CompletionStage handle(IncomingRabbitMQMessage message, Cont } @Override - public CompletionStage handle(IncomingRabbitMQMessage message, Context context, Throwable reason) { + public CompletionStage handle(IncomingRabbitMQMessage message, Metadata metadata, Context context, Throwable reason) { return CompletableFuture.completedFuture(null); } } @@ -112,7 +112,7 @@ public CompletionStage nack(Throwable reason, Metadata metadata) { // We must switch to the context having created the message. // This context is passed when this instance of message is created. // It's more a Vert.x RabbitMQ client issue which should ensure calling `not accepted` on the right context. - return onNack.handle(this, context, reason); + return onNack.handle(this, metadata, context, reason); } finally { // Ensure ack/nack are only called once onAck = AlreadyAcknowledgedHandler.INSTANCE; @@ -128,13 +128,22 @@ public void acknowledgeMessage() { } /** - * Rejects the message by nack'ing with requeue=false; this will either discard the message for good or - * (if a DLQ has been set up) send it to the DLQ. + * Rejects the message by nack'ing it. + *

+ * This will either discard the message for good, requeue (if {@link RabbitMQRejectMetadata#isRequeue()} is set) + * or (if a DLQ has been set up) send it to the DLQ. + *

+ * Please note that requeue is potentially dangerous as it can lead to + * very high load if all consumers reject and requeue a message repeatedly. * * @param reason the cause of the rejection, which must not be null + * @param metadata additional nack metadata, may be {@code null} */ - public void rejectMessage(Throwable reason) { - holder.getNack(this.deliveryTag, false).apply(reason); + public void rejectMessage(Throwable reason, Metadata metadata) { + Optional rejectMetadata = + Optional.ofNullable(metadata).flatMap(md -> md.get(RabbitMQRejectMetadata.class)); + boolean requeue = rejectMetadata.map(RabbitMQRejectMetadata::isRequeue).orElse(false); + holder.getNack(this.deliveryTag, requeue).apply(reason); } @Override diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQRejectMetadata.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQRejectMetadata.java new file mode 100644 index 0000000000..d6aefb656f --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQRejectMetadata.java @@ -0,0 +1,32 @@ +package io.smallrye.reactive.messaging.rabbitmq; + +import org.eclipse.microprofile.reactive.messaging.Metadata; + +/** + * Additional nack metadata that specifies flags for the 'basic.reject' operation. + * + * @see {@link IncomingRabbitMQMessage#nack(Throwable, Metadata)} + */ +public class RabbitMQRejectMetadata { + + private final boolean requeue; + + /** + * Constructor. + * + * @param requeue requeue the message + */ + public RabbitMQRejectMetadata(boolean requeue) { + this.requeue = requeue; + } + + /** + * If requeue is true, the server will attempt to requeue the message. + * If requeue is false or the requeue attempt fails the messages are discarded or dead-lettered. + * + * @return requeue the message + */ + public boolean isRequeue() { + return requeue; + } +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQAccept.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQAccept.java index 06da2bd4df..8bdd3a3a23 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQAccept.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQAccept.java @@ -12,6 +12,7 @@ import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector; import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration; import io.vertx.mutiny.core.Context; +import org.eclipse.microprofile.reactive.messaging.Metadata; /** * A {@link RabbitMQFailureHandler} that in effect treats the nack as an ack. @@ -39,7 +40,7 @@ public RabbitMQAccept(String channel) { } @Override - public CompletionStage handle(IncomingRabbitMQMessage msg, Context context, Throwable reason) { + public CompletionStage handle(IncomingRabbitMQMessage msg, Metadata metadata, Context context, Throwable reason) { // We mark the message as rejected and fail. log.nackedAcceptMessage(channel); log.fullIgnoredFailure(reason); diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailStop.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailStop.java index 917506e6c2..11fc0763a4 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailStop.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailStop.java @@ -4,14 +4,12 @@ import java.util.concurrent.CompletionStage; +import io.smallrye.reactive.messaging.rabbitmq.*; import jakarta.enterprise.context.ApplicationScoped; import io.smallrye.common.annotation.Identifier; -import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder; -import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage; -import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector; -import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration; import io.vertx.mutiny.core.Context; +import org.eclipse.microprofile.reactive.messaging.Metadata; /** * A {@link RabbitMQFailureHandler} that rejects the message and reports a failure. @@ -41,10 +39,10 @@ public RabbitMQFailStop(RabbitMQConnector connector, String channel) { } @Override - public CompletionStage handle(IncomingRabbitMQMessage msg, Context context, Throwable reason) { + public CompletionStage handle(IncomingRabbitMQMessage msg, Metadata metadata, Context context, Throwable reason) { // We mark the message as rejected and fail. log.nackedFailMessage(channel); connector.reportIncomingFailure(channel, reason); - return ConnectionHolder.runOnContextAndReportFailure(context, reason, msg, (m) -> m.rejectMessage(reason)); + return ConnectionHolder.runOnContextAndReportFailure(context, reason, msg, (m) -> m.rejectMessage(reason, metadata)); } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailureHandler.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailureHandler.java index 50d51e564b..0ab71706a2 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailureHandler.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailureHandler.java @@ -7,6 +7,7 @@ import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector; import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration; import io.vertx.mutiny.core.Context; +import org.eclipse.microprofile.reactive.messaging.Metadata; /** * Implemented to provide message failure strategies. @@ -37,11 +38,12 @@ RabbitMQFailureHandler create( * Handle message failure. * * @param message the failed message + * @param metadata additional nack metadata, may be {@code null} * @param context the {@link Context} in which the handling should be done * @param reason the reason for the failure * @param message body type * @return a {@link CompletionStage} */ - CompletionStage handle(IncomingRabbitMQMessage message, Context context, Throwable reason); + CompletionStage handle(IncomingRabbitMQMessage message, Metadata metadata, Context context, Throwable reason); } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQReject.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQReject.java index b03dbb3da2..ba704a7123 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQReject.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQReject.java @@ -4,14 +4,12 @@ import java.util.concurrent.CompletionStage; +import io.smallrye.reactive.messaging.rabbitmq.*; import jakarta.enterprise.context.ApplicationScoped; import io.smallrye.common.annotation.Identifier; -import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder; -import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage; -import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector; -import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration; import io.vertx.mutiny.core.Context; +import org.eclipse.microprofile.reactive.messaging.Metadata; public class RabbitMQReject implements RabbitMQFailureHandler { private final String channel; @@ -36,10 +34,10 @@ public RabbitMQReject(String channel) { } @Override - public CompletionStage handle(IncomingRabbitMQMessage msg, Context context, Throwable reason) { + public CompletionStage handle(IncomingRabbitMQMessage msg, Metadata metadata, Context context, Throwable reason) { // We mark the message as rejected and fail. log.nackedIgnoreMessage(channel); log.fullIgnoredFailure(reason); - return ConnectionHolder.runOnContext(context, msg, m -> m.rejectMessage(reason)); + return ConnectionHolder.runOnContext(context, msg, m -> m.rejectMessage(reason, metadata)); } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java index 976b6922fd..f07bf03c03 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java @@ -9,6 +9,7 @@ import java.util.concurrent.CompletionStage; import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Metadata; import org.junit.jupiter.api.Test; import com.rabbitmq.client.AMQP.BasicProperties; @@ -32,7 +33,7 @@ public CompletionStage handle(IncomingRabbitMQMessage message, Cont RabbitMQFailureHandler doNothingNack = new RabbitMQFailureHandler() { @Override - public CompletionStage handle(IncomingRabbitMQMessage message, Context context, Throwable reason) { + public CompletionStage handle(IncomingRabbitMQMessage message, Metadata metadata, Context context, Throwable reason) { return CompletableFuture.completedFuture(null); } }; diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java index f6c21b5c6c..defb6ed93f 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java @@ -10,6 +10,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler; import org.eclipse.microprofile.config.ConfigProvider; import org.eclipse.microprofile.reactive.messaging.spi.ConnectorLiteral; import org.jboss.weld.environment.se.Weld; @@ -620,4 +621,65 @@ void testDefaultExchangeName() { assertThat(bean.getTypeCasts()).isEqualTo(0); assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); } + + /** + * Verifies that messages can be requeued by RabbitMQ. + */ + @Test + void testNackWithRejectAndRequeue() { + final String exchangeName = "exchg6"; + final String queueName = "q6"; + final String dlxName = "dlx6"; + final String dlqName = "dlq6"; + final String routingKey = "xyzzy"; + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.durable", false) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.durable", false) + .put("mp.messaging.incoming.data.queue.routing-keys", routingKey) + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing-enabled", false) + .put("mp.messaging.incoming.data.failure-strategy", RabbitMQFailureHandler.Strategy.REJECT) + .put("mp.messaging.incoming.data.auto-bind-dlq", true) + .put("mp.messaging.incoming.data.dead-letter-exchange", dlxName) + .put("mp.messaging.incoming.data.dead-letter-queue-name", dlqName) + .put("mp.messaging.incoming.data.dlx.declare", true) + .put("mp.messaging.incoming.data-dlq.exchange.name", dlxName) + .put("mp.messaging.incoming.data-dlq.exchange.type", "direct") + .put("mp.messaging.incoming.data-dlq.queue.name", dlqName) + .put("mp.messaging.incoming.data-dlq.queue.routing-keys", routingKey) + .put("mp.messaging.incoming.data-dlq.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data-dlq.host", host) + .put("mp.messaging.incoming.data-dlq.port", port) + .put("mp.messaging.incoming.data-dlq.tracing-enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + weld.addBeanClass(RequeueFirstDeliveryBean.class); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + RequeueFirstDeliveryBean bean = container.getBeanManager().createInstance().select(RequeueFirstDeliveryBean.class).get(); + + await().until(() -> isRabbitMQConnectorAvailable(container)); + + List list = bean.getResults(); + assertThat(list).isEmpty(); + + List dlqList = bean.getDlqResults(); + assertThat(dlqList).isEmpty(); + + AtomicInteger counter = new AtomicInteger(); + usage.produceTenIntegers(exchangeName, queueName, routingKey, counter::getAndIncrement); + + await().atMost(1, TimeUnit.MINUTES).until(() -> list.size() >= 10); + assertThat(list).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + assertThat(dlqList).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + } + } diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RequeueFirstDeliveryBean.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RequeueFirstDeliveryBean.java new file mode 100644 index 0000000000..61f48c0c50 --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RequeueFirstDeliveryBean.java @@ -0,0 +1,75 @@ +package io.smallrye.reactive.messaging.rabbitmq; + +import jakarta.enterprise.context.ApplicationScoped; +import org.eclipse.microprofile.reactive.messaging.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A bean that can be registered to test rejecting and requeuing the + * first delivery attempt. Redeliveries will be nack'ed without requeue, + * so they should end up in the DLQ. + */ +@ApplicationScoped +public class RequeueFirstDeliveryBean { + private final List list = new ArrayList<>(); + private final List dlqList = new ArrayList<>(); + + private final AtomicInteger typeCastCounter = new AtomicInteger(); + + @Incoming("data") + @Outgoing("sink") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + int value = -1; + try { + value = Integer.parseInt(input.getPayload()); + } catch (ClassCastException e) { + typeCastCounter.incrementAndGet(); + } + + return Message.of(value + 1, () -> { + boolean isRedeliver = input.getMetadata(IncomingRabbitMQMetadata.class) + .map(IncomingRabbitMQMetadata::isRedeliver) + .orElse(false); + + if (isRedeliver) { + return input.nack(new RuntimeException("reject")); + } else { + return input.nack(new RuntimeException("requeue"), Metadata.of(new RabbitMQRejectMetadata(true))); + } + }); + } + + @Incoming("sink") + public void sink(int val) { + list.add(val); + } + + @Incoming("data-dlq") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public CompletionStage dlq(Message msg) { + try { + dlqList.add(Integer.parseInt(msg.getPayload())); + } catch (ClassCastException cce) { + typeCastCounter.incrementAndGet(); + } + + return msg.ack(); + } + + public List getResults() { + return list; + } + + public List getDlqResults() { + return dlqList; + } + + public int getTypeCasts() { + return typeCastCounter.get(); + } +} From 15702ef18f09e6d043bf03e91169fc647db1e453 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Mon, 25 Sep 2023 14:13:42 +0200 Subject: [PATCH 2/2] RabbitMq requeue failure-strategy --- .../receiving-messages-from-rabbitmq.md | 25 ++++++- .../RabbitMQRejectMetadataExample.java | 24 +++++++ .../messaging/rabbitmq/ConnectionHolder.java | 4 +- .../rabbitmq/IncomingRabbitMQMessage.java | 25 ++++--- .../messaging/rabbitmq/RabbitMQConnector.java | 2 +- .../rabbitmq/fault/RabbitMQAccept.java | 6 +- .../rabbitmq/fault/RabbitMQFailStop.java | 14 ++-- .../fault/RabbitMQFailureHandler.java | 5 +- .../rabbitmq/fault/RabbitMQReject.java | 14 ++-- .../rabbitmq/fault/RabbitMQRequeue.java | 53 +++++++++++++++ .../rabbitmq/IncomingRabbitMQMessageTest.java | 3 +- .../messaging/rabbitmq/RabbitMQTest.java | 26 +++++-- .../rabbitmq/RequeueFirstDeliveryBean.java | 67 ++++++------------- 13 files changed, 190 insertions(+), 78 deletions(-) create mode 100644 documentation/src/main/java/rabbitmq/inbound/RabbitMQRejectMetadataExample.java create mode 100644 smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQRequeue.java diff --git a/documentation/src/main/docs/rabbitmq/receiving-messages-from-rabbitmq.md b/documentation/src/main/docs/rabbitmq/receiving-messages-from-rabbitmq.md index 52b18f4954..659e73c4c1 100644 --- a/documentation/src/main/docs/rabbitmq/receiving-messages-from-rabbitmq.md +++ b/documentation/src/main/docs/rabbitmq/receiving-messages-from-rabbitmq.md @@ -173,12 +173,31 @@ controlled by the `failure-strategy` channel setting: - `reject` - this strategy marks the RabbitMQ message as rejected (default). The processing continues with the next message. +- `requeue` - this strategy marks the RabbitMQ message as rejected + with requeue flag to true. The processing continues with the next message, + but the requeued message will be redelivered to the consumer. + +When using `dead-letter-queue`, it is also possible to change some +metadata of the record that is sent to the dead letter topic. To do +that, use the `Message.nack(Throwable, Metadata)` method: + +The RabbitMQ reject `requeue` flag can be controlled on different failure strategies +using the {{ javadoc('io.smallrye.reactive.messaging.rabbitmq.RabbitMQRejectMetadata') }}. +To do that, use the `Message.nack(Throwable, Metadata)` method by including the +`RabbitMQRejectMetadata` metadata with `requeue` to `true`. + +``` java +{{ insert('rabbitmq/inbound/RabbitMQRejectMetadataExample.java', 'code') }} +``` + !!!warning "Experimental" `RabbitMQFailureHandler` is experimental and APIs are subject to change in the future -In addition, you can also provide your own failure strategy. To provide a failure strategy implement a bean exposing the interface -{{ javadoc('io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler') }}, qualified with a `@Identifier`. Set the name of the bean -as the `failure-strategy` channel setting. +In addition, you can also provide your own failure strategy. +To provide a failure strategy implement a bean exposing the interface +{{ javadoc('io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler') }}, +qualified with a `@Identifier`. +Set the name of the bean as the `failure-strategy` channel setting. ## Configuration Reference diff --git a/documentation/src/main/java/rabbitmq/inbound/RabbitMQRejectMetadataExample.java b/documentation/src/main/java/rabbitmq/inbound/RabbitMQRejectMetadataExample.java new file mode 100644 index 0000000000..25650199ec --- /dev/null +++ b/documentation/src/main/java/rabbitmq/inbound/RabbitMQRejectMetadataExample.java @@ -0,0 +1,24 @@ +package rabbitmq.inbound; + +import java.util.concurrent.CompletionStage; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Metadata; + +import io.smallrye.reactive.messaging.rabbitmq.RabbitMQRejectMetadata; + +@ApplicationScoped +public class RabbitMQRejectMetadataExample { + + // + @Incoming("in") + public CompletionStage consume(Message message) { + return message.nack(new Exception("Failed!"), Metadata.of( + new RabbitMQRejectMetadata(true))); + } + // + +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ConnectionHolder.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ConnectionHolder.java index 21cd920e1d..c6fdae2484 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ConnectionHolder.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ConnectionHolder.java @@ -91,8 +91,8 @@ public Uni getAck(final long deliveryTag) { return client.basicAck(deliveryTag, false); } - public Function> getNack(final long deliveryTag, final boolean requeue) { - return t -> client.basicNack(deliveryTag, false, requeue).subscribeAsCompletionStage(); + public Function> getNack(final long deliveryTag, final boolean requeue) { + return t -> client.basicNack(deliveryTag, false, requeue); } public Vertx getVertx() { diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java index 95055841ac..11b9fabd1d 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java @@ -44,7 +44,8 @@ public CompletionStage handle(IncomingRabbitMQMessage message, Cont } @Override - public CompletionStage handle(IncomingRabbitMQMessage message, Metadata metadata, Context context, Throwable reason) { + public CompletionStage handle(IncomingRabbitMQMessage message, Metadata metadata, Context context, + Throwable reason) { return CompletableFuture.completedFuture(null); } } @@ -127,23 +128,31 @@ public void acknowledgeMessage() { holder.getAck(this.deliveryTag).subscribeAsCompletionStage(); } + /** + * Rejects the message by nack'ing with requeue=false; this will either discard the message for good or + * (if a DLQ has been set up) send it to the DLQ. + * + * @param reason the cause of the rejection, which must not be null + */ + public void rejectMessage(Throwable reason) { + this.rejectMessage(reason, false); + holder.getNack(this.deliveryTag, false).apply(reason).subscribeAsCompletionStage(); + } + /** * Rejects the message by nack'ing it. *

- * This will either discard the message for good, requeue (if {@link RabbitMQRejectMetadata#isRequeue()} is set) + * This will either discard the message for good, requeue (if requeue=true is set) * or (if a DLQ has been set up) send it to the DLQ. *

* Please note that requeue is potentially dangerous as it can lead to * very high load if all consumers reject and requeue a message repeatedly. * * @param reason the cause of the rejection, which must not be null - * @param metadata additional nack metadata, may be {@code null} + * @param requeue the requeue flag */ - public void rejectMessage(Throwable reason, Metadata metadata) { - Optional rejectMetadata = - Optional.ofNullable(metadata).flatMap(md -> md.get(RabbitMQRejectMetadata.class)); - boolean requeue = rejectMetadata.map(RabbitMQRejectMetadata::isRequeue).orElse(false); - holder.getNack(this.deliveryTag, requeue).apply(reason); + public void rejectMessage(Throwable reason, boolean requeue) { + holder.getNack(this.deliveryTag, requeue).apply(reason).subscribeAsCompletionStage(); } @Override diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java index cb4e7cde8f..098e3ff5ba 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java @@ -128,7 +128,7 @@ @ConnectorAttribute(name = "dead-letter-dlx-routing-key", direction = INCOMING, description = "If specified, a dead letter routing key to assign to the DLQ. Relevant only if auto-bind-dlq is true", type = "string") // Message consumer -@ConnectorAttribute(name = "failure-strategy", direction = INCOMING, description = "The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are `fail`, `accept`, `reject` (default) or name of a bean", type = "string", defaultValue = "reject") +@ConnectorAttribute(name = "failure-strategy", direction = INCOMING, description = "The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are `fail`, `accept`, `reject` (default), `requeue` or name of a bean", type = "string", defaultValue = "reject") @ConnectorAttribute(name = "broadcast", direction = INCOMING, description = "Whether the received RabbitMQ messages must be dispatched to multiple _subscribers_", type = "boolean", defaultValue = "false") @ConnectorAttribute(name = "auto-acknowledgement", direction = INCOMING, description = "Whether the received RabbitMQ messages must be acknowledged when received; if true then delivery constitutes acknowledgement", type = "boolean", defaultValue = "false") @ConnectorAttribute(name = "keep-most-recent", direction = INCOMING, description = "Whether to discard old messages instead of recent ones", type = "boolean", defaultValue = "false") diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQAccept.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQAccept.java index 8bdd3a3a23..b0e92b4be6 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQAccept.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQAccept.java @@ -6,13 +6,14 @@ import jakarta.enterprise.context.ApplicationScoped; +import org.eclipse.microprofile.reactive.messaging.Metadata; + import io.smallrye.common.annotation.Identifier; import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder; import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage; import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector; import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration; import io.vertx.mutiny.core.Context; -import org.eclipse.microprofile.reactive.messaging.Metadata; /** * A {@link RabbitMQFailureHandler} that in effect treats the nack as an ack. @@ -40,7 +41,8 @@ public RabbitMQAccept(String channel) { } @Override - public CompletionStage handle(IncomingRabbitMQMessage msg, Metadata metadata, Context context, Throwable reason) { + public CompletionStage handle(IncomingRabbitMQMessage msg, Metadata metadata, Context context, + Throwable reason) { // We mark the message as rejected and fail. log.nackedAcceptMessage(channel); log.fullIgnoredFailure(reason); diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailStop.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailStop.java index 11fc0763a4..813c05534f 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailStop.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailStop.java @@ -2,14 +2,16 @@ import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log; +import java.util.Optional; import java.util.concurrent.CompletionStage; -import io.smallrye.reactive.messaging.rabbitmq.*; import jakarta.enterprise.context.ApplicationScoped; +import org.eclipse.microprofile.reactive.messaging.Metadata; + import io.smallrye.common.annotation.Identifier; +import io.smallrye.reactive.messaging.rabbitmq.*; import io.vertx.mutiny.core.Context; -import org.eclipse.microprofile.reactive.messaging.Metadata; /** * A {@link RabbitMQFailureHandler} that rejects the message and reports a failure. @@ -39,10 +41,14 @@ public RabbitMQFailStop(RabbitMQConnector connector, String channel) { } @Override - public CompletionStage handle(IncomingRabbitMQMessage msg, Metadata metadata, Context context, Throwable reason) { + public CompletionStage handle(IncomingRabbitMQMessage msg, Metadata metadata, Context context, + Throwable reason) { // We mark the message as rejected and fail. log.nackedFailMessage(channel); connector.reportIncomingFailure(channel, reason); - return ConnectionHolder.runOnContextAndReportFailure(context, reason, msg, (m) -> m.rejectMessage(reason, metadata)); + boolean requeue = Optional.ofNullable(metadata) + .flatMap(md -> md.get(RabbitMQRejectMetadata.class)) + .map(RabbitMQRejectMetadata::isRequeue).orElse(false); + return ConnectionHolder.runOnContextAndReportFailure(context, reason, msg, (m) -> m.rejectMessage(reason, requeue)); } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailureHandler.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailureHandler.java index 0ab71706a2..d8e295567d 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailureHandler.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailureHandler.java @@ -2,12 +2,13 @@ import java.util.concurrent.CompletionStage; +import org.eclipse.microprofile.reactive.messaging.Metadata; + import io.smallrye.common.annotation.Experimental; import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage; import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector; import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration; import io.vertx.mutiny.core.Context; -import org.eclipse.microprofile.reactive.messaging.Metadata; /** * Implemented to provide message failure strategies. @@ -21,8 +22,8 @@ public interface RabbitMQFailureHandler { interface Strategy { String FAIL = "fail"; String ACCEPT = "accept"; - String RELEASE = "release"; String REJECT = "reject"; + String REQUEUE = "requeue"; } /** diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQReject.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQReject.java index ba704a7123..9ab308ba26 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQReject.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQReject.java @@ -2,14 +2,16 @@ import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log; +import java.util.Optional; import java.util.concurrent.CompletionStage; -import io.smallrye.reactive.messaging.rabbitmq.*; import jakarta.enterprise.context.ApplicationScoped; +import org.eclipse.microprofile.reactive.messaging.Metadata; + import io.smallrye.common.annotation.Identifier; +import io.smallrye.reactive.messaging.rabbitmq.*; import io.vertx.mutiny.core.Context; -import org.eclipse.microprofile.reactive.messaging.Metadata; public class RabbitMQReject implements RabbitMQFailureHandler { private final String channel; @@ -34,10 +36,14 @@ public RabbitMQReject(String channel) { } @Override - public CompletionStage handle(IncomingRabbitMQMessage msg, Metadata metadata, Context context, Throwable reason) { + public CompletionStage handle(IncomingRabbitMQMessage msg, Metadata metadata, Context context, + Throwable reason) { // We mark the message as rejected and fail. log.nackedIgnoreMessage(channel); log.fullIgnoredFailure(reason); - return ConnectionHolder.runOnContext(context, msg, m -> m.rejectMessage(reason, metadata)); + boolean requeue = Optional.ofNullable(metadata) + .flatMap(md -> md.get(RabbitMQRejectMetadata.class)) + .map(RabbitMQRejectMetadata::isRequeue).orElse(false); + return ConnectionHolder.runOnContext(context, msg, m -> m.rejectMessage(reason, requeue)); } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQRequeue.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQRequeue.java new file mode 100644 index 0000000000..43755edf69 --- /dev/null +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQRequeue.java @@ -0,0 +1,53 @@ +package io.smallrye.reactive.messaging.rabbitmq.fault; + +import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log; + +import java.util.Optional; +import java.util.concurrent.CompletionStage; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Metadata; + +import io.smallrye.common.annotation.Identifier; +import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder; +import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage; +import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector; +import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration; +import io.smallrye.reactive.messaging.rabbitmq.RabbitMQRejectMetadata; +import io.vertx.mutiny.core.Context; + +public class RabbitMQRequeue implements RabbitMQFailureHandler { + private final String channel; + + @ApplicationScoped + @Identifier(Strategy.REQUEUE) + public static class Factory implements RabbitMQFailureHandler.Factory { + + @Override + public RabbitMQFailureHandler create(RabbitMQConnectorIncomingConfiguration config, RabbitMQConnector connector) { + return new RabbitMQRequeue(config.getChannel()); + } + } + + /** + * Constructor. + * + * @param channel the channel + */ + public RabbitMQRequeue(String channel) { + this.channel = channel; + } + + @Override + public CompletionStage handle(IncomingRabbitMQMessage msg, Metadata metadata, Context context, + Throwable reason) { + // We mark the message as requeued and fail. + log.nackedIgnoreMessage(channel); + log.fullIgnoredFailure(reason); + boolean requeue = Optional.ofNullable(metadata) + .flatMap(md -> md.get(RabbitMQRejectMetadata.class)) + .map(RabbitMQRejectMetadata::isRequeue).orElse(true); + return ConnectionHolder.runOnContext(context, msg, m -> m.rejectMessage(reason, requeue)); + } +} diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java index f07bf03c03..82fce4d811 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessageTest.java @@ -33,7 +33,8 @@ public CompletionStage handle(IncomingRabbitMQMessage message, Cont RabbitMQFailureHandler doNothingNack = new RabbitMQFailureHandler() { @Override - public CompletionStage handle(IncomingRabbitMQMessage message, Metadata metadata, Context context, Throwable reason) { + public CompletionStage handle(IncomingRabbitMQMessage message, Metadata metadata, Context context, + Throwable reason) { return CompletableFuture.completedFuture(null); } }; diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java index defb6ed93f..adda11f705 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java @@ -10,7 +10,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler; import org.eclipse.microprofile.config.ConfigProvider; import org.eclipse.microprofile.reactive.messaging.spi.ConnectorLiteral; import org.jboss.weld.environment.se.Weld; @@ -20,6 +19,7 @@ import io.netty.handler.codec.http.HttpHeaderValues; import io.smallrye.config.SmallRyeConfigProviderResolver; +import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; @@ -636,7 +636,8 @@ void testNackWithRejectAndRequeue() { .put("mp.messaging.incoming.data.exchange.name", exchangeName) .put("mp.messaging.incoming.data.exchange.durable", false) .put("mp.messaging.incoming.data.queue.name", queueName) - .put("mp.messaging.incoming.data.queue.durable", false) + .put("mp.messaging.incoming.data.queue.x-queue-type", "quorum") + .put("mp.messaging.incoming.data.queue.x-delivery-limit", 2) .put("mp.messaging.incoming.data.queue.routing-keys", routingKey) .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) .put("mp.messaging.incoming.data.host", host) @@ -664,22 +665,35 @@ void testNackWithRejectAndRequeue() { container = weld.initialize(); await().until(() -> isRabbitMQConnectorAvailable(container)); - RequeueFirstDeliveryBean bean = container.getBeanManager().createInstance().select(RequeueFirstDeliveryBean.class).get(); + RequeueFirstDeliveryBean bean = container.getBeanManager().createInstance().select(RequeueFirstDeliveryBean.class) + .get(); await().until(() -> isRabbitMQConnectorAvailable(container)); List list = bean.getResults(); assertThat(list).isEmpty(); + List redelivered = bean.getRedelivered(); + assertThat(redelivered).isEmpty(); + List dlqList = bean.getDlqResults(); assertThat(dlqList).isEmpty(); AtomicInteger counter = new AtomicInteger(); usage.produceTenIntegers(exchangeName, queueName, routingKey, counter::getAndIncrement); - await().atMost(1, TimeUnit.MINUTES).until(() -> list.size() >= 10); - assertThat(list).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10); - assertThat(dlqList).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> { + assertThat(list) + .hasSizeGreaterThanOrEqualTo(30) + .containsExactlyInAnyOrder( + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + assertThat(redelivered).containsExactly( + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + assertThat(dlqList).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + }); } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RequeueFirstDeliveryBean.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RequeueFirstDeliveryBean.java index 61f48c0c50..6ff0d293f7 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RequeueFirstDeliveryBean.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RequeueFirstDeliveryBean.java @@ -1,12 +1,12 @@ package io.smallrye.reactive.messaging.rabbitmq; -import jakarta.enterprise.context.ApplicationScoped; -import org.eclipse.microprofile.reactive.messaging.*; - -import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletionStage; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.*; /** * A bean that can be registered to test rejecting and requeuing the @@ -15,50 +15,27 @@ */ @ApplicationScoped public class RequeueFirstDeliveryBean { - private final List list = new ArrayList<>(); - private final List dlqList = new ArrayList<>(); - - private final AtomicInteger typeCastCounter = new AtomicInteger(); + private final List list = new CopyOnWriteArrayList<>(); + private final List redelivered = new CopyOnWriteArrayList<>(); + private final List dlqList = new CopyOnWriteArrayList<>(); @Incoming("data") - @Outgoing("sink") - @Acknowledgment(Acknowledgment.Strategy.MANUAL) - public Message process(Message input) { - int value = -1; - try { - value = Integer.parseInt(input.getPayload()); - } catch (ClassCastException e) { - typeCastCounter.incrementAndGet(); + public CompletionStage process(Message input) { + int value = Integer.parseInt(input.getPayload()); + list.add(value + 1); + + boolean redeliver = input.getMetadata(IncomingRabbitMQMetadata.class) + .map(IncomingRabbitMQMetadata::isRedeliver) + .orElse(false); + if (redeliver) { + redelivered.add(value + 1); } - - return Message.of(value + 1, () -> { - boolean isRedeliver = input.getMetadata(IncomingRabbitMQMetadata.class) - .map(IncomingRabbitMQMetadata::isRedeliver) - .orElse(false); - - if (isRedeliver) { - return input.nack(new RuntimeException("reject")); - } else { - return input.nack(new RuntimeException("requeue"), Metadata.of(new RabbitMQRejectMetadata(true))); - } - }); - } - - @Incoming("sink") - public void sink(int val) { - list.add(val); + return input.nack(new RuntimeException("requeue"), Metadata.of(new RabbitMQRejectMetadata(true))); } @Incoming("data-dlq") - @Acknowledgment(Acknowledgment.Strategy.MANUAL) - public CompletionStage dlq(Message msg) { - try { - dlqList.add(Integer.parseInt(msg.getPayload())); - } catch (ClassCastException cce) { - typeCastCounter.incrementAndGet(); - } - - return msg.ack(); + public void dlq(String msg) { + dlqList.add(Integer.parseInt(msg)); } public List getResults() { @@ -69,7 +46,7 @@ public List getDlqResults() { return dlqList; } - public int getTypeCasts() { - return typeCastCounter.get(); + public List getRedelivered() { + return redelivered; } }