From b9e61a904695b06b8e25c679a127b1ccb6b1f072 Mon Sep 17 00:00:00 2001 From: Oleksandr Goldobin Date: Wed, 18 Jan 2017 10:55:48 +0100 Subject: [PATCH] Issue #43: Added CircuitBreaker and Retry decorations for CompletionStage --- .gitignore | 1 + .../robwin/circuitbreaker/CircuitBreaker.java | 43 +++- .../github/robwin/decorators/Decorators.java | 31 +++ .../io/github/robwin/retry/AsyncRetry.java | 147 ++++++++++++ .../retry/internal/AsyncRetryContext.java | 78 +++++++ .../circuitbreaker/CircuitBreakerTest.java | 56 ++++- .../robwin/retry/internal/AsyncRetryTest.java | 211 ++++++++++++++++++ .../robwin/test/AsyncHelloWorldService.java | 11 + 8 files changed, 559 insertions(+), 19 deletions(-) create mode 100644 src/main/java/io/github/robwin/retry/AsyncRetry.java create mode 100644 src/main/java/io/github/robwin/retry/internal/AsyncRetryContext.java create mode 100644 src/test/java/io/github/robwin/retry/internal/AsyncRetryTest.java create mode 100644 src/test/java/io/github/robwin/test/AsyncHelloWorldService.java diff --git a/.gitignore b/.gitignore index 2770962649..f577d8b61f 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ *.iml .gradle build +classes \ No newline at end of file diff --git a/src/main/java/io/github/robwin/circuitbreaker/CircuitBreaker.java b/src/main/java/io/github/robwin/circuitbreaker/CircuitBreaker.java index 575d18a079..78d34de82b 100644 --- a/src/main/java/io/github/robwin/circuitbreaker/CircuitBreaker.java +++ b/src/main/java/io/github/robwin/circuitbreaker/CircuitBreaker.java @@ -28,6 +28,7 @@ import java.time.Duration; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -286,17 +287,43 @@ static Try.CheckedSupplier decorateCheckedSupplier(CircuitBreaker circuit * * @return a supplier which is secured by a CircuitBreaker. */ - static Supplier> decorateCompletableFuture(CircuitBreaker circuitBreaker, Supplier> supplier){ + static Supplier> decorateCompletionStage( + CircuitBreaker circuitBreaker, + Supplier> supplier + ) { return () -> { - CircuitBreakerUtils.isCallPermitted(circuitBreaker); - StopWatch stopWatch = StopWatch.start(circuitBreaker.getName()); - return supplier.get().whenComplete((returnValue, throwable) -> { - if (returnValue != null) { - circuitBreaker.onSuccess(stopWatch.stop().getProcessingDuration()); - } else { + + final CompletableFuture promise = new CompletableFuture<>(); + + if (!circuitBreaker.isCallPermitted()) { + promise.completeExceptionally( + new CircuitBreakerOpenException( + String.format("CircuitBreaker '%s' is open", circuitBreaker.getName()))); + + } else { + final StopWatch stopWatch = StopWatch.start(circuitBreaker.getName()); + + try { + supplier.get().whenComplete((result, throwable) -> { + + final Duration duration = stopWatch.stop().getProcessingDuration(); + + if (throwable != null) { + circuitBreaker.onError(duration, throwable); + promise.completeExceptionally(throwable); + + } else { + circuitBreaker.onSuccess(duration); + promise.complete(result); + } + }); + } catch (Throwable throwable) { circuitBreaker.onError(stopWatch.stop().getProcessingDuration(), throwable); + throw throwable; } - }); + } + + return promise; }; } diff --git a/src/main/java/io/github/robwin/decorators/Decorators.java b/src/main/java/io/github/robwin/decorators/Decorators.java index f7a5fc40ce..cf38842009 100644 --- a/src/main/java/io/github/robwin/decorators/Decorators.java +++ b/src/main/java/io/github/robwin/decorators/Decorators.java @@ -3,9 +3,12 @@ import io.github.robwin.cache.Cache; import io.github.robwin.circuitbreaker.CircuitBreaker; import io.github.robwin.ratelimiter.RateLimiter; +import io.github.robwin.retry.AsyncRetry; import io.github.robwin.retry.Retry; import javaslang.control.Try; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; import java.util.function.Supplier; @@ -38,6 +41,11 @@ static DecorateCheckedRunnable ofCheckedRunnable(Try.CheckedRunnable supplier){ return new DecorateCheckedRunnable(supplier); } + static DecorateCompletionStage ofCompletionStage(Supplier> stageSupplier){ + return new DecorateCompletionStage<>(stageSupplier); + } + + class DecorateSupplier{ private Supplier supplier; @@ -209,4 +217,27 @@ public Try.CheckedRunnable decorate() { return runnable; } } + + class DecorateCompletionStage { + + private Supplier> stageSupplier; + + public DecorateCompletionStage(Supplier> stageSupplier) { + this.stageSupplier = stageSupplier; + } + + public DecorateCompletionStage withCircuitBreaker(CircuitBreaker circuitBreaker) { + stageSupplier = CircuitBreaker.decorateCompletionStage(circuitBreaker, stageSupplier); + return this; + } + + public DecorateCompletionStage withRetry(AsyncRetry retryContext, ScheduledExecutorService scheduler) { + stageSupplier = AsyncRetry.decorateCompletionStage(retryContext, scheduler, stageSupplier); + return this; + } + + public Supplier> decorate() { + return stageSupplier; + } + } } \ No newline at end of file diff --git a/src/main/java/io/github/robwin/retry/AsyncRetry.java b/src/main/java/io/github/robwin/retry/AsyncRetry.java new file mode 100644 index 0000000000..9527e290bf --- /dev/null +++ b/src/main/java/io/github/robwin/retry/AsyncRetry.java @@ -0,0 +1,147 @@ +package io.github.robwin.retry; + +import io.github.robwin.retry.event.RetryEvent; +import io.github.robwin.retry.internal.AsyncRetryContext; +import io.reactivex.Flowable; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +public interface AsyncRetry { + + /** + * Returns the ID of this Retry. + * + * @return the ID of this Retry + */ + String getId(); + + /** + * Records a successful call. + */ + void onSuccess(); + + /** + * Records an failed call. + * @return delay in milliseconds until the next try + */ + long onError(Throwable throwable); + + /** + * Returns a reactive stream of RetryEvents. + * + * @return a reactive stream of RetryEvents + */ + Flowable getEventStream(); + + /** + * Creates a Retry with a custom Retry configuration. + * + * @param id the ID of the Retry + * @param retryConfig a custom Retry configuration + * + * @return a Retry with a custom Retry configuration. + */ + static AsyncRetry of(String id, RetryConfig retryConfig){ + return new AsyncRetryContext(id, retryConfig); + } + + /** + * Creates a Retry with a custom Retry configuration. + * + * @param id the ID of the Retry + * @param retryConfigSupplier a supplier of a custom Retry configuration + * + * @return a Retry with a custom Retry configuration. + */ + static AsyncRetry of(String id, Supplier retryConfigSupplier){ + return of(id, retryConfigSupplier.get()); + } + + /** + * Creates a Retry with default configuration. + * + * @param id the ID of the Retry + * @return a Retry with default configuration + */ + static AsyncRetry ofDefaults(String id){ + return of(id, RetryConfig.ofDefaults()); + } + + /** + * Decorates CompletionStageSupplier with Retry + * + * @param retryContext retry context + * @param scheduler execution service to use to schedule retries + * @param supplier completion stage supplier + * @param type of completion stage result + * @return decorated supplier + */ + static Supplier> decorateCompletionStage( + AsyncRetry retryContext, + ScheduledExecutorService scheduler, + Supplier> supplier + ) { + return () -> { + + final CompletableFuture promise = new CompletableFuture<>(); + final Runnable block = new AsyncRetryBlock<>(scheduler, retryContext, supplier, promise); + block.run(); + + return promise; + }; + } +} + +class AsyncRetryBlock implements Runnable { + private final ScheduledExecutorService scheduler; + private final AsyncRetry retryContext; + private final Supplier> supplier; + private final CompletableFuture promise; + + AsyncRetryBlock( + ScheduledExecutorService scheduler, + AsyncRetry retryContext, + Supplier> supplier, + CompletableFuture promise + ) { + this.scheduler = scheduler; + this.retryContext = retryContext; + this.supplier = supplier; + this.promise = promise; + } + + @Override + public void run() { + final CompletionStage stage; + + try { + stage = supplier.get(); + } catch (Throwable t) { + onError(t); + return; + } + + stage.whenComplete((result, t) -> { + if (t != null) { + onError(t); + } else { + promise.complete(result); + retryContext.onSuccess(); + } + }); + } + + private void onError(Throwable t) { + final long delay = retryContext.onError(t); + + if (delay < 1) { + promise.completeExceptionally(t); + } else { + scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); + } + } +} diff --git a/src/main/java/io/github/robwin/retry/internal/AsyncRetryContext.java b/src/main/java/io/github/robwin/retry/internal/AsyncRetryContext.java new file mode 100644 index 0000000000..f93a114f3a --- /dev/null +++ b/src/main/java/io/github/robwin/retry/internal/AsyncRetryContext.java @@ -0,0 +1,78 @@ +package io.github.robwin.retry.internal; + +import io.github.robwin.retry.AsyncRetry; +import io.github.robwin.retry.RetryConfig; +import io.github.robwin.retry.event.RetryEvent; +import io.github.robwin.retry.event.RetryOnErrorEvent; +import io.github.robwin.retry.event.RetryOnSuccessEvent; +import io.reactivex.Flowable; +import io.reactivex.processors.FlowableProcessor; +import io.reactivex.processors.PublishProcessor; +import javaslang.collection.Stream; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.Supplier; + +public class AsyncRetryContext implements AsyncRetry { + + private final String id; + private final int maxAttempts; + private Duration waitDuration; + private final Function backoffFunction; + private final FlowableProcessor eventPublisher; + + private final AtomicInteger numOfAttempts = new AtomicInteger(0); + + public AsyncRetryContext(String id, RetryConfig config) { + this.id = id; + this.maxAttempts = config.getMaxAttempts(); + this.backoffFunction = config.getBackoffFunction(); + this.waitDuration = config.getWaitDuration(); + + PublishProcessor publisher = PublishProcessor.create(); + this.eventPublisher = publisher.toSerialized(); + } + + @Override + public String getId() { + return id; + } + + @Override + public void onSuccess() { + int currentNumOfAttempts = numOfAttempts.get(); + publishRetryEvent(() -> new RetryOnSuccessEvent(id, currentNumOfAttempts, null)); + } + + @Override + public long onError(Throwable throwable) { + int attempt = numOfAttempts.addAndGet(1); + publishRetryEvent(() -> new RetryOnErrorEvent(id, attempt, throwable)); + return calculateInterval(attempt); + } + + @Override + public Flowable getEventStream() { + return eventPublisher; + } + + + private long calculateInterval(int attempt) { + + if (attempt > maxAttempts) { + return -1; + } else { + return Stream.iterate(waitDuration, backoffFunction) + .get(attempt - 1) + .toMillis(); + } + } + + private void publishRetryEvent(Supplier event) { + if(eventPublisher.hasSubscribers()) { + eventPublisher.onNext(event.get()); + } + } +} diff --git a/src/test/java/io/github/robwin/circuitbreaker/CircuitBreakerTest.java b/src/test/java/io/github/robwin/circuitbreaker/CircuitBreakerTest.java index 0e8ed24b0d..0787809262 100644 --- a/src/test/java/io/github/robwin/circuitbreaker/CircuitBreakerTest.java +++ b/src/test/java/io/github/robwin/circuitbreaker/CircuitBreakerTest.java @@ -31,6 +31,7 @@ import java.time.Duration; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import java.util.function.Function; @@ -638,7 +639,7 @@ public void shouldInvokeAsyncApply() throws ExecutionException, InterruptedExcep } @Test - public void shouldDecorateCompletableFutureAndReturnWithSuccess() throws ExecutionException, InterruptedException { + public void shouldDecorateCompletionStageAndReturnWithSuccess() throws ExecutionException, InterruptedException { // Given CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("backendName"); assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED); @@ -646,15 +647,18 @@ public void shouldDecorateCompletableFutureAndReturnWithSuccess() throws Executi given(helloWorldService.returnHelloWorld()).willReturn("Hello"); // When - Supplier> completableFutureSupplier = () -> CompletableFuture.supplyAsync(helloWorldService::returnHelloWorld); + Supplier> completionStageSupplier = + () -> CompletableFuture.supplyAsync(helloWorldService::returnHelloWorld); - Supplier> decoratedCompletableFutureSupplier = CircuitBreaker.decorateCompletableFuture(circuitBreaker, completableFutureSupplier); - CompletableFuture decoratedCompletableFuture = decoratedCompletableFutureSupplier.get() + Supplier> decoratedCompletionStageSupplier = + CircuitBreaker.decorateCompletionStage(circuitBreaker, completionStageSupplier); + CompletionStage decoratedCompletionStage = decoratedCompletionStageSupplier + .get() .thenApply(value -> value + " world"); // Then the helloWorldService should be invoked 1 time BDDMockito.then(helloWorldService).should(times(1)).returnHelloWorld(); - assertThat(decoratedCompletableFuture.get()).isEqualTo("Hello world"); + assertThat(decoratedCompletionStage.toCompletableFuture().get()).isEqualTo("Hello world"); CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics(); assertThat(metrics.getNumberOfBufferedCalls()).isEqualTo(1); @@ -662,21 +666,51 @@ public void shouldDecorateCompletableFutureAndReturnWithSuccess() throws Executi } @Test - public void shouldDecorateCompletableFutureAndReturnWithException() throws ExecutionException, InterruptedException { + public void shouldDecorateCompletionStageAndReturnWithExceptionAtSyncStage() throws ExecutionException, InterruptedException { // Given CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("backendName"); assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED); // Given the HelloWorldService throws an exception - given(helloWorldService.returnHelloWorld()).willThrow(new RuntimeException("BAM!")); // When - Supplier> completableFutureSupplier = () -> CompletableFuture.supplyAsync(helloWorldService::returnHelloWorld); - Supplier> decoratedCompletableFutureSupplier = CircuitBreaker.decorateCompletableFuture(circuitBreaker, completableFutureSupplier); - CompletableFuture decoratedCompletableFuture = decoratedCompletableFutureSupplier.get(); + Supplier> completionStageSupplier = () -> { + throw new WebServiceException("BAM! At sync stage"); + }; + + Supplier> decoratedCompletionStageSupplier = + CircuitBreaker.decorateCompletionStage(circuitBreaker, completionStageSupplier); + Try> result = Try.of(decoratedCompletionStageSupplier::get); + + // Then the helloWorldService should be invoked 1 time + BDDMockito.then(helloWorldService).should(times(0)).returnHelloWorld(); + + assertThat(result.isFailure()).isEqualTo(true); + assertThat(result.failed().get()).isInstanceOf(RuntimeException.class); + + CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics(); + assertThat(metrics.getNumberOfBufferedCalls()).isEqualTo(1); + assertThat(metrics.getNumberOfFailedCalls()).isEqualTo(1); + } + + @Test + public void shouldDecorateCompletionStageAndReturnWithExceptionAtAsyncStage() throws ExecutionException, InterruptedException { + // Given + CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("backendName"); + assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED); + // Given the HelloWorldService throws an exception + given(helloWorldService.returnHelloWorld()).willThrow(new RuntimeException("BAM! At async stage")); + + // When + Supplier> completionStageSupplier = + () -> CompletableFuture.supplyAsync(helloWorldService::returnHelloWorld); + Supplier> decoratedCompletionStageSupplier = + CircuitBreaker.decorateCompletionStage(circuitBreaker, completionStageSupplier); + CompletionStage decoratedCompletionStage = decoratedCompletionStageSupplier.get(); // Then the helloWorldService should be invoked 1 time BDDMockito.then(helloWorldService).should(times(1)).returnHelloWorld(); - assertThatThrownBy(decoratedCompletableFuture::get).isInstanceOf(ExecutionException.class).hasCause(new RuntimeException("BAM!")); + assertThatThrownBy(decoratedCompletionStage.toCompletableFuture()::get) + .isInstanceOf(ExecutionException.class).hasCause(new RuntimeException("BAM! At async stage")); CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics(); assertThat(metrics.getNumberOfBufferedCalls()).isEqualTo(1); diff --git a/src/test/java/io/github/robwin/retry/internal/AsyncRetryTest.java b/src/test/java/io/github/robwin/retry/internal/AsyncRetryTest.java new file mode 100644 index 0000000000..423523a66f --- /dev/null +++ b/src/test/java/io/github/robwin/retry/internal/AsyncRetryTest.java @@ -0,0 +1,211 @@ +package io.github.robwin.retry.internal; + +import io.github.robwin.retry.AsyncRetry; +import io.github.robwin.retry.RetryConfig; +import io.github.robwin.test.AsyncHelloWorldService; +import javaslang.control.Try; +import org.junit.Before; +import org.junit.Test; +import org.mockito.BDDMockito; + +import javax.xml.ws.WebServiceException; +import java.util.concurrent.*; +import java.util.function.Supplier; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; + +public class AsyncRetryTest { + private static final long DEFAULT_TIMEOUT_SECONDS = 5; + + private AsyncHelloWorldService helloWorldService; + private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + + @Before + public void setUp(){ + helloWorldService = mock(AsyncHelloWorldService.class); + } + + @Test + public void shouldNotRetry() throws InterruptedException, ExecutionException, TimeoutException { + // Given the HelloWorldService returns Hello world + given(helloWorldService.returnHelloWorld()).willReturn(completedFuture("Hello world")); + // Create a Retry with default configuration + AsyncRetry retryContext = AsyncRetry.ofDefaults("id"); + // Decorate the invocation of the HelloWorldService + Supplier> supplier = AsyncRetry.decorateCompletionStage( + retryContext, + scheduler, + () -> helloWorldService.returnHelloWorld()); + + // When + String result = awaitResult(supplier); + // Then the helloWorldService should be invoked 1 time + BDDMockito.then(helloWorldService).should(times(1)).returnHelloWorld(); + assertThat(result).isEqualTo("Hello world"); + } + + @Test + public void shouldRetryInCaseOfExceptionAtSyncStage() { + // Given the HelloWorldService throws an exception + given(helloWorldService.returnHelloWorld()) + .willThrow(new WebServiceException("BAM!")) + .willReturn(completedFuture("Hello world")); + + // Create a Retry with default configuration + AsyncRetry retryContext = AsyncRetry.ofDefaults("id"); + // Decorate the invocation of the HelloWorldService + Supplier> supplier = AsyncRetry.decorateCompletionStage( + retryContext, + scheduler, + () -> helloWorldService.returnHelloWorld()); + + // When + String result = awaitResult(supplier.get()); + + // Then the helloWorldService should be invoked 2 times + BDDMockito.then(helloWorldService).should(times(2)).returnHelloWorld(); + assertThat(result).isEqualTo("Hello world"); + } + + @Test + public void shouldRetryInCaseOfAnExceptionAtAsyncStage() { + // Given the HelloWorldService throws an exception + given(helloWorldService.returnHelloWorld()) + .willReturn(supplyAsync(() -> { throw new WebServiceException("BAM!"); })) + .willReturn(completedFuture("Hello world")); + + // Create a Retry with default configuration + AsyncRetry retryContext = AsyncRetry.ofDefaults("id"); + // Decorate the invocation of the HelloWorldService + Supplier> supplier = AsyncRetry.decorateCompletionStage( + retryContext, + scheduler, + () -> helloWorldService.returnHelloWorld()); + + // When + String result = awaitResult(supplier.get()); + + // Then the helloWorldService should be invoked 2 times + BDDMockito.then(helloWorldService).should(times(2)).returnHelloWorld(); + assertThat(result).isEqualTo("Hello world"); + } + + @Test + public void shouldCompleteFutureAfterOneAttemptInCaseOfExceptionAtSyncStage() { + shouldCompleteFutureAfterAttemptsInCaseOfExceptionAtSyncStage(1); + } + + @Test + public void shouldCompleteFutureAfterTwoAttemptsInCaseOfExceptionAtSyncStage() { + shouldCompleteFutureAfterAttemptsInCaseOfExceptionAtSyncStage(2); + } + + @Test + public void shouldCompleteFutureAfterThreeAttemptsInCaseOfExceptionAtSyncStage() { + shouldCompleteFutureAfterAttemptsInCaseOfExceptionAtSyncStage(3); + } + + private void shouldCompleteFutureAfterAttemptsInCaseOfExceptionAtSyncStage(int noOfAttempts) { + // Given the HelloWorldService throws an exception + given(helloWorldService.returnHelloWorld()) + .willThrow(new WebServiceException("BAM!")); + + // Create a Retry with default configuration + AsyncRetry retryContext = AsyncRetry.of( + "id", + RetryConfig + .custom() + .maxAttempts(noOfAttempts) + .build()); + // Decorate the invocation of the HelloWorldService + Supplier> supplier = AsyncRetry.decorateCompletionStage( + retryContext, + scheduler, + () -> helloWorldService.returnHelloWorld()); + + // When + Try resultTry = Try.of(() -> awaitResult(supplier.get())); + + // Then the helloWorldService should be invoked n + 1 times + BDDMockito.then(helloWorldService).should(times(noOfAttempts + 1)).returnHelloWorld(); + assertThat(resultTry.isFailure()).isTrue(); + assertThat(resultTry.getCause().getCause()).isInstanceOf(WebServiceException.class); + } + + @Test + public void shouldCompleteFutureAfterOneAttemptInCaseOfExceptionAtAsyncStage() { + shouldCompleteFutureAfterAttemptsInCaseOfExceptionAtAsyncStage(1); + } + + @Test + public void shouldCompleteFutureAfterTwoAttemptsInCaseOfExceptionAtAsyncStage() { + shouldCompleteFutureAfterAttemptsInCaseOfExceptionAtAsyncStage(2); + } + + @Test + public void shouldCompleteFutureAfterThreeAttemptsInCaseOfExceptionAtAsyncStage() { + shouldCompleteFutureAfterAttemptsInCaseOfExceptionAtAsyncStage(3); + } + + private void shouldCompleteFutureAfterAttemptsInCaseOfExceptionAtAsyncStage(int noOfAttempts) { + // Given the HelloWorldService throws an exception + given(helloWorldService.returnHelloWorld()) + .willReturn(supplyAsync(() -> { throw new WebServiceException("BAM!"); })); + + // Create a Retry with default configuration + AsyncRetry retryContext = AsyncRetry.of( + "id", + RetryConfig + .custom() + .maxAttempts(noOfAttempts) + .build()); + // Decorate the invocation of the HelloWorldService + Supplier> supplier = AsyncRetry.decorateCompletionStage( + retryContext, + scheduler, + () -> helloWorldService.returnHelloWorld()); + + // When + Try resultTry = Try.of(() -> awaitResult(supplier.get())); + + // Then the helloWorldService should be invoked n + 1 times + BDDMockito.then(helloWorldService).should(times(noOfAttempts + 1)).returnHelloWorld(); + assertThat(resultTry.isFailure()).isTrue(); + assertThat(resultTry.getCause().getCause()).isInstanceOf(WebServiceException.class); + } + + private static class RuntimeExecutionException extends RuntimeException { + RuntimeExecutionException(Throwable cause) { + super(cause); + } + } + + private static T awaitResult(CompletionStage completionStage, long timeoutSeconds) { + try { + return completionStage.toCompletableFuture().get(timeoutSeconds, TimeUnit.SECONDS); + } catch (InterruptedException | TimeoutException e) { + throw new AssertionError(e); + } catch (ExecutionException e) { + throw new RuntimeExecutionException(e.getCause()); + } + } + + private static T awaitResult(CompletionStage completionStage) { + return awaitResult(completionStage, DEFAULT_TIMEOUT_SECONDS); + } + + private static T awaitResult(Supplier> completionStageSupplier, long timeoutSeconds) { + return awaitResult(completionStageSupplier.get(), timeoutSeconds); + } + + private static T awaitResult(Supplier> completionStageSupplier) { + return awaitResult(completionStageSupplier, DEFAULT_TIMEOUT_SECONDS); + } + +} diff --git a/src/test/java/io/github/robwin/test/AsyncHelloWorldService.java b/src/test/java/io/github/robwin/test/AsyncHelloWorldService.java new file mode 100644 index 0000000000..6dbc3bfa02 --- /dev/null +++ b/src/test/java/io/github/robwin/test/AsyncHelloWorldService.java @@ -0,0 +1,11 @@ +package io.github.robwin.test; + +import java.util.concurrent.CompletionStage; + +public interface AsyncHelloWorldService { + CompletionStage returnHelloWorld(); + CompletionStage returnHelloWorldWithName(String name); + + CompletionStage sayHelloWorld(); + CompletionStage sayHelloWorldWithName(String name); +}