Experimental RingBuffer using WriterReaderPhaser #2243
Non-blocking emissions with phase controlled unsubscribe/release.
cc @akarnokd |
This failure again:
|
An update related to this is available at #1908 (comment) |
The implementation is incorrect. Once flipped, the onXXX should be routed to the nop_queue, but essentially if the criticalValueAtEnter is less than zero, the unsubscription has happened and nothing should be enqueued. The release() is not idempotent because it can flip a second time back to the emitting-enabled mode. I believe by joining the ringbuffer and the phaser, you can get away with a single ingress atomic counter. It will provide the phase bit and the writerIndex at the same time. In addition, I figured out why the multi-consumer queue was needed: it was because an async clear() acts as a second reader using the same poll() and thus would corrupt an SPSC. Maybe an SPSC queue's poll can be modified so that filling the queue with nulls and moving the reader's index up to the writer's index is workable. |
Why do they need to be routed anywhere? All future emissions will now return immediately since released == true. Also, a single counter had the same performance problem. That was the WIP implementation. |
The released field is redundant because a negative critical value can indicate a terminal state; one field less. The WIP produced some nice results, except in a few cases where it drastically underperformed. Could you point me to the sources of this variant to see if more nanoseconds can be saved? |
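The point about a negative critical value doubling as the terminal state, together with the idempotency concern raised earlier, can be illustrated with a minimal sketch. `TerminalGate` and its methods are hypothetical names chosen for illustration; this is not the `RxRingBuffer` or `WriterReaderPhaser` source, and it deliberately omits the phaser step that waits for in-flight writers to exit.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: one counter carries both the entry count and the released flag. */
final class TerminalGate {
    // Non-negative: emitting is enabled and the value counts writer entries.
    // Negative: released (terminal). Further increments keep it negative in practice.
    private final AtomicLong epoch = new AtomicLong();

    /** Called by a writer before enqueueing; returns the value seen at entry. */
    long enter() {
        return epoch.getAndIncrement();
    }

    /** A negative value at entry means release already happened, so nothing may be enqueued. */
    static boolean isReleased(long valueAtEnter) {
        return valueAtEnter < 0;
    }

    /**
     * Idempotent release: only the first caller moves the counter into the
     * negative range; later calls see a negative value and do nothing, so the
     * gate can never flip back to the emitting-enabled state.
     */
    boolean release() {
        for (;;) {
            long current = epoch.get();
            if (current < 0) {
                return false; // already released
            }
            if (epoch.compareAndSet(current, Long.MIN_VALUE)) {
                return true; // this call performed the release
            }
        }
    }
}
```

A writer would call `enter()`, check `isReleased(...)` on the returned value, and skip the enqueue when it is negative, so no separate `released` field is needed.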
Okay, but how is the implementation incorrect, even if it may have other optimizations?
The test is in https://github.com/benjchristensen/RxJava/blob/ring-buffer-wip/src/main/java/rx/internal/util/RxRingBuffer.java |
As per discussion at ReactiveX#2243 (comment)

Performance numbers are the same with this optimization:

with WriterReaderPhaser using phase state
```
Benchmark                                          (size)   Mode   Samples        Score   Score error   Units
r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  4930586.615   1177558.483   ops/s
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    26117.610       412.515   ops/s
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       25.466         0.775   ops/s
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5   102486.748      1377.611   ops/s
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.087         0.233   ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4457316.389    363539.115   ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   473724.591     21719.743   ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    40023.766      3562.388   ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5220487.374    405664.161   ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       23.860         1.395   ops/s
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    78372.613      2185.710   ops/s
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2189.509        30.923   ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5168348.297    107791.880   ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    38955.533      4054.434   ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       35.013         2.609   ops/s
```
I just pushed the change you recommended and eliminated the use of
|
Method release is not idempotent and may flip the phaser back to even. I think the critical section in poll might be the cause of the performance drop. I see that without it, an ongoing poll might pick up a value from a new use of the queue. I'll think about this a bit more. An improvement could be if you eliminated the size() call before each onNext; a failed offer is enough indication of a missing backpressure, no need to count the queue all the time. |
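The suggestion to drop the `size()` check can be sketched as follows. This is an illustration under assumptions, not the branch's code: `BoundedEmitter` is a hypothetical class, `java.util.concurrent.ArrayBlockingQueue` stands in for the ring buffer's bounded queue, and `IllegalStateException` stands in for whatever backpressure exception the real operator would signal.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

/** Hypothetical sketch: detect missing backpressure from a failed offer, without calling size(). */
final class BoundedEmitter<T> {
    // Stand-in bounded queue; offer() returns false when it is full.
    private final Queue<T> queue;

    BoundedEmitter(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Enqueues a value; a rejected offer is the only overflow signal needed. */
    void onNext(T value) {
        if (!queue.offer(value)) {
            throw new IllegalStateException("queue full: missing backpressure");
        }
    }

    /** Returns the next value, or null when the queue is empty. */
    T poll() {
        return queue.poll();
    }
}
```

The hot path then costs a single `offer()` per emission, and the overflow case is detected at the moment the bounded queue rejects the element.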
I did some experimenting on this branch, here are the results:
In conclusion, I'd choose the SpscArrayQueue + two lazy writer-reader-phaser implementation to replace 1.x; it gets close to the baseline and is correct to avoid false sharing of queues between subsequent uses. See implementation here. |
Looks interesting. I'll play with the implementation you provided. Do you want to provide a cleaned up pull request of your proposed winner? |
Sure. |
As per discussion in #2189 this is an attempt at implementing `RxRingBuffer` using `WriterReaderPhaser`.
The performance numbers are worse than #2189, in line with the RW Lock and WIP implementations.
See the last column: