Skip to content

Commit

Permalink
Merge pull request #1817 from benjchristensen/lift-error-handling
Browse files Browse the repository at this point in the history
Fix Synchronous OnSubscribe Exception Skips Operators
  • Loading branch information
benjchristensen committed Nov 2, 2014
2 parents d8ebda5 + de7b1f5 commit 3722fe0
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 6 deletions.
21 changes: 15 additions & 6 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,25 @@ public final <R> Observable<R> lift(final Operator<? extends R, ? super T> lift)
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(lift).call(o);
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
st.onError(e);
}
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.mockito.Mockito;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
Expand Down Expand Up @@ -105,6 +106,55 @@ public String call(String s) {
verify(observer, times(1)).onNext("twoResume");
verify(observer, times(1)).onNext("threeResume");
}

@Test
public void testResumeNextWithFailedOnSubscribe() {
Subscription s = mock(Subscription.class);
Observable<String> testObservable = Observable.create(new OnSubscribe<String>() {

@Override
public void call(Subscriber<? super String> t1) {
throw new RuntimeException("force failure");
}

});
Observable<String> resume = Observable.just("resume");
Observable<String> observable = testObservable.onErrorResumeNext(resume);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
observable.subscribe(observer);

verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onCompleted();
verify(observer, times(1)).onNext("resume");
}

@Test
public void testResumeNextWithFailedOnSubscribeAsync() {
Subscription s = mock(Subscription.class);
Observable<String> testObservable = Observable.create(new OnSubscribe<String>() {

@Override
public void call(Subscriber<? super String> t1) {
throw new RuntimeException("force failure");
}

});
Observable<String> resume = Observable.just("resume");
Observable<String> observable = testObservable.subscribeOn(Schedulers.io()).onErrorResumeNext(resume);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
TestSubscriber<String> ts = new TestSubscriber<String>(observer);
observable.subscribe(ts);

ts.awaitTerminalEvent();

verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onCompleted();
verify(observer, times(1)).onNext("resume");
}

private static class TestObservable implements Observable.OnSubscribe<String> {

Expand Down

0 comments on commit 3722fe0

Please sign in to comment.