diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java
index f0b1f2bea0..81028f726e 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Completable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java
@@ -2317,6 +2317,26 @@ public final Completable retry(@NonNull Predicate super Throwable> predicate)
return fromPublisher(toFlowable().retry(predicate));
}
+ /**
+ * Retries until the given stop function returns {@code true}.
+ *
+ *
+ *
+ * - Scheduler:
+ * - {@code retryUntil} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param stop the function that should return {@code true} to stop retrying
+ * @return the new {@code Completable} instance
+ * @throws NullPointerException if {@code stop} is {@code null}
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final Completable retryUntil(@NonNull BooleanSupplier stop) {
+ Objects.requireNonNull(stop, "stop is null");
+ return retry(Long.MAX_VALUE, Functions.predicateReverseFor(stop));
+ }
+
/**
* Returns a {@code Completable} which given a {@link Publisher} and when this {@code Completable} emits an error, delivers
* that error through a {@link Flowable} and the {@code Publisher} should signal a value indicating a retry in response
diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
index 7bdfe05ee5..98065a40d5 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
@@ -4433,6 +4433,8 @@ public final Maybe retry(@NonNull Predicate super Throwable> predicate) {
/**
* Retries until the given stop function returns {@code true}.
+ *
+ *
*
* - Scheduler:
* - {@code retryUntil} does not operate by default on a particular {@link Scheduler}.
diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java
index f6e41cee5c..e17ff9e8ec 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Single.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Single.java
@@ -3747,6 +3747,26 @@ public final Single retry(@NonNull Predicate super Throwable> predicate) {
return toSingle(toFlowable().retry(predicate));
}
+ /**
+ * Retries until the given stop function returns {@code true}.
+ *
+ *
+ *
+ * - Scheduler:
+ * - {@code retryUntil} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param stop the function that should return {@code true} to stop retrying
+ * @return the new {@code Single} instance
+ * @throws NullPointerException if {@code stop} is {@code null}
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final Single retryUntil(@NonNull BooleanSupplier stop) {
+ Objects.requireNonNull(stop, "stop is null");
+ return retry(Long.MAX_VALUE, Functions.predicateReverseFor(stop));
+ }
+
/**
* Re-subscribes to the current {@code Single} if and when the {@link Publisher} returned by the handler
* function signals a value.
diff --git a/src/test/java/io/reactivex/rxjava3/completable/CompletableRetryTest.java b/src/test/java/io/reactivex/rxjava3/completable/CompletableRetryTest.java
index 909ce35669..c805f68c6f 100644
--- a/src/test/java/io/reactivex/rxjava3/completable/CompletableRetryTest.java
+++ b/src/test/java/io/reactivex/rxjava3/completable/CompletableRetryTest.java
@@ -20,6 +20,7 @@
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.functions.Functions;
@@ -113,4 +114,42 @@ public void retryTimesPredicateWithZeroRetries() {
assertEquals(1, numberOfSubscribeCalls.get());
}
+
+ @Test
+ public void untilTrueEmpty() {
+ Completable.complete()
+ .retryUntil(() -> true)
+ .test()
+ .assertResult();
+ }
+
+ @Test
+ public void untilFalseEmpty() {
+ Completable.complete()
+ .retryUntil(() -> false)
+ .test()
+ .assertResult();
+ }
+
+ @Test
+ public void untilTrueError() {
+ Completable.error(new TestException())
+ .retryUntil(() -> true)
+ .test()
+ .assertFailure(TestException.class);
+ }
+
+ @Test
+ public void untilFalseError() {
+ AtomicInteger counter = new AtomicInteger();
+ Completable.defer(() -> {
+ if (counter.getAndIncrement() == 0) {
+ return Completable.error(new TestException());
+ }
+ return Completable.complete();
+ })
+ .retryUntil(() -> false)
+ .test()
+ .assertResult();
+ }
}
diff --git a/src/test/java/io/reactivex/rxjava3/maybe/MaybeRetryTest.java b/src/test/java/io/reactivex/rxjava3/maybe/MaybeRetryTest.java
index f260a78a82..6c88f8501c 100644
--- a/src/test/java/io/reactivex/rxjava3/maybe/MaybeRetryTest.java
+++ b/src/test/java/io/reactivex/rxjava3/maybe/MaybeRetryTest.java
@@ -21,6 +21,7 @@
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.functions.Predicate;
import io.reactivex.rxjava3.internal.functions.Functions;
@@ -120,4 +121,58 @@ public void retryTimesPredicateWithZeroRetries() {
assertEquals(1, numberOfSubscribeCalls.get());
}
+
+ @Test
+ public void untilTrueJust() {
+ Maybe.just(1)
+ .retryUntil(() -> true)
+ .test()
+ .assertResult(1);
+ }
+
+ @Test
+ public void untilFalseJust() {
+ Maybe.just(1)
+ .retryUntil(() -> false)
+ .test()
+ .assertResult(1);
+ }
+
+ @Test
+ public void untilTrueEmpty() {
+ Maybe.empty()
+ .retryUntil(() -> true)
+ .test()
+ .assertResult();
+ }
+
+ @Test
+ public void untilFalseEmpty() {
+ Maybe.empty()
+ .retryUntil(() -> false)
+ .test()
+ .assertResult();
+ }
+
+ @Test
+ public void untilTrueError() {
+ Maybe.error(new TestException())
+ .retryUntil(() -> true)
+ .test()
+ .assertFailure(TestException.class);
+ }
+
+ @Test
+ public void untilFalseError() {
+ AtomicInteger counter = new AtomicInteger();
+ Maybe.defer(() -> {
+ if (counter.getAndIncrement() == 0) {
+ return Maybe.error(new TestException());
+ }
+ return Maybe.just(1);
+ })
+ .retryUntil(() -> false)
+ .test()
+ .assertResult(1);
+ }
}
diff --git a/src/test/java/io/reactivex/rxjava3/single/SingleRetryTest.java b/src/test/java/io/reactivex/rxjava3/single/SingleRetryTest.java
index 1d5a1bff33..2fe5fa9f49 100644
--- a/src/test/java/io/reactivex/rxjava3/single/SingleRetryTest.java
+++ b/src/test/java/io/reactivex/rxjava3/single/SingleRetryTest.java
@@ -21,6 +21,7 @@
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.functions.Predicate;
import io.reactivex.rxjava3.internal.functions.Functions;
@@ -120,4 +121,42 @@ public void retryTimesPredicateWithZeroRetries() {
assertEquals(1, numberOfSubscribeCalls.get());
}
+
+ @Test
+ public void untilTrueJust() {
+ Single.just(1)
+ .retryUntil(() -> true)
+ .test()
+ .assertResult(1);
+ }
+
+ @Test
+ public void untilFalseJust() {
+ Single.just(1)
+ .retryUntil(() -> false)
+ .test()
+ .assertResult(1);
+ }
+
+ @Test
+ public void untilTrueError() {
+ Single.error(new TestException())
+ .retryUntil(() -> true)
+ .test()
+ .assertFailure(TestException.class);
+ }
+
+ @Test
+ public void untilFalseError() {
+ AtomicInteger counter = new AtomicInteger();
+ Single.defer(() -> {
+ if (counter.getAndIncrement() == 0) {
+ return Single.error(new TestException());
+ }
+ return Single.just(1);
+ })
+ .retryUntil(() -> false)
+ .test()
+ .assertResult(1);
+ }
}