Skip to content

Commit

Permalink
Fix SafeObserver handling of onComplete errors
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed Dec 24, 2013
1 parent a252dca commit f667642
Show file tree
Hide file tree
Showing 2 changed files with 247 additions and 32 deletions.
82 changes: 50 additions & 32 deletions rxjava-core/src/main/java/rx/operators/SafeObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Observer;
import rx.Subscription;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.Subscriptions;
import rx.util.CompositeException;
import rx.util.OnErrorNotImplementedException;

Expand Down Expand Up @@ -59,7 +61,12 @@ public class SafeObserver<T> implements Observer<T> {

private final Observer<? super T> actual;
private final AtomicBoolean isFinished = new AtomicBoolean(false);
private final SafeObservableSubscription subscription;
private final Subscription subscription;

public SafeObserver(Observer<? super T> actual) {
this.subscription = Subscriptions.empty();
this.actual = actual;
}

public SafeObserver(SafeObservableSubscription subscription, Observer<? super T> actual) {
this.subscription = subscription;
Expand All @@ -73,44 +80,18 @@ public void onCompleted() {
actual.onCompleted();
} catch (Throwable e) {
// handle errors if the onCompleted implementation fails, not just if the Observable fails
onError(e);
_onError(e);
} finally {
// auto-unsubscribe
subscription.unsubscribe();
}
// auto-unsubscribe
subscription.unsubscribe();
}
}

@Override
public void onError(Throwable e) {
if (isFinished.compareAndSet(false, true)) {
try {
actual.onError(e);
} catch (Throwable e2) {
if (e2 instanceof OnErrorNotImplementedException) {
/**
* onError isn't implemented so throw
*
* https://github.com/Netflix/RxJava/issues/198
*
* Rx Design Guidelines 5.2
*
* "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be
* to rethrow the exception on the thread that the message comes out from the observable sequence.
* The OnCompleted behavior in this case is to do nothing."
*/
throw (OnErrorNotImplementedException) e2;
} else {
// if the onError itself fails then pass to the plugin
// see https://github.com/Netflix/RxJava/issues/216 for further discussion
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
// and throw exception despite that not being proper for Rx
// https://github.com/Netflix/RxJava/issues/198
throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
}
}
// auto-unsubscribe
subscription.unsubscribe();
_onError(e);
}
}

Expand All @@ -126,4 +107,41 @@ public void onNext(T args) {
}
}

/*
* The logic for `onError` without the `isFinished` check so it can be called from within `onCompleted`.
*
* See https://github.com/Netflix/RxJava/issues/630 for the report of this bug.
*/
protected void _onError(Throwable e) {
try {
actual.onError(e);
} catch (Throwable e2) {
if (e2 instanceof OnErrorNotImplementedException) {
/**
* onError isn't implemented so throw
*
* https://github.com/Netflix/RxJava/issues/198
*
* Rx Design Guidelines 5.2
*
* "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be
* to rethrow the exception on the thread that the message comes out from the observable sequence.
* The OnCompleted behavior in this case is to do nothing."
*/
throw (OnErrorNotImplementedException) e2;
} else {
// if the onError itself fails then pass to the plugin
// see https://github.com/Netflix/RxJava/issues/216 for further discussion
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
// and throw exception despite that not being proper for Rx
// https://github.com/Netflix/RxJava/issues/198
throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
}
} finally {
// auto-unsubscribe
subscription.unsubscribe();
}
}

}
197 changes: 197 additions & 0 deletions rxjava-core/src/test/java/rx/operators/SafeObserverTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static org.junit.Assert.*;

import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;

import rx.Observer;

public class SafeObserverTest {

@Test
public void onNextFailure() {
AtomicReference<Throwable> onError = new AtomicReference<Throwable>();
try {
OBSERVER_ONNEXT_FAIL(onError).onNext("one");
fail("expects exception to be thrown");
} catch (Exception e) {
// expected
assertNull(onError.get());
}
}

@Test
public void onNextFailureSafe() {
AtomicReference<Throwable> onError = new AtomicReference<Throwable>();
try {
new SafeObserver<String>(OBSERVER_ONNEXT_FAIL(onError)).onNext("one");
assertNotNull(onError.get());
} catch (Exception e) {
fail("expects exception to be passed to onError");
}
}

@Test
public void onCompletedFailure() {
AtomicReference<Throwable> onError = new AtomicReference<Throwable>();
try {
OBSERVER_ONCOMPLETED_FAIL(onError).onCompleted();
fail("expects exception to be thrown");
} catch (Exception e) {
// expected
assertNull(onError.get());
}
}

@Test
public void onCompletedFailureSafe() {
AtomicReference<Throwable> onError = new AtomicReference<Throwable>();
try {
new SafeObserver<String>(OBSERVER_ONCOMPLETED_FAIL(onError)).onCompleted();
assertNotNull(onError.get());
} catch (Exception e) {
fail("expects exception to be passed to onError");
}
}

@Test
public void onErrorFailure() {
try {
OBSERVER_ONERROR_FAIL().onError(new RuntimeException("error!"));
fail("expects exception to be thrown");
} catch (Exception e) {
// expected
}
}

@Test
public void onErrorFailureSafe() {
try {
new SafeObserver<String>(OBSERVER_ONERROR_FAIL()).onError(new RuntimeException("error!"));
fail("expects exception to be thrown");
} catch (Exception e) {
// expected since onError fails so SafeObserver can't help
}
}

@Test
public void onNextOnErrorFailure() {
try {
OBSERVER_ONNEXT_ONERROR_FAIL().onError(new RuntimeException("error!"));
fail("expects exception to be thrown");
} catch (Exception e) {
// expected
}
}

@Test
public void onNextOnErrorFailureSafe() {
try {
new SafeObserver<String>(OBSERVER_ONNEXT_ONERROR_FAIL()).onError(new RuntimeException("error!"));
fail("expects exception to be thrown");
} catch (Exception e) {
// expected since onError fails so SafeObserver can't help
}
}

private static Observer<String> OBSERVER_ONNEXT_FAIL(final AtomicReference<Throwable> onError) {
return new Observer<String>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
onError.set(e);
}

@Override
public void onNext(String args) {
throw new RuntimeException("onNextFail");
}
};

}

private static Observer<String> OBSERVER_ONNEXT_ONERROR_FAIL() {
return new Observer<String>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
throw new RuntimeException("onErrortFail");
}

@Override
public void onNext(String args) {
throw new RuntimeException("onNextFail");
}

};
}

private static Observer<String> OBSERVER_ONERROR_FAIL() {
return new Observer<String>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
throw new RuntimeException("onErrorFail");
}

@Override
public void onNext(String args) {

}

};
}

private static Observer<String> OBSERVER_ONCOMPLETED_FAIL(final AtomicReference<Throwable> onError) {
return new Observer<String>() {

@Override
public void onCompleted() {
throw new RuntimeException("onCompletedFail");
}

@Override
public void onError(Throwable e) {
onError.set(e);
}

@Override
public void onNext(String args) {

}

};
}
}

0 comments on commit f667642

Please sign in to comment.