diff --git a/rxjava-core/src/main/java/rx/operators/SafeObserver.java b/rxjava-core/src/main/java/rx/operators/SafeObserver.java index 3e6508dac9..4768fb8653 100644 --- a/rxjava-core/src/main/java/rx/operators/SafeObserver.java +++ b/rxjava-core/src/main/java/rx/operators/SafeObserver.java @@ -19,7 +19,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import rx.Observer; +import rx.Subscription; import rx.plugins.RxJavaPlugins; +import rx.subscriptions.Subscriptions; import rx.util.CompositeException; import rx.util.OnErrorNotImplementedException; @@ -59,7 +61,12 @@ public class SafeObserver implements Observer { private final Observer actual; private final AtomicBoolean isFinished = new AtomicBoolean(false); - private final SafeObservableSubscription subscription; + private final Subscription subscription; + + public SafeObserver(Observer actual) { + this.subscription = Subscriptions.empty(); + this.actual = actual; + } public SafeObserver(SafeObservableSubscription subscription, Observer actual) { this.subscription = subscription; @@ -73,44 +80,18 @@ public void onCompleted() { actual.onCompleted(); } catch (Throwable e) { // handle errors if the onCompleted implementation fails, not just if the Observable fails - onError(e); + _onError(e); + } finally { + // auto-unsubscribe + subscription.unsubscribe(); } - // auto-unsubscribe - subscription.unsubscribe(); } } @Override public void onError(Throwable e) { if (isFinished.compareAndSet(false, true)) { - try { - actual.onError(e); - } catch (Throwable e2) { - if (e2 instanceof OnErrorNotImplementedException) { - /** - * onError isn't implemented so throw - * - * https://github.com/Netflix/RxJava/issues/198 - * - * Rx Design Guidelines 5.2 - * - * "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be - * to rethrow the exception on the thread that the message comes out from the observable sequence. - * The OnCompleted behavior in this case is to do nothing." - */ - throw (OnErrorNotImplementedException) e2; - } else { - // if the onError itself fails then pass to the plugin - // see https://github.com/Netflix/RxJava/issues/216 for further discussion - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); - RxJavaPlugins.getInstance().getErrorHandler().handleError(e2); - // and throw exception despite that not being proper for Rx - // https://github.com/Netflix/RxJava/issues/198 - throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2))); - } - } - // auto-unsubscribe - subscription.unsubscribe(); + _onError(e); } } @@ -126,4 +107,41 @@ public void onNext(T args) { } } + /* + * The logic for `onError` without the `isFinished` check so it can be called from within `onCompleted`. + * + * See https://github.com/Netflix/RxJava/issues/630 for the report of this bug. + */ + protected void _onError(Throwable e) { + try { + actual.onError(e); + } catch (Throwable e2) { + if (e2 instanceof OnErrorNotImplementedException) { + /** + * onError isn't implemented so throw + * + * https://github.com/Netflix/RxJava/issues/198 + * + * Rx Design Guidelines 5.2 + * + * "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be + * to rethrow the exception on the thread that the message comes out from the observable sequence. + * The OnCompleted behavior in this case is to do nothing." + */ + throw (OnErrorNotImplementedException) e2; + } else { + // if the onError itself fails then pass to the plugin + // see https://github.com/Netflix/RxJava/issues/216 for further discussion + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaPlugins.getInstance().getErrorHandler().handleError(e2); + // and throw exception despite that not being proper for Rx + // https://github.com/Netflix/RxJava/issues/198 + throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2))); + } + } finally { + // auto-unsubscribe + subscription.unsubscribe(); + } + } + } diff --git a/rxjava-core/src/test/java/rx/operators/SafeObserverTest.java b/rxjava-core/src/test/java/rx/operators/SafeObserverTest.java new file mode 100644 index 0000000000..d59a6db6b3 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/SafeObserverTest.java @@ -0,0 +1,197 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; + +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import rx.Observer; + +public class SafeObserverTest { + + @Test + public void onNextFailure() { + AtomicReference onError = new AtomicReference(); + try { + OBSERVER_ONNEXT_FAIL(onError).onNext("one"); + fail("expects exception to be thrown"); + } catch (Exception e) { + // expected + assertNull(onError.get()); + } + } + + @Test + public void onNextFailureSafe() { + AtomicReference onError = new AtomicReference(); + try { + new SafeObserver(OBSERVER_ONNEXT_FAIL(onError)).onNext("one"); + assertNotNull(onError.get()); + } catch (Exception e) { + fail("expects exception to be passed to onError"); + } + } + + @Test + public void onCompletedFailure() { + AtomicReference onError = new AtomicReference(); + try { + OBSERVER_ONCOMPLETED_FAIL(onError).onCompleted(); + fail("expects exception to be thrown"); + } catch (Exception e) { + // expected + assertNull(onError.get()); + } + } + + @Test + public void onCompletedFailureSafe() { + AtomicReference onError = new AtomicReference(); + try { + new SafeObserver(OBSERVER_ONCOMPLETED_FAIL(onError)).onCompleted(); + assertNotNull(onError.get()); + } catch (Exception e) { + fail("expects exception to be passed to onError"); + } + } + + @Test + public void onErrorFailure() { + try { + OBSERVER_ONERROR_FAIL().onError(new RuntimeException("error!")); + fail("expects exception to be thrown"); + } catch (Exception e) { + // expected + } + } + + @Test + public void onErrorFailureSafe() { + try { + new SafeObserver(OBSERVER_ONERROR_FAIL()).onError(new RuntimeException("error!")); + fail("expects exception to be thrown"); + } catch (Exception e) { + // expected since onError fails so SafeObserver can't help + } + } + + @Test + public void onNextOnErrorFailure() { + try { + OBSERVER_ONNEXT_ONERROR_FAIL().onError(new RuntimeException("error!")); + fail("expects exception to be thrown"); + } catch (Exception e) { + // expected + } + } + + @Test + public void onNextOnErrorFailureSafe() { + try { + new SafeObserver(OBSERVER_ONNEXT_ONERROR_FAIL()).onError(new RuntimeException("error!")); + fail("expects exception to be thrown"); + } catch (Exception e) { + // expected since onError fails so SafeObserver can't help + } + } + + private static Observer OBSERVER_ONNEXT_FAIL(final AtomicReference onError) { + return new Observer() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + onError.set(e); + } + + @Override + public void onNext(String args) { + throw new RuntimeException("onNextFail"); + } + }; + + } + + private static Observer OBSERVER_ONNEXT_ONERROR_FAIL() { + return new Observer() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + throw new RuntimeException("onErrortFail"); + } + + @Override + public void onNext(String args) { + throw new RuntimeException("onNextFail"); + } + + }; + } + + private static Observer OBSERVER_ONERROR_FAIL() { + return new Observer() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + throw new RuntimeException("onErrorFail"); + } + + @Override + public void onNext(String args) { + + } + + }; + } + + private static Observer OBSERVER_ONCOMPLETED_FAIL(final AtomicReference onError) { + return new Observer() { + + @Override + public void onCompleted() { + throw new RuntimeException("onCompletedFail"); + } + + @Override + public void onError(Throwable e) { + onError.set(e); + } + + @Override + public void onNext(String args) { + + } + + }; + } +}