Skip to content

Commit

Permalink
Issue ReactiveX#72: Added more support for CompletionStage
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Winkler committed May 12, 2017
1 parent a751458 commit 56ec12e
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -764,11 +764,8 @@ public void shouldDecorateCompletionStageAndReturnWithExceptionAtSyncStage() thr
CircuitBreaker.decorateCompletionStage(circuitBreaker, completionStageSupplier);
Try<CompletionStage<String>> result = Try.of(decoratedCompletionStageSupplier::get);

// Then the helloWorldService should be invoked 1 time
BDDMockito.then(helloWorldService).should(Mockito.times(0)).returnHelloWorld();

assertThat(result.isFailure()).isEqualTo(true);
assertThat(result.failed().get()).isInstanceOf(RuntimeException.class);
assertThat(result.failed().get()).isInstanceOf(WebServiceException.class);

CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
assertThat(metrics.getNumberOfBufferedCalls()).isEqualTo(1);
Expand All @@ -786,6 +783,7 @@ public void shouldDecorateCompletionStageAndReturnWithExceptionAtAsyncStage() th
// When
Supplier<CompletionStage<String>> completionStageSupplier =
() -> CompletableFuture.supplyAsync(helloWorldService::returnHelloWorld);

Supplier<CompletionStage<String>> decoratedCompletionStageSupplier =
CircuitBreaker.decorateCompletionStage(circuitBreaker, completionStageSupplier);
CompletionStage<String> decoratedCompletionStage = decoratedCompletionStageSupplier.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.vavr.CheckedRunnable;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down Expand Up @@ -74,6 +75,15 @@ static Timer of(String name) {
}


/**
* Decorates and executes the decorated Runnable.
*
* @param runnable the original Callable
*/
default void executeRunnable(Runnable runnable) throws Exception {
decorateRunnable(this, runnable).run();
}

/**
* Decorates and executes the decorated Callable.
*
Expand All @@ -85,6 +95,28 @@ default <T> T executeCallable(Callable<T> callable) throws Exception {
return decorateCallable(this, callable).call();
}

/**
* Decorates and executes the decorated Supplier.
*
* @param supplier the original Supplier
* @param <T> the type of results supplied by this supplier
* @return the result of the decorated Supplier.
*/
default <T> T executeSupplier(Supplier<T> supplier){
return decorateSupplier(this, supplier).get();
}

/**
* Decorates and executes the decorated CompletionStage Supplier.
*
* @param supplier the CompletionStage Supplier
* @param <T> the type of results supplied by this supplier
* @return the result of the decorated Supplier.
*/
default <T> CompletionStage<T> executeCompletionStageSupplier(Supplier<CompletionStage<T>> supplier){
return decorateCompletionStageSupplier(this, supplier).get();
}

/**
* Creates a timed checked supplier.
Expand All @@ -94,7 +126,7 @@ default <T> T executeCallable(Callable<T> callable) throws Exception {
*/
static <T> CheckedFunction0<T> decorateCheckedSupplier(Timer timer, CheckedFunction0<T> supplier){
return () -> {
Context context = timer.time();
final Context context = timer.time();
try {
T returnValue = supplier.apply();
timer.onSuccess(context);
Expand All @@ -115,7 +147,7 @@ static <T> CheckedFunction0<T> decorateCheckedSupplier(Timer timer, CheckedFunct
*/
static CheckedRunnable decorateCheckedRunnable(Timer timer, CheckedRunnable runnable){
return () -> {
Context context = timer.time();
final Context context = timer.time();
try {
runnable.run();
timer.onSuccess(context);
Expand All @@ -126,17 +158,6 @@ static CheckedRunnable decorateCheckedRunnable(Timer timer, CheckedRunnable runn
};
}

/**
* Decorates and executes the decorated Supplier.
*
* @param supplier the original Supplier
* @param <T> the type of results supplied by this supplier
* @return the result of the decorated Supplier.
*/
default <T> T executeSupplier(Supplier<T> supplier){
return decorateSupplier(this, supplier).get();
}

/**
* Creates a timed checked supplier.
Expand All @@ -146,7 +167,7 @@ default <T> T executeSupplier(Supplier<T> supplier){
*/
static <T> Supplier<T> decorateSupplier(Timer timer, Supplier<T> supplier){
return () -> {
Context context = timer.time();
final Context context = timer.time();
try {
T returnValue = supplier.get();
timer.onSuccess(context);
Expand All @@ -167,7 +188,7 @@ static <T> Supplier<T> decorateSupplier(Timer timer, Supplier<T> supplier){
*/
static <T> Callable<T> decorateCallable(Timer timer, Callable<T> callable){
return () -> {
Context context = timer.time();
final Context context = timer.time();
try {
T returnValue = callable.call();
timer.onSuccess(context);
Expand All @@ -189,7 +210,7 @@ static <T> Callable<T> decorateCallable(Timer timer, Callable<T> callable){
*/
static Runnable decorateRunnable(Timer timer, Runnable runnable){
return () -> {
Context context = timer.time();
final Context context = timer.time();
try {
runnable.run();
timer.onSuccess(context);
Expand All @@ -210,7 +231,7 @@ static Runnable decorateRunnable(Timer timer, Runnable runnable){
*/
static <T, R> Function<T, R> decorateFunction(Timer timer, Function<T, R> function){
return (T t) -> {
Context context = timer.time();
final Context context = timer.time();
try {
R returnValue = function.apply(t);
timer.onSuccess(context);
Expand All @@ -231,7 +252,7 @@ static <T, R> Function<T, R> decorateFunction(Timer timer, Function<T, R> functi
*/
static <T, R> CheckedFunction1<T, R> decorateCheckedFunction(Timer timer, CheckedFunction1<T, R> function){
return (T t) -> {
Context context = timer.time();
final Context context = timer.time();
try {
R returnValue = function.apply(t);
timer.onSuccess(context);
Expand All @@ -243,6 +264,34 @@ static <T, R> CheckedFunction1<T, R> decorateCheckedFunction(Timer timer, Checke
};
}

/**
*
* @param timer the timer to use
* @param stageSupplier the CompletionStage Supplier
* @return a decorated completion stage
*/
static <T> Supplier<CompletionStage<T>> decorateCompletionStageSupplier(Timer timer, Supplier<CompletionStage<T>> stageSupplier) {
return () -> {
final Context context = timer.time();
try {
final CompletionStage<T> stage = stageSupplier.get();

stage.whenComplete((result, throwable) -> {
if (throwable != null) {
timer.onError(context);
} else {
timer.onSuccess(context);
}
});

return stage;
} catch (Throwable throwable) {
timer.onError(context);
throw throwable;
}
};
}

interface Metrics {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@
import org.mockito.BDDMockito;
import org.mockito.Mockito;

import javax.xml.ws.WebServiceException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down Expand Up @@ -128,6 +132,78 @@ public void shouldDecorateRunnable() throws Throwable {
BDDMockito.then(helloWorldService).should(times(1)).sayHelloWorld();
}

@Test
public void shouldExecuteRunnable() throws Throwable {
// And measure the time with Metrics
timer.executeRunnable(helloWorldService::sayHelloWorld);

assertThat(timer.getMetrics().getNumberOfTotalCalls()).isEqualTo(1);
assertThat(timer.getMetrics().getNumberOfSuccessfulCalls()).isEqualTo(1);
assertThat(timer.getMetrics().getNumberOfFailedCalls()).isEqualTo(0);

// Then the helloWorldService should be invoked 1 time
BDDMockito.then(helloWorldService).should(times(1)).sayHelloWorld();
}

@Test
public void shouldExecuteCompletionStageSupplier() throws Throwable {
// Given the HelloWorldService returns Hello world
BDDMockito.given(helloWorldService.returnHelloWorld()).willReturn("Hello world");
// And measure the time with Metrics
Supplier<CompletionStage<String>> completionStageSupplier =
() -> CompletableFuture.supplyAsync(helloWorldService::returnHelloWorld);

CompletionStage<String> stringCompletionStage = timer.executeCompletionStageSupplier(completionStageSupplier);
String value = stringCompletionStage.toCompletableFuture().get();

assertThat(value).isEqualTo("Hello world");

assertThat(timer.getMetrics().getNumberOfTotalCalls()).isEqualTo(1);
assertThat(timer.getMetrics().getNumberOfSuccessfulCalls()).isEqualTo(1);
assertThat(timer.getMetrics().getNumberOfFailedCalls()).isEqualTo(0);

// Then the helloWorldService should be invoked 1 time
BDDMockito.then(helloWorldService).should(times(1)).returnHelloWorld();
}

@Test
public void shouldExecuteCompletionStageAndReturnWithExceptionAtSyncStage() throws Throwable {

Supplier<CompletionStage<String>> completionStageSupplier = () -> {
throw new WebServiceException("BAM! At sync stage");
};

Assertions.assertThatThrownBy(() -> timer.executeCompletionStageSupplier(completionStageSupplier))
.isInstanceOf(WebServiceException.class);

assertThat(timer.getMetrics().getNumberOfTotalCalls()).isEqualTo(1);
assertThat(timer.getMetrics().getNumberOfSuccessfulCalls()).isEqualTo(0);
assertThat(timer.getMetrics().getNumberOfFailedCalls()).isEqualTo(1);
}


@Test
public void shouldExecuteCompletionStageAndReturnWithExceptionAtASyncStage() throws Throwable {
// Given the HelloWorldService returns Hello world
BDDMockito.given(helloWorldService.returnHelloWorld()).willThrow(new WebServiceException("BAM!"));
// And measure the time with Metrics
Supplier<CompletionStage<String>> completionStageSupplier =
() -> CompletableFuture.supplyAsync(helloWorldService::returnHelloWorld);

CompletionStage<String> stringCompletionStage = timer.executeCompletionStageSupplier(completionStageSupplier);

Assertions.assertThatThrownBy(() -> stringCompletionStage.toCompletableFuture().get())
.isInstanceOf(ExecutionException.class).hasCause(new WebServiceException("BAM!"));

assertThat(timer.getMetrics().getNumberOfTotalCalls()).isEqualTo(1);
assertThat(timer.getMetrics().getNumberOfSuccessfulCalls()).isEqualTo(0);
assertThat(timer.getMetrics().getNumberOfFailedCalls()).isEqualTo(1);

// Then the helloWorldService should be invoked 1 time
BDDMockito.then(helloWorldService).should(Mockito.times(1)).returnHelloWorld();
}


@Test
public void shouldDecorateCheckedRunnableAndReturnWithSuccess() throws Throwable {
// And measure the time with Metrics
Expand Down

0 comments on commit 56ec12e

Please sign in to comment.