diff --git a/api/src/main/java/io/smallrye/reactive/messaging/observation/DefaultMessageObservation.java b/api/src/main/java/io/smallrye/reactive/messaging/observation/DefaultMessageObservation.java
new file mode 100644
index 0000000000..13176fa432
--- /dev/null
+++ b/api/src/main/java/io/smallrye/reactive/messaging/observation/DefaultMessageObservation.java
@@ -0,0 +1,34 @@
+package io.smallrye.reactive.messaging.observation;
+
+public class DefaultMessageObservation implements MessageObservation {
+
+ private final long creation;
+ private volatile long completion;
+ private volatile Throwable nackReason;
+
+ public DefaultMessageObservation() {
+ creation = System.nanoTime();
+ }
+
+ @Override
+ public long getCreationTime() {
+ return creation;
+ }
+
+ @Override
+ public long getCompletionTime() {
+ return completion;
+ }
+
+ @Override
+ public Throwable getReason() {
+ return nackReason;
+ }
+
+ @Override
+ public void onAckOrNack(Throwable reason) {
+ completion = System.nanoTime();
+ nackReason = reason;
+ }
+
+}
diff --git a/api/src/main/java/io/smallrye/reactive/messaging/observation/MessageObservation.java b/api/src/main/java/io/smallrye/reactive/messaging/observation/MessageObservation.java
new file mode 100644
index 0000000000..97d34e2e6f
--- /dev/null
+++ b/api/src/main/java/io/smallrye/reactive/messaging/observation/MessageObservation.java
@@ -0,0 +1,29 @@
+package io.smallrye.reactive.messaging.observation;
+
+/**
+ * The message observation contract
+ */
+public interface MessageObservation {
+
+ /**
+ * @return the creation time of the message
+ */
+ long getCreationTime();
+
+ /**
+ * @return the completion time of the message
+ */
+ long getCompletionTime();
+
+ /**
+ * @return the negative acknowledgement reason
+ */
+ Throwable getReason();
+
+ /**
+ * Notify the observation of acknowledgement or negative acknowledgement event
+ *
+ * @param reason the reason of the negative acknowledgement
+ */
+ void onAckOrNack(Throwable reason);
+}
diff --git a/api/src/main/java/io/smallrye/reactive/messaging/observation/MessageObservationReporter.java b/api/src/main/java/io/smallrye/reactive/messaging/observation/MessageObservationReporter.java
new file mode 100644
index 0000000000..42cb6e269f
--- /dev/null
+++ b/api/src/main/java/io/smallrye/reactive/messaging/observation/MessageObservationReporter.java
@@ -0,0 +1,22 @@
+package io.smallrye.reactive.messaging.observation;
+
+import org.eclipse.microprofile.reactive.messaging.Message;
+
+/**
+ * The reporter is called with the new message and returns the message observation that will be used
+ * to observe messages from their creation until the ack or the nack event
+ *
+ * The implementation of this interface must be a CDI managed bean in order to be discovered
+ */
+public interface MessageObservationReporter {
+
+ /**
+ * Returns a new {@link MessageObservation} object on which to report the message processing events.
+ *
+ * @param channel the channel of the message
+ * @param message the message
+ * @return the message tracking object
+ */
+ MessageObservation onNewMessage(String channel, Message> message);
+
+}
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java
index 0886d8abef..3d9f0a9b43 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java
@@ -36,6 +36,7 @@
import io.smallrye.reactive.messaging.providers.extension.LegacyEmitterFactoryImpl;
import io.smallrye.reactive.messaging.providers.extension.MediatorManager;
import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterFactoryImpl;
+import io.smallrye.reactive.messaging.providers.extension.ObservationDecorator;
import io.smallrye.reactive.messaging.providers.extension.ReactiveMessagingExtension;
import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory;
import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories;
@@ -102,6 +103,7 @@ public void initWeld() {
weld.addBeanClass(MetricDecorator.class);
weld.addBeanClass(MicrometerDecorator.class);
weld.addBeanClass(ContextDecorator.class);
+ weld.addBeanClass(ObservationDecorator.class);
weld.disableDiscovery();
}
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/metrics/ObservationTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/metrics/ObservationTest.java
new file mode 100644
index 0000000000..c538eff86e
--- /dev/null
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/metrics/ObservationTest.java
@@ -0,0 +1,133 @@
+package io.smallrye.reactive.messaging.kafka.metrics;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Message;
+import org.junit.jupiter.api.Test;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
+import io.smallrye.reactive.messaging.observation.DefaultMessageObservation;
+import io.smallrye.reactive.messaging.observation.MessageObservation;
+import io.smallrye.reactive.messaging.observation.MessageObservationReporter;
+
+public class ObservationTest extends KafkaCompanionTestBase {
+
+ @Test
+ void testWithIndividualMessages() {
+ addBeans(MyReactiveMessagingMessageObservationReporter.class);
+ addBeans(MyApp.class);
+
+ runApplication(kafkaConfig("mp.messaging.incoming.kafka", false)
+ .with("topic", topic)
+ .with(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
+ .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()));
+
+ companion.produceStrings()
+ .fromMulti(Multi.createFrom().range(0, 5).map(i -> new ProducerRecord<>(topic, null, Integer.toString(i))));
+
+ MyReactiveMessagingMessageObservationReporter reporter = get(MyReactiveMessagingMessageObservationReporter.class);
+ await().untilAsserted(() -> {
+ assertThat(reporter.getObservations()).hasSize(5);
+ assertThat(reporter.getObservations()).allSatisfy(obs -> {
+ assertThat(obs.getCreationTime()).isNotEqualTo(-1);
+ assertThat(obs.getCompletionTime()).isNotEqualTo(-1).isGreaterThan(obs.getCreationTime());
+ assertThat(obs.getReason()).isNull();
+ });
+ });
+ }
+
+ @Test
+ void testWithBatchesMessages() {
+ addBeans(MyReactiveMessagingMessageObservationReporter.class);
+ addBeans(MyAppUsingBatches.class);
+
+ runApplication(kafkaConfig("mp.messaging.incoming.kafka", false)
+ .with("topic", topic)
+ .with("batch", true)
+ .with(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
+ .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()));
+
+ companion.produceStrings()
+ .fromMulti(Multi.createFrom().range(0, 1000).map(i -> new ProducerRecord<>(topic, null, Integer.toString(i))));
+
+ MyReactiveMessagingMessageObservationReporter reporter = get(MyReactiveMessagingMessageObservationReporter.class);
+ MyAppUsingBatches batches = get(MyAppUsingBatches.class);
+ await().untilAsserted(() -> {
+ assertThat(batches.count()).isEqualTo(1000);
+ assertThat(reporter.getObservations()).allSatisfy(obs -> {
+ assertThat(obs.getCreationTime()).isNotEqualTo(-1);
+ assertThat(obs.getCreationTime()).isNotEqualTo(-1);
+ assertThat(obs.getCompletionTime()).isNotEqualTo(-1).isGreaterThan(obs.getCreationTime());
+ assertThat(obs.getReason()).isNull();
+ });
+ });
+ }
+
+ @ApplicationScoped
+ public static class MyApp {
+ @Incoming("kafka")
+ public void consume(String ignored, MyReactiveMessagingMessageObservationReporter.MyMessageObservation metadata) {
+ assertThat(metadata).isNotNull();
+ }
+ }
+
+ @ApplicationScoped
+ public static class MyAppUsingBatches {
+
+ AtomicInteger count = new AtomicInteger();
+
+ @Incoming("kafka")
+ public void consume(List s, MyReactiveMessagingMessageObservationReporter.MyMessageObservation metadata) {
+ assertThat(metadata).isNotNull();
+ count.addAndGet(s.size());
+ }
+
+ public int count() {
+ return count.get();
+ }
+ }
+
+ @ApplicationScoped
+ public static class MyReactiveMessagingMessageObservationReporter implements MessageObservationReporter {
+
+ private final List observations = new CopyOnWriteArrayList<>();
+
+ @Override
+ public MessageObservation onNewMessage(String channel, Message> message) {
+ MyMessageObservation observation = new MyMessageObservation(channel, message);
+ observations.add(observation);
+ return observation;
+ }
+
+ public List getObservations() {
+ return observations;
+ }
+
+ public static class MyMessageObservation extends DefaultMessageObservation {
+
+ final String channel;
+ final Message> message;
+
+ public MyMessageObservation(String channel, Message> message) {
+ super();
+ this.channel = channel;
+ this.message = message;
+ }
+ }
+ }
+
+}
diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/ObservationDecorator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/ObservationDecorator.java
new file mode 100644
index 0000000000..fe1921cf7d
--- /dev/null
+++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/ObservationDecorator.java
@@ -0,0 +1,82 @@
+package io.smallrye.reactive.messaging.providers.extension;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.Optional;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Instance;
+import jakarta.inject.Inject;
+
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.eclipse.microprofile.reactive.messaging.Message;
+import org.eclipse.microprofile.reactive.messaging.Metadata;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.unchecked.Unchecked;
+import io.smallrye.reactive.messaging.ChannelRegistry;
+import io.smallrye.reactive.messaging.PublisherDecorator;
+import io.smallrye.reactive.messaging.SubscriberDecorator;
+import io.smallrye.reactive.messaging.observation.MessageObservation;
+import io.smallrye.reactive.messaging.observation.MessageObservationReporter;
+import io.smallrye.reactive.messaging.providers.ProcessingException;
+
+@ApplicationScoped
+public class ObservationDecorator implements PublisherDecorator, SubscriberDecorator {
+
+ @Inject
+ @ConfigProperty(name = "smallrye.messaging.observation.enabled", defaultValue = "true")
+ boolean enabled;
+
+ @Inject
+ ChannelRegistry registry;
+
+ @Inject
+ Instance trackingReporter;
+
+ // Observe emitter subscribers
+ @Override
+ public Multi extends Message>> decorate(Multi extends Message>> multi, List channelName,
+ boolean isConnector) {
+ String channel = channelName.isEmpty() ? null : channelName.get(0);
+ if (trackingReporter.isResolvable() && enabled && (isConnector || registry.getEmitterNames().contains(channel))) {
+ multi = multi.map(message -> message.addMetadata(trackingReporter.get().onNewMessage(channel, message))
+ .thenApply(msg -> msg.withAckWithMetadata(metadata -> msg.ack(metadata)
+ .thenAccept(Unchecked.consumer(x -> {
+ getObservationMetadata(metadata).ifPresent(track -> track.onAckOrNack(null));
+ }))))
+ .thenApply(msg -> msg.withNackWithMetadata((reason, metadata) -> msg.nack(reason, metadata)
+ .thenAccept(Unchecked.consumer(x -> {
+ getObservationMetadata(metadata).ifPresent(track -> track.onAckOrNack(extractReason(reason)));
+ })))));
+ return multi;
+ }
+ return multi;
+ }
+
+ @Override
+ public int getPriority() {
+ return PublisherDecorator.super.getPriority();
+ }
+
+ static Optional getObservationMetadata(Metadata metadata) {
+ for (Object item : metadata) {
+ if (item instanceof MessageObservation) {
+ return Optional.of((MessageObservation) item);
+ }
+ }
+ return Optional.empty();
+ }
+
+ static Throwable extractReason(Throwable reason) {
+ if (reason instanceof ProcessingException) {
+ Throwable cause = reason.getCause();
+ if (cause instanceof InvocationTargetException) {
+ cause = ((InvocationTargetException) cause).getTargetException();
+ }
+ return cause;
+ }
+ return reason;
+ }
+
+}
diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java
index dcd1257547..79ff9c1dd2 100644
--- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java
+++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java
@@ -32,6 +32,7 @@
import io.smallrye.reactive.messaging.providers.extension.LegacyEmitterFactoryImpl;
import io.smallrye.reactive.messaging.providers.extension.MediatorManager;
import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterFactoryImpl;
+import io.smallrye.reactive.messaging.providers.extension.ObservationDecorator;
import io.smallrye.reactive.messaging.providers.extension.ReactiveMessagingExtension;
import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory;
import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories;
@@ -121,7 +122,8 @@ public void setUp() {
LegacyEmitterFactoryImpl.class,
OutgoingInterceptorDecorator.class,
IncomingInterceptorDecorator.class,
-
+ // Observation Decorator
+ ObservationDecorator.class,
// SmallRye config
io.smallrye.config.inject.ConfigProducer.class);
diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/inject/EmitterObservationTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/inject/EmitterObservationTest.java
new file mode 100644
index 0000000000..f7fc641647
--- /dev/null
+++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/inject/EmitterObservationTest.java
@@ -0,0 +1,140 @@
+package io.smallrye.reactive.messaging.inject;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.eclipse.microprofile.reactive.messaging.Emitter;
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Message;
+import org.junit.jupiter.api.Test;
+
+import io.smallrye.reactive.messaging.WeldTestBaseWithoutTails;
+import io.smallrye.reactive.messaging.observation.DefaultMessageObservation;
+import io.smallrye.reactive.messaging.observation.MessageObservation;
+import io.smallrye.reactive.messaging.observation.MessageObservationReporter;
+
+public class EmitterObservationTest extends WeldTestBaseWithoutTails {
+
+ @Test
+ void testObservationPointsWhenEmittingPayloads() {
+ addBeanClass(MyMessageObservationReporter.class);
+ addBeanClass(MyComponentWithAnEmitterOfPayload.class);
+
+ initialize();
+
+ MyMessageObservationReporter observation = container.select(MyMessageObservationReporter.class).get();
+ MyComponentWithAnEmitterOfPayload component = get(MyComponentWithAnEmitterOfPayload.class);
+
+ component.emit(1);
+ component.emit(2);
+ component.emit(3);
+
+ await().until(() -> observation.getObservations().size() == 3);
+ assertThat(observation.getObservations()).allSatisfy(obs -> {
+ assertThat(obs.getCreationTime()).isNotEqualTo(-1);
+ assertThat(obs.getCompletionTime()).isNotEqualTo(-1);
+ });
+
+ }
+
+ @Test
+ void testObservationPointsWhenEmittingMessages() {
+ addBeanClass(MyComponentWithAnEmitterOfMessage.class);
+ addBeanClass(MyMessageObservationReporter.class);
+
+ initialize();
+
+ MyMessageObservationReporter observation = container.select(MyMessageObservationReporter.class).get();
+ MyComponentWithAnEmitterOfMessage component = get(MyComponentWithAnEmitterOfMessage.class);
+
+ component.emit(Message.of(1));
+ component.emit(Message.of(2));
+ component.emit(Message.of(3));
+
+ await().until(() -> observation.getObservations().size() == 3);
+ assertThat(observation.getObservations()).allSatisfy(obs -> {
+ assertThat(obs.getCreationTime()).isNotEqualTo(-1);
+ assertThat(obs.getCreationTime()).isNotEqualTo(-1);
+ assertThat(obs.getCompletionTime()).isNotEqualTo(-1);
+ });
+
+ assertThat(observation.getObservations().get(2).getReason()).isInstanceOf(IllegalArgumentException.class);
+
+ }
+
+ @ApplicationScoped
+ public static class MyComponentWithAnEmitterOfPayload {
+
+ @Inject
+ @Channel("output")
+ Emitter emitter;
+
+ public void emit(int i) {
+ emitter.send(i);
+ }
+
+ @Incoming("output")
+ public void consume(int i) {
+ // do nothing.
+ }
+
+ }
+
+ @ApplicationScoped
+ public static class MyComponentWithAnEmitterOfMessage {
+
+ @Inject
+ @Channel("output")
+ Emitter emitter;
+
+ public void emit(Message i) {
+ emitter.send(i);
+ }
+
+ @Incoming("output")
+ public void consume(int i, MyMessageObservationReporter.MyMessageObservation mo) {
+ assertThat(mo).isNotNull();
+ if (i == 3) {
+ throw new IllegalArgumentException("boom");
+ }
+ }
+
+ }
+
+ @ApplicationScoped
+ public static class MyMessageObservationReporter implements MessageObservationReporter {
+
+ private final List observations = new CopyOnWriteArrayList<>();
+
+ @Override
+ public MessageObservation onNewMessage(String channel, Message> message) {
+ MyMessageObservation observation = new MyMessageObservation(channel, message);
+ observations.add(observation);
+ return observation;
+ }
+
+ public List getObservations() {
+ return observations;
+ }
+
+ public static class MyMessageObservation extends DefaultMessageObservation {
+
+ final String channel;
+ final Message> message;
+
+ public MyMessageObservation(String channel, Message> message) {
+ super();
+ this.channel = channel;
+ this.message = message;
+ }
+
+ }
+ }
+}
diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/connectors/ObservationTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/connectors/ObservationTest.java
new file mode 100644
index 0000000000..de87ff470b
--- /dev/null
+++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/connectors/ObservationTest.java
@@ -0,0 +1,151 @@
+package io.smallrye.reactive.messaging.providers.connectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Message;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.smallrye.reactive.messaging.WeldTestBaseWithoutTails;
+import io.smallrye.reactive.messaging.observation.DefaultMessageObservation;
+import io.smallrye.reactive.messaging.observation.MessageObservation;
+import io.smallrye.reactive.messaging.observation.MessageObservationReporter;
+
+public class ObservationTest extends WeldTestBaseWithoutTails {
+
+ @BeforeEach
+ void setupConfig() {
+ installConfig("src/test/resources/config/observation.properties");
+ }
+
+ @Test
+ void testMessageObservationPointsFromPayloadConsumer() {
+ addBeanClass(MyMessageObservationReporter.class);
+ addBeanClass(MyPayloadConsumer.class);
+
+ initialize();
+
+ MyMessageObservationReporter observation = container.select(MyMessageObservationReporter.class).get();
+ MyPayloadConsumer consumer = container.select(MyPayloadConsumer.class).get();
+
+ await().until(() -> observation.getObservations().size() == 3);
+ await().until(() -> consumer.received().size() == 3);
+
+ assertThat(observation.getObservations()).allSatisfy(obs -> {
+ assertThat(obs.getCreationTime()).isNotEqualTo(-1);
+ assertThat(obs.getCompletionTime()).isNotEqualTo(-1).isGreaterThan(obs.getCreationTime());
+ });
+
+ assertThat(observation.getObservations().get(0).getReason()).isNull();
+ assertThat(observation.getObservations().get(1).getReason()).isInstanceOf(IOException.class);
+ assertThat(observation.getObservations().get(2).getReason()).isInstanceOf(MalformedURLException.class);
+ }
+
+ @Test
+ void testMessageObservationPointsFromMessageConsumer() {
+ addBeanClass(MyMessageObservationReporter.class);
+ addBeanClass(MyMessageConsumer.class);
+
+ initialize();
+
+ MyMessageObservationReporter observation = container.select(MyMessageObservationReporter.class).get();
+ MyMessageConsumer consumer = container.select(MyMessageConsumer.class).get();
+
+ await().until(() -> observation.getObservations().size() == 3);
+ await().until(() -> consumer.received().size() == 3);
+
+ assertThat(observation.getObservations()).allSatisfy(obs -> {
+ assertThat(obs.getCreationTime()).isNotEqualTo(-1);
+ assertThat(obs.getCompletionTime()).isNotEqualTo(-1).isGreaterThan(obs.getCreationTime());
+ });
+
+ assertThat(observation.getObservations().get(0).getReason()).isNull();
+ assertThat(observation.getObservations().get(1).getReason()).isInstanceOf(IOException.class);
+ assertThat(observation.getObservations().get(2).getReason()).isInstanceOf(MalformedURLException.class);
+ }
+
+ @ApplicationScoped
+ public static class MyPayloadConsumer {
+ private final List received = new CopyOnWriteArrayList<>();
+
+ @Incoming("A")
+ void consume(int payload, MyMessageObservationReporter.MyMessageObservation tracking) throws IOException {
+ received.add(payload);
+ assertThat(tracking).isNotNull();
+ if (payload == 3) {
+ throw new IOException();
+ }
+ if (payload == 4) {
+ throw new MalformedURLException();
+ }
+ }
+
+ public List received() {
+ return received;
+ }
+ }
+
+ @ApplicationScoped
+ public static class MyMessageConsumer {
+
+ private final List received = new CopyOnWriteArrayList<>();
+
+ @Incoming("A")
+ CompletionStage consume(Message msg) {
+ int payload = msg.getPayload();
+ received.add(payload);
+ assertThat(msg.getMetadata(MessageObservation.class)).isNotEmpty();
+ if (payload == 3) {
+ return msg.nack(new IOException());
+ }
+ if (payload == 4) {
+ return msg.nack(new MalformedURLException());
+ }
+ return msg.ack();
+ }
+
+ public List received() {
+ return received;
+ }
+ }
+
+ @ApplicationScoped
+ public static class MyMessageObservationReporter implements MessageObservationReporter {
+
+ private final List observations = new CopyOnWriteArrayList<>();
+
+ @Override
+ public MessageObservation onNewMessage(String channel, Message> message) {
+ MyMessageObservation observation = new MyMessageObservation(channel, message);
+ observations.add(observation);
+ return observation;
+ }
+
+ public List getObservations() {
+ return observations;
+ }
+
+ public static class MyMessageObservation extends DefaultMessageObservation {
+
+ final String channel;
+ final Message> message;
+
+ public MyMessageObservation(String channel, Message> message) {
+ super();
+ this.channel = channel;
+ this.message = message;
+ }
+
+ }
+ }
+}
diff --git a/smallrye-reactive-messaging-provider/src/test/resources/config/observation.properties b/smallrye-reactive-messaging-provider/src/test/resources/config/observation.properties
new file mode 100644
index 0000000000..5e6090b4dd
--- /dev/null
+++ b/smallrye-reactive-messaging-provider/src/test/resources/config/observation.properties
@@ -0,0 +1,3 @@
+# You should not be able to use the same channel name in an outgoing and incoming configuration
+mp.messaging.incoming.A.connector=dummy
+