From 974b651dcae927d26b53b150316c602721bfe233 Mon Sep 17 00:00:00 2001 From: konmik Date: Sat, 17 Oct 2015 00:40:37 +0300 Subject: [PATCH] OnErrorFailedException fix --- src/main/java/rx/Observable.java | 8 +-- src/main/java/rx/exceptions/Exceptions.java | 7 +- .../java/rx/exceptions/ExceptionsTest.java | 72 +++++++++++++++++-- 3 files changed, 71 insertions(+), 16 deletions(-) diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 0a2ebab8ce..0acb0e9b2e 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -164,15 +164,11 @@ public void call(Subscriber o) { // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling - if (e instanceof OnErrorNotImplementedException) { - throw (OnErrorNotImplementedException) e; - } + Exceptions.throwIfFatal(e); st.onError(e); } } catch (Throwable e) { - if (e instanceof OnErrorNotImplementedException) { - throw (OnErrorNotImplementedException) e; - } + Exceptions.throwIfFatal(e); // if the lift function failed all we can do is pass the error to the final Subscriber // as we don't have the operator available to us o.onError(e); diff --git a/src/main/java/rx/exceptions/Exceptions.java b/src/main/java/rx/exceptions/Exceptions.java index a0439028eb..4701e2bb5f 100644 --- a/src/main/java/rx/exceptions/Exceptions.java +++ b/src/main/java/rx/exceptions/Exceptions.java @@ -76,12 +76,7 @@ public static void throwIfFatal(Throwable t) { if (t instanceof OnErrorNotImplementedException) { throw (OnErrorNotImplementedException) t; } else if (t instanceof OnErrorFailedException) { - Throwable cause = t.getCause(); - if (cause instanceof RuntimeException) { - throw (RuntimeException) cause; - } else { - throw (OnErrorFailedException) t; - } + throw (OnErrorFailedException) t; } // values here derived from https://github.com/ReactiveX/RxJava/issues/748#issuecomment-32471495 else if (t instanceof StackOverflowError) { diff --git a/src/test/java/rx/exceptions/ExceptionsTest.java b/src/test/java/rx/exceptions/ExceptionsTest.java index 4148f1b9e6..96396ccb75 100644 --- a/src/test/java/rx/exceptions/ExceptionsTest.java +++ b/src/test/java/rx/exceptions/ExceptionsTest.java @@ -25,6 +25,8 @@ import rx.Observable; import rx.Observer; import rx.functions.Action1; +import rx.functions.Func1; +import rx.observables.GroupedObservable; import rx.subjects.PublishSubject; public class ExceptionsTest { @@ -45,7 +47,7 @@ public void call(Integer t1) { public void testStackOverflowWouldOccur() { final PublishSubject a = PublishSubject.create(); final PublishSubject b = PublishSubject.create(); - final int MAX_STACK_DEPTH = 1000; + final int MAX_STACK_DEPTH = 800; final AtomicInteger depth = new AtomicInteger(); a.subscribe(new Observer() { @@ -156,10 +158,72 @@ public void onNext(Object o) { } }); fail("expecting an exception to be thrown"); - } catch (CompositeException t) { - assertTrue(t.getExceptions().get(0) instanceof IllegalArgumentException); - assertTrue(t.getExceptions().get(1) instanceof IllegalStateException); + } catch (OnErrorFailedException t) { + CompositeException cause = (CompositeException) t.getCause(); + assertTrue(cause.getExceptions().get(0) instanceof IllegalArgumentException); + assertTrue(cause.getExceptions().get(1) instanceof IllegalStateException); } } + /** + * https://github.com/ReactiveX/RxJava/issues/2998 + */ + @Test(expected = OnErrorFailedException.class) + public void testOnErrorExceptionIsThrownFromGroupBy() throws Exception { + Observable + .just(1) + .groupBy(new Func1() { + @Override + public Integer call(Integer integer) { + throw new RuntimeException(); + } + }) + .subscribe(new Observer>() { + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + throw new RuntimeException(); + } + + @Override + public void onNext(GroupedObservable integerIntegerGroupedObservable) { + + } + }); + } + + /** + * https://github.com/ReactiveX/RxJava/issues/2998 + */ + @Test(expected = OnErrorFailedException.class) + public void testOnErrorExceptionIsThrownFromOnNext() throws Exception { + Observable + .just(1) + .doOnNext(new Action1() { + @Override + public void call(Integer integer) { + throw new RuntimeException(); + } + }) + .subscribe(new Observer() { + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + throw new RuntimeException(); + } + + @Override + public void onNext(Integer integer) { + + } + }); + } }