From 62c1bd1611a0d184cd930b1f848672228488cd1c Mon Sep 17 00:00:00 2001 From: Bogdan Storozhuk Date: Sun, 14 May 2017 22:13:05 +0300 Subject: [PATCH] Issue #135 CompletionStage support in RateLimiter (#136) * Issue #135 CompletionStage support in RateLimiter * Issue #135 codacy issue fix * Issue #135 codacy issue fix * Issue #135 Decorators interface support --- .../resilience4j/decorators/Decorators.java | 5 ++ .../operator/CircuitBreakerOperator.java | 53 +++++--------- .../resilience4j/ratelimiter/RateLimiter.java | 33 +++++++++ .../operator/RateLimiterOperator.java | 53 +++++--------- .../ratelimiter/RateLimiterTest.java | 72 +++++++++++++++---- 5 files changed, 130 insertions(+), 86 deletions(-) diff --git a/resilience4j-all/src/main/java/io/github/resilience4j/decorators/Decorators.java b/resilience4j-all/src/main/java/io/github/resilience4j/decorators/Decorators.java index 63620d5ed6..31147f5c7a 100644 --- a/resilience4j-all/src/main/java/io/github/resilience4j/decorators/Decorators.java +++ b/resilience4j-all/src/main/java/io/github/resilience4j/decorators/Decorators.java @@ -303,6 +303,11 @@ public DecorateCompletionStage withBulkhead(Bulkhead bulkhead) { return this; } + public DecorateCompletionStage withRateLimiter(RateLimiter rateLimiter) { + stageSupplier = RateLimiter.decorateCompletionStage(rateLimiter, stageSupplier); + return this; + } + public Supplier> decorate() { return stageSupplier; } diff --git a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/operator/CircuitBreakerOperator.java b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/operator/CircuitBreakerOperator.java index 5587c6a8ae..6d079e454c 100644 --- a/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/operator/CircuitBreakerOperator.java +++ b/resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/operator/CircuitBreakerOperator.java @@ -19,6 +19,11 @@ package io.github.resilience4j.circuitbreaker.operator; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException; import io.github.resilience4j.core.StopWatch; @@ -28,10 +33,6 @@ import io.reactivex.SingleObserver; import io.reactivex.SingleOperator; import io.reactivex.disposables.Disposable; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.concurrent.atomic.AtomicBoolean; @@ -102,9 +103,7 @@ private final class CircuitBreakerSubscriber implements Subscriber, Subscript @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; - if (LOG.isDebugEnabled()) { - LOG.info("onSubscribe"); - } + LOG.debug("onSubscribe"); if (circuitBreaker.isCallPermitted()) { stopWatch = StopWatch.start(circuitBreaker.getName()); childSubscriber.onSubscribe(this); @@ -121,9 +120,7 @@ public void onSubscribe(Subscription subscription) { */ @Override public void onNext(T event) { - if (LOG.isDebugEnabled()) { - LOG.info("onNext: {}", event); - } + LOG.debug("onNext: {}", event); if (!isCancelled()) { childSubscriber.onNext(event); } @@ -134,9 +131,7 @@ public void onNext(T event) { */ @Override public void onError(Throwable e) { - if (LOG.isDebugEnabled()) { - LOG.info("onError", e); - } + LOG.debug("onError", e); if (!isCancelled()) { circuitBreaker.onError(stopWatch.stop().getProcessingDuration().toNanos(), e); childSubscriber.onError(e); @@ -149,9 +144,7 @@ public void onError(Throwable e) { */ @Override public void onComplete() { - if (LOG.isDebugEnabled()) { - LOG.info("onComplete"); - } + LOG.debug("onComplete"); if (!isCancelled()) { circuitBreaker.onSuccess(stopWatch.stop().getProcessingDuration().toNanos()); childSubscriber.onComplete(); @@ -199,9 +192,7 @@ private final class CircuitBreakerObserver implements Observer, Disposable { @Override public void onSubscribe(Disposable disposable) { this.disposable = disposable; - if (LOG.isDebugEnabled()) { - LOG.info("onSubscribe"); - } + LOG.debug("onSubscribe"); if (circuitBreaker.isCallPermitted()) { stopWatch = StopWatch.start(circuitBreaker.getName()); childObserver.onSubscribe(this); @@ -218,9 +209,7 @@ public void onSubscribe(Disposable disposable) { */ @Override public void onNext(T event) { - if (LOG.isDebugEnabled()) { - LOG.info("onNext: {}", event); - } + LOG.debug("onNext: {}", event); if (!isDisposed()) { childObserver.onNext(event); } @@ -231,9 +220,7 @@ public void onNext(T event) { */ @Override public void onError(Throwable e) { - if (LOG.isDebugEnabled()) { - LOG.info("onError", e); - } + LOG.debug("onError", e); if (!isDisposed()) { circuitBreaker.onError(stopWatch.stop().getProcessingDuration().toNanos(), e); childObserver.onError(e); @@ -245,9 +232,7 @@ public void onError(Throwable e) { */ @Override public void onComplete() { - if (LOG.isDebugEnabled()) { - LOG.info("onComplete"); - } + LOG.debug("onComplete"); if (!isDisposed()) { circuitBreaker.onSuccess(stopWatch.stop().getProcessingDuration().toNanos()); childObserver.onComplete(); @@ -292,9 +277,7 @@ private class CircuitBreakerSingleObserver implements SingleObserver, Disposa @Override public void onSubscribe(Disposable disposable) { this.disposable = disposable; - if (LOG.isDebugEnabled()) { - LOG.info("onSubscribe"); - } + LOG.debug("onSubscribe"); if (circuitBreaker.isCallPermitted()) { stopWatch = StopWatch.start(circuitBreaker.getName()); childObserver.onSubscribe(this); @@ -311,9 +294,7 @@ public void onSubscribe(Disposable disposable) { */ @Override public void onError(Throwable e) { - if (LOG.isDebugEnabled()) { - LOG.info("onError", e); - } + LOG.debug("onError", e); if (!isDisposed()) { circuitBreaker.onError(stopWatch.stop().getProcessingDuration().toNanos(), e); childObserver.onError(e); @@ -325,9 +306,7 @@ public void onError(Throwable e) { */ @Override public void onSuccess(T value) { - if (LOG.isDebugEnabled()) { - LOG.info("onComplete"); - } + LOG.debug("onComplete"); if (!isDisposed()) { circuitBreaker.onSuccess(stopWatch.stop().getProcessingDuration().toNanos()); childObserver.onSuccess(value); diff --git a/resilience4j-ratelimiter/src/main/java/io/github/resilience4j/ratelimiter/RateLimiter.java b/resilience4j-ratelimiter/src/main/java/io/github/resilience4j/ratelimiter/RateLimiter.java index e55ade32fd..de19e398cf 100644 --- a/resilience4j-ratelimiter/src/main/java/io/github/resilience4j/ratelimiter/RateLimiter.java +++ b/resilience4j-ratelimiter/src/main/java/io/github/resilience4j/ratelimiter/RateLimiter.java @@ -27,6 +27,8 @@ import java.time.Duration; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -69,6 +71,37 @@ static RateLimiter ofDefaults(String name) { return new AtomicRateLimiter(name, RateLimiterConfig.ofDefaults()); } + /** + * Returns a supplier which is decorated by a rateLimiter. + * + * @param rateLimiter the rateLimiter + * @param supplier the original supplier + * @param the type of the returned CompletionStage's result + * @return a supplier which is decorated by a RateLimiter. + */ + static Supplier> decorateCompletionStage(RateLimiter rateLimiter, Supplier> supplier) { + return () -> { + + final CompletableFuture promise = new CompletableFuture<>(); + try { + waitForPermission(rateLimiter); + supplier.get() + .whenComplete( + (result, throwable) -> { + if (throwable != null) { + promise.completeExceptionally(throwable); + } else { + promise.complete(result); + } + } + ); + } catch (Throwable throwable) { + promise.completeExceptionally(throwable); + } + return promise; + }; + } + /** * Creates a supplier which is restricted by a RateLimiter. * diff --git a/resilience4j-ratelimiter/src/main/java/io/github/resilience4j/ratelimiter/operator/RateLimiterOperator.java b/resilience4j-ratelimiter/src/main/java/io/github/resilience4j/ratelimiter/operator/RateLimiterOperator.java index 9823663239..6fff7ef8c3 100644 --- a/resilience4j-ratelimiter/src/main/java/io/github/resilience4j/ratelimiter/operator/RateLimiterOperator.java +++ b/resilience4j-ratelimiter/src/main/java/io/github/resilience4j/ratelimiter/operator/RateLimiterOperator.java @@ -16,6 +16,11 @@ package io.github.resilience4j.ratelimiter.operator; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.github.resilience4j.ratelimiter.RateLimiter; import io.github.resilience4j.ratelimiter.RequestNotPermitted; import io.reactivex.FlowableOperator; @@ -24,10 +29,6 @@ import io.reactivex.SingleObserver; import io.reactivex.SingleOperator; import io.reactivex.disposables.Disposable; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.concurrent.atomic.AtomicBoolean; @@ -92,9 +93,7 @@ private final class RateLimiterSubscriber implements Subscriber, Subscription @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; - if (LOG.isDebugEnabled()) { - LOG.info("onSubscribe"); - } + LOG.debug("onSubscribe"); if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) { childSubscriber.onSubscribe(this); } else { @@ -109,9 +108,7 @@ public void onSubscribe(Subscription subscription) { */ @Override public void onNext(T event) { - if (LOG.isDebugEnabled()) { - LOG.info("onNext: {}", event); - } + LOG.debug("onNext: {}", event); if (!isCancelled()) { if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) { childSubscriber.onNext(event); @@ -127,9 +124,7 @@ public void onNext(T event) { */ @Override public void onError(Throwable e) { - if (LOG.isDebugEnabled()) { - LOG.info("onError", e); - } + LOG.debug("onError", e); if (!isCancelled()) { childSubscriber.onError(e); @@ -141,9 +136,7 @@ public void onError(Throwable e) { */ @Override public void onComplete() { - if (LOG.isDebugEnabled()) { - LOG.info("onComplete"); - } + LOG.debug("onComplete"); if (!isCancelled()) { childSubscriber.onComplete(); } @@ -189,9 +182,7 @@ private final class RateLimiterObserver implements Observer, Disposable { @Override public void onSubscribe(Disposable disposable) { this.disposable = disposable; - if (LOG.isDebugEnabled()) { - LOG.info("onSubscribe"); - } + LOG.debug("onSubscribe"); if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) { childObserver.onSubscribe(this); } else { @@ -206,9 +197,7 @@ public void onSubscribe(Disposable disposable) { */ @Override public void onNext(T event) { - if (LOG.isDebugEnabled()) { - LOG.info("onNext: {}", event); - } + LOG.debug("onNext: {}", event); if (!isDisposed()) { if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) { childObserver.onNext(event); @@ -224,9 +213,7 @@ public void onNext(T event) { */ @Override public void onError(Throwable e) { - if (LOG.isDebugEnabled()) { - LOG.info("onError", e); - } + LOG.debug("onError", e); if (!isDisposed()) { childObserver.onError(e); } @@ -237,9 +224,7 @@ public void onError(Throwable e) { */ @Override public void onComplete() { - if (LOG.isDebugEnabled()) { - LOG.info("onComplete"); - } + LOG.debug("onComplete"); if (!isDisposed()) { childObserver.onComplete(); } @@ -282,9 +267,7 @@ private class RateLimiterSingleObserver implements SingleObserver, Disposable @Override public void onSubscribe(Disposable disposable) { this.disposable = disposable; - if (LOG.isDebugEnabled()) { - LOG.info("onSubscribe"); - } + LOG.debug("onSubscribe"); if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) { childObserver.onSubscribe(this); } else { @@ -299,9 +282,7 @@ public void onSubscribe(Disposable disposable) { */ @Override public void onError(Throwable e) { - if (LOG.isDebugEnabled()) { - LOG.info("onError", e); - } + LOG.debug("onError", e); if (!isDisposed()) { childObserver.onError(e); } @@ -312,9 +293,7 @@ public void onError(Throwable e) { */ @Override public void onSuccess(T value) { - if (LOG.isDebugEnabled()) { - LOG.info("onComplete"); - } + LOG.debug("onComplete"); if (!isDisposed()) { childObserver.onSuccess(value); } diff --git a/resilience4j-ratelimiter/src/test/java/io/github/resilience4j/ratelimiter/RateLimiterTest.java b/resilience4j-ratelimiter/src/test/java/io/github/resilience4j/ratelimiter/RateLimiterTest.java index e3c7fe565c..cd920c6be4 100644 --- a/resilience4j-ratelimiter/src/test/java/io/github/resilience4j/ratelimiter/RateLimiterTest.java +++ b/resilience4j-ratelimiter/src/test/java/io/github/resilience4j/ratelimiter/RateLimiterTest.java @@ -18,31 +18,43 @@ */ package io.github.resilience4j.ratelimiter; +import static com.jayway.awaitility.Awaitility.await; +import static org.assertj.core.api.BDDAssertions.then; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static io.vavr.API.$; +import static io.vavr.API.Case; +import static io.vavr.API.Match; +import static io.vavr.Predicates.instanceOf; +import static java.util.concurrent.CompletableFuture.supplyAsync; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.BDDMockito; + import io.vavr.CheckedFunction0; import io.vavr.CheckedFunction1; import io.vavr.CheckedRunnable; import io.vavr.control.Try; -import org.junit.Before; -import org.junit.Test; import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -import static com.jayway.awaitility.Awaitility.await; -import static io.vavr.API.$; -import static io.vavr.API.Case; -import static io.vavr.API.Match; -import static io.vavr.Predicates.instanceOf; -import static org.assertj.core.api.BDDAssertions.then; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.*; - @SuppressWarnings("unchecked") public class RateLimiterTest { @@ -206,6 +218,42 @@ public void decorateFunction() throws Exception { verify(function, times(1)).apply(1); } + @Test + public void decorateCompletionStage() throws Exception { + Supplier supplier = mock(Supplier.class); + BDDMockito.given(supplier.get()).willReturn("Resource"); + Supplier> completionStage = () -> supplyAsync(supplier); + + Supplier> decorated = RateLimiter.decorateCompletionStage(limit, completionStage); + + when(limit.getPermission(config.getTimeoutDuration())) + .thenReturn(false); + + AtomicReference error = new AtomicReference<>(null); + CompletableFuture notPermittedFuture = decorated.get() + .whenComplete((v, e) -> error.set(e)) + .toCompletableFuture(); + Try errorResult = Try.of(notPermittedFuture::get); + assertTrue(errorResult.isFailure()); + then(errorResult.getCause()).isInstanceOf(ExecutionException.class); + then(notPermittedFuture.isCompletedExceptionally()).isTrue(); + then(error.get()).isExactlyInstanceOf(RequestNotPermitted.class); + verify(supplier, never()).get(); + + when(limit.getPermission(config.getTimeoutDuration())) + .thenReturn(true); + + AtomicReference shouldBeEmpty = new AtomicReference<>(null); + CompletableFuture success = decorated.get() + .whenComplete((v, e) -> error.set(e)) + .toCompletableFuture(); + Try successResult = Try.of(success::get); + then(successResult.isSuccess()).isTrue(); + then(success.isCompletedExceptionally()).isFalse(); + then(shouldBeEmpty.get()).isNull(); + verify(supplier).get(); + } + @Test public void waitForPermissionWithOne() throws Exception { when(limit.getPermission(config.getTimeoutDuration()))