Skip to content

Commit

Permalink
Implement LambdaConsumerIntrospection (#5590)
Browse files Browse the repository at this point in the history
* Implement HasDefaultErrorConsumer

Followup from #5569, and allows you to introspect if the resulting observer has missing error consumption and subsequently supplies a default (throwing) one.

* Add `@since`

* Add tests

* Add support in relevant completable observers

* Add support in ConsumerSingleObserver

* Add support in MaybeCallbackObserverTest

* Add support in LambdaSubscriber

* Switch to CompositeObserver and onErrorImplemented()

* Update wording to use Introspection

* Update tests and flip implementation logic to match naming
  • Loading branch information
ZacSweers authored and akarnokd committed Sep 12, 2017
1 parent 797301b commit 99a620a
Show file tree
Hide file tree
Showing 13 changed files with 208 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.observers.LambdaConsumerIntrospection;
import io.reactivex.plugins.RxJavaPlugins;

public final class CallbackCompletableObserver
extends AtomicReference<Disposable> implements CompletableObserver, Disposable, Consumer<Throwable> {
extends AtomicReference<Disposable>
implements CompletableObserver, Disposable, Consumer<Throwable>, LambdaConsumerIntrospection {


private static final long serialVersionUID = -4361286194466301354L;
Expand Down Expand Up @@ -82,4 +84,9 @@ public void dispose() {
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}

@Override
public boolean hasCustomOnError() {
return onError != this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import io.reactivex.exceptions.*;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.observers.LambdaConsumerIntrospection;
import io.reactivex.plugins.RxJavaPlugins;

public final class ConsumerSingleObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable {
implements SingleObserver<T>, Disposable, LambdaConsumerIntrospection {


private static final long serialVersionUID = -7012088219455310787L;
Expand Down Expand Up @@ -74,4 +76,9 @@ public void dispose() {
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}

@Override
public boolean hasCustomOnError() {
return onError != Functions.ON_ERROR_MISSING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.OnErrorNotImplementedException;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.observers.LambdaConsumerIntrospection;
import io.reactivex.plugins.RxJavaPlugins;

public final class EmptyCompletableObserver
extends AtomicReference<Disposable>
implements CompletableObserver, Disposable {
implements CompletableObserver, Disposable, LambdaConsumerIntrospection {


private static final long serialVersionUID = -7545121636549663526L;
Expand Down Expand Up @@ -55,4 +56,8 @@ public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this, d);
}

@Override
public boolean hasCustomOnError() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.Functions;
import io.reactivex.observers.LambdaConsumerIntrospection;
import io.reactivex.plugins.RxJavaPlugins;

public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
public final class LambdaObserver<T> extends AtomicReference<Disposable>
implements Observer<T>, Disposable, LambdaConsumerIntrospection {

private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
Expand Down Expand Up @@ -101,4 +104,9 @@ public void dispose() {
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}

@Override
public boolean hasCustomOnError() {
return onError != Functions.ON_ERROR_MISSING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.Functions;
import io.reactivex.observers.LambdaConsumerIntrospection;
import io.reactivex.plugins.RxJavaPlugins;

/**
Expand All @@ -29,7 +31,7 @@
*/
public final class MaybeCallbackObserver<T>
extends AtomicReference<Disposable>
implements MaybeObserver<T>, Disposable {
implements MaybeObserver<T>, Disposable, LambdaConsumerIntrospection {


private static final long serialVersionUID = -6076952298809384986L;
Expand Down Expand Up @@ -96,5 +98,8 @@ public void onComplete() {
}
}


@Override
public boolean hasCustomOnError() {
return onError != Functions.ON_ERROR_MISSING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.internal.functions.Functions;
import io.reactivex.observers.LambdaConsumerIntrospection;
import org.reactivestreams.Subscription;

import io.reactivex.FlowableSubscriber;
Expand All @@ -24,7 +26,8 @@
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class LambdaSubscriber<T> extends AtomicReference<Subscription> implements FlowableSubscriber<T>, Subscription, Disposable {
public final class LambdaSubscriber<T> extends AtomicReference<Subscription>
implements FlowableSubscriber<T>, Subscription, Disposable, LambdaConsumerIntrospection {

private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
Expand Down Expand Up @@ -115,4 +118,9 @@ public void request(long n) {
public void cancel() {
SubscriptionHelper.cancel(this);
}

@Override
public boolean hasCustomOnError() {
return onError != Functions.ON_ERROR_MISSING;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.reactivex.observers;

import io.reactivex.annotations.Experimental;

/**
* An interface that indicates that the implementing type is composed of individual components and exposes information
* about their behavior.
*
* <p><em>NOTE:</em> This is considered a read-only public API and is not intended to be implemented externally.
*
* @since 2.1.4 - experimental
*/
@Experimental
public interface LambdaConsumerIntrospection {

/**
* @return {@code true} if a custom onError consumer implementation was supplied. Returns {@code false} if the
* implementation is missing an error consumer and thus using a throwing default implementation.
*/
@Experimental
boolean hasCustomOnError();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.reactivex.internal.observers;

import io.reactivex.internal.functions.Functions;
import org.junit.Test;

import static org.junit.Assert.*;

public final class CallbackCompletableObserverTest {

@Test
public void emptyActionShouldReportNoCustomOnError() {
CallbackCompletableObserver o = new CallbackCompletableObserver(Functions.EMPTY_ACTION);

assertFalse(o.hasCustomOnError());
}

@Test
public void customOnErrorShouldReportCustomOnError() {
CallbackCompletableObserver o = new CallbackCompletableObserver(Functions.<Throwable>emptyConsumer(),
Functions.EMPTY_ACTION);

assertTrue(o.hasCustomOnError());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.reactivex.internal.observers;

import io.reactivex.internal.functions.Functions;
import org.junit.Test;

import static org.junit.Assert.*;

public final class ConsumerSingleObserverTest {

@Test
public void onErrorMissingShouldReportNoCustomOnError() {
ConsumerSingleObserver<Integer> o = new ConsumerSingleObserver<Integer>(Functions.<Integer>emptyConsumer(),
Functions.ON_ERROR_MISSING);

assertFalse(o.hasCustomOnError());
}

@Test
public void customOnErrorShouldReportCustomOnError() {
ConsumerSingleObserver<Integer> o = new ConsumerSingleObserver<Integer>(Functions.<Integer>emptyConsumer(),
Functions.<Throwable>emptyConsumer());

assertTrue(o.hasCustomOnError());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.reactivex.internal.observers;

import org.junit.Test;

import static org.junit.Assert.assertFalse;

public final class EmptyCompletableObserverTest {

@Test
public void defaultShouldReportNoCustomOnError() {
EmptyCompletableObserver o = new EmptyCompletableObserver();

assertFalse(o.hasCustomOnError());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.*;

import io.reactivex.internal.functions.Functions;
import org.junit.Test;

import io.reactivex.Observable;
Expand Down Expand Up @@ -342,4 +343,24 @@ public void accept(Disposable s) throws Exception {

assertTrue(errors.toString(), errors.get(0) instanceof TestException);
}

@Test
public void onErrorMissingShouldReportNoCustomOnError() {
LambdaObserver<Integer> o = new LambdaObserver<Integer>(Functions.<Integer>emptyConsumer(),
Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION,
Functions.<Disposable>emptyConsumer());

assertFalse(o.hasCustomOnError());
}

@Test
public void customOnErrorShouldReportCustomOnError() {
LambdaObserver<Integer> o = new LambdaObserver<Integer>(Functions.<Integer>emptyConsumer(),
Functions.<Throwable>emptyConsumer(),
Functions.EMPTY_ACTION,
Functions.<Disposable>emptyConsumer());

assertTrue(o.hasCustomOnError());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,17 @@

package io.reactivex.internal.operators.maybe;

import static org.junit.Assert.*;

import java.util.List;

import org.junit.Test;

import io.reactivex.TestHelper;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.plugins.RxJavaPlugins;
import org.junit.Test;

import java.util.List;

import static org.junit.Assert.*;

public class MaybeCallbackObserverTest {

Expand Down Expand Up @@ -121,4 +120,22 @@ public void run() throws Exception {
RxJavaPlugins.reset();
}
}

@Test
public void onErrorMissingShouldReportNoCustomOnError() {
MaybeCallbackObserver<Integer> o = new MaybeCallbackObserver<Integer>(Functions.<Integer>emptyConsumer(),
Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION);

assertFalse(o.hasCustomOnError());
}

@Test
public void customOnErrorShouldReportCustomOnError() {
MaybeCallbackObserver<Integer> o = new MaybeCallbackObserver<Integer>(Functions.<Integer>emptyConsumer(),
Functions.<Throwable>emptyConsumer(),
Functions.EMPTY_ACTION);

assertTrue(o.hasCustomOnError());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@

package io.reactivex.internal.subscribers;

import static org.junit.Assert.*;

import java.util.*;

import org.junit.Test;
import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.operators.flowable.FlowableInternalHelper;
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.PublishProcessor;
import org.junit.Test;
import org.reactivestreams.*;

import java.util.*;

import static org.junit.Assert.*;

public class LambdaSubscriberTest {

Expand Down Expand Up @@ -347,4 +348,24 @@ public void accept(Subscription s) throws Exception {

assertTrue(errors.toString(), errors.get(0) instanceof TestException);
}

@Test
public void onErrorMissingShouldReportNoCustomOnError() {
LambdaSubscriber<Integer> o = new LambdaSubscriber<Integer>(Functions.<Integer>emptyConsumer(),
Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION,
FlowableInternalHelper.RequestMax.INSTANCE);

assertFalse(o.hasCustomOnError());
}

@Test
public void customOnErrorShouldReportCustomOnError() {
LambdaSubscriber<Integer> o = new LambdaSubscriber<Integer>(Functions.<Integer>emptyConsumer(),
Functions.<Throwable>emptyConsumer(),
Functions.EMPTY_ACTION,
FlowableInternalHelper.RequestMax.INSTANCE);

assertTrue(o.hasCustomOnError());
}
}

0 comments on commit 99a620a

Please sign in to comment.