-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix the issue that GroupBy may not call 'unsubscribe' #1967
Fix the issue that GroupBy may not call 'unsubscribe' #1967
Conversation
Copy @akarnokd 's comment from #1959: The onError seems to be simplistic. I'd expect an onError comes down, the whole contraption should be shut down and both main and groups be notified of the error. Such termination happens in onCompleted. In addition, since we use BufferUntilSubscriber, its client may throw in its onNext while replaying (no idea where it will go) or in the direct-mode phase (where it will bubble back to the emitItem) and again not tearing down anything. Now if the main was observed through unsafeSubscribe, we can't know if the downstream will eventually unsubscribe upwards or not. In addition, if a group's onNext throws, does it need to tear down everything at all or just that specific group just like the group's unsubscribe()? |
Since |
9666e56
to
272a752
Compare
if (wip <= 0) { | ||
return null; | ||
} | ||
if (WIP_FOR_UNSUBSCRIBE_UPDATER.compareAndSet(this, wip, wip + 1)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this for making the group emission thread-safe with a concurrent unsubscribe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can take a look at the original discussion in #1959
…itted`; Add more comments.
unsubscribe(); | ||
} | ||
// if we have no outstanding groups (all completed or unsubscribe) and terminated on outer | ||
if (groups.isEmpty() && terminated == TERMINATED_WITH_COMPLETED) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't this be an if/else
? If we just unsubscribed
in the previous lines I don't think we should ever go through this flow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right.
Updated. |
I think this is now good. @akarnokd can you also review this again please? |
It is okay. |
Thanks for the review. |
Fix the issue that GroupBy may not call 'unsubscribe'
No description provided.