Skip to content

Commit

Permalink
Message Observation API
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Nov 30, 2023
1 parent ec40c8d commit 163cdf8
Show file tree
Hide file tree
Showing 17 changed files with 1,139 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface PublisherDecorator extends Prioritized {
* @return the extended multi
* @deprecated replaced with {@link #decorate(Multi, List, boolean)}
*/
@Deprecated(since = "4.10.1")
@Deprecated(since = "4.12.0")
default Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> publisher, String channelName,
boolean isConnector) {
return publisher;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package io.smallrye.reactive.messaging.observation;

import java.time.Duration;

import org.eclipse.microprofile.reactive.messaging.Message;

public class DefaultMessageObservation implements MessageObservation {

// metadata
private final String channelName;

// time
private final long creation;
protected volatile long completion;

// status
protected volatile boolean done;
protected volatile Throwable nackReason;

public DefaultMessageObservation(String channelName) {
this(channelName, System.nanoTime());
}

public DefaultMessageObservation(String channelName, long creationTime) {
this.channelName = channelName;
this.creation = creationTime;
}

@Override
public String getChannel() {
return channelName;
}

@Override
public long getCreationTime() {
return creation;
}

@Override
public long getCompletionTime() {
return completion;
}

@Override
public boolean isDone() {
return done || nackReason != null;
}

@Override
public Throwable getReason() {
return nackReason;
}

@Override
public Duration getCompletionDuration() {
if (isDone()) {
return Duration.ofNanos(completion - creation);
}
return null;
}

@Override
public void onMessageAck(Message<?> message) {
completion = System.nanoTime();
done = true;
}

@Override
public void onMessageNack(Message<?> message, Throwable reason) {
completion = System.nanoTime();
nackReason = reason;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.smallrye.reactive.messaging.observation;

import java.time.Duration;

import org.eclipse.microprofile.reactive.messaging.Message;

/**
* The message observation contract
*/
public interface MessageObservation {

/**
* @return the channel name of the message
*/
String getChannel();

/**
* @return the creation time of the message in system nanos
*/
long getCreationTime();

/**
* @return the completion time of the message in system nanos
*/
long getCompletionTime();

/**
*
* @return the duration between creation and the completion time, null if message processing is not completed
*/
Duration getCompletionDuration();

/**
*
* @return {@code true} if the message processing is completed with acknowledgement or negative acknowledgement
*/
boolean isDone();

/**
* @return the negative acknowledgement reason
*/
Throwable getReason();

/**
* Notify the observation of acknowledgement event
*
*/
void onMessageAck(Message<?> message);

/**
* Notify the observation of negative acknowledgement event
*
* @param reason the reason of the negative acknowledgement
*/
void onMessageNack(Message<?> message, Throwable reason);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.smallrye.reactive.messaging.observation;

import org.eclipse.microprofile.reactive.messaging.Message;

/**
* The observation collector is called with the new message and returns the message observation that will be used
* to observe messages from their creation until the ack or the nack event
* <p>
*
* <p>
* The implementation of this interface must be a CDI managed bean in order to be discovered
*
* @param <T> the type of the observation context
*/
public interface MessageObservationCollector<T extends ObservationContext> {

/**
* Initialize observation for the given channel
* If {@code null} is returned the observation for the given channel is disabled
*
* @param channel the channel of the message
* @param incoming whether the channel is incoming or outgoing
* @param emitter whether the channel is an emitter
* @return the observation context
*/
default T initObservation(String channel, boolean incoming, boolean emitter) {
// enabled by default
return (T) ObservationContext.DEFAULT;
}

/**
* Returns a new {@link MessageObservation} object on which to collect the message processing events.
* If {@link #initObservation(String, boolean, boolean)} is implemented,
* the {@link ObservationContext} object returned from that method will be passed to this method.
* If not it is called with {@link ObservationContext#DEFAULT} and should be ignored.
*
* @param channel the channel of the message
* @param message the message
* @param observationContext the observation context
* @return the message observation
*/
MessageObservation onNewMessage(String channel, Message<?> message, T observationContext);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.smallrye.reactive.messaging.observation;

/**
*
*/
public interface ObservationContext {

ObservationContext DEFAULT = observation -> {

};

void complete(MessageObservation observation);
}
1 change: 1 addition & 0 deletions documentation/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ nav:
- '@Outgoings' : concepts/outgoings.md
- 'Testing' : concepts/testing.md
- 'Logging' : concepts/logging.md
- 'Observability API' : concepts/observability.md
- 'Advanced Configuration' : concepts/advanced-config.md
- 'Message Context' : concepts/message-context.md
- 'Metadata Injection': concepts/incoming-metadata-injection.md
Expand Down
30 changes: 30 additions & 0 deletions documentation/src/main/docs/concepts/observability.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Observability API

!!!important
Observability API is experimental and SmallRye only feature.

Smallrye Reactive Messaging proposes an observability API that allows to observe messages received and send through inbound and outbound channels.

For any observation to happen, you need to provide an implementation of the `MessageObservationCollector`, discovered as a CDI-managed bean.

At wiring time the discovered `MessageObservationCollector` implementation `initObservation` method is called once per channel to initialize the `ObservationContext`.
The default `initObservation` implementation returns a default `ObservationContext` object,
but the collector implementation can provide a custom per-channel `ObservationContext` object that'll hold information necessary for the observation.
The `ObservationContext#complete` method is called each time a message observation is completed – message being acked or nacked.
The collector implementation can decide at initialization time to disable the observation per channel by returning a `null` observation context.

For each new message, the collector is on `onNewMessage` method with the channel name, the `Message` and the `ObservationContext` object initialized beforehand.
This method can react to the creation of a new message but also is responsible for instantiating and returning a `MessageObservation`.
While custom implementations can augment the observability capability, SmallRye Reactive Messaging provides a default implementation `DefaultMessageObservation`.

So a simple observability collector can be implemented as such:

``` java
{{ insert('observability/SimpleMessageObservationCollector.java', ) }}
```

A collector with a custom `ObservationContext` can be implemented as such :

``` java
{{ insert('observability/ContextMessageObservationCollector.java', ) }}
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package observability;

import java.time.Duration;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.observation.DefaultMessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservationCollector;
import io.smallrye.reactive.messaging.observation.ObservationContext;

@ApplicationScoped
public class ContextMessageObservationCollector
implements MessageObservationCollector<ContextMessageObservationCollector.MyContext> {

@Override
public MyContext initObservation(String channel, boolean incoming, boolean emitter) {
// Called on observation setup, per channel
// if returned null the observation for that channel is disabled
return new MyContext(channel, incoming, emitter);
}

@Override
public MessageObservation onNewMessage(String channel, Message<?> message, MyContext ctx) {
// Called after message has been created
return new DefaultMessageObservation(channel);
}

public static class MyContext implements ObservationContext {

private final String channel;
private final boolean incoming;
private final boolean emitter;

public MyContext(String channel, boolean incoming, boolean emitter) {
this.channel = channel;
this.incoming = incoming;
this.emitter = emitter;
}

@Override
public void complete(MessageObservation observation) {
// called after message processing has completed and observation is done
// register duration
Duration duration = observation.getCompletionDuration();
Throwable reason = observation.getReason();
if (reason != null) {
// message was nacked
} else {
// message was acked successfully
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package observability;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.observation.DefaultMessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservationCollector;
import io.smallrye.reactive.messaging.observation.ObservationContext;

@ApplicationScoped
public class SimpleMessageObservationCollector implements MessageObservationCollector<ObservationContext> {

@Override
public MessageObservation onNewMessage(String channel, Message<?> message, ObservationContext ctx) {
// Called after message has been created
return new DefaultMessageObservation(channel);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import io.smallrye.reactive.messaging.providers.extension.LegacyEmitterFactoryImpl;
import io.smallrye.reactive.messaging.providers.extension.MediatorManager;
import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterFactoryImpl;
import io.smallrye.reactive.messaging.providers.extension.ObservationDecorator;
import io.smallrye.reactive.messaging.providers.extension.OutgoingObservationDecorator;
import io.smallrye.reactive.messaging.providers.extension.ReactiveMessagingExtension;
import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory;
import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories;
Expand Down Expand Up @@ -102,6 +104,8 @@ public void initWeld() {
weld.addBeanClass(MetricDecorator.class);
weld.addBeanClass(MicrometerDecorator.class);
weld.addBeanClass(ContextDecorator.class);
weld.addBeanClass(ObservationDecorator.class);
weld.addBeanClass(OutgoingObservationDecorator.class);
weld.disableDiscovery();
}

Expand Down
Loading

0 comments on commit 163cdf8

Please sign in to comment.