diff --git a/src/main/java/rx/internal/operators/OperatorDistinctUntilChanged.java b/src/main/java/rx/internal/operators/OperatorDistinctUntilChanged.java index 275e33d0db..0d98b3248f 100644 --- a/src/main/java/rx/internal/operators/OperatorDistinctUntilChanged.java +++ b/src/main/java/rx/internal/operators/OperatorDistinctUntilChanged.java @@ -17,6 +17,7 @@ import rx.Observable.Operator; import rx.Subscriber; +import rx.exceptions.Exceptions; import rx.functions.Func1; import rx.internal.util.UtilityFunctions; @@ -56,7 +57,13 @@ public Subscriber call(final Subscriber child) { @Override public void onNext(T t) { U currentKey = previousKey; - U key = keySelector.call(t); + final U key; + try { + key = keySelector.call(t); + } catch (Throwable e) { + Exceptions.throwOrReport(e, child, t); + return; + } previousKey = key; if (hasPrevious) { diff --git a/src/test/java/rx/internal/operators/OperatorDistinctUntilChangedTest.java b/src/test/java/rx/internal/operators/OperatorDistinctUntilChangedTest.java index fc81a6a906..a913345026 100644 --- a/src/test/java/rx/internal/operators/OperatorDistinctUntilChangedTest.java +++ b/src/test/java/rx/internal/operators/OperatorDistinctUntilChangedTest.java @@ -15,6 +15,7 @@ */ package rx.internal.operators; +import static org.junit.Assert.assertFalse; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.inOrder; @@ -23,6 +24,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.MockitoAnnotations.initMocks; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Before; import org.junit.Test; @@ -31,17 +33,18 @@ import rx.Observable; import rx.Observer; +import rx.functions.Action1; import rx.functions.Func1; public class OperatorDistinctUntilChangedTest { @Mock - Observer w; + private Observer w; @Mock - Observer w2; + private Observer w2; // nulls lead to exceptions - final Func1 TO_UPPER_WITH_EXCEPTION = new Func1() { + private final static Func1 TO_UPPER_WITH_EXCEPTION = new Func1() { @Override public String call(String s) { if (s.equals("x")) { @@ -50,6 +53,13 @@ public String call(String s) { return s.toUpperCase(); } }; + + private final static Func1 THROWS_NON_FATAL = new Func1() { + @Override + public String call(String s) { + throw new RuntimeException(); + } + }; @Before public void before() { @@ -138,4 +148,20 @@ public void testDistinctUntilChangedOfSourceWithExceptionsFromKeySelector() { inOrder.verify(w, never()).onNext(anyString()); inOrder.verify(w, never()).onCompleted(); } + + @Test + public void testDistinctUntilChangedWhenNonFatalExceptionThrownByKeySelectorIsNotReportedByUpstream() { + Observable src = Observable.just("a", "b", null, "c"); + final AtomicBoolean errorOccurred = new AtomicBoolean(false); + src + .doOnError(new Action1() { + @Override + public void call(Throwable t) { + errorOccurred.set(true); + } + }) + .distinctUntilChanged(THROWS_NON_FATAL) + .subscribe(w); + assertFalse(errorOccurred.get()); + } }