Skip to content

Commit

Permalink
Fix other places that may swallow OnErrorFailedException
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Oct 24, 2015
1 parent 92fe02d commit 518d9f0
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 19 deletions.
8 changes: 2 additions & 6 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -8176,10 +8176,8 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (OnErrorNotImplementedException e2) {
// special handling when onError is not implemented ... we just rethrow
throw e2;
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
Expand Down Expand Up @@ -8271,10 +8269,8 @@ private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Obse
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (OnErrorNotImplementedException e2) {
// special handling when onError is not implemented ... we just rethrow
throw e2;
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
Expand Down
18 changes: 5 additions & 13 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,18 +190,14 @@ public void call(Subscriber<? super R> o) {
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
Exceptions.throwIfFatal(e);
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
st.onError(e);
}
} catch (Throwable e) {
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
Expand Down Expand Up @@ -1507,10 +1503,8 @@ public final void unsafeSubscribe(Subscriber<? super T> subscriber) {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (OnErrorNotImplementedException e2) {
// special handling when onError is not implemented ... we just rethrow
throw e2;
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
Expand Down Expand Up @@ -1596,10 +1590,8 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (OnErrorNotImplementedException e2) {
// special handling when onError is not implemented ... we just rethrow
throw e2;
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
Expand Down
111 changes: 111 additions & 0 deletions src/test/java/rx/exceptions/ExceptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

import org.junit.Test;

import rx.Single;
import rx.SingleSubscriber;
import rx.Subscriber;
import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
Expand Down Expand Up @@ -226,4 +229,112 @@ public void onNext(Integer integer) {
}
});
}

@Test(expected = OnErrorFailedException.class)
public void testOnErrorExceptionIsThrownFromSubscribe() {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> s1) {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> s2) {
throw new IllegalArgumentException("original exception");
}
}).subscribe(s1);
}
}
).subscribe(new OnErrorFailedSubscriber());
}

@Test(expected = OnErrorFailedException.class)
public void testOnErrorExceptionIsThrownFromUnsafeSubscribe() {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> s1) {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> s2) {
throw new IllegalArgumentException("original exception");
}
}).unsafeSubscribe(s1);
}
}
).subscribe(new OnErrorFailedSubscriber());
}

@Test(expected = OnErrorFailedException.class)
public void testOnErrorExceptionIsThrownFromSingleDoOnSuccess() throws Exception {
Single.just(1)
.doOnSuccess(new Action1<Integer>() {
@Override
public void call(Integer integer) {
throw new RuntimeException();
}
})
.subscribe(new OnErrorFailedSubscriber());
}

@Test(expected = OnErrorFailedException.class)
public void testOnErrorExceptionIsThrownFromSingleSubscribe() {
Single.create(new Single.OnSubscribe<Integer>() {
@Override
public void call(SingleSubscriber<? super Integer> s1) {
Single.create(new Single.OnSubscribe<Integer>() {
@Override
public void call(SingleSubscriber<? super Integer> s2) {
throw new IllegalArgumentException("original exception");
}
}).subscribe(s1);
}
}
).subscribe(new OnErrorFailedSubscriber());
}

@Test(expected = OnErrorFailedException.class)
public void testOnErrorExceptionIsThrownFromSingleUnsafeSubscribe() {
Single.create(new Single.OnSubscribe<Integer>() {
@Override
public void call(SingleSubscriber<? super Integer> s1) {
Single.create(new Single.OnSubscribe<Integer>() {
@Override
public void call(SingleSubscriber<? super Integer> s2) {
throw new IllegalArgumentException("original exception");
}
}).unsafeSubscribe(new Subscriber<Integer>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
s1.onError(e);
}

@Override
public void onNext(Integer v) {
s1.onSuccess(v);
}

});
}
}
).subscribe(new OnErrorFailedSubscriber());
}

private class OnErrorFailedSubscriber extends Subscriber<Integer> {
@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
throw new RuntimeException();
}

@Override
public void onNext(Integer value) {
}
}
}

0 comments on commit 518d9f0

Please sign in to comment.