Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: add Completable.safeSubscribe option + RxJavaPlugins hook support #3942

Merged
merged 8 commits into from
Jun 1, 2016
149 changes: 104 additions & 45 deletions src/main/java/rx/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import rx.functions.*;
import rx.internal.operators.*;
import rx.internal.util.*;
import rx.observers.SafeCompletableSubscriber;
import rx.plugins.*;
import rx.schedulers.Schedulers;
import rx.subscriptions.*;
Expand All @@ -37,6 +38,12 @@
*/
@Experimental
public class Completable {
/** The error handler instance. */
static final RxJavaErrorHandler ERROR_HANDLER = RxJavaPlugins.getInstance().getErrorHandler();

/** The completable hook. */
static RxJavaCompletableExecutionHook HOOK = RxJavaPlugins.getInstance().getCompletableExecutionHook();

/**
* Callback used for building deferred computations that takes a CompletableSubscriber.
*/
Expand Down Expand Up @@ -100,9 +107,6 @@ public void call(CompletableSubscriber s) {
}
});

/** The error handler instance. */
static final RxJavaErrorHandler ERROR_HANDLER = RxJavaPlugins.getInstance().getErrorHandler();

/**
* Returns a Completable which terminates as soon as one of the source Completables
* terminates (normally or with an error) and cancels all other Completables.
Expand Down Expand Up @@ -172,7 +176,7 @@ public void onSubscribe(Subscription d) {
}

// no need to have separate subscribers because inner is stateless
c.subscribe(inner);
c.unsafeSubscribe(inner);
}
}
});
Expand Down Expand Up @@ -300,7 +304,7 @@ public void onSubscribe(Subscription d) {
}

// no need to have separate subscribers because inner is stateless
c.subscribe(inner);
c.unsafeSubscribe(inner);
}
}
});
Expand Down Expand Up @@ -415,7 +419,7 @@ public void call(CompletableSubscriber s) {
return;
}

c.subscribe(s);
c.unsafeSubscribe(s);
}
});
}
Expand Down Expand Up @@ -898,7 +902,7 @@ public void call(final CompletableSubscriber s) {

final AtomicBoolean once = new AtomicBoolean();

cs.subscribe(new CompletableSubscriber() {
cs.unsafeSubscribe(new CompletableSubscriber() {
Subscription d;
void dispose() {
d.unsubscribe();
Expand Down Expand Up @@ -974,7 +978,7 @@ public void call() {
* not null (not verified)
*/
protected Completable(CompletableOnSubscribe onSubscribe) {
this.onSubscribe = onSubscribe;
this.onSubscribe = HOOK.onCreate(onSubscribe);
}

/**
Expand All @@ -998,7 +1002,7 @@ public final void await() {
final CountDownLatch cdl = new CountDownLatch(1);
final Throwable[] err = new Throwable[1];

subscribe(new CompletableSubscriber() {
unsafeSubscribe(new CompletableSubscriber() {

@Override
public void onCompleted() {
Expand Down Expand Up @@ -1049,7 +1053,7 @@ public final boolean await(long timeout, TimeUnit unit) {
final CountDownLatch cdl = new CountDownLatch(1);
final Throwable[] err = new Throwable[1];

subscribe(new CompletableSubscriber() {
unsafeSubscribe(new CompletableSubscriber() {

@Override
public void onCompleted() {
Expand Down Expand Up @@ -1189,7 +1193,7 @@ public void call(final CompletableSubscriber s) {
final Scheduler.Worker w = scheduler.createWorker();
set.add(w);

subscribe(new CompletableSubscriber() {
unsafeSubscribe(new CompletableSubscriber() {


@Override
Expand Down Expand Up @@ -1301,7 +1305,7 @@ protected final Completable doOnLifecycle(
return create(new CompletableOnSubscribe() {
@Override
public void call(final CompletableSubscriber s) {
subscribe(new CompletableSubscriber() {
unsafeSubscribe(new CompletableSubscriber() {

@Override
public void onCompleted() {
Expand Down Expand Up @@ -1433,7 +1437,7 @@ public final Throwable get() {
final CountDownLatch cdl = new CountDownLatch(1);
final Throwable[] err = new Throwable[1];

subscribe(new CompletableSubscriber() {
unsafeSubscribe(new CompletableSubscriber() {

@Override
public void onCompleted() {
Expand Down Expand Up @@ -1477,7 +1481,7 @@ public final Throwable get(long timeout, TimeUnit unit) {
final CountDownLatch cdl = new CountDownLatch(1);
final Throwable[] err = new Throwable[1];

subscribe(new CompletableSubscriber() {
unsafeSubscribe(new CompletableSubscriber() {

@Override
public void onCompleted() {
Expand Down Expand Up @@ -1525,11 +1529,10 @@ public final Completable lift(final CompletableOperator onLift) {
@Override
public void call(CompletableSubscriber s) {
try {
// TODO plugin wrapping

CompletableSubscriber sw = onLift.call(s);
CompletableOperator onLiftDecorated = HOOK.onLift(onLift);
CompletableSubscriber sw = onLiftDecorated.call(s);

subscribe(sw);
unsafeSubscribe(sw);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Expand Down Expand Up @@ -1570,7 +1573,7 @@ public void call(final CompletableSubscriber s) {

s.onSubscribe(ad);

subscribe(new CompletableSubscriber() {
unsafeSubscribe(new CompletableSubscriber() {

@Override
public void onCompleted() {
Expand Down Expand Up @@ -1632,7 +1635,7 @@ public final Completable onErrorComplete(final Func1<? super Throwable, Boolean>
return create(new CompletableOnSubscribe() {
@Override
public void call(final CompletableSubscriber s) {
subscribe(new CompletableSubscriber() {
unsafeSubscribe(new CompletableSubscriber() {

@Override
public void onCompleted() {
Expand Down Expand Up @@ -1681,7 +1684,7 @@ public final Completable onErrorResumeNext(final Func1<? super Throwable, ? exte
@Override
public void call(final CompletableSubscriber s) {
final SerialSubscription sd = new SerialSubscription();
subscribe(new CompletableSubscriber() {
unsafeSubscribe(new CompletableSubscriber() {

@Override
public void onCompleted() {
Expand All @@ -1707,7 +1710,7 @@ public void onError(Throwable e) {
return;
}

c.subscribe(new CompletableSubscriber() {
c.unsafeSubscribe(new CompletableSubscriber() {

@Override
public void onCompleted() {
Expand Down Expand Up @@ -1843,7 +1846,7 @@ public final <T> Observable<T> startWith(Observable<T> other) {
*/
public final Subscription subscribe() {
final MultipleAssignmentSubscription mad = new MultipleAssignmentSubscription();
subscribe(new CompletableSubscriber() {
unsafeSubscribe(new CompletableSubscriber() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SafeCompletableSubscriber is not used here nor the others subscribe methods, except for subscribe(Subscriber). I think subscribe methods should call between themselves (as in Observable#subscribe methods) to refactor code and avoid forgetting this kind of things.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafeSubscribe should call between themselves too.

Copy link

@bryant1410 bryant1410 May 17, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should call subscribe now instead, doesn't it? And the others subscribe methods.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the rest are intermediate operators or already perform custom logic in addition to reporting the error to a plugin. Plus, this particulare use case, the subscriber is not expected to crash.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the idea of the safe subscribe is to ensure the caller that the contract will be met. Why in this particular case it won't?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But SafeSubscriber in the first line of the javadocs says:

SafeSubscriber is a wrapper around Subscriber that ensures that the Subscriber complies with the Observable contract.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. However, this method doesn't run any user supplied code and thus no need to wrap it into a safe completable subscriber.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. However, the other 2 subscribe methods that don't use SafeCompletableSubscriber should do it. To assert that the Action instances are called at most once, and not both, complying with the contract.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the safeguard to those methods.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Why not reusing SafeCompletableSubscriber?

@Override
public void onCompleted() {
mad.unsubscribe();
Expand Down Expand Up @@ -1876,7 +1879,7 @@ public final Subscription subscribe(final Action0 onComplete) {
requireNonNull(onComplete);

final MultipleAssignmentSubscription mad = new MultipleAssignmentSubscription();
subscribe(new CompletableSubscriber() {
unsafeSubscribe(new CompletableSubscriber() {
@Override
public void onCompleted() {
try {
Expand Down Expand Up @@ -1918,7 +1921,7 @@ public final Subscription subscribe(final Action1<? super Throwable> onError, fi
requireNonNull(onComplete);

final MultipleAssignmentSubscription mad = new MultipleAssignmentSubscription();
subscribe(new CompletableSubscriber() {
unsafeSubscribe(new CompletableSubscriber() {
@Override
public void onCompleted() {
try {
Expand Down Expand Up @@ -1962,56 +1965,112 @@ private static void deliverUncaughtException(Throwable e) {
* @param s the CompletableSubscriber, not null
* @throws NullPointerException if s is null
*/
public final void subscribe(CompletableSubscriber s) {
public final void unsafeSubscribe(CompletableSubscriber s) {
requireNonNull(s);
try {
// TODO plugin wrapping the subscriber
CompletableOnSubscribe onSubscribeDecorated = HOOK.onSubscribeStart(this, this.onSubscribe);

onSubscribe.call(s);
onSubscribeDecorated.call(s);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
ex = HOOK.onSubscribeError(ex);
ERROR_HANDLER.handleError(ex);
Copy link

@bryant1410 bryant1410 May 14, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafeSubscribe shouldn't call the ERROR_HANDLER.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not? That exception has to be reported otherwise it could get lost.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see from the javadocs of Observable#unsafeSubscribe (and from the code) that it should subscribe to an Observable and call OnSubscribe without error handling.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It won't get lost as it's supposed to be called in nested subscriptions, like when implementing operators.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In observable, if the subscription fails, the subscribe() tries to call onNext on the subscriber. If the subscriber is unsubscribed, the original exception is sent to the plugin handler. If not but crashes further, a custom fatal exception is thrown. In Completable, we can't know if the completable subscriber is ready to receive the exception or not, thus the safe bet is to call the plugin handler and rethrow as NullPointerException. If nested, NPE is thrown as is further up without calling the handler again.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this should get compared to unsafeSubscribe, not subscribe, doesn't it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Observable is kind of in an inconsistent state in this regard, really could use a cleanup. The only difference should be that the incoming subscriber is or isn't wrapped. Crash management should be the same.

Copy link

@bryant1410 bryant1410 May 14, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, ok. I also see that it's supposed to not call the hook, but it does. However, couldn't be the case that multiple calls to the error handler are made for the same error (e.g., due to operators)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, if the exception is fatal, it skips the hook. Once the hook has been called, the exception is wrapped into a NullPointerException and thrown up, but then there is a catch (NullPointerException) to prevent it from calling the hook again.

throw toNpe(ex);
}
}

/**
* Subscribes the given CompletableSubscriber to this Completable instance
* and handles exceptions thrown by its onXXX methods.
* @param s the CompletableSubscriber, not null
* @throws NullPointerException if s is null
*/
public final void subscribe(CompletableSubscriber s) {
requireNonNull(s);
try {
CompletableOnSubscribe onSubscribeDecorated = HOOK.onSubscribeStart(this, this.onSubscribe);

onSubscribeDecorated.call(new SafeCompletableSubscriber(s));
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
ex = HOOK.onSubscribeError(ex);
ERROR_HANDLER.handleError(ex);
throw toNpe(ex);
}
}

/**
* Subscribes a reactive-streams Subscriber to this Completable instance which
* Subscribes a regular Subscriber to this Completable instance which
* will receive only an onError or onComplete event.
* @param s the reactive-streams Subscriber, not null
* @throws NullPointerException if s is null
*/
public final <T> void subscribe(Subscriber<T> s) {
public final <T> void unsafeSubscribe(final Subscriber<T> s) {
requireNonNull(s);
try {
final Subscriber<?> sw = s; // FIXME hooking in 1.x is kind of strange to me

if (sw == null) {
throw new NullPointerException("The RxJavaPlugins.onSubscribe returned a null Subscriber");
}

subscribe(new CompletableSubscriber() {
unsafeSubscribe(new CompletableSubscriber() {
@Override
public void onCompleted() {
sw.onCompleted();
s.onCompleted();
}

@Override
public void onError(Throwable e) {
sw.onError(e);
s.onError(e);
}

@Override
public void onSubscribe(Subscription d) {
sw.add(d);
s.add(d);
}
});

RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(s);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the Completable hook be used here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method deals with Subscriber.

} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
ex = HOOK.onSubscribeError(ex);
ERROR_HANDLER.handleError(ex);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafeSubscribe shouldn't call the ERROR_HANDLER.

throw toNpe(ex);
}
}

/**
* Subscribes a regular Subscriber to this Completable instance which
* will receive only an onError or onComplete event
* and handles exceptions thrown by its onXXX methods.
* @param s the reactive-streams Subscriber, not null
* @throws NullPointerException if s is null
*/
public final <T> void subscribe(final Subscriber<T> s) {
requireNonNull(s);
try {
unsafeSubscribe(new SafeCompletableSubscriber(new CompletableSubscriber() {
@Override
public void onCompleted() {
s.onCompleted();
}

@Override
public void onError(Throwable e) {
s.onError(e);
}

@Override
public void onSubscribe(Subscription d) {
s.add(d);
}
}));
RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(s);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
ex = HOOK.onSubscribeError(ex);
ERROR_HANDLER.handleError(ex);
throw toNpe(ex);
}
Expand All @@ -2038,7 +2097,7 @@ public void call(final CompletableSubscriber s) {
@Override
public void call() {
try {
subscribe(s);
unsafeSubscribe(s);
} finally {
w.unsubscribe();
}
Expand Down Expand Up @@ -2141,7 +2200,7 @@ public final <T> Observable<T> toObservable() {
return Observable.create(new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> s) {
subscribe(s);
unsafeSubscribe(s);
}
});
}
Expand All @@ -2158,7 +2217,7 @@ public final <T> Single<T> toSingle(final Func0<? extends T> completionValueFunc
return Single.create(new rx.Single.OnSubscribe<T>() {
@Override
public void call(final SingleSubscriber<? super T> s) {
subscribe(new CompletableSubscriber() {
unsafeSubscribe(new CompletableSubscriber() {

@Override
public void onCompleted() {
Expand Down Expand Up @@ -2222,7 +2281,7 @@ public final Completable unsubscribeOn(final Scheduler scheduler) {
return create(new CompletableOnSubscribe() {
@Override
public void call(final CompletableSubscriber s) {
subscribe(new CompletableSubscriber() {
unsafeSubscribe(new CompletableSubscriber() {

@Override
public void onCompleted() {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1927,7 +1927,7 @@ public void onSubscribe(Subscription d) {
serial.add(main);
child.add(serial);

other.subscribe(so);
other.unsafeSubscribe(so);

return main;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void next() {
return;
}

c.subscribe(inner);
c.unsafeSubscribe(inner);
}

final class ConcatInnerSubscriber implements CompletableSubscriber {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void next() {
return;
}

a[idx].subscribe(this);
a[idx].unsafeSubscribe(this);
} while (decrementAndGet() != 0);
}
}
Expand Down
Loading