diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index bacdd90667..a76ddb621e 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -1432,7 +1432,38 @@ public final Single onErrorReturn(Func1 resumeFunctio * @see ReactiveX operators documentation: Catch */ public final Single onErrorResumeNext(Single resumeSingleInCaseOfError) { - return new Single(new SingleOperatorOnErrorResumeNextViaSingle(this, resumeSingleInCaseOfError)); + return new Single(SingleOperatorOnErrorResumeNext.withOther(this, resumeSingleInCaseOfError)); + } + + /** + * Instructs a Single to pass control to another Single rather than invoking + * {@link Observer#onError(Throwable)} if it encounters an error. + *

+ * + *

+ * By default, when a Single encounters an error that prevents it from emitting the expected item to + * its {@link Observer}, the Single invokes its Observer's {@code onError} method, and then quits + * without invoking any more of its Observer's methods. The {@code onErrorResumeNext} method changes this + * behavior. If you pass a function that will return another Single ({@code resumeFunctionInCaseOfError}) to an Single's + * {@code onErrorResumeNext} method, if the original Single encounters an error, instead of invoking its + * Observer's {@code onError} method, it will instead relinquish control to {@code resumeSingleInCaseOfError} which + * will invoke the Observer's {@link Observer#onNext onNext} method if it is able to do so. In such a case, + * because no Single necessarily invokes {@code onError}, the Observer may never know that an error + * happened. + *

+ * You can use this to prevent errors from propagating or to supply fallback data should errors be + * encountered. + *

+ *
Scheduler:
+ *
{@code onErrorResumeNext} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param resumeFunctionInCaseOfError a function that returns a Single that will take control if source Single encounters an error. + * @return the original Single, with appropriately modified behavior. + * @see ReactiveX operators documentation: Catch + */ + public final Single onErrorResumeNext(final Func1> resumeFunctionInCaseOfError) { + return new Single(SingleOperatorOnErrorResumeNext.withFunction(this, resumeFunctionInCaseOfError)); } /** diff --git a/src/main/java/rx/exceptions/Exceptions.java b/src/main/java/rx/exceptions/Exceptions.java index 081e4830a8..2b94504c08 100644 --- a/src/main/java/rx/exceptions/Exceptions.java +++ b/src/main/java/rx/exceptions/Exceptions.java @@ -18,6 +18,7 @@ import java.util.*; import rx.Observer; +import rx.SingleSubscriber; import rx.annotations.Experimental; /** @@ -188,6 +189,7 @@ public static void throwOrReport(Throwable t, Observer o, Object value) { Exceptions.throwIfFatal(t); o.onError(OnErrorThrowable.addValueAsLastCause(t, value)); } + /** * Forwards a fatal exception or reports it to the given Observer. * @param t the exception @@ -199,4 +201,17 @@ public static void throwOrReport(Throwable t, Observer o) { Exceptions.throwIfFatal(t); o.onError(t); } + + /** + * Forwards a fatal exception or reports it to the given Observer. + * + * @param throwable the exception. + * @param subscriber the subscriber to report to. + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number). + */ + @Experimental + public static void throwOrReport(Throwable throwable, SingleSubscriber subscriber) { + Exceptions.throwIfFatal(throwable); + subscriber.onError(throwable); + } } diff --git a/src/main/java/rx/internal/operators/SingleOperatorOnErrorResumeNext.java b/src/main/java/rx/internal/operators/SingleOperatorOnErrorResumeNext.java new file mode 100644 index 0000000000..0abe1bfb8d --- /dev/null +++ b/src/main/java/rx/internal/operators/SingleOperatorOnErrorResumeNext.java @@ -0,0 +1,68 @@ +package rx.internal.operators; + +import rx.Single; +import rx.SingleSubscriber; +import rx.exceptions.Exceptions; +import rx.functions.Func1; +import rx.plugins.RxJavaPlugins; + +public class SingleOperatorOnErrorResumeNext implements Single.OnSubscribe { + + private final Single originalSingle; + private final Func1> resumeFunctionInCaseOfError; + + private SingleOperatorOnErrorResumeNext(Single originalSingle, Func1> resumeFunctionInCaseOfError) { + if (originalSingle == null) { + throw new NullPointerException("originalSingle must not be null"); + } + + if (resumeFunctionInCaseOfError == null) { + throw new NullPointerException("resumeFunctionInCaseOfError must not be null"); + } + + this.originalSingle = originalSingle; + this.resumeFunctionInCaseOfError = resumeFunctionInCaseOfError; + } + + public static SingleOperatorOnErrorResumeNext withFunction(Single originalSingle, Func1> resumeFunctionInCaseOfError) { + return new SingleOperatorOnErrorResumeNext(originalSingle, resumeFunctionInCaseOfError); + } + + public static SingleOperatorOnErrorResumeNext withOther(Single originalSingle, final Single resumeSingleInCaseOfError) { + if (resumeSingleInCaseOfError == null) { + throw new NullPointerException("resumeSingleInCaseOfError must not be null"); + } + + return new SingleOperatorOnErrorResumeNext(originalSingle, new Func1>() { + @Override + public Single call(Throwable throwable) { + return resumeSingleInCaseOfError; + } + }); + } + + @Override + public void call(final SingleSubscriber child) { + final SingleSubscriber parent = new SingleSubscriber() { + @Override + public void onSuccess(T value) { + child.onSuccess(value); + } + + @Override + public void onError(Throwable error) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(error); + + try { + unsubscribe(); + resumeFunctionInCaseOfError.call(error).subscribe(child); + } catch (Throwable innerError) { + Exceptions.throwOrReport(innerError, child); + } + } + }; + + child.add(parent); + originalSingle.subscribe(parent); + } +} diff --git a/src/main/java/rx/internal/operators/SingleOperatorOnErrorResumeNextViaSingle.java b/src/main/java/rx/internal/operators/SingleOperatorOnErrorResumeNextViaSingle.java deleted file mode 100644 index ca47f9c3e9..0000000000 --- a/src/main/java/rx/internal/operators/SingleOperatorOnErrorResumeNextViaSingle.java +++ /dev/null @@ -1,45 +0,0 @@ -package rx.internal.operators; - -import rx.Single; -import rx.SingleSubscriber; -import rx.plugins.RxJavaPlugins; - -public class SingleOperatorOnErrorResumeNextViaSingle implements Single.OnSubscribe { - - private final Single originalSingle; - private final Single resumeSingleInCaseOfError; - - public SingleOperatorOnErrorResumeNextViaSingle(Single originalSingle, Single resumeSingleInCaseOfError) { - if (originalSingle == null) { - throw new NullPointerException("originalSingle must not be null"); - } - - if (resumeSingleInCaseOfError == null) { - throw new NullPointerException("resumeSingleInCaseOfError must not be null"); - } - - this.originalSingle = originalSingle; - this.resumeSingleInCaseOfError = resumeSingleInCaseOfError; - } - - @Override - public void call(final SingleSubscriber child) { - final SingleSubscriber parent = new SingleSubscriber() { - @Override - public void onSuccess(T value) { - child.onSuccess(value); - } - - @Override - public void onError(Throwable error) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(error); - unsubscribe(); - - resumeSingleInCaseOfError.subscribe(child); - } - }; - - child.add(parent); - originalSingle.subscribe(parent); - } -} diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java index d2457da4e9..9e36f36dc3 100644 --- a/src/test/java/rx/SingleTest.java +++ b/src/test/java/rx/SingleTest.java @@ -1220,13 +1220,59 @@ public void onErrorResumeNextViaSingleShouldPreventNullSingle() { try { Single .just("value") - .onErrorResumeNext(null); + .onErrorResumeNext((Single) null); fail(); } catch (NullPointerException expected) { assertEquals("resumeSingleInCaseOfError must not be null", expected.getMessage()); } } + @Test + public void onErrorResumeNextViaFunctionShouldNotInterruptSuccesfulSingle() { + TestSubscriber testSubscriber = new TestSubscriber(); + + Single + .just("success") + .onErrorResumeNext(new Func1>() { + @Override + public Single call(Throwable throwable) { + return Single.just("fail"); + } + }) + .subscribe(testSubscriber); + + testSubscriber.assertValue("success"); + } + + @Test + public void onErrorResumeNextViaFunctionShouldResumeWithPassedSingleInCaseOfError() { + TestSubscriber testSubscriber = new TestSubscriber(); + + Single + . error(new RuntimeException("test exception")) + .onErrorResumeNext(new Func1>() { + @Override + public Single call(Throwable throwable) { + return Single.just("fallback"); + } + }) + .subscribe(testSubscriber); + + testSubscriber.assertValue("fallback"); + } + + @Test + public void onErrorResumeNextViaFunctionShouldPreventNullFunction() { + try { + Single + .just("value") + .onErrorResumeNext((Func1>) null); + fail(); + } catch (NullPointerException expected) { + assertEquals("resumeFunctionInCaseOfError must not be null", expected.getMessage()); + } + } + @Test(expected = NullPointerException.class) public void iterableToArrayShouldThrowNullPointerExceptionIfIterableNull() { Single.iterableToArray(null);