From 77d2632b5e37b1122bd3c4c4e2997b3492358848 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Mon, 1 Nov 2021 11:50:47 +1100 Subject: [PATCH] Add QuarkusTransaction API as a simplified TX API This API allows for simple static calls with unchecked exceptions and safer semantics. Fixes #979 --- docs/src/main/asciidoc/transaction.adoc | 97 ++++++- .../jta/deployment/NarayanaJtaProcessor.java | 1 + .../quarkus/QuarkusTransactionTest.java | 252 +++++++++++++++++ .../io/quarkus/narayana/jta/BeginOptions.java | 38 +++ .../narayana/jta/QuarkusTransaction.java | 169 ++++++++++++ .../jta/QuarkusTransactionException.java | 23 ++ .../narayana/jta/QuarkusTransactionImpl.java | 259 ++++++++++++++++++ .../jta/RequestScopedTransaction.java | 100 +++++++ .../io/quarkus/narayana/jta/RunOptions.java | 120 ++++++++ .../jta/runtime/NarayanaJtaRecorder.java | 7 +- .../TransactionManagerConfiguration.java | 3 +- 11 files changed, 1061 insertions(+), 8 deletions(-) create mode 100644 extensions/narayana-jta/deployment/src/test/java/io/quarkus/narayana/quarkus/QuarkusTransactionTest.java create mode 100644 extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/BeginOptions.java create mode 100644 extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/QuarkusTransaction.java create mode 100644 extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/QuarkusTransactionException.java create mode 100644 extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/QuarkusTransactionImpl.java create mode 100644 extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/RequestScopedTransaction.java create mode 100644 extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/RunOptions.java diff --git a/docs/src/main/asciidoc/transaction.adoc b/docs/src/main/asciidoc/transaction.adoc index b1ce7e3a07b17..dbd33a5132bc4 100644 --- a/docs/src/main/asciidoc/transaction.adoc +++ b/docs/src/main/asciidoc/transaction.adoc @@ -30,7 +30,8 @@ Add the following to your `pom.xml`: == Starting and stopping transactions: defining your boundaries -You can define your transaction boundaries the easy way, or the less easy way :) +You can define your transaction boundaries either declarativly with `@Transactional` or programmatically with `QuarkusTransaction`. You can also use +the JTA `UserTransaction` API directly, however this is less user friendly than `QuarkusTransaction`. === Declarative approach @@ -142,7 +143,99 @@ work is really done, and not just until the reactive method returns. If you need to propagate your transaction context across your reactive pipeline, please see the xref:context-propagation.adoc[Context Propagation guide]. -=== API approach +=== Programmatic Approach + +You can use static methods on `QuarkusTransaction` to define transaction boundaries. This provides two different options, +a functional approach that allows you to run a lambda within the scope of a transaction, or by using explicit `begin`, +`commit` and `rollback` methods. + +[source,java] +---- +import io.quarkus.narayana.jta.QuarkusTransaction; +import io.quarkus.narayana.jta.RunOptions; + +public class TransactionExample { + + public void beginExample() { + QuarkusTransaction.begin(); + //do work + QuarkusTransaction.commit(); + + QuarkusTransaction.begin(QuarkusTransaction.beginOptions() + .timeout(10)); + //do work + QuarkusTransaction.rollback(); + } + + public void lambdaExample() { + QuarkusTransaction.run(() -> { + //do work + }); + + + int result = QuarkusTransaction.call(QuarkusTransaction.runOptions() + .timeout(10) + .exceptionHandler((throwable) -> { + if (throwable instanceof SomeException) { + return RunOptions.ExceptionResult.COMMIT; + } + return RunOptions.ExceptionResult.ROLLBACK; + }) + .semantic(RunOptions.Semantic.SUSPEND_EXISTING), () -> { + //do work + return 0; + }); + } +} +---- + +The above example shows a few different ways the API can be used. The first method simply calls begin, does some work and commits it. +This created transaction is tied to the CDI request scope, so if it is still active when the request scope is destroyed then it will +be automatically rolled back. This removes the need to explicitly catch exceptions and call `rollback`, and acts as a safety net +against inadvertent transaction leaks, however it does mean that this can only be used when the request scope is active. The second +example in the method calls begin with a timeout option, and then rolls back the transaction. + +The second example shows the use of lambda scoped transactions, the first just runs a `Runnable` within a transaction, the second, +runs `Callable` with some specific options. In particular the `exceptionHandler` method can be used to control if the transaction +is rolled back or not on exception, and the `semantic` method controls the behaviour if an existing transaction is already started. + +The following semantics are supported: + + +DISALLOW_EXISTING:: + + If a transaction is already associated with the current thread a `QuarkusTransactionException` will be thrown, + otherwise a new transaction is started, and follows all the normal lifecycle rules. + +JOIN_EXISTING:: + +If no transaction is active then a new transaction will be started, and committed when the method ends. +If an exception is thrown the exception handler registered by `#exceptionHandler(Function)` will be called to +decide if the TX should be committed or rolled back. +If an existing transaction is active then the method is run in the context of the existing transaction. If an +exception is thrown the exception handler will be called, however +a result of `ExceptionResult#ROLLBACK` will result in the TX marked as rollback only, while a result of +`ExceptionResult#COMMIT` will result in no action being taken. + +REQUIRE_NEW:: + +This is the default semantic. +If an existing transaction is already associated with the current thread then the transaction is suspended, and +resumed once +the current transaction is complete. +A new transaction is started after the existing transaction is suspended, and follows all the normal lifecycle rules. + +SUSPEND_EXISTING:: + +If no transaction is active then this semantic is basically a no-op. +If a transaction is active then it is suspended, and resumed after the task is run. +The exception handler will never be consulted when this semantic is in use, specifying both an exception handler and +this semantic is considered an error. +This semantic allows for code to easily be run outside the scope of a transaction. + + + +=== Legacy API approach The less easy way is to inject a `UserTransaction` and use the various transaction demarcation methods. diff --git a/extensions/narayana-jta/deployment/src/main/java/io/quarkus/narayana/jta/deployment/NarayanaJtaProcessor.java b/extensions/narayana-jta/deployment/src/main/java/io/quarkus/narayana/jta/deployment/NarayanaJtaProcessor.java index 3f43375a4a2eb..397f771bdfc79 100644 --- a/extensions/narayana-jta/deployment/src/main/java/io/quarkus/narayana/jta/deployment/NarayanaJtaProcessor.java +++ b/extensions/narayana-jta/deployment/src/main/java/io/quarkus/narayana/jta/deployment/NarayanaJtaProcessor.java @@ -74,6 +74,7 @@ public void build(NarayanaJtaRecorder recorder, feature.produce(new FeatureBuildItem(Feature.NARAYANA_JTA)); additionalBeans.produce(new AdditionalBeanBuildItem(NarayanaJtaProducers.class)); additionalBeans.produce(new AdditionalBeanBuildItem(CDIDelegatingTransactionManager.class)); + additionalBeans.produce(AdditionalBeanBuildItem.unremovableOf("io.quarkus.narayana.jta.RequestScopedTransaction")); runtimeInit.produce(new RuntimeInitializedClassBuildItem( "com.arjuna.ats.internal.jta.resources.arjunacore.CommitMarkableResourceRecord")); diff --git a/extensions/narayana-jta/deployment/src/test/java/io/quarkus/narayana/quarkus/QuarkusTransactionTest.java b/extensions/narayana-jta/deployment/src/test/java/io/quarkus/narayana/quarkus/QuarkusTransactionTest.java new file mode 100644 index 0000000000000..83b77c91ea8a0 --- /dev/null +++ b/extensions/narayana-jta/deployment/src/test/java/io/quarkus/narayana/quarkus/QuarkusTransactionTest.java @@ -0,0 +1,252 @@ +package io.quarkus.narayana.quarkus; + +import static io.quarkus.narayana.jta.QuarkusTransaction.beginOptions; +import static io.quarkus.narayana.jta.QuarkusTransaction.runOptions; + +import java.util.concurrent.atomic.AtomicReference; + +import javax.enterprise.context.ContextNotActiveException; +import javax.enterprise.context.control.ActivateRequestContext; +import javax.inject.Inject; +import javax.transaction.RollbackException; +import javax.transaction.Status; +import javax.transaction.Synchronization; +import javax.transaction.SystemException; +import javax.transaction.TransactionManager; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.arc.Arc; +import io.quarkus.narayana.jta.QuarkusTransaction; +import io.quarkus.narayana.jta.QuarkusTransactionException; +import io.quarkus.narayana.jta.RunOptions; +import io.quarkus.test.QuarkusUnitTest; + +public class QuarkusTransactionTest { + + @Inject + TransactionManager transactionManager; + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)); + + @Test + public void testBeginRequestScopeNotActive() { + Assertions.assertThrows(ContextNotActiveException.class, QuarkusTransaction::begin); + } + + @Test + @ActivateRequestContext + public void testBeginCommit() { + QuarkusTransaction.begin(); + var sync = register(); + QuarkusTransaction.commit(); + Assertions.assertEquals(Status.STATUS_COMMITTED, sync.completionStatus); + } + + @Test + @ActivateRequestContext + public void testBeginRollback() { + QuarkusTransaction.begin(); + var sync = register(); + QuarkusTransaction.rollback(); + Assertions.assertEquals(Status.STATUS_ROLLEDBACK, sync.completionStatus); + } + + @Test + @ActivateRequestContext + public void testBeginSetRollbackOnly() { + QuarkusTransaction.begin(); + var sync = register(); + QuarkusTransaction.setRollbackOnly(); + Assertions.assertThrows(QuarkusTransactionException.class, QuarkusTransaction::commit); + Assertions.assertEquals(Status.STATUS_ROLLEDBACK, sync.completionStatus); + } + + @Test + @ActivateRequestContext + public void testBeginTimeout() throws InterruptedException { + QuarkusTransaction.begin(beginOptions().timeout(1)); + var sync = register(); + Thread.sleep(1200); + Assertions.assertThrows(QuarkusTransactionException.class, QuarkusTransaction::commit); + Assertions.assertEquals(Status.STATUS_ROLLEDBACK, sync.completionStatus); + } + + @Test + @ActivateRequestContext + public void testBeginSuspendExistingFalse() { + QuarkusTransaction.begin(); + var sync = register(); + Assertions.assertThrows(QuarkusTransactionException.class, QuarkusTransaction::begin); + Assertions.assertTrue(QuarkusTransaction.isActive()); + QuarkusTransaction.commit(); + Assertions.assertEquals(Status.STATUS_COMMITTED, sync.completionStatus); + } + + @Test + public void testBeginRollbackOnRequestScopeEnd() { + var context = Arc.container().requestContext(); + context.activate(); + TestSync sync = null; + try { + QuarkusTransaction.begin(); + sync = register(); + } finally { + context.terminate(); + } + Assertions.assertEquals(Status.STATUS_ROLLEDBACK, sync.completionStatus); + } + + @Test + public void testBeginCommitOnRequestScopeEnd() { + var context = Arc.container().requestContext(); + context.activate(); + TestSync sync = null; + try { + QuarkusTransaction.begin(beginOptions().commitOnRequestScopeEnd()); + sync = register(); + } finally { + context.terminate(); + } + Assertions.assertEquals(Status.STATUS_COMMITTED, sync.completionStatus); + } + + @Test + public void testCallCommit() { + Assertions.assertEquals(Status.STATUS_COMMITTED, QuarkusTransaction.call(this::register).completionStatus); + } + + @Test + public void testCallRollback() { + AtomicReference sync = new AtomicReference<>(); + Assertions.assertThrows(QuarkusTransactionException.class, () -> QuarkusTransaction.call(() -> { + sync.set(register()); + QuarkusTransaction.rollback(); + return null; + })); + Assertions.assertEquals(Status.STATUS_ROLLEDBACK, sync.get().completionStatus); + } + + @Test + public void testCallRollbackOnly() { + AtomicReference sync = new AtomicReference<>(); + Assertions.assertThrows(QuarkusTransactionException.class, () -> QuarkusTransaction.call(() -> { + sync.set(register()); + QuarkusTransaction.setRollbackOnly(); + return null; + })); + Assertions.assertEquals(Status.STATUS_ROLLEDBACK, sync.get().completionStatus); + } + + @Test + public void testCallTimeout() { + AtomicReference sync = new AtomicReference<>(); + Assertions.assertThrows(QuarkusTransactionException.class, + () -> QuarkusTransaction.call(runOptions().timeout(1), () -> { + sync.set(register()); + Thread.sleep(1200); + return null; + })); + Assertions.assertEquals(Status.STATUS_ROLLEDBACK, sync.get().completionStatus); + } + + @Test + public void testCallException() { + AtomicReference sync = new AtomicReference<>(); + Assertions.assertThrows(IllegalArgumentException.class, () -> QuarkusTransaction.call(() -> { + sync.set(register()); + throw new IllegalArgumentException("foo"); + })); + Assertions.assertEquals(Status.STATUS_ROLLEDBACK, sync.get().completionStatus); + } + + @Test + public void testCallExceptionHandler() { + AtomicReference sync = new AtomicReference<>(); + Assertions.assertThrows(IllegalArgumentException.class, + () -> QuarkusTransaction.call(runOptions().exceptionHandler((e) -> RunOptions.ExceptionResult.COMMIT), () -> { + sync.set(register()); + throw new IllegalArgumentException("foo"); + })); + Assertions.assertEquals(Status.STATUS_COMMITTED, sync.get().completionStatus); + } + + @Test + public void testCallSuspendExisting() { + QuarkusTransaction.call(runOptions().semantic(RunOptions.Semantic.SUSPEND_EXISTING), () -> { + Assertions.assertFalse(QuarkusTransaction.isActive()); + return null; + }); + } + + @Test + @ActivateRequestContext + public void testCallDisallowExisting() { + RunOptions options = runOptions().semantic(RunOptions.Semantic.DISALLOW_EXISTING); + Assertions.assertEquals(Status.STATUS_COMMITTED, QuarkusTransaction.call(options, this::register).completionStatus); + QuarkusTransaction.begin(); + Assertions.assertThrows(QuarkusTransactionException.class, () -> QuarkusTransaction.call(options, this::register)); + } + + @Test + @ActivateRequestContext + public void testCallRequiresNew() throws SystemException { + RunOptions options = runOptions().semantic(RunOptions.Semantic.REQUIRE_NEW); + QuarkusTransaction.begin(); + var tx = transactionManager.getTransaction(); + QuarkusTransaction.call(options, () -> { + Assertions.assertTrue(QuarkusTransaction.isActive()); + if (tx == transactionManager.getTransaction()) { + throw new RuntimeException("Running in same transaction"); + } + return null; + }); + } + + @Test + @ActivateRequestContext + public void testCallJoinExisting() throws SystemException { + RunOptions options = runOptions().semantic(RunOptions.Semantic.JOIN_EXISTING); + QuarkusTransaction.begin(); + var tx = transactionManager.getTransaction(); + QuarkusTransaction.call(options, () -> { + Assertions.assertTrue(QuarkusTransaction.isActive()); + if (tx != transactionManager.getTransaction()) { + throw new RuntimeException("Running in different transaction"); + } + return null; + }); + } + + TestSync register() { + TestSync t = new TestSync(); + try { + transactionManager.getTransaction().registerSynchronization(t); + } catch (RollbackException | SystemException e) { + throw new RuntimeException(e); + } + return t; + } + + static class TestSync implements Synchronization { + + int completionStatus = -1; + + @Override + public void beforeCompletion() { + + } + + @Override + public void afterCompletion(int status) { + this.completionStatus = status; + } + } + +} diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/BeginOptions.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/BeginOptions.java new file mode 100644 index 0000000000000..767cb71d78a95 --- /dev/null +++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/BeginOptions.java @@ -0,0 +1,38 @@ +package io.quarkus.narayana.jta; + +/** + * Builder interface to allow a transaction to be customized, including things like timeout and semantics when an existing + * transaction is present. + */ +public class BeginOptions { + + boolean commitOnRequestScopeEnd; + int timeout = 0; + + /** + * If this method is called the transaction will be automatically committed when the request scope is destroyed, instead of + * being rolled back. + *

+ * + * @return These options + */ + public BeginOptions commitOnRequestScopeEnd() { + commitOnRequestScopeEnd = true; + return this; + } + + /** + * Sets the transaction timeout for transactions created by this builder. A value of zero refers to the system default. + * + * @param seconds The timeout in seconds + * @return This builder + * @throws IllegalArgumentException If seconds is negative + */ + public BeginOptions timeout(int seconds) { + if (seconds < 0) { + throw new IllegalArgumentException("seconds cannot be negative"); + } + this.timeout = seconds; + return this; + } +} diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/QuarkusTransaction.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/QuarkusTransaction.java new file mode 100644 index 0000000000000..b1691699aacf7 --- /dev/null +++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/QuarkusTransaction.java @@ -0,0 +1,169 @@ +package io.quarkus.narayana.jta; + +import java.util.concurrent.Callable; + +import javax.transaction.Status; +import javax.transaction.SystemException; +import javax.transaction.Transactional; + +import com.arjuna.ats.jta.UserTransaction; + +import io.quarkus.arc.Arc; + +/** + * A simplified transaction interface. While broadly covering the same use cases as {@link javax.transaction.UserTransaction}, + * this class is designed to be easier to use. The main features it offers over {@code UserTransaction} are: + * + *

    + *
  • No Checked Exceptions: All underlying checked exceptions are wrapped in an unchecked + * {@link QuarkusTransactionException}.
  • + *
  • No Transaction Leaks: Transactions are tied to the request scope, if the scope is destroyed before the transaction + * is committed the transaction is rolled back. Note that this means this can only currently be used when the request scope is + * active.
  • + *
  • Per Transaction Timeouts: {@link RunOptions#timeout(int)} can be used to set the new transactions + * timeout, without affecting the per thread default.
  • + *
  • Lambda Style Transactions: {@link Runnable} and {@link Callable} instances can be run inside the scope of a new + * transaction.
  • + *
+ *

+ * Note that any checked exception will be wrapped by a {@link QuarkusTransactionException}, while unchecked exceptions are + * allowed to propagate unchanged. + */ +public interface QuarkusTransaction { + + /** + * Starts a transaction, using the system default timeout. + *

+ * This transaction will be tied to the current request scope, if it is not committed when the scope is destroyed then it + * will be rolled back to prevent transaction leaks. + */ + static void begin() { + begin(beginOptions()); + } + + /** + * Starts a transaction, using the system default timeout. + *

+ * This transaction will be tied to the current request scope, if it is not committed when the scope is destroyed then it + * will be rolled back to prevent transaction leaks. + * + * @param options Options that apply to the new transaction + */ + static void begin(BeginOptions options) { + RequestScopedTransaction tx = Arc.container().instance(RequestScopedTransaction.class).get(); + tx.begin(options); + } + + /** + * Commits the current transaction. + */ + static void commit() { + QuarkusTransactionImpl.commit(); + } + + /** + * Rolls back the current transaction. + */ + static void rollback() { + QuarkusTransactionImpl.rollback(); + } + + /** + * If a transaction is active. + * + * @return {@code true} if the transaction is active. + */ + static boolean isActive() { + try { + return UserTransaction.userTransaction().getStatus() != Status.STATUS_NO_TRANSACTION; + } catch (SystemException e) { + throw new QuarkusTransactionException(e); + } + } + + /** + * If the transaction is rollback only + * + * @return If the transaction has been marked for rollback + */ + static boolean isRollbackOnly() { + try { + return UserTransaction.userTransaction().getStatus() == Status.STATUS_MARKED_ROLLBACK; + } catch (SystemException e) { + throw new QuarkusTransactionException(e); + } + } + + /** + * Marks the transaction as rollback only. Operations can still be carried out, however the transaction cannot be + * successfully committed. + */ + static void setRollbackOnly() { + QuarkusTransactionImpl.setRollbackOnly(); + } + + /** + * Runs a task in a new transaction with the default timeout. This defaults to {@link Transactional.TxType#REQUIRES_NEW} + * semantics, however alternate semantics can be requested using {@link #run(RunOptions, Runnable)}. + * + * @param task The task to run in a transaction + */ + static void run(Runnable task) { + run(runOptions(), task); + } + + /** + * Runs a task in a new transaction with the default timeout. This defaults to {@link Transactional.TxType#REQUIRES_NEW} + * semantics, however alternate semantics can be specified using the {@code options} parameter. + * + * @param options Options that apply to the new transaction + * @param task The task to run in a transaction + */ + static void run(RunOptions options, Runnable task) { + call(options, new Callable() { + @Override + public Object call() throws Exception { + task.run(); + return null; + } + }); + } + + /** + * Calls a task in a new transaction with the default timeout. This defaults to {@link Transactional.TxType#REQUIRES_NEW} + * semantics, however alternate semantics can be requested using {@link #call(RunOptions, Callable)}. + *

+ * If the task throws a checked exception it will be wrapped with a {@link QuarkusTransactionException} + * + * @param task The task to run in a transaction + */ + static T call(Callable task) { + return call(runOptions(), task); + } + + /** + * Calls a task in a new transaction with the default timeout. This defaults to {@link Transactional.TxType#REQUIRES_NEW} + * semantics, however alternate semantics can be requested using {@link #call(RunOptions, Callable)}. + *

+ * If the task throws a checked exception it will be wrapped with a {@link QuarkusTransactionException} + * + * @param task The task to run in a transaction + */ + static T call(RunOptions options, Callable task) { + return QuarkusTransactionImpl.call(options, task); + } + + /** + * @return a new RunOptions + */ + static RunOptions runOptions() { + return new RunOptions(); + } + + /** + * @return a new BeginOptions + */ + static BeginOptions beginOptions() { + return new BeginOptions(); + } +} diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/QuarkusTransactionException.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/QuarkusTransactionException.java new file mode 100644 index 0000000000000..3917a44d1f532 --- /dev/null +++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/QuarkusTransactionException.java @@ -0,0 +1,23 @@ +package io.quarkus.narayana.jta; + +/** + * Runtime exception that is used to wrap any checked exceptions thrown from the {@link QuarkusTransaction} methods. + */ +public class QuarkusTransactionException extends RuntimeException { + + public QuarkusTransactionException(Throwable cause) { + super(cause); + } + + public QuarkusTransactionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + + public QuarkusTransactionException(String message) { + super(message); + } + + public QuarkusTransactionException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/QuarkusTransactionImpl.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/QuarkusTransactionImpl.java new file mode 100644 index 0000000000000..a1a4e77f3813d --- /dev/null +++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/QuarkusTransactionImpl.java @@ -0,0 +1,259 @@ +package io.quarkus.narayana.jta; + +import java.util.concurrent.Callable; + +import javax.transaction.HeuristicMixedException; +import javax.transaction.HeuristicRollbackException; +import javax.transaction.NotSupportedException; +import javax.transaction.RollbackException; +import javax.transaction.Status; +import javax.transaction.SystemException; +import javax.transaction.Transaction; +import javax.transaction.TransactionManager; +import javax.transaction.UserTransaction; + +import org.jboss.logging.Logger; + +import io.quarkus.arc.Arc; +import io.quarkus.narayana.jta.runtime.TransactionManagerConfiguration; + +class QuarkusTransactionImpl { + + private static final Logger log = Logger.getLogger(QuarkusTransactionImpl.class); + private static TransactionManager cachedTransactionManager; + private static UserTransaction cachedUserTransaction; + + public static T call(RunOptions options, Callable task) { + switch (options.semantic) { + case REQUIRE_NEW: + return callRequireNew(options, task); + case DISALLOW_EXISTING: + return callDisallowExisting(options, task); + case JOIN_EXISTING: + return callJoinExisting(options, task); + case SUSPEND_EXISTING: + return callSuspendExisting(options, task); + } + throw new IllegalArgumentException("Unkown semantic"); + } + + private static T callSuspendExisting(RunOptions options, Callable task) { + if (options.exceptionHandler != null) { + throw new IllegalStateException("Cannot specify both an exception handler and SUSPEND_EXISTING"); + } + TransactionManager transactionManager = getTransactionManager(); + Transaction transaction = null; + try { + if (isTransactionActive()) { + transaction = transactionManager.suspend(); + } + T result = task.call(); + if (transaction != null) { + try { + transactionManager.resume(transaction); + transaction = null; + } catch (Exception e) { + throw new QuarkusTransactionException(e); + } + } + return result; + } catch (Exception e) { + if (transaction != null) { + try { + transactionManager.resume(transaction); + } catch (Exception ex) { + e.addSuppressed(ex); + } + } + if (e instanceof QuarkusTransactionException) { + throw (QuarkusTransactionException) e; + } + throw new QuarkusTransactionException(e); + } + } + + private static T callJoinExisting(RunOptions options, Callable task) { + if (isTransactionActive()) { + return callInTheirTx(options, task); + } else { + return callInOurTx(options, task); + } + } + + private static boolean isTransactionActive() { + try { + int status = getUserTransaction().getStatus(); + return status != Status.STATUS_NO_TRANSACTION; + } catch (SystemException e) { + throw new QuarkusTransactionException(e); + } + } + + private static T callDisallowExisting(RunOptions options, Callable task) { + if (isTransactionActive()) { + throw new QuarkusTransactionException(new IllegalStateException("Transaction already active")); + } + return callInOurTx(options, task); + } + + private static T callRequireNew(RunOptions options, Callable task) { + TransactionManager transactionManager = getTransactionManager(); + Transaction transaction = null; + try { + if (isTransactionActive()) { + transaction = transactionManager.suspend(); + } + T result = callInOurTx(options, task); + if (transaction != null) { + try { + transactionManager.resume(transaction); + transaction = null; + } catch (Exception e) { + throw new QuarkusTransactionException(e); + } + } + return result; + } catch (Exception e) { + if (transaction != null) { + try { + transactionManager.resume(transaction); + } catch (Exception ex) { + e.addSuppressed(ex); + } + } + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + throw new QuarkusTransactionException(e); + } + } + + private static T callInOurTx(RunOptions options, Callable task) { + begin(options); + try { + T ret; + try { + ret = task.call(); + } catch (Throwable t) { + RunOptions.ExceptionResult handling = RunOptions.ExceptionResult.ROLLBACK; + if (options.exceptionHandler != null) { + handling = options.exceptionHandler.apply(t); + } + if (handling == RunOptions.ExceptionResult.ROLLBACK) { + getUserTransaction().rollback(); + } else { + getUserTransaction().commit(); + } + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new QuarkusTransactionException(t); + } + } + try { + getUserTransaction().commit(); + } catch (Throwable t) { + throw new QuarkusTransactionException(t); + } + return ret; + } catch (SystemException | RollbackException | HeuristicMixedException | HeuristicRollbackException t) { + try { + getUserTransaction().rollback(); + } catch (Throwable e) { + t.addSuppressed(e); + } + throw new QuarkusTransactionException(t); + } + } + + private static T callInTheirTx(RunOptions options, Callable task) { + try { + T ret; + try { + ret = task.call(); + } catch (Throwable t) { + RunOptions.ExceptionResult handling = RunOptions.ExceptionResult.ROLLBACK; + if (options.exceptionHandler != null) { + handling = options.exceptionHandler.apply(t); + } + if (handling == RunOptions.ExceptionResult.ROLLBACK) { + getUserTransaction().setRollbackOnly(); + } + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new QuarkusTransactionException(t); + } + } + return ret; + } catch (SystemException t) { + try { + getUserTransaction().rollback(); + } catch (Throwable e) { + t.addSuppressed(e); + } + throw new QuarkusTransactionException(t); + } + } + + private static void begin(RunOptions options) { + int timeout = options != null ? options.timeout : 0; + try { + if (timeout > 0) { + getUserTransaction().setTransactionTimeout(timeout); + } + getUserTransaction().begin(); + } catch (NotSupportedException | SystemException e) { + throw new QuarkusTransactionException(e); + } finally { + if (timeout > 0) { + try { + getUserTransaction().setTransactionTimeout( + (int) Arc.container().instance(TransactionManagerConfiguration.class) + .get().defaultTransactionTimeout.toSeconds()); + } catch (SystemException e) { + log.error("Failed to reset transaction timeout", e); + } + } + } + } + + static void rollback() { + try { + getUserTransaction().rollback(); + } catch (SystemException e) { + throw new QuarkusTransactionException(e); + } + } + + static void commit() { + try { + getUserTransaction().commit(); + } catch (SystemException | RollbackException | HeuristicMixedException | HeuristicRollbackException e) { + throw new QuarkusTransactionException(e); + } + } + + static void setRollbackOnly() { + try { + getUserTransaction().setRollbackOnly(); + } catch (SystemException e) { + throw new RuntimeException(e); + } + } + + private static javax.transaction.UserTransaction getUserTransaction() { + if (cachedUserTransaction == null) { + return cachedUserTransaction = com.arjuna.ats.jta.UserTransaction.userTransaction(); + } + return cachedUserTransaction; + } + + private static TransactionManager getTransactionManager() { + if (cachedTransactionManager == null) { + return cachedTransactionManager = com.arjuna.ats.jta.TransactionManager + .transactionManager(); + } + return cachedTransactionManager; + } +} diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/RequestScopedTransaction.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/RequestScopedTransaction.java new file mode 100644 index 0000000000000..5d0ddfff4c54e --- /dev/null +++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/RequestScopedTransaction.java @@ -0,0 +1,100 @@ +package io.quarkus.narayana.jta; + +import java.util.function.Function; + +import javax.annotation.PreDestroy; +import javax.enterprise.context.RequestScoped; +import javax.inject.Inject; +import javax.transaction.NotSupportedException; +import javax.transaction.Status; +import javax.transaction.SystemException; +import javax.transaction.Transaction; +import javax.transaction.TransactionManager; +import javax.transaction.UserTransaction; + +import org.jboss.logging.Logger; + +import io.quarkus.narayana.jta.runtime.TransactionManagerConfiguration; + +/** + * A request scoped representation of a transaction. + *

+ * If the transaction is not committed it will be automatically rolled back when the request scope is destroyed. + */ +@RequestScoped +class RequestScopedTransaction { + + private static final Logger log = Logger.getLogger(RequestScopedTransaction.class); + public static final Function DEFAULT_HANDLER = ( + e) -> RunOptions.ExceptionResult.ROLLBACK; + + private final UserTransaction userTransaction; + private final TransactionManager transactionManager; + private final TransactionManagerConfiguration transactionManagerConfiguration; + private Transaction createdTransaction; + boolean autoCommit; + + @Inject + public RequestScopedTransaction(UserTransaction userTransaction, + TransactionManager transactionManager, TransactionManagerConfiguration transactionManagerConfiguration) { + this.userTransaction = userTransaction; + this.transactionManager = transactionManager; + this.transactionManagerConfiguration = transactionManagerConfiguration; + } + + public RequestScopedTransaction() { + //for proxiability + this.userTransaction = null; + this.transactionManagerConfiguration = null; + this.transactionManager = null; + } + + void begin(BeginOptions options) { + int timeout = options != null ? options.timeout : 0; + boolean commitOnRequestScopeEnd = options != null && options.commitOnRequestScopeEnd; + try { + if (userTransaction.getStatus() != Status.STATUS_NO_TRANSACTION) { + throw new QuarkusTransactionException("Transaction already active"); + + } + this.autoCommit = commitOnRequestScopeEnd; + if (timeout > 0) { + userTransaction.setTransactionTimeout(timeout); + } + userTransaction.begin(); + createdTransaction = transactionManager.getTransaction(); + } catch (NotSupportedException | SystemException e) { + throw new QuarkusTransactionException(e); + } finally { + if (timeout > 0) { + try { + userTransaction.setTransactionTimeout( + (int) transactionManagerConfiguration.defaultTransactionTimeout.toSeconds()); + } catch (SystemException e) { + throw new QuarkusTransactionException(e); + } + } + } + } + + @PreDestroy + void destroy() { + try { + if (transactionManager.getTransaction() == createdTransaction) { + if (autoCommit) { + QuarkusTransaction.commit(); + } else { + log.warn("Rolling back transaction that was not committed or explicitly rolled back."); + try { + userTransaction.rollback(); + } catch (SystemException e) { + throw new QuarkusTransactionException(e); + } + } + } + } catch (SystemException e) { + log.warn("Failed to destroy request scoped transaction", e); + } + } + +} diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/RunOptions.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/RunOptions.java new file mode 100644 index 0000000000000..b9f66679bbf4a --- /dev/null +++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/RunOptions.java @@ -0,0 +1,120 @@ +package io.quarkus.narayana.jta; + +import java.util.function.Function; + +/** + * Builder interface to allow a transaction to be customized, including things like timeout and semantics when an existing + * transaction is present. + */ +public class RunOptions { + + Semantic semantic = Semantic.REQUIRE_NEW; + int timeout = 0; + Function exceptionHandler; + + /** + * Sets the transaction timeout for transactions created by this builder. A value of zero refers to the system default. + * + * @throws IllegalArgumentException If seconds is negative + * @param seconds The timeout in seconds + * @return This builder + */ + public RunOptions timeout(int seconds) { + if (seconds < 0) { + throw new IllegalArgumentException("seconds cannot be negative"); + } + this.timeout = seconds; + return this; + } + + /** + * Sets the transaction semantic that is used to determine the action to take if a transaction is already active. + *

+ * + * @param semantic The semantic + * @return This builder + */ + public RunOptions semantic(Semantic semantic) { + this.semantic = semantic; + return this; + } + + /** + * Provides an exception handler that can make a decision to rollback or commit based on the type of exception. If the + * predicate returns {@link ExceptionResult#ROLLBACK} the transaction is rolled back, + * otherwise it is committed. + *

+ * This exception will still be propagated to the caller, so this method should not log or perform any other actions other + * than determine what should happen to the current transaction. + *

+ * By default the exception is always rolled back. + * + * @param handler The exception handler + * @return This builder + */ + public RunOptions exceptionHandler(Function handler) { + this.exceptionHandler = handler; + return this; + } + + /** + * Enum that can be used to control the transaction behaviour in the presence or absence of an existing transaction. + */ + public enum Semantic { + + /** + * If a transaction is already associated with the current thread a {@link QuarkusTransactionException} will be thrown, + * otherwise a new transaction is started, and follows all the normal lifecycle rules. + */ + DISALLOW_EXISTING, + + /** + *

+ * If no transaction is active then a new transaction will be started, and committed when the method ends. + * If an exception is thrown the exception handler registered by {@link #exceptionHandler(Function)} will be called to + * decide if the TX should be committed or rolled back. + *

+ * If an existing transaction is active then the method is run in the context of the existing transaction. If an + * exception is thrown the exception handler will be called, however + * a result of {@link ExceptionResult#ROLLBACK} will result in the TX marked as rollback only, while a result of + * {@link ExceptionResult#COMMIT} will result in no action being taken. + */ + JOIN_EXISTING, + + /** + * This is the default semantic. + *

+ * If an existing transaction is already associated with the current thread then the transaction is suspended, and + * resumed once + * the current transaction is complete. + *

+ * A new transaction is started after the existing transaction is suspended, and follows all the normal lifecycle rules. + *

+ */ + REQUIRE_NEW, + + /** + * If no transaction is active then this semantic is basically a no-op. + *

+ * If a transaction is active then it is suspended, and resumed after the task is run. + *

+ * The exception handler will never be consulted when this semantic is in use, specifying both an exception handler and + * this semantic is considered an error. + *

+ * This semantic allows for code to easily be run outside the scope of a transaction. + */ + SUSPEND_EXISTING + + } + + public enum ExceptionResult { + /** + * The transaction should be committed. + */ + COMMIT, + /** + * The transaction should be rolled back. + */ + ROLLBACK + } +} diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/NarayanaJtaRecorder.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/NarayanaJtaRecorder.java index a79a961d74a8b..b66dd67487d37 100644 --- a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/NarayanaJtaRecorder.java +++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/NarayanaJtaRecorder.java @@ -52,10 +52,9 @@ public void setDefaultProperties(Properties properties) { } public void setDefaultTimeout(TransactionManagerConfiguration transactions) { - transactions.defaultTransactionTimeout.ifPresent(defaultTimeout -> { - arjPropertyManager.getCoordinatorEnvironmentBean().setDefaultTimeout((int) defaultTimeout.getSeconds()); - TxControl.setDefaultTimeout((int) defaultTimeout.getSeconds()); - }); + arjPropertyManager.getCoordinatorEnvironmentBean() + .setDefaultTimeout((int) transactions.defaultTransactionTimeout.getSeconds()); + TxControl.setDefaultTimeout((int) transactions.defaultTransactionTimeout.getSeconds()); } public static Properties getDefaultProperties() { diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/TransactionManagerConfiguration.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/TransactionManagerConfiguration.java index 949f00141da22..ee2c70841b308 100644 --- a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/TransactionManagerConfiguration.java +++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/TransactionManagerConfiguration.java @@ -1,7 +1,6 @@ package io.quarkus.narayana.jta.runtime; import java.time.Duration; -import java.util.Optional; import io.quarkus.runtime.annotations.ConfigItem; import io.quarkus.runtime.annotations.ConfigPhase; @@ -22,7 +21,7 @@ public final class TransactionManagerConfiguration { * The default transaction timeout */ @ConfigItem(defaultValue = "60") - public Optional defaultTransactionTimeout; + public Duration defaultTransactionTimeout; /** * The directory name of location of the transaction logs.