Skip to content

Commit

Permalink
Removed blocking code in RxJava2 RetryTransformer analog to Spring Re… (
Browse files Browse the repository at this point in the history
  • Loading branch information
RobWin authored Jan 30, 2020
1 parent f1c4bd7 commit 52fe20e
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 125 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.github.resilience4j.reactor.IllegalPublisherException;
import io.github.resilience4j.retry.Retry;
import io.vavr.control.Try;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -81,13 +80,13 @@ void onComplete() {
}

void handleResult(T result) {
long waitingDurationMillis = retryContext.onResult(result);
if (waitingDurationMillis != -1) {
throw new RetryDueToResultException(waitingDurationMillis);
long waitDurationMillis = retryContext.onResult(result);
if (waitDurationMillis != -1) {
throw new RetryDueToResultException(waitDurationMillis);
}
}

Mono<Long> handleErrors(Throwable throwable) {
Publisher<Long> handleErrors(Throwable throwable) {
if (throwable instanceof RetryDueToResultException) {
long waitDurationMillis = ((RetryDueToResultException) throwable).waitDurationMillis;
return Mono.delay(Duration.ofMillis(waitDurationMillis));
Expand All @@ -97,15 +96,13 @@ Mono<Long> handleErrors(Throwable throwable) {
throw (Error) throwable;
}

long waitingDurationMillis = Try.of(() -> retryContext
.onError(throwable))
.get();
long waitDurationMillis = retryContext.onError(throwable);

if (waitingDurationMillis == -1) {
Try.failure(throwable).get();
if (waitDurationMillis == -1) {
return Mono.error(throwable);
}

return Mono.delay(Duration.ofMillis(waitingDurationMillis));
return Mono.delay(Duration.ofMillis(waitDurationMillis));
}

private static class RetryDueToResultException extends RuntimeException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import reactor.blockhound.integration.ReactorIntegration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.io.IOException;
Expand Down Expand Up @@ -64,16 +63,16 @@ public void returnOnCompleteUsingMono() {
.willThrow(new HelloWorldException())
.willReturn("Hello world");

StepVerifier.create(Mono.fromCallable(helloWorldService::returnHelloWorld).compose(retryOperator)
.subscribeOn(Schedulers.single()))
StepVerifier.create(Mono.fromCallable(helloWorldService::returnHelloWorld)
.compose(retryOperator))
.expectNext("Hello world")
.expectComplete()
.verify(Duration.ofMillis(50));
StepVerifier.create(Mono.fromCallable(helloWorldService::returnHelloWorld).compose(retryOperator)
.subscribeOn(Schedulers.single()))
.verify(Duration.ofSeconds(1));
StepVerifier.create(Mono.fromCallable(helloWorldService::returnHelloWorld)
.compose(retryOperator))
.expectNext("Hello world")
.expectComplete()
.verify(Duration.ofMillis(50));
.verify(Duration.ofSeconds(1));

then(helloWorldService).should(times(4)).returnHelloWorld();
Retry.Metrics metrics = retry.getMetrics();
Expand All @@ -96,7 +95,7 @@ public void shouldNotRetryUsingMonoStackOverFlow() {
.compose(retryOperator))
.expectSubscription()
.expectError(StackOverflowError.class)
.verify(Duration.ofMillis(50));
.verify(Duration.ofSeconds(1));
}

@Test
Expand All @@ -111,7 +110,7 @@ public void shouldNotRetryWhenItThrowErrorMono() {
.compose(retryOperator))
.expectSubscription()
.expectError(Error.class)
.verify(Duration.ofMillis(50));
.verify(Duration.ofSeconds(1));

then(helloWorldService).should().returnHelloWorld();
Retry.Metrics metrics = retry.getMetrics();
Expand All @@ -129,18 +128,16 @@ public void returnOnErrorUsingMono() {
.willThrow(new HelloWorldException());

StepVerifier.create(Mono.fromCallable(helloWorldService::returnHelloWorld)
.subscribeOn(Schedulers.single())
.compose(retryOperator))
.expectSubscription()
.expectError(HelloWorldException.class)
.verify(Duration.ofMillis(50));
.verify(Duration.ofSeconds(1));

StepVerifier.create(Mono.fromCallable(helloWorldService::returnHelloWorld)
.subscribeOn(Schedulers.single())
.compose(retryOperator))
.expectSubscription()
.expectError(HelloWorldException.class)
.verify(Duration.ofMillis(50));
.verify(Duration.ofSeconds(1));

then(helloWorldService).should(times(6)).returnHelloWorld();
Retry.Metrics metrics = retry.getMetrics();
Expand All @@ -162,7 +159,7 @@ public void doNotRetryFromPredicateUsingMono() {
.compose(RetryOperator.of(retry)))
.expectSubscription()
.expectError(HelloWorldException.class)
.verify(Duration.ofMillis(50));
.verify(Duration.ofSeconds(1));

then(helloWorldService).should().returnHelloWorld();
Retry.Metrics metrics = retry.getMetrics();
Expand All @@ -182,11 +179,11 @@ public void retryOnResultUsingMono() {
.willReturn("success");

StepVerifier.create(Mono.fromCallable(helloWorldService::returnHelloWorld)
.subscribeOn(Schedulers.single())
.compose(RetryOperator.of(retry)))
.expectSubscription()
.expectNext("success")
.expectComplete().verify(Duration.ofMillis(50));
.expectComplete()
.verify(Duration.ofSeconds(1));

then(helloWorldService).should(times(2)).returnHelloWorld();
Retry.Metrics metrics = retry.getMetrics();
Expand All @@ -205,11 +202,11 @@ public void retryOnResultFailAfterMaxAttemptsUsingMono() {
.willReturn("retry");

StepVerifier.create(Mono.fromCallable(helloWorldService::returnHelloWorld)
.subscribeOn(Schedulers.single())
.compose(RetryOperator.of(retry)))
.expectSubscription()
.expectNextCount(1)
.expectComplete().verify(Duration.ofMillis(50));
.expectComplete()
.verify(Duration.ofSeconds(1));

then(helloWorldService).should(times(3)).returnHelloWorld();
}
Expand All @@ -221,11 +218,10 @@ public void shouldFailWithExceptionFlux() {
RetryOperator<Object> retryOperator = RetryOperator.of(retry);

StepVerifier.create(Flux.error(new HelloWorldException())
.subscribeOn(Schedulers.single())
.compose(retryOperator))
.expectSubscription()
.expectError(HelloWorldException.class)
.verify(Duration.ofMillis(50));
.verify(Duration.ofSeconds(1));

Retry.Metrics metrics = retry.getMetrics();
assertThat(metrics.getNumberOfSuccessfulCallsWithoutRetryAttempt()).isEqualTo(0);
Expand All @@ -243,7 +239,6 @@ public void retryOnResultUsingFlux() {
Retry retry = Retry.of("testName", config);

StepVerifier.create(Flux.just("retry", "success")
.subscribeOn(Schedulers.single())
.compose(RetryOperator.of(retry)))
.expectSubscription()
.expectNext("retry")
Expand All @@ -265,11 +260,11 @@ public void retryOnResultFailAfterMaxAttemptsUsingFlux() {
Retry retry = Retry.of("testName", config);

StepVerifier.create(Flux.just("retry")
.subscribeOn(Schedulers.single())
.compose(RetryOperator.of(retry)))
.expectSubscription()
.expectNextCount(1)
.expectComplete().verify(Duration.ofMillis(50));
.expectComplete()
.verify(Duration.ofSeconds(1));

Retry.Metrics metrics = retry.getMetrics();
assertThat(metrics.getNumberOfFailedCallsWithoutRetryAttempt()).isEqualTo(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,10 +537,10 @@ interface AsyncContext<T> {

/**
* Records a successful call or retryable call with the needed generated retry events. When
* there is a successful retry before reaching the max retries limit , it will generate
* {@link RetryOnSuccessEvent} When the retry reach the max retries limit , it will generate
* there is a successful retry before reaching the max retries limit, it will generate
* {@link RetryOnSuccessEvent}. When the retry reach the max retries limit, it will generate
* {@link RetryOnErrorEvent} with last exception or {@link MaxRetriesExceeded} if no other
* exception thrown
* exception is thrown.
*/
void onComplete();

Expand Down Expand Up @@ -580,10 +580,10 @@ interface Context<T> {

/**
* Records a successful call or retryable call with the needed generated retry events. When
* there is a successful retry before reaching the max retries limit , it will generate
* {@link RetryOnSuccessEvent} When the retry reach the max retries limit , it will generate
* there is a successful retry before reaching the max retries limit, it will generate a
* {@link RetryOnSuccessEvent}. When the retry reaches the max retries limit, it will generate a
* {@link RetryOnErrorEvent} with last exception or {@link MaxRetriesExceeded} if no other
* exception thrown
* exceptions is thrown.
*/
void onComplete();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.reactivex.*;
import org.reactivestreams.Publisher;

import java.util.concurrent.TimeUnit;

public class RetryTransformer<T> implements FlowableTransformer<T, T>, ObservableTransformer<T, T>,
SingleTransformer<T, T>, CompletableTransformer, MaybeTransformer<T, T> {

Expand All @@ -42,86 +44,107 @@ public static <T> RetryTransformer<T> of(Retry retry) {

@Override
public Publisher<T> apply(Flowable<T> upstream) {
Context<T> context = new Context<>(retry.context());
return upstream.doOnNext(context::throwExceptionToForceRetryOnResult)
.retryWhen(errors -> errors.doOnNext(context::onError))
Context<T> context = new Context<>(retry.asyncContext());
return upstream.doOnNext(context::handleResult)
.retryWhen(errors -> errors.flatMap(context::handleFlowableErrors))
.doOnComplete(context::onComplete);
}

@Override
public ObservableSource<T> apply(Observable<T> upstream) {
Context<T> context = new Context<>(retry.context());
return upstream.doOnNext(context::throwExceptionToForceRetryOnResult)
.retryWhen(errors -> errors.doOnNext(context::onError))
Context<T> context = new Context<>(retry.asyncContext());
return upstream.doOnNext(context::handleResult)
.retryWhen(errors -> errors.flatMap(context::handleObservableErrors))
.doOnComplete(context::onComplete);
}

@Override
public SingleSource<T> apply(Single<T> upstream) {
Context<T> context = new Context<>(retry.context());
return upstream.doOnSuccess(context::throwExceptionToForceRetryOnResult)
.retryWhen(errors -> errors.doOnNext(context::onError))
Context<T> context = new Context<>(retry.asyncContext());
return upstream.doOnSuccess(context::handleResult)
.retryWhen(errors -> errors.flatMap(context::handleFlowableErrors))
.doOnSuccess(t -> context.onComplete());
}

@Override
public CompletableSource apply(Completable upstream) {
Context<T> context = new Context<>(retry.context());
return upstream.retryWhen(errors -> errors.doOnNext(context::onError))
Context<T> context = new Context<>(retry.asyncContext());
return upstream.retryWhen(errors -> errors.flatMap(context::handleFlowableErrors))
.doOnComplete(context::onComplete);
}

@Override
public MaybeSource<T> apply(Maybe<T> upstream) {
Context<T> context = new Context<>(retry.context());
return upstream.doOnSuccess(context::throwExceptionToForceRetryOnResult)
.retryWhen(errors -> errors.doOnNext(context::onError))
Context<T> context = new Context<>(retry.asyncContext());
return upstream.doOnSuccess(context::handleResult)
.retryWhen(errors -> errors.flatMap(context::handleFlowableErrors))
.doOnSuccess(t -> context.onComplete())
.doOnComplete(context::onComplete);
}

private static class Context<T> {

private final Retry.Context<T> context;
private final Retry.AsyncContext<T> retryContext;

Context(Retry.Context<T> context) {
this.context = context;
Context(Retry.AsyncContext<T> retryContext) {
this.retryContext = retryContext;
}

void onComplete() {
this.context.onComplete();
this.retryContext.onComplete();
}

void throwExceptionToForceRetryOnResult(T value) {
if (context.onResult(value)) {
throw new RetryDueToResultException();
void handleResult(T result) {
long waitDurationMillis = retryContext.onResult(result);
if (waitDurationMillis != -1) {
throw new RetryDueToResultException(waitDurationMillis);
}
}

void onError(Throwable throwable) throws Exception {
Publisher<Long> handleFlowableErrors(Throwable throwable) {
if (throwable instanceof RetryDueToResultException) {
return;
long waitDurationMillis = ((RetryDueToResultException) throwable).waitDurationMillis;
return Flowable.timer(waitDurationMillis, TimeUnit.MILLISECONDS);
}
// Filter Error to not retry on it
if (throwable instanceof Error) {
throw (Error) throwable;
}
try {
context.onError(castToException(throwable));
} catch (Throwable t) {
throw castToException(t);

long waitDurationMillis = retryContext.onError(throwable);

if (waitDurationMillis == -1) {
return Flowable.error(throwable);
}

return Flowable.timer(waitDurationMillis, TimeUnit.MILLISECONDS);
}

private Exception castToException(Throwable throwable) {
return throwable instanceof Exception ? (Exception) throwable
: new Exception(throwable);
ObservableSource<Long> handleObservableErrors(Throwable throwable) {
if (throwable instanceof RetryDueToResultException) {
long waitDurationMillis = ((RetryDueToResultException) throwable).waitDurationMillis;
return Observable.timer(waitDurationMillis, TimeUnit.MILLISECONDS);
}
// Filter Error to not retry on it
if (throwable instanceof Error) {
throw (Error) throwable;
}

long waitDurationMillis = retryContext.onError(throwable);

if (waitDurationMillis == -1) {
return Observable.error(throwable);
}

return Observable.timer(waitDurationMillis, TimeUnit.MILLISECONDS);
}

private static class RetryDueToResultException extends RuntimeException {
private final long waitDurationMillis;

RetryDueToResultException() {
RetryDueToResultException(long waitDurationMillis) {
super("retry due to retryOnResult predicate");
this.waitDurationMillis = waitDurationMillis;
}
}
}
Expand Down
Loading

0 comments on commit 52fe20e

Please sign in to comment.