diff --git a/api/src/main/java/io/smallrye/faulttolerance/api/AsynchronousNonBlocking.java b/api/src/main/java/io/smallrye/faulttolerance/api/AsynchronousNonBlocking.java index a422c7dc7..bef0a9a83 100644 --- a/api/src/main/java/io/smallrye/faulttolerance/api/AsynchronousNonBlocking.java +++ b/api/src/main/java/io/smallrye/faulttolerance/api/AsynchronousNonBlocking.java @@ -28,9 +28,9 @@ * the result of the asynchronous execution, when it completes. *

* Before the asynchronous execution completes, the {@code CompletionStage} returned to the caller is incomplete. - * Once the asynchronous execution completes, the {@code CompletionStage} returned to the caller assumes behavior - * that is equivalent to the {@code CompletionStage} returned by the guarded method. If the guarded method - * synchronously throws an exception, the returned {@code CompletionStage} completes with that exception. + * Once the asynchronous execution completes, the {@code CompletionStage} returned to the caller becomes equivalent + * to the {@code CompletionStage} returned by the guarded method. If the guarded method synchronously throws + * an exception, the returned {@code CompletionStage} completes with that exception. *

* If a method marked with this annotation doesn't declare return type of {@code CompletionStage}, * {@link org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException diff --git a/api/src/main/java/io/smallrye/faulttolerance/api/FaultTolerance.java b/api/src/main/java/io/smallrye/faulttolerance/api/FaultTolerance.java index 214690ac4..993fd1cd3 100644 --- a/api/src/main/java/io/smallrye/faulttolerance/api/FaultTolerance.java +++ b/api/src/main/java/io/smallrye/faulttolerance/api/FaultTolerance.java @@ -6,6 +6,7 @@ import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.function.Consumer; import java.util.function.Function; @@ -302,7 +303,7 @@ interface Builder { /** * Configures whether the guarded action should be offloaded to another thread. Thread offload is * only possible for asynchronous actions. If this builder was not created using {@code createAsync}, - * attempting to configure thread offload throws an exception. + * attempting to enable thread offload throws an exception. * * @param value whether the guarded action should be offloaded to another thread * @return this fault tolerance builder @@ -311,6 +312,19 @@ interface Builder { */ Builder withThreadOffload(boolean value); + /** + * Configures the executor to use when offloading the guarded action to another thread. Thread + * offload is only possible for asynchronous actions. If this builder was not created using + * {@code createAsync}, this method throws an exception. + *

+ * If this method is not called, the guarded action is offloaded to the default executor + * provided by the integrator. + * + * @param executor the executor to which the guarded action should be offloaded + * @return this fault tolerance builder + */ + Builder withThreadOffloadExecutor(Executor executor); + /** * Returns a ready-to-use instance of {@code FaultTolerance} or guarded {@code Callable}, depending on * how this builder was created. diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/FaultToleranceImpl.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/FaultToleranceImpl.java index 6564ab65f..d9fe0a458 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/FaultToleranceImpl.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/apiimpl/FaultToleranceImpl.java @@ -129,6 +129,7 @@ public static final class BuilderImpl implements Builder { private RetryBuilderImpl retryBuilder; private TimeoutBuilderImpl timeoutBuilder; private boolean offloadToAnotherThread; + private Executor offloadExecutor; public BuilderImpl(BuilderEagerDependencies eagerDependencies, Supplier lazyDependencies, Class asyncType, Function, R> finisher) { @@ -187,6 +188,16 @@ public Builder withThreadOffload(boolean value) { return this; } + @Override + public Builder withThreadOffloadExecutor(Executor executor) { + if (!isAsync) { + throw new IllegalStateException("Thread offload executor may only be set for asynchronous invocations"); + } + + this.offloadExecutor = Preconditions.checkNotNull(executor, "Thread offload executor must be set"); + return this; + } + @Override public R build() { eagerInitialization(); @@ -307,7 +318,9 @@ private FaultToleranceStrategy> buildAsyncStrategy(Builde FaultToleranceStrategy> result = invocation(); // thread offload is always enabled - Executor executor = offloadToAnotherThread ? lazyDependencies.asyncExecutor() : DirectExecutor.INSTANCE; + Executor executor = offloadToAnotherThread + ? (offloadExecutor != null ? offloadExecutor : lazyDependencies.asyncExecutor()) + : DirectExecutor.INSTANCE; result = new CompletionStageExecution<>(result, executor); if (lazyDependencies.ftEnabled() && bulkheadBuilder != null) { diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/async/RememberEventLoop.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/async/RememberEventLoop.java index 110c877af..14a56cb25 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/async/RememberEventLoop.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/async/RememberEventLoop.java @@ -1,6 +1,7 @@ package io.smallrye.faulttolerance.core.async; import static io.smallrye.faulttolerance.core.async.AsyncLogger.LOG; +import static io.smallrye.faulttolerance.core.util.Preconditions.checkNotNull; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; @@ -15,7 +16,7 @@ public class RememberEventLoop implements FaultToleranceStrategy> delegate, EventLoop eventLoop) { this.delegate = delegate; - this.eventLoop = eventLoop; + this.eventLoop = checkNotNull(eventLoop, "Event loop must be set"); } @Override diff --git a/implementation/standalone/src/test/java/io/smallrye/faulttolerance/standalone/test/StandaloneThreadOffloadTest.java b/implementation/standalone/src/test/java/io/smallrye/faulttolerance/standalone/test/StandaloneThreadOffloadTest.java new file mode 100644 index 000000000..9743c2ce0 --- /dev/null +++ b/implementation/standalone/src/test/java/io/smallrye/faulttolerance/standalone/test/StandaloneThreadOffloadTest.java @@ -0,0 +1,78 @@ +package io.smallrye.faulttolerance.standalone.test; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +import io.smallrye.faulttolerance.api.FaultTolerance; +import io.smallrye.faulttolerance.core.util.party.Party; + +public class StandaloneThreadOffloadTest { + @Test + public void integratedExecutor() throws Exception { + FaultTolerance> guarded = FaultTolerance. createAsync() + .withThreadOffload(true) + .build(); + + Set threadNames = ConcurrentHashMap.newKeySet(); + Party party = Party.create(5); + + for (int i = 0; i < 5; i++) { + guarded.call(() -> { + threadNames.add(Thread.currentThread().getName()); + party.participant().attend(); + return completedFuture("ignored"); + }); + } + + party.organizer().waitForAll(); + party.organizer().disband(); + + assertThat(threadNames).hasSize(5); + } + + @Test + public void explicitExecutor() throws Exception { + String prefix = UUID.randomUUID().toString(); + AtomicInteger counter = new AtomicInteger(); + ExecutorService executor = Executors.newCachedThreadPool(runnable -> new Thread(runnable, + prefix + "_" + counter.incrementAndGet())); + + FaultTolerance> guarded = FaultTolerance. createAsync() + .withThreadOffload(true) + .withThreadOffloadExecutor(executor) + .build(); + + Set threadNames = ConcurrentHashMap.newKeySet(); + Party party = Party.create(5); + + for (int i = 0; i < 5; i++) { + guarded.call(() -> { + threadNames.add(Thread.currentThread().getName()); + party.participant().attend(); + return completedFuture("ignored"); + }); + } + + party.organizer().waitForAll(); + party.organizer().disband(); + + assertThat(threadNames).hasSize(5); + assertThat(threadNames).allSatisfy(threadName -> { + assertThat(threadName).startsWith(prefix); + }); + + executor.shutdownNow(); + executor.awaitTermination(1, TimeUnit.SECONDS); + } +} diff --git a/testsuite/basic/src/test/java/io/smallrye/faulttolerance/programmatic/CdiThreadOffloadTest.java b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/programmatic/CdiThreadOffloadTest.java new file mode 100644 index 000000000..15e24e3ff --- /dev/null +++ b/testsuite/basic/src/test/java/io/smallrye/faulttolerance/programmatic/CdiThreadOffloadTest.java @@ -0,0 +1,8 @@ +package io.smallrye.faulttolerance.programmatic; + +import io.smallrye.faulttolerance.standalone.test.StandaloneThreadOffloadTest; +import io.smallrye.faulttolerance.util.FaultToleranceBasicTest; + +@FaultToleranceBasicTest +public class CdiThreadOffloadTest extends StandaloneThreadOffloadTest { +}