From ab0d8a0c2940f7f8af6f4fc9392cc539662290b8 Mon Sep 17 00:00:00 2001 From: Robert Winkler Date: Tue, 23 Apr 2019 13:22:03 +0200 Subject: [PATCH] Issue #343: CircuitBreaker should only allow a certain number of test requests in HALF_OPEN CircuitBreaker should only allow a certain number of test requests in HALF_OPEN and reject calls when maximum number is reached. The number of requests is equal to ringBufferSizeInHalfOpenState. --- .../decorators/DecoratorsTest.java | 38 +++---- .../concurrent/ConcurrentBulkheadTest.java | 4 +- .../resilience4j/bulkhead/Bulkhead.java | 59 ++++++---- .../bulkhead/BulkheadFullException.java | 9 ++ .../internal/FixedThreadPoolBulkhead.java | 15 +-- .../bulkhead/internal/SemaphoreBulkhead.java | 20 +++- .../bulkhead/utils/BulkheadUtils.java | 14 +-- .../bulkhead/BulkheadEventPublisherTest.java | 2 +- .../resilience4j/bulkhead/BulkheadTest.java | 6 +- .../internal/SemaphoreBulkheadTest.java | 22 ++-- .../CallNotPermittedException.java | 46 ++++++++ .../circuitbreaker/CircuitBreaker.java | 105 ++++++++++++------ .../CircuitBreakerOpenException.java | 11 ++ .../internal/CircuitBreakerState.java | 4 +- .../internal/CircuitBreakerStateMachine.java | 17 ++- .../circuitbreaker/internal/ClosedState.java | 10 +- .../internal/DisabledState.java | 11 +- .../internal/ForcedOpenState.java | 13 ++- .../internal/HalfOpenState.java | 27 ++++- .../circuitbreaker/internal/OpenState.java | 18 ++- .../utils/CircuitBreakerUtils.java | 7 +- .../CallNotPermittedExceptionTest.java | 22 ++++ .../CircuitBreakerEventPublisherTest.java | 4 +- .../CircuitBreakerOpenExceptionTest.java | 22 ++++ .../circuitbreaker/CircuitBreakerTest.java | 19 ++-- .../CircuitBreakerStateMachineTest.java | 105 +++++++++++++----- .../Resilience4jFeignCircuitBreakerTest.java | 27 ++--- .../metrics/StateTransitionMetricsTest.java | 2 +- .../tagged/TaggedBulkheadMetricsTest.java | 4 +- .../BulkheadMetricsCollectorTest.java | 4 +- .../bulkhead/BulkheadMethodInterceptor.java | 4 +- .../ratpack/bulkhead/BulkheadTransformer.java | 2 +- .../CircuitBreakerMethodInterceptor.java | 6 +- .../CircuitBreakerTransformer.java | 6 +- .../endpoint/BulkheadChainSpec.groovy | 12 +- .../circuitbreaker/CircuitBreakerSpec.groovy | 6 +- .../reactor/ResilienceBaseSubscriber.java | 4 +- .../bulkhead/operator/BulkheadSubscriber.java | 4 +- .../operator/CircuitBreakerSubscriber.java | 9 +- .../operator/RateLimiterSubscriber.java | 2 +- .../bulkhead/operator/FluxBulkheadTest.java | 6 +- .../bulkhead/operator/MonoBulkheadTest.java | 6 +- .../bulkhead/operator/BulkheadSubscriber.java | 2 +- .../bulkhead/operator/DisposableBulkhead.java | 2 +- .../operator/CircuitBreakerSubscriber.java | 6 +- .../operator/DisposableCircuitBreaker.java | 6 +- .../BulkheadCompletableObserverTest.java | 18 ++- .../operator/BulkheadMaybeObserverTest.java | 20 ++-- .../operator/BulkheadObserverTest.java | 19 ++-- .../operator/BulkheadSingleObserverTest.java | 20 ++-- .../operator/BulkheadSubscriberTest.java | 19 ++-- .../CircuitBreakerSubscriberTest.java | 17 ++- .../circuitbreaker/VertxCircuitBreaker.java | 6 +- 53 files changed, 563 insertions(+), 306 deletions(-) create mode 100644 resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/CallNotPermittedException.java create mode 100644 resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/CallNotPermittedExceptionTest.java create mode 100644 resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/CircuitBreakerOpenExceptionTest.java diff --git a/resilience4j-all/src/test/java/io/github/resilience4j/decorators/DecoratorsTest.java b/resilience4j-all/src/test/java/io/github/resilience4j/decorators/DecoratorsTest.java index af44235598..62ab9b448f 100644 --- a/resilience4j-all/src/test/java/io/github/resilience4j/decorators/DecoratorsTest.java +++ b/resilience4j-all/src/test/java/io/github/resilience4j/decorators/DecoratorsTest.java @@ -18,37 +18,35 @@ */ package io.github.resilience4j.decorators; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; - -import java.io.IOException; -import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.function.Function; -import java.util.function.Supplier; - -import org.junit.Before; -import org.junit.Test; -import org.mockito.BDDMockito; - import io.github.resilience4j.bulkhead.Bulkhead; import io.github.resilience4j.cache.Cache; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.ratelimiter.RateLimiter; import io.github.resilience4j.ratelimiter.RateLimiterConfig; import io.github.resilience4j.ratelimiter.RequestNotPermitted; -import io.github.resilience4j.retry.AsyncRetry; import io.github.resilience4j.retry.Retry; import io.github.resilience4j.test.HelloWorldService; import io.vavr.CheckedFunction0; import io.vavr.CheckedFunction1; import io.vavr.CheckedRunnable; import io.vavr.control.Try; +import org.junit.Before; +import org.junit.Test; +import org.mockito.BDDMockito; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.function.Function; +import java.util.function.Supplier; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; public class DecoratorsTest { public boolean state = false; @@ -158,7 +156,7 @@ public void testDecorateCompletionStage() throws ExecutionException, Interrupted CompletionStage completionStage = Decorators.ofCompletionStage(completionStageSupplier) .withCircuitBreaker(circuitBreaker) - .withRetry(AsyncRetry.ofDefaults("id"), Executors.newSingleThreadScheduledExecutor()) + .withRetry(Retry.ofDefaults("id"), Executors.newSingleThreadScheduledExecutor()) .withBulkhead(Bulkhead.ofDefaults("testName")) .get(); diff --git a/resilience4j-bulkhead/src/jcstress/java/io/github/resilience4j/bulkhead/concurrent/ConcurrentBulkheadTest.java b/resilience4j-bulkhead/src/jcstress/java/io/github/resilience4j/bulkhead/concurrent/ConcurrentBulkheadTest.java index c940ccb115..9bdaf84d0d 100644 --- a/resilience4j-bulkhead/src/jcstress/java/io/github/resilience4j/bulkhead/concurrent/ConcurrentBulkheadTest.java +++ b/resilience4j-bulkhead/src/jcstress/java/io/github/resilience4j/bulkhead/concurrent/ConcurrentBulkheadTest.java @@ -63,14 +63,14 @@ public ConcurrentBulkheadTest() { @Actor public void firstActor() { - if (bulkhead.isCallPermitted()) { + if (bulkhead.obtainPermission()) { bulkhead.onComplete(); } } @Actor public void secondActor() { - if (bulkhead.isCallPermitted()) { + if (bulkhead.obtainPermission()) { bulkhead.onComplete(); } } diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/Bulkhead.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/Bulkhead.java index f543c155c3..f691aa1c69 100644 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/Bulkhead.java +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/Bulkhead.java @@ -18,25 +18,24 @@ */ package io.github.resilience4j.bulkhead; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; - import io.github.resilience4j.bulkhead.event.BulkheadEvent; import io.github.resilience4j.bulkhead.event.BulkheadOnCallFinishedEvent; import io.github.resilience4j.bulkhead.event.BulkheadOnCallPermittedEvent; import io.github.resilience4j.bulkhead.event.BulkheadOnCallRejectedEvent; import io.github.resilience4j.bulkhead.internal.SemaphoreBulkhead; -import io.github.resilience4j.bulkhead.utils.BulkheadUtils; import io.github.resilience4j.core.EventConsumer; import io.vavr.CheckedConsumer; import io.vavr.CheckedFunction0; import io.vavr.CheckedFunction1; import io.vavr.CheckedRunnable; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + /** * A Bulkhead instance is thread-safe can be used to decorate multiple requests. * @@ -45,7 +44,7 @@ * underlying concurrency/io model can be used to shed load, and, where it makes sense, limit resource use (i.e. limit amount of * threads/actors involved in a particular flow, etc). * - * In order to execute an operation protected by this bulkhead, a permission must be obtained by calling {@link Bulkhead#isCallPermitted()} + * In order to execute an operation protected by this bulkhead, a permission must be obtained by calling {@link Bulkhead#tryObtainPermission()} ()} * If the bulkhead is full, no additional operations will be permitted to execute until space is available. * * Once the operation is complete, regardless of the result, client needs to call {@link Bulkhead#onComplete()} in order to maintain @@ -62,12 +61,28 @@ public interface Bulkhead { void changeConfig(BulkheadConfig newConfig); /** - * Attempts to acquire a permit, which allows an call to be executed. + * Attempts to obtain a permission to execute a call. + * @deprecated Use {@link Bulkhead#tryObtainPermission()}. instead. * - * @return boolean whether a call should be executed + * @return boolean whether a call is permitted */ + @Deprecated boolean isCallPermitted(); + /** + * Attempts to obtain a permission to execute a call. + * + * @return boolean whether a call should be executed + */ + boolean tryObtainPermission(); + + /** + * Attempts to obtain a permission to execute a call. + * + * @throws BulkheadFullException when the Bulkhead is full and no further calls are permitted. + */ + void obtainPermission(); + /** * Records a completed call. */ @@ -167,7 +182,7 @@ default CompletionStage executeCompletionStage(Supplier CheckedFunction0 decorateCheckedSupplier(Bulkhead bulkhead, CheckedFunction0 supplier){ return () -> { - BulkheadUtils.isCallPermitted(bulkhead); + bulkhead.obtainPermission(); try { return supplier.apply(); } @@ -190,8 +205,8 @@ static Supplier> decorateCompletionStage(Bulkhead bulkhea final CompletableFuture promise = new CompletableFuture<>(); - if (!bulkhead.isCallPermitted()) { - promise.completeExceptionally(new BulkheadFullException(String.format("Bulkhead '%s' is open", bulkhead.getName()))); + if (!bulkhead.tryObtainPermission()) { + promise.completeExceptionally(new BulkheadFullException(bulkhead)); } else { try { @@ -228,7 +243,7 @@ static Supplier> decorateCompletionStage(Bulkhead bulkhea */ static CheckedRunnable decorateCheckedRunnable(Bulkhead bulkhead, CheckedRunnable runnable){ return () -> { - BulkheadUtils.isCallPermitted(bulkhead); + bulkhead.obtainPermission(); try{ runnable.run(); } @@ -249,7 +264,7 @@ static CheckedRunnable decorateCheckedRunnable(Bulkhead bulkhead, CheckedRunnabl */ static Callable decorateCallable(Bulkhead bulkhead, Callable callable){ return () -> { - BulkheadUtils.isCallPermitted(bulkhead); + bulkhead.obtainPermission(); try { return callable.call(); } @@ -270,7 +285,7 @@ static Callable decorateCallable(Bulkhead bulkhead, Callable callable) */ static Supplier decorateSupplier(Bulkhead bulkhead, Supplier supplier){ return () -> { - BulkheadUtils.isCallPermitted(bulkhead); + bulkhead.obtainPermission(); try { return supplier.get(); } @@ -291,7 +306,7 @@ static Supplier decorateSupplier(Bulkhead bulkhead, Supplier supplier) */ static Consumer decorateConsumer(Bulkhead bulkhead, Consumer consumer){ return (t) -> { - BulkheadUtils.isCallPermitted(bulkhead); + bulkhead.obtainPermission(); try { consumer.accept(t); } @@ -312,7 +327,7 @@ static Consumer decorateConsumer(Bulkhead bulkhead, Consumer consumer) */ static CheckedConsumer decorateCheckedConsumer(Bulkhead bulkhead, CheckedConsumer consumer){ return (t) -> { - BulkheadUtils.isCallPermitted(bulkhead); + bulkhead.obtainPermission(); try { consumer.accept(t); } @@ -332,7 +347,7 @@ static CheckedConsumer decorateCheckedConsumer(Bulkhead bulkhead, Checked */ static Runnable decorateRunnable(Bulkhead bulkhead, Runnable runnable){ return () -> { - BulkheadUtils.isCallPermitted(bulkhead); + bulkhead.obtainPermission(); try{ runnable.run(); } @@ -353,7 +368,7 @@ static Runnable decorateRunnable(Bulkhead bulkhead, Runnable runnable){ */ static Function decorateFunction(Bulkhead bulkhead, Function function){ return (T t) -> { - BulkheadUtils.isCallPermitted(bulkhead); + bulkhead.obtainPermission(); try{ return function.apply(t); } @@ -374,7 +389,7 @@ static Function decorateFunction(Bulkhead bulkhead, Function */ static CheckedFunction1 decorateCheckedFunction(Bulkhead bulkhead, CheckedFunction1 function){ return (T t) -> { - BulkheadUtils.isCallPermitted(bulkhead); + bulkhead.obtainPermission(); try{ return function.apply(t); } diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/BulkheadFullException.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/BulkheadFullException.java index baa6079e86..96c1e1a044 100644 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/BulkheadFullException.java +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/BulkheadFullException.java @@ -23,6 +23,15 @@ */ public class BulkheadFullException extends RuntimeException { + /** + * The constructor with a message. + * + * @param bulkhead the Bulkhead. + */ + public BulkheadFullException(Bulkhead bulkhead) { + super(String.format("Bulkhead '%s' is full and does not permit further calls", bulkhead.getName())); + } + /** * The constructor with a message. * diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkhead.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkhead.java index da649f2db2..03831dcf15 100644 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkhead.java +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkhead.java @@ -19,15 +19,6 @@ package io.github.resilience4j.bulkhead.internal; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; - import io.github.resilience4j.bulkhead.BulkheadFullException; import io.github.resilience4j.bulkhead.ThreadPoolBulkhead; import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig; @@ -37,6 +28,10 @@ import io.github.resilience4j.bulkhead.event.BulkheadOnCallRejectedEvent; import io.github.resilience4j.core.EventConsumer; import io.github.resilience4j.core.EventProcessor; +import io.github.resilience4j.core.lang.Nullable; + +import java.util.concurrent.*; +import java.util.function.Supplier; /** * A Bulkhead implementation based on a fixed ThreadPoolExecutor. @@ -59,7 +54,7 @@ public class FixedThreadPoolBulkhead implements ThreadPoolBulkhead { * @param name the name of this bulkhead * @param bulkheadConfig custom bulkhead configuration */ - public FixedThreadPoolBulkhead(String name, ThreadPoolBulkheadConfig bulkheadConfig) { + public FixedThreadPoolBulkhead(String name, @Nullable ThreadPoolBulkheadConfig bulkheadConfig) { this.name = name; this.config = bulkheadConfig != null ? bulkheadConfig : ThreadPoolBulkheadConfig.ofDefaults(); diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java index 2bb77aa2f2..995204062e 100644 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java @@ -19,12 +19,9 @@ package io.github.resilience4j.bulkhead.internal; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; - import io.github.resilience4j.bulkhead.Bulkhead; import io.github.resilience4j.bulkhead.BulkheadConfig; +import io.github.resilience4j.bulkhead.BulkheadFullException; import io.github.resilience4j.bulkhead.event.BulkheadEvent; import io.github.resilience4j.bulkhead.event.BulkheadOnCallFinishedEvent; import io.github.resilience4j.bulkhead.event.BulkheadOnCallPermittedEvent; @@ -33,6 +30,10 @@ import io.github.resilience4j.core.EventProcessor; import io.github.resilience4j.core.lang.Nullable; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + /** * A Bulkhead implementation based on a semaphore. */ @@ -102,7 +103,11 @@ public void changeConfig(final BulkheadConfig newConfig) { */ @Override public boolean isCallPermitted() { + return tryObtainPermission(); + } + @Override + public boolean tryObtainPermission() { boolean callPermitted = tryEnterBulkhead(); publishBulkheadEvent( @@ -113,6 +118,13 @@ public boolean isCallPermitted() { return callPermitted; } + @Override + public void obtainPermission() { + if(!tryObtainPermission()) { + throw new BulkheadFullException(this); + } + } + /** * {@inheritDoc} */ diff --git a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/utils/BulkheadUtils.java b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/utils/BulkheadUtils.java index 600b81a04a..dfb1bab527 100644 --- a/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/utils/BulkheadUtils.java +++ b/resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/utils/BulkheadUtils.java @@ -21,14 +21,12 @@ import io.github.resilience4j.bulkhead.Bulkhead; import io.github.resilience4j.bulkhead.BulkheadFullException; +@Deprecated public final class BulkheadUtils { - private BulkheadUtils() { - } - - public static void isCallPermitted(Bulkhead bulkhead) { - if (!bulkhead.isCallPermitted()) { - throw new BulkheadFullException(String.format("Bulkhead '%s' is full", bulkhead.getName())); - } - } + public static void isCallPermitted(Bulkhead bulkhead) { + if(!bulkhead.tryObtainPermission()) { + throw new BulkheadFullException(bulkhead); + } + } } diff --git a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/BulkheadEventPublisherTest.java b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/BulkheadEventPublisherTest.java index 6a9e08aed9..f0053ce294 100644 --- a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/BulkheadEventPublisherTest.java +++ b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/BulkheadEventPublisherTest.java @@ -86,7 +86,7 @@ public void shouldConsumeOnCallRejectedEvent() { .onCallRejected(event -> logger.info(event.getEventType().toString())); - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); Try.ofSupplier(Bulkhead.decorateSupplier(bulkhead,helloWorldService::returnHelloWorld)); diff --git a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/BulkheadTest.java b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/BulkheadTest.java index c1038e9699..8d923a1726 100644 --- a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/BulkheadTest.java +++ b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/BulkheadTest.java @@ -402,8 +402,8 @@ public void shouldReturnFailureWithBulkheadFullException() { // Given BulkheadConfig config = BulkheadConfig.custom().maxConcurrentCalls(2).build(); Bulkhead bulkhead = Bulkhead.of("test", config); - bulkhead.isCallPermitted(); - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); + bulkhead.tryObtainPermission(); // When CheckedRunnable checkedRunnable = Bulkhead.decorateCheckedRunnable(bulkhead, () -> {throw new RuntimeException("BAM!");}); @@ -421,7 +421,7 @@ public void shouldReturnFailureWithRuntimeException() { // Given BulkheadConfig config = BulkheadConfig.custom().maxConcurrentCalls(2).build(); Bulkhead bulkhead = Bulkhead.of("test", config); - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); //v When CheckedRunnable checkedRunnable = Bulkhead.decorateCheckedRunnable(bulkhead, () -> {throw new RuntimeException("BAM!");}); diff --git a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java index f95f3ca3c7..3ab1402fa5 100644 --- a/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java +++ b/resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkheadTest.java @@ -63,12 +63,12 @@ public void shouldReturnTheCorrectName() { @Test public void testBulkhead() throws InterruptedException { - bulkhead.isCallPermitted(); - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); + bulkhead.tryObtainPermission(); assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0); - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); bulkhead.onComplete(); assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1); @@ -77,7 +77,7 @@ public void testBulkhead() throws InterruptedException { assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(2); - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); testSubscriber.assertValueCount(6) .assertValues(CALL_PERMITTED, CALL_PERMITTED, CALL_REJECTED, CALL_FINISHED, CALL_FINISHED, CALL_PERMITTED); @@ -147,7 +147,7 @@ public void testEntryTimeout() { .build(); SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", config); - bulkhead.isCallPermitted(); // consume the permit + bulkhead.tryObtainPermission(); // consume the permit // when boolean entered = bulkhead.tryEnterBulkhead(); @@ -236,7 +236,7 @@ public void changePermissionsCountWhileOneThreadIsRunningWithThisPermission() { AtomicBoolean bulkheadThreadTrigger = new AtomicBoolean(true); assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(1); Thread bulkheadThread = new Thread(() -> { - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); while (bulkheadThreadTrigger.get()) { Thread.yield(); } @@ -301,12 +301,12 @@ public void changePermissionsCountWhileOneThreadIsWaitingForPermission() { .build(); SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig); assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1); - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0); assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(1); Thread bulkheadThread = new Thread(() -> { - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); bulkhead.onComplete(); }); bulkheadThread.setDaemon(true); @@ -337,12 +337,12 @@ public void changeWaitingTimeWhileOneThreadIsWaitingForPermission() { .build(); SemaphoreBulkhead bulkhead = new SemaphoreBulkhead("test", originalConfig); assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1); - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0); assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(1); Thread bulkheadThread = new Thread(() -> { - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); bulkhead.onComplete(); }); bulkheadThread.setDaemon(true); @@ -379,7 +379,7 @@ public void changePermissionsConcurrently() { AtomicBoolean bulkheadThreadTrigger = new AtomicBoolean(true); assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(3); Thread bulkheadThread = new Thread(() -> { - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); while (bulkheadThreadTrigger.get()) { Thread.yield(); } diff --git a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/CallNotPermittedException.java b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/CallNotPermittedException.java new file mode 100644 index 0000000000..781dea5e03 --- /dev/null +++ b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/CallNotPermittedException.java @@ -0,0 +1,46 @@ +/* + * + * Copyright 2016 Robert Winkler + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ +package io.github.resilience4j.circuitbreaker; + +/** + * A {@link CallNotPermittedException} signals that the CircuitBreaker is HALF_OPEN or OPEN + * and a call is not permitted to be executed. + */ +public class CallNotPermittedException extends CircuitBreakerOpenException { + + /** + * The constructor with a CircuitBreaker. + * + * @param circuitBreaker the CircuitBreaker. + */ + public CallNotPermittedException(CircuitBreaker circuitBreaker) { + super(String.format("CircuitBreaker '%s' is %s and does not permit further calls", circuitBreaker.getName(), circuitBreaker.getState())); + } + + /** + * The constructor with a message. + * + * @param message The message. + */ + public CallNotPermittedException(String message) { + super(message); + } +} + + diff --git a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/CircuitBreaker.java b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/CircuitBreaker.java index ac7d8af31f..ad1fea893b 100644 --- a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/CircuitBreaker.java +++ b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/CircuitBreaker.java @@ -20,7 +20,6 @@ import io.github.resilience4j.circuitbreaker.event.*; import io.github.resilience4j.circuitbreaker.internal.CircuitBreakerStateMachine; -import io.github.resilience4j.circuitbreaker.utils.CircuitBreakerUtils; import io.github.resilience4j.core.EventConsumer; import io.vavr.*; @@ -38,27 +37,58 @@ * A CircuitBreaker instance is thread-safe can be used to decorate multiple requests. * * A {@link CircuitBreaker} manages the state of a backend system. - * The CircuitBreaker is implemented via a finite state machine with three states: CLOSED, OPEN and HALF_OPEN. + * The CircuitBreaker is implemented via a finite state machine with five states: CLOSED, OPEN, HALF_OPEN, DISABLED AND FORCED_OPEN. * The CircuitBreaker does not know anything about the backend's state by itself, but uses the information provided by the decorators via * {@link CircuitBreaker#onSuccess} and {@link CircuitBreaker#onError} events. - * Before communicating with the backend, the the permission to do so must be obtained via the method {@link CircuitBreaker#isCallPermitted()}. + * Before communicating with the backend, the permission to do so must be obtained via the method {@link CircuitBreaker#tryObtainPermission()}. * * The state of the CircuitBreaker changes from CLOSED to OPEN when the failure rate is above a (configurable) threshold. - * Then, all access to the backend is blocked for a (configurable) time duration. {@link CircuitBreaker#isCallPermitted} throws a {@link CircuitBreakerOpenException}, if the CircuitBreaker is OPEN. + * Then, all access to the backend is rejected for a (configurable) time duration. No further calls are permitted. * - * After the time duration has elapsed, the CircuitBreaker state changes from OPEN to HALF_OPEN and allows calls to see if the backend is still unavailable or has become available again. + * After the time duration has elapsed, the CircuitBreaker state changes from OPEN to HALF_OPEN and allows a number of calls to see if the backend is still unavailable or has become available again. * If the failure rate is above the configured threshold, the state changes back to OPEN. If the failure rate is below or equal to the threshold, the state changes back to CLOSED. */ -@SuppressWarnings("ALL") public interface CircuitBreaker { /** - * Requests permission to call this circuitBreaker's backend. + * Attempts to obtain a permission to execute a call. + * @deprecated Use {@link CircuitBreaker#tryObtainPermission()}. instead. * - * @return boolean whether a call should be permitted + * @return true when a call is permitted */ + @Deprecated boolean isCallPermitted(); + /** + * Obtain a permission to execute a call. If a call not not permitted, the number + * of permitted calls is increased. + * + * Returns false when the state is OPEN or FORCED_OPEN. + * Returns true when the state is CLOSED or DISABLED. + * Returns true when the state is HALF_OPEN and further test calls are allowed. + * Returns false when the state is HALF_OPEN and the number of test calls has been reached. + * If the state is HALF_OPEN, the number of allowed test calls is decreased. Make sure to call onSuccess or onError + * after the call is finished. + * + * @return true when a call is permitted + */ + boolean tryObtainPermission(); + + /** + * Obtain a permission to execute a call. If a call not not permitted, the number + * of permitted calls is increased. + * + * Throws a CallNotPermittedException when the state is OPEN or FORCED_OPEN. + * Returns when the state is CLOSED or DISABLED. + * Returns when the state is HALF_OPEN and further test calls are allowed. + * Throws a CallNotPermittedException when the state is HALF_OPEN and the number of test calls has been reached. + * If the state is HALF_OPEN, the number of allowed test calls is decreased. Make sure to call onSuccess or onError + * after the call is finished. + * + * @throws CallNotPermittedException when CircuitBreaker is OPEN or HALF_OPEN and no further test calls are permitted. + */ + void obtainPermission(); + /** * Records a failed call. * This method must be invoked when a call failed. @@ -114,7 +144,7 @@ public interface CircuitBreaker { void transitionToDisabledState(); /** - * Transitions the state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing. + * Transitions the state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing. * * Should only be used, when you want to disable the circuit breaker allowing no call to pass. * To recover from this state you must force a new state transition @@ -245,7 +275,7 @@ enum State { * @param order * @param allowPublish */ - private State(int order, boolean allowPublish){ + State(int order, boolean allowPublish){ this.order = order; this.allowPublish = allowPublish; } @@ -336,7 +366,7 @@ interface EventPublisher extends io.github.resilience4j.core.EventPublisher eventConsumer); EventPublisher onCallNotPermitted(EventConsumer eventConsumer); - } + } interface Metrics { @@ -397,7 +427,7 @@ interface Metrics { */ static CheckedFunction0 decorateCheckedSupplier(CircuitBreaker circuitBreaker, CheckedFunction0 supplier){ return () -> { - CircuitBreakerUtils.isCallPermitted(circuitBreaker); + circuitBreaker.obtainPermission(); long start = System.nanoTime(); try { T returnValue = supplier.apply(); @@ -430,26 +460,31 @@ static Supplier> decorateCompletionStage( final CompletableFuture promise = new CompletableFuture<>(); - if (!circuitBreaker.isCallPermitted()) { + if (!circuitBreaker.tryObtainPermission()) { promise.completeExceptionally( - new CircuitBreakerOpenException( - String.format("CircuitBreaker '%s' is open", circuitBreaker.getName()))); + new CallNotPermittedException(circuitBreaker)); } else { final long start = System.nanoTime(); - supplier.get().whenComplete((result, throwable) -> { + try { + supplier.get().whenComplete((result, throwable) -> { + long durationInNanos = System.nanoTime() - start; + if (result != null) { + circuitBreaker.onSuccess(durationInNanos); + promise.complete(result); + } else if (throwable instanceof Exception) { + circuitBreaker.onError(durationInNanos, throwable); + promise.completeExceptionally(throwable); + } else { + // Do not handle java.lang.Error + promise.completeExceptionally(throwable); + } + }); + }catch (Exception exception){ long durationInNanos = System.nanoTime() - start; - if (result != null) { - circuitBreaker.onSuccess(durationInNanos); - promise.complete(result); - } else if (throwable instanceof Exception) { - circuitBreaker.onError(durationInNanos, throwable); - promise.completeExceptionally(throwable); - } else{ - // Do not handle java.lang.Error - promise.completeExceptionally(throwable); - } - }); + circuitBreaker.onError(durationInNanos, exception); + promise.completeExceptionally(exception); + } } return promise; @@ -466,7 +501,7 @@ static Supplier> decorateCompletionStage( */ static CheckedRunnable decorateCheckedRunnable(CircuitBreaker circuitBreaker, CheckedRunnable runnable){ return () -> { - CircuitBreakerUtils.isCallPermitted(circuitBreaker); + circuitBreaker.obtainPermission(); long start = System.nanoTime(); try{ runnable.run(); @@ -492,7 +527,7 @@ static CheckedRunnable decorateCheckedRunnable(CircuitBreaker circuitBreaker, Ch */ static Callable decorateCallable(CircuitBreaker circuitBreaker, Callable callable){ return () -> { - CircuitBreakerUtils.isCallPermitted(circuitBreaker); + circuitBreaker.obtainPermission(); long start = System.nanoTime(); try { T returnValue = callable.call(); @@ -519,7 +554,7 @@ static Callable decorateCallable(CircuitBreaker circuitBreaker, Callable< */ static Supplier decorateSupplier(CircuitBreaker circuitBreaker, Supplier supplier){ return () -> { - CircuitBreakerUtils.isCallPermitted(circuitBreaker); + circuitBreaker.obtainPermission(); long start = System.nanoTime(); try { T returnValue = supplier.get(); @@ -546,7 +581,7 @@ static Supplier decorateSupplier(CircuitBreaker circuitBreaker, Supplier< */ static Consumer decorateConsumer(CircuitBreaker circuitBreaker, Consumer consumer){ return (t) -> { - CircuitBreakerUtils.isCallPermitted(circuitBreaker); + circuitBreaker.obtainPermission(); long start = System.nanoTime(); try { consumer.accept(t); @@ -572,7 +607,7 @@ static Consumer decorateConsumer(CircuitBreaker circuitBreaker, Consumer< */ static CheckedConsumer decorateCheckedConsumer(CircuitBreaker circuitBreaker, CheckedConsumer consumer){ return (t) -> { - CircuitBreakerUtils.isCallPermitted(circuitBreaker); + circuitBreaker.obtainPermission(); long start = System.nanoTime(); try { consumer.accept(t); @@ -597,7 +632,7 @@ static CheckedConsumer decorateCheckedConsumer(CircuitBreaker circuitBrea */ static Runnable decorateRunnable(CircuitBreaker circuitBreaker, Runnable runnable){ return () -> { - CircuitBreakerUtils.isCallPermitted(circuitBreaker); + circuitBreaker.obtainPermission(); long start = System.nanoTime(); try{ runnable.run(); @@ -623,7 +658,7 @@ static Runnable decorateRunnable(CircuitBreaker circuitBreaker, Runnable runnabl */ static Function decorateFunction(CircuitBreaker circuitBreaker, Function function){ return (T t) -> { - CircuitBreakerUtils.isCallPermitted(circuitBreaker); + circuitBreaker.obtainPermission(); long start = System.nanoTime(); try{ R returnValue = function.apply(t); @@ -650,7 +685,7 @@ static Function decorateFunction(CircuitBreaker circuitBreaker, Fun */ static CheckedFunction1 decorateCheckedFunction(CircuitBreaker circuitBreaker, CheckedFunction1 function){ return (T t) -> { - CircuitBreakerUtils.isCallPermitted(circuitBreaker); + circuitBreaker.obtainPermission(); long start = System.nanoTime(); try{ R returnValue = function.apply(t); diff --git a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/CircuitBreakerOpenException.java b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/CircuitBreakerOpenException.java index 2cbe70263d..354ce3dd34 100644 --- a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/CircuitBreakerOpenException.java +++ b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/CircuitBreakerOpenException.java @@ -20,9 +20,20 @@ /** * A {@link CircuitBreakerOpenException} signals that the CircuitBreaker is OPEN. + * @deprecated use {@link CallNotPermittedException} instead */ +@Deprecated public class CircuitBreakerOpenException extends RuntimeException { + /** + * The constructor with a CircuitBreaker. + * + * @param circuitBreaker the CircuitBreaker. + */ + public CircuitBreakerOpenException(CircuitBreaker circuitBreaker) { + super(String.format("CircuitBreaker '%s' is %s and does not permit further calls", circuitBreaker.getName(), circuitBreaker.getState())); + } + /** * The constructor with a message. * diff --git a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerState.java b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerState.java index 07b0ed2fa1..c616967209 100644 --- a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerState.java +++ b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerState.java @@ -32,7 +32,9 @@ abstract class CircuitBreakerState{ this.stateMachine = stateMachine; } - abstract boolean isCallPermitted(); + abstract boolean tryObtainPermission(); + + abstract void obtainPermission(); abstract void onError(Throwable throwable); diff --git a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachine.java b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachine.java index 764ca3ae91..ece14c4223 100644 --- a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachine.java +++ b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachine.java @@ -125,13 +125,28 @@ public CircuitBreakerStateMachine(String name, Supplier ci */ @Override public boolean isCallPermitted() { - boolean callPermitted = stateReference.get().isCallPermitted(); + return tryObtainPermission(); + } + + @Override + public boolean tryObtainPermission() { + boolean callPermitted = stateReference.get().tryObtainPermission(); if (!callPermitted) { publishCallNotPermittedEvent(); } return callPermitted; } + @Override + public void obtainPermission() { + try { + stateReference.get().obtainPermission(); + } catch(Exception e) { + publishCallNotPermittedEvent(); + throw e; + } + } + @Override public void onError(long durationInNanos, Throwable throwable) { // Handle the case if the completable future throw CompletionException wrapping the original exception diff --git a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/ClosedState.java b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/ClosedState.java index 6852fde063..7c39833f31 100644 --- a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/ClosedState.java +++ b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/ClosedState.java @@ -49,10 +49,18 @@ final class ClosedState extends CircuitBreakerState { * @return always true, because the CircuitBreaker is closed. */ @Override - boolean isCallPermitted() { + boolean tryObtainPermission() { return true; } + /** + * Does not throw an exception, because the CircuitBreaker is closed. + */ + @Override + void obtainPermission() { + // noOp + } + @Override void onError(Throwable throwable) { // CircuitBreakerMetrics is thread-safe diff --git a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/DisabledState.java b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/DisabledState.java index f0d68e04aa..deb4edfaa4 100644 --- a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/DisabledState.java +++ b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/DisabledState.java @@ -38,10 +38,19 @@ final class DisabledState extends CircuitBreakerState { * @return always true, because the CircuitBreaker is disabled. */ @Override - boolean isCallPermitted() { + boolean tryObtainPermission() { return true; } + /** + * Does not throw an exception, because the CircuitBreaker is disabled. + */ + @Override + void obtainPermission() { + // noOp + } + + @Override void onError(Throwable throwable) { // noOp diff --git a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/ForcedOpenState.java b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/ForcedOpenState.java index bb7ef9eaf4..2baf68bf57 100644 --- a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/ForcedOpenState.java +++ b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/ForcedOpenState.java @@ -20,6 +20,7 @@ import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; final class ForcedOpenState extends CircuitBreakerState { @@ -38,13 +39,19 @@ final class ForcedOpenState extends CircuitBreakerState { * @return always false, since the FORCED_OPEN state always denies calls. */ @Override - boolean isCallPermitted() { + boolean tryObtainPermission() { circuitBreakerMetrics.onCallNotPermitted(); return false; } + @Override + void obtainPermission() { + circuitBreakerMetrics.onCallNotPermitted(); + throw new CallNotPermittedException(stateMachine); + } + /** - * Should never be called when isCallPermitted returns false. + * Should never be called when tryObtainPermission returns false. */ @Override void onError(Throwable throwable) { @@ -52,7 +59,7 @@ void onError(Throwable throwable) { } /** - * Should never be called when isCallPermitted returns false. + * Should never be called when tryObtainPermission returns false. */ @Override void onSuccess() { diff --git a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/HalfOpenState.java b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/HalfOpenState.java index ab794e48ad..b0ccfc6eaf 100644 --- a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/HalfOpenState.java +++ b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/HalfOpenState.java @@ -18,13 +18,17 @@ */ package io.github.resilience4j.circuitbreaker.internal; +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; +import java.util.concurrent.atomic.AtomicInteger; + final class HalfOpenState extends CircuitBreakerState { private CircuitBreakerMetrics circuitBreakerMetrics; private final float failureRateThreshold; + private final AtomicInteger testRequestCounter; HalfOpenState(CircuitBreakerStateMachine stateMachine) { super(stateMachine); @@ -32,16 +36,31 @@ final class HalfOpenState extends CircuitBreakerState { this.circuitBreakerMetrics = new CircuitBreakerMetrics( circuitBreakerConfig.getRingBufferSizeInHalfOpenState()); this.failureRateThreshold = stateMachine.getCircuitBreakerConfig().getFailureRateThreshold(); + this.testRequestCounter = new AtomicInteger(circuitBreakerConfig.getRingBufferSizeInHalfOpenState()); } /** - * Returns always true, because the CircuitBreaker is half open. + * Checks if test request is allowed. + * + * Returns true, if test request counter is not zero. + * Returns false, if test request counter is zero. * - * @return always true, because the CircuitBreaker is half open. + * @return true, if test request counter is not zero. */ @Override - boolean isCallPermitted() { - return true; + boolean tryObtainPermission() { + if (testRequestCounter.getAndDecrement() > 0) { + return true; + } + circuitBreakerMetrics.onCallNotPermitted(); + return false; + } + + @Override + void obtainPermission() { + if(!tryObtainPermission()){ + throw new CallNotPermittedException(stateMachine); + } } @Override diff --git a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/OpenState.java b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/OpenState.java index 2e693a813b..1c9859f3cd 100644 --- a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/OpenState.java +++ b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/OpenState.java @@ -18,6 +18,7 @@ */ package io.github.resilience4j.circuitbreaker.internal; +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import java.time.Clock; @@ -52,7 +53,7 @@ final class OpenState extends CircuitBreakerState { * @return false, if the wait duration has not elapsed. true, if the wait duration has elapsed. */ @Override - boolean isCallPermitted() { + boolean tryObtainPermission() { // Thread-safe if (clock.instant().isAfter(retryAfterWaitDuration)) { stateMachine.transitionToHalfOpenState(); @@ -62,23 +63,30 @@ boolean isCallPermitted() { return false; } + @Override + void obtainPermission() { + if(!tryObtainPermission()){ + throw new CallNotPermittedException(stateMachine); + } + } + /** - * Should never be called when isCallPermitted returns false. + * Should never be called when tryObtainPermission returns false. */ @Override void onError(Throwable throwable) { - // Could be called when Thread 1 invokes isCallPermitted when the state is CLOSED, but in the meantime another + // Could be called when Thread 1 invokes obtainPermission when the state is CLOSED, but in the meantime another // Thread 2 calls onError and the state changes from CLOSED to OPEN before Thread 1 calls onError. // But the onError event should still be recorded, even if it happened after the state transition. circuitBreakerMetrics.onError(); } /** - * Should never be called when isCallPermitted returns false. + * Should never be called when tryObtainPermission returns false. */ @Override void onSuccess() { - // Could be called when Thread 1 invokes isCallPermitted when the state is CLOSED, but in the meantime another + // Could be called when Thread 1 invokes obtainPermission when the state is CLOSED, but in the meantime another // Thread 2 calls onError and the state changes from CLOSED to OPEN before Thread 1 calls onSuccess. // But the onSuccess event should still be recorded, even if it happened after the state transition. circuitBreakerMetrics.onSuccess(); diff --git a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/utils/CircuitBreakerUtils.java b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/utils/CircuitBreakerUtils.java index fb4af604ce..34b7f54fa4 100644 --- a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/utils/CircuitBreakerUtils.java +++ b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/utils/CircuitBreakerUtils.java @@ -18,16 +18,17 @@ */ package io.github.resilience4j.circuitbreaker.utils; +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreaker; -import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException; +@Deprecated public final class CircuitBreakerUtils { private CircuitBreakerUtils(){} public static void isCallPermitted(CircuitBreaker circuitBreaker) { - if(!circuitBreaker.isCallPermitted()) { - throw new CircuitBreakerOpenException(String.format("CircuitBreaker '%s' is open", circuitBreaker.getName())); + if(!circuitBreaker.tryObtainPermission()) { + throw new CallNotPermittedException(circuitBreaker); } } } diff --git a/resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/CallNotPermittedExceptionTest.java b/resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/CallNotPermittedExceptionTest.java new file mode 100644 index 0000000000..771de00d05 --- /dev/null +++ b/resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/CallNotPermittedExceptionTest.java @@ -0,0 +1,22 @@ +package io.github.resilience4j.circuitbreaker; + +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CallNotPermittedExceptionTest { + + @Test + public void shouldReturnCorrectMessageWhenStateIsOpen(){ + CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName"); + circuitBreaker.transitionToOpenState(); + assertThat(new CallNotPermittedException(circuitBreaker).getMessage()).isEqualTo("CircuitBreaker 'testName' is OPEN and does not permit further calls"); + } + + @Test + public void shouldReturnCorrectMessageWhenStateIsForcedOpen(){ + CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName"); + circuitBreaker.transitionToForcedOpenState(); + assertThat(new CallNotPermittedException(circuitBreaker).getMessage()).isEqualTo("CircuitBreaker 'testName' is FORCED_OPEN and does not permit further calls"); + } +} diff --git a/resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/CircuitBreakerEventPublisherTest.java b/resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/CircuitBreakerEventPublisherTest.java index 20d57ac0e6..bb25945cb5 100644 --- a/resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/CircuitBreakerEventPublisherTest.java +++ b/resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/CircuitBreakerEventPublisherTest.java @@ -116,7 +116,7 @@ public void shouldConsumeCallNotPermittedEvent() { circuitBreaker.onError(1000, new IOException("BAM!")); circuitBreaker.onError(1000, new IOException("BAM!")); - circuitBreaker.isCallPermitted(); + circuitBreaker.tryObtainPermission(); then(logger).should(times(1)).info("NOT_PERMITTED"); } @@ -135,7 +135,7 @@ public void shouldNotProduceEventsInDisabledState() { //And we execute other calls that should generate events circuitBreaker.onError(1000, new IOException("BAM!")); circuitBreaker.onError(1000, new IOException("BAM!")); - circuitBreaker.isCallPermitted(); + circuitBreaker.tryObtainPermission(); circuitBreaker.onSuccess(0); circuitBreaker.onError(1000, new IOException("BAM!")); diff --git a/resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/CircuitBreakerOpenExceptionTest.java b/resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/CircuitBreakerOpenExceptionTest.java new file mode 100644 index 0000000000..4040ac6ede --- /dev/null +++ b/resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/CircuitBreakerOpenExceptionTest.java @@ -0,0 +1,22 @@ +package io.github.resilience4j.circuitbreaker; + +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CircuitBreakerOpenExceptionTest { + + @Test + public void shouldReturnCorrectMessageWhenStateIsOpen(){ + CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName"); + circuitBreaker.transitionToOpenState(); + assertThat(new CircuitBreakerOpenException(circuitBreaker).getMessage()).isEqualTo("CircuitBreaker 'testName' is OPEN and does not permit further calls"); + } + + @Test + public void shouldReturnCorrectMessageWhenStateIsForcedOpen(){ + CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName"); + circuitBreaker.transitionToForcedOpenState(); + assertThat(new CircuitBreakerOpenException(circuitBreaker).getMessage()).isEqualTo("CircuitBreaker 'testName' is FORCED_OPEN and does not permit further calls"); + } +} diff --git a/resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/CircuitBreakerTest.java b/resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/CircuitBreakerTest.java index 6cafa9324f..b0635c82bf 100644 --- a/resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/CircuitBreakerTest.java +++ b/resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/CircuitBreakerTest.java @@ -33,10 +33,7 @@ import java.io.IOException; import java.net.SocketTimeoutException; import java.time.Duration; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.*; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -751,7 +748,7 @@ public void shouldExecuteCompletionStageAndReturnWithSuccess() throws ExecutionE } @Test - public void shouldRethrowExceptionAndNotRecordAsAFailure() { + public void shouldDecorateCompletionStageAndReturnWithExceptionAtSyncStage() { // Given CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("backendName"); assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED); @@ -759,19 +756,19 @@ public void shouldRethrowExceptionAndNotRecordAsAFailure() { // When Supplier> completionStageSupplier = () -> { - throw new WebServiceException("BAM! At sync stage"); + throw new CompletionException(new RuntimeException("BAM! At sync stage")); }; Supplier> decoratedCompletionStageSupplier = CircuitBreaker.decorateCompletionStage(circuitBreaker, completionStageSupplier); - Try> result = Try.of(decoratedCompletionStageSupplier::get); - assertThat(result.isFailure()).isEqualTo(true); - assertThat(result.failed().get()).isInstanceOf(WebServiceException.class); + CompletionStage decoratedCompletionStage = decoratedCompletionStageSupplier.get(); + assertThatThrownBy(decoratedCompletionStage.toCompletableFuture()::get) + .isInstanceOf(ExecutionException.class).hasCause(new RuntimeException("BAM! At sync stage")); CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics(); - assertThat(metrics.getNumberOfBufferedCalls()).isEqualTo(0); - assertThat(metrics.getNumberOfFailedCalls()).isEqualTo(0); + assertThat(metrics.getNumberOfBufferedCalls()).isEqualTo(1); + assertThat(metrics.getNumberOfFailedCalls()).isEqualTo(1); } @Test diff --git a/resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachineTest.java b/resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachineTest.java index 678ff7f6cb..f9b14a91b7 100644 --- a/resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachineTest.java +++ b/resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachineTest.java @@ -19,6 +19,7 @@ package io.github.resilience4j.circuitbreaker.internal; import com.statemachinesystems.mockclock.MockClock; +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; import org.junit.Before; @@ -27,6 +28,7 @@ import java.time.Duration; import java.time.ZoneId; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.BDDAssertions.assertThat; public class CircuitBreakerStateMachineTest { @@ -51,34 +53,82 @@ public void shouldReturnTheCorrectName() { assertThat(circuitBreaker.getName()).isEqualTo("testName"); } + @Test() + public void shouldThrowCallNotPermittedExceptionWhenStateIsOpen() { + circuitBreaker.transitionToOpenState(); + assertThatThrownBy(circuitBreaker::obtainPermission).isInstanceOf(CallNotPermittedException.class); + assertThat(circuitBreaker.getMetrics().getNumberOfNotPermittedCalls()).isEqualTo(1); + } + + @Test() + public void shouldThrowCallNotPermittedExceptionWhenStateIsForcedOpen() { + circuitBreaker.transitionToForcedOpenState(); + assertThatThrownBy(circuitBreaker::obtainPermission).isInstanceOf(CallNotPermittedException.class); + assertThat(circuitBreaker.getMetrics().getNumberOfNotPermittedCalls()).isEqualTo(1); + } + + @Test + public void shouldThrowCallNotPermittedExceptionWhenNotFurtherTestCallsArePermitted() { + circuitBreaker.transitionToOpenState(); + circuitBreaker.transitionToHalfOpenState(); + circuitBreaker.tryObtainPermission(); + circuitBreaker.tryObtainPermission(); + circuitBreaker.tryObtainPermission(); + circuitBreaker.tryObtainPermission(); + assertThatThrownBy(circuitBreaker::obtainPermission).isInstanceOf(CallNotPermittedException.class); + assertThat(circuitBreaker.getMetrics().getNumberOfNotPermittedCalls()).isEqualTo(1); + } + + @Test + public void shouldOnlyAllowFourTestRequests() { + assertThatMetricsAreReset(); + circuitBreaker.transitionToOpenState(); + circuitBreaker.transitionToHalfOpenState(); + assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.HALF_OPEN); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); + circuitBreaker.tryObtainPermission(); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(false); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(false); + circuitBreaker.transitionToOpenState(); + circuitBreaker.transitionToHalfOpenState(); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); + circuitBreaker.tryObtainPermission(); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(false); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(false); + } + @Test public void shouldOpenAfterRingBufferIsFull() { // A ring buffer with size 5 is used in closed state // Initially the CircuitBreaker is closed - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED); assertThatMetricsAreReset(); // Call 1 is a failure - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); circuitBreaker.onError(0, new RuntimeException()); // Should create a CircuitBreakerOnErrorEvent assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED); assertCircuitBreakerMetricsEqualTo(-1f, 0, 1, 5, 1, 0L); // Call 2 is a failure - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); circuitBreaker.onError(0, new RuntimeException()); // Should create a CircuitBreakerOnErrorEvent assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED); assertCircuitBreakerMetricsEqualTo(-1f, 0, 2, 5, 2, 0L); // Call 3 is a failure - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); circuitBreaker.onError(0, new RuntimeException()); // Should create a CircuitBreakerOnErrorEvent assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED); assertCircuitBreakerMetricsEqualTo(-1f, 0, 3, 5, 3, 0L); // Call 4 is a success - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); circuitBreaker.onSuccess(0); // Should create a CircuitBreakerOnSuccessEvent assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED); assertCircuitBreakerMetricsEqualTo(-1f, 1, 4, 5, 3, 0L); @@ -93,8 +143,8 @@ public void shouldOpenAfterRingBufferIsFull() { assertThat(circuitBreaker.getMetrics().getFailureRate()).isEqualTo(60.0f); assertCircuitBreakerMetricsEqualTo(60.0f, 2, 5, 5, 3, 0L); - // Call to isCallPermitted records a notPermittedCall - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(false); + // Call to tryObtainPermission records a notPermittedCall + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(false); assertCircuitBreakerMetricsEqualTo(60.0f, 2, 5, 5, 3, 1L); } @@ -105,20 +155,20 @@ public void shouldTransitionToHalfOpenAfterWaitDuration() { assertThatMetricsAreReset(); assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.OPEN); - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(false); // Should create a CircuitBreakerOnCallNotPermittedEvent + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(false); // Should create a CircuitBreakerOnCallNotPermittedEvent mockClock.advanceBySeconds(3); // The CircuitBreaker is still open, because the wait duration of 5 seconds is not elapsed. assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.OPEN); - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(false); // Should create a CircuitBreakerOnCallNotPermittedEvent + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(false); // Should create a CircuitBreakerOnCallNotPermittedEvent assertCircuitBreakerMetricsEqualTo(-1f, 0, 0, 5, 0, 2L); mockClock.advanceBySeconds(3); // The CircuitBreaker switches to half open, because the wait duration of 5 seconds is elapsed. - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.HALF_OPEN); // Should create a CircuitBreakerOnStateTransitionEvent (9) // Metrics are reset assertCircuitBreakerMetricsEqualTo(-1f, 0, 0, 4, 0, 0L); @@ -134,17 +184,17 @@ public void shouldTransitionBackToOpenStateWhenFailureIsAboveThreshold() { // A ring buffer with size 3 is used in half open state // Call 1 is a failure - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); circuitBreaker.onError(0, new RuntimeException()); // Should create a CircuitBreakerOnErrorEvent // Call 2 is a failure - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); circuitBreaker.onError(0, new RuntimeException()); // Should create a CircuitBreakerOnErrorEvent // Call 3 is a success - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); circuitBreaker.onSuccess(0); // Should create a CircuitBreakerOnSuccessEvent (12) // Call 2 is a failure - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); circuitBreaker.onError(0, new RuntimeException()); // Should create a CircuitBreakerOnErrorEvent // The ring buffer is filled and the failure rate is above 50% @@ -163,29 +213,29 @@ public void shouldTransitionBackToClosedStateWhenFailureIsBelowThreshold() { assertCircuitBreakerMetricsEqualTo(-1f, 0, 0, 4, 0, 0L); // Call 1 is a failure - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); circuitBreaker.onError(0, new RuntimeException()); // Should create a CircuitBreakerOnErrorEvent // Call 2 is a success - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); circuitBreaker.onSuccess(0); // Should create a CircuitBreakerOnSuccessEvent // Call 3 is a success - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); circuitBreaker.onSuccess(0); // Should create a CircuitBreakerOnSuccessEvent // Call 4 is a success - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); circuitBreaker.onSuccess(0); // Should create a CircuitBreakerOnSuccessEvent // The ring buffer is filled and the failure rate is below 50% // The state machine transitions back to CLOSED state - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED); // Should create a CircuitBreakerOnStateTransitionEvent assertCircuitBreakerMetricsEqualTo(-1f, 3, 4, 5, 1, 0L); // // Call 5 is a success and fills the buffer in closed state - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); circuitBreaker.onSuccess(0); // Should create a CircuitBreakerOnSuccessEvent assertCircuitBreakerMetricsEqualTo(20.0f, 4, 5, 5, 1, 0L); @@ -213,16 +263,19 @@ public void shouldDisableCircuitBreaker() { assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.DISABLED); // Should create a CircuitBreakerOnStateTransitionEvent (21) assertThatMetricsAreReset(); - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); circuitBreaker.onSuccess(0); // Should not create a CircuitBreakerOnSuccessEvent - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); + circuitBreaker.onError(0, new RuntimeException()); // Should not create a CircuitBreakerOnErrorEvent + + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); circuitBreaker.onError(0, new RuntimeException()); // Should not create a CircuitBreakerOnErrorEvent - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(true); circuitBreaker.onError(0, new RuntimeException()); // Should not create a CircuitBreakerOnErrorEvent - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(true); + circuitBreaker.obtainPermission(); circuitBreaker.onError(0, new RuntimeException()); // Should not create a CircuitBreakerOnErrorEvent assertThatMetricsAreReset(); @@ -234,11 +287,11 @@ public void shouldForceOpenCircuitBreaker() { assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.FORCED_OPEN); // Should create a CircuitBreakerOnStateTransitionEvent - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(false); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(false); mockClock.advanceBySeconds(6); - assertThat(circuitBreaker.isCallPermitted()).isEqualTo(false); + assertThat(circuitBreaker.tryObtainPermission()).isEqualTo(false); // The CircuitBreaker should not transition to half open, even if the wait duration of 5 seconds is elapsed. diff --git a/resilience4j-feign/src/test/java/io/github/resilience4j/feign/Resilience4jFeignCircuitBreakerTest.java b/resilience4j-feign/src/test/java/io/github/resilience4j/feign/Resilience4jFeignCircuitBreakerTest.java index 9e2929ed01..930bd12969 100644 --- a/resilience4j-feign/src/test/java/io/github/resilience4j/feign/Resilience4jFeignCircuitBreakerTest.java +++ b/resilience4j-feign/src/test/java/io/github/resilience4j/feign/Resilience4jFeignCircuitBreakerTest.java @@ -16,27 +16,20 @@ */ package io.github.resilience4j.feign; -import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.get; -import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor; -import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; -import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; -import static com.github.tomakehurst.wiremock.client.WireMock.verify; -import static org.assertj.core.api.Assertions.assertThat; - -import java.time.Duration; - -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - import com.github.tomakehurst.wiremock.junit.WireMockRule; - import feign.FeignException; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException; import io.github.resilience4j.feign.test.TestService; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.time.Duration; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static org.assertj.core.api.Assertions.assertThat; /** * Tests the integration of the {@link Resilience4jFeign} with {@link CircuitBreaker} @@ -118,7 +111,7 @@ public void testCircuitBreakerOpen() throws Exception { assertThat(exceptionThrown) .describedAs("CircuitBreakerOpenException thrown") .isTrue(); - assertThat(circuitBreaker.isCallPermitted()) + assertThat(circuitBreaker.tryObtainPermission()) .describedAs("CircuitBreaker Closed") .isFalse(); } @@ -146,7 +139,7 @@ public void testCircuitBreakerClosed() throws Exception { assertThat(exceptionThrown) .describedAs("CircuitBreakerOpenException thrown") .isFalse(); - assertThat(circuitBreaker.isCallPermitted()) + assertThat(circuitBreaker.tryObtainPermission()) .describedAs("CircuitBreaker Closed") .isTrue(); } diff --git a/resilience4j-metrics/src/test/java/io/github/resilience4j/metrics/StateTransitionMetricsTest.java b/resilience4j-metrics/src/test/java/io/github/resilience4j/metrics/StateTransitionMetricsTest.java index 07e25a9f91..72e0f915b2 100644 --- a/resilience4j-metrics/src/test/java/io/github/resilience4j/metrics/StateTransitionMetricsTest.java +++ b/resilience4j-metrics/src/test/java/io/github/resilience4j/metrics/StateTransitionMetricsTest.java @@ -75,7 +75,7 @@ public void circuitBreakerMetricsUsesFirstStateObjectInstance() throws Exception await().atMost(1500, TimeUnit.MILLISECONDS) .until(() -> { - circuitBreaker.isCallPermitted(); + circuitBreaker.tryObtainPermission(); return circuitBreaker.getState().equals(CircuitBreaker.State.HALF_OPEN); }); diff --git a/resilience4j-micrometer/src/test/java/io/github/resilience4j/micrometer/tagged/TaggedBulkheadMetricsTest.java b/resilience4j-micrometer/src/test/java/io/github/resilience4j/micrometer/tagged/TaggedBulkheadMetricsTest.java index 3ec64d75a2..6d51a43210 100644 --- a/resilience4j-micrometer/src/test/java/io/github/resilience4j/micrometer/tagged/TaggedBulkheadMetricsTest.java +++ b/resilience4j-micrometer/src/test/java/io/github/resilience4j/micrometer/tagged/TaggedBulkheadMetricsTest.java @@ -45,8 +45,8 @@ public void setUp() { bulkhead = bulkheadRegistry.bulkhead("backendA"); // record some basic stats - bulkhead.isCallPermitted(); - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); + bulkhead.tryObtainPermission(); TaggedBulkheadMetrics.ofBulkheadRegistry(bulkheadRegistry).bindTo(meterRegistry); } diff --git a/resilience4j-prometheus/src/test/java/io/github/resilience4j/prometheus/collectors/BulkheadMetricsCollectorTest.java b/resilience4j-prometheus/src/test/java/io/github/resilience4j/prometheus/collectors/BulkheadMetricsCollectorTest.java index 2aeb26efef..aa7a632749 100644 --- a/resilience4j-prometheus/src/test/java/io/github/resilience4j/prometheus/collectors/BulkheadMetricsCollectorTest.java +++ b/resilience4j-prometheus/src/test/java/io/github/resilience4j/prometheus/collectors/BulkheadMetricsCollectorTest.java @@ -35,8 +35,8 @@ public void setup() { registry = new CollectorRegistry(); bulkhead = Bulkhead.ofDefaults("backendA"); // record some basic stats - bulkhead.isCallPermitted(); - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); + bulkhead.tryObtainPermission(); BulkheadMetricsCollector.ofBulkhead(bulkhead).register(registry); } diff --git a/resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/bulkhead/BulkheadMethodInterceptor.java b/resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/bulkhead/BulkheadMethodInterceptor.java index ece01618c5..1abb98cc24 100644 --- a/resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/bulkhead/BulkheadMethodInterceptor.java +++ b/resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/bulkhead/BulkheadMethodInterceptor.java @@ -76,7 +76,7 @@ public Object invoke(MethodInvocation invocation) throws Throwable { } return result; } else if (CompletionStage.class.isAssignableFrom(returnType)) { - if (bulkhead.isCallPermitted()) { + if (bulkhead.tryObtainPermission()) { return ((CompletionStage) invocation.proceed()).handle((o, throwable) -> { bulkhead.onComplete(); if (throwable != null) { @@ -104,7 +104,7 @@ public Object invoke(MethodInvocation invocation) throws Throwable { } private Object handleOther(MethodInvocation invocation, io.github.resilience4j.bulkhead.Bulkhead bulkhead, RecoveryFunction recoveryFunction) throws Throwable { - boolean permission = bulkhead.isCallPermitted(); + boolean permission = bulkhead.tryObtainPermission(); if (!permission) { Throwable t = new BulkheadFullException(String.format("Bulkhead '%s' is full", bulkhead.getName())); diff --git a/resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/bulkhead/BulkheadTransformer.java b/resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/bulkhead/BulkheadTransformer.java index 7d96ff3492..af1f7f985b 100644 --- a/resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/bulkhead/BulkheadTransformer.java +++ b/resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/bulkhead/BulkheadTransformer.java @@ -58,7 +58,7 @@ public BulkheadTransformer recover(Function recoverer @Override public Upstream apply(Upstream upstream) throws Exception { return down -> { - if (bulkhead.isCallPermitted()) { + if (bulkhead.tryObtainPermission()) { // do not allow permits to leak upstream.connect(new Downstream() { diff --git a/resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/circuitbreaker/CircuitBreakerMethodInterceptor.java b/resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/circuitbreaker/CircuitBreakerMethodInterceptor.java index 52b6a3aeff..bd36718bec 100644 --- a/resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/circuitbreaker/CircuitBreakerMethodInterceptor.java +++ b/resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/circuitbreaker/CircuitBreakerMethodInterceptor.java @@ -16,7 +16,7 @@ package io.github.resilience4j.ratpack.circuitbreaker; import com.google.inject.Inject; -import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException; +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; import io.github.resilience4j.core.lang.Nullable; import io.github.resilience4j.ratpack.recovery.RecoveryFunction; @@ -75,7 +75,7 @@ public Object invoke(MethodInvocation invocation) throws Throwable { return result; } else if (CompletionStage.class.isAssignableFrom(returnType)) { final CompletableFuture promise = new CompletableFuture<>(); - if (breaker.isCallPermitted()) { + if (breaker.tryObtainPermission()) { CompletionStage result = (CompletionStage) proceed(invocation, breaker, recoveryFunction); if (result != null) { long start = System.nanoTime(); @@ -95,7 +95,7 @@ public Object invoke(MethodInvocation invocation) throws Throwable { }); } } else { - Throwable t = new CircuitBreakerOpenException(String.format("CircuitBreaker '%s' is open", breaker.getName())); + Throwable t = new CallNotPermittedException(breaker); try { promise.complete(recoveryFunction.apply((Throwable) t)); } catch (Throwable t2) { diff --git a/resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/circuitbreaker/CircuitBreakerTransformer.java b/resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/circuitbreaker/CircuitBreakerTransformer.java index eb779995a3..28d26c6c7c 100644 --- a/resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/circuitbreaker/CircuitBreakerTransformer.java +++ b/resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/circuitbreaker/CircuitBreakerTransformer.java @@ -15,8 +15,8 @@ */ package io.github.resilience4j.ratpack.circuitbreaker; +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreaker; -import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException; import io.github.resilience4j.ratpack.internal.AbstractTransformer; import ratpack.exec.Downstream; import ratpack.exec.Upstream; @@ -57,7 +57,7 @@ public CircuitBreakerTransformer recover(Function rec public Upstream apply(Upstream upstream) throws Exception { return down -> { long start; - if (circuitBreaker.isCallPermitted()) { + if (circuitBreaker.tryObtainPermission()) { start = System.nanoTime(); upstream.connect(new Downstream() { @@ -81,7 +81,7 @@ public void complete() { } }); } else { - Throwable t = new CircuitBreakerOpenException(String.format("CircuitBreaker '%s' is open", circuitBreaker.getName())); + Throwable t = new CallNotPermittedException(circuitBreaker); handleRecovery(down, t); } }; diff --git a/resilience4j-ratpack/src/test/groovy/io/github/resilience4j/ratpack/bulkhead/endpoint/BulkheadChainSpec.groovy b/resilience4j-ratpack/src/test/groovy/io/github/resilience4j/ratpack/bulkhead/endpoint/BulkheadChainSpec.groovy index 4b84cfe72c..01d75313fa 100644 --- a/resilience4j-ratpack/src/test/groovy/io/github/resilience4j/ratpack/bulkhead/endpoint/BulkheadChainSpec.groovy +++ b/resilience4j-ratpack/src/test/groovy/io/github/resilience4j/ratpack/bulkhead/endpoint/BulkheadChainSpec.groovy @@ -67,8 +67,8 @@ class BulkheadChainSpec extends Specification { and: "some bulkhead events" ['test1', 'test2'].each { bulkheadRegistry.bulkhead(it).with { - isCallPermitted() - isCallPermitted() + tryObtainPermission() + tryObtainPermission() onComplete() } } @@ -146,8 +146,8 @@ class BulkheadChainSpec extends Specification { when: "we get all bulkhead events" ['test1', 'test2'].each { bulkheadRegistry.bulkhead(it).with { - isCallPermitted() - isCallPermitted() + tryObtainPermission() + tryObtainPermission() onComplete() } } @@ -219,8 +219,8 @@ class BulkheadChainSpec extends Specification { and: "some bulkhead events" ['test1', 'test2'].each { bulkheadRegistry.bulkhead(it).with { - isCallPermitted() - isCallPermitted() + tryObtainPermission() + tryObtainPermission() onComplete() } } diff --git a/resilience4j-ratpack/src/test/groovy/io/github/resilience4j/ratpack/circuitbreaker/CircuitBreakerSpec.groovy b/resilience4j-ratpack/src/test/groovy/io/github/resilience4j/ratpack/circuitbreaker/CircuitBreakerSpec.groovy index fc41a42cca..76ca770173 100644 --- a/resilience4j-ratpack/src/test/groovy/io/github/resilience4j/ratpack/circuitbreaker/CircuitBreakerSpec.groovy +++ b/resilience4j-ratpack/src/test/groovy/io/github/resilience4j/ratpack/circuitbreaker/CircuitBreakerSpec.groovy @@ -157,7 +157,7 @@ class CircuitBreakerSpec extends Specification { then: actual.body.text == expectedText actual.statusCode == 200 - breaker.callPermitted + breaker.tryObtainPermission() breaker.state == io.github.resilience4j.circuitbreaker.CircuitBreaker.State.CLOSED when: @@ -166,7 +166,7 @@ class CircuitBreakerSpec extends Specification { then: actual.statusCode == 500 - !breaker.callPermitted + !breaker.tryObtainPermission() breaker.state == io.github.resilience4j.circuitbreaker.CircuitBreaker.State.OPEN when: @@ -176,7 +176,7 @@ class CircuitBreakerSpec extends Specification { then: actual.body.text == "recovered" actual.statusCode == 200 - !breaker.callPermitted + !breaker.tryObtainPermission() breaker.state == io.github.resilience4j.circuitbreaker.CircuitBreaker.State.OPEN where: diff --git a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/ResilienceBaseSubscriber.java b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/ResilienceBaseSubscriber.java index 66d258b075..4a7a0e83ce 100644 --- a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/ResilienceBaseSubscriber.java +++ b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/ResilienceBaseSubscriber.java @@ -154,12 +154,12 @@ protected void hookOnPermitAcquired() { /** * @return true if call is permitted, false otherwise */ - protected abstract boolean isCallPermitted(); + protected abstract boolean obtainPermission(); protected boolean acquireCallPermit() { boolean callPermitted = false; if (permitted.compareAndSet(Permit.PENDING, Permit.ACQUIRED)) { - callPermitted = isCallPermitted(); + callPermitted = obtainPermission(); if (!callPermitted) { permitted.set(Permit.REJECTED); } else { diff --git a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriber.java b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriber.java index 4b66440472..634cefaabd 100644 --- a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriber.java +++ b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriber.java @@ -59,8 +59,8 @@ public void hookOnError(Throwable t) { } @Override - protected boolean isCallPermitted() { - return bulkhead.isCallPermitted(); + protected boolean obtainPermission() { + return bulkhead.tryObtainPermission(); } @Override diff --git a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriber.java b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriber.java index 376a234dbe..53d9028582 100644 --- a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriber.java +++ b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriber.java @@ -15,8 +15,8 @@ */ package io.github.resilience4j.reactor.circuitbreaker.operator; +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreaker; -import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException; import io.github.resilience4j.core.StopWatch; import io.github.resilience4j.core.lang.Nullable; import io.github.resilience4j.reactor.ResilienceBaseSubscriber; @@ -90,14 +90,13 @@ protected void hookOnPermitAcquired() { } @Override - protected boolean isCallPermitted() { - return circuitBreaker.isCallPermitted(); + protected boolean obtainPermission() { + return circuitBreaker.tryObtainPermission(); } @Override protected Throwable getThrowable() { - return new CircuitBreakerOpenException( - String.format("CircuitBreaker '%s' is open", circuitBreaker.getName())); + return new CallNotPermittedException(circuitBreaker); } private void markFailure(Throwable e) { diff --git a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/ratelimiter/operator/RateLimiterSubscriber.java b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/ratelimiter/operator/RateLimiterSubscriber.java index e25ba6659d..106d8e9d06 100644 --- a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/ratelimiter/operator/RateLimiterSubscriber.java +++ b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/ratelimiter/operator/RateLimiterSubscriber.java @@ -61,7 +61,7 @@ public void hookOnError(Throwable t) { } @Override - protected boolean isCallPermitted() { + protected boolean obtainPermission() { return rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration()); } diff --git a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/bulkhead/operator/FluxBulkheadTest.java b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/bulkhead/operator/FluxBulkheadTest.java index f6b0530dab..1222fd256b 100644 --- a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/bulkhead/operator/FluxBulkheadTest.java +++ b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/bulkhead/operator/FluxBulkheadTest.java @@ -59,7 +59,7 @@ public void shouldPropagateError() { @Test public void shouldEmitErrorWithBulkheadFullException() { - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); StepVerifier.create( Flux.just("Event") @@ -73,7 +73,7 @@ public void shouldEmitErrorWithBulkheadFullException() { @Test public void shouldEmitBulkheadFullExceptionEvenWhenErrorDuringSubscribe() { - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); StepVerifier.create( Flux.error(new IOException("BAM!")) @@ -87,7 +87,7 @@ public void shouldEmitBulkheadFullExceptionEvenWhenErrorDuringSubscribe() { @Test public void shouldEmitBulkheadFullExceptionEvenWhenErrorNotOnSubscribe() { - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); StepVerifier.create( Flux.error(new IOException("BAM!"), true) diff --git a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/bulkhead/operator/MonoBulkheadTest.java b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/bulkhead/operator/MonoBulkheadTest.java index 3b7431005c..9fb5c6c105 100644 --- a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/bulkhead/operator/MonoBulkheadTest.java +++ b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/bulkhead/operator/MonoBulkheadTest.java @@ -58,7 +58,7 @@ public void shouldPropagateError() { @Test public void shouldEmitErrorWithBulkheadFullException() { - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); StepVerifier.create( Mono.just("Event") @@ -72,7 +72,7 @@ public void shouldEmitErrorWithBulkheadFullException() { @Test public void shouldEmitBulkheadFullExceptionEvenWhenErrorDuringSubscribe() { - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); StepVerifier.create( Mono.error(new IOException("BAM!")) @@ -86,7 +86,7 @@ public void shouldEmitBulkheadFullExceptionEvenWhenErrorDuringSubscribe() { @Test public void shouldEmitBulkheadFullExceptionEvenWhenErrorNotOnSubscribe() { - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); StepVerifier.create( Mono.error(new IOException("BAM!")).delayElement(Duration.ofMillis(1)) diff --git a/resilience4j-rxjava2/src/main/java/io/github/resilience4j/bulkhead/operator/BulkheadSubscriber.java b/resilience4j-rxjava2/src/main/java/io/github/resilience4j/bulkhead/operator/BulkheadSubscriber.java index 5bb83a2a41..c869df3ca1 100644 --- a/resilience4j-rxjava2/src/main/java/io/github/resilience4j/bulkhead/operator/BulkheadSubscriber.java +++ b/resilience4j-rxjava2/src/main/java/io/github/resilience4j/bulkhead/operator/BulkheadSubscriber.java @@ -78,7 +78,7 @@ public void cancel() { private boolean acquireCallPermit() { boolean callPermitted = false; if (permitted.compareAndSet(Permit.PENDING, Permit.ACQUIRED)) { - callPermitted = bulkhead.isCallPermitted(); + callPermitted = bulkhead.tryObtainPermission(); if (!callPermitted) { permitted.set(Permit.REJECTED); } diff --git a/resilience4j-rxjava2/src/main/java/io/github/resilience4j/bulkhead/operator/DisposableBulkhead.java b/resilience4j-rxjava2/src/main/java/io/github/resilience4j/bulkhead/operator/DisposableBulkhead.java index 65b6a8bdfc..a7dcf53740 100644 --- a/resilience4j-rxjava2/src/main/java/io/github/resilience4j/bulkhead/operator/DisposableBulkhead.java +++ b/resilience4j-rxjava2/src/main/java/io/github/resilience4j/bulkhead/operator/DisposableBulkhead.java @@ -124,7 +124,7 @@ private boolean isInvocationPermitted() { private boolean acquireCallPermit() { boolean callPermitted = false; if (permitted.compareAndSet(Permit.PENDING, Permit.ACQUIRED)) { - callPermitted = bulkhead.isCallPermitted(); + callPermitted = bulkhead.tryObtainPermission(); if (!callPermitted) { permitted.set(Permit.REJECTED); } diff --git a/resilience4j-rxjava2/src/main/java/io/github/resilience4j/circuitbreaker/operator/CircuitBreakerSubscriber.java b/resilience4j-rxjava2/src/main/java/io/github/resilience4j/circuitbreaker/operator/CircuitBreakerSubscriber.java index 2bb4642a09..f40d88c43a 100644 --- a/resilience4j-rxjava2/src/main/java/io/github/resilience4j/circuitbreaker/operator/CircuitBreakerSubscriber.java +++ b/resilience4j-rxjava2/src/main/java/io/github/resilience4j/circuitbreaker/operator/CircuitBreakerSubscriber.java @@ -1,8 +1,8 @@ package io.github.resilience4j.circuitbreaker.operator; import io.github.resilience4j.adapter.Permit; +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreaker; -import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException; import io.github.resilience4j.core.StopWatch; import io.github.resilience4j.core.lang.Nullable; import io.reactivex.internal.subscriptions.SubscriptionHelper; @@ -39,7 +39,7 @@ public void onSubscribe(Subscription subscription) { } else { cancel(); childSubscriber.onSubscribe(this); - childSubscriber.onError(new CircuitBreakerOpenException(String.format("CircuitBreaker '%s' is open", circuitBreaker.getName()))); + childSubscriber.onError(new CallNotPermittedException(circuitBreaker.getName())); } } } @@ -80,7 +80,7 @@ public void cancel() { private boolean acquireCallPermit() { boolean callPermitted = false; if (permitted.compareAndSet(Permit.PENDING, Permit.ACQUIRED)) { - callPermitted = circuitBreaker.isCallPermitted(); + callPermitted = circuitBreaker.tryObtainPermission(); if (!callPermitted) { permitted.set(Permit.REJECTED); } else { diff --git a/resilience4j-rxjava2/src/main/java/io/github/resilience4j/circuitbreaker/operator/DisposableCircuitBreaker.java b/resilience4j-rxjava2/src/main/java/io/github/resilience4j/circuitbreaker/operator/DisposableCircuitBreaker.java index b7111c2161..7a5ce4a959 100644 --- a/resilience4j-rxjava2/src/main/java/io/github/resilience4j/circuitbreaker/operator/DisposableCircuitBreaker.java +++ b/resilience4j-rxjava2/src/main/java/io/github/resilience4j/circuitbreaker/operator/DisposableCircuitBreaker.java @@ -1,8 +1,8 @@ package io.github.resilience4j.circuitbreaker.operator; import io.github.resilience4j.adapter.Permit; +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreaker; -import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException; import io.github.resilience4j.core.StopWatch; import io.github.resilience4j.core.lang.Nullable; import io.reactivex.disposables.Disposable; @@ -122,7 +122,7 @@ protected final void onNextInner(T value) { private boolean acquireCallPermit() { boolean callPermitted = false; if (permitted.compareAndSet(Permit.PENDING, Permit.ACQUIRED)) { - callPermitted = circuitBreaker.isCallPermitted(); + callPermitted = circuitBreaker.tryObtainPermission(); if (!callPermitted) { permitted.set(Permit.REJECTED); } else { @@ -137,7 +137,7 @@ private boolean isInvocationPermitted() { } private Exception circuitBreakerOpenException() { - return new CircuitBreakerOpenException(String.format("CircuitBreaker '%s' is open", circuitBreaker.getName())); + return new CallNotPermittedException(circuitBreaker); } private void markFailure(Throwable e) { diff --git a/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadCompletableObserverTest.java b/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadCompletableObserverTest.java index a7fa6c8d99..8eb060ac16 100644 --- a/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadCompletableObserverTest.java +++ b/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadCompletableObserverTest.java @@ -1,13 +1,5 @@ package io.github.resilience4j.bulkhead.operator; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; - -import java.io.IOException; - import io.github.resilience4j.bulkhead.Bulkhead; import io.github.resilience4j.bulkhead.BulkheadConfig; import io.github.resilience4j.bulkhead.BulkheadFullException; @@ -16,6 +8,12 @@ import io.reactivex.disposables.Disposable; import org.junit.Test; +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + /** * Unit test for {@link BulkheadCompletableObserver} using {@link BulkheadOperator}. */ @@ -48,7 +46,7 @@ public void shouldPropagateError() { @Test public void shouldEmitErrorWithBulkheadFullException() { - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); Completable.complete() .lift(BulkheadOperator.of(bulkhead)) @@ -100,7 +98,7 @@ public void shouldNotReleaseBulkheadWhenWasDisposedAfterNotPermittedSubscribe() Disposable disposable = mock(Disposable.class); CompletableObserver childObserver = mock(CompletableObserver.class); CompletableObserver decoratedObserver = BulkheadOperator.of(bulkhead).apply(childObserver); - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0); decoratedObserver.onSubscribe(disposable); diff --git a/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadMaybeObserverTest.java b/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadMaybeObserverTest.java index da94cbebf2..ee29aa3bfd 100644 --- a/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadMaybeObserverTest.java +++ b/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadMaybeObserverTest.java @@ -1,14 +1,5 @@ package io.github.resilience4j.bulkhead.operator; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; - -import java.io.IOException; -import java.util.Arrays; - import io.github.resilience4j.bulkhead.Bulkhead; import io.github.resilience4j.bulkhead.BulkheadConfig; import io.github.resilience4j.bulkhead.BulkheadFullException; @@ -18,6 +9,13 @@ import io.reactivex.disposables.Disposable; import org.junit.Test; +import java.io.IOException; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + /** * Unit test for {@link BulkheadMaybeObserver} using {@link BulkheadOperator}. */ @@ -49,7 +47,7 @@ public void shouldPropagateError() { @Test public void shouldEmitErrorWithBulkheadFullException() { - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); Maybe.just(1) .lift(BulkheadOperator.of(bulkhead)) @@ -118,7 +116,7 @@ public void shouldNotReleaseBulkheadWhenWasDisposedAfterNotPermittedSubscribe() Disposable disposable = mock(Disposable.class); MaybeObserver childObserver = mock(MaybeObserver.class); MaybeObserver decoratedObserver = BulkheadOperator.of(bulkhead).apply(childObserver); - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0); decoratedObserver.onSubscribe(disposable); diff --git a/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadObserverTest.java b/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadObserverTest.java index 8ede8f55ac..de7b3ade11 100644 --- a/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadObserverTest.java +++ b/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadObserverTest.java @@ -1,14 +1,5 @@ package io.github.resilience4j.bulkhead.operator; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.io.IOException; - import io.github.resilience4j.bulkhead.Bulkhead; import io.github.resilience4j.bulkhead.BulkheadConfig; import io.github.resilience4j.bulkhead.BulkheadFullException; @@ -17,6 +8,12 @@ import io.reactivex.disposables.Disposable; import org.junit.Test; +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + /** * Unit test for {@link BulkheadObserver} using {@link BulkheadOperator}. */ @@ -48,7 +45,7 @@ public void shouldPropagateError() { @Test public void shouldEmitErrorWithBulkheadFullException() { - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); Observable.fromArray("Event 1", "Event 2") .lift(BulkheadOperator.of(bulkhead)) @@ -119,7 +116,7 @@ public void shouldNotReleaseBulkheadWhenWasDisposedAfterNotPermittedSubscribe() Disposable disposable = mock(Disposable.class); Observer childObserver = mock(Observer.class); Observer decoratedObserver = BulkheadOperator.of(bulkhead).apply(childObserver); - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0); decoratedObserver.onSubscribe(disposable); diff --git a/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadSingleObserverTest.java b/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadSingleObserverTest.java index ba09c815f6..f0c38ee374 100644 --- a/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadSingleObserverTest.java +++ b/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadSingleObserverTest.java @@ -1,14 +1,5 @@ package io.github.resilience4j.bulkhead.operator; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; - -import java.io.IOException; -import java.util.Arrays; - import io.github.resilience4j.bulkhead.Bulkhead; import io.github.resilience4j.bulkhead.BulkheadConfig; import io.github.resilience4j.bulkhead.BulkheadFullException; @@ -18,6 +9,13 @@ import io.reactivex.disposables.Disposable; import org.junit.Test; +import java.io.IOException; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + /** * Unit test for {@link BulkheadSingleObserver} using {@link BulkheadOperator}. */ @@ -49,7 +47,7 @@ public void shouldPropagateError() { @Test public void shouldEmitErrorWithBulkheadFullException() { - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); Single.just(1) .lift(BulkheadOperator.of(bulkhead)) @@ -101,7 +99,7 @@ public void shouldNotReleaseBulkheadWhenWasDisposedAfterNotPermittedSubscribe() Disposable disposable = mock(Disposable.class); SingleObserver childObserver = mock(SingleObserver.class); SingleObserver decoratedObserver = BulkheadOperator.of(bulkhead).apply(childObserver); - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0); decoratedObserver.onSubscribe(disposable); diff --git a/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadSubscriberTest.java b/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadSubscriberTest.java index ce8c410d16..253801b023 100644 --- a/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadSubscriberTest.java +++ b/resilience4j-rxjava2/src/test/java/io/github/resilience4j/bulkhead/operator/BulkheadSubscriberTest.java @@ -1,14 +1,5 @@ package io.github.resilience4j.bulkhead.operator; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.io.IOException; - import io.github.resilience4j.bulkhead.Bulkhead; import io.github.resilience4j.bulkhead.BulkheadConfig; import io.github.resilience4j.bulkhead.BulkheadFullException; @@ -17,6 +8,12 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + /** * Unit test for {@link BulkheadSubscriber} using {@link BulkheadOperator}. */ @@ -48,7 +45,7 @@ public void shouldPropagateError() { @Test public void shouldEmitErrorWithBulkheadFullException() { - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); Flowable.fromArray("Event 1", "Event 2") .lift(BulkheadOperator.of(bulkhead)) @@ -118,7 +115,7 @@ public void shouldNotReleaseBulkheadWhenWasCancelledAfterNotPermittedSubscribe() Subscription subscription = mock(Subscription.class); Subscriber childObserver = mock(Subscriber.class); Subscriber decoratedObserver = BulkheadOperator.of(bulkhead).apply(childObserver); - bulkhead.isCallPermitted(); + bulkhead.tryObtainPermission(); assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0); decoratedObserver.onSubscribe(subscription); diff --git a/resilience4j-rxjava2/src/test/java/io/github/resilience4j/circuitbreaker/operator/CircuitBreakerSubscriberTest.java b/resilience4j-rxjava2/src/test/java/io/github/resilience4j/circuitbreaker/operator/CircuitBreakerSubscriberTest.java index 4d13a3aaf8..d6683b5ea2 100644 --- a/resilience4j-rxjava2/src/test/java/io/github/resilience4j/circuitbreaker/operator/CircuitBreakerSubscriberTest.java +++ b/resilience4j-rxjava2/src/test/java/io/github/resilience4j/circuitbreaker/operator/CircuitBreakerSubscriberTest.java @@ -1,19 +1,16 @@ package io.github.resilience4j.circuitbreaker.operator; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.io.IOException; - -import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException; +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.reactivex.Flowable; import org.junit.Test; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import java.io.IOException; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + /** * Unit test for {@link CircuitBreakerSubscriber}. */ @@ -50,7 +47,7 @@ public void shouldEmitErrorWithCircuitBreakerOpenException() { .lift(CircuitBreakerOperator.of(circuitBreaker)) .test() .assertSubscribed() - .assertError(CircuitBreakerOpenException.class) + .assertError(CallNotPermittedException.class) .assertNotComplete(); assertNoRegisteredCall(); diff --git a/resilience4j-vertx/src/main/java/io/github/resilience4j/circuitbreaker/VertxCircuitBreaker.java b/resilience4j-vertx/src/main/java/io/github/resilience4j/circuitbreaker/VertxCircuitBreaker.java index 1ac78b950f..55366d2737 100644 --- a/resilience4j-vertx/src/main/java/io/github/resilience4j/circuitbreaker/VertxCircuitBreaker.java +++ b/resilience4j-vertx/src/main/java/io/github/resilience4j/circuitbreaker/VertxCircuitBreaker.java @@ -51,10 +51,8 @@ static Supplier> decorateFuture(CircuitBreaker circuitBreaker, Sup return () -> { final Future future = Future.future(); - if (!circuitBreaker.isCallPermitted()) { - future.fail( - new CircuitBreakerOpenException( - String.format("CircuitBreaker '%s' is open", circuitBreaker.getName()))); + if (!circuitBreaker.tryObtainPermission()) { + future.fail(new CallNotPermittedException(circuitBreaker)); } else { long start = System.nanoTime();