
Experimental RingBuffer using WriterReaderPhaser #2243

Conversation

benjchristensen
Member

As per the discussion in #2189, this is an attempt at implementing RxRingBuffer using WriterReaderPhaser.

The performance numbers are worse than those in #2189, in line with the RW Lock and WIP implementations.

See the last column:

```
Benchmark                                          (size)   Mode   Samples          1.x   Inline Volatile    + finalize       RW Lock          WIP     WRPhaser
r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  4757888.048       5264594.520   4956256.323   5288310.755  5032942.628  5147447.030
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    44877.618         42845.758     39209.439     25742.696    29025.955    27779.876
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       42.366            40.979        37.036        24.769       27.260       27.694
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    99981.127         99910.070     94307.080    103112.286   103176.289   100516.101
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.675             4.620         4.670         4.374        4.313        4.413
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4751265.583       4706441.452   4376983.062   4739418.557  4673633.614  4510099.724
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   458704.984        480075.261    427165.143    483313.588   476318.407   462373.555
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    42493.290         42178.254     39640.240     42728.480    42542.171    41354.668
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5406448.872       5375090.752   5066264.570   5628401.294  4974892.417  4986054.668
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       44.528            40.990        41.106        24.974       28.212       27.755
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76898.222         72655.377     69748.305     78283.565    78987.646    78550.912
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     3172.653          2955.854      3064.749      1858.361     2204.948     2310.804
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5157134.576       5163837.644   4846336.744   5290961.536  5139893.848  4486879.415
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    39961.491         39341.526     37312.117     40418.492    39163.267    37424.146
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       35.925            35.730        33.948        35.526       35.611       32.287
```
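
For readers unfamiliar with the construct, here is a minimal sketch (not the PR's code) of how a WriterReaderPhaser can gate the producer side of a buffer. It uses `org.HdrHistogram.WriterReaderPhaser` and a `ConcurrentLinkedQueue` as a stand-in for the ring buffer; all other names are illustrative.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.HdrHistogram.WriterReaderPhaser;

// Sketch only: the phaser lets release() wait out in-flight writers
// before the queue is cleared and recycled.
class PhasedBuffer<T> {
    private final WriterReaderPhaser phaser = new WriterReaderPhaser();
    private final Queue<T> queue = new ConcurrentLinkedQueue<T>(); // stand-in for the ring buffer

    public boolean offer(T t) {
        // The sign of the critical value carries the phase: negative = flipped.
        long criticalValueAtEnter = phaser.writerCriticalSectionEnter();
        try {
            if (criticalValueAtEnter < 0) {
                return false; // already released: drop instead of enqueueing
            }
            return queue.offer(t);
        } finally {
            phaser.writerCriticalSectionExit(criticalValueAtEnter);
        }
    }

    public void release() {
        phaser.readerLock();
        try {
            phaser.flipPhase(); // blocks until in-flight writers have exited
            queue.clear();      // safe: no writer can still touch the contents
        } finally {
            phaser.readerUnlock();
        }
    }
}
```

Note that a second release() call would flip the phase back to even and re-enable writes; that non-idempotence is exactly what the review below calls out.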

@benjchristensen
Member Author

cc @akarnokd

@benjchristensen
Member Author

This failure again:

```
rx.internal.operators.OperatorRetryTest > testRetryWithBackpressure FAILED
    java.lang.Exception: test timed out after 10000 milliseconds
```

@benjchristensen
Member Author

An update related to this is available at #1908 (comment)

@akarnokd
Member

The implementation is incorrect. Once flipped, the onXXX calls should be routed to the nop_queue; but essentially, if the criticalValueAtEnter is less than zero, the unsubscription has happened and nothing should be enqueued. Also, release() is not idempotent, because it can flip a second time, back to the emitting-enabled mode.
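
A minimal sketch of a one-shot flip, extending the illustrative PhasedBuffer above with a CAS guard (java.util.concurrent.atomic.AtomicBoolean); the follow-up comments discuss folding this state into the phaser itself so the extra field disappears:

```java
private final AtomicBoolean releasedOnce = new AtomicBoolean(); // illustrative guard

public void release() {
    if (!releasedOnce.compareAndSet(false, true)) {
        return; // already flipped once; never flip back to the emitting-enabled phase
    }
    phaser.readerLock();
    try {
        phaser.flipPhase();
        queue.clear();
    } finally {
        phaser.readerUnlock();
    }
}
```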

I believe that by joining the ring buffer and the phaser, you can get away with a single ingress atomic counter. It will provide the phase bit and the writerIndex at the same time.
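
A sketch of what that combined counter might look like; IngressCounter and its methods are hypothetical names, not code from the PR:

```java
import java.util.concurrent.atomic.AtomicLong;

// One atomic long: the sign is the phase bit, the magnitude is the writer index.
class IngressCounter {
    private final AtomicLong ingress = new AtomicLong(); // >= 0 means the writable phase

    /** Claims the next writer slot, or returns -1 when already terminated. */
    long claimSlot() {
        long v = ingress.getAndIncrement();
        return v < 0 ? -1 : v; // caller maps v onto the ring via (v & mask)
    }

    /** Flips to the terminal phase; returns the writer index at the flip, or -1 if already flipped. */
    long terminate() {
        long v;
        do {
            v = ingress.get();
            if (v < 0) {
                return -1; // already terminated, so terminate() stays idempotent
            }
            // One CAS moves the counter into the negative range while the
            // low bits keep the writer index for the draining side.
        } while (!ingress.compareAndSet(v, Long.MIN_VALUE + v));
        return v;
    }
}
```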

In addition, I figured out why the multi-consumer queue was needed: an async clear() acts as a second reader using the same poll() and would thus corrupt an SPSC queue. Maybe an SPSC queue's poll can be modified so that filling the queue with nulls and moving the reader's index up to the writer's index becomes workable.
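
Mechanically, that suggestion might look like the sketch below. The field names mimic a JCTools-style SPSC array queue but are illustrative, and the open question from the comment stands: this is only safe if it cannot race with an in-progress poll().

```java
import java.util.concurrent.atomic.AtomicLong;

// Drains by nulling slots and advancing the reader index up to the writer
// index, instead of letting a second thread call poll() concurrently.
static void clearByAdvancingReader(Object[] buffer, long mask,
                                   AtomicLong readerIndex, AtomicLong writerIndex) {
    final long wi = writerIndex.get(); // snapshot of the producer's progress
    long ri = readerIndex.get();
    while (ri < wi) {
        buffer[(int) (ri & mask)] = null; // free the slot for reuse / GC
        ri++;
    }
    readerIndex.lazySet(ri); // single logical reader: an ordered store suffices
}
```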

@benjchristensen
Member Author

Why do they need to be routed anywhere? All future emissions will now return immediately since released == true.

Also, a single counter had the same performance problem. That was the WIP implementation.

@akarnokd
Member

The released field is redundant because a negative critical value can indicate a terminal state; that's one field less.

The WIP produced some nice results, except in a few cases where it drastically underperformed. Could you point me to the source of this variant to see if more nanoseconds can be saved?

@benjchristensen
Member Author

> The released field is redundant because a negative critical value can indicate a terminal state; that's one field less.

Okay, but how is the implementation incorrect, even if it could be optimized further?

> Could you point me to the source of this variant to see if more nanoseconds can be saved?

The test is in https://github.com/benjchristensen/RxJava/blob/ring-buffer-wip/src/main/java/rx/internal/util/RxRingBuffer.java

@benjchristensen
Member Author

I just pushed the change you recommended, eliminating the released field and using the phase state directly. It did not change performance:

with WriterReaderPhaser using phase state

```
Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  4930586.615  1177558.483    ops/s
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    26117.610      412.515    ops/s
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       25.466        0.775    ops/s
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5   102486.748     1377.611    ops/s
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.087        0.233    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4457316.389   363539.115    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   473724.591    21719.743    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    40023.766     3562.388    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5220487.374   405664.161    ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       23.860        1.395    ops/s
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    78372.613     2185.710    ops/s
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2189.509       30.923    ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5168348.297   107791.880    ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    38955.533     4054.434    ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       35.013        2.609    ops/s
```

@akarnokd
Member

Method release is not idempotent and may flip the phaser back to even. I think the critical section in poll might be the cause of the performance drop; I see that without it, an ongoing poll might pick up a value from a new use of the queue. I'll think about this a bit more. An improvement could be to eliminate the size() call before each onNext: a failed offer is enough indication of missing backpressure, so there is no need to count the queue all the time.
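
The size() elimination is a small change; a sketch, where queue stands for the buffer's bounded queue and MissingBackpressureException is RxJava's existing exception type:

```java
// A failed offer on a bounded queue already signals missing backpressure,
// so no separate size() check is needed before each onNext.
public void onNext(Object o) throws MissingBackpressureException {
    if (!queue.offer(o)) {
        throw new MissingBackpressureException();
    }
}
```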

@akarnokd
Member

akarnokd commented Jan 5, 2015

I did some experimenting on this branch; here are the results:

```
Benchmark                   (size)          1.x   |     ben     | growable-fx |  growable   |    spsc
merge1SyncStreamOfN              1   3779678.748  | 3750731.962 | 3895262.865 | 3831654.726 | 3815898.055
merge1SyncStreamOfN           1000     21250.675  |   16863.054 |   17524.493 |   17623.563 |   17311.877
merge1SyncStreamOfN        1000000        20.406  |      14.935 |      16.457 |      16.812 |      16.697
mergeNAsyncStreamsOfN            1    115390.116  |  108276.321 |  113706.768 |  113270.805 |  104689.284
mergeNAsyncStreamsOfN         1000         2.579  |       2.359 |       2.416 |       2.429 |       2.485
mergeNSyncStreamsOf1             1   3543551.254  | 3634053.632 | 3521790.813 | 3521857.887 | 3588808.022
mergeNSyncStreamsOf1           100    299166.910  |  301841.420 |  268720.642 |  299909.354 |  298040.908
mergeNSyncStreamsOf1          1000     28404.751  |   28600.354 |   22409.451 |   28502.782 |   28327.782
mergeNSyncStreamsOfN             1   4054571.577  | 4059693.836 | 4045262.328 | 4070817.061 | 4126726.344
mergeNSyncStreamsOfN          1000        24.324  |      18.672 |      19.361 |      19.433 |      19.313
mergeTwoAsyncStreamsOfN          1     85846.727  |   82293.815 |   81340.729 |   84980.486 |   83300.137
mergeTwoAsyncStreamsOfN       1000      1823.137  |    1657.242 |    1630.500 |    1631.996 |    1777.078
oneStreamOfNthatMergesIn1        1   3724179.351  | 3749830.891 | 3734612.120 | 3738599.855 | 3594971.781
oneStreamOfNthatMergesIn1     1000     19051.928  |   19221.689 |   19255.202 |   19312.769 |   21084.864
oneStreamOfNthatMergesIn1  1000000        18.265  |      18.202 |      17.673 |      18.295 |      18.360
```

```
Benchmark                   (size)          1.x   |  spmc+2wrp  | spmc+2wrpp  |  grow+2wrp  |  spsc+2wrp  |  spsc+2hp
merge1SyncStreamOfN              1   3779678.748  | 3798671.379 | 3724837.669 | 3733385.355 | 3767936.028 | 3788226.802
merge1SyncStreamOfN           1000     21250.675  |   18564.658 |   18101.748 |   18437.839 |   18530.542 |   18813.822
merge1SyncStreamOfN        1000000        20.406  |      18.270 |      17.751 |      17.707 |      17.712 |      16.972
mergeNAsyncStreamsOfN            1    115390.116  |  106782.224 |   97994.868 |  112464.083 |  115629.480 |  104921.251
mergeNAsyncStreamsOfN         1000         2.579  |       2.520 |       2.492 |       2.485 |       2.546 |       2.495
mergeNSyncStreamsOf1             1   3543551.254  | 3606618.888 | 3614771.590 | 3567401.694 | 3602242.709 | 3611750.573
mergeNSyncStreamsOf1           100    299166.910  |  299075.019 |  271830.698 |  299063.067 |  301703.721 |  302071.539
mergeNSyncStreamsOf1          1000     28404.751  |   28612.302 |   28363.753 |   28482.301 |   28420.833 |   28577.877
mergeNSyncStreamsOfN             1   4054571.577  | 4143834.293 | 4041081.408 | 4020815.026 | 4003156.953 | 4057396.041
mergeNSyncStreamsOfN          1000        24.324  |      20.916 |      20.821 |      20.553 |      20.601 |      20.736
mergeTwoAsyncStreamsOfN          1     85846.727  |   86027.435 |   83778.359 |   84782.438 |   85682.983 |   84881.540
mergeTwoAsyncStreamsOfN       1000      1823.137  |    1778.062 |    1804.147 |    1758.151 |    1889.458 |    1898.120
oneStreamOfNthatMergesIn1        1   3724179.351  | 3723142.805 | 3751296.228 | 3729408.751 | 3725068.220 | 3717625.162
oneStreamOfNthatMergesIn1     1000     19051.928  |   19251.003 |   21053.425 |   19270.488 |   19392.595 |   19262.662
oneStreamOfNthatMergesIn1  1000000        18.265  |      18.066 |      18.323 |      18.172 |      18.069 |      16.494
```

The growable is JCTools' SpscGrowableArrayQueue, modified to support full capacity utilization. The growable-fx is a variant where it is used as a fixed-capacity queue. Both are on par with 1.x except in 2-3 cases where they perform about 20% worse. Note that due to the pooling, the queues may reach their full capacity, and thus the cost of resizing diminishes over time.

The spsc is the usual SpscArrayQueue, again fixed to support full capacity utilization. It does well but shows a 10%-20% loss in some benchmarks.
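
For intuition on full capacity utilization (a generic sketch, not necessarily the exact modification benchmarked here): in a null-check-based array queue a slot is free iff it is null, so every slot can be used, whereas index-comparison designs must keep one slot empty to distinguish full from empty.

```java
// Illustrative single-producer offer; real implementations publish the
// element with an ordered (lazySet-style) store rather than a plain write.
final class SpscSketch<E> {
    final Object[] buffer;
    final long mask;
    long producerIndex; // written only by the producer thread

    SpscSketch(int capacityPowerOfTwo) {
        buffer = new Object[capacityPowerOfTwo];
        mask = capacityPowerOfTwo - 1;
    }

    boolean offer(E e) {
        final int offset = (int) (producerIndex & mask);
        if (buffer[offset] != null) {
            return false; // consumer hasn't cleared this slot yet: queue is full
        }
        buffer[offset] = e;
        producerIndex++;
        return true;
    }
}
```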

The -2wrp postfix indicates a different phasing: a separate phaser for the writers and a separate phaser for the readers. This reduces contention between writers and readers, and since each side is single-threaded, it allows the phaser to use lazySet on its egress counters instead of an atomic increment-and-get. In addition, the phase flip uses spinning instead of yielding. These variants all perform well and get close to the 1.x mark, if not surpassing it; however, within the error margin no single one consistently outperforms the others. The -2wrpp is a padded version, which seems to improve some of the high-contention benchmarks. The final column, spsc+2hp, uses two half-phasers (no odd epoch, since it isn't really used) and performs quite well except in two benchmarks whose drop I can't explain.
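
A sketch of one of the two per-side phasers described above; names are illustrative, and the epoch-sign flip is omitted for brevity:

```java
import java.util.concurrent.atomic.AtomicLong;

// With a single thread ever exiting the critical section, the egress
// counter can be published with lazySet instead of incrementAndGet, and
// the flip can spin because critical sections are tiny.
class LazySingleExitPhaser {
    private final AtomicLong ingress = new AtomicLong();
    private final AtomicLong egress = new AtomicLong();

    long enter() {
        return ingress.getAndIncrement();
    }

    void exit() {
        egress.lazySet(egress.get() + 1); // single exiting thread: no CAS needed
    }

    void awaitQuiescence() {
        final long observed = ingress.get();
        while (egress.get() < observed) {
            // spin instead of yield: flips are rare and sections are short
        }
    }
}
```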

In conclusion, I'd choose the SpscArrayQueue + two lazy writer-reader-phasers implementation to replace 1.x; it gets close to the baseline and is correct in avoiding false sharing of queues between subsequent uses. See the implementation here.

@benjchristensen
Member Author

Looks interesting. I'll play with the implementation you provided. Do you want to provide a cleaned-up pull request of your proposed winner?

@akarnokd
Member

akarnokd commented Jan 5, 2015

Sure.

benjchristensen deleted the experimental-ringbuffer-writerreaderphaser branch on June 11, 2015 21:05.