Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message metadata propagation with ack and nack #2244

Merged
merged 5 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,31 @@
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>process-test-resources</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.eclipse.microprofile.reactive.messaging</groupId>
<artifactId>microprofile-reactive-messaging-api</artifactId>
</artifactItem>
</artifactItems>
<stripVersion>true</stripVersion>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>coverage</id>
Expand Down
116 changes: 114 additions & 2 deletions api/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,119 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"code": "java.annotation.removed",
"old": "method java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>> org.eclipse.microprofile.reactive.messaging.Message<T>::getNack()",
"new": "method java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>> org.eclipse.microprofile.reactive.messaging.Message<T>::getNack()",
"annotation": "@io.smallrye.common.annotation.Experimental(\"nack support is a SmallRye-only feature\")",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"

},
{
"code": "java.annotation.removed",
"old": "method java.util.concurrent.CompletionStage<java.lang.Void> org.eclipse.microprofile.reactive.messaging.Message<T>::nack(java.lang.Throwable)",
"new": "method java.util.concurrent.CompletionStage<java.lang.Void> org.eclipse.microprofile.reactive.messaging.Message<T>::nack(java.lang.Throwable)",
"annotation": "@io.smallrye.common.annotation.Experimental(\"nack support is a SmallRye-only feature\")",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"code": "java.annotation.attributeValueChanged",
"old": "method <T> org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::of(T, org.eclipse.microprofile.reactive.messaging.Metadata, java.util.function.Supplier<java.util.concurrent.CompletionStage<java.lang.Void>>, java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>)",
"new": "method <T> org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::of(T, org.eclipse.microprofile.reactive.messaging.Metadata, java.util.function.Supplier<java.util.concurrent.CompletionStage<java.lang.Void>>, java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>)",
"annotationType": "io.smallrye.common.annotation.Experimental",
"attribute": "value",
"oldValue": "\"nack support is a SmallRye-only feature\"",
"newValue": "\"metadata propagation is a SmallRye-specific feature\"",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"code": "java.annotation.attributeValueChanged",
"old": "method org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::withNack(java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>)",
"new": "method org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::withNack(java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>)",
"annotationType": "io.smallrye.common.annotation.Experimental",
"attribute": "value",
"oldValue": "\"nack support is a SmallRye-only feature\"",
"newValue": "\"metadata propagation is a SmallRye-specific feature\"",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"ignore": true,
"code": "java.annotation.removed",
"old": "method java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>> org.eclipse.microprofile.reactive.messaging.Message<T>::getNack() @ io.smallrye.reactive.messaging.TargetedMessages.Default",
"new": "method java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>> org.eclipse.microprofile.reactive.messaging.Message<T>::getNack() @ io.smallrye.reactive.messaging.TargetedMessages.Default",
"annotation": "@io.smallrye.common.annotation.Experimental(\"nack support is a SmallRye-only feature\")",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"ignore": true,
"code": "java.annotation.removed",
"old": "method java.util.concurrent.CompletionStage<java.lang.Void> org.eclipse.microprofile.reactive.messaging.Message<T>::nack(java.lang.Throwable) @ io.smallrye.reactive.messaging.TargetedMessages.Default",
"new": "method java.util.concurrent.CompletionStage<java.lang.Void> org.eclipse.microprofile.reactive.messaging.Message<T>::nack(java.lang.Throwable) @ io.smallrye.reactive.messaging.TargetedMessages.Default",
"annotation": "@io.smallrye.common.annotation.Experimental(\"nack support is a SmallRye-only feature\")",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"ignore": true,
"code": "java.annotation.attributeValueChanged",
"old": "method <T> org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::of(T, org.eclipse.microprofile.reactive.messaging.Metadata, java.util.function.Supplier<java.util.concurrent.CompletionStage<java.lang.Void>>, java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>) @ io.smallrye.reactive.messaging.TargetedMessages.Default",
"new": "method <T> org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::of(T, org.eclipse.microprofile.reactive.messaging.Metadata, java.util.function.Supplier<java.util.concurrent.CompletionStage<java.lang.Void>>, java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>) @ io.smallrye.reactive.messaging.TargetedMessages.Default",
"annotationType": "io.smallrye.common.annotation.Experimental",
"attribute": "value",
"oldValue": "\"nack support is a SmallRye-only feature\"",
"newValue": "\"metadata propagation is a SmallRye-specific feature\"",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"ignore": true,
"code": "java.annotation.attributeValueChanged",
"old": "method org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::withNack(java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>) @ io.smallrye.reactive.messaging.TargetedMessages.Default",
"new": "method org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::withNack(java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>) @ io.smallrye.reactive.messaging.TargetedMessages.Default",
"annotationType": "io.smallrye.common.annotation.Experimental",
"attribute": "value",
"oldValue": "\"nack support is a SmallRye-only feature\"",
"newValue": "\"metadata propagation is a SmallRye-specific feature\"",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"ignore": true,
"code": "java.annotation.removed",
"old": "method java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>> org.eclipse.microprofile.reactive.messaging.Message<T>::getNack() @ io.smallrye.reactive.messaging.TargetedMessages",
"new": "method java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>> org.eclipse.microprofile.reactive.messaging.Message<T>::getNack() @ io.smallrye.reactive.messaging.TargetedMessages",
"annotation": "@io.smallrye.common.annotation.Experimental(\"nack support is a SmallRye-only feature\")",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"ignore": true,
"code": "java.annotation.removed",
"old": "method java.util.concurrent.CompletionStage<java.lang.Void> org.eclipse.microprofile.reactive.messaging.Message<T>::nack(java.lang.Throwable) @ io.smallrye.reactive.messaging.TargetedMessages",
"new": "method java.util.concurrent.CompletionStage<java.lang.Void> org.eclipse.microprofile.reactive.messaging.Message<T>::nack(java.lang.Throwable) @ io.smallrye.reactive.messaging.TargetedMessages",
"annotation": "@io.smallrye.common.annotation.Experimental(\"nack support is a SmallRye-only feature\")",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"ignore": true,
"code": "java.annotation.attributeValueChanged",
"old": "method <T> org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::of(T, org.eclipse.microprofile.reactive.messaging.Metadata, java.util.function.Supplier<java.util.concurrent.CompletionStage<java.lang.Void>>, java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>) @ io.smallrye.reactive.messaging.TargetedMessages",
"new": "method <T> org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::of(T, org.eclipse.microprofile.reactive.messaging.Metadata, java.util.function.Supplier<java.util.concurrent.CompletionStage<java.lang.Void>>, java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>) @ io.smallrye.reactive.messaging.TargetedMessages",
"annotationType": "io.smallrye.common.annotation.Experimental",
"attribute": "value",
"oldValue": "\"nack support is a SmallRye-only feature\"",
"newValue": "\"metadata propagation is a SmallRye-specific feature\"",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
},
{
"ignore": true,
"code": "java.annotation.attributeValueChanged",
"old": "method org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::withNack(java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>) @ io.smallrye.reactive.messaging.TargetedMessages",
"new": "method org.eclipse.microprofile.reactive.messaging.Message<T> org.eclipse.microprofile.reactive.messaging.Message<T>::withNack(java.util.function.Function<java.lang.Throwable, java.util.concurrent.CompletionStage<java.lang.Void>>) @ io.smallrye.reactive.messaging.TargetedMessages",
"annotationType": "io.smallrye.common.annotation.Experimental",
"attribute": "value",
"oldValue": "\"nack support is a SmallRye-only feature\"",
"newValue": "\"metadata propagation is a SmallRye-specific feature\"",
"justification": "Added Message metadata propagation, nack support no longer a SmallRye-only feature"
}
]
}
}, {
"extension" : "revapi.reporter.json",
Expand All @@ -46,4 +158,4 @@
"minCriticality" : "documented",
"output" : "out"
}
} ]
} ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.smallrye.reactive.messaging;

import jakarta.enterprise.inject.spi.Prioritized;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.common.annotation.Experimental;

/**
* Interceptor for incoming messages on connector channels.
* <p>
* To register an outgoing interceptor, expose a managed bean, implementing this interface,
* and qualified with {@code @Identifier} with the targeted channel name.
* <p>
* Only one interceptor is allowed to be bound for interception per incoming channel.
* When multiple interceptors are available, implementation should override the {@link #getPriority()} method.
*/
@Experimental("Smallrye-only feature")
public interface IncomingInterceptor extends Prioritized {

@Override
default int getPriority() {
return -1;
}

/**
* Called after message received
*
* @param message received message
* @return the message to dispatch for consumer methods, possibly mutated
*/
default Message<?> afterMessageReceive(Message<?> message) {
return message;
}

/**
* Called after message acknowledgment
*
* @param message acknowledged message
*/
void onMessageAck(Message<?> message);

/**
* Called after message negative-acknowledgement
*
* @param message message to negative-acknowledge
* @param failure failure
*/
void onMessageNack(Message<?> message, Throwable failure);
}
Loading
Loading