diff --git a/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/PooledJmsWrapper.java b/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/PooledJmsWrapper.java index 1b52b56..06790f9 100644 --- a/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/PooledJmsWrapper.java +++ b/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/PooledJmsWrapper.java @@ -1,5 +1,8 @@ package io.quarkiverse.messaginghub.pooled.jms; +import java.util.ArrayList; +import java.util.List; + import jakarta.jms.ConnectionFactory; import org.messaginghub.pooled.jms.JmsPoolConnectionFactory; @@ -11,6 +14,8 @@ public class PooledJmsWrapper { private boolean transaction; private PooledJmsRuntimeConfig pooledJmsRuntimeConfig; + private static List poolConnectionFactories = new ArrayList<>(); + public PooledJmsWrapper(boolean transaction, PooledJmsRuntimeConfig pooledJmsRuntimeConfig) { this.transaction = transaction; this.pooledJmsRuntimeConfig = pooledJmsRuntimeConfig; @@ -23,22 +28,36 @@ public ConnectionFactory wrapConnectionFactory(ConnectionFactory connectionFacto if (transaction && pooledJmsRuntimeConfig.transaction.equals(TransactionIntegration.XA)) { if (XATransactionSupport.isEnabled()) { - return XATransactionSupport.getXAConnectionFactory(connectionFactory, pooledJmsRuntimeConfig); + JmsPoolConnectionFactory cf = XATransactionSupport.getXAConnectionFactory(connectionFactory, + pooledJmsRuntimeConfig); + poolConnectionFactories.add(cf); + return cf; } throw new IllegalStateException("XA Transaction support is not available"); } else if (transaction && pooledJmsRuntimeConfig.transaction.equals(TransactionIntegration.ENABLED)) { if (LocalTransactionSupport.isEnabled()) { - return LocalTransactionSupport.getLocalTransactionConnectionFactory(connectionFactory, pooledJmsRuntimeConfig); + JmsPoolConnectionFactory cf = LocalTransactionSupport.getLocalTransactionConnectionFactory(connectionFactory, + pooledJmsRuntimeConfig); + poolConnectionFactories.add(cf); + return cf; } throw new IllegalStateException("Local TransactionManager support is not available"); } else { - return getConnectionFactory(connectionFactory); + JmsPoolConnectionFactory cf = getConnectionFactory(connectionFactory); + poolConnectionFactories.add(cf); + return cf; + } + } + + public void clearAll() { + for (JmsPoolConnectionFactory cf : poolConnectionFactories) { + cf.clear(); } } - private ConnectionFactory getConnectionFactory(ConnectionFactory connectionFactory) { + private JmsPoolConnectionFactory getConnectionFactory(ConnectionFactory connectionFactory) { JmsPoolConnectionFactory poolConnectionFactory = new JmsPoolConnectionFactory(); pooledJmsRuntimeConfigureConnectionFactory(poolConnectionFactory, connectionFactory, pooledJmsRuntimeConfig); diff --git a/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/transaction/LocalTransactionSupport.java b/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/transaction/LocalTransactionSupport.java index d5a3527..83c9d09 100644 --- a/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/transaction/LocalTransactionSupport.java +++ b/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/transaction/LocalTransactionSupport.java @@ -2,6 +2,8 @@ import jakarta.jms.ConnectionFactory; +import org.messaginghub.pooled.jms.JmsPoolConnectionFactory; + import io.quarkiverse.messaginghub.pooled.jms.PooledJmsRuntimeConfig; public class LocalTransactionSupport { @@ -14,7 +16,7 @@ public static boolean isEnabled() { return true; } - public static ConnectionFactory getLocalTransactionConnectionFactory(ConnectionFactory connectionFactory, + public static JmsPoolConnectionFactory getLocalTransactionConnectionFactory(ConnectionFactory connectionFactory, PooledJmsRuntimeConfig pooledJmsRuntimeConfig) { return LocalTransactionSupportIndirect.getLocalTransactionConnectionFactory(connectionFactory, pooledJmsRuntimeConfig); } diff --git a/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/transaction/LocalTransactionSupportIndirect.java b/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/transaction/LocalTransactionSupportIndirect.java index d7ba994..b5e7bde 100644 --- a/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/transaction/LocalTransactionSupportIndirect.java +++ b/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/transaction/LocalTransactionSupportIndirect.java @@ -3,6 +3,8 @@ import jakarta.jms.ConnectionFactory; import jakarta.transaction.TransactionManager; +import org.messaginghub.pooled.jms.JmsPoolConnectionFactory; + import io.quarkiverse.messaginghub.pooled.jms.JmsPoolLocalTransactionConnectionFactory; import io.quarkiverse.messaginghub.pooled.jms.PooledJmsRuntimeConfig; import io.quarkiverse.messaginghub.pooled.jms.PooledJmsWrapper; @@ -13,7 +15,7 @@ */ public class LocalTransactionSupportIndirect { - public static ConnectionFactory getLocalTransactionConnectionFactory(ConnectionFactory connectionFactory, + public static JmsPoolConnectionFactory getLocalTransactionConnectionFactory(ConnectionFactory connectionFactory, PooledJmsRuntimeConfig pooledJmsRuntimeConfig) { TransactionManager transactionManager = Arc.container().instance(TransactionManager.class).get(); diff --git a/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/transaction/XATransactionSupport.java b/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/transaction/XATransactionSupport.java index 3a7b393..6d4fb43 100644 --- a/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/transaction/XATransactionSupport.java +++ b/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/transaction/XATransactionSupport.java @@ -2,6 +2,8 @@ import jakarta.jms.ConnectionFactory; +import org.messaginghub.pooled.jms.JmsPoolConnectionFactory; + import io.quarkiverse.messaginghub.pooled.jms.PooledJmsRuntimeConfig; public class XATransactionSupport { @@ -15,7 +17,7 @@ public static boolean isEnabled() { return true; } - public static ConnectionFactory getXAConnectionFactory(ConnectionFactory connectionFactory, + public static JmsPoolConnectionFactory getXAConnectionFactory(ConnectionFactory connectionFactory, PooledJmsRuntimeConfig pooledJmsRuntimeConfig) { return XATransactionSupportIndirect.getXAConnectionFactory(connectionFactory, pooledJmsRuntimeConfig); } diff --git a/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/transaction/XATransactionSupportIndirect.java b/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/transaction/XATransactionSupportIndirect.java index 3acaecf..695cf5a 100644 --- a/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/transaction/XATransactionSupportIndirect.java +++ b/runtime/src/main/java/io/quarkiverse/messaginghub/pooled/jms/transaction/XATransactionSupportIndirect.java @@ -6,6 +6,7 @@ import org.eclipse.microprofile.config.ConfigProvider; import org.jboss.narayana.jta.jms.JmsXAResourceRecoveryHelper; import org.jboss.tm.XAResourceRecoveryRegistry; +import org.messaginghub.pooled.jms.JmsPoolConnectionFactory; import org.messaginghub.pooled.jms.JmsPoolXAConnectionFactory; import io.quarkiverse.messaginghub.pooled.jms.PooledJmsRuntimeConfig; @@ -17,7 +18,7 @@ */ public class XATransactionSupportIndirect { - public static ConnectionFactory getXAConnectionFactory(ConnectionFactory connectionFactory, + public static JmsPoolConnectionFactory getXAConnectionFactory(ConnectionFactory connectionFactory, PooledJmsRuntimeConfig pooledJmsRuntimeConfig) { TransactionManager transactionManager = Arc.container().instance(TransactionManager.class).get();