Skip to content

Commit

Permalink
Merge pull request #989 from Ladicek/programmatic-api-thread-offload-…
Browse files Browse the repository at this point in the history
…executor

add FaultTolerance.Builder.withThreadOffloadExecutor()
  • Loading branch information
Ladicek authored Mar 25, 2024
2 parents b8fb35f + 92fe1c7 commit 101b8b6
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
* the result of the asynchronous execution, when it completes.
* <p>
* Before the asynchronous execution completes, the {@code CompletionStage} returned to the caller is incomplete.
* Once the asynchronous execution completes, the {@code CompletionStage} returned to the caller assumes behavior
* that is equivalent to the {@code CompletionStage} returned by the guarded method. If the guarded method
* synchronously throws an exception, the returned {@code CompletionStage} completes with that exception.
* Once the asynchronous execution completes, the {@code CompletionStage} returned to the caller becomes equivalent
* to the {@code CompletionStage} returned by the guarded method. If the guarded method synchronously throws
* an exception, the returned {@code CompletionStage} completes with that exception.
* <p>
* If a method marked with this annotation doesn't declare return type of {@code CompletionStage},
* {@link org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -302,7 +303,7 @@ interface Builder<T, R> {
/**
* Configures whether the guarded action should be offloaded to another thread. Thread offload is
* only possible for asynchronous actions. If this builder was not created using {@code createAsync},
* attempting to configure thread offload throws an exception.
* attempting to enable thread offload throws an exception.
*
* @param value whether the guarded action should be offloaded to another thread
* @return this fault tolerance builder
Expand All @@ -311,6 +312,19 @@ interface Builder<T, R> {
*/
Builder<T, R> withThreadOffload(boolean value);

/**
* Configures the executor to use when offloading the guarded action to another thread. Thread
* offload is only possible for asynchronous actions. If this builder was not created using
* {@code createAsync}, this method throws an exception.
* <p>
* If this method is not called, the guarded action is offloaded to the default executor
* provided by the integrator.
*
* @param executor the executor to which the guarded action should be offloaded
* @return this fault tolerance builder
*/
Builder<T, R> withThreadOffloadExecutor(Executor executor);

/**
* Returns a ready-to-use instance of {@code FaultTolerance} or guarded {@code Callable}, depending on
* how this builder was created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public static final class BuilderImpl<T, R> implements Builder<T, R> {
private RetryBuilderImpl<T, R> retryBuilder;
private TimeoutBuilderImpl<T, R> timeoutBuilder;
private boolean offloadToAnotherThread;
private Executor offloadExecutor;

public BuilderImpl(BuilderEagerDependencies eagerDependencies, Supplier<BuilderLazyDependencies> lazyDependencies,
Class<?> asyncType, Function<FaultTolerance<T>, R> finisher) {
Expand Down Expand Up @@ -187,6 +188,16 @@ public Builder<T, R> withThreadOffload(boolean value) {
return this;
}

@Override
public Builder<T, R> withThreadOffloadExecutor(Executor executor) {
if (!isAsync) {
throw new IllegalStateException("Thread offload executor may only be set for asynchronous invocations");
}

this.offloadExecutor = Preconditions.checkNotNull(executor, "Thread offload executor must be set");
return this;
}

@Override
public R build() {
eagerInitialization();
Expand Down Expand Up @@ -307,7 +318,9 @@ private <V> FaultToleranceStrategy<CompletionStage<V>> buildAsyncStrategy(Builde
FaultToleranceStrategy<CompletionStage<V>> result = invocation();

// thread offload is always enabled
Executor executor = offloadToAnotherThread ? lazyDependencies.asyncExecutor() : DirectExecutor.INSTANCE;
Executor executor = offloadToAnotherThread
? (offloadExecutor != null ? offloadExecutor : lazyDependencies.asyncExecutor())
: DirectExecutor.INSTANCE;
result = new CompletionStageExecution<>(result, executor);

if (lazyDependencies.ftEnabled() && bulkheadBuilder != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.smallrye.faulttolerance.core.async;

import static io.smallrye.faulttolerance.core.async.AsyncLogger.LOG;
import static io.smallrye.faulttolerance.core.util.Preconditions.checkNotNull;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
Expand All @@ -15,7 +16,7 @@ public class RememberEventLoop<V> implements FaultToleranceStrategy<CompletionSt

public RememberEventLoop(FaultToleranceStrategy<CompletionStage<V>> delegate, EventLoop eventLoop) {
this.delegate = delegate;
this.eventLoop = eventLoop;
this.eventLoop = checkNotNull(eventLoop, "Event loop must be set");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package io.smallrye.faulttolerance.standalone.test;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.Test;

import io.smallrye.faulttolerance.api.FaultTolerance;
import io.smallrye.faulttolerance.core.util.party.Party;

public class StandaloneThreadOffloadTest {
@Test
public void integratedExecutor() throws Exception {
FaultTolerance<CompletionStage<String>> guarded = FaultTolerance.<String> createAsync()
.withThreadOffload(true)
.build();

Set<String> threadNames = ConcurrentHashMap.newKeySet();
Party party = Party.create(5);

for (int i = 0; i < 5; i++) {
guarded.call(() -> {
threadNames.add(Thread.currentThread().getName());
party.participant().attend();
return completedFuture("ignored");
});
}

party.organizer().waitForAll();
party.organizer().disband();

assertThat(threadNames).hasSize(5);
}

@Test
public void explicitExecutor() throws Exception {
String prefix = UUID.randomUUID().toString();
AtomicInteger counter = new AtomicInteger();
ExecutorService executor = Executors.newCachedThreadPool(runnable -> new Thread(runnable,
prefix + "_" + counter.incrementAndGet()));

FaultTolerance<CompletionStage<String>> guarded = FaultTolerance.<String> createAsync()
.withThreadOffload(true)
.withThreadOffloadExecutor(executor)
.build();

Set<String> threadNames = ConcurrentHashMap.newKeySet();
Party party = Party.create(5);

for (int i = 0; i < 5; i++) {
guarded.call(() -> {
threadNames.add(Thread.currentThread().getName());
party.participant().attend();
return completedFuture("ignored");
});
}

party.organizer().waitForAll();
party.organizer().disband();

assertThat(threadNames).hasSize(5);
assertThat(threadNames).allSatisfy(threadName -> {
assertThat(threadName).startsWith(prefix);
});

executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.smallrye.faulttolerance.programmatic;

import io.smallrye.faulttolerance.standalone.test.StandaloneThreadOffloadTest;
import io.smallrye.faulttolerance.util.FaultToleranceBasicTest;

@FaultToleranceBasicTest
public class CdiThreadOffloadTest extends StandaloneThreadOffloadTest {
}

0 comments on commit 101b8b6

Please sign in to comment.