diff --git a/src/main/java/io/reactivex/internal/observers/CallbackCompletableObserver.java b/src/main/java/io/reactivex/internal/observers/CallbackCompletableObserver.java index cfb758552a..3555751a8e 100644 --- a/src/main/java/io/reactivex/internal/observers/CallbackCompletableObserver.java +++ b/src/main/java/io/reactivex/internal/observers/CallbackCompletableObserver.java @@ -20,10 +20,12 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.observers.LambdaConsumerIntrospection; import io.reactivex.plugins.RxJavaPlugins; public final class CallbackCompletableObserver -extends AtomicReference implements CompletableObserver, Disposable, Consumer { +extends AtomicReference + implements CompletableObserver, Disposable, Consumer, LambdaConsumerIntrospection { private static final long serialVersionUID = -4361286194466301354L; @@ -82,4 +84,9 @@ public void dispose() { public boolean isDisposed() { return get() == DisposableHelper.DISPOSED; } + + @Override + public boolean hasCustomOnError() { + return onError != this; + } } diff --git a/src/main/java/io/reactivex/internal/observers/ConsumerSingleObserver.java b/src/main/java/io/reactivex/internal/observers/ConsumerSingleObserver.java index 525543838e..7c3c4ad3b7 100644 --- a/src/main/java/io/reactivex/internal/observers/ConsumerSingleObserver.java +++ b/src/main/java/io/reactivex/internal/observers/ConsumerSingleObserver.java @@ -20,11 +20,13 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.Consumer; import io.reactivex.internal.disposables.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.LambdaConsumerIntrospection; import io.reactivex.plugins.RxJavaPlugins; public final class ConsumerSingleObserver extends AtomicReference -implements SingleObserver, Disposable { +implements SingleObserver, Disposable, LambdaConsumerIntrospection { private static final long serialVersionUID = -7012088219455310787L; @@ -74,4 +76,9 @@ public void dispose() { public boolean isDisposed() { return get() == DisposableHelper.DISPOSED; } + + @Override + public boolean hasCustomOnError() { + return onError != Functions.ON_ERROR_MISSING; + } } diff --git a/src/main/java/io/reactivex/internal/observers/EmptyCompletableObserver.java b/src/main/java/io/reactivex/internal/observers/EmptyCompletableObserver.java index 64e295566c..5b70ddb624 100644 --- a/src/main/java/io/reactivex/internal/observers/EmptyCompletableObserver.java +++ b/src/main/java/io/reactivex/internal/observers/EmptyCompletableObserver.java @@ -19,11 +19,12 @@ import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.OnErrorNotImplementedException; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.observers.LambdaConsumerIntrospection; import io.reactivex.plugins.RxJavaPlugins; public final class EmptyCompletableObserver extends AtomicReference -implements CompletableObserver, Disposable { +implements CompletableObserver, Disposable, LambdaConsumerIntrospection { private static final long serialVersionUID = -7545121636549663526L; @@ -55,4 +56,8 @@ public void onSubscribe(Disposable d) { DisposableHelper.setOnce(this, d); } + @Override + public boolean hasCustomOnError() { + return false; + } } diff --git a/src/main/java/io/reactivex/internal/observers/LambdaObserver.java b/src/main/java/io/reactivex/internal/observers/LambdaObserver.java index 9f06ecac75..041229a1ea 100644 --- a/src/main/java/io/reactivex/internal/observers/LambdaObserver.java +++ b/src/main/java/io/reactivex/internal/observers/LambdaObserver.java @@ -20,9 +20,12 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.LambdaConsumerIntrospection; import io.reactivex.plugins.RxJavaPlugins; -public final class LambdaObserver extends AtomicReference implements Observer, Disposable { +public final class LambdaObserver extends AtomicReference + implements Observer, Disposable, LambdaConsumerIntrospection { private static final long serialVersionUID = -7251123623727029452L; final Consumer onNext; @@ -101,4 +104,9 @@ public void dispose() { public boolean isDisposed() { return get() == DisposableHelper.DISPOSED; } + + @Override + public boolean hasCustomOnError() { + return onError != Functions.ON_ERROR_MISSING; + } } diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeCallbackObserver.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeCallbackObserver.java index 6b440732f4..1a3da7d04b 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeCallbackObserver.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeCallbackObserver.java @@ -20,6 +20,8 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.LambdaConsumerIntrospection; import io.reactivex.plugins.RxJavaPlugins; /** @@ -29,7 +31,7 @@ */ public final class MaybeCallbackObserver extends AtomicReference -implements MaybeObserver, Disposable { +implements MaybeObserver, Disposable, LambdaConsumerIntrospection { private static final long serialVersionUID = -6076952298809384986L; @@ -96,5 +98,8 @@ public void onComplete() { } } - + @Override + public boolean hasCustomOnError() { + return onError != Functions.ON_ERROR_MISSING; + } } diff --git a/src/main/java/io/reactivex/internal/subscribers/LambdaSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/LambdaSubscriber.java index 0d9d0b1cf9..5568cf5156 100644 --- a/src/main/java/io/reactivex/internal/subscribers/LambdaSubscriber.java +++ b/src/main/java/io/reactivex/internal/subscribers/LambdaSubscriber.java @@ -15,6 +15,8 @@ import java.util.concurrent.atomic.AtomicReference; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.LambdaConsumerIntrospection; import org.reactivestreams.Subscription; import io.reactivex.FlowableSubscriber; @@ -24,7 +26,8 @@ import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.plugins.RxJavaPlugins; -public final class LambdaSubscriber extends AtomicReference implements FlowableSubscriber, Subscription, Disposable { +public final class LambdaSubscriber extends AtomicReference + implements FlowableSubscriber, Subscription, Disposable, LambdaConsumerIntrospection { private static final long serialVersionUID = -7251123623727029452L; final Consumer onNext; @@ -115,4 +118,9 @@ public void request(long n) { public void cancel() { SubscriptionHelper.cancel(this); } + + @Override + public boolean hasCustomOnError() { + return onError != Functions.ON_ERROR_MISSING; + } } diff --git a/src/main/java/io/reactivex/observers/LambdaConsumerIntrospection.java b/src/main/java/io/reactivex/observers/LambdaConsumerIntrospection.java new file mode 100644 index 0000000000..a7371d018e --- /dev/null +++ b/src/main/java/io/reactivex/observers/LambdaConsumerIntrospection.java @@ -0,0 +1,23 @@ +package io.reactivex.observers; + +import io.reactivex.annotations.Experimental; + +/** + * An interface that indicates that the implementing type is composed of individual components and exposes information + * about their behavior. + * + *

NOTE: This is considered a read-only public API and is not intended to be implemented externally. + * + * @since 2.1.4 - experimental + */ +@Experimental +public interface LambdaConsumerIntrospection { + + /** + * @return {@code true} if a custom onError consumer implementation was supplied. Returns {@code false} if the + * implementation is missing an error consumer and thus using a throwing default implementation. + */ + @Experimental + boolean hasCustomOnError(); + +} diff --git a/src/test/java/io/reactivex/internal/observers/CallbackCompletableObserverTest.java b/src/test/java/io/reactivex/internal/observers/CallbackCompletableObserverTest.java new file mode 100644 index 0000000000..f7bbeee6d2 --- /dev/null +++ b/src/test/java/io/reactivex/internal/observers/CallbackCompletableObserverTest.java @@ -0,0 +1,25 @@ +package io.reactivex.internal.observers; + +import io.reactivex.internal.functions.Functions; +import org.junit.Test; + +import static org.junit.Assert.*; + +public final class CallbackCompletableObserverTest { + + @Test + public void emptyActionShouldReportNoCustomOnError() { + CallbackCompletableObserver o = new CallbackCompletableObserver(Functions.EMPTY_ACTION); + + assertFalse(o.hasCustomOnError()); + } + + @Test + public void customOnErrorShouldReportCustomOnError() { + CallbackCompletableObserver o = new CallbackCompletableObserver(Functions.emptyConsumer(), + Functions.EMPTY_ACTION); + + assertTrue(o.hasCustomOnError()); + } + +} diff --git a/src/test/java/io/reactivex/internal/observers/ConsumerSingleObserverTest.java b/src/test/java/io/reactivex/internal/observers/ConsumerSingleObserverTest.java new file mode 100644 index 0000000000..75e217d3e7 --- /dev/null +++ b/src/test/java/io/reactivex/internal/observers/ConsumerSingleObserverTest.java @@ -0,0 +1,26 @@ +package io.reactivex.internal.observers; + +import io.reactivex.internal.functions.Functions; +import org.junit.Test; + +import static org.junit.Assert.*; + +public final class ConsumerSingleObserverTest { + + @Test + public void onErrorMissingShouldReportNoCustomOnError() { + ConsumerSingleObserver o = new ConsumerSingleObserver(Functions.emptyConsumer(), + Functions.ON_ERROR_MISSING); + + assertFalse(o.hasCustomOnError()); + } + + @Test + public void customOnErrorShouldReportCustomOnError() { + ConsumerSingleObserver o = new ConsumerSingleObserver(Functions.emptyConsumer(), + Functions.emptyConsumer()); + + assertTrue(o.hasCustomOnError()); + } + +} diff --git a/src/test/java/io/reactivex/internal/observers/EmptyCompletableObserverTest.java b/src/test/java/io/reactivex/internal/observers/EmptyCompletableObserverTest.java new file mode 100644 index 0000000000..af72e5a376 --- /dev/null +++ b/src/test/java/io/reactivex/internal/observers/EmptyCompletableObserverTest.java @@ -0,0 +1,15 @@ +package io.reactivex.internal.observers; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; + +public final class EmptyCompletableObserverTest { + + @Test + public void defaultShouldReportNoCustomOnError() { + EmptyCompletableObserver o = new EmptyCompletableObserver(); + + assertFalse(o.hasCustomOnError()); + } +} diff --git a/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java b/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java index d28a92be4b..d5d3f647d3 100644 --- a/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java +++ b/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java @@ -17,6 +17,7 @@ import java.util.*; +import io.reactivex.internal.functions.Functions; import org.junit.Test; import io.reactivex.Observable; @@ -342,4 +343,24 @@ public void accept(Disposable s) throws Exception { assertTrue(errors.toString(), errors.get(0) instanceof TestException); } + + @Test + public void onErrorMissingShouldReportNoCustomOnError() { + LambdaObserver o = new LambdaObserver(Functions.emptyConsumer(), + Functions.ON_ERROR_MISSING, + Functions.EMPTY_ACTION, + Functions.emptyConsumer()); + + assertFalse(o.hasCustomOnError()); + } + + @Test + public void customOnErrorShouldReportCustomOnError() { + LambdaObserver o = new LambdaObserver(Functions.emptyConsumer(), + Functions.emptyConsumer(), + Functions.EMPTY_ACTION, + Functions.emptyConsumer()); + + assertTrue(o.hasCustomOnError()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeCallbackObserverTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeCallbackObserverTest.java index 5a85c2f69d..e810f63e74 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeCallbackObserverTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeCallbackObserverTest.java @@ -13,18 +13,17 @@ package io.reactivex.internal.operators.maybe; -import static org.junit.Assert.*; - -import java.util.List; - -import org.junit.Test; - import io.reactivex.TestHelper; import io.reactivex.disposables.*; import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.plugins.RxJavaPlugins; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.*; public class MaybeCallbackObserverTest { @@ -121,4 +120,22 @@ public void run() throws Exception { RxJavaPlugins.reset(); } } + + @Test + public void onErrorMissingShouldReportNoCustomOnError() { + MaybeCallbackObserver o = new MaybeCallbackObserver(Functions.emptyConsumer(), + Functions.ON_ERROR_MISSING, + Functions.EMPTY_ACTION); + + assertFalse(o.hasCustomOnError()); + } + + @Test + public void customOnErrorShouldReportCustomOnError() { + MaybeCallbackObserver o = new MaybeCallbackObserver(Functions.emptyConsumer(), + Functions.emptyConsumer(), + Functions.EMPTY_ACTION); + + assertTrue(o.hasCustomOnError()); + } } diff --git a/src/test/java/io/reactivex/internal/subscribers/LambdaSubscriberTest.java b/src/test/java/io/reactivex/internal/subscribers/LambdaSubscriberTest.java index 4666e77569..f07d7bdc30 100644 --- a/src/test/java/io/reactivex/internal/subscribers/LambdaSubscriberTest.java +++ b/src/test/java/io/reactivex/internal/subscribers/LambdaSubscriberTest.java @@ -13,19 +13,20 @@ package io.reactivex.internal.subscribers; -import static org.junit.Assert.*; - -import java.util.*; - -import org.junit.Test; -import org.reactivestreams.*; - import io.reactivex.*; import io.reactivex.exceptions.*; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.operators.flowable.FlowableInternalHelper; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; +import org.junit.Test; +import org.reactivestreams.*; + +import java.util.*; + +import static org.junit.Assert.*; public class LambdaSubscriberTest { @@ -347,4 +348,24 @@ public void accept(Subscription s) throws Exception { assertTrue(errors.toString(), errors.get(0) instanceof TestException); } + + @Test + public void onErrorMissingShouldReportNoCustomOnError() { + LambdaSubscriber o = new LambdaSubscriber(Functions.emptyConsumer(), + Functions.ON_ERROR_MISSING, + Functions.EMPTY_ACTION, + FlowableInternalHelper.RequestMax.INSTANCE); + + assertFalse(o.hasCustomOnError()); + } + + @Test + public void customOnErrorShouldReportCustomOnError() { + LambdaSubscriber o = new LambdaSubscriber(Functions.emptyConsumer(), + Functions.emptyConsumer(), + Functions.EMPTY_ACTION, + FlowableInternalHelper.RequestMax.INSTANCE); + + assertTrue(o.hasCustomOnError()); + } }