Skip to content

Commit

Permalink
Refactor RabbitMQ failure handler
Browse files Browse the repository at this point in the history
  • Loading branch information
scrocquesel committed Jun 23, 2023
1 parent a2643dc commit b18fc1f
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,14 @@ controlled by the `failure-strategy` channel setting:
- `reject` - this strategy marks the RabbitMQ message as rejected
(default). The processing continues with the next message.

!!!warning "Experimental"
`RabbitMQFailureHandler` is experimental and APIs are subject to change in the future

In addition, you can also provide your own failure strategy. To provide a failure strategy implement a bean exposing the interface
{{ javadoc('io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler') }}, qualified with a `@Identifier`. Set the name of the bean
as the `failure-strategy` channel setting.


## Configuration Reference

{{ insert('../../../target/connectors/smallrye-rabbitmq-incoming.md') }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,12 @@
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAck;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAckHandler;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAutoAck;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQAccept;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailStop;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQReject;
import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQTrace;
import io.vertx.core.json.JsonObject;
Expand Down Expand Up @@ -114,6 +112,7 @@
@ConnectorAttribute(name = "max-incoming-internal-queue-size", direction = INCOMING, description = "The maximum size of the incoming internal queue", type = "int", defaultValue = "500000")
@ConnectorAttribute(name = "connection-count", direction = INCOMING, description = "The number of RabbitMQ connections to create for consuming from this queue. This might be necessary to consume from a sharded queue with a single client.", type = "int", defaultValue = "1")
@ConnectorAttribute(name = "queue.x-max-priority", direction = INCOMING, description = "Define priority level queue consumer", type = "int")
@ConnectorAttribute(name = "queue.x-delivery-limit", direction = INCOMING, description = "If queue.x-queue-type is quorum, when a message has been returned more times than the limit the message will be dropped or dead-lettered", type = "long")

// DLQs
@ConnectorAttribute(name = "auto-bind-dlq", direction = INCOMING, description = "Whether to automatically declare the DLQ and bind it to the binder DLX", type = "boolean", defaultValue = "false")
Expand All @@ -129,7 +128,7 @@
@ConnectorAttribute(name = "dead-letter-dlx-routing-key", direction = INCOMING, description = "If specified, a dead letter routing key to assign to the DLQ. Relevant only if auto-bind-dlq is true", type = "string")

// Message consumer
@ConnectorAttribute(name = "failure-strategy", direction = INCOMING, description = "The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are `fail`, `accept`, `reject` (default)", type = "string", defaultValue = "reject")
@ConnectorAttribute(name = "failure-strategy", direction = INCOMING, description = "The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are `fail`, `accept`, `reject` (default) or name of a bean", type = "string", defaultValue = "reject")
@ConnectorAttribute(name = "broadcast", direction = INCOMING, description = "Whether the received RabbitMQ messages must be dispatched to multiple _subscribers_", type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "auto-acknowledgement", direction = INCOMING, description = "Whether the received RabbitMQ messages must be acknowledged when received; if true then delivery constitutes acknowledgement", type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "keep-most-recent", direction = INCOMING, description = "Whether to discard old messages instead of recent ones", type = "boolean", defaultValue = "false")
Expand Down Expand Up @@ -190,6 +189,10 @@ private enum ChannelStatus {

private volatile RabbitMQOpenTelemetryInstrumenter instrumenter;

@Inject
@Any
Instance<RabbitMQFailureHandler.Factory> failureHandlerFactories;

RabbitMQConnector() {
// used for proxies
}
Expand Down Expand Up @@ -464,6 +467,13 @@ public void addClient(String channel, RabbitMQClient client) {
clients.put(channel, client);
}

public void establishQueue(String channel, RabbitMQConnectorIncomingConfiguration ic) {
final RabbitMQClient client = clients.get(channel);
client.getDelegate().addConnectionEstablishedCallback(promise -> {
establishQueue(client, ic).subscribe().with((ignored) -> promise.complete(), promise::fail);
});
}

/**
* Uses a {@link RabbitMQClient} to ensure the required exchange is created.
*
Expand Down Expand Up @@ -521,7 +531,9 @@ private Uni<String> establishQueue(
}
});
//x-max-priority
ic.getQueueXMaxPriority().ifPresent(maxPriority -> queueArgs.put("x-max-priority", ic.getQueueXMaxPriority()));
ic.getQueueXMaxPriority().ifPresent(maxPriority -> queueArgs.put("x-max-priority", maxPriority));
//x-delivery-limit
ic.getQueueXDeliveryLimit().ifPresent(deliveryLimit -> queueArgs.put("x-delivery-limit", deliveryLimit));

return establishExchange(client, ic)
.onItem().transform(v -> Boolean.TRUE.equals(ic.getQueueDeclare()) ? null : queueName)
Expand Down Expand Up @@ -596,23 +608,13 @@ private Map<String, Object> parseArguments(

private RabbitMQFailureHandler createFailureHandler(RabbitMQConnectorIncomingConfiguration config) {
String strategy = config.getFailureStrategy();
RabbitMQFailureHandler.Strategy actualStrategy = RabbitMQFailureHandler.Strategy.from(strategy);
switch (actualStrategy) {
case FAIL:
return new RabbitMQFailStop(this, config.getChannel());
case ACCEPT:
return new RabbitMQAccept(config.getChannel());
case REJECT:
return new RabbitMQReject(config.getChannel());
default:
throw ex.illegalArgumentInvalidFailureStrategy(strategy);
Instance<RabbitMQFailureHandler.Factory> failureHandlerFactory = CDIUtils.getInstanceById(failureHandlerFactories,
strategy);
if (failureHandlerFactory.isResolvable()) {
return failureHandlerFactory.get().create(config, this);
} else {
throw ex.illegalArgumentInvalidFailureStrategy(strategy);
}

}

private RabbitMQAckHandler createAckHandler(RabbitMQConnectorIncomingConfiguration ic) {
return (Boolean.TRUE.equals(ic.getAutoAcknowledgement())) ? new RabbitMQAutoAck(ic.getChannel())
: new RabbitMQAck(ic.getChannel());
}

private String serverQueueName(String name) {
Expand All @@ -635,4 +637,9 @@ public void reportIncomingFailure(String channel, Throwable reason) {
client.stopAndForget();
}
}

public RabbitMQAckHandler createAckHandler(RabbitMQConnectorIncomingConfiguration ic) {
return (Boolean.TRUE.equals(ic.getAutoAcknowledgement())) ? new RabbitMQAutoAck(ic.getChannel())
: new RabbitMQAck(ic.getChannel());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.vertx.mutiny.core.Context;

/**
Expand All @@ -14,6 +19,16 @@
public class RabbitMQAccept implements RabbitMQFailureHandler {
private final String channel;

@ApplicationScoped
@Identifier(Strategy.ACCEPT)
public static class Factory implements RabbitMQFailureHandler.Factory {

@Override
public RabbitMQFailureHandler create(RabbitMQConnectorIncomingConfiguration config, RabbitMQConnector connector) {
return new RabbitMQAccept(config.getChannel());
}
}

/**
* Constructor.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package io.smallrye.reactive.messaging.rabbitmq.fault;

import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.vertx.mutiny.core.Context;

/**
Expand All @@ -15,6 +20,16 @@ public class RabbitMQFailStop implements RabbitMQFailureHandler {
private final String channel;
private final RabbitMQConnector connector;

@ApplicationScoped
@Identifier(Strategy.FAIL)
public static class Factory implements RabbitMQFailureHandler.Factory {

@Override
public RabbitMQFailureHandler create(RabbitMQConnectorIncomingConfiguration config, RabbitMQConnector connector) {
return new RabbitMQFailStop(connector, config.getChannel());
}
}

/**
* Constructor.
*
Expand All @@ -28,7 +43,7 @@ public RabbitMQFailStop(RabbitMQConnector connector, String channel) {
@Override
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> msg, Context context, Throwable reason) {
// We mark the message as rejected and fail.
RabbitMQLogging.log.nackedFailMessage(channel);
log.nackedFailMessage(channel);
connector.reportIncomingFailure(channel, reason);
return ConnectionHolder.runOnContextAndReportFailure(context, reason, msg, (m) -> m.rejectMessage(reason));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,50 +1,36 @@
package io.smallrye.reactive.messaging.rabbitmq.fault;

import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex;

import java.util.concurrent.CompletionStage;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.vertx.mutiny.core.Context;

/**
* Implemented to provide message failure strategies.
*/
@Experimental("Experimental API")
public interface RabbitMQFailureHandler {

enum Strategy {
/**
* Mark the message as {@code rejected} and die.
*/
FAIL,
/**
* Mark the message as {@code accepted} and continue.
*/
ACCEPT,
/**
* Mark the message as {@code released} and continue.
*/
RELEASE,
/**
* Mark the message as {@code rejected} and continue.
*/
REJECT;
/**
* Identifiers of default failure strategies
*/
interface Strategy {
String FAIL = "fail";
String ACCEPT = "accept";
String RELEASE = "release";
String REJECT = "reject";
}

public static Strategy from(String s) {
if (s == null || s.equalsIgnoreCase("fail")) {
return FAIL;
}
if (s.equalsIgnoreCase("accept")) {
return ACCEPT;
}
if (s.equalsIgnoreCase("release")) {
return RELEASE;
}
if (s.equalsIgnoreCase("reject")) {
return REJECT;
}
throw ex.illegalArgumentUnknownFailureStrategy(s);
}
/**
* Factory interface for {@link RabbitMQFailureHandler}
*/
interface Factory {
RabbitMQFailureHandler create(
RabbitMQConnectorIncomingConfiguration config,
RabbitMQConnector connector);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,28 @@

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.vertx.mutiny.core.Context;

public class RabbitMQReject implements RabbitMQFailureHandler {
private final String channel;

@ApplicationScoped
@Identifier(Strategy.REJECT)
public static class Factory implements RabbitMQFailureHandler.Factory {

@Override
public RabbitMQFailureHandler create(RabbitMQConnectorIncomingConfiguration config, RabbitMQConnector connector) {
return new RabbitMQReject(config.getChannel());
}
}

/**
* Constructor.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,50 @@ void testIncomingDeclarationsWithDLQ() throws Exception {
assertThat(dlqBinding1.getString("routing_key")).isEqualTo(dlxRoutingKey);
}

/**
* Verifies that Exchanges, Queues and Bindings are correctly declared as a result of
* incoming connector configuration that specifies Quorum/Delivery limit overrides.
*/
@Test
void testIncomingDeclarationsWithQuorum() throws Exception {

final String queueName = "qIncomingDeclareTestWithDeliveryLimit";
final boolean queueDurable = true;
final String queueType = "quorum";
final long queueDeliveryLimit = 10;

weld.addBeanClass(IncomingBean.class);

new MapBasedConfig()
.put("mp.messaging.incoming.data.queue.name", queueName)
.put("mp.messaging.incoming.data.queue.declare", true)
.put("mp.messaging.incoming.data.queue.durable", queueDurable)
.put("mp.messaging.incoming.data.queue.x-queue-type", queueType)
.put("mp.messaging.incoming.data.queue.x-delivery-limit", queueDeliveryLimit)
.put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME)
.put("mp.messaging.incoming.data.host", host)
.put("mp.messaging.incoming.data.port", port)
.put("mp.messaging.incoming.data.tracing.enabled", false)
.put("rabbitmq-username", username)
.put("rabbitmq-password", password)
.put("rabbitmq-reconnect-attempts", 0)
.write();

container = weld.initialize();
await().until(() -> isRabbitMQConnectorAvailable(container));

// verify queue
final JsonObject queue = usage.getQueue(queueName);
assertThat(queue).isNotNull();
assertThat(queue.getString("name")).isEqualTo(queueName);
assertThat(queue.getBoolean("durable")).isEqualTo(queueDurable);

final JsonObject queueArguments = queue.getJsonObject("arguments");
assertThat(queueArguments).isNotNull();
assertThat(queueArguments.getString("x-queue-type")).isEqualTo(queueType);
assertThat(queueArguments.getLong("x-delivery-limit")).isEqualTo(queueDeliveryLimit);
}

/**
* Verifies that messages can be sent to RabbitMQ.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import io.smallrye.reactive.messaging.providers.metrics.MetricDecorator;
import io.smallrye.reactive.messaging.providers.metrics.MicrometerDecorator;
import io.smallrye.reactive.messaging.providers.wiring.Wiring;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQAccept;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailStop;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQReject;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

public class WeldTestBase extends RabbitMQBrokerTestBase {
Expand Down Expand Up @@ -69,6 +72,9 @@ public void initWeld() {
weld.addBeanClass(MetricDecorator.class);
weld.addBeanClass(MicrometerDecorator.class);
weld.addBeanClass(ContextDecorator.class);
weld.addBeanClass(RabbitMQAccept.Factory.class);
weld.addBeanClass(RabbitMQFailStop.Factory.class);
weld.addBeanClass(RabbitMQReject.Factory.class);
weld.disableDiscovery();
}

Expand Down

0 comments on commit b18fc1f

Please sign in to comment.