Skip to content

Commit

Permalink
AG-227 - XAResource reopen test case
Browse files Browse the repository at this point in the history
  • Loading branch information
barreiro committed Feb 1, 2024
1 parent 342ee87 commit 6caa14d
Showing 1 changed file with 118 additions and 2 deletions.
120 changes: 118 additions & 2 deletions agroal-test/src/test/java/io/agroal/test/narayana/RecoveryTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
package io.agroal.test.narayana;

import com.arjuna.ats.arjuna.recovery.RecoveryManager;
import com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule;
import com.arjuna.ats.jbossatx.jta.RecoveryManagerService;
import com.arjuna.ats.jta.recovery.XAResourceRecoveryHelper;
import com.arjuna.ats.jta.xa.XidImple;
import io.agroal.api.AgroalDataSource;
import io.agroal.api.AgroalDataSourceListener;
import io.agroal.api.configuration.supplier.AgroalDataSourceConfigurationSupplier;
Expand All @@ -13,6 +16,7 @@
import io.agroal.narayana.NarayanaTransactionIntegration;
import io.agroal.test.MockXAConnection;
import io.agroal.test.MockXADataSource;
import io.agroal.test.MockXAResource;
import org.jboss.tm.XAResourceRecovery;
import org.jboss.tm.XAResourceRecoveryRegistry;
import org.junit.jupiter.api.AfterAll;
Expand All @@ -22,20 +26,31 @@
import org.junit.jupiter.api.Test;

import javax.sql.XAConnection;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

import jakarta.transaction.TransactionManager;
import jakarta.transaction.TransactionSynchronizationRegistry;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.logging.Logger;

import static com.arjuna.ats.arjuna.recovery.RecoveryManager.DIRECT_MANAGEMENT;
import static io.agroal.test.AgroalTestGroup.FUNCTIONAL;
import static io.agroal.test.AgroalTestGroup.TRANSACTION;
import static io.agroal.test.MockDriver.deregisterMockDriver;
import static io.agroal.test.MockDriver.registerMockDriver;
import static java.util.logging.Logger.getLogger;
import static javax.transaction.xa.XAResource.TMENDRSCAN;
import static javax.transaction.xa.XAResource.TMSTARTRSCAN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

Expand Down Expand Up @@ -140,12 +155,12 @@ void reuseCredentials() throws SQLException {

@Test
@DisplayName( "Close recovery connection" )
void closeRecoveryConnection() throws SQLException, InterruptedException {
void closeRecoveryConnection() throws Exception {
TransactionManager txManager = com.arjuna.ats.jta.TransactionManager.transactionManager();
TransactionSynchronizationRegistry txSyncRegistry = new com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple();

com.arjuna.ats.arjuna.common.recoveryPropertyManager.getRecoveryEnvironmentBean().setRecoveryBackoffPeriod( 1 );
RecoveryManager recoveryManager = RecoveryManager.manager( RecoveryManager.DIRECT_MANAGEMENT );
RecoveryManager recoveryManager = RecoveryManager.manager( DIRECT_MANAGEMENT );

RecoveryManagerService recoveryService = new RecoveryManagerService();
recoveryService.create();
Expand All @@ -168,6 +183,38 @@ void closeRecoveryConnection() throws SQLException, InterruptedException {
assertEquals( 2, RequiresCloseXADataSource.getClosed(), "Recovery connection not closed" );
}

@Test
@DisplayName( "XAResource reopen test" )
void xaResourceReopenTest() throws SQLException {
TransactionManager txManager = com.arjuna.ats.jta.TransactionManager.transactionManager();
TransactionSynchronizationRegistry txSyncRegistry = new com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple();

com.arjuna.ats.arjuna.common.recoveryPropertyManager.getRecoveryEnvironmentBean().setRecoveryBackoffPeriod( 1 );
RecoveryManager recoveryManager = RecoveryManager.manager( DIRECT_MANAGEMENT );
recoveryManager.removeAllModules( true );
recoveryManager.addModule( new DoublePassXARecoveryModule() );

RecoveryManagerService recoveryService = new RecoveryManagerService();
recoveryService.create();

AgroalDataSourceConfigurationSupplier configurationSupplier = new AgroalDataSourceConfigurationSupplier()
.connectionPoolConfiguration( cp -> cp
.maxSize( 1 )
.transactionIntegration( new NarayanaTransactionIntegration( txManager, txSyncRegistry, "", false, recoveryService ) )
.connectionFactoryConfiguration( cf -> cf
.connectionProviderClass( RequiresCloseXADataSource.class ) )
);

try ( AgroalDataSource dataSource = AgroalDataSource.from( configurationSupplier, new WarningsAgroalDatasourceListener() ) ) {
logger.info( "Starting recovery on DataSource " + dataSource );
recoveryManager.scan();

assertEquals( 2, RequiresCloseXADataSource.getClosed(), "Expected two connections to have been closed" );
} finally {
recoveryManager.terminate( true );
}
}

// --- //

private static class DriverAgroalDataSourceListener implements AgroalDataSourceListener {
Expand Down Expand Up @@ -323,15 +370,25 @@ static int getClosed() {
return closed;
}

public RequiresCloseXADataSource() {
closed = 0; // reset whenever a new instance is created
}

@Override
public XAConnection getXAConnection() throws SQLException {
logger.info( "Creating new XAConnection");
return new MyMockXAConnection();
}

private static class MyMockXAConnection implements MockXAConnection {
MyMockXAConnection() {
}

@Override
public XAResource getXAResource() throws SQLException {
return new NonEmptyXidsResource();
}

@Override
@SuppressWarnings( "ObjectToString" )
public void close() throws SQLException {
Expand All @@ -340,4 +397,63 @@ public void close() throws SQLException {
}
}
}

private static class NonEmptyXidsResource implements MockXAResource {
@Override
public Xid[] recover(int flags) throws XAException {
return flags == TMENDRSCAN ? null : new Xid[] { new XidImple() };
}
}

// --- //

private static class DoublePassXARecoveryModule extends XARecoveryModule {

private final Collection<XAResource> xaResources = new ArrayList<>();
private final Collection<XAResourceRecoveryHelper> xaResourceRecoveryHelpers = new ArrayList<>();


public void addXAResourceRecoveryHelper(XAResourceRecoveryHelper xaResourceRecoveryHelper) {
xaResourceRecoveryHelpers.add(xaResourceRecoveryHelper);
}

public synchronized void removeXAResourceRecoveryHelper(XAResourceRecoveryHelper xaResourceRecoveryHelper) {
xaResourceRecoveryHelpers.remove( xaResourceRecoveryHelper );
}

@Override
public synchronized void periodicWorkFirstPass() {
for ( XAResourceRecoveryHelper helper : xaResourceRecoveryHelpers ) {
try {
xaResources.addAll( List.of( helper.getXAResources() ) );
} catch ( Exception e ) {
fail( "Failure while obtaining XA Resource", e );
}
}
doPeriodicPass();
}

@Override
public synchronized void periodicWorkSecondPass() {
doPeriodicPass();
xaResources.clear();
}

/*
* Do a scan for each pass to verify that a new scan can execute after a previous one has completed
*/
private void doPeriodicPass() {
for ( XAResource xaResource : xaResources ) {
try {
Xid[] recoveryXids = xaResource.recover( TMSTARTRSCAN );
assertNotNull( recoveryXids, "Expecting some Xids" );

recoveryXids = xaResource.recover( TMENDRSCAN );
assertNull( recoveryXids, "Expecting empty Xids" );
} catch ( XAException e ) {
fail( "Failure while using XA Resource", e );
}
}
}
}
}

0 comments on commit 6caa14d

Please sign in to comment.