Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement LambdaConsumerIntrospection #5590

Merged
merged 10 commits into from
Sep 12, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.observers.LambdaConsumerIntrospection;
import io.reactivex.plugins.RxJavaPlugins;

public final class CallbackCompletableObserver
extends AtomicReference<Disposable> implements CompletableObserver, Disposable, Consumer<Throwable> {
extends AtomicReference<Disposable>
implements CompletableObserver, Disposable, Consumer<Throwable>, LambdaConsumerIntrospection {


private static final long serialVersionUID = -4361286194466301354L;
Expand Down Expand Up @@ -82,4 +84,9 @@ public void dispose() {
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}

@Override
public boolean hasCustomOnError() {
return onError != this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import io.reactivex.exceptions.*;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.observers.LambdaConsumerIntrospection;
import io.reactivex.plugins.RxJavaPlugins;

public final class ConsumerSingleObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable {
implements SingleObserver<T>, Disposable, LambdaConsumerIntrospection {


private static final long serialVersionUID = -7012088219455310787L;
Expand Down Expand Up @@ -74,4 +76,9 @@ public void dispose() {
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}

@Override
public boolean hasCustomOnError() {
return onError != Functions.ON_ERROR_MISSING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.OnErrorNotImplementedException;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.observers.LambdaConsumerIntrospection;
import io.reactivex.plugins.RxJavaPlugins;

public final class EmptyCompletableObserver
extends AtomicReference<Disposable>
implements CompletableObserver, Disposable {
implements CompletableObserver, Disposable, LambdaConsumerIntrospection {


private static final long serialVersionUID = -7545121636549663526L;
Expand Down Expand Up @@ -55,4 +56,8 @@ public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this, d);
}

@Override
public boolean hasCustomOnError() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.Functions;
import io.reactivex.observers.LambdaConsumerIntrospection;
import io.reactivex.plugins.RxJavaPlugins;

public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
public final class LambdaObserver<T> extends AtomicReference<Disposable>
implements Observer<T>, Disposable, LambdaConsumerIntrospection {

private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
Expand Down Expand Up @@ -101,4 +104,9 @@ public void dispose() {
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}

@Override
public boolean hasCustomOnError() {
return onError != Functions.ON_ERROR_MISSING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.Functions;
import io.reactivex.observers.LambdaConsumerIntrospection;
import io.reactivex.plugins.RxJavaPlugins;

/**
Expand All @@ -29,7 +31,7 @@
*/
public final class MaybeCallbackObserver<T>
extends AtomicReference<Disposable>
implements MaybeObserver<T>, Disposable {
implements MaybeObserver<T>, Disposable, LambdaConsumerIntrospection {


private static final long serialVersionUID = -6076952298809384986L;
Expand Down Expand Up @@ -96,5 +98,8 @@ public void onComplete() {
}
}


@Override
public boolean hasCustomOnError() {
return onError != Functions.ON_ERROR_MISSING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.internal.functions.Functions;
import io.reactivex.observers.LambdaConsumerIntrospection;
import org.reactivestreams.Subscription;

import io.reactivex.FlowableSubscriber;
Expand All @@ -24,7 +26,8 @@
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class LambdaSubscriber<T> extends AtomicReference<Subscription> implements FlowableSubscriber<T>, Subscription, Disposable {
public final class LambdaSubscriber<T> extends AtomicReference<Subscription>
implements FlowableSubscriber<T>, Subscription, Disposable, LambdaConsumerIntrospection {

private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
Expand Down Expand Up @@ -115,4 +118,9 @@ public void request(long n) {
public void cancel() {
SubscriptionHelper.cancel(this);
}

@Override
public boolean hasCustomOnError() {
return onError != Functions.ON_ERROR_MISSING;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.reactivex.observers;

import io.reactivex.annotations.Experimental;

/**
* An interface that indicates that the implementing type is composed of individual components and exposes information
* about their behavior.
*
* <p><em>NOTE:</em> This is considered a read-only public API and is not intended to be implemented externally.
*
* @since 2.1.4 - experimental
*/
@Experimental
public interface LambdaConsumerIntrospection {

/**
* @return {@code true} if a custom onError consumer implementation was supplied. Returns {@code false} if the
* implementation is missing an error consumer and thus using a throwing default implementation.
*/
@Experimental
boolean hasCustomOnError();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we provide hasDefaultOnError() instead? cc @JakeWharton

Because if you think about it from RxJava perspective — that's what library can say about itself: is something has a default value/implementation in compare to what library thinks is default.

hasCustomOnError() basically checks for hasDefaultOnError() and then negates the result which is kinda confusing.

TL;TR: "custom" in the API feels wrong, but maybe it's just me :)


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.reactivex.internal.observers;

import io.reactivex.internal.functions.Functions;
import org.junit.Test;

import static org.junit.Assert.*;

public final class CallbackCompletableObserverTest {

@Test
public void emptyActionShouldReportNoCustomOnError() {
CallbackCompletableObserver o = new CallbackCompletableObserver(Functions.EMPTY_ACTION);

assertFalse(o.hasCustomOnError());
}

@Test
public void customOnErrorShouldReportCustomOnError() {
CallbackCompletableObserver o = new CallbackCompletableObserver(Functions.<Throwable>emptyConsumer(),
Functions.EMPTY_ACTION);

assertTrue(o.hasCustomOnError());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.reactivex.internal.observers;

import io.reactivex.internal.functions.Functions;
import org.junit.Test;

import static org.junit.Assert.*;

public final class ConsumerSingleObserverTest {

@Test
public void onErrorMissingShouldReportNoCustomOnError() {
ConsumerSingleObserver<Integer> o = new ConsumerSingleObserver<Integer>(Functions.<Integer>emptyConsumer(),
Functions.ON_ERROR_MISSING);

assertFalse(o.hasCustomOnError());
}

@Test
public void customOnErrorShouldReportCustomOnError() {
ConsumerSingleObserver<Integer> o = new ConsumerSingleObserver<Integer>(Functions.<Integer>emptyConsumer(),
Functions.<Throwable>emptyConsumer());

assertTrue(o.hasCustomOnError());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.reactivex.internal.observers;

import org.junit.Test;

import static org.junit.Assert.assertFalse;

public final class EmptyCompletableObserverTest {

@Test
public void defaultShouldReportNoCustomOnError() {
EmptyCompletableObserver o = new EmptyCompletableObserver();

assertFalse(o.hasCustomOnError());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.*;

import io.reactivex.internal.functions.Functions;
import org.junit.Test;

import io.reactivex.Observable;
Expand Down Expand Up @@ -342,4 +343,24 @@ public void accept(Disposable s) throws Exception {

assertTrue(errors.toString(), errors.get(0) instanceof TestException);
}

@Test
public void onErrorMissingShouldReportNoCustomOnError() {
LambdaObserver<Integer> o = new LambdaObserver<Integer>(Functions.<Integer>emptyConsumer(),
Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION,
Functions.<Disposable>emptyConsumer());

assertFalse(o.hasCustomOnError());
}

@Test
public void customOnErrorShouldReportCustomOnError() {
LambdaObserver<Integer> o = new LambdaObserver<Integer>(Functions.<Integer>emptyConsumer(),
Functions.<Throwable>emptyConsumer(),
Functions.EMPTY_ACTION,
Functions.<Disposable>emptyConsumer());

assertTrue(o.hasCustomOnError());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,17 @@

package io.reactivex.internal.operators.maybe;

import static org.junit.Assert.*;

import java.util.List;

import org.junit.Test;

import io.reactivex.TestHelper;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.plugins.RxJavaPlugins;
import org.junit.Test;

import java.util.List;

import static org.junit.Assert.*;

public class MaybeCallbackObserverTest {

Expand Down Expand Up @@ -121,4 +120,22 @@ public void run() throws Exception {
RxJavaPlugins.reset();
}
}

@Test
public void onErrorMissingShouldReportNoCustomOnError() {
MaybeCallbackObserver<Integer> o = new MaybeCallbackObserver<Integer>(Functions.<Integer>emptyConsumer(),
Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION);

assertFalse(o.hasCustomOnError());
}

@Test
public void customOnErrorShouldReportCustomOnError() {
MaybeCallbackObserver<Integer> o = new MaybeCallbackObserver<Integer>(Functions.<Integer>emptyConsumer(),
Functions.<Throwable>emptyConsumer(),
Functions.EMPTY_ACTION);

assertTrue(o.hasCustomOnError());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@

package io.reactivex.internal.subscribers;

import static org.junit.Assert.*;

import java.util.*;

import org.junit.Test;
import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.operators.flowable.FlowableInternalHelper;
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.PublishProcessor;
import org.junit.Test;
import org.reactivestreams.*;

import java.util.*;

import static org.junit.Assert.*;

public class LambdaSubscriberTest {

Expand Down Expand Up @@ -347,4 +348,24 @@ public void accept(Subscription s) throws Exception {

assertTrue(errors.toString(), errors.get(0) instanceof TestException);
}

@Test
public void onErrorMissingShouldReportNoCustomOnError() {
LambdaSubscriber<Integer> o = new LambdaSubscriber<Integer>(Functions.<Integer>emptyConsumer(),
Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION,
FlowableInternalHelper.RequestMax.INSTANCE);

assertFalse(o.hasCustomOnError());
}

@Test
public void customOnErrorShouldReportCustomOnError() {
LambdaSubscriber<Integer> o = new LambdaSubscriber<Integer>(Functions.<Integer>emptyConsumer(),
Functions.<Throwable>emptyConsumer(),
Functions.EMPTY_ACTION,
FlowableInternalHelper.RequestMax.INSTANCE);

assertTrue(o.hasCustomOnError());
}
}