
Merge and MergeMaxConcurrent unified and rewritten #2928

Merged 1 commit into ReactiveX:1.x from the OperatorMergeFullRewrite branch on Jul 14, 2015

Conversation

@akarnokd (Member)

I've rewritten merge and mergeMaxConcurrent together from scratch.

  • Added experimental maxConcurrent overloads to the array-merge and mergeDelayError variants.
  • Fixed OperatorMapNotification because it disregarded backpressure on its terminal emissions (a sketch of the pattern follows after this list).
  • While debugging some merge test failures, I encountered a bug in OperatorPublish with the same underlying logical error (see comment) and fixed it here.
  • Accidentally introduced a few tabs, so I went in and replaced all tabs with spaces.
  • Changed OperatorMergeTest.shouldNotCompleteIfThereArePendingScalarSynchronousEmissionsWhenTheLastInnerSubscriberCompletes because it is legal to get onCompleted without any request.
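
For context on the OperatorMapNotification fix: a terminal event mapped into a value must still honor request(n). Below is a minimal sketch of the delayed-producer pattern that achieves this; the class and method names are hypothetical, not the PR's actual code.

```java
import java.util.concurrent.atomic.AtomicInteger;

import rx.Producer;
import rx.Subscriber;

// Holds the value produced for a terminal event and emits it only once
// the child has requested at least one element. The AtomicInteger state
// encodes the four request/value combinations.
final class TerminalValueProducer<T> extends AtomicInteger implements Producer {
    private static final int NO_REQUEST_NO_VALUE = 0;
    private static final int HAS_REQUEST_NO_VALUE = 1;
    private static final int NO_REQUEST_HAS_VALUE = 2;
    private static final int HAS_REQUEST_HAS_VALUE = 3;

    private final Subscriber<? super T> child;
    private T value;

    TerminalValueProducer(Subscriber<? super T> child) {
        this.child = child;
    }

    @Override
    public void request(long n) {
        if (n < 0L) {
            throw new IllegalArgumentException("n >= 0 required");
        }
        if (n == 0L) {
            return;
        }
        for (;;) {
            int state = get();
            if (state == NO_REQUEST_NO_VALUE) {
                if (compareAndSet(NO_REQUEST_NO_VALUE, HAS_REQUEST_NO_VALUE)) {
                    return;
                }
            } else if (state == NO_REQUEST_HAS_VALUE) {
                if (compareAndSet(NO_REQUEST_HAS_VALUE, HAS_REQUEST_HAS_VALUE)) {
                    emit();
                    return;
                }
            } else {
                return; // request already latched or value already emitted
            }
        }
    }

    // Called from onCompleted with the mapped terminal value.
    void offerTerminalValue(T v) {
        value = v;
        for (;;) {
            int state = get();
            if (state == NO_REQUEST_NO_VALUE) {
                if (compareAndSet(NO_REQUEST_NO_VALUE, NO_REQUEST_HAS_VALUE)) {
                    return; // park the value until the child requests
                }
            } else if (state == HAS_REQUEST_NO_VALUE) {
                if (compareAndSet(HAS_REQUEST_NO_VALUE, HAS_REQUEST_HAS_VALUE)) {
                    emit();
                    return;
                }
            } else {
                return;
            }
        }
    }

    private void emit() {
        if (!child.isUnsubscribed()) {
            child.onNext(value);
            child.onCompleted();
        }
    }
}
```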

@akarnokd (Member, Author)

I haven't benchmarked this because the old laptop I wrote this PR on is prone to overheating.

@davidmoten (Collaborator)

I'm stoked you've taken this on. Merge has been a thorn in our side for a while. I've only had a cursory look so far; it looks great, but can you break up the emitLoop method? It's super long (>200 lines).

@davidmoten (Collaborator)

Somebody go buy this man a laptop (how about you, Netflix)!


@akarnokd force-pushed the OperatorMergeFullRewrite branch from 18fd06a to 8a7e1fc on May 1, 2015
@akarnokd (Member, Author) commented May 1, 2015

I've updated this PR: added a safeguard against onNext arriving before request(), and made sure the queues are released if the stream is unsubscribed before completing normally.

@akarnokd (Member, Author) commented May 1, 2015

Here is a benchmark comparison (throughput, ops/s) on an Intel Celeron 1005M @ 1.9GHz, Windows 7 x64, Java 8u45 (this is not my overheating laptop).

(benchmark chart "mergeperf"; the numbers are tabulated below)

It seems the single-shot overhead is greater in general, but note that the baseline merge has bugs, especially with scalar sources: such merges may terminate much earlier and not deliver all values.

```
Benchmark                    (size)      Master     Master err           PR     PR error
flatMapIntPassthruAsync             1  498695,426   259249,193   412488,433   257250,017
flatMapIntPassthruAsync        1000       628,490      325,556      826,485     1030,174
flatMapIntPassthruAsync     1000000         0,372        0,144        0,845        0,266
flatMapIntPassthruSync            1   2413250,773   340583,441  1380370,980   268957,879
flatMapIntPassthruSync         1000     16731,496     5159,943    16374,699     6581,767
flatMapIntPassthruSync      1000000        17,016        4,609       15,445        4,289
flatMapTwoNestedSync              1   1809099,547   314275,657  1074829,348   214620,187
flatMapTwoNestedSync           1000      8908,864     1665,210    12105,668     4147,721
flatMapTwoNestedSync        1000000         8,553        4,492       13,661        3,763
merge1SyncStreamOfN               1   2073223,271   624706,196  1340920,111   321874,624
merge1SyncStreamOfN            1000     15583,667     5675,705    24295,187      741,011
merge1SyncStreamOfN         1000000        16,086        3,177       30,617       14,407
mergeNAsyncStreamsOfN             1     53251,280     1821,052    77537,033    63762,352
mergeNAsyncStreamsOfN          1000         6,258        2,367        7,633        4,073
mergeNSyncStreamsOf1              1   1814390,194   250905,669  1221128,575   523755,940
mergeNSyncStreamsOf1            100    221860,250    69297,969   149696,874    13363,647
mergeNSyncStreamsOf1           1000     20576,165     2416,898    16998,408     2414,499
mergeNSyncStreamsOfN              1   2076685,785   558323,104  1382133,163   931994,488
mergeNSyncStreamsOfN           1000        16,458        3,175       28,144       15,014
mergeTwoAsyncStreamsOfN           1     45570,075     8864,806    62306,327    23355,026
mergeTwoAsyncStreamsOfN        1000      1086,413      260,172     3059,881      554,938
oneStreamOfNthatMergesIn1         1   1894495,101   373234,431  1430870,599   883644,992
oneStreamOfNthatMergesIn1      1000     18433,963     6341,333    18367,707    12874,644
oneStreamOfNthatMergesIn1   1000000        17,750        2,938       17,197        8,896
```

@akarnokd (Member, Author) commented May 2, 2015

@davidmoten Can't really do that, because I need to return two values: the remaining request and how many main elements to request.

@davidmoten (Collaborator)

OK, no worries


@akarnokd force-pushed the OperatorMergeFullRewrite branch from 8a7e1fc to ae154cd on May 4, 2015
@akarnokd (Member, Author) commented May 4, 2015

i7 4790, Windows 7 x64, Java 8u45:

(benchmark results screenshot)

I've tried my best to close the gap in the size = 1 cases, but couldn't figure out why it is that much slower: even if I remove the emitter loop for the trivial case, it is still 10% slower. Perhaps someone else can spot an unnecessary operation I missed.

@akarnokd force-pushed the OperatorMergeFullRewrite branch from ae154cd to 4725498 on May 5, 2015
@akarnokd (Member, Author) commented May 5, 2015

Fixed a potential concurrency problem in MergeProducer.request().

@benjchristensen (Member)

@akarnokd I've got this on my list to spend time on ... but it will take several hours to review and some non-trivial time to profile and test, including against our applications.

Thank you very much for pursuing this.

@davidmoten You mention it's been a thorn in your side. What issues have you been having and are there test cases for them that we can use to assert this fixes them?

@davidmoten (Collaborator)

Hi Ben, I don't have any problems with merge at the moment, but I've considered it a thorn in our side because it's such an important operator and yet the code isn't pretty (and now contains disabled stuff, like batching, that we couldn't get to work). It has no doubt suffered through RxJava's evolution, and a clean fresh start from @akarnokd would be a great contribution.

In terms of prettiness (and this might be for @akarnokd to consider in his rewrite): the merge operator makes decisions about how much to request from each source (the initial and subsequent requests) that I found hard to understand, and that I suspect should be based on research/perf runs or even made customizable. To this end I'd like to see this sort of logic decoupled into, say, a MergeRequestStrategy class (see the sketch after this list). I guess my aims would be

  • to be able to customize the requesting behaviour of the merge operator under backpressure for my use cases to increase performance
  • to optimise the RxJava default request behaviour for the most common use cases (I guess we'd take a wild stab at what they are)
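
A hypothetical shape for that decoupling; MergeRequestStrategy is not an existing RxJava class, and all names here are illustrative only:

```java
// Strategy consulted by merge to decide how much to request from each
// inner source, instead of hard-coding the amounts in the operator.
interface MergeRequestStrategy {
    // How much to request from an inner Observable on subscription.
    long initialRequest();

    // How much to re-request once `consumed` items from this source
    // have been delivered downstream.
    long nextRequest(long consumed);
}

// Example: the fixed-batch behaviour discussed further down in this
// thread; request 128 up front and replenish a full batch at a time.
final class FixedBatchStrategy implements MergeRequestStrategy {
    private static final long BATCH = 128L;

    @Override
    public long initialRequest() {
        return BATCH;
    }

    @Override
    public long nextRequest(long consumed) {
        // Re-request only when a whole batch has been consumed.
        return consumed >= BATCH ? BATCH : 0L;
    }
}
```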

@benjchristensen (Member)

> optimise the RxJava default request behaviour for the most common use cases

The choice of 128 for the server (and 16 for Android) was made after testing various settings: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/util/RxRingBuffer.java#L162

What do you recommend doing differently?

> to be able to customize the requesting behaviour of the merge operator under backpressure for my use cases to increase performance

What do you want to see done?

@davidmoten (Collaborator)

128 is the initial request, but the requests following were much smaller from memory (1, 3, 2, 1, etc.), and it was hard to figure out what strategy for those secondary requests was at play and whether it had been benchmarked against other strategies.


@davidmoten (Collaborator)

Having thought about it a little, I can simplify what I want: I'm happy with 128 as the initial request and would like further requests to also be for that same number.

@benjchristensen (Member)

> would like further requests to also be for the same number.

Ah, that makes sense. I tried that at one point but couldn't get it to work, as it got ridiculously complicated and I never went back to solve it. I definitely would like that as well.

@akarnokd (Member, Author) commented Jun 6, 2015

While rewatching old Rx videos, I remembered one can implement flatMap in terms of merge and vice versa. I think if merge were implemented via flatMap, one could save the allocation cost of the lifted map. Then merge == flatMap(o -> o). Any interest in pursuing this?
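
For illustration, the equivalence in question (a sketch; RxJava's shipped merge is a dedicated operator, not literally implemented this way):

```java
import rx.Observable;

public final class MergeViaFlatMap {
    public static void main(String[] args) {
        Observable<Observable<Integer>> sources = Observable.just(
                Observable.just(1, 2, 3),
                Observable.just(4, 5, 6));

        // merge == flatMap(o -> o): flattening is flatMap with the
        // identity function, so no mapping step is actually needed.
        Observable<Integer> merged = sources.flatMap(o -> o);

        merged.subscribe(System.out::println); // 1 2 3 4 5 6 (async sources may interleave)
    }
}
```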

@headinthebox (Contributor)

That should be easy to check.

@akarnokd force-pushed the OperatorMergeFullRewrite branch from 4725498 to 9af0485 on June 8, 2015
Review thread on the following diff excerpt:

```java
/**
 * Subscriber that requests Long.MAX_VALUE from upstream and doesn't queue.
 * @param <T>
 */
static final class FastInnerSubscriber<T> extends Subscriber<T> {
```
Review comment (Member):

I think this optimization can't be used because merge itself queues. See the bug report above.

Reply (Member, Author):

So you say let's drop the unbounded mode completely?

Reply (Member, Author):

I wouldn't consider this a bug but rather an inconvenience. For shorter streams, the optimization is quite valuable. For longer streams, we could have a batcher operator which changes the request amount to some limited number, just like one would use observeOn() without actually introducing asynchrony.

Reply (Member):

This is not just an inconvenience. It's a bug. It breaks the entire purpose of backpressure by having an unbounded queue. The whole point of request(n) backpressure semantics is that things "just work" and there are no unbounded buffers, memory leaks, or dropping data unless a developer actively decides to do any of those things.

Reply (Member, Author):

Fine, I'll remove the optimization.

Reply (Member):

We don't want to remove the optimization; it just needs to be done differently, like the current version does. See comments here: #2928 (comment)

Reply (Member, Author):

I can't bound that array because I'd need to track who originated each value to ask the proper sender for replenishment, which is essentially what the regular path does.

Reply (Member):

That's the point of my comments. We have to maintain the backpressure semantics of the "regular path". We can't break backpressure in the name of optimization.

@benjchristensen (Member)

Pulling the comments about the optimization into the main thread, rather than the diffs, which get hidden as the code changes.

Modified comment from #2928 (comment):


The downstream requesting unbounded data does not necessarily mean unbounded requests upstream. Merge does queueing, just like zip and observeOn, so the merge operator itself IS an async operator and thus has buffered, request(n) semantics regardless of whether the downstream requests Long.MAX_VALUE or not.

The only cases I'm aware of where it doesn't need to buffer are the following, where no concurrent emission occurs:

  1. When merge has only a single stream to merge (not a real merge, so not an interesting case)
  2. When maxConcurrent == 1, which is the same as concat, or effectively the same as use case 1 above.
  3. When each Observable being merged is synchronous, which can't be known by merge ahead of time. In this case it again behaves like concat.

Since (3) is the common case to optimize for (i.e. many short synchronous streams), we would have to determine this at runtime. Basically, as long as we never have a miss occur due to concurrent emission, we stay on the "fast path", but as soon as we see concurrency occur we have to use queues.

This is why the current implementation has an optimization to skip the queues: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorMerge.java#L612 and https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorMerge.java#L686

But it uses the queues as soon as concurrent emissions happen.

So we can't have an unbounded buffer as an optimization, but we can try to skip use of the buffer when we don't have concurrent emissions. As soon as we have concurrent emissions though we must use the buffer. And we must maintain request(n) semantics.
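
The fast-path-until-contention idea can be sketched with the usual wip-counter emitter loop (illustrative only; the real operator also does per-source request(n) accounting, which is omitted here):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observer;

// Skips the queue while emissions are serial; switches to queue-drain
// as soon as a concurrent emission is detected.
final class FastPathEmitter<T> {
    private final AtomicInteger wip = new AtomicInteger();
    private final Queue<T> queue = new ConcurrentLinkedQueue<T>();
    private final Observer<? super T> child;

    FastPathEmitter(Observer<? super T> child) {
        this.child = child;
    }

    void emit(T value) {
        if (wip.compareAndSet(0, 1)) {
            // Fast path: we are the only emitter; bypass the queue.
            child.onNext(value);
            if (wip.decrementAndGet() == 0) {
                return;
            }
            // Others queued values while we were emitting; drain below.
        } else {
            // Concurrency detected: buffer, and drain only if the
            // previous owner has already left.
            queue.offer(value);
            if (wip.getAndIncrement() != 0) {
                return;
            }
        }
        // Queue-drain loop: wip counts the pending buffered values exactly,
        // so one decrement per delivered value empties both together.
        do {
            T v = queue.poll();
            if (v != null) {
                child.onNext(v);
            }
        } while (wip.decrementAndGet() != 0);
    }
}
```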

@akarnokd (Member, Author)

Okay. I'll remove the unbounded paths and add a fast-path to the InnerSubscribers.

However, I found a few cases that need further decisions:

OperatorMergeDelayErrorTest.testErrorDelayed1 expects the source to emit "six", which doesn't happen in the normal path of this PR because I shut the source down. The test indicates the original merge allowed value emissions after an error from the same source.

In the same test class, in testMergeSourceWhichDoesntPropagateExceptionBack: if the child subscriber's onNext throws, why do we want to keep emitting the subsequent values when delayErrors == true? If the child crashed while trying to work with the value, why do we think the next value won't crash it or find it in an inconsistent state?

OperatorFlatMapPerf.flatMapIntPassthruAsync has a logical error: it runs an async observation but doesn't wait for its completion, so it basically floods the system with scheduled tasks. If I add a latched observer, I get ~200k ops/s with both 1.x and this PR.
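
The fix being described is to make the benchmark wait on a latch, so a measured run covers completed work rather than merely scheduled work. A minimal sketch of that pattern with JMH and a plain CountDownLatch (names are illustrative; the repo's LatchedObserver helper plays the same role):

```java
import java.util.concurrent.CountDownLatch;

import org.openjdk.jmh.annotations.Benchmark;

import rx.Observable;
import rx.schedulers.Schedulers;

public class LatchedFlatMapBenchmark {
    @Benchmark
    public void flatMapIntPassthruAsync() throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);

        Observable.range(0, 1000)
                .flatMap(i -> Observable.just(i).subscribeOn(Schedulers.computation()))
                .subscribe(
                        v -> { },                   // onNext: values discarded
                        Throwable::printStackTrace, // onError
                        latch::countDown);          // onCompleted releases the latch

        // Without this await, the method returns immediately, flooding
        // the schedulers with still-running work and inflating ops/s.
        latch.await();
    }
}
```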

@benjchristensen (Member)

> I'll remove the unbounded paths and add a fast-path to the InnerSubscribers.

Thank you.

> The test indicates the original merge allowed value emissions after error from the same source.

That sounds wrong. A given Observable can't emit after an onError so that test would be invalid.

> If the child crashed while trying to work with the value why do we think the next value won't crash it or find it in an inconsistent state?

That's a hard one. I can see your point, but mergeDelayError asks to "delay errors", so we delay them. If we change that behavior we will almost certainly break usage. This unit test came from user requirements for mergeDelayError.

> The OperatorFlatMapPerf.flatMapIntPassthruAsync has a logical error

Then let's fix it in 1.x and rebase this PR on top so we are comparing the right metrics.

@akarnokd (Member, Author)

> Then let's fix it in 1.x

I'm busy with the changes to this PR. Could you do this?

@benjchristensen (Member)

> Could you do this?

Sure.

@akarnokd (Member, Author)

Well, my first try is mostly a disaster:

(benchmark results screenshot)

@benjchristensen (Member)

:-)

benjchristensen added a commit to benjchristensen/RxJava that referenced this pull request Jun 18, 2015
This test was reported broken in ReactiveX#2928 (comment)

Fixing by adding the use of LatchedObserver.

Previously broken test results:

```
r.o.OperatorFlatMapPerf.flatMapIntPassthruAsync         1  thrpt         5   363615.622   115041.519    ops/s
r.o.OperatorFlatMapPerf.flatMapIntPassthruAsync      1000  thrpt         5      350.204      125.773    ops/s
r.o.OperatorFlatMapPerf.flatMapIntPassthruAsync   1000000  thrpt         5        0.319        0.184    ops/s
```

Fixed results:

```
r.o.OperatorFlatMapPerf.flatMapIntPassthruAsync         1  thrpt         5   102109.681     8709.920    ops/s
r.o.OperatorFlatMapPerf.flatMapIntPassthruAsync      1000  thrpt         5      403.071      130.651    ops/s
r.o.OperatorFlatMapPerf.flatMapIntPassthruAsync   1000000  thrpt         5        0.355        0.070    ops/s
```
@akarnokd force-pushed the OperatorMergeFullRewrite branch from b883f64 to 151c877 on June 18, 2015
@akarnokd (Member, Author)

I recovered most of the performance, but some cases still lag behind:

(benchmark results screenshot)

I managed this by adding a batching request to the InnerSubscriber (same logic as in #3030). I think the remaining performance loss comes from requesting from the main source one by one, but I'm not sure I want to batch those as well because of the maxConcurrent nature. The duplicated tryEmit is there to avoid a bimorphic dispatch: if I introduce a common base class, I lose ~5% of the throughput. (A sketch of the batching idea follows below.)

I'm done for today, but I'll try to get back to 1.x performance in the remaining cases.
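
For reference, the batching request logic mentioned above (same idea as #3030) looks roughly like this; an illustrative sketch, not the PR's exact code:

```java
import rx.Subscriber;

// Inner subscriber that prefetches a buffer's worth of values and then
// replenishes in batches, instead of calling request(1) per element.
final class BatchingInnerSubscriber<T> extends Subscriber<T> {
    static final int BUFFER_SIZE = 128;       // matches the server-side RxRingBuffer size
    static final int LIMIT = BUFFER_SIZE / 2; // replenish threshold (an assumption)

    int produced;

    @Override
    public void onStart() {
        request(BUFFER_SIZE); // initial prefetch
    }

    @Override
    public void onNext(T t) {
        // ... hand the value to the merge emitter loop here ...

        // Count deliveries and re-request half a buffer at a time; this
        // amortizes the atomic work done by request() across LIMIT items.
        if (++produced == LIMIT) {
            produced = 0;
            request(LIMIT);
        }
    }

    @Override
    public void onError(Throwable e) {
        // ... forward to the merge's (possibly delayed) error handling ...
    }

    @Override
    public void onCompleted() {
        // ... signal the parent that this inner source has finished ...
    }
}
```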

@akarnokd (Member, Author)

It seems the merge is quite sensitive to method structuring: just by refactoring the scalar emission path I got some notable performance back, but the single-value merges are still a bit slower than 1.x. I think, however, that the green improvements are well worth it.

(benchmark results screenshot)

These are the two 1000-sized tests with their error ranges:

```
   flatMapIntPassthruSync       1000  thrpt         5    53754,733     4624,880    ops/s
oneStreamOfNthatMergesIn1      1000  thrpt         5    55004,627      772,675    ops/s
```

They never got closer to the 1.x line. At one point I changed merge() to assume MAX_VALUE requests all the time, but it didn't affect the numbers, and I have no idea where the performance leaks. Both tests merge 1000 scalar values, which don't touch any queue, don't account for the production, and just request another value before returning.

@benjchristensen (Member)

Nothing stands out when I do Flight Recorder profiling ... reviewing code changes.

@benjchristensen (Member)

@akarnokd Do the new unit tests in this PR cover the things that were broken in the current merge impl? If not, what other use cases are broken today that this fixes?

Note that I'm still digging into this revised code and exploring various perf tests.

@akarnokd (Member, Author)

Yes, there was a potential hang when merging only scalar values and when merging a mixture of scalar and non-scalar values.

@benjchristensen (Member)

Were you ever able to reproduce that with a unit test? Do you have that code?

@akarnokd (Member, Author)

They are the 'flatMapRangeMixedAsyncLoop' and 'testFastMergeFullScalar' tests.

Review thread on the following diff excerpt:

```java
child.onNext(((ScalarSynchronousObservable<? extends R>)o).t);
child.onCompleted();
} else {
    o.subscribe(new Subscriber<R>(child) {
```
Review comment (Member):

Doesn't this have to be unsafeSubscribe?

Reply (Member, Author):

Right. I've updated & squashed the PR.

@akarnokd force-pushed the OperatorMergeFullRewrite branch from 91bdc5f to 0420193 on July 14, 2015
@benjchristensen (Member)

I did another pass on this and think it's good for merging and building a snapshot for a few days of final testing before release.

I still question the subscribe/unsafeSubscribe thing above, but can discuss that further.

benjchristensen added a commit that referenced this pull request on Jul 14, 2015:
Merge and MergeMaxConcurrent unified and rewritten

@benjchristensen merged commit 36845db into ReactiveX:1.x on Jul 14, 2015
@akarnokd (Member, Author)

> I still question the subscribe/unsafeSubscribe thing above, but can discuss that further.

That was a simple oversight; it was already fixed before you merged this.
