Skip to content

Commit

Permalink
Add response predicate to retry sync and async for enhancement Reacti…
Browse files Browse the repository at this point in the history
  • Loading branch information
Romeh authored and RobWin committed Nov 9, 2018
1 parent 657c81d commit 9dec4b6
Show file tree
Hide file tree
Showing 7 changed files with 572 additions and 284 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package io.github.resilience4j.retry;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import io.github.resilience4j.core.EventConsumer;
import io.github.resilience4j.retry.event.RetryEvent;
import io.github.resilience4j.retry.event.RetryOnErrorEvent;
Expand All @@ -8,12 +14,6 @@
import io.github.resilience4j.retry.event.RetryOnSuccessEvent;
import io.github.resilience4j.retry.internal.AsyncRetryImpl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
* A AsyncRetry instance is thread-safe can be used to decorate multiple requests.
*/
Expand Down Expand Up @@ -110,7 +110,7 @@ static <T> Supplier<CompletionStage<T>> decorateCompletionStage(
return () -> {

final CompletableFuture<T> promise = new CompletableFuture<>();
final Runnable block = new AsyncRetryBlock<>(scheduler, retry.context(), supplier, promise);
@SuppressWarnings("unchecked") final Runnable block = new AsyncRetryBlock<>(scheduler, retry.context(), supplier, promise);
block.run();

return promise;
Expand Down Expand Up @@ -155,7 +155,7 @@ interface Metrics {
long getNumberOfFailedCallsWithRetryAttempt();
}

interface Context {
interface Context<T> {

/**
* Records a successful call.
Expand All @@ -168,6 +168,14 @@ interface Context {
* @return delay in milliseconds until the next try
*/
long onError(Throwable throwable);

/**
* check the result call.
*
* @param result the result to validate
* @return delay in milliseconds until the next try if the result match the predicate
*/
long onResult(T result);
}

/**
Expand All @@ -189,15 +197,15 @@ interface EventPublisher extends io.github.resilience4j.core.EventPublisher<Retr

class AsyncRetryBlock<T> implements Runnable {
private final ScheduledExecutorService scheduler;
private final AsyncRetry.Context retryContext;
private final AsyncRetry.Context<T> retryContext;
private final Supplier<CompletionStage<T>> supplier;
private final CompletableFuture<T> promise;

AsyncRetryBlock(
ScheduledExecutorService scheduler,
AsyncRetry.Context retryContext,
Supplier<CompletionStage<T>> supplier,
CompletableFuture<T> promise
ScheduledExecutorService scheduler,
AsyncRetry.Context<T> retryContext,
Supplier<CompletionStage<T>> supplier,
CompletableFuture<T> promise
) {
this.scheduler = scheduler;
this.retryContext = retryContext;
Expand All @@ -217,11 +225,10 @@ public void run() {
}

stage.whenComplete((result, t) -> {
if (t != null) {
if (result != null) {
onResult(result);
} else if (t != null) {
onError(t);
} else {
promise.complete(result);
retryContext.onSuccess();
}
});
}
Expand All @@ -235,4 +242,15 @@ private void onError(Throwable t) {
scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
}
}

private void onResult(T result) {
final long delay = retryContext.onResult(result);

if (delay < 1) {
promise.complete(result);
retryContext.onSuccess();
} else {
scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
*/
package io.github.resilience4j.retry;

import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.function.Supplier;

import io.github.resilience4j.core.EventConsumer;
import io.github.resilience4j.retry.event.RetryEvent;
import io.github.resilience4j.retry.event.RetryOnErrorEvent;
Expand All @@ -29,10 +33,6 @@
import io.vavr.CheckedFunction1;
import io.vavr.CheckedRunnable;

import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* A Retry instance is thread-safe can be used to decorate multiple requests.
* A Retry.
Expand Down Expand Up @@ -145,11 +145,15 @@ default void executeRunnable(Runnable runnable){
*/
static <T> CheckedFunction0<T> decorateCheckedSupplier(Retry retry, CheckedFunction0<T> supplier){
return () -> {
Retry.Context context = retry.context();
@SuppressWarnings("unchecked")
Retry.Context<T> context = retry.context();
do try {
T result = supplier.apply();
context.onSuccess();
return result;
final boolean validationOfResult = context.onResult(result);
if (!validationOfResult) {
context.onSuccess();
return result;
}
} catch (Exception exception) {
context.onError(exception);
} while (true);
Expand Down Expand Up @@ -189,11 +193,15 @@ static CheckedRunnable decorateCheckedRunnable(Retry retry, CheckedRunnable runn
*/
static <T, R> CheckedFunction1<T, R> decorateCheckedFunction(Retry retry, CheckedFunction1<T, R> function){
return (T t) -> {
Retry.Context context = retry.context();
@SuppressWarnings("unchecked")
Retry.Context<R> context = retry.context();
do try {
R result = function.apply(t);
context.onSuccess();
return result;
R result = function.apply(t);
final boolean validationOfResult = context.onResult(result);
if (!validationOfResult) {
context.onSuccess();
return result;
}
} catch (Exception exception) {
context.onError(exception);
} while (true);
Expand All @@ -211,11 +219,15 @@ static <T, R> CheckedFunction1<T, R> decorateCheckedFunction(Retry retry, Checke
*/
static <T> Supplier<T> decorateSupplier(Retry retry, Supplier<T> supplier){
return () -> {
Retry.Context context = retry.context();
@SuppressWarnings("unchecked")
Retry.Context<T> context = retry.context();
do try {
T result = supplier.get();
context.onSuccess();
return result;
T result = supplier.get();
final boolean validationOfResult = context.onResult(result);
if (!validationOfResult) {
context.onSuccess();
return result;
}
} catch (RuntimeException runtimeException) {
context.onRuntimeError(runtimeException);
} while (true);
Expand All @@ -233,11 +245,15 @@ static <T> Supplier<T> decorateSupplier(Retry retry, Supplier<T> supplier){
*/
static <T> Callable<T> decorateCallable(Retry retry, Callable<T> supplier){
return () -> {
Retry.Context context = retry.context();
@SuppressWarnings("unchecked")
Retry.Context<T> context = retry.context();
do try {
T result = supplier.call();
context.onSuccess();
return result;
T result = supplier.call();
final boolean validationOfResult = context.onResult(result);
if (!validationOfResult) {
context.onSuccess();
return result;
}
} catch (RuntimeException runtimeException) {
context.onRuntimeError(runtimeException);
} while (true);
Expand Down Expand Up @@ -277,11 +293,15 @@ static Runnable decorateRunnable(Retry retry, Runnable runnable){
*/
static <T, R> Function<T, R> decorateFunction(Retry retry, Function<T, R> function){
return (T t) -> {
Retry.Context context = retry.context();
@SuppressWarnings("unchecked")
Retry.Context<R> context = retry.context();
do try {
R result = function.apply(t);
context.onSuccess();
return result;
R result = function.apply(t);
final boolean validationOfResult = context.onResult(result);
if (!validationOfResult) {
context.onSuccess();
return result;
}
} catch (RuntimeException runtimeException) {
context.onRuntimeError(runtimeException);
} while (true);
Expand Down Expand Up @@ -326,13 +346,24 @@ interface Metrics {
long getNumberOfFailedCallsWithRetryAttempt();
}

interface Context {
/**
* the retry context which will be used during the retry iteration to decide what can be done on error , result, on runtime error
*
* @param <T> the result type
*/
interface Context<T> {

/**
* Records a successful call.
*/
void onSuccess();

/**
* @param result the returned result from the called logic
* @return true if we need to retry again or false if no retry anymore
*/
boolean onResult(T result);

/**
* Handles a checked exception
*
Expand Down
Loading

0 comments on commit 9dec4b6

Please sign in to comment.