Skip to content

Commit

Permalink
create temp queue by thread, automatic one reconnecting retry when se… (
Browse files Browse the repository at this point in the history
#33)

* create temp queue by thread, automatic one reconnecting retry when send a message

* fix duplicated code

* delete tempqueue explictly
  • Loading branch information
juancgalvis committed Oct 30, 2023
1 parent 8bf7166 commit 49f4088
Show file tree
Hide file tree
Showing 22 changed files with 243 additions and 333 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

public interface MQQueuesContainer {
void registerQueue(String key, Queue queue);
void registerToQueueGroup(String groupId, Queue queue);
void unregisterFromQueueGroup(String groupId, Queue queue);

Queue get(String alias);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ public class MQAutoconfiguration {
@ConditionalOnMissingBean(MQQueueCustomizer.class)
public MQQueueCustomizer defaultMQQueueCustomizer() {
return queue -> {
MQQueue customized = (MQQueue) queue;
customized.setProperty(WMQ_TARGET_CLIENT, "1");
customized.setProperty(WMQ_MQMD_READ_ENABLED, "true");
customized.setProperty(WMQ_MQMD_WRITE_ENABLED, "true");
customized.setPutAsyncAllowed(WMQ_PUT_ASYNC_ALLOWED_ENABLED);
customized.setReadAheadAllowed(WMQ_READ_AHEAD_ALLOWED_ENABLED);
if (queue instanceof MQQueue) {
MQQueue customized = (MQQueue) queue;
customized.setProperty(WMQ_TARGET_CLIENT, "1");
customized.setProperty(WMQ_MQMD_READ_ENABLED, "true");
customized.setProperty(WMQ_MQMD_WRITE_ENABLED, "true");
customized.setPutAsyncAllowed(WMQ_PUT_ASYNC_ALLOWED_ENABLED);
customized.setReadAheadAllowed(WMQ_READ_AHEAD_ALLOWED_ENABLED);
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import co.com.bancolombia.commons.jms.api.MQRequestReply;
import co.com.bancolombia.commons.jms.exceptions.RelatedMessageNotFoundException;
import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.Queue;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import reactor.core.publisher.Mono;

import jakarta.jms.Destination;
import jakarta.jms.Message;
import java.time.Duration;

@Log4j2
Expand Down Expand Up @@ -60,7 +61,11 @@ public Mono<Message> requestReply(MQMessageCreator messageCreator, Duration time
private MQMessageCreator defaultCreator(String message) {
return ctx -> {
Message jmsMessage = ctx.createTextMessage(message);
jmsMessage.setJMSReplyTo(container.get(replyQueue));
Queue queue = container.get(replyQueue);
jmsMessage.setJMSReplyTo(queue);
if (log.isInfoEnabled() && queue != null) {
log.info("Setting queue for reply to: {}", queue.getQueueName());
}
return jmsMessage;
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import co.com.bancolombia.commons.jms.mq.config.MQProperties;
import co.com.bancolombia.commons.jms.utils.MQQueuesContainerImp;
import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter;
import com.ibm.msg.client.jakarta.wmq.compat.jms.internal.JMSC;
import jakarta.jms.JMSContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -48,9 +50,7 @@ class InterfaceComponentProxyFactoryBeanTest {
@Mock
private MQHealthListener healthListener;
@Mock
private Connection connection;
@Mock
private Session session;
private JMSContext context;
@Mock
private TemporaryQueue queue;
@Mock
Expand Down Expand Up @@ -112,9 +112,8 @@ void shouldInstanceTheBean() throws JMSException {
return null;
});
// Listener mocks
when(connectionFactory.createConnection()).thenReturn(connection);
when(connection.createSession()).thenReturn(session);
when(session.createTemporaryQueue()).thenReturn(queue);
when(connectionFactory.createContext()).thenReturn(context);
when(context.createTemporaryQueue()).thenReturn(queue);
// Sender Mock
when(sender.send(any(Destination.class), any(MQMessageCreator.class))).thenReturn(Mono.empty());
// Act
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import jakarta.jms.JMSException;
import jakarta.jms.MessageListener;
import jakarta.jms.Queue;
import jakarta.jms.TemporaryQueue;
import lombok.experimental.SuperBuilder;
import lombok.extern.log4j.Log4j2;

Expand All @@ -23,38 +24,80 @@ public class MQContextListener extends AbstractJMSReconnectable<MQContextListene
private final MQListenerConfig config;
private final MQQueuesContainer container;
private final MQBrokerUtils utils;
private final boolean temporary;
private JMSConsumer consumer;
private JMSContext context;
private TemporaryQueue tempQueue;

@Override
protected String name() {
String[] parts = Thread.currentThread().getName().split("-");
String finalName = "mq-listener-fixed-queue-" + parts[parts.length - 1] + "[" + config.getQueue() + "]";
String finalName;
if (temporary) {
finalName = "mq-listener-tmp-queue-" + parts[parts.length - 1] + "[" + config.getTempQueueAlias() + "]";
} else {
finalName = "mq-listener-fixed-queue-" + parts[parts.length - 1] + "[" + config.getQueue() + "]";
}
Thread.currentThread().setName(finalName);
return finalName;
}

@Override
protected void disconnect() throws JMSException {
protected void disconnect() {
if (temporary) {
container.unregisterFromQueueGroup(config.getTempQueueAlias(), tempQueue);
}
if (consumer != null) {
consumer.close();
try {
consumer.close();
} catch (Exception ignored) {
// ignore because disconnection
}
}
if (tempQueue != null) {
try {
tempQueue.delete();
} catch (Exception ignored) {
// ignore because disconnection
}
}
if (context != null) {
context.close();
try {
context.close();
} catch (Exception ignored) {
// ignore because disconnection
}
}
}

@Override
protected MQContextListener connect() {
log.info("Starting listener {}", getProcess());
context = connectionFactory.createContext();
Destination destination = MQQueueUtils.setupFixedQueue(context, config);
consumer = context.createConsumer(destination);//NOSONAR
container.registerQueue(config.getQueue(), (Queue) destination);
utils.setQueueManager(context, (Queue) destination);
consumer.setMessageListener(listener);
context.setExceptionListener(this);
log.info("Listener {} started successfully", getProcess());
if (temporary) {
tempQueue = MQQueueUtils.setupTemporaryQueue(context, config);
container.registerToQueueGroup(config.getTempQueueAlias(), tempQueue);
consumer = context.createConsumer(tempQueue);//NOSONAR
consumer.setMessageListener(listener);
log.info("Listener {} started successfully with queue: {}", getProcess(), shortDestinationName());
} else {
Destination destination = MQQueueUtils.setupFixedQueue(context, config);
utils.setQueueManager(context, (Queue) destination);
container.registerQueue(config.getQueue(), (Queue) destination);
consumer = context.createConsumer(destination);//NOSONAR
consumer.setMessageListener(listener);
log.info("Listener {} started successfully with queue: {}", getProcess(), config.getQueue());
}
return this;
}

private String shortDestinationName() {
try {
return tempQueue.getQueueName().split("\\?")[0];
} catch (JMSException e) {
log.warn("Error getting temp queue name", e);
return "Error getting queue name";
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ protected String name() {
}

@Override
protected void disconnect() throws JMSException {
protected void disconnect() {
// do not disconnect to avoid another thread exceptions
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public abstract class AbstractJMSReconnectable<T> implements ExceptionListener,

protected abstract T connect();

protected abstract void disconnect() throws JMSException;
protected abstract void disconnect();

protected abstract String name();

Expand All @@ -37,17 +37,25 @@ public abstract class AbstractJMSReconnectable<T> implements ExceptionListener,
public T call() {
this.process = name();
healthListener.onInit(process);
return start();
}

protected T start() {
try {
this.disconnect();
} catch (Exception e) {
log.info("Error disconnecting but ignore it because is in reconnection process", e);
}
try {
T result = connect();
markAsStarted();
return result;
} catch (JMSRuntimeException e) {
log.warn("JMSRuntimeException in {}", process, e);
} catch (Exception e) {
log.warn("Exception in {}", process, e);
throw e;
}
}


public void onException(JMSRuntimeException exception) {
onException(new JMSException(exception.getMessage(), exception.getErrorCode(),
new Exception(exception.getCause())));
Expand All @@ -71,15 +79,7 @@ private void reconnect() {
Thread.currentThread().setName("reconnection-" + process);
try {
log.warn("Starting reconnection for {}", process);
RetryableTask.runWithRetries(process, retryableConfig, () -> {
try {
this.disconnect();
} catch (Exception e) {
log.info("Error disconnecting but ignore it because is in reconnection process", e);
}
this.connect();
});
markAsStarted();
RetryableTask.runWithRetries(process, retryableConfig, this::start);
log.warn("Reconnection successful for {}", process);
} catch (JMSRuntimeException ex) {
log.warn("Reconnection error for {}", process, ex);
Expand Down
Loading

0 comments on commit 49f4088

Please sign in to comment.