Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Add Single/Completable retryUntil + marbles #6869

Merged
merged 1 commit into from
Jan 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2317,6 +2317,26 @@ public final Completable retry(@NonNull Predicate<? super Throwable> predicate)
return fromPublisher(toFlowable().retry(predicate));
}

/**
* Retries until the given stop function returns {@code true}.
* <p>
* <img width="640" height="354" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.retryUntil.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retryUntil} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param stop the function that should return {@code true} to stop retrying
* @return the new {@code Completable} instance
* @throws NullPointerException if {@code stop} is {@code null}
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable retryUntil(@NonNull BooleanSupplier stop) {
Objects.requireNonNull(stop, "stop is null");
return retry(Long.MAX_VALUE, Functions.predicateReverseFor(stop));
}

/**
* Returns a {@code Completable} which given a {@link Publisher} and when this {@code Completable} emits an error, delivers
* that error through a {@link Flowable} and the {@code Publisher} should signal a value indicating a retry in response
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -4433,6 +4433,8 @@ public final Maybe<T> retry(@NonNull Predicate<? super Throwable> predicate) {

/**
* Retries until the given stop function returns {@code true}.
* <p>
* <img width="640" height="285" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.retryUntil.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retryUntil} does not operate by default on a particular {@link Scheduler}.</dd>
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -3747,6 +3747,26 @@ public final Single<T> retry(@NonNull Predicate<? super Throwable> predicate) {
return toSingle(toFlowable().retry(predicate));
}

/**
* Retries until the given stop function returns {@code true}.
* <p>
* <img width="640" height="364" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.retryUntil.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retryUntil} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param stop the function that should return {@code true} to stop retrying
* @return the new {@code Single} instance
* @throws NullPointerException if {@code stop} is {@code null}
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> retryUntil(@NonNull BooleanSupplier stop) {
Objects.requireNonNull(stop, "stop is null");
return retry(Long.MAX_VALUE, Functions.predicateReverseFor(stop));
}

/**
* Re-subscribes to the current {@code Single} if and when the {@link Publisher} returned by the handler
* function signals a value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.junit.Test;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.functions.Functions;

Expand Down Expand Up @@ -113,4 +114,42 @@ public void retryTimesPredicateWithZeroRetries() {

assertEquals(1, numberOfSubscribeCalls.get());
}

@Test
public void untilTrueEmpty() {
Completable.complete()
.retryUntil(() -> true)
.test()
.assertResult();
}

@Test
public void untilFalseEmpty() {
Completable.complete()
.retryUntil(() -> false)
.test()
.assertResult();
}

@Test
public void untilTrueError() {
Completable.error(new TestException())
.retryUntil(() -> true)
.test()
.assertFailure(TestException.class);
}

@Test
public void untilFalseError() {
AtomicInteger counter = new AtomicInteger();
Completable.defer(() -> {
if (counter.getAndIncrement() == 0) {
return Completable.error(new TestException());
}
return Completable.complete();
})
.retryUntil(() -> false)
.test()
.assertResult();
}
}
55 changes: 55 additions & 0 deletions src/test/java/io/reactivex/rxjava3/maybe/MaybeRetryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.junit.Test;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.functions.Predicate;
import io.reactivex.rxjava3.internal.functions.Functions;

Expand Down Expand Up @@ -120,4 +121,58 @@ public void retryTimesPredicateWithZeroRetries() {

assertEquals(1, numberOfSubscribeCalls.get());
}

@Test
public void untilTrueJust() {
Maybe.just(1)
.retryUntil(() -> true)
.test()
.assertResult(1);
}

@Test
public void untilFalseJust() {
Maybe.just(1)
.retryUntil(() -> false)
.test()
.assertResult(1);
}

@Test
public void untilTrueEmpty() {
Maybe.empty()
.retryUntil(() -> true)
.test()
.assertResult();
}

@Test
public void untilFalseEmpty() {
Maybe.empty()
.retryUntil(() -> false)
.test()
.assertResult();
}

@Test
public void untilTrueError() {
Maybe.error(new TestException())
.retryUntil(() -> true)
.test()
.assertFailure(TestException.class);
}

@Test
public void untilFalseError() {
AtomicInteger counter = new AtomicInteger();
Maybe.defer(() -> {
if (counter.getAndIncrement() == 0) {
return Maybe.error(new TestException());
}
return Maybe.just(1);
})
.retryUntil(() -> false)
.test()
.assertResult(1);
}
}
39 changes: 39 additions & 0 deletions src/test/java/io/reactivex/rxjava3/single/SingleRetryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.junit.Test;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.functions.Predicate;
import io.reactivex.rxjava3.internal.functions.Functions;

Expand Down Expand Up @@ -120,4 +121,42 @@ public void retryTimesPredicateWithZeroRetries() {

assertEquals(1, numberOfSubscribeCalls.get());
}

@Test
public void untilTrueJust() {
Single.just(1)
.retryUntil(() -> true)
.test()
.assertResult(1);
}

@Test
public void untilFalseJust() {
Single.just(1)
.retryUntil(() -> false)
.test()
.assertResult(1);
}

@Test
public void untilTrueError() {
Single.error(new TestException())
.retryUntil(() -> true)
.test()
.assertFailure(TestException.class);
}

@Test
public void untilFalseError() {
AtomicInteger counter = new AtomicInteger();
Single.defer(() -> {
if (counter.getAndIncrement() == 0) {
return Single.error(new TestException());
}
return Single.just(1);
})
.retryUntil(() -> false)
.test()
.assertResult(1);
}
}