Skip to content

Commit

Permalink
Fix quarkiverse#248 to add clearAll
Browse files Browse the repository at this point in the history
  • Loading branch information
zhfeng committed May 9, 2024
1 parent 1bf1a52 commit 42891ad
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.quarkiverse.messaginghub.pooled.jms;

import java.util.ArrayList;
import java.util.List;

import jakarta.jms.ConnectionFactory;

import org.messaginghub.pooled.jms.JmsPoolConnectionFactory;
Expand All @@ -11,6 +14,8 @@ public class PooledJmsWrapper {
private boolean transaction;
private PooledJmsRuntimeConfig pooledJmsRuntimeConfig;

private static List<JmsPoolConnectionFactory> poolConnectionFactories = new ArrayList<>();

public PooledJmsWrapper(boolean transaction, PooledJmsRuntimeConfig pooledJmsRuntimeConfig) {
this.transaction = transaction;
this.pooledJmsRuntimeConfig = pooledJmsRuntimeConfig;
Expand All @@ -23,22 +28,36 @@ public ConnectionFactory wrapConnectionFactory(ConnectionFactory connectionFacto

if (transaction && pooledJmsRuntimeConfig.transaction.equals(TransactionIntegration.XA)) {
if (XATransactionSupport.isEnabled()) {
return XATransactionSupport.getXAConnectionFactory(connectionFactory, pooledJmsRuntimeConfig);
JmsPoolConnectionFactory cf = XATransactionSupport.getXAConnectionFactory(connectionFactory,
pooledJmsRuntimeConfig);
poolConnectionFactories.add(cf);
return cf;
}

throw new IllegalStateException("XA Transaction support is not available");
} else if (transaction && pooledJmsRuntimeConfig.transaction.equals(TransactionIntegration.ENABLED)) {
if (LocalTransactionSupport.isEnabled()) {
return LocalTransactionSupport.getLocalTransactionConnectionFactory(connectionFactory, pooledJmsRuntimeConfig);
JmsPoolConnectionFactory cf = LocalTransactionSupport.getLocalTransactionConnectionFactory(connectionFactory,
pooledJmsRuntimeConfig);
poolConnectionFactories.add(cf);
return cf;
}

throw new IllegalStateException("Local TransactionManager support is not available");
} else {
return getConnectionFactory(connectionFactory);
JmsPoolConnectionFactory cf = getConnectionFactory(connectionFactory);
poolConnectionFactories.add(cf);
return cf;
}
}

public void clearAll() {
for (JmsPoolConnectionFactory cf : poolConnectionFactories) {
cf.clear();
}
}

private ConnectionFactory getConnectionFactory(ConnectionFactory connectionFactory) {
private JmsPoolConnectionFactory getConnectionFactory(ConnectionFactory connectionFactory) {
JmsPoolConnectionFactory poolConnectionFactory = new JmsPoolConnectionFactory();
pooledJmsRuntimeConfigureConnectionFactory(poolConnectionFactory, connectionFactory, pooledJmsRuntimeConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import jakarta.jms.ConnectionFactory;

import org.messaginghub.pooled.jms.JmsPoolConnectionFactory;

import io.quarkiverse.messaginghub.pooled.jms.PooledJmsRuntimeConfig;

public class LocalTransactionSupport {
Expand All @@ -14,7 +16,7 @@ public static boolean isEnabled() {
return true;
}

public static ConnectionFactory getLocalTransactionConnectionFactory(ConnectionFactory connectionFactory,
public static JmsPoolConnectionFactory getLocalTransactionConnectionFactory(ConnectionFactory connectionFactory,
PooledJmsRuntimeConfig pooledJmsRuntimeConfig) {
return LocalTransactionSupportIndirect.getLocalTransactionConnectionFactory(connectionFactory, pooledJmsRuntimeConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import jakarta.jms.ConnectionFactory;
import jakarta.transaction.TransactionManager;

import org.messaginghub.pooled.jms.JmsPoolConnectionFactory;

import io.quarkiverse.messaginghub.pooled.jms.JmsPoolLocalTransactionConnectionFactory;
import io.quarkiverse.messaginghub.pooled.jms.PooledJmsRuntimeConfig;
import io.quarkiverse.messaginghub.pooled.jms.PooledJmsWrapper;
Expand All @@ -13,7 +15,7 @@
*/
public class LocalTransactionSupportIndirect {

public static ConnectionFactory getLocalTransactionConnectionFactory(ConnectionFactory connectionFactory,
public static JmsPoolConnectionFactory getLocalTransactionConnectionFactory(ConnectionFactory connectionFactory,
PooledJmsRuntimeConfig pooledJmsRuntimeConfig) {
TransactionManager transactionManager = Arc.container().instance(TransactionManager.class).get();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import jakarta.jms.ConnectionFactory;

import org.messaginghub.pooled.jms.JmsPoolConnectionFactory;

import io.quarkiverse.messaginghub.pooled.jms.PooledJmsRuntimeConfig;

public class XATransactionSupport {
Expand All @@ -15,7 +17,7 @@ public static boolean isEnabled() {
return true;
}

public static ConnectionFactory getXAConnectionFactory(ConnectionFactory connectionFactory,
public static JmsPoolConnectionFactory getXAConnectionFactory(ConnectionFactory connectionFactory,
PooledJmsRuntimeConfig pooledJmsRuntimeConfig) {
return XATransactionSupportIndirect.getXAConnectionFactory(connectionFactory, pooledJmsRuntimeConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.narayana.jta.jms.JmsXAResourceRecoveryHelper;
import org.jboss.tm.XAResourceRecoveryRegistry;
import org.messaginghub.pooled.jms.JmsPoolConnectionFactory;
import org.messaginghub.pooled.jms.JmsPoolXAConnectionFactory;

import io.quarkiverse.messaginghub.pooled.jms.PooledJmsRuntimeConfig;
Expand All @@ -17,7 +18,7 @@
*/
public class XATransactionSupportIndirect {

public static ConnectionFactory getXAConnectionFactory(ConnectionFactory connectionFactory,
public static JmsPoolConnectionFactory getXAConnectionFactory(ConnectionFactory connectionFactory,
PooledJmsRuntimeConfig pooledJmsRuntimeConfig) {
TransactionManager transactionManager = Arc.container().instance(TransactionManager.class).get();

Expand Down

0 comments on commit 42891ad

Please sign in to comment.