Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support JDBC ObjectStore in narayana-jta extension #31729

Merged
merged 1 commit into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/src/main/asciidoc/transaction.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,17 @@ NOTE: The `event` object represents the transaction ID, and defines `toString()`
TIP: In listener methods, you can access more information about the transaction in progress by accessing the `TransactionManager`,
which is a CDI bean and can be ``@Inject``ed.

== Configuring transaction log to be stored in a DataSource

The Narayana project has the capability to store the transaction logs into a JDBC Datasource; this should be our recommendation for users needing transaction recovery capabilities, especially when running in volatile containers.

To enable this capability, you need to set `quarkus.transaction-manager.object-store.type` to `jdbc` explicitly. Also, you can specify a datasource name to be used for the transaction log storage by setting `quarkus.transaction-manager.object-store.datasource`. It will use the default datasource configuration if not specified.

If you enable `quarkus.transaction-manager.object-store.create-table`, the transaction log table will be created automatically if it does not exist.

NOTE: When enabling this capability, the transaction node identifier must be set through `quarkus.transaction-manager.node-name`.


== Why always having a transaction manager?

Does it work everywhere I want to?::
Expand Down
4 changes: 4 additions & 0 deletions extensions/narayana-jta/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-narayana-jta</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-agroal-spi</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import static io.quarkus.deployment.annotations.ExecutionTime.RUNTIME_INIT;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import jakarta.annotation.Priority;
Expand All @@ -13,6 +16,8 @@
import com.arjuna.ats.arjuna.recovery.TransactionStatusConnectionManager;
import com.arjuna.ats.internal.arjuna.coordinator.CheckedActionFactoryImple;
import com.arjuna.ats.internal.arjuna.objectstore.ShadowNoFileLockStore;
import com.arjuna.ats.internal.arjuna.objectstore.jdbc.JDBCImple_driver;
import com.arjuna.ats.internal.arjuna.objectstore.jdbc.JDBCStore;
import com.arjuna.ats.internal.arjuna.recovery.AtomicActionExpiryScanner;
import com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule;
import com.arjuna.ats.internal.arjuna.recovery.ExpiredTransactionStatusManagerScanner;
Expand All @@ -30,19 +35,23 @@
import com.arjuna.ats.jta.common.JTAEnvironmentBean;
import com.arjuna.common.util.propertyservice.PropertiesFactory;

import io.quarkus.agroal.spi.JdbcDataSourceBuildItem;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.ContextRegistrationPhaseBuildItem;
import io.quarkus.arc.deployment.ContextRegistrationPhaseBuildItem.ContextConfiguratorBuildItem;
import io.quarkus.arc.deployment.CustomScopeBuildItem;
import io.quarkus.arc.deployment.GeneratedBeanBuildItem;
import io.quarkus.arc.deployment.GeneratedBeanGizmoAdaptor;
import io.quarkus.arc.deployment.SyntheticBeansRuntimeInitBuildItem;
import io.quarkus.arc.deployment.UnremovableBeanBuildItem;
import io.quarkus.deployment.Feature;
import io.quarkus.deployment.IsTest;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.Consume;
import io.quarkus.deployment.annotations.Produce;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageSystemPropertyBuildItem;
Expand Down Expand Up @@ -76,6 +85,7 @@ public NativeImageSystemPropertyBuildItem nativeImageSystemPropertyBuildItem() {
@Record(RUNTIME_INIT)
@Produce(NarayanaInitBuildItem.class)
public void build(NarayanaJtaRecorder recorder,
CombinedIndexBuildItem indexBuildItem,
BuildProducer<AdditionalBeanBuildItem> additionalBeans,
BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<RuntimeInitializedClassBuildItem> runtimeInit,
Expand All @@ -95,21 +105,25 @@ public void build(NarayanaJtaRecorder recorder,
runtimeInit.produce(new RuntimeInitializedClassBuildItem(JTAActionStatusServiceXAResourceOrphanFilter.class.getName()));
runtimeInit.produce(new RuntimeInitializedClassBuildItem(AtomicActionExpiryScanner.class.getName()));

reflectiveClass.produce(ReflectiveClassBuildItem.builder(JTAEnvironmentBean.class.getName(),
UserTransactionImple.class.getName(),
CheckedActionFactoryImple.class.getName(),
TransactionManagerImple.class.getName(),
TransactionSynchronizationRegistryImple.class.getName(),
ObjectStoreEnvironmentBean.class.getName(),
ShadowNoFileLockStore.class.getName(),
SocketProcessId.class.getName(),
AtomicActionRecoveryModule.class.getName(),
XARecoveryModule.class.getName(),
XAResourceRecord.class.getName(),
JTATransactionLogXAResourceOrphanFilter.class.getName(),
JTANodeNameXAResourceOrphanFilter.class.getName(),
JTAActionStatusServiceXAResourceOrphanFilter.class.getName(),
ExpiredTransactionStatusManagerScanner.class.getName()).build());
indexBuildItem.getIndex().getAllKnownSubclasses(JDBCImple_driver.class).stream()
.map(impl -> ReflectiveClassBuildItem.builder(impl.name().toString()).build())
.forEach(reflectiveClass::produce);
reflectiveClass.produce(ReflectiveClassBuildItem.builder(JTAEnvironmentBean.class,
UserTransactionImple.class,
CheckedActionFactoryImple.class,
TransactionManagerImple.class,
TransactionSynchronizationRegistryImple.class,
ObjectStoreEnvironmentBean.class,
ShadowNoFileLockStore.class,
JDBCStore.class,
SocketProcessId.class,
AtomicActionRecoveryModule.class,
XARecoveryModule.class,
XAResourceRecord.class,
JTATransactionLogXAResourceOrphanFilter.class,
JTANodeNameXAResourceOrphanFilter.class,
JTAActionStatusServiceXAResourceOrphanFilter.class,
ExpiredTransactionStatusManagerScanner.class).build());

AdditionalBeanBuildItem.Builder builder = AdditionalBeanBuildItem.builder();
builder.addBeanClass(TransactionalInterceptorSupports.class);
Expand All @@ -135,6 +149,18 @@ public void build(NarayanaJtaRecorder recorder,
recorder.setConfig(transactions);
}

@BuildStep
@Record(RUNTIME_INIT)
@Consume(NarayanaInitBuildItem.class)
@Consume(SyntheticBeansRuntimeInitBuildItem.class)
public void startRecoveryService(NarayanaJtaRecorder recorder,
List<JdbcDataSourceBuildItem> jdbcDataSourceBuildItems, TransactionManagerConfiguration transactions) {
Map<Boolean, String> namedDataSources = new HashMap<>();

jdbcDataSourceBuildItems.forEach(i -> namedDataSources.put(i.isDefault(), i.getName()));
recorder.startRecoveryService(transactions, namedDataSources);
}

@BuildStep(onlyIf = IsTest.class)
void testTx(BuildProducer<GeneratedBeanBuildItem> generatedBeanBuildItemBuildProducer,
BuildProducer<AdditionalBeanBuildItem> additionalBeans) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@
import jakarta.transaction.TransactionSynchronizationRegistry;
import jakarta.transaction.UserTransaction;

import org.jboss.logging.Logger;
import org.jboss.tm.JBossXATerminator;
import org.jboss.tm.XAResourceRecoveryRegistry;
import org.jboss.tm.usertx.UserTransactionRegistry;

import com.arjuna.ats.internal.jbossatx.jta.jca.XATerminator;
import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple;
import com.arjuna.ats.jbossatx.jta.RecoveryManagerService;

import io.quarkus.arc.Unremovable;

@Dependent
public class NarayanaJtaProducers {
private static final Logger log = Logger.getLogger(NarayanaJtaProducers.class);

@Produces
@ApplicationScoped
Expand All @@ -41,13 +42,8 @@ public jakarta.transaction.TransactionManager transactionManager() {

@Produces
@Singleton
public XAResourceRecoveryRegistry xaResourceRecoveryRegistry(TransactionManagerConfiguration config) {
RecoveryManagerService recoveryManagerService = new RecoveryManagerService();
if (config.enableRecovery) {
recoveryManagerService.create();
recoveryManagerService.start();
}
return recoveryManagerService;
public XAResourceRecoveryRegistry xaResourceRecoveryRegistry() {
return QuarkusRecoveryService.getInstance();
}

@Produces
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package io.quarkus.narayana.jta.runtime;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.jboss.logging.Logger;
Expand All @@ -13,13 +16,15 @@
import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
import com.arjuna.ats.arjuna.coordinator.TxControl;
import com.arjuna.ats.arjuna.recovery.RecoveryManager;
import com.arjuna.ats.internal.arjuna.objectstore.jdbc.JDBCStore;
import com.arjuna.ats.jta.common.JTAEnvironmentBean;
import com.arjuna.ats.jta.common.jtaPropertyManager;
import com.arjuna.common.internal.util.propertyservice.BeanPopulator;
import com.arjuna.common.util.propertyservice.PropertiesFactory;

import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
import io.quarkus.runtime.configuration.ConfigurationException;

@Recorder
public class NarayanaJtaRecorder {
Expand Down Expand Up @@ -72,12 +77,12 @@ public void disableTransactionStatusManager() {
}

public void setConfig(final TransactionManagerConfiguration transactions) {
BeanPopulator.getDefaultInstance(ObjectStoreEnvironmentBean.class)
.setObjectStoreDir(transactions.objectStoreDirectory);
BeanPopulator.getNamedInstance(ObjectStoreEnvironmentBean.class, "communicationStore")
.setObjectStoreDir(transactions.objectStoreDirectory);
BeanPopulator.getNamedInstance(ObjectStoreEnvironmentBean.class, "stateStore")
.setObjectStoreDir(transactions.objectStoreDirectory);
List<String> objectStores = Arrays.asList(null, "communicationStore", "stateStore");
if (transactions.objectStore.type.equals(ObjectStoreType.File_System)) {
objectStores.forEach(name -> setObjectStoreDir(name, transactions));
} else if (transactions.objectStore.type.equals(ObjectStoreType.JDBC)) {
objectStores.forEach(name -> setJDBCObjectStore(name, transactions));
}
BeanPopulator.getDefaultInstance(RecoveryEnvironmentBean.class)
.setRecoveryModuleClassNames(transactions.recoveryModules);
BeanPopulator.getDefaultInstance(RecoveryEnvironmentBean.class)
Expand All @@ -86,15 +91,56 @@ public void setConfig(final TransactionManagerConfiguration transactions) {
.setXaResourceOrphanFilterClassNames(transactions.xaResourceOrphanFilters);
}

private void setObjectStoreDir(String name, TransactionManagerConfiguration config) {
BeanPopulator.getNamedInstance(ObjectStoreEnvironmentBean.class, name).setObjectStoreDir(config.objectStore.directory);
}

private void setJDBCObjectStore(String name, TransactionManagerConfiguration config) {
final ObjectStoreEnvironmentBean instance = BeanPopulator.getNamedInstance(ObjectStoreEnvironmentBean.class, name);
instance.setObjectStoreType(JDBCStore.class.getName());
instance.setJdbcDataSource(new QuarkusDataSource(config.objectStore.datasource));
instance.setCreateTable(config.objectStore.createTable);
instance.setDropTable(config.objectStore.dropTable);
instance.setTablePrefix(config.objectStore.tablePrefix);
}

public void startRecoveryService(final TransactionManagerConfiguration transactions, Map<Boolean, String> dataSources) {
if (transactions.objectStore.type.equals(ObjectStoreType.JDBC)) {
if (transactions.objectStore.datasource.isEmpty()) {
dataSources.keySet().stream().filter(i -> i).findFirst().orElseThrow(
() -> new ConfigurationException(
"The Narayana JTA extension does not have a datasource configured,"
+ " so it defaults to the default datasource,"
+ " but that datasource is not configured."
+ " To solve this, either configure the default datasource,"
+ " referring to https://quarkus.io/guides/datasource for guidance,"
+ " or configure the datasource to use in the Narayana JTA extension "
+ " by setting property 'quarkus.transaction-manager.object-store.datasource' to the name of a configured datasource."));
} else {
String dsName = transactions.objectStore.datasource.get();
dataSources.values().stream().filter(i -> i.equals(dsName)).findFirst()
.orElseThrow(() -> new ConfigurationException(
"The Narayana JTA extension is configured to use the datasource '"
+ dsName
+ "' but that datasource is not configured."
+ " To solve this, either configure datasource " + dsName
+ " referring to https://quarkus.io/guides/datasource for guidance,"
+ " or configure another datasource to use in the Narayana JTA extension "
+ " by setting property 'quarkus.transaction-manager.object-store.datasource' to the name of a configured datasource."));
}
}
if (transactions.enableRecovery) {
QuarkusRecoveryService.getInstance().create();
QuarkusRecoveryService.getInstance().start();
}
}

public void handleShutdown(ShutdownContext context, TransactionManagerConfiguration transactions) {
context.addLastShutdownTask(new Runnable() {
@Override
public void run() {
if (transactions.enableRecovery) {
RecoveryManager.manager().terminate(true);
}
TransactionReaper.terminate(false);
context.addLastShutdownTask(() -> {
if (transactions.enableRecovery) {
RecoveryManager.manager().terminate(true);
}
TransactionReaper.terminate(false);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.quarkus.narayana.jta.runtime;

public enum ObjectStoreType {
File_System,
JDBC
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.quarkus.narayana.jta.runtime;

import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Optional;
import java.util.logging.Logger;

import javax.sql.DataSource;

import jakarta.enterprise.inject.literal.NamedLiteral;

import io.quarkus.arc.Arc;

public class QuarkusDataSource implements DataSource {
private final Optional<String> dsName;
private volatile DataSource datasource;

public QuarkusDataSource(Optional<String> dsName) {
this.dsName = dsName;
}

private DataSource getDataSource() {
if (datasource == null) {
if (dsName.isEmpty()) {
datasource = Arc.container().instance(DataSource.class).get();
} else {
datasource = Arc.container().instance(DataSource.class, NamedLiteral.of(dsName.get())).get();
}
}

return datasource;
}

@Override
public Connection getConnection() throws SQLException {
return getDataSource().getConnection();
}

@Override
public Connection getConnection(final String user, final String passwd) throws SQLException {
return getDataSource().getConnection(user, passwd);
}

@Override
public PrintWriter getLogWriter() throws SQLException {
return getDataSource().getLogWriter();
}

@Override
public void setLogWriter(final PrintWriter writer) throws SQLException {
getDataSource().setLogWriter(writer);
}

@Override
public void setLoginTimeout(final int timeout) throws SQLException {
getDataSource().setLoginTimeout(timeout);
}

@Override
public int getLoginTimeout() throws SQLException {
return getDataSource().getLoginTimeout();
}

@Override
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return getDataSource().getParentLogger();
}

@Override
public <T> T unwrap(final Class<T> aClass) throws SQLException {
return getDataSource().unwrap(aClass);
}

@Override
public boolean isWrapperFor(final Class<?> aClass) throws SQLException {
return getDataSource().isWrapperFor(aClass);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.quarkus.narayana.jta.runtime;

import com.arjuna.ats.jbossatx.jta.RecoveryManagerService;

public class QuarkusRecoveryService {
private static RecoveryManagerService recoveryManagerService;

public static RecoveryManagerService getInstance() {
if (recoveryManagerService == null) {
recoveryManagerService = new RecoveryManagerService();
}
return recoveryManagerService;
}

private QuarkusRecoveryService() {
}
}
Loading