Skip to content

Commit

Permalink
Merge pull request #3455 from konmik/on-error-failed-exception-fix
Browse files Browse the repository at this point in the history
OnErrorFailedException fix
  • Loading branch information
akarnokd committed Oct 20, 2015
2 parents cc28527 + 974b651 commit 92fe02d
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 16 deletions.
8 changes: 2 additions & 6 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,11 @@ public void call(Subscriber<? super R> o) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
Expand Down
7 changes: 1 addition & 6 deletions src/main/java/rx/exceptions/Exceptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,7 @@ public static void throwIfFatal(Throwable t) {
if (t instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) t;
} else if (t instanceof OnErrorFailedException) {
Throwable cause = t.getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw (OnErrorFailedException) t;
}
throw (OnErrorFailedException) t;
}
// values here derived from https://github.com/ReactiveX/RxJava/issues/748#issuecomment-32471495
else if (t instanceof StackOverflowError) {
Expand Down
72 changes: 68 additions & 4 deletions src/test/java/rx/exceptions/ExceptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subjects.PublishSubject;

public class ExceptionsTest {
Expand All @@ -45,7 +47,7 @@ public void call(Integer t1) {
public void testStackOverflowWouldOccur() {
final PublishSubject<Integer> a = PublishSubject.create();
final PublishSubject<Integer> b = PublishSubject.create();
final int MAX_STACK_DEPTH = 1000;
final int MAX_STACK_DEPTH = 800;
final AtomicInteger depth = new AtomicInteger();

a.subscribe(new Observer<Integer>() {
Expand Down Expand Up @@ -156,10 +158,72 @@ public void onNext(Object o) {
}
});
fail("expecting an exception to be thrown");
} catch (CompositeException t) {
assertTrue(t.getExceptions().get(0) instanceof IllegalArgumentException);
assertTrue(t.getExceptions().get(1) instanceof IllegalStateException);
} catch (OnErrorFailedException t) {
CompositeException cause = (CompositeException) t.getCause();
assertTrue(cause.getExceptions().get(0) instanceof IllegalArgumentException);
assertTrue(cause.getExceptions().get(1) instanceof IllegalStateException);
}
}

/**
* https://github.com/ReactiveX/RxJava/issues/2998
*/
@Test(expected = OnErrorFailedException.class)
public void testOnErrorExceptionIsThrownFromGroupBy() throws Exception {
Observable
.just(1)
.groupBy(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
throw new RuntimeException();
}
})
.subscribe(new Observer<GroupedObservable<Integer, Integer>>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
throw new RuntimeException();
}

@Override
public void onNext(GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) {

}
});
}

/**
* https://github.com/ReactiveX/RxJava/issues/2998
*/
@Test(expected = OnErrorFailedException.class)
public void testOnErrorExceptionIsThrownFromOnNext() throws Exception {
Observable
.just(1)
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
throw new RuntimeException();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
throw new RuntimeException();
}

@Override
public void onNext(Integer integer) {

}
});
}
}

0 comments on commit 92fe02d

Please sign in to comment.