From 728c0222e9cb20f16555e9356a44cbf58fd0586d Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Thu, 29 Aug 2024 17:29:03 +0200 Subject: [PATCH] Handle some post-processing for cases consuming payload and returning message --- .../src/main/docs/concepts/signatures.md | 57 +++++--- .../MediatorConfigurationSupport.java | 5 +- .../providers/ProcessorMediator.java | 107 ++++++++++++--- .../providers/i18n/ProviderLogging.java | 4 + .../AsynchronousPayloadProcessorAckTest.java | 57 ++++++++ .../SynchronousPayloadProcessorAckTest.java | 127 ++++++++++++++++++ 6 files changed, 318 insertions(+), 39 deletions(-) diff --git a/documentation/src/main/docs/concepts/signatures.md b/documentation/src/main/docs/concepts/signatures.md index de02de4107..934f9b12f1 100644 --- a/documentation/src/main/docs/concepts/signatures.md +++ b/documentation/src/main/docs/concepts/signatures.md @@ -41,28 +41,41 @@ and available acknowledgement strategies (when applicable). ## Method signatures to process data -| Signature | Invocation time | Supported Acknowledgement Strategies | Metadata Propagation | -|---------------------------------------------------------------------------|--------------------------------------------------|-----------------------------------------|----------------------| -| `@Outgoing @Incoming Message method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual | -| `@Outgoing @Incoming O method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic | -| `@Outgoing @Incoming CompletionStage> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual | -| `@Outgoing @Incoming CompletionStage method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic | -| `@Outgoing @Incoming Uni> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual | -| `@Outgoing @Incoming Uni method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic | -| `@Outgoing @Incoming Processor, Message> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | -| `@Outgoing @Incoming Processor method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported | -| `@Outgoing @Incoming Flow.Processor, Message> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | -| `@Outgoing @Incoming Flow.Processor method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported | -| `@Outgoing @Incoming ProcessorBuilder, Message> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | -| `@Outgoing @Incoming ProcessorBuilder method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported | -| `@Outgoing @Incoming Publisher> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual | -| `@Outgoing @Incoming Publisher method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic | -| `@Outgoing @Incoming Multi> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual | -| `@Outgoing @Incoming Multi method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic | -| `@Outgoing @Incoming Flow.Publisher> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual | -| `@Outgoing @Incoming Flow.Publisher method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic | -| `@Outgoing @Incoming PublisherBuilder> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual | -| `@Outgoing @Incoming PublisherBuilder method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic | +| Signature | Invocation time | Supported Acknowledgement Strategies | Metadata Propagation | +|---------------------------------------------------------------------------|--------------------------------------------------|-----------------------------------------------------------------|----------------------| +| `@Outgoing @Incoming Message method(Message msg)` | Called for every incoming message (sequentially) | POST_PROCESSING (Smallrye only), *MANUAL*, NONE, PRE_PROCESSING | manual | +| `@Outgoing @Incoming Message method(I payload)` | Called for every incoming message (sequentially) | *POST_PROCESSING* (Smallrye only), NONE, PRE_PROCESSING | automatic | +| `@Outgoing @Incoming O method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic | +| `@Outgoing @Incoming CompletionStage> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual | +| `@Outgoing @Incoming CompletionStage method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic | +| `@Outgoing @Incoming CompletionStage> method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING* (Smallrye only), NONE, PRE_PROCESSING | automatic | +| `@Outgoing @Incoming Uni> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual | +| `@Outgoing @Incoming Uni> method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING* (Smallrye only), NONE, PRE_PROCESSING | automatic | +| `@Outgoing @Incoming Uni method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic | +| `@Outgoing @Incoming Processor, Message> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | +| `@Outgoing @Incoming Processor method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported | +| `@Outgoing @Incoming Flow.Processor, Message> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | +| `@Outgoing @Incoming Flow.Processor method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported | +| `@Outgoing @Incoming ProcessorBuilder, Message> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | +| `@Outgoing @Incoming ProcessorBuilder method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported | +| `@Outgoing @Incoming Publisher> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual | +| `@Outgoing @Incoming Publisher method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic | +| `@Outgoing @Incoming Multi> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual | +| `@Outgoing @Incoming Multi method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic | +| `@Outgoing @Incoming Flow.Publisher> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual | +| `@Outgoing @Incoming Flow.Publisher method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic | +| `@Outgoing @Incoming PublisherBuilder> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual | +| `@Outgoing @Incoming PublisherBuilder method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic | + +Note that in additional to the MicroProfile Reactive Messaging specification, +SmallRye Reactive Messaging supports the post-processing acknowledgment handling with automatic metadata propagation for the following signatures: + +- `@Outgoing @Incoming Message method(I payload)` +- `@Outgoing @Incoming CompletionStage> method(I payload)` +- `@Outgoing @Incoming Uni> method(I payload)` +- `@Outgoing @Incoming Message method(Message payload)` : For this signature, the post-processing acknowledgment handling is limited. + It covers cases for nacking incoming messages on caught exceptions at the method body, acking incoming messages when outgoing message is skipped by returning `null`, and chaining acknowlegment from outgoing message to the incoming. + However, if the incoming message has already been (n)acked, you will experience duplicate (n)acks. ## Method signatures to manipulate streams diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/MediatorConfigurationSupport.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/MediatorConfigurationSupport.java index 5c1b724148..732104246c 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/MediatorConfigurationSupport.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/MediatorConfigurationSupport.java @@ -422,7 +422,10 @@ private ValidationOutput validateProcessor(Acknowledgment.Strategy acknowledgmen if (production == MediatorConfiguration.Production.INDIVIDUAL_MESSAGE && acknowledgment == Acknowledgment.Strategy.POST_PROCESSING) { - throw ex.illegalStateForValidateProcessor(methodAsString); + // relax here the validation for the post-processing acknowledgment + if (consumption == MediatorConfiguration.Consumption.MESSAGE) { + log.postProcessingNotFullySupported(methodAsString); + } } return new ValidationOutput(production, consumption, useBuilderTypes, useReactiveStreams, payloadType); diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java index 665c3719e4..34d02575be 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java @@ -26,6 +26,7 @@ import io.smallrye.reactive.messaging.providers.helpers.AcknowledgementCoordinator; import io.smallrye.reactive.messaging.providers.helpers.ClassUtils; import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; +import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging; import mutiny.zero.flow.adapters.AdaptersToFlow; @SuppressWarnings("ReactiveStreamsUnusedPublisher") @@ -147,19 +148,19 @@ public void initialize(Object bean) { processMethodReturningIndividualPayloadAndConsumingIndividualItem(); break; case COMPLETION_STAGE_OF_MESSAGE: - // Case 11 + // Case 12 processMethodReturningACompletionStageOfMessageAndConsumingIndividualItem(); break; case COMPLETION_STAGE_OF_PAYLOAD: - // Case 12 + // Case 11 processMethodReturningACompletionStageOfPayloadAndConsumingIndividualItem(); break; case UNI_OF_MESSAGE: - // Case 11 - Uni variant + // Case 12 - Uni variant processMethodReturningAUniOfMessageAndConsumingIndividualItem(); break; case UNI_OF_PAYLOAD: - // Case 12 - Uni variant + // Case 11 - Uni variant processMethodReturningAUniOfPayloadAndConsumingIndividualItem(); break; default: @@ -167,6 +168,9 @@ public void initialize(Object bean) { } } + /** + * {@code PublisherBuilder> method(Message msg)} + */ @SuppressWarnings("unchecked") private void processMethodReturningAPublisherBuilderOfMessageAndConsumingMessages() { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) @@ -174,6 +178,9 @@ private void processMethodReturningAPublisherBuilderOfMessageAndConsumingMessage msg -> AdaptersToFlow.publisher(((PublisherBuilder>) invoke(msg)).buildRs())); } + /** + * {@code PublisherBuilder> method(Message msg)} + */ @SuppressWarnings("unchecked") private void processMethodReturningAReactiveStreamsPublisherOfMessageAndConsumingMessages() { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) @@ -181,12 +188,18 @@ private void processMethodReturningAReactiveStreamsPublisherOfMessageAndConsumin msg -> AdaptersToFlow.publisher((Publisher>) invoke(msg))); } + /** + * {@code Flow.Publisher> method(Message msg)} + */ @SuppressWarnings("unchecked") private void processMethodReturningAPublisherOfMessageAndConsumingMessages() { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transformToMultiAndConcatenate(msg -> (Flow.Publisher>) invoke(msg)); } + /** + * {@code ProcessorBuilder, Message> method()} + */ private void processMethodReturningAProcessorBuilderOfMessages() { ProcessorBuilder, Message> builder = Objects.requireNonNull(invoke(), msg.methodReturnedNull(configuration.methodAsString())); @@ -197,6 +210,9 @@ private void processMethodReturningAProcessorBuilderOfMessages() { }; } + /** + * {@code Processor, Message> method()} + */ private void processMethodReturningAReactiveStreamsProcessorOfMessages() { Processor, Message> result = Objects.requireNonNull(invoke(), msg.methodReturnedNull(configuration.methodAsString())); @@ -207,6 +223,9 @@ private void processMethodReturningAReactiveStreamsProcessorOfMessages() { }; } + /** + * {@code Flow.Processor, Message> method()} + */ private void processMethodReturningAProcessorOfMessages() { Flow.Processor, Message> result = Objects.requireNonNull(invoke(), msg.methodReturnedNull(configuration.methodAsString())); @@ -217,6 +236,9 @@ private void processMethodReturningAProcessorOfMessages() { }; } + /** + * {@code ProcessorBuilder method()} + */ @SuppressWarnings({ "unchecked", "rawtypes" }) private void processMethodReturningAProcessorBuilderOfPayloads() { ProcessorBuilder returnedProcessorBuilder = invoke(); @@ -230,6 +252,9 @@ private void processMethodReturningAProcessorBuilderOfPayloads() { }; } + /** + * {@code Processor method()} + */ @SuppressWarnings({ "unchecked", "rawtypes" }) private void processMethodReturningAReactiveStreamsProcessorOfPayloads() { Processor returnedProcessor = invoke(); @@ -242,6 +267,9 @@ private void processMethodReturningAReactiveStreamsProcessorOfPayloads() { }; } + /** + * {@code Flow.Processor method()} + */ @SuppressWarnings({ "unchecked", "rawtypes" }) private void processMethodReturningAProcessorOfPayloads() { Flow.Processor returnedProcessor = invoke(); @@ -254,6 +282,9 @@ private void processMethodReturningAProcessorOfPayloads() { }; } + /** + * {@code PublisherBuilder method(I payload)} + */ private void processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloads() { this.mapper = upstream -> { Multi> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration); @@ -278,6 +309,9 @@ private void processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloa }; } + /** + * {@code Publisher method(I payload)} + */ private void processMethodReturningAReactiveStreamsPublisherOfPayloadsAndConsumingPayloads() { this.mapper = upstream -> { Multi> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration); @@ -302,6 +336,9 @@ private void processMethodReturningAReactiveStreamsPublisherOfPayloadsAndConsumi }; } + /** + * {@code Flow.Publisher method(I payload)} + */ private void processMethodReturningAPublisherOfPayloadsAndConsumingPayloads() { this.mapper = upstream -> { Multi> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration); @@ -327,6 +364,10 @@ private void processMethodReturningAPublisherOfPayloadsAndConsumingPayloads() { }; } + /** + * {@code Message method(Message msg)} + * {@code Message method(I payload)} + */ private void processMethodReturningIndividualMessageAndConsumingIndividualItem() { // Item can be a message or a payload if (configuration.isBlocking()) { @@ -337,7 +378,7 @@ private void processMethodReturningIndividualMessageAndConsumingIndividualItem() .onItem() .transformToMultiAndConcatenate(message -> invokeBlocking(message, getArguments(message)) .onItemOrFailure() - .transformToUni((o, t) -> this.handlePostInvocationWithMessage((Message) o, t)) + .transformToUni((o, t) -> handlePostInvocationWithMessage(message, (Message) o, t)) .onItem().transformToMulti(this::handleSkip)); }; } else { @@ -346,7 +387,7 @@ private void processMethodReturningIndividualMessageAndConsumingIndividualItem() return multi .onItem().transformToMulti(message -> invokeBlocking(message, getArguments(message)) .onItemOrFailure() - .transformToUni((o, t) -> this.handlePostInvocationWithMessage((Message) o, t)) + .transformToUni((o, t) -> handlePostInvocationWithMessage(message, (Message) o, t)) .onItem().transformToMulti(this::handleSkip)) .merge(maxConcurrency()); }; @@ -359,16 +400,16 @@ private void processMethodReturningIndividualMessageAndConsumingIndividualItem() .onItem().transformToMultiAndConcatenate( message -> invokeOnMessageContext(message, getArguments(message)) .onItem().transform(o -> (Message) o) - .onItemOrFailure().transformToUni(this::handlePostInvocationWithMessage) + .onItemOrFailure() + .transformToUni((r, f) -> handlePostInvocationWithMessage(message, r, f)) .onItem().transformToMulti(this::handleSkip)); }; } } - private boolean isPostAck() { - return configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING; - } - + /** + * {@code O method(I payload)} + */ private void processMethodReturningIndividualPayloadAndConsumingIndividualItem() { // Item can be message or payload. if (configuration.isBlocking()) { @@ -441,36 +482,62 @@ private Uni> handlePostInvocation(Message message, } } - private Uni> handlePostInvocationWithMessage(Message res, - Throwable fail) { + private Uni> handlePostInvocationWithMessage(Message in, Message res, Throwable fail) { if (fail != null) { - throw ex.processingException(getMethodAsString(), fail); + if (isPostAck()) { + // Here we nack the incoming, but maybe the message has already been (n)acked + return Uni.createFrom() + .completionStage(in.nack(fail).thenApply(x -> null)); + } else { + throw ex.processingException(getMethodAsString(), fail); + } } else if (res != null) { + if (isPostAck()) { + // Here we chain the outgoing message to the incoming, but maybe the message has already been (n)acked + return Uni.createFrom().item((Message) res.withAckWithMetadata(m -> res.ack(m).thenCompose(x -> in.ack(m))) + .withNackWithMetadata((t, m) -> res.nack(t, m).thenCompose(x -> in.nack(t, m)))); + } + return Uni.createFrom().item((Message) res); } else { // the method returned null, the message is not forwarded + if (isPostAck()) { + // Here we ack the incoming message, but maybe the message has already been (n)acked + return Uni.createFrom().completionStage(in.ack().thenApply(x -> null)); + } return Uni.createFrom().nullItem(); } } + /** + * {@code CompletionStage> method(I payload)} + */ private void processMethodReturningACompletionStageOfMessageAndConsumingIndividualItem() { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transformToMultiAndConcatenate( message -> invokeOnMessageContext(message, getArguments(message)) .onItem().transformToUni(cs -> Uni.createFrom().completionStage((CompletionStage) cs)) - .onItemOrFailure().transformToUni((r, f) -> handlePostInvocationWithMessage((Message) r, f)) + .onItemOrFailure() + .transformToUni((r, f) -> handlePostInvocationWithMessage(message, (Message) r, f)) .onItem().transformToMulti(this::handleSkip)); } + /** + * {@code Uni> method(I payload)} + */ private void processMethodReturningAUniOfMessageAndConsumingIndividualItem() { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transformToMultiAndConcatenate( message -> invokeOnMessageContext(message, getArguments(message)) .onItem().transformToUni(u -> (Uni) u) - .onItemOrFailure().transformToUni((r, f) -> handlePostInvocationWithMessage((Message) r, f)) + .onItemOrFailure() + .transformToUni((r, f) -> handlePostInvocationWithMessage(message, (Message) r, f)) .onItem().transformToMulti(this::handleSkip)); } + /** + * {@code CompletionStage method(I payload)} + */ private void processMethodReturningACompletionStageOfPayloadAndConsumingIndividualItem() { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transformToMultiAndConcatenate( @@ -480,6 +547,9 @@ private void processMethodReturningACompletionStageOfPayloadAndConsumingIndividu .onItem().transformToMulti(this::handleSkip)); } + /** + * {@code Uni method(I payload)} + */ private void processMethodReturningAUniOfPayloadAndConsumingIndividualItem() { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transformToMultiAndConcatenate( @@ -502,4 +572,9 @@ private boolean isReturningAProcessorOrAReactiveStreamsProcessorOrAProcessorBuil || ClassUtils.isAssignable(returnType, Processor.class) || ClassUtils.isAssignable(returnType, ProcessorBuilder.class); } + + private boolean isPostAck() { + return configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING; + } + } diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/i18n/ProviderLogging.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/i18n/ProviderLogging.java index adf369b8e4..50e65195cb 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/i18n/ProviderLogging.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/i18n/ProviderLogging.java @@ -143,4 +143,8 @@ public interface ProviderLogging extends BasicLogger { @LogMessage(level = Logger.Level.DEBUG) @Message(id = 242, value = "Resuming polling messages for channel %s, queue size %s <= %s") void resumingRequestingMessages(String channel, int size, int halfMaxQueueSize); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 243, value = "Processing method '%s' annotated with @Acknowledgement(POST_PROCESSING), but may not be compatible with post-processing acknowledgement management. You may experience duplicate (negative-)acknowledgement of messages.") + void postProcessingNotFullySupported(String methodAsString); } diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/AsynchronousPayloadProcessorAckTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/AsynchronousPayloadProcessorAckTest.java index 58e96ec94f..564b4f582a 100644 --- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/AsynchronousPayloadProcessorAckTest.java +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/AsynchronousPayloadProcessorAckTest.java @@ -42,6 +42,40 @@ public void testThatMessagesAreAckedAfterSuccessfulProcessingOfPayload() throws assertThat(nacked).hasSize(0); } + @Test + public void testThatMessagesAreAckedAfterSuccessfulProcessingOfPayloadReturningCompletionStageOfMessage() throws InterruptedException { + addBeanClass(SuccessfulPayloadProcessorCompletionStageOfMessage.class); + initialize(); + Emitter emitter = get(EmitterBean.class).emitter(); + Sink sink = get(Sink.class); + + Set acked = new ConcurrentHashSet<>(); + Set nacked = new ConcurrentHashSet<>(); + + run(acked, nacked, emitter); + + await().until(() -> sink.list().size() == 10); + assertThat(acked).hasSize(10); + assertThat(nacked).hasSize(0); + } + + @Test + public void testThatMessagesAreAckedAfterSuccessfulProcessingOfPayloadReturningUniOfMessage() throws InterruptedException { + addBeanClass(SuccessfulPayloadProcessorUniOfMessage.class); + initialize(); + Emitter emitter = get(EmitterBean.class).emitter(); + Sink sink = get(Sink.class); + + Set acked = new ConcurrentHashSet<>(); + Set nacked = new ConcurrentHashSet<>(); + + run(acked, nacked, emitter); + + await().until(() -> sink.list().size() == 10); + assertThat(acked).hasSize(10); + assertThat(nacked).hasSize(0); + } + @Test public void testThatMessagesAreAckedAfterSuccessfulProcessingOfPayloadUni() throws InterruptedException { addBeanClass(SuccessfulPayloadProcessorUni.class); @@ -161,6 +195,29 @@ public Uni process(String s) { } + + @ApplicationScoped + public static class SuccessfulPayloadProcessorCompletionStageOfMessage { + + @Incoming("data") + @Outgoing("out") + public CompletionStage> process(String s) { + return CompletableFuture.supplyAsync(() -> Message.of(s.toUpperCase())); + } + + } + + @ApplicationScoped + public static class SuccessfulPayloadProcessorUniOfMessage { + + @Incoming("data") + @Outgoing("out") + public Uni> process(String s) { + return Uni.createFrom().completionStage(CompletableFuture.supplyAsync(() -> Message.of(s.toUpperCase()))); + } + + } + @ApplicationScoped public static class FailingPayloadProcessor { diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/SynchronousPayloadProcessorAckTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/SynchronousPayloadProcessorAckTest.java index e405eefb84..6557bd3e1e 100644 --- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/SynchronousPayloadProcessorAckTest.java +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/SynchronousPayloadProcessorAckTest.java @@ -48,6 +48,23 @@ public void testThatMessagesAreAckedAfterSuccessfulProcessingOfPayload() throws assertThat(nacked).hasSize(0); } + @Test + public void testThatMessagesAreAckedAfterSuccessfulProcessingOfPayloadReturningMessage() throws InterruptedException { + addBeanClass(SuccessfulPayloadToMessageProcessor.class); + initialize(); + Emitter emitter = get(EmitterBean.class).emitter(); + Sink sink = get(Sink.class); + + Set acked = new ConcurrentHashSet<>(); + Set nacked = new ConcurrentHashSet<>(); + + run(acked, nacked, emitter); + + await().until(() -> sink.list().size() == 10); + assertThat(acked).hasSize(10); + assertThat(nacked).hasSize(0); + } + @Test public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessage() throws InterruptedException { addBeanClass(SuccessfulMessageToPayloadProcessor.class); @@ -65,6 +82,61 @@ public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessage() throws assertThat(nacked).hasSize(0); } + + @Test + public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningMessage() throws InterruptedException { + addBeanClass(SuccessfulMessageToMessageProcessor.class); + initialize(); + Emitter emitter = get(EmitterBean.class).emitter(); + Sink sink = get(Sink.class); + + Set acked = new ConcurrentHashSet<>(); + Set nacked = new ConcurrentHashSet<>(); + + run(acked, nacked, emitter); + + await().until(() -> sink.list().size() == 10); + assertThat(acked).hasSize(10); + assertThat(nacked).hasSize(0); + } + + @Test + public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningMessagePostProcessing() throws InterruptedException { + addBeanClass(SuccessfulMessageToMessageProcessorPostProcessing.class); + initialize(); + Emitter emitter = get(EmitterBean.class).emitter(); + Sink sink = get(Sink.class); + + Set acked = new ConcurrentHashSet<>(); + Set nacked = new ConcurrentHashSet<>(); + + run(acked, nacked, emitter); + + await().until(() -> sink.list().size() == 10); + assertThat(acked).hasSize(10); + assertThat(nacked).hasSize(0); + } + + + @Test + public void testThatMessagesAreNackedAfterFailingProcessingOfMessageReturningMessage() throws InterruptedException { + addBeanClass(FailingMessageToMessageProcessor.class); + initialize(); + Emitter emitter = get(EmitterBean.class).emitter(); + Sink sink = get(Sink.class); + + Set acked = new ConcurrentHashSet<>(); + Set nacked = new ConcurrentHashSet<>(); + + List throwables = run(acked, nacked, emitter); + + await().until(() -> sink.list().size() == 8); + assertThat(acked).hasSize(9); + assertThat(nacked).hasSize(1); + assertThat(throwables).hasSize(1).allSatisfy(t -> assertThat(t).isInstanceOf(ProcessingException.class) + .hasCauseInstanceOf(InvocationTargetException.class).hasStackTraceContaining("b")); + } + @Test public void testThatMessagesAreNackedAfterFailingProcessingOfPayload() throws InterruptedException { addBeanClass(FailingPayloadProcessor.class); @@ -230,6 +302,17 @@ public String process(String s) { } + public static class SuccessfulPayloadToMessageProcessor { + + @Incoming("data") + @Outgoing("out") + @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING) + public Message process(String s) { + return Message.of(s.toUpperCase()); + } + + } + @ApplicationScoped public static class SuccessfulMessageToPayloadProcessor { @@ -242,6 +325,27 @@ public String process(Message s) { } + public static class SuccessfulMessageToMessageProcessor { + + @Incoming("data") + @Outgoing("out") + public Message process(Message s) { + return s.withPayload(s.getPayload().toUpperCase()); + } + + } + + public static class SuccessfulMessageToMessageProcessorPostProcessing { + + @Incoming("data") + @Outgoing("out") + @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING) + public Message process(Message s) { + return Message.of(s.getPayload().toUpperCase()); + } + + } + @ApplicationScoped public static class FailingPayloadProcessor { @@ -286,6 +390,29 @@ public String process(Message m) { } + @ApplicationScoped + public static class FailingMessageToMessageProcessor { + + @Incoming("data") + @Outgoing("out") + @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING) + public Message process(Message m) { + String s = m.getPayload(); + if (s.equalsIgnoreCase("b")) { + // nacked. + throw new IllegalArgumentException("b"); + } + + if (s.equalsIgnoreCase("e")) { + // acked but not forwarded + return null; + } + + return m.withPayload(s.toUpperCase()); + } + + } + @ApplicationScoped public static class SuccessfulBlockingPayloadProcessor {