Skip to content

Commit

Permalink
Merge pull request #2263 from ozangunalp/jms_pool_queue
Browse files Browse the repository at this point in the history
Transform JMS Connector thread pool to fix sized ThreadPool without rejection
  • Loading branch information
cescoffier authored Aug 17, 2023
2 parents 0845f60 + 8cd6422 commit e8339ac
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static io.smallrye.reactive.messaging.jms.i18n.JmsExceptions.ex;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
Expand All @@ -12,6 +11,7 @@

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.json.JsonMapping;

public class IncomingJmsMessage<T> implements org.eclipse.microprofile.reactive.messaging.Message<T> {
Expand Down Expand Up @@ -110,13 +110,16 @@ public Supplier<CompletionStage<Void>> getAck() {

@Override
public CompletionStage<Void> ack() {
return CompletableFuture.runAsync(() -> {
try {
delegate.acknowledge();
} catch (JMSException e) {
throw new IllegalArgumentException("Unable to acknowledge message", e);
}
}, executor);
return Uni.createFrom().voidItem()
.onItem().invoke(m -> {
try {
delegate.acknowledge();
} catch (JMSException e) {
throw new IllegalArgumentException("Unable to acknowledge message", e);
}
})
.runSubscriptionOn(executor)
.subscribeAsCompletionStage();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
Expand Down Expand Up @@ -102,7 +100,7 @@ public class JmsConnector implements InboundConnector, OutboundConnector {

@PostConstruct
public void init() {
this.executor = new ThreadPoolExecutor(0, maxPoolSize, ttl, TimeUnit.SECONDS, new SynchronousQueue<>());
this.executor = Executors.newFixedThreadPool(maxPoolSize);
if (jsonMapper.isUnsatisfied()) {
log.warn(
"Please add one of the additional mapping modules (-jsonb or -jackson) to be able to (de)serialize JSON messages.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import static io.smallrye.reactive.messaging.jms.i18n.JmsExceptions.ex;
import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;

Expand Down Expand Up @@ -67,80 +65,79 @@ class JmsSink {
producer.setJMSReplyTo(replyToDestination);
});

sink = MultiUtils.via(m -> m.onItem().transformToUniAndConcatenate(message -> Uni.createFrom().completionStage(() -> {
try {
return send(message);
} catch (JMSException e) {
return CompletableFuture.failedStage(new IllegalStateException(e));
}
})).onFailure().invoke(log::unableToSend));
sink = MultiUtils.via(m -> m.onItem().transformToUniAndConcatenate(this::send)
.onFailure().invoke(log::unableToSend));

}

private CompletionStage<Message<?>> send(Message<?> message) throws JMSException {
private Uni<? extends Message<?>> send(Message<?> message) {
Object payload = message.getPayload();

// If the payload is a JMS Message, send it as it is, ignoring metadata.
if (payload instanceof jakarta.jms.Message) {
return dispatch(message, () -> producer.send(destination, (jakarta.jms.Message) payload));
}

jakarta.jms.Message outgoing;
if (payload instanceof String || payload.getClass().isPrimitive() || isPrimitiveBoxed(payload.getClass())) {
outgoing = context.createTextMessage(payload.toString());
outgoing.setStringProperty("_classname", payload.getClass().getName());
outgoing.setJMSType(payload.getClass().getName());
} else if (payload.getClass().isArray() && payload.getClass().getComponentType().equals(Byte.TYPE)) {
BytesMessage o = context.createBytesMessage();
o.writeBytes((byte[]) payload);
outgoing = o;
} else {
outgoing = context.createTextMessage(jsonMapping.toJson(payload));
outgoing.setJMSType(payload.getClass().getName());
outgoing.setStringProperty("_classname", payload.getClass().getName());
}

OutgoingJmsMessageMetadata metadata = message.getMetadata(OutgoingJmsMessageMetadata.class).orElse(null);
Destination actualDestination;
if (metadata != null) {
String correlationId = metadata.getCorrelationId();
Destination replyTo = metadata.getReplyTo();
Destination dest = metadata.getDestination();
int deliveryMode = metadata.getDeliveryMode();
String type = metadata.getType();
JmsProperties properties = metadata.getProperties();
if (correlationId != null) {
outgoing.setJMSCorrelationID(correlationId);
}
if (replyTo != null) {
outgoing.setJMSReplyTo(replyTo);
}
if (dest != null) {
outgoing.setJMSDestination(dest);
}
if (deliveryMode != -1) {
outgoing.setJMSDeliveryMode(deliveryMode);
}
if (type != null) {
outgoing.setJMSType(type);
}
if (type != null) {
outgoing.setJMSType(type);
try {
jakarta.jms.Message outgoing;
if (payload instanceof String || payload.getClass().isPrimitive() || isPrimitiveBoxed(payload.getClass())) {
outgoing = context.createTextMessage(payload.toString());
outgoing.setStringProperty("_classname", payload.getClass().getName());
outgoing.setJMSType(payload.getClass().getName());
} else if (payload.getClass().isArray() && payload.getClass().getComponentType().equals(Byte.TYPE)) {
BytesMessage o = context.createBytesMessage();
o.writeBytes((byte[]) payload);
outgoing = o;
} else {
outgoing = context.createTextMessage(jsonMapping.toJson(payload));
outgoing.setJMSType(payload.getClass().getName());
outgoing.setStringProperty("_classname", payload.getClass().getName());
}

if (properties != null) {
if (!(properties instanceof JmsPropertiesBuilder.OutgoingJmsProperties)) {
throw ex.illegalStateUnableToMapProperties(properties.getClass().getName());
OutgoingJmsMessageMetadata metadata = message.getMetadata(OutgoingJmsMessageMetadata.class).orElse(null);
Destination actualDestination;
if (metadata != null) {
String correlationId = metadata.getCorrelationId();
Destination replyTo = metadata.getReplyTo();
Destination dest = metadata.getDestination();
int deliveryMode = metadata.getDeliveryMode();
String type = metadata.getType();
JmsProperties properties = metadata.getProperties();
if (correlationId != null) {
outgoing.setJMSCorrelationID(correlationId);
}
if (replyTo != null) {
outgoing.setJMSReplyTo(replyTo);
}
JmsPropertiesBuilder.OutgoingJmsProperties op = ((JmsPropertiesBuilder.OutgoingJmsProperties) properties);
op.getProperties().forEach(p -> p.apply(outgoing));
if (dest != null) {
outgoing.setJMSDestination(dest);
}
if (deliveryMode != -1) {
outgoing.setJMSDeliveryMode(deliveryMode);
}
if (type != null) {
outgoing.setJMSType(type);
}
if (type != null) {
outgoing.setJMSType(type);
}

if (properties != null) {
if (!(properties instanceof JmsPropertiesBuilder.OutgoingJmsProperties)) {
throw ex.illegalStateUnableToMapProperties(properties.getClass().getName());
}
JmsPropertiesBuilder.OutgoingJmsProperties op = ((JmsPropertiesBuilder.OutgoingJmsProperties) properties);
op.getProperties().forEach(p -> p.apply(outgoing));
}
actualDestination = dest != null ? dest : this.destination;
} else {
actualDestination = this.destination;
}
actualDestination = dest != null ? dest : this.destination;
} else {
actualDestination = this.destination;
}

return dispatch(message, () -> producer.send(actualDestination, outgoing));
return dispatch(message, () -> producer.send(actualDestination, outgoing));
} catch (JMSException e) {
return Uni.createFrom().failure(new IllegalStateException(e));
}
}

private boolean isPrimitiveBoxed(Class<?> c) {
Expand All @@ -154,10 +151,11 @@ private boolean isPrimitiveBoxed(Class<?> c) {
|| c.equals(Long.class);
}

private CompletionStage<Message<?>> dispatch(Message<?> incoming, Runnable action) {
return CompletableFuture.runAsync(action, executor)
.thenCompose(x -> incoming.ack())
.thenApply(x -> incoming);
private Uni<? extends Message<?>> dispatch(Message<?> incoming, Runnable action) {
return Uni.createFrom().item(incoming)
.invoke(action)
.call(message -> Uni.createFrom().completionStage(incoming::ack))
.runSubscriptionOn(executor);
}

private Destination getDestination(JMSContext context, String name, String type) {
Expand Down

0 comments on commit e8339ac

Please sign in to comment.