Skip to content

Commit

Permalink
Issue ReactiveX#359: Support for retryOnResult method and Completable…
Browse files Browse the repository at this point in the history
…. Added support for Maybe.

* Upgrade RxJava2 version to 2.2.7

Recently RxJava folks inlined the SubscriptionHelper.isCancelled,
so the method had to be inlined in our code as well.

* Retry transformer for RxJava2 Completable type

This type returns no values at all. It finishes either with onComplete
or onError, so in order to properly count the successful calls without
retry attempt we need to count onComplete as success.

* RxJava2 support for retryOnResult

* Stop using RxJava internal API in RetryTransformer + support for Maybe
  • Loading branch information
adik993 authored and RobWin committed Mar 18, 2019
1 parent c7e911e commit 9d4c57e
Show file tree
Hide file tree
Showing 6 changed files with 466 additions and 137 deletions.
2 changes: 1 addition & 1 deletion libraries.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// use. In that respect it serves a role similar to <dependencyManagement> in Maven
ext {
vavrVersion = '0.9.2'
rxJavaVersion = '2.1.10'
rxJavaVersion = '2.2.7'
reactorVersion = '3.0.7.RELEASE'
junitVersion = '4.12'
slf4jVersion = '1.7.24'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.github.resilience4j.bulkhead.operator;

import static io.reactivex.internal.subscriptions.SubscriptionHelper.CANCELLED;
import static java.util.Objects.requireNonNull;

import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -86,7 +87,7 @@ private boolean acquireCallPermit() {
}

private boolean isInvocationPermitted() {
return !SubscriptionHelper.isCancelled(get()) && wasCallPermitted();
return !(get() == CANCELLED) && wasCallPermitted();
}

private boolean wasCallPermitted() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package io.github.resilience4j.circuitbreaker.operator;

import static java.util.Objects.requireNonNull;

import java.util.concurrent.atomic.AtomicReference;

import io.github.resilience4j.adapter.Permit;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException;
Expand All @@ -12,6 +8,11 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.atomic.AtomicReference;

import static io.reactivex.internal.subscriptions.SubscriptionHelper.CANCELLED;
import static java.util.Objects.requireNonNull;

/**
* A RxJava {@link Subscriber} to protect another subscriber by a CircuitBreaker.
*
Expand Down Expand Up @@ -92,7 +93,7 @@ private boolean isInvocationPermitted() {
}

private boolean notCancelled() {
return !SubscriptionHelper.isCancelled(get());
return !(get() == CANCELLED);
}

private void markFailure(Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package io.github.resilience4j.ratelimiter.operator;

import static java.util.Objects.requireNonNull;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import io.github.resilience4j.adapter.Permit;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static io.reactivex.internal.subscriptions.SubscriptionHelper.CANCELLED;
import static java.util.Objects.requireNonNull;

/**
* A RxJava {@link Subscriber} to protect another subscriber by a {@link RateLimiter}.
* Consumes one permit when subscribed and one permit per emitted event except the first one.
Expand Down Expand Up @@ -94,7 +95,7 @@ private boolean isInvocationPermitted() {
}

private boolean notCancelled() {
return !SubscriptionHelper.isCancelled(get());
return !(get() == CANCELLED);
}

private boolean wasCallPermitted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,11 @@
package io.github.resilience4j.retry.transformer;

import io.github.resilience4j.retry.Retry;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.Single;
import io.reactivex.SingleSource;
import io.reactivex.SingleTransformer;
import io.reactivex.internal.subscriptions.SubscriptionArbiter;
import io.reactivex.*;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;

public class RetryTransformer<T> implements FlowableTransformer<T, T>, ObservableTransformer<T, T>, SingleTransformer<T, T> {

private static final Logger LOG = LoggerFactory.getLogger(RetryTransformer.class);

public class RetryTransformer<T> implements FlowableTransformer<T, T>, ObservableTransformer<T, T>,
SingleTransformer<T, T>, CompletableTransformer, MaybeTransformer<T, T> {
private final Retry retry;

private RetryTransformer(Retry retry) {
Expand All @@ -58,118 +41,82 @@ public static <T> RetryTransformer<T> of(Retry retry) {

@Override
public Publisher<T> apply(Flowable<T> upstream) {
return Flowable.fromPublisher(downstream -> {
SubscriptionArbiter sa = new SubscriptionArbiter();
downstream.onSubscribe(sa);
RetrySubscriber<T> repeatSubscriber = new RetrySubscriber<>(downstream, retry.getRetryConfig().getMaxAttempts(), sa, upstream, retry);
upstream.subscribe(repeatSubscriber);
});
//noinspection unchecked
Context<T> context = new Context<T>(retry.context());
return upstream.doOnNext(context::throwExceptionToForceRetryOnResult)
.retryWhen(errors -> errors.doOnNext(context::onError))
.doOnComplete(context::onComplete);
}

@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return Observable.fromPublisher(downstream -> {
Flowable<T> flowable = upstream.toFlowable(BackpressureStrategy.BUFFER);
SubscriptionArbiter sa = new SubscriptionArbiter();
downstream.onSubscribe(sa);
RetrySubscriber<T> retrySubscriber = new RetrySubscriber<>(downstream, retry.getRetryConfig().getMaxAttempts(), sa, flowable, retry);
flowable.subscribe(retrySubscriber);
});
//noinspection unchecked
Context<T> context = new Context<T>(retry.context());
return upstream.doOnNext(context::throwExceptionToForceRetryOnResult)
.retryWhen(errors -> errors.doOnNext(context::onError))
.doOnComplete(context::onComplete);
}

@Override
public SingleSource<T> apply(Single<T> upstream) {
return Single.fromPublisher(downstream -> {
Flowable<T> flowable = upstream.toFlowable();
SubscriptionArbiter sa = new SubscriptionArbiter();
downstream.onSubscribe(sa);
RetrySubscriber<T> retrySubscriber = new RetrySubscriber<>(downstream, retry.getRetryConfig().getMaxAttempts(), sa, flowable, retry);
flowable.subscribe(retrySubscriber);
});
//noinspection unchecked
Context<T> context = new Context<T>(retry.context());
return upstream.doOnSuccess(context::throwExceptionToForceRetryOnResult)
.retryWhen(errors -> errors.doOnNext(context::onError))
.doOnSuccess(t -> context.onComplete());
}

@Override
public CompletableSource apply(Completable upstream) {
//noinspection unchecked
Context<T> context = new Context<T>(retry.context());
return upstream.retryWhen(errors -> errors.doOnNext(context::onError))
.doOnComplete(context::onComplete);
}

@Override
public MaybeSource<T> apply(Maybe<T> upstream) {
//noinspection unchecked
Context<T> context = new Context<T>(retry.context());
return upstream.doOnSuccess(context::throwExceptionToForceRetryOnResult)
.retryWhen(errors -> errors.doOnNext(context::onError))
.doOnSuccess(t -> context.onComplete())
.doOnComplete(context::onComplete);
}

static final class RetrySubscriber<T> extends AtomicInteger implements Subscriber<T> {

private final Subscriber<? super T> actual;
private final SubscriptionArbiter sa;
private final Publisher<? extends T> source;
private final Retry.Context context;
private long remaining;
RetrySubscriber(Subscriber<? super T> actual, long count,
SubscriptionArbiter sa, Publisher<? extends T> source,
Retry retry) {
this.actual = actual;
this.sa = sa;
this.source = source;
this.context = retry.context();
this.remaining = count;
private static class Context<T> {
private final Retry.Context<T> context;

Context(Retry.Context<T> context) {
this.context = context;
}

@Override
public void onSubscribe(Subscription s) {
if (LOG.isDebugEnabled()) {
LOG.info("onSubscribe");
}
sa.setSubscription(s);
void onComplete() {
this.context.onSuccess();
}

@Override
public void onNext(T t) {
if (LOG.isDebugEnabled()) {
LOG.info("onNext");
}
context.onSuccess();
actual.onNext(t);
sa.produced(1L);
void throwExceptionToForceRetryOnResult(T value) {
if (context.onResult(value))
throw new RetryDueToResultException();
}
@Override
public void onError(Throwable t) {
if (LOG.isDebugEnabled()) {
LOG.info("onError");
}
long r = remaining;
if (r != Long.MAX_VALUE) {
remaining = r - 1;
}
if (r == 0) {
actual.onError(t);
} else {
try {
context.onError((Exception) t);
subscribeNext();
} catch (Throwable t2) {
actual.onError(t2);
}

void onError(Throwable throwable) throws Exception {
if (throwable instanceof RetryDueToResultException) return;
try {
context.onError(castToException(throwable));
} catch (Throwable throwable1) {
throw castToException(throwable);
}
}

@Override
public void onComplete() {
if (LOG.isDebugEnabled()) {
LOG.info("onComplete");
}
actual.onComplete();
private Exception castToException(Throwable throwable) {
return throwable instanceof Exception ? (Exception) throwable : new Exception(throwable);
}

/**
* Subscribes to the source again via trampolining.
*/
private void subscribeNext() {
if (getAndIncrement() == 0) {
int missed = 1;
for (;;) {
if (sa.isCancelled()) {
return;
}
source.subscribe(this);

missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
private static class RetryDueToResultException extends RuntimeException {
RetryDueToResultException() {
super("retry due to retryOnResult predicate");
}
}
}

}
Loading

0 comments on commit 9d4c57e

Please sign in to comment.