Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: fix observeOn resource handling, add delayError capability #3682

Merged
merged 1 commit into from
Feb 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5999,7 +5999,9 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {

/**
* Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with an unbounded buffer.
* asynchronously with a bounded buffer.
* <p>Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly
* asynchronous. If strict event ordering is required, consider using the {@link #observeOn(Scheduler, boolean)} overload.
* <p>
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
* <dl>
Expand All @@ -6014,12 +6016,43 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #subscribeOn
* @see #observeOn(Scheduler, boolean)
*/
public final Observable<T> observeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler));
return lift(new OperatorObserveOn<T>(scheduler, false));
}

/**
* Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with a bounded buffer and optionally delays onError notifications.
* <p>
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param scheduler
* the {@link Scheduler} to notify {@link Observer}s on
* @param delayError
* indicates if the onError notification may not cut ahead of onNext notification on the other side of the
* scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received
* from upstream
* @return the source Observable modified so that its {@link Observer}s are notified on the specified
* {@link Scheduler}
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #subscribeOn
* @see #observeOn(Scheduler)
*/
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError));
}

/**
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1381,7 +1381,9 @@ public final Single<T> observeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousSingle) {
return ((ScalarSynchronousSingle<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler));
// Note that since Single emits onSuccess xor onError,
// there is no cut-ahead possible like with regular Observable sequences.
return lift(new OperatorObserveOn<T>(scheduler, false));
}

/**
Expand Down
Loading