-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
1.x: add Completable.safeSubscribe option + RxJavaPlugins hook support #3942
Changes from 4 commits
fcb201a
9469e9a
98c0315
159f868
aa7d82f
ebb38eb
f3958da
1c1f37d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ | |
import rx.functions.*; | ||
import rx.internal.operators.*; | ||
import rx.internal.util.*; | ||
import rx.observers.SafeCompletableSubscriber; | ||
import rx.plugins.*; | ||
import rx.schedulers.Schedulers; | ||
import rx.subscriptions.*; | ||
|
@@ -37,6 +38,12 @@ | |
*/ | ||
@Experimental | ||
public class Completable { | ||
/** The error handler instance. */ | ||
static final RxJavaErrorHandler ERROR_HANDLER = RxJavaPlugins.getInstance().getErrorHandler(); | ||
|
||
/** The completable hook. */ | ||
static RxJavaCompletableExecutionHook HOOK = RxJavaPlugins.getInstance().getCompletableExecutionHook(); | ||
|
||
/** | ||
* Callback used for building deferred computations that takes a CompletableSubscriber. | ||
*/ | ||
|
@@ -100,9 +107,6 @@ public void call(CompletableSubscriber s) { | |
} | ||
}); | ||
|
||
/** The error handler instance. */ | ||
static final RxJavaErrorHandler ERROR_HANDLER = RxJavaPlugins.getInstance().getErrorHandler(); | ||
|
||
/** | ||
* Returns a Completable which terminates as soon as one of the source Completables | ||
* terminates (normally or with an error) and cancels all other Completables. | ||
|
@@ -172,7 +176,7 @@ public void onSubscribe(Subscription d) { | |
} | ||
|
||
// no need to have separate subscribers because inner is stateless | ||
c.subscribe(inner); | ||
c.unsafeSubscribe(inner); | ||
} | ||
} | ||
}); | ||
|
@@ -300,7 +304,7 @@ public void onSubscribe(Subscription d) { | |
} | ||
|
||
// no need to have separate subscribers because inner is stateless | ||
c.subscribe(inner); | ||
c.unsafeSubscribe(inner); | ||
} | ||
} | ||
}); | ||
|
@@ -415,7 +419,7 @@ public void call(CompletableSubscriber s) { | |
return; | ||
} | ||
|
||
c.subscribe(s); | ||
c.unsafeSubscribe(s); | ||
} | ||
}); | ||
} | ||
|
@@ -898,7 +902,7 @@ public void call(final CompletableSubscriber s) { | |
|
||
final AtomicBoolean once = new AtomicBoolean(); | ||
|
||
cs.subscribe(new CompletableSubscriber() { | ||
cs.unsafeSubscribe(new CompletableSubscriber() { | ||
Subscription d; | ||
void dispose() { | ||
d.unsubscribe(); | ||
|
@@ -974,7 +978,7 @@ public void call() { | |
* not null (not verified) | ||
*/ | ||
protected Completable(CompletableOnSubscribe onSubscribe) { | ||
this.onSubscribe = onSubscribe; | ||
this.onSubscribe = HOOK.onCreate(onSubscribe); | ||
} | ||
|
||
/** | ||
|
@@ -998,7 +1002,7 @@ public final void await() { | |
final CountDownLatch cdl = new CountDownLatch(1); | ||
final Throwable[] err = new Throwable[1]; | ||
|
||
subscribe(new CompletableSubscriber() { | ||
unsafeSubscribe(new CompletableSubscriber() { | ||
|
||
@Override | ||
public void onCompleted() { | ||
|
@@ -1049,7 +1053,7 @@ public final boolean await(long timeout, TimeUnit unit) { | |
final CountDownLatch cdl = new CountDownLatch(1); | ||
final Throwable[] err = new Throwable[1]; | ||
|
||
subscribe(new CompletableSubscriber() { | ||
unsafeSubscribe(new CompletableSubscriber() { | ||
|
||
@Override | ||
public void onCompleted() { | ||
|
@@ -1189,7 +1193,7 @@ public void call(final CompletableSubscriber s) { | |
final Scheduler.Worker w = scheduler.createWorker(); | ||
set.add(w); | ||
|
||
subscribe(new CompletableSubscriber() { | ||
unsafeSubscribe(new CompletableSubscriber() { | ||
|
||
|
||
@Override | ||
|
@@ -1301,7 +1305,7 @@ protected final Completable doOnLifecycle( | |
return create(new CompletableOnSubscribe() { | ||
@Override | ||
public void call(final CompletableSubscriber s) { | ||
subscribe(new CompletableSubscriber() { | ||
unsafeSubscribe(new CompletableSubscriber() { | ||
|
||
@Override | ||
public void onCompleted() { | ||
|
@@ -1433,7 +1437,7 @@ public final Throwable get() { | |
final CountDownLatch cdl = new CountDownLatch(1); | ||
final Throwable[] err = new Throwable[1]; | ||
|
||
subscribe(new CompletableSubscriber() { | ||
unsafeSubscribe(new CompletableSubscriber() { | ||
|
||
@Override | ||
public void onCompleted() { | ||
|
@@ -1477,7 +1481,7 @@ public final Throwable get(long timeout, TimeUnit unit) { | |
final CountDownLatch cdl = new CountDownLatch(1); | ||
final Throwable[] err = new Throwable[1]; | ||
|
||
subscribe(new CompletableSubscriber() { | ||
unsafeSubscribe(new CompletableSubscriber() { | ||
|
||
@Override | ||
public void onCompleted() { | ||
|
@@ -1525,11 +1529,10 @@ public final Completable lift(final CompletableOperator onLift) { | |
@Override | ||
public void call(CompletableSubscriber s) { | ||
try { | ||
// TODO plugin wrapping | ||
|
||
CompletableSubscriber sw = onLift.call(s); | ||
CompletableOperator onLiftDecorated = HOOK.onLift(onLift); | ||
CompletableSubscriber sw = onLiftDecorated.call(s); | ||
|
||
subscribe(sw); | ||
unsafeSubscribe(sw); | ||
} catch (NullPointerException ex) { | ||
throw ex; | ||
} catch (Throwable ex) { | ||
|
@@ -1570,7 +1573,7 @@ public void call(final CompletableSubscriber s) { | |
|
||
s.onSubscribe(ad); | ||
|
||
subscribe(new CompletableSubscriber() { | ||
unsafeSubscribe(new CompletableSubscriber() { | ||
|
||
@Override | ||
public void onCompleted() { | ||
|
@@ -1632,7 +1635,7 @@ public final Completable onErrorComplete(final Func1<? super Throwable, Boolean> | |
return create(new CompletableOnSubscribe() { | ||
@Override | ||
public void call(final CompletableSubscriber s) { | ||
subscribe(new CompletableSubscriber() { | ||
unsafeSubscribe(new CompletableSubscriber() { | ||
|
||
@Override | ||
public void onCompleted() { | ||
|
@@ -1681,7 +1684,7 @@ public final Completable onErrorResumeNext(final Func1<? super Throwable, ? exte | |
@Override | ||
public void call(final CompletableSubscriber s) { | ||
final SerialSubscription sd = new SerialSubscription(); | ||
subscribe(new CompletableSubscriber() { | ||
unsafeSubscribe(new CompletableSubscriber() { | ||
|
||
@Override | ||
public void onCompleted() { | ||
|
@@ -1707,7 +1710,7 @@ public void onError(Throwable e) { | |
return; | ||
} | ||
|
||
c.subscribe(new CompletableSubscriber() { | ||
c.unsafeSubscribe(new CompletableSubscriber() { | ||
|
||
@Override | ||
public void onCompleted() { | ||
|
@@ -1843,7 +1846,7 @@ public final <T> Observable<T> startWith(Observable<T> other) { | |
*/ | ||
public final Subscription subscribe() { | ||
final MultipleAssignmentSubscription mad = new MultipleAssignmentSubscription(); | ||
subscribe(new CompletableSubscriber() { | ||
unsafeSubscribe(new CompletableSubscriber() { | ||
@Override | ||
public void onCompleted() { | ||
mad.unsubscribe(); | ||
|
@@ -1876,7 +1879,7 @@ public final Subscription subscribe(final Action0 onComplete) { | |
requireNonNull(onComplete); | ||
|
||
final MultipleAssignmentSubscription mad = new MultipleAssignmentSubscription(); | ||
subscribe(new CompletableSubscriber() { | ||
unsafeSubscribe(new CompletableSubscriber() { | ||
@Override | ||
public void onCompleted() { | ||
try { | ||
|
@@ -1918,7 +1921,7 @@ public final Subscription subscribe(final Action1<? super Throwable> onError, fi | |
requireNonNull(onComplete); | ||
|
||
final MultipleAssignmentSubscription mad = new MultipleAssignmentSubscription(); | ||
subscribe(new CompletableSubscriber() { | ||
unsafeSubscribe(new CompletableSubscriber() { | ||
@Override | ||
public void onCompleted() { | ||
try { | ||
|
@@ -1962,56 +1965,112 @@ private static void deliverUncaughtException(Throwable e) { | |
* @param s the CompletableSubscriber, not null | ||
* @throws NullPointerException if s is null | ||
*/ | ||
public final void subscribe(CompletableSubscriber s) { | ||
public final void unsafeSubscribe(CompletableSubscriber s) { | ||
requireNonNull(s); | ||
try { | ||
// TODO plugin wrapping the subscriber | ||
CompletableOnSubscribe onSubscribeDecorated = HOOK.onSubscribeStart(this, this.onSubscribe); | ||
|
||
onSubscribe.call(s); | ||
onSubscribeDecorated.call(s); | ||
} catch (NullPointerException ex) { | ||
throw ex; | ||
} catch (Throwable ex) { | ||
Exceptions.throwIfFatal(ex); | ||
ex = HOOK.onSubscribeError(ex); | ||
ERROR_HANDLER.handleError(ex); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not? That exception has to be reported otherwise it could get lost. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see from the javadocs of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It won't get lost as it's supposed to be called in nested subscriptions, like when implementing operators. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In observable, if the subscription fails, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But this should get compared to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Observable is kind of in an inconsistent state in this regard, really could use a cleanup. The only difference should be that the incoming subscriber is or isn't wrapped. Crash management should be the same. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, ok. I also see that it's supposed to not call the hook, but it does. However, couldn't be the case that multiple calls to the error handler are made for the same error (e.g., due to operators)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, if the exception is fatal, it skips the hook. Once the hook has been called, the exception is wrapped into a NullPointerException and thrown up, but then there is a catch (NullPointerException) to prevent it from calling the hook again. |
||
throw toNpe(ex); | ||
} | ||
} | ||
|
||
/** | ||
* Subscribes the given CompletableSubscriber to this Completable instance | ||
* and handles exceptions thrown by its onXXX methods. | ||
* @param s the CompletableSubscriber, not null | ||
* @throws NullPointerException if s is null | ||
*/ | ||
public final void subscribe(CompletableSubscriber s) { | ||
requireNonNull(s); | ||
try { | ||
CompletableOnSubscribe onSubscribeDecorated = HOOK.onSubscribeStart(this, this.onSubscribe); | ||
|
||
onSubscribeDecorated.call(new SafeCompletableSubscriber(s)); | ||
} catch (NullPointerException ex) { | ||
throw ex; | ||
} catch (Throwable ex) { | ||
Exceptions.throwIfFatal(ex); | ||
ex = HOOK.onSubscribeError(ex); | ||
ERROR_HANDLER.handleError(ex); | ||
throw toNpe(ex); | ||
} | ||
} | ||
|
||
/** | ||
* Subscribes a reactive-streams Subscriber to this Completable instance which | ||
* Subscribes a regular Subscriber to this Completable instance which | ||
* will receive only an onError or onComplete event. | ||
* @param s the reactive-streams Subscriber, not null | ||
* @throws NullPointerException if s is null | ||
*/ | ||
public final <T> void subscribe(Subscriber<T> s) { | ||
public final <T> void unsafeSubscribe(final Subscriber<T> s) { | ||
requireNonNull(s); | ||
try { | ||
final Subscriber<?> sw = s; // FIXME hooking in 1.x is kind of strange to me | ||
|
||
if (sw == null) { | ||
throw new NullPointerException("The RxJavaPlugins.onSubscribe returned a null Subscriber"); | ||
} | ||
|
||
subscribe(new CompletableSubscriber() { | ||
unsafeSubscribe(new CompletableSubscriber() { | ||
@Override | ||
public void onCompleted() { | ||
sw.onCompleted(); | ||
s.onCompleted(); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable e) { | ||
sw.onError(e); | ||
s.onError(e); | ||
} | ||
|
||
@Override | ||
public void onSubscribe(Subscription d) { | ||
sw.add(d); | ||
s.add(d); | ||
} | ||
}); | ||
|
||
RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(s); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The method deals with |
||
} catch (NullPointerException ex) { | ||
throw ex; | ||
} catch (Throwable ex) { | ||
Exceptions.throwIfFatal(ex); | ||
ex = HOOK.onSubscribeError(ex); | ||
ERROR_HANDLER.handleError(ex); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
throw toNpe(ex); | ||
} | ||
} | ||
|
||
/** | ||
* Subscribes a regular Subscriber to this Completable instance which | ||
* will receive only an onError or onComplete event | ||
* and handles exceptions thrown by its onXXX methods. | ||
* @param s the reactive-streams Subscriber, not null | ||
* @throws NullPointerException if s is null | ||
*/ | ||
public final <T> void subscribe(final Subscriber<T> s) { | ||
requireNonNull(s); | ||
try { | ||
unsafeSubscribe(new SafeCompletableSubscriber(new CompletableSubscriber() { | ||
@Override | ||
public void onCompleted() { | ||
s.onCompleted(); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable e) { | ||
s.onError(e); | ||
} | ||
|
||
@Override | ||
public void onSubscribe(Subscription d) { | ||
s.add(d); | ||
} | ||
})); | ||
RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(s); | ||
} catch (NullPointerException ex) { | ||
throw ex; | ||
} catch (Throwable ex) { | ||
Exceptions.throwIfFatal(ex); | ||
ex = HOOK.onSubscribeError(ex); | ||
ERROR_HANDLER.handleError(ex); | ||
throw toNpe(ex); | ||
} | ||
|
@@ -2038,7 +2097,7 @@ public void call(final CompletableSubscriber s) { | |
@Override | ||
public void call() { | ||
try { | ||
subscribe(s); | ||
unsafeSubscribe(s); | ||
} finally { | ||
w.unsubscribe(); | ||
} | ||
|
@@ -2141,7 +2200,7 @@ public final <T> Observable<T> toObservable() { | |
return Observable.create(new OnSubscribe<T>() { | ||
@Override | ||
public void call(Subscriber<? super T> s) { | ||
subscribe(s); | ||
unsafeSubscribe(s); | ||
} | ||
}); | ||
} | ||
|
@@ -2158,7 +2217,7 @@ public final <T> Single<T> toSingle(final Func0<? extends T> completionValueFunc | |
return Single.create(new rx.Single.OnSubscribe<T>() { | ||
@Override | ||
public void call(final SingleSubscriber<? super T> s) { | ||
subscribe(new CompletableSubscriber() { | ||
unsafeSubscribe(new CompletableSubscriber() { | ||
|
||
@Override | ||
public void onCompleted() { | ||
|
@@ -2222,7 +2281,7 @@ public final Completable unsubscribeOn(final Scheduler scheduler) { | |
return create(new CompletableOnSubscribe() { | ||
@Override | ||
public void call(final CompletableSubscriber s) { | ||
subscribe(new CompletableSubscriber() { | ||
unsafeSubscribe(new CompletableSubscriber() { | ||
|
||
@Override | ||
public void onCompleted() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SafeCompletableSubscriber
is not used here nor the otherssubscribe
methods, except forsubscribe(Subscriber)
. I thinksubscribe
methods should call between themselves (as inObservable#subscribe
methods) to refactor code and avoid forgetting this kind of things.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unsafeSubscribe
should call between themselves too.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should call
subscribe
now instead, doesn't it? And the otherssubscribe
methods.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the rest are intermediate operators or already perform custom logic in addition to reporting the error to a plugin. Plus, this particulare use case, the subscriber is not expected to crash.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But the idea of the safe subscribe is to ensure the caller that the contract will be met. Why in this particular case it won't?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But
SafeSubscriber
in the first line of the javadocs says:There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. However, this method doesn't run any user supplied code and thus no need to wrap it into a safe completable subscriber.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. However, the other 2
subscribe
methods that don't useSafeCompletableSubscriber
should do it. To assert that theAction
instances are called at most once, and not both, complying with the contract.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the safeguard to those methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Why not reusing
SafeCompletableSubscriber
?