Skip to content

Commit

Permalink
Issue ReactiveX#596: The Spring Boot fallback method is not invoked w…
Browse files Browse the repository at this point in the history
…hen a BulkheadFullException occurs ReactiveX#847

The ThreadPoolBulkhead is does not return a CompletionStage when a task could not be submitted, because the Bulkhead is full. The ThreadPoolBulkhead throws a BulkheadFullException instead. Analog to the ThreadPoolExecutor which throws the RejectedExecutionException. The BulkheadFullException is not handled correctly by the BulkheadAspect.
The BulkheadAspect should convert the BulkheadFullException into a exceptionally completed future so that the FallbackDecorator works as expected.
  • Loading branch information
RobWin authored Feb 6, 2020
1 parent f7d2466 commit 79378fb
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 92 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.resilience4j.decorators;

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.bulkhead.ThreadPoolBulkhead;
import io.github.resilience4j.cache.Cache;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
Expand All @@ -18,6 +19,7 @@

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.*;
Expand Down Expand Up @@ -140,7 +142,20 @@ public DecorateSupplier<T> withFallback(List<Class<? extends Throwable>> excepti
}

public DecorateCompletionStage<T> withThreadPoolBulkhead(ThreadPoolBulkhead threadPoolBulkhead) {
return Decorators.ofCompletionStage(threadPoolBulkhead.decorateSupplier(supplier));
return Decorators.ofCompletionStage(getCompletionStageSupplier(threadPoolBulkhead));
}

private Supplier<CompletionStage<T>> getCompletionStageSupplier(
ThreadPoolBulkhead threadPoolBulkhead) {
return () -> {
try {
return threadPoolBulkhead.executeSupplier(supplier);
} catch (BulkheadFullException ex) {
CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(ex);
return future;
}
};
}

public Supplier<T> decorate() {
Expand Down Expand Up @@ -232,7 +247,20 @@ public DecorateRunnable withBulkhead(Bulkhead bulkhead) {
}

public DecorateCompletionStage<Void> withThreadPoolBulkhead(ThreadPoolBulkhead threadPoolBulkhead) {
return Decorators.ofCompletionStage(threadPoolBulkhead.decorateRunnable(runnable));
return Decorators.ofCompletionStage(getCompletionStageSupplier(threadPoolBulkhead));
}

private Supplier<CompletionStage<Void>> getCompletionStageSupplier(
ThreadPoolBulkhead threadPoolBulkhead) {
return () -> {
try {
return threadPoolBulkhead.executeRunnable(runnable);
} catch (BulkheadFullException ex) {
CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(ex);
return future;
}
};
}

public Runnable decorate() {
Expand Down Expand Up @@ -303,7 +331,20 @@ public <X extends Throwable> DecorateCallable<T> withFallback(Class<X> exception
}

public DecorateCompletionStage<T> withThreadPoolBulkhead(ThreadPoolBulkhead threadPoolBulkhead) {
return Decorators.ofCompletionStage(threadPoolBulkhead.decorateCallable(callable));
return Decorators.ofCompletionStage(getCompletionStageSupplier(threadPoolBulkhead));
}

private Supplier<CompletionStage<T>> getCompletionStageSupplier(
ThreadPoolBulkhead threadPoolBulkhead) {
return () -> {
try {
return threadPoolBulkhead.executeCallable(callable);
} catch (BulkheadFullException ex) {
CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(ex);
return future;
}
};
}

public Callable<T> decorate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package io.github.resilience4j.decorators;

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.bulkhead.ThreadPoolBulkhead;
import io.github.resilience4j.cache.Cache;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
Expand Down Expand Up @@ -60,25 +61,6 @@ public void setUp() {
helloWorldService = mock(HelloWorldService.class);
}

@Test
public void testExecuteSupplierInThreadPoolBulkhead() throws ExecutionException, InterruptedException {
given(helloWorldService.returnHelloWorld()).willReturn("Hello world");
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("helloBackend");
ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.ofDefaults("helloBackend");
CompletionStage<String> completionStage = Decorators
.ofCompletionStage(bulkhead.decorateSupplier(() -> helloWorldService.returnHelloWorld()))
.withCircuitBreaker(circuitBreaker)
.get();

String value = completionStage.toCompletableFuture().get();

assertThat(value).isEqualTo("Hello world");
CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
assertThat(metrics.getNumberOfBufferedCalls()).isEqualTo(1);
assertThat(metrics.getNumberOfSuccessfulCalls()).isEqualTo(1);
then(helloWorldService).should(times(1)).returnHelloWorld();
}

@Test
public void shouldThrowTimeoutException() {
TimeLimiter timeLimiter = TimeLimiter.of("helloBackend", TimeLimiterConfig.custom()
Expand Down Expand Up @@ -346,7 +328,81 @@ public void testDecorateCallableWithFallback() throws Throwable {
}

@Test
public void testDecorateCompletionStageWithFallback() throws ExecutionException, InterruptedException {
public void testDecorateSupplierWithBulkheadFullExceptionFallback() throws ExecutionException, InterruptedException {
ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.ofDefaults("helloBackend");
ThreadPoolBulkhead bulkheadMock = spy(bulkhead);
given(bulkheadMock.submit(any(Callable.class))).willThrow(BulkheadFullException.createBulkheadFullException(bulkhead));

CompletionStage<String> completionStage = Decorators
.ofSupplier(() -> helloWorldService.returnHelloWorld())
.withThreadPoolBulkhead(bulkheadMock)
.withFallback(BulkheadFullException.class, (e) -> "Fallback")
.get();

String result = completionStage.toCompletableFuture().get();

assertThat(result).isEqualTo("Fallback");
}

@Test
public void testDecorateCallableWithBulkheadFullExceptionFallback() throws ExecutionException, InterruptedException {
ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.ofDefaults("helloBackend");
ThreadPoolBulkhead bulkheadMock = spy(bulkhead);
given(bulkheadMock.submit(any(Callable.class))).willThrow(BulkheadFullException.createBulkheadFullException(bulkhead));

CompletionStage<String> completionStage = Decorators
.ofCallable(() -> helloWorldService.returnHelloWorldWithException())
.withThreadPoolBulkhead(bulkheadMock)
.withFallback(BulkheadFullException.class, (e) -> "Fallback")
.get();

String result = completionStage.toCompletableFuture().get();

assertThat(result).isEqualTo("Fallback");
}

@Test
public void testDecorateRunnableWithBulkheadFullExceptionFallback() throws ExecutionException, InterruptedException {
ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.ofDefaults("helloBackend");
ThreadPoolBulkhead bulkheadMock = spy(bulkhead);
given(bulkheadMock.submit(any(Callable.class))).willThrow(BulkheadFullException.createBulkheadFullException(bulkhead));

CompletionStage<Void> completionStage = Decorators
.ofRunnable(() -> helloWorldService.sayHelloWorld())
.withThreadPoolBulkhead(bulkheadMock)
.withFallback(BulkheadFullException.class, (e) -> {
helloWorldService.sayHelloWorld();
return null;
})
.get();

completionStage.toCompletableFuture().get();

then(helloWorldService).should(times(1)).sayHelloWorld();
}


@Test
public void testDecorateCompletionStageWithCallNotPermittedExceptionFallback() throws ExecutionException, InterruptedException {
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("helloBackend");
circuitBreaker.transitionToOpenState();
ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.ofDefaults("helloBackend");
CompletionStage<String> completionStage = Decorators
.ofSupplier(() -> helloWorldService.returnHelloWorld())
.withThreadPoolBulkhead(bulkhead)
.withCircuitBreaker(circuitBreaker)
.withFallback(CallNotPermittedException.class, (e) -> "Fallback")
.get();

String result = completionStage.toCompletableFuture().get();

assertThat(result).isEqualTo("Fallback");
CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
assertThat(metrics.getNumberOfNotPermittedCalls()).isEqualTo(1);
}

@Test
public void testDecorateCompletionStageWithTimeoutExceptionFallback() throws ExecutionException, InterruptedException {
TimeLimiter timeLimiter = TimeLimiter.of("helloBackend", TimeLimiterConfig.custom()
.timeoutDuration(Duration.ofMillis(100)).build());
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("helloBackend");
Expand All @@ -359,7 +415,7 @@ public void testDecorateCompletionStageWithFallback() throws ExecutionException,
.withThreadPoolBulkhead(bulkhead)
.withTimeLimiter(timeLimiter, Executors.newSingleThreadScheduledExecutor())
.withCircuitBreaker(circuitBreaker)
.withFallback(asList(TimeoutException.class, CallNotPermittedException.class), (e) -> "Fallback")
.withFallback(TimeoutException.class, (e) -> "Fallback")
.get();

String result = completionStage.toCompletableFuture().get();
Expand Down
Loading

0 comments on commit 79378fb

Please sign in to comment.