Skip to content

Commit

Permalink
Handle some post-processing for cases consuming payload and returning…
Browse files Browse the repository at this point in the history
… message
  • Loading branch information
ozangunalp committed Sep 4, 2024
1 parent 6bc9865 commit 728c022
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 39 deletions.
57 changes: 35 additions & 22 deletions documentation/src/main/docs/concepts/signatures.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,41 @@ and available acknowledgement strategies (when applicable).

## Method signatures to process data

| Signature | Invocation time | Supported Acknowledgement Strategies | Metadata Propagation |
|---------------------------------------------------------------------------|--------------------------------------------------|-----------------------------------------|----------------------|
| `@Outgoing @Incoming Message<O> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual |
| `@Outgoing @Incoming O method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic |
| `@Outgoing @Incoming CompletionStage<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual |
| `@Outgoing @Incoming CompletionStage<O> method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic |
| `@Outgoing @Incoming Uni<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual |
| `@Outgoing @Incoming Uni<O> method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic |
| `@Outgoing @Incoming Processor<Message<I>, Message<O>> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Processor<I, O> method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported |
| `@Outgoing @Incoming Flow.Processor<Message<I>, Message<O>> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Flow.Processor<I, O> method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported |
| `@Outgoing @Incoming ProcessorBuilder<Message<I>, Message<O>> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming ProcessorBuilder<I, O> method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported |
| `@Outgoing @Incoming Publisher<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Publisher<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |
| `@Outgoing @Incoming Multi<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Multi<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |
| `@Outgoing @Incoming Flow.Publisher<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Flow.Publisher<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |
| `@Outgoing @Incoming PublisherBuilder<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming PublisherBuilder<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |
| Signature | Invocation time | Supported Acknowledgement Strategies | Metadata Propagation |
|---------------------------------------------------------------------------|--------------------------------------------------|-----------------------------------------------------------------|----------------------|
| `@Outgoing @Incoming Message<O> method(Message<I> msg)` | Called for every incoming message (sequentially) | POST_PROCESSING (Smallrye only), *MANUAL*, NONE, PRE_PROCESSING | manual |
| `@Outgoing @Incoming Message<O> method(I payload)` | Called for every incoming message (sequentially) | *POST_PROCESSING* (Smallrye only), NONE, PRE_PROCESSING | automatic |
| `@Outgoing @Incoming O method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic |
| `@Outgoing @Incoming CompletionStage<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual |
| `@Outgoing @Incoming CompletionStage<O> method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic |
| `@Outgoing @Incoming CompletionStage<Message<O>> method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING* (Smallrye only), NONE, PRE_PROCESSING | automatic |
| `@Outgoing @Incoming Uni<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual |
| `@Outgoing @Incoming Uni<Message<O>> method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING* (Smallrye only), NONE, PRE_PROCESSING | automatic |
| `@Outgoing @Incoming Uni<O> method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic |
| `@Outgoing @Incoming Processor<Message<I>, Message<O>> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Processor<I, O> method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported |
| `@Outgoing @Incoming Flow.Processor<Message<I>, Message<O>> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Flow.Processor<I, O> method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported |
| `@Outgoing @Incoming ProcessorBuilder<Message<I>, Message<O>> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming ProcessorBuilder<I, O> method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported |
| `@Outgoing @Incoming Publisher<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Publisher<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |
| `@Outgoing @Incoming Multi<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Multi<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |
| `@Outgoing @Incoming Flow.Publisher<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Flow.Publisher<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |
| `@Outgoing @Incoming PublisherBuilder<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming PublisherBuilder<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |

Note that in additional to the MicroProfile Reactive Messaging specification,
SmallRye Reactive Messaging supports the post-processing acknowledgment handling with automatic metadata propagation for the following signatures:

- `@Outgoing @Incoming Message<O> method(I payload)`
- `@Outgoing @Incoming CompletionStage<Message<O>> method(I payload)`
- `@Outgoing @Incoming Uni<Message<O>> method(I payload)`
- `@Outgoing @Incoming Message<O> method(Message<I> payload)` : For this signature, the post-processing acknowledgment handling is limited.
It covers cases for nacking incoming messages on caught exceptions at the method body, acking incoming messages when outgoing message is skipped by returning `null`, and chaining acknowlegment from outgoing message to the incoming.
However, if the incoming message has already been (n)acked, you will experience duplicate (n)acks.

## Method signatures to manipulate streams

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,10 @@ private ValidationOutput validateProcessor(Acknowledgment.Strategy acknowledgmen

if (production == MediatorConfiguration.Production.INDIVIDUAL_MESSAGE
&& acknowledgment == Acknowledgment.Strategy.POST_PROCESSING) {
throw ex.illegalStateForValidateProcessor(methodAsString);
// relax here the validation for the post-processing acknowledgment
if (consumption == MediatorConfiguration.Consumption.MESSAGE) {
log.postProcessingNotFullySupported(methodAsString);
}
}

return new ValidationOutput(production, consumption, useBuilderTypes, useReactiveStreams, payloadType);
Expand Down
Loading

0 comments on commit 728c022

Please sign in to comment.