From 6bc986533908da9cfa08f80114388f85acb7e7e2 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Thu, 29 Aug 2024 14:57:52 +0200 Subject: [PATCH] Handle post-processing for payload consuming stream returning processor methods Closes #2732 and #2733 --- .../src/main/docs/concepts/signatures.md | 16 ++--- .../providers/ProcessorMediator.java | 59 +++++++++++++------ .../ProcessorShapeReturningPublisherTest.java | 25 ++++++++ ...gIndividualPayloadWithProcessingError.java | 29 +++++++++ ...gIndividualPayloadWithProcessingError.java | 30 ++++++++++ 5 files changed, 133 insertions(+), 26 deletions(-) create mode 100644 smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/beans/BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError.java create mode 100644 smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/beans/BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError.java diff --git a/documentation/src/main/docs/concepts/signatures.md b/documentation/src/main/docs/concepts/signatures.md index 816773e9e7..de02de4107 100644 --- a/documentation/src/main/docs/concepts/signatures.md +++ b/documentation/src/main/docs/concepts/signatures.md @@ -55,14 +55,14 @@ and available acknowledgement strategies (when applicable). | `@Outgoing @Incoming Flow.Processor method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported | | `@Outgoing @Incoming ProcessorBuilder, Message> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | | `@Outgoing @Incoming ProcessorBuilder method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported | -| `@Outgoing @Incoming Publisher> method(Message msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | -| `@Outgoing @Incoming Publisher method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic | -| `@Outgoing @Incoming Multi> method(Message msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | -| `@Outgoing @Incoming Multi method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic | -| `@Outgoing @Incoming Flow.Publisher> method(Message msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | -| `@Outgoing @Incoming Flow.Publisher method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic | -| `@Outgoing @Incoming PublisherBuilder> method(Message msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | -| `@Outgoing @Incoming PublisherBuilder method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic | +| `@Outgoing @Incoming Publisher> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual | +| `@Outgoing @Incoming Publisher method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic | +| `@Outgoing @Incoming Multi> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual | +| `@Outgoing @Incoming Multi method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic | +| `@Outgoing @Incoming Flow.Publisher> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual | +| `@Outgoing @Incoming Flow.Publisher method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic | +| `@Outgoing @Incoming PublisherBuilder> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual | +| `@Outgoing @Incoming PublisherBuilder method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic | ## Method signatures to manipulate streams diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java index 561d7ba98e..665c3719e4 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java @@ -258,13 +258,19 @@ private void processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloa this.mapper = upstream -> { Multi> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration); return multi.onItem().transformToMultiAndConcatenate(message -> { - PublisherBuilder pb = invoke(getArguments(message)); - if (configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) { - // POST_PROCESSING must not be used when returning an infinite stream - AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message); - return MultiUtils.publisher(AdaptersToFlow.publisher(pb.buildRs())) - .onItem().transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata()))); + if (isPostAck()) { + try { + PublisherBuilder pb = invoke(getArguments(message)); + // POST_PROCESSING must not be used when returning an infinite stream + AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message); + return MultiUtils.publisher(AdaptersToFlow.publisher(pb.buildRs())) + .onItem() + .transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata()))); + } catch (Throwable t) { + return handlePostInvocation(message, t); + } } else { + PublisherBuilder pb = invoke(getArguments(message)); return MultiUtils.publisher(AdaptersToFlow.publisher(pb.buildRs())) .onItem().transform(payload -> payloadToMessage(payload, message.getMetadata())); } @@ -276,13 +282,19 @@ private void processMethodReturningAReactiveStreamsPublisherOfPayloadsAndConsumi this.mapper = upstream -> { Multi> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration); return multi.onItem().transformToMultiAndConcatenate(message -> { - Publisher pub = invoke(getArguments(message)); - if (configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) { - // POST_PROCESSING must not be used when returning an infinite stream - AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message); - return MultiUtils.publisher(AdaptersToFlow.publisher(pub)) - .onItem().transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata()))); + if (isPostAck()) { + try { + Publisher pub = invoke(getArguments(message)); + // POST_PROCESSING must not be used when returning an infinite stream + AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message); + return MultiUtils.publisher(AdaptersToFlow.publisher(pub)) + .onItem() + .transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata()))); + } catch (Throwable t) { + return handlePostInvocation(message, t); + } } else { + Publisher pub = invoke(getArguments(message)); return MultiUtils.publisher(AdaptersToFlow.publisher(pub)) .onItem().transform(payload -> payloadToMessage(payload, message.getMetadata())); } @@ -294,13 +306,20 @@ private void processMethodReturningAPublisherOfPayloadsAndConsumingPayloads() { this.mapper = upstream -> { Multi> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration); return multi.onItem().transformToMultiAndConcatenate(message -> { - Flow.Publisher pub = invoke(getArguments(message)); - if (configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) { - // POST_PROCESSING must not be used when returning an infinite stream - AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message); - return MultiUtils.publisher(pub) - .onItem().transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata()))); + if (isPostAck()) { + try { + Flow.Publisher pub = invoke(getArguments(message)); + // POST_PROCESSING must not be used when returning an infinite stream + AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message); + return MultiUtils.publisher(pub) + .onItem() + .transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata()))); + + } catch (Throwable t) { + return handlePostInvocation(message, t); + } } else { + Flow.Publisher pub = invoke(getArguments(message)); return MultiUtils.publisher(pub) .onItem().transform(payload -> payloadToMessage(payload, message.getMetadata())); } @@ -384,6 +403,10 @@ private Flow.Publisher> handleSkip(Message m) } } + private Multi> handlePostInvocation(Message message, Throwable fail) { + return Uni.createFrom().completionStage(() -> message.nack(fail).thenApply(x -> (Message) null)).toMulti(); + } + private Uni> handlePostInvocation(Message message, Object res, Throwable fail) { if (fail != null) { if (isPostAck()) { diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/ProcessorShapeReturningPublisherTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/ProcessorShapeReturningPublisherTest.java index a3bf604f67..1772228e1c 100644 --- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/ProcessorShapeReturningPublisherTest.java +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/ProcessorShapeReturningPublisherTest.java @@ -2,12 +2,17 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + import org.junit.jupiter.api.Test; import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherBuilderOfMessagesAndConsumingIndividualMessage; import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayload; +import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError; import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherOfMessagesAndConsumingIndividualMessage; import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayload; +import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError; public class ProcessorShapeReturningPublisherTest extends WeldTestBase { @@ -27,6 +32,17 @@ public void testBeanProducingAPublisherOfPayloadsAndConsumingIndividualPayload() assertThat(collector.payloads()).isEqualTo(EXPECTED); } + @Test + public void BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError() { + addBeanClass(BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError.class); + initialize(); + MyCollector collector = container.select(MyCollector.class).get(); + assertThat(collector.payloads()).isEqualTo(IntStream.rangeClosed(1, 5) + .map(i -> i * 2) + .flatMap(i -> IntStream.of(i, i)).boxed() + .map(Object::toString).collect(Collectors.toList())); + } + @Test public void testBeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayload() { addBeanClass(BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayload.class); @@ -35,6 +51,15 @@ public void testBeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPa assertThat(collector.payloads()).isEqualTo(EXPECTED); } + @Test + public void testBeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError() { + addBeanClass(BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError.class); + initialize(); + MyCollector collector = container.select(MyCollector.class).get(); + assertThat(collector.payloads()).isEqualTo(IntStream.rangeClosed(1, 6).flatMap(i -> IntStream.of(i, i)).boxed() + .map(Object::toString).collect(Collectors.toList())); + } + @Test public void testBeanProducingAPublisherBuilderOfMessagesAndConsumingIndividualMessage() { addBeanClass(BeanProducingAPublisherBuilderOfMessagesAndConsumingIndividualMessage.class); diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/beans/BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/beans/BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError.java new file mode 100644 index 0000000000..3670a92cb9 --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/beans/BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError.java @@ -0,0 +1,29 @@ +package io.smallrye.reactive.messaging.beans; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Acknowledgment; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; +import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; + +import io.reactivex.Flowable; + +@ApplicationScoped +public class BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError { + + @Incoming("count") + @Outgoing("sink") + @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING) + public PublisherBuilder process(Integer payload) { + if (payload > 5) { + throw new IllegalArgumentException("boom"); + } + return ReactiveStreams.of(payload) + .map(i -> i + 1) + .flatMapRsPublisher(i -> Flowable.just(i, i)) + .map(i -> Integer.toString(i)); + } + +} diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/beans/BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/beans/BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError.java new file mode 100644 index 0000000000..aa7e8bf583 --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/beans/BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError.java @@ -0,0 +1,30 @@ +package io.smallrye.reactive.messaging.beans; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Acknowledgment; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; +import org.reactivestreams.Publisher; + +import io.reactivex.Flowable; + +@ApplicationScoped +public class BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError { + + @Incoming("count") + @Outgoing("sink") + @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING) + public Publisher process(Integer payload) { + if (payload % 2 == 0) { + throw new IllegalArgumentException("boom"); + } + return ReactiveStreams.of(payload) + .map(i -> i + 1) + .flatMapRsPublisher(i -> Flowable.just(i, i)) + .map(i -> Integer.toString(i)) + .buildRs(); + } + +}