Merge many fixes #1895
Conversation
It's incomplete. If the downstream unsubscribes, it has to unsubscribe each active InnerSubscriber's queue, but simply having
}

public boolean isEmpty() {
    if (queue == null) {
        Queue<Object> q = queue;
This is dangerous as it is now holding a reference to something that could be returned to the pool and used by someone else.
Reading a non-volatile member variable several times when it can be set to null concurrently is even more dangerous.
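For context, the pattern under discussion looks roughly like this; a minimal sketch with illustrative class and field names, not the actual RxRingBuffer code. It avoids the NPE when another thread nulls the field between the check and the use, but, as noted above, the local may still point at a queue that has already gone back to the pool.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class BufferLike {
    private volatile Queue<Object> queue = new ConcurrentLinkedQueue<Object>();

    public boolean isEmpty() {
        Queue<Object> q = queue;      // read the field exactly once
        if (q == null) {
            return true;              // already released: report empty instead of throwing NPE
        }
        return q.isEmpty();           // note: q may already be back in a pool ("use after release")
    }

    public void release() {
        queue = null;                 // the real code would also return the queue to a pool here
    }
}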
The expectation of this code is that release is not called concurrently with any other function.
If we need to support concurrent release, then we need synchronization, and that would kill perf. These references to queue won't solve the concurrency issue, but this code should never be invoked concurrently with release.
The code in #1845 was triggered by exactly such a concurrent invocation. Even if we don't let the upstream unsubscribe, the downstream may unsubscribe at any time, and one has to clean up the InnerSubscribers eventually. In this PR, the release is only triggered by a terminal event from an InnerSubscriber, which doesn't run concurrently, but a downstream unsubscription from a take(n) would not trigger any release. I have ideas about how to re-enable it, but preventing the use of queue and release at the same time requires serialized access.
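A minimal sketch of what fully serialized access would look like, not the approach taken in this PR; it is shown only to illustrate why the synchronization cost mentioned above is a concern.

import java.util.ArrayDeque;
import java.util.Queue;

class SerializedBuffer {
    private Queue<Object> queue = new ArrayDeque<Object>();

    public synchronized boolean offer(Object o) {
        Queue<Object> q = queue;
        return q != null && q.offer(o);
    }

    public synchronized Object poll() {
        Queue<Object> q = queue;
        return q != null ? q.poll() : null;
    }

    public synchronized void release() {
        queue = null;   // a pooled implementation would also return the queue to the pool here
    }
}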
Btw, why are these queues pooled in the first place? Based on the perf comment it seems the poolless implementation has higher throughput.
Btw, why are these queues pooled in the first place?
Which perf comments are you referring to?
Inside RxRingBuffer it is the createUseAndDestroy1 tests that drove pooling. This is a very common use case in request/response systems. It was a 10x difference I believe. Netflix production code taking canary traffic died when I tried running this code without pooling due to object allocation and garbage collection overhead.
That does not mean however that it is the right solution, as the pooling causes lots of grief. If we move away from pooling, though, we need a solution that does not kill the system with object allocation overhead. That means we won't be able to use ring buffers, which pre-allocate the full capacity up front even if we never use it. Linked lists are also problematic as they allocate a node per item, but their benefit is that they only allocate when we actually need the buffer (under backpressure).
I'll spend more time thinking on this as well, but please continue trying to help on this. I need someone to work together with on it, as it's a rather non-trivial problem to balance the various competing needs. The issue that drove this was primarily massive object allocation overhead that killed throughput. I spent days with Java Mission Control profiling various options to get to the current design ... but obviously left behind concurrency bugs.
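For illustration, the general shape of the pooling idea is roughly this; a simplified sketch, not the actual RxRingBuffer/ObjectPool code. The point is that a request/response call can borrow a pre-allocated buffer and return it, instead of allocating a fresh one per subscription.

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

class SimpleBufferPool {
    private static final int CAPACITY = 1024; // assumed buffer size, for illustration only
    private final Queue<ArrayBlockingQueue<Object>> pool =
            new ConcurrentLinkedQueue<ArrayBlockingQueue<Object>>();

    ArrayBlockingQueue<Object> borrow() {
        ArrayBlockingQueue<Object> q = pool.poll();
        return q != null ? q : new ArrayBlockingQueue<Object>(CAPACITY);
    }

    void release(ArrayBlockingQueue<Object> q) {
        q.clear();       // the previous owner must not touch the queue after this point
        pool.offer(q);
    }
}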
I'd guess allocating a 1k ringbuffer and not utilizing it does hurt, but still, neither the original nor this PR can prevent use after release. A hybrid arraylist-linkedlist could help, but I haven't seen concurrent and/or lock-free variants.
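A rough, single-threaded sketch of what such a hybrid could look like; illustrative only, since a concurrent or lock-free version would be considerably harder. Items go into fixed-size array chunks, and a new chunk is linked on only when the current one fills up, so nothing beyond the first chunk is allocated until buffering is actually needed.

class ChunkedBuffer<T> {
    private static final int CHUNK_SIZE = 128;   // assumed chunk size for illustration

    private static final class Chunk {
        final Object[] items = new Object[CHUNK_SIZE];
        Chunk next;
    }

    private Chunk head, tail;
    private int headIndex, tailIndex;

    void offer(T value) {
        if (tail == null || tailIndex == CHUNK_SIZE) {
            Chunk c = new Chunk();               // allocate only when the current chunk is full
            if (tail == null) { head = c; } else { tail.next = c; }
            tail = c;
            tailIndex = 0;
        }
        tail.items[tailIndex++] = value;
    }

    @SuppressWarnings("unchecked")
    T poll() {
        if (head == null || (head == tail && headIndex == tailIndex)) {
            return null;                         // empty
        }
        if (headIndex == CHUNK_SIZE) {
            head = head.next;                    // move to the next chunk
            headIndex = 0;
        }
        T value = (T) head.items[headIndex];
        head.items[headIndex++] = null;          // let the item be collected
        return value;
    }
}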
@@ -127,8 +129,10 @@ public MergeSubscriber(Subscriber<? super T> actual, boolean delayErrors) {
    this.actual = actual;
    this.mergeProducer = new MergeProducer<T>(this);
    this.delayErrors = delayErrors;
    this.buffers = new CompositeSubscription();
This is rather expensive. It means we are now going to have a HashSet allocated for every merge and flatMap.
Required for correctness, otherwise the ringbuffers are not returned to the pool on unsubscription. One alternative to pooling could be viable, namely a parameter that specifies the ringbuffer size in merge() so it doesn't overallocate.
a parameter that specifies the ring buffer size
That's not an option. The point of how this all works is that a developer has no idea if backpressure will be needed or the size of what they're consuming.
Required for correctness
I understand that. The point I was making is that we should be looking for a correct approach that doesn't impact perf, and object allocation overhead is what killed merge originally.
Performance of 1.0.1 alone and then with these changes:

v1.0.1: ./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .OperatorMergePerf.'

v1.0.1 + akarnokd:MergeManyFixes
r.o.OperatorMergePerf.merge1SyncStreamOfN drops from 5,325,435/second to 4,720,853/second.
r.o.OperatorMergePerf.mergeNSyncStreamsOfN drops from 5,815,289/second to 4,867,636/second.
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 drops from 5,397,752/second to 4,692,041/second.

I'd like to explore alternatives to this that don't allocate the
I also am curious why we see so many LinkedNodes in the allocation analysis. I thought we had eliminated all of those.
If I read the perf tables correctly, the fix seems to work as well as before if size != 1 in the test. The size == 1 tests seem to suffer from increased overhead, probably because of the volatile read on queue, which prevents register optimizations.
Yes, it is only when size == 1, but that is a very common case. Many of the performance optimizations in merge are for exactly that case, as that is primarily what broke Netflix apps when I didn't have it optimized. I can't prove it yet, but past experience with this code leads me to believe it's the extra
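For illustration, the register-optimization point is roughly this pattern; illustrative code, not the actual merge operator. Reading a volatile field once into a local lets the JIT keep it in a register for the rest of the method, whereas re-reading the volatile on every access forces a fresh load each time.

class DrainExample {
    private volatile java.util.Queue<Object> queue =
            new java.util.concurrent.ConcurrentLinkedQueue<Object>();

    int drainOnce() {
        java.util.Queue<Object> q = queue;   // single volatile read, then work on the local
        if (q == null) {
            return 0;
        }
        int emitted = 0;
        Object o;
        while ((o = q.poll()) != null) {
            emitted++;                       // in the real operator this would be onNext(o)
        }
        return emitted;
    }
}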
I'd like to remove the changes for handling ... Do you have time right now to separate those changes, or should I go ahead and manually merge this and make those changes?
Subscriber is add-only, but merge sources may terminate in any order. Maybe a specialized indexed composite may help a bit. I believe a composite subscription is necessary to ensure the correct release in case of external termination (but it is not enough on its own). I've thought about a test where a lot of merge+take(n) operations are in progress independently and concurrently with each other, which may step on each other's toes if an early return is picked up by someone else.
Yes. That's what makes
With the current use of object pooling, you're probably right.
That kind of test would be valuable.
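A rough sketch of the kind of stress test described above; the sizes, thread counts, and class name are assumptions. Many independent merge + take(n) pipelines run concurrently, each over its own value range, so if a released buffer were picked up by another pipeline, a value from the wrong range would show up in the results.

import java.util.List;
import java.util.concurrent.*;
import rx.Observable;
import rx.schedulers.Schedulers;

public class MergeTakeStressTest {
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(8);
        List<Future<Boolean>> results = new java.util.ArrayList<Future<Boolean>>();
        for (int i = 0; i < 1000; i++) {
            final int base = i * 1000000;    // each pipeline gets its own value range
            results.add(exec.submit(new Callable<Boolean>() {
                public Boolean call() {
                    List<Integer> values = Observable.merge(
                            Observable.range(base, 1000).subscribeOn(Schedulers.computation()),
                            Observable.range(base, 1000).subscribeOn(Schedulers.computation()))
                        .take(10)
                        .toList()
                        .toBlocking()
                        .single();
                    for (int v : values) {
                        if (v < base || v >= base + 1000) {
                            return false;    // saw a value leaked from another pipeline
                        }
                    }
                    return true;
                }
            }));
        }
        for (Future<Boolean> f : results) {
            if (!f.get()) {
                System.out.println("Cross-pipeline leak detected");
            }
        }
        exec.shutdown();
    }
}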
This PR contains the fixes of #1893 (already merged), and everything else is the NPE & RingBuffer fixes, which are still under debate. The occasional NPE can be fixed by reading the queue into a local variable, which I can do now.
The "onError from parent" and "emitted > 0" checks seem valid and good to merge. Those are not included in the other fix and seem separate from the RingBuffer discussion, aren't they?
The first 3 commits on this PR are identical to the 3 in #1893. The rest is the fix in the RingBuffer and the added composite.
I'm looking at the diff, not the individual commits. The diff shows changes such as these:

if (emitted > 0) {
    request(emitted);
}

and

if (!parent) {
    wip--;
}
I turned off use of the
The perf is actually okay. Here is the memory allocation. I wonder if all of the other perf optimizations I ended up doing to

I'm going to spend more time playing with this. If we can eliminate object pooling it simplifies a lot for merge. The
Those diff lines are already in the 1.x branch because of #1893. The changes page compares against the version which was the head at that time, not now.
Oh really, that's not very helpful of GitHub! :-) I looked at the branch name at the top and it says 1.x, so I figured it was showing me the diff against the actual branch, not some snapshot in time.
Excellent!
Closing because the queueing approach is being re-evaluated.
Contains the PR #1893 and a possible fix for the premature RxRingBuffer unsubscription and NPE.