Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Window operators now support backpressure in the inner observable. #3150

Merged

Conversation

akarnokd
Copy link
Member

Repost of #3050.

@abersnaze
Copy link
Contributor

Do we need to maintain both BufferUntilSubscriber V1 and V2 at the same time?

@akarnokd
Copy link
Member Author

akarnokd commented Oct 8, 2015

BufferUntilSubscriber does some black backpressure magic inside GroupBy which I wasn't able to figure out and the V2 doesn't pass unit tests with it.

@abersnaze abersnaze mentioned this pull request Oct 8, 2015
6 tasks
}

synchronized (this) {
if (!missed) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain why this is necessary? This could only happen if you are the emitting thread and no other thread has tried to emit while you are emitting and the only changes in this if block seem unrelated.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no other thread tried to do something, we quit the emission loop. This means other actions can now enter the emission loop. This is part of the standard emission-loop approach with an optimization for unbounded consumers.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I see that. This condition is tracked above on line 230 using the emitting field. I am unclear on why we have a missed field. It seems like missed could be a local variable or deleted entirely.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method may be entered from multiple threads and only one can enter the emission loop. The others need to signal their attempt which can't happen if missed is a local variable.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you are saying makes perfect sense and I can see that from the code. I am asking why does this if statement guard lines 280-284? The fact that another thread did not try to emit while your thread was emitting doesn't seem relevant for these lines. It seems like this block should always run.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That if block ends in return which shouldn't run if there was missed activity in which case the the loop has to try and emit more values if possible, again. That missed may come from a new value becoming available or from more requests.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. You are saying "if there have not been additional requests while emitting then we should return". At first this seemed like a race condition however I see that the onNext would just hop threads to the other thread in that case. Do you think it would give us better performance and possible read easier by taking out the synchronized block and the missed field and replacing it with a check after each for loop iteration if (get() > 0)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. This form of caughtUp optimiziation requires an atomic block which wouldn't work with the queue-drain approach. Since this represents the requested amount, having a non-zero requested doesn't mean there is values available in the queue, therefore it would just loop indefinitely. The performance is relative to the use pattern. Mostly single-threaded use benefits from synchronized because the JIT can eliminate it.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. Thanks for the explanation.

@stealthcode
Copy link

It seems like BufferUntilSubscriber is being used as a special case buffer for OperatorGroupBy while BufferUntilSubscriberV2 is custom for the windowing operators. If they cannot be reconciled to work for both cases then could you please move them into the operators as static nested classes?

Also since groupBy's back pressure is being patched maybe it'd be a good idea to revisit this pull request with those changes to try to combine the two BufferUntilSubscribers.

@akarnokd
Copy link
Member Author

These are internal classes and can live in their own files. The groupBy in #3428 no longer uses it and once merged, this BufferUntilSubscriberV2 can be renamed; or better yet, promoted to a standard API UnicastSubject.

I don't think we should be so eager to combine things; such combinations end up in monstrous classes which are then sources of misunderstanding and complexity.

@stealthcode
Copy link

BufferUntilSubscriber was a proposed as a solution to the time gap problem in group by so because BufferUntilSubscriberV2 doesn't work to solve this problem for group by leads me to question it's naming. Is it built to handle the same problem or a different problem? Are you proposing to rename v2 to UnicastSubject?

@stealthcode
Copy link

I don't think we should be so eager to combine things; such combinations end up in monstrous classes which are then sources of misunderstanding and complexity.

I'm proposing to hide away internal functionality from the public API.

@akarnokd
Copy link
Member Author

Neither of these are public API since they reside in the internal package. Hiding them further away seems to be unnecessary. In addition, BufferUntilSubscriberV2 is effectively turned into the official UnicastSubject in 2.x.

Historically, BufferUntilSubscriber started out to solve the time-gap problem for the non-backpressured groupBy and window. When the backpressure was introduced, groupBy had to support backpressure but BufferUntilSubscriber wasn't enhanced. Instead, BufferUntilSubscriber is used as a middle man and the subscription process is hijacked to inject a Producer that attempts to coordinate requests. Apparently, this didn't cover all request pattern hence the original bug.

Due to this very specific hijacking, adding backpressure to BufferUntilSubscriber directly didn't work out as it conflicted with the request coordination in a way I couldn't resolve. Therefore, I decided to have a separate class that does backpressure but doesn't have to be involved in request coordination for window (which I believe can't be established with a reasonable strategy).

@stealthcode
Copy link

All I'm asking for is a rename from v2 in 1.x

@akarnokd
Copy link
Member Author

Maybe I missed it but what's the name you'd like?

@akarnokd akarnokd force-pushed the BufferUntilSubscriberBackpressureV2 branch from f0aec07 to f351ca6 Compare October 13, 2015 22:04
@akarnokd
Copy link
Member Author

I've renamed the class to UnicastSubject and combined the State with the OnSubscribeBUS class.

@akarnokd
Copy link
Member Author

I'm going to sleep now (it's midnight here) so if you only have concerns about naming, location and visibility, I suggest merging this PR then posting a separate PR with your changes (and merge it if you can get somebody else to like it in the meantime).

@stealthcode
Copy link

@akarnokd would you mind rebasing this?

@akarnokd akarnokd force-pushed the BufferUntilSubscriberBackpressureV2 branch from f351ca6 to e30a333 Compare November 23, 2015 22:15
@akarnokd
Copy link
Member Author

Done.

@stealthcode
Copy link

👍

akarnokd added a commit that referenced this pull request Nov 23, 2015
…ureV2

Window operators now support backpressure in the inner observable.
@akarnokd akarnokd merged commit 9fddbd9 into ReactiveX:1.x Nov 23, 2015
@akarnokd akarnokd deleted the BufferUntilSubscriberBackpressureV2 branch November 23, 2015 23:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants