Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ObserveOn scheduled unsubscription #1264

Merged

Conversation

akarnokd
Copy link
Member

Proposed fix for #1241.

Some operators eagerly unsubscribe the chain in case of an error, however, this may prevent the delivery of the onError if it is run through the observeOn operator as the worker gets unsubscribed and may or may not reach the point where it emits the onError in the other thread. This change will schedule the unsubscription along with the event delivery and thus ensure that all events sent to observeOn prior to unsubscription will be delivered in the new thread.

Note that this change makes the unsubscription delayed and if event processing is slow in the new thread, it may take a while to happen; the original version just dropped any unprocessed work and thus was faster to terminate.

@cloudbees-pull-request-builder

RxJava-pull-requests #1159 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member

I'm not sure this conforms with the guidelines for unsubscribe or not. /cc @headinthebox

Note guideline 4.4, in particular the last bold line:

4.4. Assume a best effort to stop all outstanding work on Unsubscribe
When unsubscribe is called on an observable subscription, the observable sequence will make a best effort attempt to stop all outstanding work. This means that any queued work that has not been started will not start.

@akarnokd
Copy link
Member Author

Best effort in this case means that the observation of the error is matter of chance. This affects MergeDelayError as well, so you'd only see a broken chain but not the reason for the failure nor a completion event.

@headinthebox
Copy link
Contributor

This is a though one. My reading is that the rul applies to user code subscriptions.

If in user code you write val s = source.subscribe(...) and then call s.unsubscribe() you are not going to wait for outstanding work.

However, when inside source there is an because of an error, I think it is reasonable to expect that the onError bubbles up to the outside subscriber and does not get dropped because of internal unsubscriptions.

As @akarnokd mentions, otherwise onError becomes a game a chance.

@benjchristensen
Copy link
Member

So do you recommend merging this change @headinthebox ? This is a real gray area and not fully covered by specifications.

Should we make it drop all onNext (skip/filter them) but iterate through the entire queue to allow for an onError if it is queued for delivery?

@headinthebox
Copy link
Contributor

It is a gray area indeed .... I'd say just let the queue drain but don't add new notifications (otherwise xs.observeOn(s).subscribe(...) will behave very different from xs.subscribe(...) when errors happen).

@benjchristensen
Copy link
Member

Then I'm merging this.

benjchristensen added a commit that referenced this pull request May 28, 2014
@benjchristensen benjchristensen merged commit dd52daf into ReactiveX:master May 28, 2014
@akarnokd akarnokd deleted the ObserveOnScheduleUnsubscribe branch May 28, 2014 20:39
@benjchristensen benjchristensen mentioned this pull request Jun 1, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants