Merge and MergeMaxConcurrent unified and rewritten #2928
Conversation
I haven't benchmarked this because my current, old laptop where I wrote this PR is prone to overheating.
Force-pushed ba9141f to 18fd06a
I'm stoked you've taken this on. Merge has been a thorn in our side for a while. I've only had a cursory look so far; looks great, but can you break up the …
Somebody go buy this man a laptop (how about you Netflix)!
Force-pushed 18fd06a to 8a7e1fc
I've updated this PR, added a safeguard against onNext arriving before request(), and made sure the queues are released if the stream is unsubscribed before completing normally.
Here is a benchmark comparison on an Intel Celeron 1005M @ 1.9GHz, Windows 7 x64, Java 8u45 (this is not my overheating laptop). It seems the single-shot overhead is greater in general, but note that the baseline merge has bugs, especially with scalar sources: such merges may terminate much earlier and not deliver all values.
@davidmoten Can't really do that, because I need to return two values: the remaining request and how many main elements to request.
OK, no worries.
Force-pushed 8a7e1fc to ae154cd
Force-pushed ae154cd to 4725498
Fixed a potential concurrency problem in …
@akarnokd I've got this on my list to spend time on ... but it will take several hours to review and some non-trivial time to profile and test, including against our applications. Thank you very much for pursuing this.

@davidmoten You mention it's been a thorn in your side. What issues have you been having, and are there test cases for them that we can use to assert this fixes them?
Hi Ben, I haven't got any problems with merge at the moment, but I have considered it a thorn in our side because it's such an important operator and yet the code isn't pretty (and it contains code that is disabled right now, like batching, because we couldn't get it to work). It has no doubt suffered through RxJava's evolution, and a clean, fresh start from @akarnokd would be a great contribution.

One thing for @akarnokd to consider in his rewrite: there are decisions made in the merge operator about how much to request from each source (the initial and subsequent requests) that I found hard to understand, and that I suspect would be nice to base on research/perf tests or even make customizable. To this end I'd like to see this sort of logic decoupled into, say, a MergeRequestStrategy class; a sketch follows below. I guess my aims would be …
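What such a decoupled strategy might look like, as a minimal sketch (hypothetical: no MergeRequestStrategy class exists in RxJava, and the names and shape are illustrative only):

```java
// Hypothetical strategy interface for merge's request decisions; this does
// not exist in RxJava 1.x and only illustrates the decoupling suggested above.
interface MergeRequestStrategy {
    // amount to request from an inner source when it is first subscribed
    long initialRequest();

    // amount to request after 'consumed' items have been delivered downstream
    long nextRequest(long consumed);
}

// One possible strategy: fixed-size batches, e.g. 128 up front and another
// 128 after each full batch is consumed (the numbers discussed below).
final class FixedBatchStrategy implements MergeRequestStrategy {
    private final long batchSize;

    FixedBatchStrategy(long batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public long initialRequest() {
        return batchSize;
    }

    @Override
    public long nextRequest(long consumed) {
        return batchSize;
    }
}
```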
The choice of 128 for server (and 16 for Android) was made after testing various different settings: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/util/RxRingBuffer.java#L162

What do you recommend doing differently? What do you want to see done?
128 is the initial request, but the requests following were much smaller.
I think I can simplify what I want, having thought about it a little bit. I'm happy with 128 as an initial request and would like further requests to also be for the same number.
Ah, that makes sense. I tried that at one point but couldn't get it to work as it got ridiculously complicated, and I never went back to solve it. I'd definitely like that as well.
While rewatching old Rx videos, I remembered that one can implement flatMap in terms of merge and vice versa. I think if merge were implemented via flatMap, one could save the allocation cost of the lifted map. Then merge == flatMap(o -> o). Any interest in pursuing this?
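The equivalence being referred to, as a quick RxJava 1.x sketch (pre-lambda style, since 1.x targets Java 6):

```java
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

public class MergeViaFlatMap {
    public static void main(String[] args) {
        Observable<Observable<Integer>> sources = Observable.just(
                Observable.just(1, 2),
                Observable.just(3, 4));

        // merge(sources) expressed as flatMap with the identity function,
        // i.e. merge == flatMap(o -> o)
        Observable<Integer> merged = sources.flatMap(
                new Func1<Observable<Integer>, Observable<Integer>>() {
                    @Override
                    public Observable<Integer> call(Observable<Integer> o) {
                        return o;
                    }
                });

        merged.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer v) {
                System.out.println(v); // 1, 2, 3, 4 (synchronous sources merge in order)
            }
        });
    }
}
```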
That should be easy to check. |
Force-pushed 4725498 to 9af0485
```java
 * Subscriber that requests Long.MAX_VALUE from upstream and doesn't queue.
 * @param <T>
 */
static final class FastInnerSubscriber<T> extends Subscriber<T> {
```
I think this optimization can't be used because merge itself queues. See the bug report above.
So you say let's drop the unbounded mode completely?
I wouldn't consider this a bug but rather an inconvenience. For shorter streams, the optimization is quite valuable. For longer streams, we could have a batcher operator which changes the request amount to some limited number, just like one would use observeOn(), but without actually introducing asynchrony.
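A sketch of such a batcher (rebatch is a made-up name for illustration, not an existing operator):

```java
import rx.Observable;
import rx.Subscriber;

public final class Rebatch {
    // Requests 'batchSize' from upstream up front and replenishes a full
    // batch each time one has been consumed, regardless of how much the
    // downstream requested. Because a request is made in onStart() before
    // any producer arrives, downstream requests are not propagated upstream.
    public static <T> Observable.Operator<T, T> rebatch(final int batchSize) {
        return new Observable.Operator<T, T>() {
            @Override
            public Subscriber<? super T> call(final Subscriber<? super T> child) {
                return new Subscriber<T>(child) {
                    int consumed;

                    @Override
                    public void onStart() {
                        request(batchSize); // bounded prefetch
                    }

                    @Override
                    public void onNext(T t) {
                        child.onNext(t);
                        if (++consumed == batchSize) {
                            consumed = 0;
                            request(batchSize); // ask for the next batch
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        child.onError(e);
                    }

                    @Override
                    public void onCompleted() {
                        child.onCompleted();
                    }
                };
            }
        };
    }
}
```

Usage would be source.lift(Rebatch.rebatch(128)); note that it deliberately ignores the downstream request amount, which is exactly the trade-off being debated here.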
This is not just an inconvenience. It's a bug. It breaks the entire purpose of backpressure by having an unbounded queue. The whole point of request(n) backpressure semantics is that things "just work" and there are no unbounded buffers, memory leaks, or dropped data unless a developer actively decides to do any of those things.
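For reference, the request(n) discipline being defended here, as a tiny RxJava 1.x example where the subscriber bounds its own prefetch instead of requesting Long.MAX_VALUE:

```java
import rx.Observable;
import rx.Subscriber;

public class BoundedRequestDemo {
    public static void main(String[] args) {
        Observable.range(1, 10).subscribe(new Subscriber<Integer>() {
            @Override
            public void onStart() {
                request(2); // bounded initial request instead of unbounded
            }

            @Override
            public void onNext(Integer v) {
                System.out.println(v);
                request(1); // replenish only as items are actually consumed
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onCompleted() {
                System.out.println("done");
            }
        });
    }
}
```

No operator in such a chain ever needs an unbounded buffer, because items are only produced as they are requested.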
Fine, I'll remove the optimization.
We don't want to remove the optimization; it just needs to be done differently, like the current version does. See comments here: #2928 (comment)
I can't bound that array because I'd need to track who originated each value to ask the proper sender for replenishment, which is essentially what the regular path does.
That's the point of my comments. We have to maintain the backpressure semantics of the "regular path". We can't break backpressure in the name of optimization.
Force-pushed 0913412 to b883f64
Pulling comments on optimization into the main comments rather than the diffs, which are getting hidden as code changes. Modified comment from #2928 (comment):

The downstream requesting unbounded data does not necessarily mean unbounded requests upstream. The only cases I'm aware of where it doesn't need to buffer are the following, where no concurrent emission occurs: …

Since (3) is the common case to optimize for (i.e. many short synchronous streams), we would have to determine this at runtime. Basically we assume that if we never have …

This is why the current implementation has an optimization to skip the queues: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorMerge.java#L612 and https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorMerge.java#L686

But it uses the queues as soon as concurrent emissions happen. So we can't have an unbounded buffer as an optimization, but we can try to skip use of the buffer when we don't have concurrent emissions. As soon as we have concurrent emissions, though, we must use the buffer. And we must maintain …
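The skip-the-queue-until-contended idea, in rough illustrative form (the names are made up; the real OperatorMerge state machine additionally tracks requests and terminal events):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import rx.Observer;

// Illustrative sketch only: a single emitter "lock" guards direct emission;
// concurrent producers fall back to the queue and the current emitter drains.
final class FastPathEmitter<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<T>();
    private final Observer<? super T> child;
    private boolean emitting; // guarded by 'this'

    FastPathEmitter(Observer<? super T> child) {
        this.child = child;
    }

    void emit(T value) {
        synchronized (this) {
            if (emitting) {
                queue.offer(value); // concurrent emission: use the queue
                return;
            }
            emitting = true; // we won the emitter role
        }
        child.onNext(value); // fast path: the queue is never touched
        drainLoop();         // deliver anything enqueued meanwhile
    }

    private void drainLoop() {
        for (;;) {
            T v;
            synchronized (this) {
                v = queue.poll();
                if (v == null) {
                    emitting = false; // release the emitter role
                    return;
                }
            }
            child.onNext(v);
        }
    }
}
```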
Okay. I'll remove the unbounded paths and add a fast path to the InnerSubscribers. However, I found a few cases that need further decisions:

In …

In the same test class, in …

The …
Thank you.
That sounds wrong. A given …

That's a hard one. I can see your point, but …
Then let's fix it in 1.x and rebase this PR on top so we are comparing the right metrics.
I'm busy with the changes to this PR. Could you do this?
Sure.
:-)
This test was reported broken in ReactiveX#2928 (comment). Fixing by adding the use of LatchedObserver.

Previously broken test results:

```
r.o.OperatorFlatMapPerf.flatMapIntPassthruAsync        1  thrpt  5  363615.622  115041.519  ops/s
r.o.OperatorFlatMapPerf.flatMapIntPassthruAsync     1000  thrpt  5     350.204     125.773  ops/s
r.o.OperatorFlatMapPerf.flatMapIntPassthruAsync  1000000  thrpt  5       0.319       0.184  ops/s
```

Fixed results:

```
r.o.OperatorFlatMapPerf.flatMapIntPassthruAsync        1  thrpt  5  102109.681    8709.920  ops/s
r.o.OperatorFlatMapPerf.flatMapIntPassthruAsync     1000  thrpt  5     403.071     130.651  ops/s
r.o.OperatorFlatMapPerf.flatMapIntPassthruAsync  1000000  thrpt  5       0.355       0.070  ops/s
```
Force-pushed b883f64 to 151c877
I did recover most of the performance, but some cases still lag behind. I managed this by adding a batching request to the InnerSubscriber (same logic as in #3030). I think the remaining performance loss is due to requesting from the main source one by one, but I'm not sure I want to batch those as well because of the maxConcurrent semantics. The duplicated tryEmit is there to avoid a bimorphic dispatch: if I implement a common base class, I lose ~5% of the throughput. I'm finished for today, but I'll try to get back to the 1.x performance in the remaining classes.
It seems merge is quite sensitive to method structuring: just by refactoring the scalar emission path I got some notable performance back, but still, the single-value merges are a bit slower than 1.x. I think, however, that the green improvements are quite worth it. These are the two 1000-sized tests with their error ranges:
They never got closer to the 1.x line. At one point I changed merge() to assume MAX_VALUE requests all the time, but it didn't affect the numbers, and I have no idea where the performance leaks. Both tests merge 1000 scalar values which don't touch any queue, don't account for the production, and just request another value before returning.
Nothing stands out when I do a Flight Recorder profiling ... reviewing code changes. |
@akarnokd Do the new unit tests in this PR cover the things that were broken in the current merge impl? If not, what other use cases are broken today that this fixes? Note that I'm still digging into this revised code and exploring various perf tests.
Yes, there was a potential hang when merging only scalar values and when merging a mixture of scalar and non-scalar values.
Were you ever able to reproduce that with a unit test? Do you have that code? |
They are the 'flatMapRangeMixedAsyncLoop' and 'testFastMergeFullScalar' tests. |
```java
child.onNext(((ScalarSynchronousObservable<? extends R>)o).t);
child.onCompleted();
} else {
    o.subscribe(new Subscriber<R>(child) {
```
Doesn't this have to be unsafeSubscribe?
Right. I've updated & squashed the PR.
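For context, the practical difference in RxJava 1.x: subscribe() wraps the target in a SafeSubscriber (contract enforcement plus an extra allocation), while unsafeSubscribe() hands the Subscriber to the source directly, which is why operators use it for inner subscriptions. A small illustration:

```java
import rx.Observable;
import rx.Subscriber;

public class UnsafeSubscribeDemo {
    public static void main(String[] args) {
        Subscriber<Integer> inner = new Subscriber<Integer>() {
            @Override
            public void onNext(Integer t) {
                System.out.println(t);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onCompleted() {
                System.out.println("done");
            }
        };
        // No SafeSubscriber wrapping and no automatic unsubscription on
        // termination, unlike subscribe(inner).
        Observable.just(1, 2, 3).unsafeSubscribe(inner);
    }
}
```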
Force-pushed 91bdc5f to 0420193
I did another pass on this and think it's good for merging and building a snapshot for a few days of final testing before release. I still question the subscribe/unsafeSubscribe thing above, but we can discuss that further.
Merge and MergeMaxConcurrent unified and rewritten
That was a simple oversight; it was already fixed before you merged this.
I've rewritten merge and mergeMaxConcurrent together from scratch.

- Added maxConcurrent overloads to the array-merge and the mergeDelayError variants.
- Changed OperatorMapNotification because it disregarded backpressure on its terminal emissions.
- Changed OperatorMergeTest.shouldNotCompleteIfThereArePendingScalarSynchronousEmissionsWhenTheLastInnerSubscriberCompletes because it is legal to get onCompleted without any request.
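A usage example of the unified behaviour (the two-argument merge in RxJava 1.x limits how many inner sources are subscribed at once):

```java
import rx.Observable;
import rx.functions.Action1;

public class MergeMaxConcurrentDemo {
    public static void main(String[] args) {
        Observable<Observable<Integer>> sources = Observable.just(
                Observable.just(1, 2),
                Observable.just(3, 4),
                Observable.just(5, 6));

        // At most 2 inner Observables are subscribed concurrently; the third
        // is subscribed only after one of the first two completes.
        Observable.merge(sources, 2).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer v) {
                System.out.println(v);
            }
        });
    }
}
```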