Skip to content

Commit

Permalink
add FaultTolerance.Builder.withThreadOffloadExecutor()
Browse files Browse the repository at this point in the history
This method allows configuring the executor to use for thread
offloads of actions guarded by the built `FaultTolerance` instance.
  • Loading branch information
Ladicek committed Mar 25, 2024
1 parent b8fb35f commit 92fe1c7
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
* the result of the asynchronous execution, when it completes.
* <p>
* Before the asynchronous execution completes, the {@code CompletionStage} returned to the caller is incomplete.
* Once the asynchronous execution completes, the {@code CompletionStage} returned to the caller assumes behavior
* that is equivalent to the {@code CompletionStage} returned by the guarded method. If the guarded method
* synchronously throws an exception, the returned {@code CompletionStage} completes with that exception.
* Once the asynchronous execution completes, the {@code CompletionStage} returned to the caller becomes equivalent
* to the {@code CompletionStage} returned by the guarded method. If the guarded method synchronously throws
* an exception, the returned {@code CompletionStage} completes with that exception.
* <p>
* If a method marked with this annotation doesn't declare return type of {@code CompletionStage},
* {@link org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -302,7 +303,7 @@ interface Builder<T, R> {
/**
* Configures whether the guarded action should be offloaded to another thread. Thread offload is
* only possible for asynchronous actions. If this builder was not created using {@code createAsync},
* attempting to configure thread offload throws an exception.
* attempting to enable thread offload throws an exception.
*
* @param value whether the guarded action should be offloaded to another thread
* @return this fault tolerance builder
Expand All @@ -311,6 +312,19 @@ interface Builder<T, R> {
*/
Builder<T, R> withThreadOffload(boolean value);

/**
* Configures the executor to use when offloading the guarded action to another thread. Thread
* offload is only possible for asynchronous actions. If this builder was not created using
* {@code createAsync}, this method throws an exception.
* <p>
* If this method is not called, the guarded action is offloaded to the default executor
* provided by the integrator.
*
* @param executor the executor to which the guarded action should be offloaded
* @return this fault tolerance builder
*/
Builder<T, R> withThreadOffloadExecutor(Executor executor);

/**
* Returns a ready-to-use instance of {@code FaultTolerance} or guarded {@code Callable}, depending on
* how this builder was created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public static final class BuilderImpl<T, R> implements Builder<T, R> {
private RetryBuilderImpl<T, R> retryBuilder;
private TimeoutBuilderImpl<T, R> timeoutBuilder;
private boolean offloadToAnotherThread;
private Executor offloadExecutor;

public BuilderImpl(BuilderEagerDependencies eagerDependencies, Supplier<BuilderLazyDependencies> lazyDependencies,
Class<?> asyncType, Function<FaultTolerance<T>, R> finisher) {
Expand Down Expand Up @@ -187,6 +188,16 @@ public Builder<T, R> withThreadOffload(boolean value) {
return this;
}

@Override
public Builder<T, R> withThreadOffloadExecutor(Executor executor) {
if (!isAsync) {
throw new IllegalStateException("Thread offload executor may only be set for asynchronous invocations");
}

this.offloadExecutor = Preconditions.checkNotNull(executor, "Thread offload executor must be set");
return this;
}

@Override
public R build() {
eagerInitialization();
Expand Down Expand Up @@ -307,7 +318,9 @@ private <V> FaultToleranceStrategy<CompletionStage<V>> buildAsyncStrategy(Builde
FaultToleranceStrategy<CompletionStage<V>> result = invocation();

// thread offload is always enabled
Executor executor = offloadToAnotherThread ? lazyDependencies.asyncExecutor() : DirectExecutor.INSTANCE;
Executor executor = offloadToAnotherThread
? (offloadExecutor != null ? offloadExecutor : lazyDependencies.asyncExecutor())
: DirectExecutor.INSTANCE;
result = new CompletionStageExecution<>(result, executor);

if (lazyDependencies.ftEnabled() && bulkheadBuilder != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.smallrye.faulttolerance.core.async;

import static io.smallrye.faulttolerance.core.async.AsyncLogger.LOG;
import static io.smallrye.faulttolerance.core.util.Preconditions.checkNotNull;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
Expand All @@ -15,7 +16,7 @@ public class RememberEventLoop<V> implements FaultToleranceStrategy<CompletionSt

public RememberEventLoop(FaultToleranceStrategy<CompletionStage<V>> delegate, EventLoop eventLoop) {
this.delegate = delegate;
this.eventLoop = eventLoop;
this.eventLoop = checkNotNull(eventLoop, "Event loop must be set");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package io.smallrye.faulttolerance.standalone.test;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.Test;

import io.smallrye.faulttolerance.api.FaultTolerance;
import io.smallrye.faulttolerance.core.util.party.Party;

public class StandaloneThreadOffloadTest {
@Test
public void integratedExecutor() throws Exception {
FaultTolerance<CompletionStage<String>> guarded = FaultTolerance.<String> createAsync()
.withThreadOffload(true)
.build();

Set<String> threadNames = ConcurrentHashMap.newKeySet();
Party party = Party.create(5);

for (int i = 0; i < 5; i++) {
guarded.call(() -> {
threadNames.add(Thread.currentThread().getName());
party.participant().attend();
return completedFuture("ignored");
});
}

party.organizer().waitForAll();
party.organizer().disband();

assertThat(threadNames).hasSize(5);
}

@Test
public void explicitExecutor() throws Exception {
String prefix = UUID.randomUUID().toString();
AtomicInteger counter = new AtomicInteger();
ExecutorService executor = Executors.newCachedThreadPool(runnable -> new Thread(runnable,
prefix + "_" + counter.incrementAndGet()));

FaultTolerance<CompletionStage<String>> guarded = FaultTolerance.<String> createAsync()
.withThreadOffload(true)
.withThreadOffloadExecutor(executor)
.build();

Set<String> threadNames = ConcurrentHashMap.newKeySet();
Party party = Party.create(5);

for (int i = 0; i < 5; i++) {
guarded.call(() -> {
threadNames.add(Thread.currentThread().getName());
party.participant().attend();
return completedFuture("ignored");
});
}

party.organizer().waitForAll();
party.organizer().disband();

assertThat(threadNames).hasSize(5);
assertThat(threadNames).allSatisfy(threadName -> {
assertThat(threadName).startsWith(prefix);
});

executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.smallrye.faulttolerance.programmatic;

import io.smallrye.faulttolerance.standalone.test.StandaloneThreadOffloadTest;
import io.smallrye.faulttolerance.util.FaultToleranceBasicTest;

@FaultToleranceBasicTest
public class CdiThreadOffloadTest extends StandaloneThreadOffloadTest {
}

0 comments on commit 92fe1c7

Please sign in to comment.