Skip to content

Commit

Permalink
Issue ReactiveX#343: CircuitBreaker should only allow a certain numbe…
Browse files Browse the repository at this point in the history
…r of test requests in HALF_OPEN

CircuitBreaker should only allow a certain number of test requests in HALF_OPEN and reject calls when maximum number is reached. The number of requests is equal to ringBufferSizeInHalfOpenState.
  • Loading branch information
RobWin authored Apr 23, 2019
1 parent 7055444 commit ab0d8a0
Show file tree
Hide file tree
Showing 53 changed files with 563 additions and 306 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,35 @@
*/
package io.github.resilience4j.decorators;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;

import org.junit.Before;
import org.junit.Test;
import org.mockito.BDDMockito;

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.cache.Cache;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import io.github.resilience4j.retry.AsyncRetry;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.test.HelloWorldService;
import io.vavr.CheckedFunction0;
import io.vavr.CheckedFunction1;
import io.vavr.CheckedRunnable;
import io.vavr.control.Try;
import org.junit.Before;
import org.junit.Test;
import org.mockito.BDDMockito;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

public class DecoratorsTest {
public boolean state = false;
Expand Down Expand Up @@ -158,7 +156,7 @@ public void testDecorateCompletionStage() throws ExecutionException, Interrupted

CompletionStage<String> completionStage = Decorators.ofCompletionStage(completionStageSupplier)
.withCircuitBreaker(circuitBreaker)
.withRetry(AsyncRetry.ofDefaults("id"), Executors.newSingleThreadScheduledExecutor())
.withRetry(Retry.ofDefaults("id"), Executors.newSingleThreadScheduledExecutor())
.withBulkhead(Bulkhead.ofDefaults("testName"))
.get();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ public ConcurrentBulkheadTest() {

@Actor
public void firstActor() {
if (bulkhead.isCallPermitted()) {
if (bulkhead.obtainPermission()) {
bulkhead.onComplete();
}
}

@Actor
public void secondActor() {
if (bulkhead.isCallPermitted()) {
if (bulkhead.obtainPermission()) {
bulkhead.onComplete();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,24 @@
*/
package io.github.resilience4j.bulkhead;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import io.github.resilience4j.bulkhead.event.BulkheadEvent;
import io.github.resilience4j.bulkhead.event.BulkheadOnCallFinishedEvent;
import io.github.resilience4j.bulkhead.event.BulkheadOnCallPermittedEvent;
import io.github.resilience4j.bulkhead.event.BulkheadOnCallRejectedEvent;
import io.github.resilience4j.bulkhead.internal.SemaphoreBulkhead;
import io.github.resilience4j.bulkhead.utils.BulkheadUtils;
import io.github.resilience4j.core.EventConsumer;
import io.vavr.CheckedConsumer;
import io.vavr.CheckedFunction0;
import io.vavr.CheckedFunction1;
import io.vavr.CheckedRunnable;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* A Bulkhead instance is thread-safe can be used to decorate multiple requests.
*
Expand All @@ -45,7 +44,7 @@
* underlying concurrency/io model can be used to shed load, and, where it makes sense, limit resource use (i.e. limit amount of
* threads/actors involved in a particular flow, etc).
*
* In order to execute an operation protected by this bulkhead, a permission must be obtained by calling {@link Bulkhead#isCallPermitted()}
* In order to execute an operation protected by this bulkhead, a permission must be obtained by calling {@link Bulkhead#tryObtainPermission()} ()}
* If the bulkhead is full, no additional operations will be permitted to execute until space is available.
*
* Once the operation is complete, regardless of the result, client needs to call {@link Bulkhead#onComplete()} in order to maintain
Expand All @@ -62,12 +61,28 @@ public interface Bulkhead {
void changeConfig(BulkheadConfig newConfig);

/**
* Attempts to acquire a permit, which allows an call to be executed.
* Attempts to obtain a permission to execute a call.
* @deprecated Use {@link Bulkhead#tryObtainPermission()}. instead.
*
* @return boolean whether a call should be executed
* @return boolean whether a call is permitted
*/
@Deprecated
boolean isCallPermitted();

/**
* Attempts to obtain a permission to execute a call.
*
* @return boolean whether a call should be executed
*/
boolean tryObtainPermission();

/**
* Attempts to obtain a permission to execute a call.
*
* @throws BulkheadFullException when the Bulkhead is full and no further calls are permitted.
*/
void obtainPermission();

/**
* Records a completed call.
*/
Expand Down Expand Up @@ -167,7 +182,7 @@ default <T> CompletionStage<T> executeCompletionStage(Supplier<CompletionStage<T
*/
static <T> CheckedFunction0<T> decorateCheckedSupplier(Bulkhead bulkhead, CheckedFunction0<T> supplier){
return () -> {
BulkheadUtils.isCallPermitted(bulkhead);
bulkhead.obtainPermission();
try {
return supplier.apply();
}
Expand All @@ -190,8 +205,8 @@ static <T> Supplier<CompletionStage<T>> decorateCompletionStage(Bulkhead bulkhea

final CompletableFuture<T> promise = new CompletableFuture<>();

if (!bulkhead.isCallPermitted()) {
promise.completeExceptionally(new BulkheadFullException(String.format("Bulkhead '%s' is open", bulkhead.getName())));
if (!bulkhead.tryObtainPermission()) {
promise.completeExceptionally(new BulkheadFullException(bulkhead));
}
else {
try {
Expand Down Expand Up @@ -228,7 +243,7 @@ static <T> Supplier<CompletionStage<T>> decorateCompletionStage(Bulkhead bulkhea
*/
static CheckedRunnable decorateCheckedRunnable(Bulkhead bulkhead, CheckedRunnable runnable){
return () -> {
BulkheadUtils.isCallPermitted(bulkhead);
bulkhead.obtainPermission();
try{
runnable.run();
}
Expand All @@ -249,7 +264,7 @@ static CheckedRunnable decorateCheckedRunnable(Bulkhead bulkhead, CheckedRunnabl
*/
static <T> Callable<T> decorateCallable(Bulkhead bulkhead, Callable<T> callable){
return () -> {
BulkheadUtils.isCallPermitted(bulkhead);
bulkhead.obtainPermission();
try {
return callable.call();
}
Expand All @@ -270,7 +285,7 @@ static <T> Callable<T> decorateCallable(Bulkhead bulkhead, Callable<T> callable)
*/
static <T> Supplier<T> decorateSupplier(Bulkhead bulkhead, Supplier<T> supplier){
return () -> {
BulkheadUtils.isCallPermitted(bulkhead);
bulkhead.obtainPermission();
try {
return supplier.get();
}
Expand All @@ -291,7 +306,7 @@ static <T> Supplier<T> decorateSupplier(Bulkhead bulkhead, Supplier<T> supplier)
*/
static <T> Consumer<T> decorateConsumer(Bulkhead bulkhead, Consumer<T> consumer){
return (t) -> {
BulkheadUtils.isCallPermitted(bulkhead);
bulkhead.obtainPermission();
try {
consumer.accept(t);
}
Expand All @@ -312,7 +327,7 @@ static <T> Consumer<T> decorateConsumer(Bulkhead bulkhead, Consumer<T> consumer)
*/
static <T> CheckedConsumer<T> decorateCheckedConsumer(Bulkhead bulkhead, CheckedConsumer<T> consumer){
return (t) -> {
BulkheadUtils.isCallPermitted(bulkhead);
bulkhead.obtainPermission();
try {
consumer.accept(t);
}
Expand All @@ -332,7 +347,7 @@ static <T> CheckedConsumer<T> decorateCheckedConsumer(Bulkhead bulkhead, Checked
*/
static Runnable decorateRunnable(Bulkhead bulkhead, Runnable runnable){
return () -> {
BulkheadUtils.isCallPermitted(bulkhead);
bulkhead.obtainPermission();
try{
runnable.run();
}
Expand All @@ -353,7 +368,7 @@ static Runnable decorateRunnable(Bulkhead bulkhead, Runnable runnable){
*/
static <T, R> Function<T, R> decorateFunction(Bulkhead bulkhead, Function<T, R> function){
return (T t) -> {
BulkheadUtils.isCallPermitted(bulkhead);
bulkhead.obtainPermission();
try{
return function.apply(t);
}
Expand All @@ -374,7 +389,7 @@ static <T, R> Function<T, R> decorateFunction(Bulkhead bulkhead, Function<T, R>
*/
static <T, R> CheckedFunction1<T, R> decorateCheckedFunction(Bulkhead bulkhead, CheckedFunction1<T, R> function){
return (T t) -> {
BulkheadUtils.isCallPermitted(bulkhead);
bulkhead.obtainPermission();
try{
return function.apply(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@
*/
public class BulkheadFullException extends RuntimeException {

/**
* The constructor with a message.
*
* @param bulkhead the Bulkhead.
*/
public BulkheadFullException(Bulkhead bulkhead) {
super(String.format("Bulkhead '%s' is full and does not permit further calls", bulkhead.getName()));
}

/**
* The constructor with a message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,6 @@
package io.github.resilience4j.bulkhead.internal;


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.bulkhead.ThreadPoolBulkhead;
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig;
Expand All @@ -37,6 +28,10 @@
import io.github.resilience4j.bulkhead.event.BulkheadOnCallRejectedEvent;
import io.github.resilience4j.core.EventConsumer;
import io.github.resilience4j.core.EventProcessor;
import io.github.resilience4j.core.lang.Nullable;

import java.util.concurrent.*;
import java.util.function.Supplier;

/**
* A Bulkhead implementation based on a fixed ThreadPoolExecutor.
Expand All @@ -59,7 +54,7 @@ public class FixedThreadPoolBulkhead implements ThreadPoolBulkhead {
* @param name the name of this bulkhead
* @param bulkheadConfig custom bulkhead configuration
*/
public FixedThreadPoolBulkhead(String name, ThreadPoolBulkheadConfig bulkheadConfig) {
public FixedThreadPoolBulkhead(String name, @Nullable ThreadPoolBulkheadConfig bulkheadConfig) {
this.name = name;
this.config = bulkheadConfig != null ? bulkheadConfig
: ThreadPoolBulkheadConfig.ofDefaults();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@
package io.github.resilience4j.bulkhead.internal;


import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.bulkhead.event.BulkheadEvent;
import io.github.resilience4j.bulkhead.event.BulkheadOnCallFinishedEvent;
import io.github.resilience4j.bulkhead.event.BulkheadOnCallPermittedEvent;
Expand All @@ -33,6 +30,10 @@
import io.github.resilience4j.core.EventProcessor;
import io.github.resilience4j.core.lang.Nullable;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
* A Bulkhead implementation based on a semaphore.
*/
Expand Down Expand Up @@ -102,7 +103,11 @@ public void changeConfig(final BulkheadConfig newConfig) {
*/
@Override
public boolean isCallPermitted() {
return tryObtainPermission();
}

@Override
public boolean tryObtainPermission() {
boolean callPermitted = tryEnterBulkhead();

publishBulkheadEvent(
Expand All @@ -113,6 +118,13 @@ public boolean isCallPermitted() {
return callPermitted;
}

@Override
public void obtainPermission() {
if(!tryObtainPermission()) {
throw new BulkheadFullException(this);
}
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@
import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadFullException;

@Deprecated
public final class BulkheadUtils {

private BulkheadUtils() {
}

public static void isCallPermitted(Bulkhead bulkhead) {
if (!bulkhead.isCallPermitted()) {
throw new BulkheadFullException(String.format("Bulkhead '%s' is full", bulkhead.getName()));
}
}
public static void isCallPermitted(Bulkhead bulkhead) {
if(!bulkhead.tryObtainPermission()) {
throw new BulkheadFullException(bulkhead);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void shouldConsumeOnCallRejectedEvent() {
.onCallRejected(event ->
logger.info(event.getEventType().toString()));

bulkhead.isCallPermitted();
bulkhead.tryObtainPermission();

Try.ofSupplier(Bulkhead.decorateSupplier(bulkhead,helloWorldService::returnHelloWorld));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,8 @@ public void shouldReturnFailureWithBulkheadFullException() {
// Given
BulkheadConfig config = BulkheadConfig.custom().maxConcurrentCalls(2).build();
Bulkhead bulkhead = Bulkhead.of("test", config);
bulkhead.isCallPermitted();
bulkhead.isCallPermitted();
bulkhead.tryObtainPermission();
bulkhead.tryObtainPermission();

// When
CheckedRunnable checkedRunnable = Bulkhead.decorateCheckedRunnable(bulkhead, () -> {throw new RuntimeException("BAM!");});
Expand All @@ -421,7 +421,7 @@ public void shouldReturnFailureWithRuntimeException() {
// Given
BulkheadConfig config = BulkheadConfig.custom().maxConcurrentCalls(2).build();
Bulkhead bulkhead = Bulkhead.of("test", config);
bulkhead.isCallPermitted();
bulkhead.tryObtainPermission();

//v When
CheckedRunnable checkedRunnable = Bulkhead.decorateCheckedRunnable(bulkhead, () -> {throw new RuntimeException("BAM!");});
Expand Down
Loading

0 comments on commit ab0d8a0

Please sign in to comment.