
Merge many fixes #1895

Closed

akarnokd wants to merge 5 commits into 1.x from akarnokd:MergeManyFixes

Conversation

akarnokd (Member)

Contains the PR #1893 and a possible fix for the premature RxRingBuffer unsubscription and NPE.

@akarnokd (Member Author)

It's incomplete. If the downstream unsubscribes, it has to unsubscribe each active InnerSubscriber's queue, but simply having actual.add(i.q) is not good enough because terminated InnerSubscribers need to remove their q to avoid memory leaks. That in turn would need to happen from inside RxRingBuffer itself...
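
For illustration only, here is a minimal sketch of that bookkeeping using the public CompositeSubscription/Subscriptions API; the releaseQueue action and the class name are hypothetical stand-ins for the InnerSubscriber/RxRingBuffer internals, not the actual code:

    import rx.Subscription;
    import rx.functions.Action0;
    import rx.subscriptions.CompositeSubscription;
    import rx.subscriptions.Subscriptions;

    // Sketch only: each inner subscriber registers its queue cleanup with the parent
    // composite so a downstream unsubscribe releases it, and removes the registration
    // again when it terminates so the composite does not keep growing (the leak
    // mentioned above).
    class InnerQueueBookkeeping {

        static Subscription register(CompositeSubscription parent, final Runnable releaseQueue) {
            Subscription cleanup = Subscriptions.create(new Action0() {
                @Override
                public void call() {
                    releaseQueue.run(); // hypothetical: return the ring buffer to the pool
                }
            });
            parent.add(cleanup);
            return cleanup;
        }

        static void innerTerminated(CompositeSubscription parent, Subscription cleanup) {
            parent.remove(cleanup); // remove() also unsubscribes, so the queue is released exactly once
        }
    }

The hard part the comment points at is that this remove-on-termination would have to be coordinated with RxRingBuffer's own release path, which the sketch does not attempt.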

}

public boolean isEmpty() {
    if (queue == null) {
    Queue<Object> q = queue;
benjchristensen (Member)

This is dangerous as it is now holding a reference to something that could be returned to the pool and used by someone else.

akarnokd (Member Author)

Reading a non-volatile member variable several times when it can be set to null concurrently is even more dangerous.

benjchristensen (Member)

The expectation of this code is that release is not called concurrently with any other function.

If we need to support concurrent release, then we need synchronization, and that kills perf. These references to queue won't solve the concurrency issue, but this code should never be invoked concurrently with release.

akarnokd (Member Author)

The issue in #1845 was triggered by such a concurrent invocation. Even if we don't let the upstream unsubscribe, the downstream may unsubscribe at any time, and one has to clean up the InnerSubscribers eventually. In this PR, the release is only triggered by a terminal event from an InnerSubscriber, which doesn't run concurrently, but a downstream unsubscription from a take(n) would not trigger any release. I have ideas about how to re-enable it, but preventing the use of queue and release at the same time requires serialized access.
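
For context, the naive way to get that serialized access would be to put every operation and the release behind one lock; this is exactly the synchronization cost the thread wants to avoid, and the names below are illustrative rather than RxRingBuffer's actual API:

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Illustrative sketch of "serialized access": every queue operation and the
    // release go through the same lock, so release() can never interleave with
    // offer()/poll(). Straightforward but slow, which is why the discussion
    // tries to avoid it.
    final class SerializedBuffer<T> {
        private Queue<T> queue = new ArrayDeque<T>();

        public synchronized boolean offer(T value) {
            return queue != null && queue.offer(value);
        }

        public synchronized T poll() {
            return queue != null ? queue.poll() : null;
        }

        public synchronized boolean isEmpty() {
            return queue == null || queue.isEmpty();
        }

        public synchronized void release() {
            queue = null; // in the real code, the buffer would be returned to a pool here
        }
    }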

akarnokd (Member Author)

Btw, why are these queues pooled in the first place? Based on the perf comment it seems the poolless implementation has higher throughput.

benjchristensen (Member)

Btw, why are these queues pooled in the first place?

Which perf comments are you referring to?

Inside RxRingBuffer it is the createUseAndDestroy1 tests that drove pooling. This is a very common use case in request/response systems. It was a 10x difference I believe. Netflix production code taking canary traffic died when I tried running this code without pooling due to object allocation and garbage collection overhead.

That does not mean, however, that it is the right solution, as the pooling causes lots of grief. If we move away from pooling, though, we need a solution that does not kill the system with object allocation overhead. That means we won't be able to use ring buffers, which pre-allocate up front even if we never use the capacity. Linked lists are also problematic as they allocate a node per item, but the benefit is that they only allocate when we actually need the buffer (under backpressure).

I'll spend more time thinking on this as well, but please continue trying to help on this. I need someone to work together with on it, as it's a rather non-trivial problem to balance the various competing needs. The issue that drove this was primarily massive object allocation overhead that killed throughput. I spent days with Java Mission Control profiling various options to get to the current design... but obviously left behind concurrency bugs.

akarnokd (Member Author)

I'd guess allocating a 1k ring buffer and not utilizing it does hurt, but the original and this PR still can't prevent use after release. A hybrid ArrayList/LinkedList could help, but I haven't seen concurrent and/or lock-free variants.
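
To make the hybrid idea concrete, here is a single-threaded sketch of a linked list of fixed-size array chunks: it allocates nothing until items arrive and amortizes node allocation over a whole chunk, but it is not the concurrent/lock-free variant the comment asks about, and all names are made up for the example:

    // Single-threaded sketch of the "hybrid" idea: a linked list of fixed-size
    // array chunks. Nothing is allocated until the first offer, and each node
    // amortizes allocation over CHUNK items instead of one node per item.
    final class LinkedArrayQueue<T> {
        private static final int CHUNK = 128;

        private static final class Node {
            final Object[] items = new Object[CHUNK];
            Node next;
        }

        private Node head, tail;
        private int headIndex, tailIndex;

        public void offer(T value) {
            if (tail == null) {                 // lazy allocation: first element
                head = tail = new Node();
            } else if (tailIndex == CHUNK) {    // current chunk full, link a new one
                Node n = new Node();
                tail.next = n;
                tail = n;
                tailIndex = 0;
            }
            tail.items[tailIndex++] = value;
        }

        @SuppressWarnings("unchecked")
        public T poll() {
            if (head == null || (head == tail && headIndex == tailIndex)) {
                return null;                    // empty
            }
            if (headIndex == CHUNK) {           // current chunk consumed, move on
                head = head.next;
                headIndex = 0;
            }
            T value = (T) head.items[headIndex];
            head.items[headIndex++] = null;     // let the item be collected
            return value;
        }
    }

A concurrent version would additionally need atomic producer/consumer indexes and safe publication of newly linked chunks, which is where the difficulty lies.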

@@ -127,8 +129,10 @@ public MergeSubscriber(Subscriber<? super T> actual, boolean delayErrors) {
    this.actual = actual;
    this.mergeProducer = new MergeProducer<T>(this);
    this.delayErrors = delayErrors;
    this.buffers = new CompositeSubscription();
benjchristensen (Member)

This is rather expensive. It means we are now going to have a HashSet allocated for every merge and flatMap.

akarnokd (Member Author)

Required for correctness; otherwise the ring buffers are not returned to the pool on unsubscription. One alternative to pooling could be viable: a parameter that specifies the ring buffer size in merge(), so it doesn't over-allocate.

benjchristensen (Member)

a parameter that specifies the ring buffer size

That's not an option. The point of how this all works is that a developer has no idea if backpressure will be needed or the size of what they're consuming.

Required for correctness

I understand that. The point I was making is that we should be looking for a correct approach that doesn't impact perf, and object allocation overhead is what killed merge originally.

@benjchristensen (Member)

Performance of 1.0.1 alone and then with these changes:

v1.0.1

./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .OperatorMergePerf.'

Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5325435.841   355975.709    ops/s
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    48829.633     3244.714    ops/s
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       54.394        4.383    ops/s
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    97373.673    11044.130    ops/s
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.809        0.368    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4437715.882   251805.970    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   437548.282    37953.173    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    49829.496     8208.868    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5345381.694   388031.049    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       49.858        3.909    ops/s
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    74373.157    20872.747    ops/s
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2998.045       83.685    ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5193516.375   377830.538    ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    34618.127     4024.212    ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       34.271        5.933    ops/s

Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5448592.507   410246.278    ops/s
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    54425.611     1288.398    ops/s
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       52.477        4.962    ops/s
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    92792.488    19384.124    ops/s
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.957        0.783    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4608856.070   201592.479    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   431920.261    48373.173    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    52309.410     3940.133    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5815289.623   254128.928    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       52.525        0.524    ops/s
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    77640.706      905.177    ops/s
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     3000.748      263.071    ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5397752.619   130514.965    ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    29257.005     3984.630    ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       35.506        0.864    ops/s

[benchmark chart: v1.0.1]

v1.0.1 + akarnokd:MergeManyFixes

Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  4652677.182   124504.915    ops/s
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    53208.545     1880.771    ops/s
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       54.342        1.369    ops/s
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    96872.888     3721.793    ops/s
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.776        0.303    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4164810.391   165025.373    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   464592.905    10825.917    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    51914.168     3224.118    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  4864870.326   180200.813    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       50.731        2.126    ops/s
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    74211.611     1191.372    ops/s
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     3037.404      196.364    ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  4712305.017   181033.640    ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    35506.729      565.008    ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       25.865        1.935    ops/s


Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  4720853.728   189280.311    ops/s
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    51373.021     7032.876    ops/s
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       54.310        5.412    ops/s
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    97443.762     1708.086    ops/s
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.435        0.388    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4127156.675    65289.944    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   464984.192    21772.305    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    51828.432     3159.051    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  4867636.838   178784.669    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       51.096        1.170    ops/s
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    74124.786     4567.624    ops/s
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     3084.738       61.171    ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  4692041.733   111905.033    ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    35294.723     1908.378    ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       35.231        1.361    ops/s

[benchmark chart: v1.0.1 + changes]

@benjchristensen (Member)

r.o.OperatorMergePerf.merge1SyncStreamOfN drops from 5,325,435/second to 4,720,853/second.

r.o.OperatorMergePerf.mergeNSyncStreamsOfN drops from 5,815,289/second to 4,867,636/second.

r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 drops from 5,397,752/second to 4,692,041/second.

I'd like to explore alternatives to this that don't allocate the CompositeSubscription unless there is contention requiring buffering.

@benjchristensen (Member)

I also am curious why we see so many LinkedNodes in the allocation analysis. I thought we had eliminated all of those.

@akarnokd (Member Author)

If I read the perf tables correctly, the fix seems to work as well as before when size != 1 in the test. The size == 1 tests seem to suffer from increased overhead, probably because of the volatile read on queue, which prevents register optimizations.

@benjchristensen (Member)

Yes, it is only when size == 1, but that is a very common case. Many of the performance optimizations in merge are for exactly that case as that is primarily what broke Netflix apps when I didn't have it optimized.

I can't prove it yet, but past experience with this code leads me to believe it's the extra CompositeSubscription allocation that kills the perf. There is a reason I worked so hard to completely eliminate use of that type in merge. That's why Subscriber uses SubscriptionList instead of CompositeSubscription.
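
To make the cost difference concrete, here is a toy contrast (deliberately not the real SubscriptionList/CompositeSubscription implementations): an add-only container can be a plain list, while supporting removal in any order pushes toward a hash-based set, which is the extra allocation being discussed:

    import java.util.HashSet;
    import java.util.LinkedList;
    import java.util.Set;

    // Toy illustration only. The add-only variant just appends to a list; the
    // removable variant needs a Set so that remove() stays cheap when sources
    // terminate in arbitrary order, at the cost of allocating the hash structure.
    final class AddOnlyResources {
        private final LinkedList<Runnable> cleanups = new LinkedList<Runnable>();

        void add(Runnable cleanup) { cleanups.add(cleanup); }

        void unsubscribeAll() {
            for (Runnable r : cleanups) { r.run(); }
            cleanups.clear();
        }
    }

    final class RemovableResources {
        private final Set<Runnable> cleanups = new HashSet<Runnable>();

        void add(Runnable cleanup) { cleanups.add(cleanup); }

        void remove(Runnable cleanup) { cleanups.remove(cleanup); } // sources may finish in any order

        void unsubscribeAll() {
            for (Runnable r : cleanups) { r.run(); }
            cleanups.clear();
        }
    }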

@benjchristensen (Member)

I'd like to remove the changes for handling RxRingBuffer unsubscribe but move forward on the rest of the fixes. We need more time to consider concurrent handling of an unsubscribe and/or whether we can eliminate object pooling without killing perf.

Do you have time right now to separate those changes, or should I go ahead and manually merge this and make those changes?

@akarnokd (Member Author)

Subscriber is add-only, but merge sources may terminate in any order. Maybe a specialized indexed composite could help a bit.

I believe a composite subscription is necessary to ensure the correct release in case of external termination (but it is not enough on its own). I've thought about a test where a lot of merge+take(n) operations run independently of and concurrently with each other, which may step on each other's toes if an early-released buffer is picked up by someone else.
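
A rough sketch of the kind of stress test being described, using only the public RxJava 1.x API; the pool size, iteration count, and take(5) are arbitrary placeholders, and a real test would also assert that no chain observes another chain's items or errors:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import rx.Observable;
    import rx.schedulers.Schedulers;

    // Many independent merge + take(n) chains running concurrently, so buffers are
    // released early and potentially reused while other chains are still active.
    public class ConcurrentMergeTakeStress {
        public static void main(String[] args) throws Exception {
            ExecutorService exec = Executors.newFixedThreadPool(8);
            List<Future<List<Integer>>> results = new ArrayList<Future<List<Integer>>>();

            for (int i = 0; i < 1000; i++) {
                results.add(exec.submit(new Callable<List<Integer>>() {
                    @Override
                    public List<Integer> call() {
                        Observable<Integer> a = Observable.range(0, 1000).subscribeOn(Schedulers.computation());
                        Observable<Integer> b = Observable.range(1000, 1000).subscribeOn(Schedulers.computation());
                        // take(5) unsubscribes early, which is what exercises the premature-release path
                        return Observable.merge(a, b).take(5).toList().toBlocking().single();
                    }
                }));
            }

            for (Future<List<Integer>> f : results) {
                if (f.get().size() != 5) {
                    throw new AssertionError("Unexpected number of items: " + f.get());
                }
            }
            exec.shutdown();
            System.out.println("done");
        }
    }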

@benjchristensen (Member)

Subscriber is add-only, but merge sources may terminate in any order.

Yes. That's what makes CompositeSubscription so expensive and why it's not the default behavior for Subscriber.

correct release

With the current use of object pooling, you're probably right.

I've thought about a test where a lot of merge+take(n) operation is in progress independently and concurrently to each other

That kind of test would be valuable.

@akarnokd (Member Author)

This PR contains the fixes of #1893 (already merged); everything else is the NPE & RingBuffer fixes, which are still under debate.

The occasional NPE can be fixed by reading the queue into a local variable, which I can do now.
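
A minimal sketch of that local-variable pattern, with hypothetical class and field names rather than the actual RxRingBuffer code:

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Hypothetical names, not the actual RxRingBuffer code. If `queue` can be set
    // to null by release() on another thread, re-reading the field between the
    // null check and the use can throw NullPointerException; reading it once into
    // a local removes that window. (Visibility of the write is a separate concern:
    // the field in the discussion above is not volatile.)
    final class QueueHolder {
        Queue<Object> queue = new ConcurrentLinkedQueue<Object>();

        // racy: two reads of the field, the second may observe null
        boolean isEmptyRacy() {
            if (queue == null) {
                return true;
            }
            return queue.isEmpty(); // NPE if release() runs in between
        }

        // fixed: one read, then only the local is used
        boolean isEmpty() {
            Queue<Object> q = queue;
            return q == null || q.isEmpty();
        }

        void release() {
            queue = null; // the real code would also return the buffer to the pool here
        }
    }

As discussed above, this only closes the NPE window from re-reading the field; it does not prevent a released queue from being handed back to the pool and reused while a reader still holds the local reference.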

@benjchristensen (Member)

The "onError from parent" and "emitted > 0" checks seem valid and good to merge. Those are not included in the other fix and seem separate from the RingBuffer discussion aren't they?

@akarnokd (Member Author)

The first 3 commits on this PR are identical to the 3 in #1893. The rest is the fix in the RingBuffer and the added composite.

@benjchristensen (Member)

I'm looking at the diff, not the individual commits. The diff shows changes such as these:

                        if (emitted > 0) {
                            request(emitted);
                        }

and

                   if (!parent) {
                       wip--;
                   }

@benjchristensen (Member)

I turned off use of the RingBuffer with pooling so it uses SynchronizedQueue with a LinkedList under it:

Benchmark                                          (size)   Mode   Samples   LinkedList   RingBuffer
r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5378007.404  5325435.841
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    50969.202    48829.633
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       52.712       54.394
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    97399.587    97373.673
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.197        4.809
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4447883.122  4437715.882
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   442266.165   437548.282
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    50053.290    49829.496
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5537145.541  5345381.694
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       48.565       49.858
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    77854.621    74373.157
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2446.907     2998.045
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5135540.942  5193516.375
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    34182.359    34618.127
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       35.466       34.271

The perf is actually okay.

Here is the memory allocation.

[screenshot: memory allocation profile, 2014-11-29]

I wonder if all of the other perf optimizations I ended up doing to merge to handle scalar cases and non-contended hot paths basically make it so that the object pooling with RingBuffer is unnecessary.

I'm going to spend more time playing with this. If we can eliminate object pooling it simplifies a lot for merge. The observeOn use case is probably still going to be better with a ring buffer because it always goes via a queue, whereas merge tries to avoid the queue at all costs.

@akarnokd (Member Author)

Those diff lines are already in the 1.x branch because of #1893. The changes page compares against the version that was the head at that time, not the current one.

@benjchristensen (Member)

Oh really, that's not very helpful of GitHub! :-)

I look at the branch name at the top and it says 1.x so I figure it's showing me the diff against the actual branch, not some snapshot in time.

@benjchristensen (Member)

@akarnokd I have opened #1908 for us to discuss and evaluate other options so we can eliminate the object pooling and associated problems it brings.

@akarnokd (Member Author)

Excellent!

@akarnokd (Member Author) commented Dec 1, 2014

Closing because the queueing is being re-evaluated.

@akarnokd akarnokd closed this Dec 1, 2014
@akarnokd akarnokd deleted the MergeManyFixes branch February 2, 2015 11:07