From d3d238ecfd5fb73a160424bd3af9e09f11cbdc47 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Wed, 5 Jul 2023 20:53:53 +0200 Subject: [PATCH] Pulsar continue failure strategy Pulsar KeyValueExtractors Fix for Pulsar OutgoingMessage.of factory method which did not set the key correctly --- .../docs/pulsar/receiving-pulsar-messages.md | 3 +- .../configuration/PulsarSchemaProvider.java | 4 - .../messaging/pulsar/OutgoingMessage.java | 2 +- .../KeyValueFromPulsarMessageExtractor.java | 43 +++++ ...lueFromPulsarMessageKeyValueExtractor.java | 42 +++++ .../pulsar/fault/PulsarContinue.java | 48 ++++++ .../pulsar/fault/PulsarFailStop.java | 9 +- .../messaging/pulsar/fault/PulsarIgnore.java | 3 + .../messaging/pulsar/fault/PulsarNack.java | 3 + .../pulsar/fault/PulsarReconsumeLater.java | 3 + .../messaging/pulsar/MessageMetadataTest.java | 17 ++ .../messaging/pulsar/PulsarProducerTest.java | 55 +++++- .../pulsar/base/PulsarClientBaseTest.java | 8 + .../messaging/pulsar/base/WeldTestBase.java | 4 + ...eyValueFromPulsarMessageExtractorTest.java | 151 +++++++++++++++++ ...MessageFromPulsarMessageExtractorTest.java | 156 ++++++++++++++++++ .../pulsar/fault/PulsarNackTest.java | 24 +++ 17 files changed, 562 insertions(+), 13 deletions(-) create mode 100644 smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/converters/KeyValueFromPulsarMessageExtractor.java create mode 100644 smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/converters/KeyValueFromPulsarMessageKeyValueExtractor.java create mode 100644 smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarContinue.java create mode 100644 smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/converters/KeyValueFromPulsarMessageExtractorTest.java create mode 100644 smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/converters/KeyValueMessageFromPulsarMessageExtractorTest.java diff --git a/documentation/src/main/docs/pulsar/receiving-pulsar-messages.md b/documentation/src/main/docs/pulsar/receiving-pulsar-messages.md index fc29aca759..3c45cefb30 100644 --- a/documentation/src/main/docs/pulsar/receiving-pulsar-messages.md +++ b/documentation/src/main/docs/pulsar/receiving-pulsar-messages.md @@ -85,6 +85,7 @@ The Pulsar connector supports 4 strategies: The negative acknowledgment can be further configured using `negativeAckRedeliveryDelayMicros` and `negativeAck.redeliveryBackoff` properties. - `fail` fail the application, no more messages will be processed. - `ignore` the failure is logged, but the acknowledgement strategy will be applied and the processing will continue. +- `continue` the failure is logged, but processing continues without applying acknowledgement or negative acknowledgement. This strategy can be used with [acknowledgement timeout](#acknowledgement-timeout) configuration. - `reconsume-later` sends the message to the [retry letter topic](https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#retry-letter-topic) using the `reconsumeLater` API to be reconsumed with a delay. The delay can be configured using the `reconsumeLater.delay` property and defaults to 3 seconds. Custom delay or properties per message can be configured by adding an instance of {{ javadoc('io.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata') }} to the failure metadata. @@ -113,7 +114,7 @@ The `ackTimeout.redeliveryBackoff` value accepts comma separated values of min d ```properties mp.messaging.incoming.data.connector=smallrye-pulsar -mp.messaging.incoming.data.failure-strategy=ignore +mp.messaging.incoming.data.failure-strategy=continue mp.messaging.incoming.data.ackTimeoutMillis=10000 mp.messaging.incoming.data.ackTimeout.redeliveryBackoff=1000,60000,2 ``` diff --git a/documentation/src/main/java/pulsar/configuration/PulsarSchemaProvider.java b/documentation/src/main/java/pulsar/configuration/PulsarSchemaProvider.java index d9b08f3032..3fe6141d0a 100644 --- a/documentation/src/main/java/pulsar/configuration/PulsarSchemaProvider.java +++ b/documentation/src/main/java/pulsar/configuration/PulsarSchemaProvider.java @@ -1,7 +1,5 @@ package pulsar.configuration; -import java.util.regex.Pattern; - import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.inject.Produces; @@ -14,8 +12,6 @@ @ApplicationScoped public class PulsarSchemaProvider { - Pattern linkPattern = Pattern.compile("\\{@link\\s(.*)}"); - @Produces @Identifier("user-schema") Schema userSchema = Schema.AVRO(User.class); diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/OutgoingMessage.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/OutgoingMessage.java index 3d076daef0..4a1930732a 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/OutgoingMessage.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/OutgoingMessage.java @@ -49,7 +49,7 @@ public OutgoingMessage(T value) { public OutgoingMessage(String key, T value) { this(value); - this.key = key; + withKey(key); } public boolean hasKey() { diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/converters/KeyValueFromPulsarMessageExtractor.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/converters/KeyValueFromPulsarMessageExtractor.java new file mode 100644 index 0000000000..e47b7b0ee7 --- /dev/null +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/converters/KeyValueFromPulsarMessageExtractor.java @@ -0,0 +1,43 @@ +package io.smallrye.reactive.messaging.pulsar.converters; + +import java.lang.reflect.Type; +import java.util.Optional; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.reactive.messaging.keyed.KeyValueExtractor; +import io.smallrye.reactive.messaging.providers.helpers.TypeUtils; +import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessageMetadata; + +/** + * Key/Value extractor extracting the key from a Pulsar message and passing the message's payload as value. + * + * This extractor has the default priority ({@link KeyValueExtractor#DEFAULT_PRIORITY}). + */ +@ApplicationScoped +public class KeyValueFromPulsarMessageExtractor implements KeyValueExtractor { + @Override + public boolean canExtract(Message message, Type keyType, Type valueType) { + Optional metadata = message.getMetadata(PulsarIncomingMessageMetadata.class); + // The type checks can be expensive, so, we do it only once, and rely on the fact the pulsar schema are constant. + return metadata.filter( + incomingMetadata -> (incomingMetadata.hasKey() + && TypeUtils.isAssignable(keyType, incomingMetadata.getKey().getClass()) + && TypeUtils.isAssignable(valueType, message.getPayload().getClass()))) + .isPresent(); + } + + @Override + public Object extractKey(Message message, Type keyType) { + return message.getMetadata(PulsarIncomingMessageMetadata.class) + . map(PulsarIncomingMessageMetadata::getKey) + .orElseThrow(); + } + + @Override + public Object extractValue(Message message, Type valueType) { + return message.getPayload(); + } +} diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/converters/KeyValueFromPulsarMessageKeyValueExtractor.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/converters/KeyValueFromPulsarMessageKeyValueExtractor.java new file mode 100644 index 0000000000..5a1bdb700e --- /dev/null +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/converters/KeyValueFromPulsarMessageKeyValueExtractor.java @@ -0,0 +1,42 @@ +package io.smallrye.reactive.messaging.pulsar.converters; + +import java.lang.reflect.Type; +import java.util.Optional; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.apache.pulsar.common.schema.KeyValue; +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.reactive.messaging.keyed.KeyValueExtractor; +import io.smallrye.reactive.messaging.providers.helpers.TypeUtils; +import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessageMetadata; + +/** + * Key/Value extractor extracting the key and value from a Pulsar message with KeyValue schema. + * + * This extractor has the default priority ({@link KeyValueExtractor#DEFAULT_PRIORITY}). + */ +@ApplicationScoped +public class KeyValueFromPulsarMessageKeyValueExtractor implements KeyValueExtractor { + @Override + public boolean canExtract(Message message, Type keyType, Type valueType) { + Optional metadata = message.getMetadata(PulsarIncomingMessageMetadata.class); + // The type checks can be expensive, so, we do it only once, and rely on the fact the pulsar schema are constant. + return metadata.filter( + incomingMetadata -> (message.getPayload() instanceof KeyValue + && TypeUtils.isAssignable(keyType, ((KeyValue) message.getPayload()).getKey().getClass()) + && TypeUtils.isAssignable(valueType, ((KeyValue) message.getPayload()).getValue().getClass()))) + .isPresent(); + } + + @Override + public Object extractKey(Message message, Type keyType) { + return ((KeyValue) message.getPayload()).getKey(); + } + + @Override + public Object extractValue(Message message, Type valueType) { + return ((KeyValue) message.getPayload()).getValue(); + } +} diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarContinue.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarContinue.java new file mode 100644 index 0000000000..809a385d6f --- /dev/null +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarContinue.java @@ -0,0 +1,48 @@ +package io.smallrye.reactive.messaging.pulsar.fault; + +import static io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging.log; + +import java.util.function.BiConsumer; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.apache.pulsar.client.api.Consumer; +import org.eclipse.microprofile.reactive.messaging.Metadata; + +import io.smallrye.common.annotation.Identifier; +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.pulsar.PulsarConnectorIncomingConfiguration; +import io.smallrye.reactive.messaging.pulsar.PulsarFailureHandler; +import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessage; + +/** + * Failure strategy `continue` which calls logs message failure but continues the stream without nacking or acking the message + */ +public class PulsarContinue implements PulsarFailureHandler { + public static final String STRATEGY_NAME = "continue"; + + @ApplicationScoped + @Identifier(STRATEGY_NAME) + public static class Factory implements PulsarFailureHandler.Factory { + + @Override + public PulsarFailureHandler create(Consumer consumer, PulsarConnectorIncomingConfiguration config, + BiConsumer reportFailure) { + return new PulsarContinue(config.getChannel()); + } + } + + private final String channel; + + public PulsarContinue(String channel) { + this.channel = channel; + } + + @Override + public Uni handle(PulsarIncomingMessage message, Throwable reason, Metadata metadata) { + log.messageNackedIgnored(channel, reason.getMessage()); + log.messageNackedFullIgnored(reason); + return Uni.createFrom().voidItem() + .emitOn(message::runOnMessageContext); + } +} diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarFailStop.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarFailStop.java index c69990132b..29a2a565ff 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarFailStop.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarFailStop.java @@ -14,6 +14,9 @@ import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessage; import io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging; +/** + * Failure strategy `fail` which stops the stream by emitting a failure with the message processing failure + */ public class PulsarFailStop implements PulsarFailureHandler { public static final String STRATEGY_NAME = "fail"; @@ -24,16 +27,14 @@ public static class Factory implements PulsarFailureHandler.Factory { @Override public PulsarFailStop create(Consumer consumer, PulsarConnectorIncomingConfiguration config, BiConsumer reportFailure) { - return new PulsarFailStop(consumer, config.getChannel(), reportFailure); + return new PulsarFailStop(config.getChannel(), reportFailure); } } - private final Consumer consumer; private final String channel; private final BiConsumer reportFailure; - public PulsarFailStop(Consumer consumer, String channel, BiConsumer reportFailure) { - this.consumer = consumer; + public PulsarFailStop(String channel, BiConsumer reportFailure) { this.channel = channel; this.reportFailure = reportFailure; } diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarIgnore.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarIgnore.java index f850fe3041..cff81dfa43 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarIgnore.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarIgnore.java @@ -15,6 +15,9 @@ import io.smallrye.reactive.messaging.pulsar.PulsarFailureHandler; import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessage; +/** + * Failure strategy `ignore` which continues the stream in case of failure and acks the message instead + */ public class PulsarIgnore implements PulsarFailureHandler { public static final String STRATEGY_NAME = "ignore"; diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNack.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNack.java index a23ca67bc3..d90b8f0771 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNack.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNack.java @@ -13,6 +13,9 @@ import io.smallrye.reactive.messaging.pulsar.PulsarFailureHandler; import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessage; +/** + * Failure strategy `nack` which calls negative acknowledgement for the message and continues the stream + */ public class PulsarNack implements PulsarFailureHandler { public static final String STRATEGY_NAME = "nack"; diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarReconsumeLater.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarReconsumeLater.java index 7708840d39..998cdec7e4 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarReconsumeLater.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarReconsumeLater.java @@ -19,6 +19,9 @@ import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessage; import io.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata; +/** + * Failure strategy `reconsume-later` which calls reconsume later for the message + */ public class PulsarReconsumeLater implements PulsarFailureHandler { public static final String STRATEGY_NAME = "reconsume-later"; diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/MessageMetadataTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/MessageMetadataTest.java index 3ea38c0292..6b377f2695 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/MessageMetadataTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/MessageMetadataTest.java @@ -7,12 +7,29 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; +import org.apache.pulsar.common.schema.KeyValue; import org.junit.jupiter.api.Test; import com.github.dockerjava.zerodep.shaded.org.apache.commons.codec.binary.Base64; public class MessageMetadataTest { + @Test + void testOutgoingMessageWithKeyValueAndKeyValueSchema() { + TypedMessageBuilderImpl> messageBuilder = new TypedMessageBuilderImpl<>(null, + Schema.KeyValue(Schema.INT32, Schema.STRING)); + + TypedMessageBuilderImpl> msg = (TypedMessageBuilderImpl>) messageBuilder + .value(new KeyValue<>(1, "value")); + + assertThat(msg.getMetadataBuilder().hasNullValue()).isFalse(); + assertThat(msg.getMetadataBuilder().hasNullPartitionKey()).isFalse(); + assertThat(msg.getMetadataBuilder().hasPartitionKey()).isFalse(); + assertThat(msg.hasKey()).isFalse(); + + assertThatThrownBy(() -> msg.getKey()).isInstanceOf(IllegalStateException.class); + } + @Test void testOutgoingMessageWithNullKeyValueAndKeyValueSchema() { TypedMessageBuilderImpl messageBuilder = new TypedMessageBuilderImpl<>(null, diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/PulsarProducerTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/PulsarProducerTest.java index ca1b79e562..c512e825eb 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/PulsarProducerTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/PulsarProducerTest.java @@ -1,5 +1,6 @@ package io.smallrye.reactive.messaging.pulsar; +import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import java.util.List; @@ -14,7 +15,6 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; -import org.assertj.core.api.Assertions; import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.junit.jupiter.api.Test; @@ -54,10 +54,45 @@ void testPayloadProducer() throws PulsarClientException { // wait until we have gathered all the expected messages await().until(() -> received.size() >= NUMBER_OF_MESSAGES); - Assertions.assertThat(received.stream().map(p -> p.age)) + assertThat(received.stream().map(p -> p.age)) .containsAll(IntStream.range(0, 100).boxed().collect(Collectors.toList())); } + @Test + void testOutgoingMessageProducer() throws PulsarClientException { + // Run app + OutgoingMessageProducingApp producingApp = runApplication(config(), OutgoingMessageProducingApp.class); + + // create consumer + Consumer consumer = client.newConsumer(Schema.JSON(Person.class)) + .topic(topic) + .subscriptionName("test-" + topic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .consumerName("test-consumer") + .subscribe(); + + // gather consumes messages + List> received = new CopyOnWriteArrayList<>(); + receive(consumer, NUMBER_OF_MESSAGES, message -> { + try { + received.add(message); + consumer.acknowledge(message); + } catch (Exception e) { + consumer.negativeAcknowledge(message); + } + }); + + // wait until we have gathered all the expected messages + await().until(() -> received.size() >= NUMBER_OF_MESSAGES); + assertThat(received).extracting(m -> m.getValue().age) + .containsAll(IntStream.range(0, 100).boxed().collect(Collectors.toList())); + assertThat(received) + .allMatch(org.apache.pulsar.client.api.Message::hasKey) + .extracting(m -> Integer.parseInt(m.getKey())) + .containsAll(IntStream.range(0, 100).boxed().collect(Collectors.toList())); + + } + @Test void testMessageProducer() throws PulsarClientException { // Run app @@ -84,7 +119,7 @@ void testMessageProducer() throws PulsarClientException { // wait until we have gathered all the expected messages await().until(() -> received.size() >= NUMBER_OF_MESSAGES); - Assertions.assertThat(received.stream().map(p -> p.age)) + assertThat(received.stream().map(p -> p.age)) .containsAll(IntStream.range(0, 100).boxed().collect(Collectors.toList())); } @@ -108,6 +143,20 @@ public Multi produce() { } } + @ApplicationScoped + public static class OutgoingMessageProducingApp { + + @Produces + @Identifier("data") + static Schema schema = Schema.JSON(Person.class); + + @Outgoing("data") + public Multi> produce() { + return Multi.createFrom().range(0, NUMBER_OF_MESSAGES) + .map(i -> OutgoingMessage.of(String.valueOf(i), new Person("p" + i, i))); + } + } + @ApplicationScoped public static class MessageProducerApp { @Produces diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/PulsarClientBaseTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/PulsarClientBaseTest.java index 4ec063bf99..ba2a6b78cc 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/PulsarClientBaseTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/PulsarClientBaseTest.java @@ -128,4 +128,12 @@ public static List send(Producer producer, int numberOfMessage .subscribe().asStream().collect(Collectors.toList()); } + public static List sendMessages(Producer producer, + Function, List>> generator) { + return Multi.createFrom().iterable(generator.apply(producer)) + .runSubscriptionOn(executor) + .onItem().transformToUni(m -> Uni.createFrom().completionStage(m::sendAsync)).merge() + .subscribe().asStream().collect(Collectors.toList()); + } + } diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/WeldTestBase.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/WeldTestBase.java index 56df004337..67161b15eb 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/WeldTestBase.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/WeldTestBase.java @@ -32,6 +32,8 @@ import io.smallrye.reactive.messaging.pulsar.PulsarConnector; import io.smallrye.reactive.messaging.pulsar.SchemaResolver; import io.smallrye.reactive.messaging.pulsar.ack.PulsarMessageAck; +import io.smallrye.reactive.messaging.pulsar.converters.KeyValueFromPulsarMessageExtractor; +import io.smallrye.reactive.messaging.pulsar.converters.KeyValueFromPulsarMessageKeyValueExtractor; import io.smallrye.reactive.messaging.pulsar.fault.PulsarNack; import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactionsFactory; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; @@ -75,6 +77,8 @@ public void initWeld() { weld.addBeanClass(PulsarMessageAck.Factory.class); weld.addBeanClass(PulsarMessageAck.Factory.class); weld.addBeanClass(PulsarNack.Factory.class); + weld.addBeanClass(KeyValueFromPulsarMessageExtractor.class); + weld.addBeanClass(KeyValueFromPulsarMessageKeyValueExtractor.class); weld.disableDiscovery(); } diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/converters/KeyValueFromPulsarMessageExtractorTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/converters/KeyValueFromPulsarMessageExtractorTest.java new file mode 100644 index 0000000000..cb7356d5b4 --- /dev/null +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/converters/KeyValueFromPulsarMessageExtractorTest.java @@ -0,0 +1,151 @@ +package io.smallrye.reactive.messaging.pulsar.converters; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; + +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.KeyValue; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.Test; + +import io.smallrye.common.annotation.Identifier; +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.keyed.KeyedMulti; +import io.smallrye.reactive.messaging.pulsar.base.WeldTestBase; + +class KeyValueFromPulsarMessageExtractorTest extends WeldTestBase { + + @Test + void defaultExctractorStringSchema() throws PulsarClientException { + addBeans(Sink.class, AppWithDefault.class); + + runApplication(baseConfig() + .with("mp.messaging.incoming.in.connector", "smallrye-pulsar") + .with("mp.messaging.incoming.in.serviceUrl", serviceUrl) + .with("mp.messaging.incoming.in.topic", topic) + .with("mp.messaging.incoming.in.schema", "STRING") + .with("mp.messaging.incoming.in.subscriptionInitialReset", "Earliest")); + + Sink sink = get(Sink.class); + + sendMessages(client.newProducer(Schema.STRING) + .producerName("test-producer") + .topic(topic) + .create(), + p -> List.of( + p.newMessage().key("a").value("1"), + p.newMessage().key("b").value("1"), + p.newMessage().key("b").value("2"), + p.newMessage().key("a").value("2"), + p.newMessage().key("a").value("3"), + p.newMessage().key("c").value("1"), + p.newMessage().key("c").value("2"), + p.newMessage().key("a").value("4"))); + + await().until(() -> sink.list().size() == 11); + assertThat(sink.list()) + .containsExactlyInAnyOrder( + "A-0", "B-0", "C-0", + "A-1", "A-2", "A-3", "A-4", + "B-1", "B-2", "C-1", "C-2"); + } + + @Test + void defaultExtractorKeyValueSchema() throws PulsarClientException { + addBeans(Sink.class, AppWithDefaultKeyValue.class); + + runApplication(baseConfig() + .with("mp.messaging.incoming.in.connector", "smallrye-pulsar") + .with("mp.messaging.incoming.in.serviceUrl", serviceUrl) + .with("mp.messaging.incoming.in.topic", topic) + .with("mp.messaging.incoming.in.subscriptionInitialReset", "Earliest")); + + Sink sink = get(Sink.class); + + sendMessages(client.newProducer(Schema.KeyValue(Schema.STRING, Schema.INT32)) + .producerName("test-producer") + .topic(topic) + .create(), + p -> List.of( + p.newMessage().value(new KeyValue<>("a", 1)), + p.newMessage().value(new KeyValue<>("b", 1)), + p.newMessage().value(new KeyValue<>("b", 2)), + p.newMessage().value(new KeyValue<>("a", 2)), + p.newMessage().value(new KeyValue<>("a", 3)), + p.newMessage().value(new KeyValue<>("c", 1)), + p.newMessage().value(new KeyValue<>("c", 2)), + p.newMessage().value(new KeyValue<>("a", 4)))); + + await().until(() -> sink.list().size() == 11); + assertThat(sink.list()) + .containsExactlyInAnyOrder( + "A-0", "B-0", "C-0", + "A-1", "A-2", "A-3", "A-4", + "B-1", "B-2", "C-1", "C-2"); + } + + @ApplicationScoped + public static class Sink { + + private final List list = new CopyOnWriteArrayList<>(); + + @Incoming("out") + void consume(String s) { + list.add(s); + } + + public List list() { + return list; + } + } + + @ApplicationScoped + public static class AppWithDefault { + + @Incoming("in") + @Outgoing("out") + public Multi reshape(KeyedMulti keyed) { + assertThat(keyed.key()).isNotNull().isNotBlank(); + return keyed + .select().distinct() + .onItem().scan(AtomicInteger::new, (count, s) -> { + count.incrementAndGet(); + return count; + }) + .map(s -> keyed.key().toUpperCase() + "-" + s.get()); + } + + } + + @ApplicationScoped + public static class AppWithDefaultKeyValue { + + @Produces + @Identifier("in") + Schema> schema = Schema.KeyValue(Schema.STRING, Schema.INT32); + + @Incoming("in") + @Outgoing("out") + public Multi reshape(KeyedMulti keyed) { + assertThat(keyed.key()).isNotNull(); + return keyed + .select().distinct() + .onItem().scan(AtomicInteger::new, (count, s) -> { + count.incrementAndGet(); + return count; + }) + .map(s -> keyed.key().toUpperCase() + "-" + s.get()); + } + + } + +} diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/converters/KeyValueMessageFromPulsarMessageExtractorTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/converters/KeyValueMessageFromPulsarMessageExtractorTest.java new file mode 100644 index 0000000000..b273a689dd --- /dev/null +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/converters/KeyValueMessageFromPulsarMessageExtractorTest.java @@ -0,0 +1,156 @@ +package io.smallrye.reactive.messaging.pulsar.converters; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; + +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.KeyValue; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.Test; + +import io.smallrye.common.annotation.Identifier; +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.keyed.KeyedMulti; +import io.smallrye.reactive.messaging.pulsar.base.WeldTestBase; + +class KeyValueMessageFromPulsarMessageExtractorTest extends WeldTestBase { + + @Test + void defaultExctractorStringSchema() throws PulsarClientException { + addBeans(Sink.class, AppWithDefault.class); + + runApplication(baseConfig() + .with("mp.messaging.incoming.in.connector", "smallrye-pulsar") + .with("mp.messaging.incoming.in.serviceUrl", serviceUrl) + .with("mp.messaging.incoming.in.topic", topic) + .with("mp.messaging.incoming.in.schema", "STRING") + .with("mp.messaging.incoming.in.subscriptionInitialReset", "Earliest")); + + Sink sink = get(Sink.class); + + sendMessages(client.newProducer(Schema.STRING) + .producerName("test-producer") + .topic(topic) + .create(), + p -> List.of( + p.newMessage().key("a").value("1"), + p.newMessage().key("b").value("1"), + p.newMessage().key("b").value("2"), + p.newMessage().key("a").value("2"), + p.newMessage().key("a").value("3"), + p.newMessage().key("c").value("1"), + p.newMessage().key("c").value("2"), + p.newMessage().key("a").value("4"))); + + await().until(() -> sink.list().size() == 11); + assertThat(sink.list()) + .containsExactlyInAnyOrder( + "A-0", "B-0", "C-0", + "A-1", "A-2", "A-3", "A-4", + "B-1", "B-2", "C-1", "C-2"); + } + + @Test + void defaultExtractorKeyValueSchema() throws PulsarClientException { + addBeans(Sink.class, AppWithDefaultKeyValue.class); + + runApplication(baseConfig() + .with("mp.messaging.incoming.in.connector", "smallrye-pulsar") + .with("mp.messaging.incoming.in.serviceUrl", serviceUrl) + .with("mp.messaging.incoming.in.topic", topic) + .with("mp.messaging.incoming.in.subscriptionInitialReset", "Earliest")); + + Sink sink = get(Sink.class); + + sendMessages(client.newProducer(Schema.KeyValue(Schema.STRING, Schema.INT32)) + .producerName("test-producer") + .topic(topic) + .create(), + p -> List.of( + p.newMessage().value(new KeyValue<>("a", 1)), + p.newMessage().value(new KeyValue<>("b", 1)), + p.newMessage().value(new KeyValue<>("b", 2)), + p.newMessage().value(new KeyValue<>("a", 2)), + p.newMessage().value(new KeyValue<>("a", 3)), + p.newMessage().value(new KeyValue<>("c", 1)), + p.newMessage().value(new KeyValue<>("c", 2)), + p.newMessage().value(new KeyValue<>("a", 4)))); + + await().until(() -> sink.list().size() == 11); + assertThat(sink.list()) + .containsExactlyInAnyOrder( + "A-0", "B-0", "C-0", + "A-1", "A-2", "A-3", "A-4", + "B-1", "B-2", "C-1", "C-2"); + } + + @ApplicationScoped + public static class Sink { + + private final List list = new CopyOnWriteArrayList<>(); + + @Incoming("out") + void consume(String s) { + list.add(s); + } + + public List list() { + return list; + } + } + + public static class Container { + Message message; + int count; + + public Container(Message message, int count) { + this.message = message; + this.count = count; + } + } + + @ApplicationScoped + public static class AppWithDefault { + + @Incoming("in") + @Outgoing("out") + public Multi> reshape(KeyedMulti> keyed) { + assertThat(keyed.key()).isNotNull().isNotBlank(); + return keyed + .select().distinct() + .onItem().scan(() -> new Container(Message.of(null), 0), + (cont, msg) -> new Container<>(msg, cont.count + 1)) + .map(cont -> cont.message.withPayload(keyed.key().toUpperCase() + "-" + cont.count)); + } + + } + + @ApplicationScoped + public static class AppWithDefaultKeyValue { + + @Produces + @Identifier("in") + Schema> schema = Schema.KeyValue(Schema.STRING, Schema.INT32); + + @Incoming("in") + @Outgoing("out") + public Multi> reshape(KeyedMulti> keyed) { + assertThat(keyed.key()).isNotNull().isNotBlank(); + return keyed + .select().distinct() + .onItem().scan(() -> new Container(Message.of(null), 0), + (cont, msg) -> new Container<>(msg, cont.count + 1)) + .map(cont -> cont.message.withPayload(keyed.key().toUpperCase() + "-" + cont.count)); + } + + } +} diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNackTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNackTest.java index 9781573f52..fb7c73a07e 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNackTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/fault/PulsarNackTest.java @@ -6,6 +6,7 @@ import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import jakarta.enterprise.context.ApplicationScoped; @@ -63,6 +64,29 @@ void testIgnore() throws PulsarClientException { }); } + @Test + void testContinue() throws PulsarClientException { + addBeans(PulsarContinue.Factory.class); + // Run app + FailingConsumingApp app = runApplication(config() + .with("mp.messaging.incoming.data.failure-strategy", "continue") + .with("mp.messaging.incoming.data.ackTimeoutMillis", "100") + .with("mp.messaging.incoming.data.ackTimeout.redeliveryBackoff", "100,1000,2") + , FailingConsumingApp.class); + // Produce messages + send(client.newProducer(Schema.INT32) + .producerName("test-producer") + .enableBatching(false) // avoid receiving acked messages with producer batching + .topic(topic) + .create(), NUMBER_OF_MESSAGES, i -> i); + + // Check for consumed messages in app + await().pollDelay(3, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(app.getResults()).hasSize(NUMBER_OF_MESSAGES - 10); + assertThat(app.getFailures()).hasSizeGreaterThan(10); + }); + } + @Test void testNackRedelivery() throws PulsarClientException { addBeans(PulsarNack.Factory.class);