Skip to content

Commit

Permalink
Retreive OpenTelemetry instance through CDI injection instead of rely…
Browse files Browse the repository at this point in the history
…ing on GlobalOpenTelemetry.get

Still falls back to "GlobalOpenTelemetry.get"

Fixes #2539

(cherry picked from commit 30493b4)
  • Loading branch information
ozangunalp committed Jul 25, 2024
1 parent 50b8455 commit 2cb3951
Show file tree
Hide file tree
Showing 48 changed files with 438 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
Expand Down Expand Up @@ -121,6 +122,9 @@ public class AmqpConnector implements InboundConnector, OutboundConnector, Healt
@Any
private Instance<SSLContext> clientSslContexts;

@Inject
private Instance<OpenTelemetry> openTelemetryInstance;

private final List<AmqpClient> clients = new CopyOnWriteArrayList<>();

/**
Expand Down Expand Up @@ -230,7 +234,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
AmqpFailureHandler onNack = createFailureHandler(ic);

if (tracing && amqpInstrumenter == null) {
amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForConnector();
amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForConnector(openTelemetryInstance);
}

Multi<? extends Message<?>> multi = holder.getOrEstablishConnection()
Expand Down Expand Up @@ -318,7 +322,8 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
this,
holder,
oc,
getSender);
getSender,
openTelemetryInstance);
processors.put(oc.getChannel(), processor);

return MultiUtils.via(processor, m -> m.onFailure().invoke(t -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.enterprise.inject.Instance;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;
Expand Down Expand Up @@ -57,7 +60,8 @@ public class AmqpCreditBasedSender implements Processor<Message<?>, Message<?>>,
private volatile boolean creditRetrievalInProgress = false;

public AmqpCreditBasedSender(AmqpConnector connector, ConnectionHolder holder,
AmqpConnectorOutgoingConfiguration configuration, Uni<AmqpSender> retrieveSender) {
AmqpConnectorOutgoingConfiguration configuration, Uni<AmqpSender> retrieveSender,
Instance<OpenTelemetry> openTelemetryInstance) {
this.connector = connector;
this.holder = holder;
this.retrieveSender = retrieveSender;
Expand All @@ -75,7 +79,7 @@ public AmqpCreditBasedSender(AmqpConnector connector, ConnectionHolder holder,
this.retryInterval = configuration.getReconnectInterval();

if (tracingEnabled) {
amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForSender();
amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForSender(openTelemetryInstance);
} else {
amqpInstrumenter = null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.smallrye.reactive.messaging.amqp.tracing;

import jakarta.enterprise.inject.Instance;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
Expand All @@ -19,20 +21,20 @@ private AmqpOpenTelemetryInstrumenter(Instrumenter<AmqpMessage<?>, Void> instrum
this.instrumenter = instrumenter;
}

public static AmqpOpenTelemetryInstrumenter createForConnector() {
return create(false);
public static AmqpOpenTelemetryInstrumenter createForConnector(Instance<OpenTelemetry> openTelemetryInstance) {
return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), false);
}

public static AmqpOpenTelemetryInstrumenter createForSender() {
return create(true);
public static AmqpOpenTelemetryInstrumenter createForSender(Instance<OpenTelemetry> openTelemetryInstance) {
return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), true);
}

private static AmqpOpenTelemetryInstrumenter create(boolean sender) {
private static AmqpOpenTelemetryInstrumenter create(OpenTelemetry openTelemetry, boolean sender) {
MessageOperation messageOperation = sender ? MessageOperation.PUBLISH : MessageOperation.RECEIVE;
AmqpAttributesExtractor amqpAttributesExtractor = new AmqpAttributesExtractor();
MessagingAttributesGetter<AmqpMessage<?>, Void> messagingAttributesGetter = amqpAttributesExtractor
.getMessagingAttributesGetter();
InstrumenterBuilder<AmqpMessage<?>, Void> builder = Instrumenter.builder(GlobalOpenTelemetry.get(),
InstrumenterBuilder<AmqpMessage<?>, Void> builder = Instrumenter.builder(openTelemetry,
"io.smallrye.reactive.messaging",
MessagingSpanNameExtractor.create(messagingAttributesGetter, messageOperation));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.Tracer;
import io.smallrye.mutiny.Multi;
Expand Down Expand Up @@ -161,6 +162,9 @@ public SpanBuilder spanBuilder(final String spanName) {
@Any
Instance<KafkaFailureHandler.Factory> failureHandlerFactories;

@Inject
Instance<OpenTelemetry> openTelemetryInstance;

@Inject
KafkaCDIEvents kafkaCDIEvents;

Expand Down Expand Up @@ -209,7 +213,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
});

if (partitions == 1) {
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, group, ic,
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, group, ic, openTelemetryInstance,
commitHandlerFactories, failureHandlerFactories,
consumerRebalanceListeners,
kafkaCDIEvents, deserializationFailureHandlers, -1);
Expand All @@ -231,7 +235,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
// create an instance of source per partitions.
List<Publisher<? extends Message<?>>> streams = new ArrayList<>();
for (int i = 0; i < partitions; i++) {
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, group, ic,
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, group, ic, openTelemetryInstance,
commitHandlerFactories, failureHandlerFactories,
consumerRebalanceListeners,
kafkaCDIEvents, deserializationFailureHandlers, i);
Expand Down Expand Up @@ -268,7 +272,8 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
if (oc.getHealthReadinessTimeout().isPresent()) {
log.deprecatedConfig("health-readiness-timeout", "health-topic-verification-timeout");
}
KafkaSink sink = new KafkaSink(oc, kafkaCDIEvents, serializationFailureHandlers, producerInterceptors);
KafkaSink sink = new KafkaSink(oc, kafkaCDIEvents, openTelemetryInstance,
serializationFailureHandlers, producerInterceptors);
sinks.add(sink);
return sink.getSink();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
Expand Down Expand Up @@ -78,7 +79,9 @@ public class KafkaSink {

private final KafkaOpenTelemetryInstrumenter kafkaInstrumenter;

public KafkaSink(KafkaConnectorOutgoingConfiguration config, KafkaCDIEvents kafkaCDIEvents,
public KafkaSink(KafkaConnectorOutgoingConfiguration config,
KafkaCDIEvents kafkaCDIEvents,
Instance<OpenTelemetry> openTelemetryInstance,
Instance<SerializationFailureHandler<?>> serializationFailureHandlers,
Instance<ProducerInterceptor<?, ?>> producerInterceptors) {
this.isTracingEnabled = config.getTracingEnabled();
Expand Down Expand Up @@ -134,7 +137,7 @@ public KafkaSink(KafkaConnectorOutgoingConfiguration config, KafkaCDIEvents kafk
}));

if (isTracingEnabled) {
kafkaInstrumenter = KafkaOpenTelemetryInstrumenter.createForSink();
kafkaInstrumenter = KafkaOpenTelemetryInstrumenter.createForSink(openTelemetryInstance);
} else {
kafkaInstrumenter = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.header.Header;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class KafkaSource<K, V> {
public KafkaSource(Vertx vertx,
String consumerGroup,
KafkaConnectorIncomingConfiguration config,
Instance<OpenTelemetry> openTelemetryInstance,
Instance<KafkaCommitHandler.Factory> commitHandlerFactories,
Instance<KafkaFailureHandler.Factory> failureHandlerFactories,
Instance<KafkaConsumerRebalanceListener> consumerRebalanceListeners,
Expand Down Expand Up @@ -227,7 +229,7 @@ public KafkaSource(Vertx vertx,
}

if (isTracingEnabled) {
kafkaInstrumenter = KafkaOpenTelemetryInstrumenter.createForSource();
kafkaInstrumenter = KafkaOpenTelemetryInstrumenter.createForSource(openTelemetryInstance);
} else {
kafkaInstrumenter = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Channel;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.EmitterConfiguration;
import io.smallrye.reactive.messaging.EmitterFactory;
Expand Down Expand Up @@ -75,10 +76,13 @@ public class KafkaRequestReplyFactory implements EmitterFactory<KafkaRequestRepl
@Any
Instance<Map<String, Object>> configurations;

@Inject
Instance<OpenTelemetry> openTelemetryInstance;

@Override
public KafkaRequestReplyImpl<Object, Object> createEmitter(EmitterConfiguration configuration, long defaultBufferSize) {
return new KafkaRequestReplyImpl<>(configuration, defaultBufferSize, config.get(), configurations, holder.vertx(),
kafkaCDIEvents, commitStrategyFactories, failureStrategyFactories, failureHandlers,
kafkaCDIEvents, openTelemetryInstance, commitStrategyFactories, failureStrategyFactories, failureHandlers,
correlationIdHandlers, replyFailureHandlers, rebalanceListeners);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
Expand Down Expand Up @@ -80,6 +81,7 @@ public KafkaRequestReplyImpl(EmitterConfiguration config,
Instance<Map<String, Object>> configurations,
Vertx vertx,
KafkaCDIEvents kafkaCDIEvents,
Instance<OpenTelemetry> openTelemetryInstance,
Instance<KafkaCommitHandler.Factory> commitHandlerFactory,
Instance<KafkaFailureHandler.Factory> failureHandlerFactories,
Instance<DeserializationFailureHandler<?>> deserializationFailureHandlers,
Expand Down Expand Up @@ -116,7 +118,7 @@ public KafkaRequestReplyImpl(EmitterConfiguration config,
String consumerGroup = consumerConfig.getGroupId().orElseGet(() -> UUID.randomUUID().toString());
this.waitForPartitions = getWaitForPartitions(consumerConfig);
this.gracefulShutdown = consumerConfig.getGracefulShutdown();
this.replySource = new KafkaSource<>(vertx, consumerGroup, consumerConfig,
this.replySource = new KafkaSource<>(vertx, consumerGroup, consumerConfig, openTelemetryInstance,
commitHandlerFactory, failureHandlerFactories, rebalanceListeners, kafkaCDIEvents,
deserializationFailureHandlers, -1);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.smallrye.reactive.messaging.kafka.tracing;

import jakarta.enterprise.inject.Instance;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
Expand All @@ -23,22 +25,22 @@ private KafkaOpenTelemetryInstrumenter(Instrumenter<KafkaTrace, Void> instrument
this.instrumenter = instrumenter;
}

public static KafkaOpenTelemetryInstrumenter createForSource() {
return create(true);
public static KafkaOpenTelemetryInstrumenter createForSource(Instance<OpenTelemetry> openTelemetryInstance) {
return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), true);
}

public static KafkaOpenTelemetryInstrumenter createForSink() {
return create(false);
public static KafkaOpenTelemetryInstrumenter createForSink(Instance<OpenTelemetry> openTelemetryInstance) {
return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), false);
}

private static KafkaOpenTelemetryInstrumenter create(boolean source) {
private static KafkaOpenTelemetryInstrumenter create(OpenTelemetry openTelemetry, boolean source) {

MessageOperation messageOperation = source ? MessageOperation.RECEIVE : MessageOperation.PUBLISH;

KafkaAttributesExtractor kafkaAttributesExtractor = new KafkaAttributesExtractor();
MessagingAttributesGetter<KafkaTrace, Void> messagingAttributesGetter = kafkaAttributesExtractor
.getMessagingAttributesGetter();
InstrumenterBuilder<KafkaTrace, Void> builder = Instrumenter.builder(GlobalOpenTelemetry.get(),
InstrumenterBuilder<KafkaTrace, Void> builder = Instrumenter.builder(openTelemetry,
"io.smallrye.reactive.messaging",
MessagingSpanNameExtractor.create(messagingAttributesGetter, messageOperation));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public void testSinkUsingInteger() {
.with("channel-name", "testSinkUsingInteger");
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance());

Flow.Subscriber<? extends Message<?>> subscriber = sink.getSink();
Expand All @@ -84,6 +85,7 @@ public void testSinkUsingIntegerAndChannelName() {
.with("partition", 0);
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance());

Flow.Subscriber<? extends Message<?>> subscriber = sink.getSink();
Expand All @@ -106,6 +108,7 @@ public void testSinkUsingString() {
.with("channel-name", "testSinkUsingString");
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance());

Flow.Subscriber<? extends Message<?>> subscriber = sink.getSink();
Expand Down Expand Up @@ -237,7 +240,8 @@ public void testInvalidPayloadType() {
.with("retries", 0L); // disable retry.
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
CountKafkaCdiEvents testCdiEvents = new CountKafkaCdiEvents();
sink = new KafkaSink(oc, testCdiEvents, UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance());
sink = new KafkaSink(oc, testCdiEvents, UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance());

await().until(() -> {
HealthReport.HealthReportBuilder builder = HealthReport.builder();
Expand Down Expand Up @@ -286,8 +290,8 @@ public void testInvalidTypeWithDefaultInflightMessages() {
.with("retries", 0L)
.with("channel-name", "testInvalidTypeWithDefaultInflightMessages");
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance());
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents,
UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance());

Flow.Subscriber subscriber = sink.getSink();
Multi.createFrom().range(0, 5)
Expand Down
Loading

0 comments on commit 2cb3951

Please sign in to comment.