-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Window operators now support backpressure in the inner observable. #3150
Window operators now support backpressure in the inner observable. #3150
Conversation
d4bc4ef
to
f0aec07
Compare
Do we need to maintain both BufferUntilSubscriber V1 and V2 at the same time? |
|
} | ||
|
||
synchronized (this) { | ||
if (!missed) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you explain why this is necessary? This could only happen if you are the emitting thread and no other thread has tried to emit while you are emitting and the only changes in this if block seem unrelated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If no other thread tried to do something, we quit the emission loop. This means other actions can now enter the emission loop. This is part of the standard emission-loop approach with an optimization for unbounded consumers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I see that. This condition is tracked above on line 230 using the emitting
field. I am unclear on why we have a missed
field. It seems like missed
could be a local variable or deleted entirely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method may be entered from multiple threads and only one can enter the emission loop. The others need to signal their attempt which can't happen if missed
is a local variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What you are saying makes perfect sense and I can see that from the code. I am asking why does this if statement guard lines 280-284? The fact that another thread did not try to emit while your thread was emitting doesn't seem relevant for these lines. It seems like this block should always run.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That if block ends in return which shouldn't run if there was missed activity in which case the the loop has to try and emit more values if possible, again. That missed may come from a new value becoming available or from more requests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. You are saying "if there have not been additional requests while emitting then we should return". At first this seemed like a race condition however I see that the onNext would just hop threads to the other thread in that case. Do you think it would give us better performance and possible read easier by taking out the synchronized block and the missed
field and replacing it with a check after each for loop iteration if (get() > 0)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. This form of caughtUp optimiziation requires an atomic block which wouldn't work with the queue-drain approach. Since this
represents the requested amount, having a non-zero requested doesn't mean there is values available in the queue, therefore it would just loop indefinitely. The performance is relative to the use pattern. Mostly single-threaded use benefits from synchronized because the JIT can eliminate it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay. Thanks for the explanation.
It seems like Also since groupBy's back pressure is being patched maybe it'd be a good idea to revisit this pull request with those changes to try to combine the two |
These are internal classes and can live in their own files. The groupBy in #3428 no longer uses it and once merged, this I don't think we should be so eager to combine things; such combinations end up in monstrous classes which are then sources of misunderstanding and complexity. |
|
I'm proposing to hide away internal functionality from the public API. |
Neither of these are public API since they reside in the internal package. Hiding them further away seems to be unnecessary. In addition, Historically, Due to this very specific hijacking, adding backpressure to |
All I'm asking for is a rename from v2 in 1.x |
Maybe I missed it but what's the name you'd like? |
f0aec07
to
f351ca6
Compare
I've renamed the class to |
I'm going to sleep now (it's midnight here) so if you only have concerns about naming, location and visibility, I suggest merging this PR then posting a separate PR with your changes (and merge it if you can get somebody else to like it in the meantime). |
@akarnokd would you mind rebasing this? |
f351ca6
to
e30a333
Compare
Done. |
👍 |
…ureV2 Window operators now support backpressure in the inner observable.
Repost of #3050.