Skip to content

Commit

Permalink
Merge pull request #21115 from stuartwdouglas/979
Browse files Browse the repository at this point in the history
QuarkusTransaction API
  • Loading branch information
stuartwdouglas authored Jan 28, 2022
2 parents 777d821 + 77d2632 commit ee33fab
Show file tree
Hide file tree
Showing 11 changed files with 1,061 additions and 8 deletions.
97 changes: 95 additions & 2 deletions docs/src/main/asciidoc/transaction.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ Add the following to your `pom.xml`:

== Starting and stopping transactions: defining your boundaries

You can define your transaction boundaries the easy way, or the less easy way :)
You can define your transaction boundaries either declarativly with `@Transactional` or programmatically with `QuarkusTransaction`. You can also use
the JTA `UserTransaction` API directly, however this is less user friendly than `QuarkusTransaction`.

=== Declarative approach

Expand Down Expand Up @@ -142,7 +143,99 @@ work is really done, and not just until the reactive method returns.
If you need to propagate your transaction context across your reactive pipeline, please see the
xref:context-propagation.adoc[Context Propagation guide].

=== API approach
=== Programmatic Approach

You can use static methods on `QuarkusTransaction` to define transaction boundaries. This provides two different options,
a functional approach that allows you to run a lambda within the scope of a transaction, or by using explicit `begin`,
`commit` and `rollback` methods.

[source,java]
----
import io.quarkus.narayana.jta.QuarkusTransaction;
import io.quarkus.narayana.jta.RunOptions;
public class TransactionExample {
public void beginExample() {
QuarkusTransaction.begin();
//do work
QuarkusTransaction.commit();
QuarkusTransaction.begin(QuarkusTransaction.beginOptions()
.timeout(10));
//do work
QuarkusTransaction.rollback();
}
public void lambdaExample() {
QuarkusTransaction.run(() -> {
//do work
});
int result = QuarkusTransaction.call(QuarkusTransaction.runOptions()
.timeout(10)
.exceptionHandler((throwable) -> {
if (throwable instanceof SomeException) {
return RunOptions.ExceptionResult.COMMIT;
}
return RunOptions.ExceptionResult.ROLLBACK;
})
.semantic(RunOptions.Semantic.SUSPEND_EXISTING), () -> {
//do work
return 0;
});
}
}
----

The above example shows a few different ways the API can be used. The first method simply calls begin, does some work and commits it.
This created transaction is tied to the CDI request scope, so if it is still active when the request scope is destroyed then it will
be automatically rolled back. This removes the need to explicitly catch exceptions and call `rollback`, and acts as a safety net
against inadvertent transaction leaks, however it does mean that this can only be used when the request scope is active. The second
example in the method calls begin with a timeout option, and then rolls back the transaction.

The second example shows the use of lambda scoped transactions, the first just runs a `Runnable` within a transaction, the second,
runs `Callable` with some specific options. In particular the `exceptionHandler` method can be used to control if the transaction
is rolled back or not on exception, and the `semantic` method controls the behaviour if an existing transaction is already started.

The following semantics are supported:


DISALLOW_EXISTING::

If a transaction is already associated with the current thread a `QuarkusTransactionException` will be thrown,
otherwise a new transaction is started, and follows all the normal lifecycle rules.

JOIN_EXISTING::

If no transaction is active then a new transaction will be started, and committed when the method ends.
If an exception is thrown the exception handler registered by `#exceptionHandler(Function)` will be called to
decide if the TX should be committed or rolled back.
If an existing transaction is active then the method is run in the context of the existing transaction. If an
exception is thrown the exception handler will be called, however
a result of `ExceptionResult#ROLLBACK` will result in the TX marked as rollback only, while a result of
`ExceptionResult#COMMIT` will result in no action being taken.

REQUIRE_NEW::

This is the default semantic.
If an existing transaction is already associated with the current thread then the transaction is suspended, and
resumed once
the current transaction is complete.
A new transaction is started after the existing transaction is suspended, and follows all the normal lifecycle rules.

SUSPEND_EXISTING::

If no transaction is active then this semantic is basically a no-op.
If a transaction is active then it is suspended, and resumed after the task is run.
The exception handler will never be consulted when this semantic is in use, specifying both an exception handler and
this semantic is considered an error.
This semantic allows for code to easily be run outside the scope of a transaction.



=== Legacy API approach

The less easy way is to inject a `UserTransaction` and use the various transaction demarcation methods.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public void build(NarayanaJtaRecorder recorder,
feature.produce(new FeatureBuildItem(Feature.NARAYANA_JTA));
additionalBeans.produce(new AdditionalBeanBuildItem(NarayanaJtaProducers.class));
additionalBeans.produce(new AdditionalBeanBuildItem(CDIDelegatingTransactionManager.class));
additionalBeans.produce(AdditionalBeanBuildItem.unremovableOf("io.quarkus.narayana.jta.RequestScopedTransaction"));

runtimeInit.produce(new RuntimeInitializedClassBuildItem(
"com.arjuna.ats.internal.jta.resources.arjunacore.CommitMarkableResourceRecord"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
package io.quarkus.narayana.quarkus;

import static io.quarkus.narayana.jta.QuarkusTransaction.beginOptions;
import static io.quarkus.narayana.jta.QuarkusTransaction.runOptions;

import java.util.concurrent.atomic.AtomicReference;

import javax.enterprise.context.ContextNotActiveException;
import javax.enterprise.context.control.ActivateRequestContext;
import javax.inject.Inject;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.arc.Arc;
import io.quarkus.narayana.jta.QuarkusTransaction;
import io.quarkus.narayana.jta.QuarkusTransactionException;
import io.quarkus.narayana.jta.RunOptions;
import io.quarkus.test.QuarkusUnitTest;

public class QuarkusTransactionTest {

@Inject
TransactionManager transactionManager;

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class));

@Test
public void testBeginRequestScopeNotActive() {
Assertions.assertThrows(ContextNotActiveException.class, QuarkusTransaction::begin);
}

@Test
@ActivateRequestContext
public void testBeginCommit() {
QuarkusTransaction.begin();
var sync = register();
QuarkusTransaction.commit();
Assertions.assertEquals(Status.STATUS_COMMITTED, sync.completionStatus);
}

@Test
@ActivateRequestContext
public void testBeginRollback() {
QuarkusTransaction.begin();
var sync = register();
QuarkusTransaction.rollback();
Assertions.assertEquals(Status.STATUS_ROLLEDBACK, sync.completionStatus);
}

@Test
@ActivateRequestContext
public void testBeginSetRollbackOnly() {
QuarkusTransaction.begin();
var sync = register();
QuarkusTransaction.setRollbackOnly();
Assertions.assertThrows(QuarkusTransactionException.class, QuarkusTransaction::commit);
Assertions.assertEquals(Status.STATUS_ROLLEDBACK, sync.completionStatus);
}

@Test
@ActivateRequestContext
public void testBeginTimeout() throws InterruptedException {
QuarkusTransaction.begin(beginOptions().timeout(1));
var sync = register();
Thread.sleep(1200);
Assertions.assertThrows(QuarkusTransactionException.class, QuarkusTransaction::commit);
Assertions.assertEquals(Status.STATUS_ROLLEDBACK, sync.completionStatus);
}

@Test
@ActivateRequestContext
public void testBeginSuspendExistingFalse() {
QuarkusTransaction.begin();
var sync = register();
Assertions.assertThrows(QuarkusTransactionException.class, QuarkusTransaction::begin);
Assertions.assertTrue(QuarkusTransaction.isActive());
QuarkusTransaction.commit();
Assertions.assertEquals(Status.STATUS_COMMITTED, sync.completionStatus);
}

@Test
public void testBeginRollbackOnRequestScopeEnd() {
var context = Arc.container().requestContext();
context.activate();
TestSync sync = null;
try {
QuarkusTransaction.begin();
sync = register();
} finally {
context.terminate();
}
Assertions.assertEquals(Status.STATUS_ROLLEDBACK, sync.completionStatus);
}

@Test
public void testBeginCommitOnRequestScopeEnd() {
var context = Arc.container().requestContext();
context.activate();
TestSync sync = null;
try {
QuarkusTransaction.begin(beginOptions().commitOnRequestScopeEnd());
sync = register();
} finally {
context.terminate();
}
Assertions.assertEquals(Status.STATUS_COMMITTED, sync.completionStatus);
}

@Test
public void testCallCommit() {
Assertions.assertEquals(Status.STATUS_COMMITTED, QuarkusTransaction.call(this::register).completionStatus);
}

@Test
public void testCallRollback() {
AtomicReference<TestSync> sync = new AtomicReference<>();
Assertions.assertThrows(QuarkusTransactionException.class, () -> QuarkusTransaction.call(() -> {
sync.set(register());
QuarkusTransaction.rollback();
return null;
}));
Assertions.assertEquals(Status.STATUS_ROLLEDBACK, sync.get().completionStatus);
}

@Test
public void testCallRollbackOnly() {
AtomicReference<TestSync> sync = new AtomicReference<>();
Assertions.assertThrows(QuarkusTransactionException.class, () -> QuarkusTransaction.call(() -> {
sync.set(register());
QuarkusTransaction.setRollbackOnly();
return null;
}));
Assertions.assertEquals(Status.STATUS_ROLLEDBACK, sync.get().completionStatus);
}

@Test
public void testCallTimeout() {
AtomicReference<TestSync> sync = new AtomicReference<>();
Assertions.assertThrows(QuarkusTransactionException.class,
() -> QuarkusTransaction.call(runOptions().timeout(1), () -> {
sync.set(register());
Thread.sleep(1200);
return null;
}));
Assertions.assertEquals(Status.STATUS_ROLLEDBACK, sync.get().completionStatus);
}

@Test
public void testCallException() {
AtomicReference<TestSync> sync = new AtomicReference<>();
Assertions.assertThrows(IllegalArgumentException.class, () -> QuarkusTransaction.call(() -> {
sync.set(register());
throw new IllegalArgumentException("foo");
}));
Assertions.assertEquals(Status.STATUS_ROLLEDBACK, sync.get().completionStatus);
}

@Test
public void testCallExceptionHandler() {
AtomicReference<TestSync> sync = new AtomicReference<>();
Assertions.assertThrows(IllegalArgumentException.class,
() -> QuarkusTransaction.call(runOptions().exceptionHandler((e) -> RunOptions.ExceptionResult.COMMIT), () -> {
sync.set(register());
throw new IllegalArgumentException("foo");
}));
Assertions.assertEquals(Status.STATUS_COMMITTED, sync.get().completionStatus);
}

@Test
public void testCallSuspendExisting() {
QuarkusTransaction.call(runOptions().semantic(RunOptions.Semantic.SUSPEND_EXISTING), () -> {
Assertions.assertFalse(QuarkusTransaction.isActive());
return null;
});
}

@Test
@ActivateRequestContext
public void testCallDisallowExisting() {
RunOptions options = runOptions().semantic(RunOptions.Semantic.DISALLOW_EXISTING);
Assertions.assertEquals(Status.STATUS_COMMITTED, QuarkusTransaction.call(options, this::register).completionStatus);
QuarkusTransaction.begin();
Assertions.assertThrows(QuarkusTransactionException.class, () -> QuarkusTransaction.call(options, this::register));
}

@Test
@ActivateRequestContext
public void testCallRequiresNew() throws SystemException {
RunOptions options = runOptions().semantic(RunOptions.Semantic.REQUIRE_NEW);
QuarkusTransaction.begin();
var tx = transactionManager.getTransaction();
QuarkusTransaction.call(options, () -> {
Assertions.assertTrue(QuarkusTransaction.isActive());
if (tx == transactionManager.getTransaction()) {
throw new RuntimeException("Running in same transaction");
}
return null;
});
}

@Test
@ActivateRequestContext
public void testCallJoinExisting() throws SystemException {
RunOptions options = runOptions().semantic(RunOptions.Semantic.JOIN_EXISTING);
QuarkusTransaction.begin();
var tx = transactionManager.getTransaction();
QuarkusTransaction.call(options, () -> {
Assertions.assertTrue(QuarkusTransaction.isActive());
if (tx != transactionManager.getTransaction()) {
throw new RuntimeException("Running in different transaction");
}
return null;
});
}

TestSync register() {
TestSync t = new TestSync();
try {
transactionManager.getTransaction().registerSynchronization(t);
} catch (RollbackException | SystemException e) {
throw new RuntimeException(e);
}
return t;
}

static class TestSync implements Synchronization {

int completionStatus = -1;

@Override
public void beforeCompletion() {

}

@Override
public void afterCompletion(int status) {
this.completionStatus = status;
}
}

}
Loading

0 comments on commit ee33fab

Please sign in to comment.