diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java index f0b1f2bea0..81028f726e 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Completable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java @@ -2317,6 +2317,26 @@ public final Completable retry(@NonNull Predicate predicate) return fromPublisher(toFlowable().retry(predicate)); } + /** + * Retries until the given stop function returns {@code true}. + *

+ * + *

+ *
Scheduler:
+ *
{@code retryUntil} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param stop the function that should return {@code true} to stop retrying + * @return the new {@code Completable} instance + * @throws NullPointerException if {@code stop} is {@code null} + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public final Completable retryUntil(@NonNull BooleanSupplier stop) { + Objects.requireNonNull(stop, "stop is null"); + return retry(Long.MAX_VALUE, Functions.predicateReverseFor(stop)); + } + /** * Returns a {@code Completable} which given a {@link Publisher} and when this {@code Completable} emits an error, delivers * that error through a {@link Flowable} and the {@code Publisher} should signal a value indicating a retry in response diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index 7bdfe05ee5..98065a40d5 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -4433,6 +4433,8 @@ public final Maybe retry(@NonNull Predicate predicate) { /** * Retries until the given stop function returns {@code true}. + *

+ * *

*
Scheduler:
*
{@code retryUntil} does not operate by default on a particular {@link Scheduler}.
diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java index f6e41cee5c..e17ff9e8ec 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Single.java +++ b/src/main/java/io/reactivex/rxjava3/core/Single.java @@ -3747,6 +3747,26 @@ public final Single retry(@NonNull Predicate predicate) { return toSingle(toFlowable().retry(predicate)); } + /** + * Retries until the given stop function returns {@code true}. + *

+ * + *

+ *
Scheduler:
+ *
{@code retryUntil} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param stop the function that should return {@code true} to stop retrying + * @return the new {@code Single} instance + * @throws NullPointerException if {@code stop} is {@code null} + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public final Single retryUntil(@NonNull BooleanSupplier stop) { + Objects.requireNonNull(stop, "stop is null"); + return retry(Long.MAX_VALUE, Functions.predicateReverseFor(stop)); + } + /** * Re-subscribes to the current {@code Single} if and when the {@link Publisher} returned by the handler * function signals a value. diff --git a/src/test/java/io/reactivex/rxjava3/completable/CompletableRetryTest.java b/src/test/java/io/reactivex/rxjava3/completable/CompletableRetryTest.java index 909ce35669..c805f68c6f 100644 --- a/src/test/java/io/reactivex/rxjava3/completable/CompletableRetryTest.java +++ b/src/test/java/io/reactivex/rxjava3/completable/CompletableRetryTest.java @@ -20,6 +20,7 @@ import org.junit.Test; import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.functions.Functions; @@ -113,4 +114,42 @@ public void retryTimesPredicateWithZeroRetries() { assertEquals(1, numberOfSubscribeCalls.get()); } + + @Test + public void untilTrueEmpty() { + Completable.complete() + .retryUntil(() -> true) + .test() + .assertResult(); + } + + @Test + public void untilFalseEmpty() { + Completable.complete() + .retryUntil(() -> false) + .test() + .assertResult(); + } + + @Test + public void untilTrueError() { + Completable.error(new TestException()) + .retryUntil(() -> true) + .test() + .assertFailure(TestException.class); + } + + @Test + public void untilFalseError() { + AtomicInteger counter = new AtomicInteger(); + Completable.defer(() -> { + if (counter.getAndIncrement() == 0) { + return Completable.error(new TestException()); + } + return Completable.complete(); + }) + .retryUntil(() -> false) + .test() + .assertResult(); + } } diff --git a/src/test/java/io/reactivex/rxjava3/maybe/MaybeRetryTest.java b/src/test/java/io/reactivex/rxjava3/maybe/MaybeRetryTest.java index f260a78a82..6c88f8501c 100644 --- a/src/test/java/io/reactivex/rxjava3/maybe/MaybeRetryTest.java +++ b/src/test/java/io/reactivex/rxjava3/maybe/MaybeRetryTest.java @@ -21,6 +21,7 @@ import org.junit.Test; import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; import io.reactivex.rxjava3.functions.Predicate; import io.reactivex.rxjava3.internal.functions.Functions; @@ -120,4 +121,58 @@ public void retryTimesPredicateWithZeroRetries() { assertEquals(1, numberOfSubscribeCalls.get()); } + + @Test + public void untilTrueJust() { + Maybe.just(1) + .retryUntil(() -> true) + .test() + .assertResult(1); + } + + @Test + public void untilFalseJust() { + Maybe.just(1) + .retryUntil(() -> false) + .test() + .assertResult(1); + } + + @Test + public void untilTrueEmpty() { + Maybe.empty() + .retryUntil(() -> true) + .test() + .assertResult(); + } + + @Test + public void untilFalseEmpty() { + Maybe.empty() + .retryUntil(() -> false) + .test() + .assertResult(); + } + + @Test + public void untilTrueError() { + Maybe.error(new TestException()) + .retryUntil(() -> true) + .test() + .assertFailure(TestException.class); + } + + @Test + public void untilFalseError() { + AtomicInteger counter = new AtomicInteger(); + Maybe.defer(() -> { + if (counter.getAndIncrement() == 0) { + return Maybe.error(new TestException()); + } + return Maybe.just(1); + }) + .retryUntil(() -> false) + .test() + .assertResult(1); + } } diff --git a/src/test/java/io/reactivex/rxjava3/single/SingleRetryTest.java b/src/test/java/io/reactivex/rxjava3/single/SingleRetryTest.java index 1d5a1bff33..2fe5fa9f49 100644 --- a/src/test/java/io/reactivex/rxjava3/single/SingleRetryTest.java +++ b/src/test/java/io/reactivex/rxjava3/single/SingleRetryTest.java @@ -21,6 +21,7 @@ import org.junit.Test; import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; import io.reactivex.rxjava3.functions.Predicate; import io.reactivex.rxjava3.internal.functions.Functions; @@ -120,4 +121,42 @@ public void retryTimesPredicateWithZeroRetries() { assertEquals(1, numberOfSubscribeCalls.get()); } + + @Test + public void untilTrueJust() { + Single.just(1) + .retryUntil(() -> true) + .test() + .assertResult(1); + } + + @Test + public void untilFalseJust() { + Single.just(1) + .retryUntil(() -> false) + .test() + .assertResult(1); + } + + @Test + public void untilTrueError() { + Single.error(new TestException()) + .retryUntil(() -> true) + .test() + .assertFailure(TestException.class); + } + + @Test + public void untilFalseError() { + AtomicInteger counter = new AtomicInteger(); + Single.defer(() -> { + if (counter.getAndIncrement() == 0) { + return Single.error(new TestException()); + } + return Single.just(1); + }) + .retryUntil(() -> false) + .test() + .assertResult(1); + } }