Skip to content

Commit

Permalink
Pulsar continue failure strategy
Browse files Browse the repository at this point in the history
Pulsar KeyValueExtractors

Fix for Pulsar OutgoingMessage.of factory method which did not set the key correctly
  • Loading branch information
ozangunalp committed Jul 7, 2023
1 parent 8fcdb37 commit d3d238e
Show file tree
Hide file tree
Showing 17 changed files with 562 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ The Pulsar connector supports 4 strategies:
The negative acknowledgment can be further configured using `negativeAckRedeliveryDelayMicros` and `negativeAck.redeliveryBackoff` properties.
- `fail` fail the application, no more messages will be processed.
- `ignore` the failure is logged, but the acknowledgement strategy will be applied and the processing will continue.
- `continue` the failure is logged, but processing continues without applying acknowledgement or negative acknowledgement. This strategy can be used with [acknowledgement timeout](#acknowledgement-timeout) configuration.
- `reconsume-later` sends the message to the [retry letter topic](https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#retry-letter-topic) using the `reconsumeLater` API to be reconsumed with a delay.
The delay can be configured using the `reconsumeLater.delay` property and defaults to 3 seconds.
Custom delay or properties per message can be configured by adding an instance of {{ javadoc('io.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata') }} to the failure metadata.
Expand Down Expand Up @@ -113,7 +114,7 @@ The `ackTimeout.redeliveryBackoff` value accepts comma separated values of min d

```properties
mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.failure-strategy=ignore
mp.messaging.incoming.data.failure-strategy=continue
mp.messaging.incoming.data.ackTimeoutMillis=10000
mp.messaging.incoming.data.ackTimeout.redeliveryBackoff=1000,60000,2
```
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package pulsar.configuration;

import java.util.regex.Pattern;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

Expand All @@ -14,8 +12,6 @@
@ApplicationScoped
public class PulsarSchemaProvider {

Pattern linkPattern = Pattern.compile("\\{@link\\s(.*)}");

@Produces
@Identifier("user-schema")
Schema<User> userSchema = Schema.AVRO(User.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public OutgoingMessage(T value) {

public OutgoingMessage(String key, T value) {
this(value);
this.key = key;
withKey(key);
}

public boolean hasKey() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.smallrye.reactive.messaging.pulsar.converters;

import java.lang.reflect.Type;
import java.util.Optional;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.keyed.KeyValueExtractor;
import io.smallrye.reactive.messaging.providers.helpers.TypeUtils;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessageMetadata;

/**
* Key/Value extractor extracting the key from a Pulsar message and passing the message's payload as value.
*
* This extractor has the default priority ({@link KeyValueExtractor#DEFAULT_PRIORITY}).
*/
@ApplicationScoped
public class KeyValueFromPulsarMessageExtractor implements KeyValueExtractor {
@Override
public boolean canExtract(Message<?> message, Type keyType, Type valueType) {
Optional<PulsarIncomingMessageMetadata> metadata = message.getMetadata(PulsarIncomingMessageMetadata.class);
// The type checks can be expensive, so, we do it only once, and rely on the fact the pulsar schema are constant.
return metadata.filter(
incomingMetadata -> (incomingMetadata.hasKey()
&& TypeUtils.isAssignable(keyType, incomingMetadata.getKey().getClass())
&& TypeUtils.isAssignable(valueType, message.getPayload().getClass())))
.isPresent();
}

@Override
public Object extractKey(Message<?> message, Type keyType) {
return message.getMetadata(PulsarIncomingMessageMetadata.class)
.<Object> map(PulsarIncomingMessageMetadata::getKey)
.orElseThrow();
}

@Override
public Object extractValue(Message<?> message, Type valueType) {
return message.getPayload();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.smallrye.reactive.messaging.pulsar.converters;

import java.lang.reflect.Type;
import java.util.Optional;

import jakarta.enterprise.context.ApplicationScoped;

import org.apache.pulsar.common.schema.KeyValue;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.keyed.KeyValueExtractor;
import io.smallrye.reactive.messaging.providers.helpers.TypeUtils;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessageMetadata;

/**
* Key/Value extractor extracting the key and value from a Pulsar message with KeyValue schema.
*
* This extractor has the default priority ({@link KeyValueExtractor#DEFAULT_PRIORITY}).
*/
@ApplicationScoped
public class KeyValueFromPulsarMessageKeyValueExtractor implements KeyValueExtractor {
@Override
public boolean canExtract(Message<?> message, Type keyType, Type valueType) {
Optional<PulsarIncomingMessageMetadata> metadata = message.getMetadata(PulsarIncomingMessageMetadata.class);
// The type checks can be expensive, so, we do it only once, and rely on the fact the pulsar schema are constant.
return metadata.filter(
incomingMetadata -> (message.getPayload() instanceof KeyValue
&& TypeUtils.isAssignable(keyType, ((KeyValue) message.getPayload()).getKey().getClass())
&& TypeUtils.isAssignable(valueType, ((KeyValue) message.getPayload()).getValue().getClass())))
.isPresent();
}

@Override
public Object extractKey(Message<?> message, Type keyType) {
return ((KeyValue) message.getPayload()).getKey();
}

@Override
public Object extractValue(Message<?> message, Type valueType) {
return ((KeyValue) message.getPayload()).getValue();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.smallrye.reactive.messaging.pulsar.fault;

import static io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging.log;

import java.util.function.BiConsumer;

import jakarta.enterprise.context.ApplicationScoped;

import org.apache.pulsar.client.api.Consumer;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.PulsarConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.pulsar.PulsarFailureHandler;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessage;

/**
* Failure strategy `continue` which calls logs message failure but continues the stream without nacking or acking the message
*/
public class PulsarContinue implements PulsarFailureHandler {
public static final String STRATEGY_NAME = "continue";

@ApplicationScoped
@Identifier(STRATEGY_NAME)
public static class Factory implements PulsarFailureHandler.Factory {

@Override
public PulsarFailureHandler create(Consumer<?> consumer, PulsarConnectorIncomingConfiguration config,
BiConsumer<Throwable, Boolean> reportFailure) {
return new PulsarContinue(config.getChannel());
}
}

private final String channel;

public PulsarContinue(String channel) {
this.channel = channel;
}

@Override
public Uni<Void> handle(PulsarIncomingMessage<?> message, Throwable reason, Metadata metadata) {
log.messageNackedIgnored(channel, reason.getMessage());
log.messageNackedFullIgnored(reason);
return Uni.createFrom().voidItem()
.emitOn(message::runOnMessageContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessage;
import io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging;

/**
* Failure strategy `fail` which stops the stream by emitting a failure with the message processing failure
*/
public class PulsarFailStop implements PulsarFailureHandler {
public static final String STRATEGY_NAME = "fail";

Expand All @@ -24,16 +27,14 @@ public static class Factory implements PulsarFailureHandler.Factory {
@Override
public PulsarFailStop create(Consumer<?> consumer, PulsarConnectorIncomingConfiguration config,
BiConsumer<Throwable, Boolean> reportFailure) {
return new PulsarFailStop(consumer, config.getChannel(), reportFailure);
return new PulsarFailStop(config.getChannel(), reportFailure);
}
}

private final Consumer<?> consumer;
private final String channel;
private final BiConsumer<Throwable, Boolean> reportFailure;

public PulsarFailStop(Consumer<?> consumer, String channel, BiConsumer<Throwable, Boolean> reportFailure) {
this.consumer = consumer;
public PulsarFailStop(String channel, BiConsumer<Throwable, Boolean> reportFailure) {
this.channel = channel;
this.reportFailure = reportFailure;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import io.smallrye.reactive.messaging.pulsar.PulsarFailureHandler;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessage;

/**
* Failure strategy `ignore` which continues the stream in case of failure and acks the message instead
*/
public class PulsarIgnore implements PulsarFailureHandler {
public static final String STRATEGY_NAME = "ignore";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import io.smallrye.reactive.messaging.pulsar.PulsarFailureHandler;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessage;

/**
* Failure strategy `nack` which calls negative acknowledgement for the message and continues the stream
*/
public class PulsarNack implements PulsarFailureHandler {
public static final String STRATEGY_NAME = "nack";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessage;
import io.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata;

/**
* Failure strategy `reconsume-later` which calls reconsume later for the message
*/
public class PulsarReconsumeLater implements PulsarFailureHandler {
public static final String STRATEGY_NAME = "reconsume-later";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,29 @@

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.junit.jupiter.api.Test;

import com.github.dockerjava.zerodep.shaded.org.apache.commons.codec.binary.Base64;

public class MessageMetadataTest {

@Test
void testOutgoingMessageWithKeyValueAndKeyValueSchema() {
TypedMessageBuilderImpl<KeyValue<Integer, String>> messageBuilder = new TypedMessageBuilderImpl<>(null,
Schema.KeyValue(Schema.INT32, Schema.STRING));

TypedMessageBuilderImpl<KeyValue<Integer, String>> msg = (TypedMessageBuilderImpl<KeyValue<Integer, String>>) messageBuilder
.value(new KeyValue<>(1, "value"));

assertThat(msg.getMetadataBuilder().hasNullValue()).isFalse();
assertThat(msg.getMetadataBuilder().hasNullPartitionKey()).isFalse();
assertThat(msg.getMetadataBuilder().hasPartitionKey()).isFalse();
assertThat(msg.hasKey()).isFalse();

assertThatThrownBy(() -> msg.getKey()).isInstanceOf(IllegalStateException.class);
}

@Test
void testOutgoingMessageWithNullKeyValueAndKeyValueSchema() {
TypedMessageBuilderImpl<?> messageBuilder = new TypedMessageBuilderImpl<>(null,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.smallrye.reactive.messaging.pulsar;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.List;
Expand All @@ -14,7 +15,6 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.assertj.core.api.Assertions;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -54,10 +54,45 @@ void testPayloadProducer() throws PulsarClientException {

// wait until we have gathered all the expected messages
await().until(() -> received.size() >= NUMBER_OF_MESSAGES);
Assertions.assertThat(received.stream().map(p -> p.age))
assertThat(received.stream().map(p -> p.age))
.containsAll(IntStream.range(0, 100).boxed().collect(Collectors.toList()));
}

@Test
void testOutgoingMessageProducer() throws PulsarClientException {
// Run app
OutgoingMessageProducingApp producingApp = runApplication(config(), OutgoingMessageProducingApp.class);

// create consumer
Consumer<Person> consumer = client.newConsumer(Schema.JSON(Person.class))
.topic(topic)
.subscriptionName("test-" + topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.consumerName("test-consumer")
.subscribe();

// gather consumes messages
List<org.apache.pulsar.client.api.Message<Person>> received = new CopyOnWriteArrayList<>();
receive(consumer, NUMBER_OF_MESSAGES, message -> {
try {
received.add(message);
consumer.acknowledge(message);
} catch (Exception e) {
consumer.negativeAcknowledge(message);
}
});

// wait until we have gathered all the expected messages
await().until(() -> received.size() >= NUMBER_OF_MESSAGES);
assertThat(received).extracting(m -> m.getValue().age)
.containsAll(IntStream.range(0, 100).boxed().collect(Collectors.toList()));
assertThat(received)
.allMatch(org.apache.pulsar.client.api.Message::hasKey)
.extracting(m -> Integer.parseInt(m.getKey()))
.containsAll(IntStream.range(0, 100).boxed().collect(Collectors.toList()));

}

@Test
void testMessageProducer() throws PulsarClientException {
// Run app
Expand All @@ -84,7 +119,7 @@ void testMessageProducer() throws PulsarClientException {

// wait until we have gathered all the expected messages
await().until(() -> received.size() >= NUMBER_OF_MESSAGES);
Assertions.assertThat(received.stream().map(p -> p.age))
assertThat(received.stream().map(p -> p.age))
.containsAll(IntStream.range(0, 100).boxed().collect(Collectors.toList()));
}

Expand All @@ -108,6 +143,20 @@ public Multi<Person> produce() {
}
}

@ApplicationScoped
public static class OutgoingMessageProducingApp {

@Produces
@Identifier("data")
static Schema<Person> schema = Schema.JSON(Person.class);

@Outgoing("data")
public Multi<OutgoingMessage<Person>> produce() {
return Multi.createFrom().range(0, NUMBER_OF_MESSAGES)
.map(i -> OutgoingMessage.of(String.valueOf(i), new Person("p" + i, i)));
}
}

@ApplicationScoped
public static class MessageProducerApp {
@Produces
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,12 @@ public static <T> List<MessageId> send(Producer<T> producer, int numberOfMessage
.subscribe().asStream().collect(Collectors.toList());
}

public static <T> List<MessageId> sendMessages(Producer<T> producer,
Function<Producer<T>, List<TypedMessageBuilder<T>>> generator) {
return Multi.createFrom().iterable(generator.apply(producer))
.runSubscriptionOn(executor)
.onItem().transformToUni(m -> Uni.createFrom().completionStage(m::sendAsync)).merge()
.subscribe().asStream().collect(Collectors.toList());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.smallrye.reactive.messaging.pulsar.PulsarConnector;
import io.smallrye.reactive.messaging.pulsar.SchemaResolver;
import io.smallrye.reactive.messaging.pulsar.ack.PulsarMessageAck;
import io.smallrye.reactive.messaging.pulsar.converters.KeyValueFromPulsarMessageExtractor;
import io.smallrye.reactive.messaging.pulsar.converters.KeyValueFromPulsarMessageKeyValueExtractor;
import io.smallrye.reactive.messaging.pulsar.fault.PulsarNack;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactionsFactory;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
Expand Down Expand Up @@ -75,6 +77,8 @@ public void initWeld() {
weld.addBeanClass(PulsarMessageAck.Factory.class);
weld.addBeanClass(PulsarMessageAck.Factory.class);
weld.addBeanClass(PulsarNack.Factory.class);
weld.addBeanClass(KeyValueFromPulsarMessageExtractor.class);
weld.addBeanClass(KeyValueFromPulsarMessageKeyValueExtractor.class);
weld.disableDiscovery();
}

Expand Down
Loading

0 comments on commit d3d238e

Please sign in to comment.