diff --git a/documentation/src/main/docs/rabbitmq/receiving-messages-from-rabbitmq.md b/documentation/src/main/docs/rabbitmq/receiving-messages-from-rabbitmq.md index b2d6990156..52b18f4954 100644 --- a/documentation/src/main/docs/rabbitmq/receiving-messages-from-rabbitmq.md +++ b/documentation/src/main/docs/rabbitmq/receiving-messages-from-rabbitmq.md @@ -173,6 +173,14 @@ controlled by the `failure-strategy` channel setting: - `reject` - this strategy marks the RabbitMQ message as rejected (default). The processing continues with the next message. +!!!warning "Experimental" + `RabbitMQFailureHandler` is experimental and APIs are subject to change in the future + +In addition, you can also provide your own failure strategy. To provide a failure strategy implement a bean exposing the interface +{{ javadoc('io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler') }}, qualified with a `@Identifier`. Set the name of the bean +as the `failure-strategy` channel setting. + + ## Configuration Reference {{ insert('../../../target/connectors/smallrye-rabbitmq-incoming.md') }} diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java index c9733b5414..cb4e7cde8f 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java @@ -46,14 +46,12 @@ import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.health.HealthReporter; import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; +import io.smallrye.reactive.messaging.providers.helpers.CDIUtils; import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAck; import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAckHandler; import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAutoAck; -import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQAccept; -import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailStop; import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler; -import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQReject; import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQOpenTelemetryInstrumenter; import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQTrace; import io.vertx.core.json.JsonObject; @@ -114,6 +112,7 @@ @ConnectorAttribute(name = "max-incoming-internal-queue-size", direction = INCOMING, description = "The maximum size of the incoming internal queue", type = "int", defaultValue = "500000") @ConnectorAttribute(name = "connection-count", direction = INCOMING, description = "The number of RabbitMQ connections to create for consuming from this queue. This might be necessary to consume from a sharded queue with a single client.", type = "int", defaultValue = "1") @ConnectorAttribute(name = "queue.x-max-priority", direction = INCOMING, description = "Define priority level queue consumer", type = "int") +@ConnectorAttribute(name = "queue.x-delivery-limit", direction = INCOMING, description = "If queue.x-queue-type is quorum, when a message has been returned more times than the limit the message will be dropped or dead-lettered", type = "long") // DLQs @ConnectorAttribute(name = "auto-bind-dlq", direction = INCOMING, description = "Whether to automatically declare the DLQ and bind it to the binder DLX", type = "boolean", defaultValue = "false") @@ -129,7 +128,7 @@ @ConnectorAttribute(name = "dead-letter-dlx-routing-key", direction = INCOMING, description = "If specified, a dead letter routing key to assign to the DLQ. Relevant only if auto-bind-dlq is true", type = "string") // Message consumer -@ConnectorAttribute(name = "failure-strategy", direction = INCOMING, description = "The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are `fail`, `accept`, `reject` (default)", type = "string", defaultValue = "reject") +@ConnectorAttribute(name = "failure-strategy", direction = INCOMING, description = "The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are `fail`, `accept`, `reject` (default) or name of a bean", type = "string", defaultValue = "reject") @ConnectorAttribute(name = "broadcast", direction = INCOMING, description = "Whether the received RabbitMQ messages must be dispatched to multiple _subscribers_", type = "boolean", defaultValue = "false") @ConnectorAttribute(name = "auto-acknowledgement", direction = INCOMING, description = "Whether the received RabbitMQ messages must be acknowledged when received; if true then delivery constitutes acknowledgement", type = "boolean", defaultValue = "false") @ConnectorAttribute(name = "keep-most-recent", direction = INCOMING, description = "Whether to discard old messages instead of recent ones", type = "boolean", defaultValue = "false") @@ -190,6 +189,10 @@ private enum ChannelStatus { private volatile RabbitMQOpenTelemetryInstrumenter instrumenter; + @Inject + @Any + Instance failureHandlerFactories; + RabbitMQConnector() { // used for proxies } @@ -464,6 +467,13 @@ public void addClient(String channel, RabbitMQClient client) { clients.put(channel, client); } + public void establishQueue(String channel, RabbitMQConnectorIncomingConfiguration ic) { + final RabbitMQClient client = clients.get(channel); + client.getDelegate().addConnectionEstablishedCallback(promise -> { + establishQueue(client, ic).subscribe().with((ignored) -> promise.complete(), promise::fail); + }); + } + /** * Uses a {@link RabbitMQClient} to ensure the required exchange is created. * @@ -521,7 +531,9 @@ private Uni establishQueue( } }); //x-max-priority - ic.getQueueXMaxPriority().ifPresent(maxPriority -> queueArgs.put("x-max-priority", ic.getQueueXMaxPriority())); + ic.getQueueXMaxPriority().ifPresent(maxPriority -> queueArgs.put("x-max-priority", maxPriority)); + //x-delivery-limit + ic.getQueueXDeliveryLimit().ifPresent(deliveryLimit -> queueArgs.put("x-delivery-limit", deliveryLimit)); return establishExchange(client, ic) .onItem().transform(v -> Boolean.TRUE.equals(ic.getQueueDeclare()) ? null : queueName) @@ -596,23 +608,13 @@ private Map parseArguments( private RabbitMQFailureHandler createFailureHandler(RabbitMQConnectorIncomingConfiguration config) { String strategy = config.getFailureStrategy(); - RabbitMQFailureHandler.Strategy actualStrategy = RabbitMQFailureHandler.Strategy.from(strategy); - switch (actualStrategy) { - case FAIL: - return new RabbitMQFailStop(this, config.getChannel()); - case ACCEPT: - return new RabbitMQAccept(config.getChannel()); - case REJECT: - return new RabbitMQReject(config.getChannel()); - default: - throw ex.illegalArgumentInvalidFailureStrategy(strategy); + Instance failureHandlerFactory = CDIUtils.getInstanceById(failureHandlerFactories, + strategy); + if (failureHandlerFactory.isResolvable()) { + return failureHandlerFactory.get().create(config, this); + } else { + throw ex.illegalArgumentInvalidFailureStrategy(strategy); } - - } - - private RabbitMQAckHandler createAckHandler(RabbitMQConnectorIncomingConfiguration ic) { - return (Boolean.TRUE.equals(ic.getAutoAcknowledgement())) ? new RabbitMQAutoAck(ic.getChannel()) - : new RabbitMQAck(ic.getChannel()); } private String serverQueueName(String name) { @@ -635,4 +637,9 @@ public void reportIncomingFailure(String channel, Throwable reason) { client.stopAndForget(); } } + + public RabbitMQAckHandler createAckHandler(RabbitMQConnectorIncomingConfiguration ic) { + return (Boolean.TRUE.equals(ic.getAutoAcknowledgement())) ? new RabbitMQAutoAck(ic.getChannel()) + : new RabbitMQAck(ic.getChannel()); + } } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQAccept.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQAccept.java index 77e7fa2582..06da2bd4df 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQAccept.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQAccept.java @@ -4,8 +4,13 @@ import java.util.concurrent.CompletionStage; +import jakarta.enterprise.context.ApplicationScoped; + +import io.smallrye.common.annotation.Identifier; import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder; import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage; +import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector; +import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration; import io.vertx.mutiny.core.Context; /** @@ -14,6 +19,16 @@ public class RabbitMQAccept implements RabbitMQFailureHandler { private final String channel; + @ApplicationScoped + @Identifier(Strategy.ACCEPT) + public static class Factory implements RabbitMQFailureHandler.Factory { + + @Override + public RabbitMQFailureHandler create(RabbitMQConnectorIncomingConfiguration config, RabbitMQConnector connector) { + return new RabbitMQAccept(config.getChannel()); + } + } + /** * Constructor. * diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailStop.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailStop.java index 7f97fbb5c1..917506e6c2 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailStop.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailStop.java @@ -1,11 +1,16 @@ package io.smallrye.reactive.messaging.rabbitmq.fault; +import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log; + import java.util.concurrent.CompletionStage; +import jakarta.enterprise.context.ApplicationScoped; + +import io.smallrye.common.annotation.Identifier; import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder; import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage; import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector; -import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging; +import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration; import io.vertx.mutiny.core.Context; /** @@ -15,6 +20,16 @@ public class RabbitMQFailStop implements RabbitMQFailureHandler { private final String channel; private final RabbitMQConnector connector; + @ApplicationScoped + @Identifier(Strategy.FAIL) + public static class Factory implements RabbitMQFailureHandler.Factory { + + @Override + public RabbitMQFailureHandler create(RabbitMQConnectorIncomingConfiguration config, RabbitMQConnector connector) { + return new RabbitMQFailStop(connector, config.getChannel()); + } + } + /** * Constructor. * @@ -28,7 +43,7 @@ public RabbitMQFailStop(RabbitMQConnector connector, String channel) { @Override public CompletionStage handle(IncomingRabbitMQMessage msg, Context context, Throwable reason) { // We mark the message as rejected and fail. - RabbitMQLogging.log.nackedFailMessage(channel); + log.nackedFailMessage(channel); connector.reportIncomingFailure(channel, reason); return ConnectionHolder.runOnContextAndReportFailure(context, reason, msg, (m) -> m.rejectMessage(reason)); } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailureHandler.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailureHandler.java index 4a55c4ca5e..50d51e564b 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailureHandler.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQFailureHandler.java @@ -1,50 +1,36 @@ package io.smallrye.reactive.messaging.rabbitmq.fault; -import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex; - import java.util.concurrent.CompletionStage; +import io.smallrye.common.annotation.Experimental; import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage; +import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector; +import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration; import io.vertx.mutiny.core.Context; /** * Implemented to provide message failure strategies. */ +@Experimental("Experimental API") public interface RabbitMQFailureHandler { - enum Strategy { - /** - * Mark the message as {@code rejected} and die. - */ - FAIL, - /** - * Mark the message as {@code accepted} and continue. - */ - ACCEPT, - /** - * Mark the message as {@code released} and continue. - */ - RELEASE, - /** - * Mark the message as {@code rejected} and continue. - */ - REJECT; + /** + * Identifiers of default failure strategies + */ + interface Strategy { + String FAIL = "fail"; + String ACCEPT = "accept"; + String RELEASE = "release"; + String REJECT = "reject"; + } - public static Strategy from(String s) { - if (s == null || s.equalsIgnoreCase("fail")) { - return FAIL; - } - if (s.equalsIgnoreCase("accept")) { - return ACCEPT; - } - if (s.equalsIgnoreCase("release")) { - return RELEASE; - } - if (s.equalsIgnoreCase("reject")) { - return REJECT; - } - throw ex.illegalArgumentUnknownFailureStrategy(s); - } + /** + * Factory interface for {@link RabbitMQFailureHandler} + */ + interface Factory { + RabbitMQFailureHandler create( + RabbitMQConnectorIncomingConfiguration config, + RabbitMQConnector connector); } /** diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQReject.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQReject.java index a19846efa1..b03dbb3da2 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQReject.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/fault/RabbitMQReject.java @@ -4,13 +4,28 @@ import java.util.concurrent.CompletionStage; +import jakarta.enterprise.context.ApplicationScoped; + +import io.smallrye.common.annotation.Identifier; import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder; import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage; +import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector; +import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration; import io.vertx.mutiny.core.Context; public class RabbitMQReject implements RabbitMQFailureHandler { private final String channel; + @ApplicationScoped + @Identifier(Strategy.REJECT) + public static class Factory implements RabbitMQFailureHandler.Factory { + + @Override + public RabbitMQFailureHandler create(RabbitMQConnectorIncomingConfiguration config, RabbitMQConnector connector) { + return new RabbitMQReject(config.getChannel()); + } + } + /** * Constructor. * diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java index 4e08d313e1..f6c21b5c6c 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java @@ -347,6 +347,50 @@ void testIncomingDeclarationsWithDLQ() throws Exception { assertThat(dlqBinding1.getString("routing_key")).isEqualTo(dlxRoutingKey); } + /** + * Verifies that Exchanges, Queues and Bindings are correctly declared as a result of + * incoming connector configuration that specifies Quorum/Delivery limit overrides. + */ + @Test + void testIncomingDeclarationsWithQuorum() throws Exception { + + final String queueName = "qIncomingDeclareTestWithDeliveryLimit"; + final boolean queueDurable = true; + final String queueType = "quorum"; + final long queueDeliveryLimit = 10; + + weld.addBeanClass(IncomingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.queue.durable", queueDurable) + .put("mp.messaging.incoming.data.queue.x-queue-type", queueType) + .put("mp.messaging.incoming.data.queue.x-delivery-limit", queueDeliveryLimit) + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + await().until(() -> isRabbitMQConnectorAvailable(container)); + + // verify queue + final JsonObject queue = usage.getQueue(queueName); + assertThat(queue).isNotNull(); + assertThat(queue.getString("name")).isEqualTo(queueName); + assertThat(queue.getBoolean("durable")).isEqualTo(queueDurable); + + final JsonObject queueArguments = queue.getJsonObject("arguments"); + assertThat(queueArguments).isNotNull(); + assertThat(queueArguments.getString("x-queue-type")).isEqualTo(queueType); + assertThat(queueArguments.getLong("x-delivery-limit")).isEqualTo(queueDeliveryLimit); + } + /** * Verifies that messages can be sent to RabbitMQ. * diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/WeldTestBase.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/WeldTestBase.java index 03d3131825..b6a98b62f0 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/WeldTestBase.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/WeldTestBase.java @@ -30,6 +30,9 @@ import io.smallrye.reactive.messaging.providers.metrics.MetricDecorator; import io.smallrye.reactive.messaging.providers.metrics.MicrometerDecorator; import io.smallrye.reactive.messaging.providers.wiring.Wiring; +import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQAccept; +import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailStop; +import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQReject; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; public class WeldTestBase extends RabbitMQBrokerTestBase { @@ -69,6 +72,9 @@ public void initWeld() { weld.addBeanClass(MetricDecorator.class); weld.addBeanClass(MicrometerDecorator.class); weld.addBeanClass(ContextDecorator.class); + weld.addBeanClass(RabbitMQAccept.Factory.class); + weld.addBeanClass(RabbitMQFailStop.Factory.class); + weld.addBeanClass(RabbitMQReject.Factory.class); weld.disableDiscovery(); }