Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor RabbitMQ failure handler #2087

Merged
merged 1 commit into from
Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,14 @@ controlled by the `failure-strategy` channel setting:
- `reject` - this strategy marks the RabbitMQ message as rejected
(default). The processing continues with the next message.

!!!warning "Experimental"
`RabbitMQFailureHandler` is experimental and APIs are subject to change in the future

In addition, you can also provide your own failure strategy. To provide a failure strategy implement a bean exposing the interface
{{ javadoc('io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler') }}, qualified with a `@Identifier`. Set the name of the bean
as the `failure-strategy` channel setting.


## Configuration Reference

{{ insert('../../../target/connectors/smallrye-rabbitmq-incoming.md') }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,12 @@
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAck;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAckHandler;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAutoAck;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQAccept;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailStop;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQReject;
import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQTrace;
import io.vertx.core.json.JsonObject;
Expand Down Expand Up @@ -114,6 +112,7 @@
@ConnectorAttribute(name = "max-incoming-internal-queue-size", direction = INCOMING, description = "The maximum size of the incoming internal queue", type = "int", defaultValue = "500000")
@ConnectorAttribute(name = "connection-count", direction = INCOMING, description = "The number of RabbitMQ connections to create for consuming from this queue. This might be necessary to consume from a sharded queue with a single client.", type = "int", defaultValue = "1")
@ConnectorAttribute(name = "queue.x-max-priority", direction = INCOMING, description = "Define priority level queue consumer", type = "int")
@ConnectorAttribute(name = "queue.x-delivery-limit", direction = INCOMING, description = "If queue.x-queue-type is quorum, when a message has been returned more times than the limit the message will be dropped or dead-lettered", type = "long")

// DLQs
@ConnectorAttribute(name = "auto-bind-dlq", direction = INCOMING, description = "Whether to automatically declare the DLQ and bind it to the binder DLX", type = "boolean", defaultValue = "false")
Expand All @@ -129,7 +128,7 @@
@ConnectorAttribute(name = "dead-letter-dlx-routing-key", direction = INCOMING, description = "If specified, a dead letter routing key to assign to the DLQ. Relevant only if auto-bind-dlq is true", type = "string")

// Message consumer
@ConnectorAttribute(name = "failure-strategy", direction = INCOMING, description = "The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are `fail`, `accept`, `reject` (default)", type = "string", defaultValue = "reject")
@ConnectorAttribute(name = "failure-strategy", direction = INCOMING, description = "The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are `fail`, `accept`, `reject` (default) or name of a bean", type = "string", defaultValue = "reject")
@ConnectorAttribute(name = "broadcast", direction = INCOMING, description = "Whether the received RabbitMQ messages must be dispatched to multiple _subscribers_", type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "auto-acknowledgement", direction = INCOMING, description = "Whether the received RabbitMQ messages must be acknowledged when received; if true then delivery constitutes acknowledgement", type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "keep-most-recent", direction = INCOMING, description = "Whether to discard old messages instead of recent ones", type = "boolean", defaultValue = "false")
Expand Down Expand Up @@ -190,6 +189,10 @@ private enum ChannelStatus {

private volatile RabbitMQOpenTelemetryInstrumenter instrumenter;

@Inject
@Any
Instance<RabbitMQFailureHandler.Factory> failureHandlerFactories;

RabbitMQConnector() {
// used for proxies
}
Expand Down Expand Up @@ -464,6 +467,13 @@ public void addClient(String channel, RabbitMQClient client) {
clients.put(channel, client);
}

public void establishQueue(String channel, RabbitMQConnectorIncomingConfiguration ic) {
final RabbitMQClient client = clients.get(channel);
client.getDelegate().addConnectionEstablishedCallback(promise -> {
establishQueue(client, ic).subscribe().with((ignored) -> promise.complete(), promise::fail);
});
}

/**
* Uses a {@link RabbitMQClient} to ensure the required exchange is created.
*
Expand Down Expand Up @@ -521,7 +531,9 @@ private Uni<String> establishQueue(
}
});
//x-max-priority
ic.getQueueXMaxPriority().ifPresent(maxPriority -> queueArgs.put("x-max-priority", ic.getQueueXMaxPriority()));
ic.getQueueXMaxPriority().ifPresent(maxPriority -> queueArgs.put("x-max-priority", maxPriority));
//x-delivery-limit
ic.getQueueXDeliveryLimit().ifPresent(deliveryLimit -> queueArgs.put("x-delivery-limit", deliveryLimit));

return establishExchange(client, ic)
.onItem().transform(v -> Boolean.TRUE.equals(ic.getQueueDeclare()) ? null : queueName)
Expand Down Expand Up @@ -596,23 +608,13 @@ private Map<String, Object> parseArguments(

private RabbitMQFailureHandler createFailureHandler(RabbitMQConnectorIncomingConfiguration config) {
String strategy = config.getFailureStrategy();
RabbitMQFailureHandler.Strategy actualStrategy = RabbitMQFailureHandler.Strategy.from(strategy);
switch (actualStrategy) {
case FAIL:
return new RabbitMQFailStop(this, config.getChannel());
case ACCEPT:
return new RabbitMQAccept(config.getChannel());
case REJECT:
return new RabbitMQReject(config.getChannel());
default:
throw ex.illegalArgumentInvalidFailureStrategy(strategy);
Instance<RabbitMQFailureHandler.Factory> failureHandlerFactory = CDIUtils.getInstanceById(failureHandlerFactories,
strategy);
if (failureHandlerFactory.isResolvable()) {
return failureHandlerFactory.get().create(config, this);
} else {
throw ex.illegalArgumentInvalidFailureStrategy(strategy);
}

}

private RabbitMQAckHandler createAckHandler(RabbitMQConnectorIncomingConfiguration ic) {
return (Boolean.TRUE.equals(ic.getAutoAcknowledgement())) ? new RabbitMQAutoAck(ic.getChannel())
: new RabbitMQAck(ic.getChannel());
}

private String serverQueueName(String name) {
Expand All @@ -635,4 +637,9 @@ public void reportIncomingFailure(String channel, Throwable reason) {
client.stopAndForget();
}
}

public RabbitMQAckHandler createAckHandler(RabbitMQConnectorIncomingConfiguration ic) {
return (Boolean.TRUE.equals(ic.getAutoAcknowledgement())) ? new RabbitMQAutoAck(ic.getChannel())
: new RabbitMQAck(ic.getChannel());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.vertx.mutiny.core.Context;

/**
Expand All @@ -14,6 +19,16 @@
public class RabbitMQAccept implements RabbitMQFailureHandler {
private final String channel;

@ApplicationScoped
@Identifier(Strategy.ACCEPT)
public static class Factory implements RabbitMQFailureHandler.Factory {

@Override
public RabbitMQFailureHandler create(RabbitMQConnectorIncomingConfiguration config, RabbitMQConnector connector) {
return new RabbitMQAccept(config.getChannel());
}
}

/**
* Constructor.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package io.smallrye.reactive.messaging.rabbitmq.fault;

import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging.log;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.vertx.mutiny.core.Context;

/**
Expand All @@ -15,6 +20,16 @@ public class RabbitMQFailStop implements RabbitMQFailureHandler {
private final String channel;
private final RabbitMQConnector connector;

@ApplicationScoped
@Identifier(Strategy.FAIL)
public static class Factory implements RabbitMQFailureHandler.Factory {

@Override
public RabbitMQFailureHandler create(RabbitMQConnectorIncomingConfiguration config, RabbitMQConnector connector) {
return new RabbitMQFailStop(connector, config.getChannel());
}
}

/**
* Constructor.
*
Expand All @@ -28,7 +43,7 @@ public RabbitMQFailStop(RabbitMQConnector connector, String channel) {
@Override
public <V> CompletionStage<Void> handle(IncomingRabbitMQMessage<V> msg, Context context, Throwable reason) {
// We mark the message as rejected and fail.
RabbitMQLogging.log.nackedFailMessage(channel);
log.nackedFailMessage(channel);
connector.reportIncomingFailure(channel, reason);
return ConnectionHolder.runOnContextAndReportFailure(context, reason, msg, (m) -> m.rejectMessage(reason));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,50 +1,36 @@
package io.smallrye.reactive.messaging.rabbitmq.fault;

import static io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions.ex;

import java.util.concurrent.CompletionStage;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.vertx.mutiny.core.Context;

/**
* Implemented to provide message failure strategies.
*/
@Experimental("Experimental API")
public interface RabbitMQFailureHandler {

enum Strategy {
/**
* Mark the message as {@code rejected} and die.
*/
FAIL,
/**
* Mark the message as {@code accepted} and continue.
*/
ACCEPT,
/**
* Mark the message as {@code released} and continue.
*/
RELEASE,
/**
* Mark the message as {@code rejected} and continue.
*/
REJECT;
/**
* Identifiers of default failure strategies
*/
interface Strategy {
String FAIL = "fail";
String ACCEPT = "accept";
String RELEASE = "release";
String REJECT = "reject";
}

public static Strategy from(String s) {
if (s == null || s.equalsIgnoreCase("fail")) {
return FAIL;
}
if (s.equalsIgnoreCase("accept")) {
return ACCEPT;
}
if (s.equalsIgnoreCase("release")) {
return RELEASE;
}
if (s.equalsIgnoreCase("reject")) {
return REJECT;
}
throw ex.illegalArgumentUnknownFailureStrategy(s);
}
/**
* Factory interface for {@link RabbitMQFailureHandler}
*/
interface Factory {
RabbitMQFailureHandler create(
RabbitMQConnectorIncomingConfiguration config,
RabbitMQConnector connector);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,28 @@

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.rabbitmq.ConnectionHolder;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorIncomingConfiguration;
import io.vertx.mutiny.core.Context;

public class RabbitMQReject implements RabbitMQFailureHandler {
private final String channel;

@ApplicationScoped
@Identifier(Strategy.REJECT)
public static class Factory implements RabbitMQFailureHandler.Factory {

@Override
public RabbitMQFailureHandler create(RabbitMQConnectorIncomingConfiguration config, RabbitMQConnector connector) {
return new RabbitMQReject(config.getChannel());
}
}

/**
* Constructor.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,50 @@ void testIncomingDeclarationsWithDLQ() throws Exception {
assertThat(dlqBinding1.getString("routing_key")).isEqualTo(dlxRoutingKey);
}

/**
* Verifies that Exchanges, Queues and Bindings are correctly declared as a result of
* incoming connector configuration that specifies Quorum/Delivery limit overrides.
*/
@Test
void testIncomingDeclarationsWithQuorum() throws Exception {

final String queueName = "qIncomingDeclareTestWithDeliveryLimit";
final boolean queueDurable = true;
final String queueType = "quorum";
final long queueDeliveryLimit = 10;

weld.addBeanClass(IncomingBean.class);

new MapBasedConfig()
.put("mp.messaging.incoming.data.queue.name", queueName)
.put("mp.messaging.incoming.data.queue.declare", true)
.put("mp.messaging.incoming.data.queue.durable", queueDurable)
.put("mp.messaging.incoming.data.queue.x-queue-type", queueType)
.put("mp.messaging.incoming.data.queue.x-delivery-limit", queueDeliveryLimit)
.put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME)
.put("mp.messaging.incoming.data.host", host)
.put("mp.messaging.incoming.data.port", port)
.put("mp.messaging.incoming.data.tracing.enabled", false)
.put("rabbitmq-username", username)
.put("rabbitmq-password", password)
.put("rabbitmq-reconnect-attempts", 0)
.write();

container = weld.initialize();
await().until(() -> isRabbitMQConnectorAvailable(container));

// verify queue
final JsonObject queue = usage.getQueue(queueName);
assertThat(queue).isNotNull();
assertThat(queue.getString("name")).isEqualTo(queueName);
assertThat(queue.getBoolean("durable")).isEqualTo(queueDurable);

final JsonObject queueArguments = queue.getJsonObject("arguments");
assertThat(queueArguments).isNotNull();
assertThat(queueArguments.getString("x-queue-type")).isEqualTo(queueType);
assertThat(queueArguments.getLong("x-delivery-limit")).isEqualTo(queueDeliveryLimit);
}

/**
* Verifies that messages can be sent to RabbitMQ.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import io.smallrye.reactive.messaging.providers.metrics.MetricDecorator;
import io.smallrye.reactive.messaging.providers.metrics.MicrometerDecorator;
import io.smallrye.reactive.messaging.providers.wiring.Wiring;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQAccept;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailStop;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQReject;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

public class WeldTestBase extends RabbitMQBrokerTestBase {
Expand Down Expand Up @@ -69,6 +72,9 @@ public void initWeld() {
weld.addBeanClass(MetricDecorator.class);
weld.addBeanClass(MicrometerDecorator.class);
weld.addBeanClass(ContextDecorator.class);
weld.addBeanClass(RabbitMQAccept.Factory.class);
weld.addBeanClass(RabbitMQFailStop.Factory.class);
weld.addBeanClass(RabbitMQReject.Factory.class);
weld.disableDiscovery();
}

Expand Down