Skip to content

Commit

Permalink
Message Observation API
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Oct 13, 2023
1 parent 9ef0d9a commit 3c634a3
Show file tree
Hide file tree
Showing 10 changed files with 599 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.smallrye.reactive.messaging.observation;

public class DefaultMessageObservation implements MessageObservation {

private final long creation;
private volatile long completion;
private volatile Throwable nackReason;

public DefaultMessageObservation() {
creation = System.nanoTime();
}

@Override
public long getCreationTime() {
return creation;
}

@Override
public long getCompletionTime() {
return completion;
}

@Override
public Throwable getReason() {
return nackReason;
}

@Override
public void onAckOrNack(Throwable reason) {
completion = System.nanoTime();
nackReason = reason;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.smallrye.reactive.messaging.observation;

/**
* The message observation contract
*/
public interface MessageObservation {

/**
* @return the creation time of the message
*/
long getCreationTime();

/**
* @return the completion time of the message
*/
long getCompletionTime();

/**
* @return the negative acknowledgement reason
*/
Throwable getReason();

/**
* Notify the observation of acknowledgement or negative acknowledgement event
*
* @param reason the reason of the negative acknowledgement
*/
void onAckOrNack(Throwable reason);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.smallrye.reactive.messaging.observation;

import org.eclipse.microprofile.reactive.messaging.Message;

/**
* The reporter is called with the new message and returns the message observation that will be used
* to observe messages from their creation until the ack or the nack event
* <p>
* The implementation of this interface must be a CDI managed bean in order to be discovered
*/
public interface MessageObservationReporter {

/**
* Returns a new {@link MessageObservation} object on which to report the message processing events.
*
* @param channel the channel of the message
* @param message the message
* @return the message tracking object
*/
MessageObservation onNewMessage(String channel, Message<?> message);

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.smallrye.reactive.messaging.providers.extension.LegacyEmitterFactoryImpl;
import io.smallrye.reactive.messaging.providers.extension.MediatorManager;
import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterFactoryImpl;
import io.smallrye.reactive.messaging.providers.extension.ObservationDecorator;
import io.smallrye.reactive.messaging.providers.extension.ReactiveMessagingExtension;
import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory;
import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories;
Expand Down Expand Up @@ -102,6 +103,7 @@ public void initWeld() {
weld.addBeanClass(MetricDecorator.class);
weld.addBeanClass(MicrometerDecorator.class);
weld.addBeanClass(ContextDecorator.class);
weld.addBeanClass(ObservationDecorator.class);
weld.disableDiscovery();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package io.smallrye.reactive.messaging.kafka.metrics;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.enterprise.context.ApplicationScoped;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.junit.jupiter.api.Test;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
import io.smallrye.reactive.messaging.observation.DefaultMessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservationReporter;

public class ObservationTest extends KafkaCompanionTestBase {

@Test
void testWithIndividualMessages() {
addBeans(MyReactiveMessagingMessageObservationReporter.class);
addBeans(MyApp.class);

runApplication(kafkaConfig("mp.messaging.incoming.kafka", false)
.with("topic", topic)
.with(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()));

companion.produceStrings()
.fromMulti(Multi.createFrom().range(0, 5).map(i -> new ProducerRecord<>(topic, null, Integer.toString(i))));

MyReactiveMessagingMessageObservationReporter reporter = get(MyReactiveMessagingMessageObservationReporter.class);
await().untilAsserted(() -> {
assertThat(reporter.getObservations()).hasSize(5);
assertThat(reporter.getObservations()).allSatisfy(obs -> {
assertThat(obs.getCreationTime()).isNotEqualTo(-1);
assertThat(obs.getCompletionTime()).isNotEqualTo(-1).isGreaterThan(obs.getCreationTime());
assertThat(obs.getReason()).isNull();
});
});
}

@Test
void testWithBatchesMessages() {
addBeans(MyReactiveMessagingMessageObservationReporter.class);
addBeans(MyAppUsingBatches.class);

runApplication(kafkaConfig("mp.messaging.incoming.kafka", false)
.with("topic", topic)
.with("batch", true)
.with(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()));

companion.produceStrings()
.fromMulti(Multi.createFrom().range(0, 1000).map(i -> new ProducerRecord<>(topic, null, Integer.toString(i))));

MyReactiveMessagingMessageObservationReporter reporter = get(MyReactiveMessagingMessageObservationReporter.class);
MyAppUsingBatches batches = get(MyAppUsingBatches.class);
await().untilAsserted(() -> {
assertThat(batches.count()).isEqualTo(1000);
assertThat(reporter.getObservations()).allSatisfy(obs -> {
assertThat(obs.getCreationTime()).isNotEqualTo(-1);
assertThat(obs.getCreationTime()).isNotEqualTo(-1);
assertThat(obs.getCompletionTime()).isNotEqualTo(-1).isGreaterThan(obs.getCreationTime());
assertThat(obs.getReason()).isNull();
});
});
}

@ApplicationScoped
public static class MyApp {
@Incoming("kafka")
public void consume(String ignored, MyReactiveMessagingMessageObservationReporter.MyMessageObservation metadata) {
assertThat(metadata).isNotNull();
}
}

@ApplicationScoped
public static class MyAppUsingBatches {

AtomicInteger count = new AtomicInteger();

@Incoming("kafka")
public void consume(List<String> s, MyReactiveMessagingMessageObservationReporter.MyMessageObservation metadata) {
assertThat(metadata).isNotNull();
count.addAndGet(s.size());
}

public int count() {
return count.get();
}
}

@ApplicationScoped
public static class MyReactiveMessagingMessageObservationReporter implements MessageObservationReporter {

private final List<MyMessageObservation> observations = new CopyOnWriteArrayList<>();

@Override
public MessageObservation onNewMessage(String channel, Message<?> message) {
MyMessageObservation observation = new MyMessageObservation(channel, message);
observations.add(observation);
return observation;
}

public List<MyMessageObservation> getObservations() {
return observations;
}

public static class MyMessageObservation extends DefaultMessageObservation {

final String channel;
final Message<?> message;

public MyMessageObservation(String channel, Message<?> message) {
super();
this.channel = channel;
this.message = message;
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package io.smallrye.reactive.messaging.providers.extension;

import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Optional;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.PublisherDecorator;
import io.smallrye.reactive.messaging.SubscriberDecorator;
import io.smallrye.reactive.messaging.observation.MessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservationReporter;
import io.smallrye.reactive.messaging.providers.ProcessingException;

@ApplicationScoped
public class ObservationDecorator implements PublisherDecorator, SubscriberDecorator {

@Inject
@ConfigProperty(name = "smallrye.messaging.observation.enabled", defaultValue = "true")
boolean enabled;

@Inject
ChannelRegistry registry;

@Inject
Instance<MessageObservationReporter> trackingReporter;

// Observe emitter subscribers
@Override
public Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> multi, List<String> channelName,
boolean isConnector) {
String channel = channelName.isEmpty() ? null : channelName.get(0);
if (trackingReporter.isResolvable() && enabled && (isConnector || registry.getEmitterNames().contains(channel))) {
multi = multi.map(message -> message.addMetadata(trackingReporter.get().onNewMessage(channel, message))
.thenApply(msg -> msg.withAckWithMetadata(metadata -> msg.ack(metadata)
.thenAccept(Unchecked.consumer(x -> {
getObservationMetadata(metadata).ifPresent(track -> track.onAckOrNack(null));
}))))
.thenApply(msg -> msg.withNackWithMetadata((reason, metadata) -> msg.nack(reason, metadata)
.thenAccept(Unchecked.consumer(x -> {
getObservationMetadata(metadata).ifPresent(track -> track.onAckOrNack(extractReason(reason)));
})))));
return multi;
}
return multi;
}

@Override
public int getPriority() {
return PublisherDecorator.super.getPriority();
}

static Optional<MessageObservation> getObservationMetadata(Metadata metadata) {
for (Object item : metadata) {
if (item instanceof MessageObservation) {
return Optional.of((MessageObservation) item);
}
}
return Optional.empty();
}

static Throwable extractReason(Throwable reason) {
if (reason instanceof ProcessingException) {
Throwable cause = reason.getCause();
if (cause instanceof InvocationTargetException) {
cause = ((InvocationTargetException) cause).getTargetException();
}
return cause;
}
return reason;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.smallrye.reactive.messaging.providers.extension.LegacyEmitterFactoryImpl;
import io.smallrye.reactive.messaging.providers.extension.MediatorManager;
import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterFactoryImpl;
import io.smallrye.reactive.messaging.providers.extension.ObservationDecorator;
import io.smallrye.reactive.messaging.providers.extension.ReactiveMessagingExtension;
import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory;
import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories;
Expand Down Expand Up @@ -121,7 +122,8 @@ public void setUp() {
LegacyEmitterFactoryImpl.class,
OutgoingInterceptorDecorator.class,
IncomingInterceptorDecorator.class,

// Observation Decorator
ObservationDecorator.class,
// SmallRye config
io.smallrye.config.inject.ConfigProducer.class);

Expand Down
Loading

0 comments on commit 3c634a3

Please sign in to comment.