Pursue Elimination of RingBuffer Pooling #1908

Closed
benjchristensen opened this issue Nov 29, 2014 · 41 comments

@benjchristensen
Member

The use of object pooling significantly complicates unsubscribe behavior and determining when it is safe to release a buffer back to the pool. This issue is to track research on how we can eliminate pooling to improve or fix correctness while still maintaining good performance (and memory allocation and GC behavior).
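
As a rough illustration of the hazard (a minimal sketch with illustrative names, not the actual RxRingBuffer or pool code): if a consumer returns a pooled buffer on unsubscribe while a producer still holds a reference to it, the same buffer can be handed to a new subscriber and receive a stale element.

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class NaivePool {
    static final Queue<Queue<Object>> POOL = new ConcurrentLinkedQueue<Queue<Object>>();

    static Queue<Object> borrow() {
        Queue<Object> q = POOL.poll();
        return q != null ? q : new ArrayBlockingQueue<Object>(128);
    }

    static void release(Queue<Object> q) {
        q.clear();
        POOL.offer(q);  // unsafe if a producer thread still holds a reference to q
    }
}

// One interleaving that corrupts state:
//   consumer thread:   NaivePool.release(q);           // q goes back into the pool
//   other subscriber:  Queue<Object> q2 = borrow();    // receives the very same instance
//   producer thread:   q.offer(staleItem);             // stale element now visible through q2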

The merge and observeOn use cases are good ones for tracking performance and already have JMH benchmarks in place.

Here is how to run the tests:

./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OperatorMergePerf.*'
./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OperatorObserveOnPerf.*'

Here are results for merge and observeOn comparing the pooled SpmcArrayQueue against SynchronizedQueue (a synchronized LinkedList); two runs of the LinkedList variant are shown.

The ** markers indicate where performance degradation happened.

Merge

Benchmark                                          (size)   Mode   Samples   RingBuffer  ||   LinkedList     LinkedList
r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5448592.507  ||  5332264.735    5358807.217
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    54425.611  ||    52385.763      52409.485
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       52.477  ||       53.928         54.128
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    92792.488  ||   103554.112     104711.192
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.957  ||        4.194          4.143
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4608856.070  ||  4294087.613    4688656.691
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   431920.261  ||   451007.165     333391.274**
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    52309.410  ||    49952.664      50057.346
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5815289.623  ||  5558593.305    5575812.189
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       52.525  ||       49.788         49.567
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    77640.706  ||    79530.128      79472.088
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     3000.748  ||     2358.716**     2397.035**
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5397752.619  ||  5282943.171    5361848.221
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    29257.005  ||    34150.793      35196.066
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       35.506  ||       25.827**       26.517**
ObserveOn

Benchmark                                         (size)   Mode   Samples   RingBuffer  ||    LinkedList       LinkedList
r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5   109192.288  ||    111037.202       110112.152
r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5     6952.955  ||      2846.331**       2821.400**
r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5       12.267  ||         9.988**         10.063**
r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5 16430666.170  ||  16284869.881     16504292.796
r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   153431.778  ||    158892.599       157288.399
r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      150.061  ||       159.481          149.546
r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5    16899.056  ||     16111.396        16532.411
r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     7262.566  ||      5742.547**       5504.293**
r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5       13.520  ||        10.364**          9.954**

The SynchronizedQueue was only ever intended for environments without sun.misc.Unsafe (such as Android), so it is worth exploring other alternatives that don't involve a ring buffer (and its allocation overhead) but are still thread-safe for the single-producer, multi-consumer use cases RxRingBuffer is used in (and then renaming it to RxQueue or something like that).

@benjchristensen
Member Author

Here are some tests with MpscLinkedQueue and SpscLinkedQueue with merge and observeOn.

Merge

Benchmark                                          (size)   Mode   Samples  SpmcArrayQueue  ||     Synchronized LinkedList    ||  MpscLinkedQueue7^
r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5     5448592.507  ||  5332264.735    5358807.217    ||       5280793.113
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5       54425.611  ||    52385.763      52409.485    ||         51982.505
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5          52.477  ||       53.928         54.128    ||            59.971
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5       92792.488  ||   103554.112     104711.192    ||        104769.406
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5           4.957  ||        4.194          4.143    ||             4.750
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5     4608856.070  ||  4294087.613    4688656.691    ||       4274735.897
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5      431920.261  ||   451007.165     333391.274**  ||        440251.858
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5       52309.410  ||    49952.664      50057.346    ||         49188.282
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5     5815289.623  ||  5558593.305    5575812.189    ||       5576949.872
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5          52.525  ||       49.788         49.567    ||            51.102
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5       77640.706  ||    79530.128      79472.088    ||         75433.006
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5        3000.748  ||     2358.716**     2397.035**  ||          2881.911
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5     5397752.619  ||  5282943.171    5361848.221    ||       5445358.040
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5       29257.005  ||    34150.793      35196.066    ||         34305.563
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5          35.506  ||       25.827**       26.517**  ||            33.594

^MpscLinkedQueue7 => This can be used by merge and observeOn.  Currently unbounded and needs to be bounded.



ObserveOn


Benchmark                                         (size)   Mode   Samples SpmcArrayQueue  ||        Synchronized LinkedList    || MpscLinkedQueue7^ || SpscLinkedQueue^
r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5     109192.288  ||    111037.202       110112.152    ||       106683.693  ||      109647.796
r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5       6952.955  ||      2846.331**       2821.400**  ||         6280.963  ||        7117.558
r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5         12.267  ||         9.988**         10.063**  ||           12.667  ||          13.786
r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5   16430666.170  ||  16284869.881     16504292.796    ||     15826734.130  ||    16052912.021
r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5     153431.778  ||    158892.599       157288.399    ||       153065.062  ||      155336.587
r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5        150.061  ||       159.481          149.546    ||          153.177  ||         152.810
r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5      16899.056  ||     16111.396        16532.411    ||        14405.399  ||       15836.574
r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5       7262.566  ||      5742.547**       5504.293**  ||         7081.180  ||        8112.809
r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5         13.520  ||        10.364**          9.954**  ||           12.881  ||          14.937


^MpscLinkedQueue7 => This could be used since observeOn has a single consumer. Currently unbounded and needs to be bounded.
^SpscLinkedQueue => This could be used since observeOn has a single producer and a single consumer. Currently unbounded and needs to be bounded.

JMC while running merge tests for 60 seconds:

screen shot 2014-11-29 at 8 47 43 pm

We can't use these queues as-is since they are single-consumer and we need multi-consumer for merge. Also, observeOn is showing some test failures.

However, the results do suggest that we can find a data structure that achieves the needed performance without object pooling.

@benjchristensen
Member Author

Here is an example of what we don't want in terms of object allocation.

observeOn using JCTools linked list (good):

observeon-jctools-linkedlist

observeOn using JCTools array queue without pooling (bad):

observeon-jctools-arrayqueue

Note the massive object allocation.

@akarnokd
Member

It's quite odd that Spsc is failing, since everything should be serialized, i.e., only one thread offering values to the internal queues at a time and only one thread collecting from all queues, right? Thread hopping should not affect Spsc. I can only speculate, but when I worked on some backpressure-related code recently, I often didn't get the consumption phase right at first: in the case where the upstream produces and the downstream requests at the same time, both may end up in the drain loop. This is why I used the emitting flag in onBackpressureBlock or in the backpressure-aware ReplaySubject.
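
For reference, a stripped-down sketch of that emitting-flag pattern (illustrative names, not the actual operator code): only the party that wins the flag drains the queue, and a concurrent caller just records that it missed work so the current emitter loops again.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

final class SerializedDrain<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<T>();
    private final AtomicLong requested = new AtomicLong();
    private boolean emitting;   // guarded by synchronized(this)
    private boolean missed;     // set if work arrived while another thread was emitting

    void onNext(T value) {      // producer side
        queue.offer(value);
        drain();
    }

    void request(long n) {      // consumer side
        requested.addAndGet(n);
        drain();
    }

    private void drain() {
        synchronized (this) {
            if (emitting) {
                missed = true;  // the current emitter will loop and pick this up
                return;
            }
            emitting = true;
        }
        for (;;) {
            T next;
            while (requested.get() > 0 && (next = queue.poll()) != null) {
                requested.decrementAndGet();
                System.out.println(next);   // stand-in for downstream onNext
            }
            synchronized (this) {
                if (!missed) {
                    emitting = false;
                    return;
                }
                missed = false; // new work or requests arrived; drain again
            }
        }
    }
}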

@benjchristensen
Member Author

Yes, it should work, and the other spsc/mpsc queue impls do work, which suggests an issue with these particular implementations (they aren't yet officially released).

@daschl
Contributor

daschl commented Nov 30, 2014

@benjchristensen with which params are you running jmh + jfr?

@benjchristensen
Member Author

I manually attach JFR after the test passes the warmup and hits the first iteration. I increase the iteration time to something longer (I've been using 25 seconds) so the run lasts far longer than the 60 seconds I capture via JFR.

./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 25 .*OperatorObserveOnPerf.*'

I start flight recording when the first iteration starts:

# Run progress: 0.00% complete, ETA 00:19:30
# Warmup: 5 iterations, 1 s each
# Measurement: 5 iterations, 25 s each
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: rx.operators.OperatorObserveOnPerf.observeOnComputation
# Parameters: (size = 1)
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/jre/bin/java
# VM options: -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -Xmx512m -Dfile.encoding=UTF-8
# Fork: 1 of 1
# Warmup Iteration   1: 76564.389 ops/s
# Warmup Iteration   2: 105044.060 ops/s
# Warmup Iteration   3: 108520.068 ops/s
# Warmup Iteration   4: 108104.151 ops/s
# Warmup Iteration   5: 109615.544 ops/s
Iteration   1:

screen shot 2014-11-30 at 11 03 48 am

screen shot 2014-11-30 at 11 03 56 am

@benjchristensen
Member Author

some unit tests failing with this that need to be debugged

Just a silly oversight on my part. These are unbounded queues and our use cases require them to be bounded. Thus, the offer never returns false, so we never hit the backpressure exception:

        if (!queue.offer(on.next(o))) {
            throw new MissingBackpressureException();
        }

If we use one of these unbounded queues we'll need to add the overhead of tracking the size, or modify the queue implementation to correctly do that for us.
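
A rough sketch of the external size-tracking option (illustrative names, not the actual change; ConcurrentLinkedQueue stands in for the unbounded JCTools queue so the snippet compiles against the JDK alone):

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedQueue<T> {
    private final Queue<T> actual = new ConcurrentLinkedQueue<T>();
    private final AtomicInteger size = new AtomicInteger();
    private final int capacity;

    BoundedQueue(int capacity) {
        this.capacity = capacity;   // e.g. 128
    }

    // Mirrors Queue.offer: returns false when the bound is hit, so the caller
    // can throw MissingBackpressureException exactly as in the snippet above.
    boolean offer(T value) {
        for (;;) {
            int current = size.get();
            if (current >= capacity) {
                return false;                           // full: missing backpressure
            }
            if (size.compareAndSet(current, current + 1)) {
                actual.offer(value);
                return true;
            }
        }
    }

    T poll() {
        T value = actual.poll();
        if (value != null) {
            size.decrementAndGet();                     // only count elements actually removed
        }
        return value;
    }
}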

@benjchristensen
Member Author

ConcurrentLinkedQueue takes a performance hit compared with the JCTools implementations for observeOn:

ObserveOn

Benchmark                                         (size)  SpmcArrayQueue  ||   ConcurrentLinkedQueue
r.o.OperatorObserveOnPerf.observeOnComputation         1      109192.288  ||              110221.988
r.o.OperatorObserveOnPerf.observeOnComputation      1000        6952.955  ||                6016.298**
r.o.OperatorObserveOnPerf.observeOnComputation   1000000          12.267  ||                  10.748**
r.o.OperatorObserveOnPerf.observeOnImmediate           1    16430666.170  ||            16245304.118
r.o.OperatorObserveOnPerf.observeOnImmediate        1000      153431.778  ||              156059.398
r.o.OperatorObserveOnPerf.observeOnImmediate     1000000         150.061  ||                 155.667
r.o.OperatorObserveOnPerf.observeOnNewThread           1       16899.056  ||               16700.144
r.o.OperatorObserveOnPerf.observeOnNewThread        1000        7262.566  ||                6813.449**
r.o.OperatorObserveOnPerf.observeOnNewThread     1000000          13.520  ||                  10.655**

It's okay for merge:

Benchmark                                          (size)  SpmcArrayQueue  ||  MpscLinkedQueue7^  ||  ConcurrentLinkedQueue
r.o.OperatorMergePerf.merge1SyncStreamOfN               1     5448592.507  ||       5280793.113   ||            5292601.594
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000       54425.611  ||         51982.505   ||              51990.637
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000          52.477  ||            59.971   ||                 59.967
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1       92792.488  ||        104769.406   ||             103669.636
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000           4.957  ||             4.750   ||                  4.074
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1     4608856.070  ||       4274735.897   ||            4360997.158
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100      431920.261  ||        440251.858   ||             438544.154
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000       52309.410  ||         49188.282   ||              49382.440
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1     5815289.623  ||       5576949.872   ||            5577426.641
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000          52.525  ||            51.102   ||                 49.899
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1       77640.706  ||         75433.006   ||              78623.524
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000        3000.748  ||          2881.911   ||               2696.428
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1     5397752.619  ||       5445358.040   ||            5346115.873
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000       29257.005  ||         34305.563   ||              34051.746
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000          35.506  ||            33.594   ||                 35.157

benjchristensen added a commit to benjchristensen/RxJava that referenced this issue Nov 30, 2014
@benjchristensen
Member Author

I modified RxRingBuffer to use its own counter for bounding the linked lists and am now using SpscLinkedQueue.

The only unit test failing is RxRingBufferSpmcTest.testConcurrency, which is a valid failure since I am only using an SPSC queue.

Scroll to the right to see the SpscLinkedQueue+Size values:

Merge

Benchmark                                          (size)  SpmcArrayQueue  ||     Synchronized LinkedList    ||  MpscLinkedQueue7^  ||  ConcurrentLinkedQueue  || SpscLinkedQueue+Size
r.o.OperatorMergePerf.merge1SyncStreamOfN               1     5448592.507  ||  5332264.735    5358807.217    ||       5280793.113   ||            5292601.594  ||          5332726.909
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000       54425.611  ||    52385.763      52409.485    ||         51982.505   ||              51990.637  ||            49268.224
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000          52.477  ||       53.928         54.128    ||            59.971   ||                 59.967  ||               50.742
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1       92792.488  ||   103554.112     104711.192    ||        104769.406   ||             103669.636  ||           104210.342
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000           4.957  ||        4.194          4.143    ||             4.750   ||                  4.074  ||                4.688
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1     4608856.070  ||  4294087.613    4688656.691    ||       4274735.897   ||            4360997.158  ||          4888816.598
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100      431920.261  ||   451007.165     333391.274**  ||        440251.858   ||             438544.154  ||           445969.294
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000       52309.410  ||    49952.664      50057.346    ||         49188.282   ||              49382.440  ||            49817.351
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1     5815289.623  ||  5558593.305    5575812.189    ||       5576949.872   ||            5577426.641  ||          5707051.084
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000          52.525  ||       49.788         49.567    ||            51.102   ||                 49.899  ||               43.819**
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1       77640.706  ||    79530.128      79472.088    ||         75433.006   ||              78623.524  ||            78480.286
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000        3000.748  ||     2358.716**     2397.035**  ||          2881.911   ||               2696.428  ||             2627.181*
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1     5397752.619  ||  5282943.171    5361848.221    ||       5445358.040   ||            5346115.873  ||          5352179.516
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000       29257.005  ||    34150.793      35196.066    ||         34305.563   ||              34051.746  ||            35081.657
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000          35.506  ||       25.827**       26.517**  ||            33.594   ||                 35.157  ||               35.378


^MpscLinkedQueue7 => This can be used by merge and observeOn.  Currently unbounded and needs to be bounded.



ObserveOn


Benchmark                                         (size)  SpmcArrayQueue  ||        Synchronized LinkedList    || MpscLinkedQueue7^ || SpscLinkedQueue^  || ConcurrentLinkedQueue   || SpscLinkedQueue+Size
r.o.OperatorObserveOnPerf.observeOnComputation         1      109192.288  ||    111037.202       110112.152    ||       106683.693  ||      109647.796   ||            110221.988   ||           105952.032
r.o.OperatorObserveOnPerf.observeOnComputation      1000        6952.955  ||      2846.331**       2821.400**  ||         6280.963  ||        7117.558   ||              6016.298** ||             6615.249
r.o.OperatorObserveOnPerf.observeOnComputation   1000000          12.267  ||         9.988**         10.063**  ||           12.667  ||          13.786   ||                10.748** ||               11.381
r.o.OperatorObserveOnPerf.observeOnImmediate           1    16430666.170  ||  16284869.881     16504292.796    ||     15826734.130  ||    16052912.021   ||          16245304.118   ||         16294413.530
r.o.OperatorObserveOnPerf.observeOnImmediate        1000      153431.778  ||    158892.599       157288.399    ||       153065.062  ||      155336.587   ||            156059.398   ||           157113.031
r.o.OperatorObserveOnPerf.observeOnImmediate     1000000         150.061  ||       159.481          149.546    ||          153.177  ||         152.810   ||               155.667   ||              157.675
r.o.OperatorObserveOnPerf.observeOnNewThread           1       16899.056  ||     16111.396        16532.411    ||        14405.399  ||       15836.574   ||             16700.144   ||            15718.673
r.o.OperatorObserveOnPerf.observeOnNewThread        1000        7262.566  ||      5742.547**       5504.293**  ||         7081.180  ||        8112.809   ||              6813.449** ||             7248.917
r.o.OperatorObserveOnPerf.observeOnNewThread     1000000          13.520  ||        10.364**          9.954**  ||           12.881  ||          14.937   ||                10.655** ||               12.344


^MpscLinkedQueue7 => This could be used since observeOn has a single consumer. Currently unbounded and needs to be bounded.
^SpscLinkedQueue => This could be used since observeOn has a single producer and a single consumer. Currently unbounded and needs to be bounded.

observeOn Flight Recorder

1.0.1

observeon-1 0 1

Modified to use SPSC Linked List

observeon-spsc

merge Flight Recorder

1.0.1

merge-1 0 1

Modified to use SPSC Linked List

merge-spsc

The Flight Recorder results, particularly for merge, need more evaluation.

It does seem, though, that the trade-offs in performance and object allocation are not severe, which probably warrants making the change to allow avoiding object pooling.

I'll submit a PR with the changes for evaluation and further experimentation.

@benjchristensen
Member Author

PR #1909 contains the code for the previous perf tests.

@benjchristensen
Member Author

A note while we're working on this ... whatever decision we make here should, I think, target 1.1, since it will affect memory and GC behavior and so is larger than a patch.

I also want to give this enough time to really think it through ... sleeping on whatever solutions we work on, since this is very nuanced and the tradeoffs need to be thought through.

@benjchristensen
Member Author

I've been experimenting with weak and phantom references tonight to see whether it's possible to continue using object pooling but without manual release. I have it working, and performance is good, but at high velocity in the JMH tests the ReferenceQueue fills up in an unbounded manner.

Anyone have good experience using WeakReferences for an object pool with high throughput?

I wish there was a better way to get a callback from the WeakReference than it being put into a ReferenceQueue.
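
For context, the shape of what I'm experimenting with looks roughly like this (a simplified sketch with illustrative names, not the actual code): the pool keeps the heavy buffers strongly referenced, hands out a lightweight holder, and reclaims a buffer when its holder's WeakReference shows up on the ReferenceQueue.

import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

abstract class AutoReleasePool<T> {
    static final class Holder<V> {
        final V value;
        Holder(V value) { this.value = value; }
    }

    private final Queue<T> free = new ConcurrentLinkedQueue<T>();
    private final ReferenceQueue<Holder<T>> refQueue = new ReferenceQueue<Holder<T>>();
    // keeps each pooled object strongly reachable while its holder is in use
    private final Map<Reference<?>, T> inUse = new ConcurrentHashMap<Reference<?>, T>();

    protected abstract T create();      // allocate when the pool is empty

    Holder<T> borrow() {
        reclaim();                      // drain any holders the GC has collected
        T value = free.poll();
        if (value == null) {
            value = create();
        }
        Holder<T> holder = new Holder<T>(value);
        inUse.put(new WeakReference<Holder<T>>(holder, refQueue), value);
        return holder;
    }

    private void reclaim() {
        Reference<? extends Holder<T>> ref;
        while ((ref = refQueue.poll()) != null) {
            T value = inUse.remove(ref);
            if (value != null) {
                free.offer(value);      // holder was collected, so the buffer is free again
            }
        }
    }
}

Since reclamation in this sketch only happens on borrow(), a sustained high allocation rate can let the ReferenceQueue and the in-use map grow, which is consistent with the unbounded growth seen in the JMH runs described above.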

@akarnokd
Member

akarnokd commented Dec 5, 2014

The problem with pooling is that it can now become the congestion point: multiple threads might want to grab an element from the pool at the same time.

The alternative is reference counting:

public void onNext(Object o) {
   Queue<Object> q = queue;
   if (q != null && q.acquire()) { // i.e., cas 1 -> 2
      q.offer(o);
      q.release(); // i.e., decrementAndGet
   }
}
public void unsubscribe() {
   Queue<Object> q = queue;
   if (q != null) {
        queue = null;
        q.release();
   } 
}

This could be achieved via an AtomicInteger. When borrowed, the queue starts out at 1, and use toggles it between 1 and 2. If an unsubscription happens, the party that takes it from 1 to 0 puts the queue back into the pool. The drawback is the extra bookkeeping, which may halve the throughput in RxRingBufferPerf.
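
A concrete sketch of that counting scheme (illustrative names, not library code); as in the pseudocode above, the caller is expected to drop its reference to the queue after unsubscribe(), so no further offers arrive once the count reaches 0:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class RefCountedQueue<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<T>();
    // 1 = borrowed and idle, 2 = a producer is inside offer(), 0 = unsubscribed
    private final AtomicInteger state = new AtomicInteger(1);

    // Returns false if the queue has already been unsubscribed.
    boolean offer(T value) {
        if (!state.compareAndSet(1, 2)) {       // acquire: cas 1 -> 2
            return false;                       // state is 0: already unsubscribed
        }
        queue.offer(value);
        if (state.decrementAndGet() == 0) {     // release: normally 2 -> 1, but 1 -> 0
            returnToPool();                     // if unsubscribe ran during this offer
        }
        return true;
    }

    void unsubscribe() {
        if (state.decrementAndGet() == 0) {     // whoever reaches 0 releases the queue
            returnToPool();
        }
    }

    private void returnToPool() {
        queue.clear();                          // stand-in for the real pool release
    }
}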

@benjchristensen
Member Author

The problem with pooling is that it can now become the congestion point: multiple threads might want to grab an element from the pool at the same time.

Sure, but if it's a concurrent queue, which it is, then that contention should not be severe. It's using an MpmcArrayQueue optimized for performance.

I agree it is a point of contention, but if that contention is cheaper than the object allocation/collection then it's a net win.
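
For reference, the pooling approach being discussed boils down to something like this sketch (generic names; ConcurrentLinkedQueue stands in here for the MpmcArrayQueue the real pool uses): contention is limited to a single poll() on borrow and a single offer() on release.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

abstract class PoolSketch<T> {
    private final Queue<T> pool = new ConcurrentLinkedQueue<T>();

    protected abstract T create();      // construct a fresh instance on a pool miss

    T borrow() {
        T value = pool.poll();          // the only contended operation on borrow
        return value != null ? value : create();
    }

    void release(T value) {
        pool.offer(value);              // the manual release step under discussion
    }
}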

reference counting

This could work, but it effectively synchronizes unsubscribe and queue usage (cheaper than a mutex, but still a form of synchronization), which definitely has a cost, both in performance and cognitively, every time this queue is used. Hence us pursuing either a solution that doesn't need to be pooled, or my pursuit of weak references so that release back to the pool is automated.

@akarnokd
Member

akarnokd commented Dec 6, 2014

I think false sharing isn't really an issue for us because of how the queues are used: either the producer is likely to win the drain race and emit the data itself, or the queue is practically full and only one element is put for each element taken (request(1) in merge and observeOn), so the two ends are far apart from each other and no false sharing happens. An unpadded queue with a capacity of 128 therefore takes up 1024+16+32 bytes, nicely fitting in L1; the padded version takes 65k.
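
As a back-of-the-envelope check of that figure (assuming 8-byte, i.e. uncompressed, object references plus a ~16-byte array header and ~32 bytes of index fields; with compressed oops the slot portion would be roughly half):

public class QueueFootprintEstimate {
    public static void main(String[] args) {
        int capacity = 128;
        long slots = capacity * 8L; // 128 reference slots * 8 bytes = 1024 (uncompressed refs)
        long arrayHeader = 16;      // approximate array object header
        long indexFields = 32;      // approximate producer/consumer index fields
        System.out.println((slots + arrayHeader + indexFields) + " bytes"); // prints "1072 bytes"
    }
}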

@benjchristensen
Member Author

I have done some benchmarking on the different solutions that have been posted, including removing the pooling altogether, and here are the numbers. Note that we now have a queue size of 128, not 1024 like we used to.

Benchmark                                                                 (nextRuns)  (size)            1.x                   No Pool          2PhaseArrayQueue    branch rxqueue-spsc 
r.internal.IndexedRingBufferPerf.indexedRingBufferAdd                            N/A     100     288851.474  |    464395.527  161% ++  |    288585.686  100%    |    128859.372 45% --
r.internal.IndexedRingBufferPerf.indexedRingBufferAdd                            N/A   10000       1053.102  |      1122.418  107%     |       988.942  94%     |      1033.768 98%
r.internal.IndexedRingBufferPerf.indexedRingBufferAddRemove                      N/A     100     130035.705  |    219530.470  169% ++  |    129688.488  100%    |    122350.438 94%
r.internal.IndexedRingBufferPerf.indexedRingBufferAddRemove                      N/A   10000        394.109  |       408.761  104%     |       372.060  94%     |       382.935 97%
r.internal.RxRingBufferPerf.spmcCreateUseAndDestroy1                             N/A     N/A   26400373.503  |   4011739.109  15% --   |   3904652.414  15% --  |  10354119.555 39% --
r.internal.RxRingBufferPerf.spmcRingBufferAddRemove1                             N/A     N/A   41947289.772  |  40889056.084  97%      | 167128945.622  398% ++ |  39302492.472 94%
r.internal.RxRingBufferPerf.spscCreateUseAndDestroy1                             N/A     N/A   26625817.275  |   4041119.853  15% --   |  10543005.986  40% --  |   9844380.252 37% --
r.internal.RxRingBufferPerf.spscRingBufferAddRemove1                             N/A     N/A   42376557.425  |  39384117.892  93%      |  50678693.611  120% ++ |  40440515.140 95%
r.operators.OperatorFlatMapPerf.flatMapIntPassthruAsync                          N/A       1     323419.496  |    314150.913  97%      |    306704.864  95%     |     56413.217 17% --
r.operators.OperatorFlatMapPerf.flatMapIntPassthruAsync                          N/A    1000        325.685  |       320.644  98%      |       312.976  96%     |        75.440 23% --
r.operators.OperatorFlatMapPerf.flatMapIntPassthruAsync                          N/A 1000000          0.318  |         0.318  100%     |         0.311  98%     |         0.066 21% --
r.operators.OperatorFlatMapPerf.flatMapIntPassthruSync                           N/A       1    5544570.553  |   5634853.059  102%     |   5355739.382  97%     |   1276719.950 23% --
r.operators.OperatorFlatMapPerf.flatMapIntPassthruSync                           N/A    1000      37510.701  |     37428.777  100%     |     37971.972  101%    |      8511.690 23% --
r.operators.OperatorFlatMapPerf.flatMapIntPassthruSync                           N/A 1000000         32.831  |        33.133  101%     |        33.224  101%    |         7.466 23% --
r.operators.OperatorFlatMapPerf.flatMapTwoNestedSync                             N/A       1    4081082.521  |   3988577.555  98%      |   3855453.808  94%     |    892951.263 22% --
r.operators.OperatorFlatMapPerf.flatMapTwoNestedSync                             N/A    1000      26194.758  |     26060.788  99%      |     24783.878  95%     |      5014.843 19% -- 
r.operators.OperatorFlatMapPerf.flatMapTwoNestedSync                             N/A 1000000         24.255  |        24.638  102%     |        22.923  95%     |         5.212 21% --
r.operators.OperatorMapPerf.mapPassThru                                          N/A       1   24413615.697  |  24572501.107  101%     |  24135623.277  99%     |   5882139.319 24% --
r.operators.OperatorMapPerf.mapPassThru                                          N/A    1000     126022.367  |    126928.122  101%     |    121905.819  97%     |     31068.374 25% --
r.operators.OperatorMapPerf.mapPassThru                                          N/A 1000000        125.726  |       133.165  106%     |       125.238  100%    |        32.079 26% --
r.operators.OperatorMapPerf.mapPassThruViaLift                                   N/A       1   26830181.693  |  26855829.721  100%     |  26402665.260  98%     |   6974887.323 26% --
r.operators.OperatorMapPerf.mapPassThruViaLift                                   N/A    1000     126998.154  |    127883.467  101%     |    124275.543  98%     |     29430.411 23% --
r.operators.OperatorMapPerf.mapPassThruViaLift                                   N/A 1000000        132.249  |       132.858  100%     |       130.028  98%     |       105.182 80% --
r.operators.OperatorMergePerf.merge1SyncStreamOfN                                N/A       1    5004527.084  |   5096505.217  102%     |   4913306.706  98%     |   4565544.618 91%
r.operators.OperatorMergePerf.merge1SyncStreamOfN                                N/A    1000      52723.842  |     51944.534  99%      |     44783.449  85% --  |     38827.856 74% --
r.operators.OperatorMergePerf.merge1SyncStreamOfN                                N/A 1000000         49.928  |        49.643  99%      |        47.285  95%     |        42.332 85% --
r.operators.OperatorMergePerf.mergeNAsyncStreamsOfN                              N/A       1      99724.243  |     96046.578  96%      |     97233.505  98%     |     98970.453 99%
r.operators.OperatorMergePerf.mergeNAsyncStreamsOfN                              N/A    1000          4.661  |         5.085  109%     |         4.688  101%    |         4.727 101%
r.operators.OperatorMergePerf.mergeNSyncStreamsOf1                               N/A       1    4543776.401  |   4562256.139  100%     |   4626141.577  102%    |   4550596.577 100%
r.operators.OperatorMergePerf.mergeNSyncStreamsOf1                               N/A     100     482015.536  |    481347.669  100%     |    477687.006  99%     |    474884.635 99%
r.operators.OperatorMergePerf.mergeNSyncStreamsOf1                               N/A    1000      52102.762  |     52212.844  100%     |     50536.678  97%     |     55255.898 106%
r.operators.OperatorMergePerf.mergeNSyncStreamsOfN                               N/A       1    5182887.347  |   4926496.569  95%      |   5160628.853  100%    |   5043255.904 97%
r.operators.OperatorMergePerf.mergeNSyncStreamsOfN                               N/A    1000         52.055  |        51.914  100%     |        48.084  92%     |        46.567 89% --
r.operators.OperatorMergePerf.mergeTwoAsyncStreamsOfN                            N/A       1      76604.876  |     74224.276  97%      |     71937.709  94%     |     77366.844 101%
r.operators.OperatorMergePerf.mergeTwoAsyncStreamsOfN                            N/A    1000       3166.341  |      3130.021  99%      |      3295.517  104%    |      2541.420 80% --
r.operators.OperatorMergePerf.oneStreamOfNthatMergesIn1                          N/A       1    5276030.387  |   5048453.291  96%      |   4806177.849  91%     |   4935148.406 94%
r.operators.OperatorMergePerf.oneStreamOfNthatMergesIn1                          N/A    1000      37197.231  |     37647.410  101%     |     38406.161  103%    |     36161.836 97%
r.operators.OperatorMergePerf.oneStreamOfNthatMergesIn1                          N/A 1000000         34.649  |        34.124  98%      |        34.025  98%     |        32.754 95%
r.operators.OperatorObserveOnPerf.observeOnComputation                           N/A       1     107207.970  |    100380.298  94%      |    108474.884  101%    |    105535.982 98%
r.operators.OperatorObserveOnPerf.observeOnComputation                           N/A    1000       6517.902  |      6869.709  105%     |      6371.416  98%     |      6301.948 97%
r.operators.OperatorObserveOnPerf.observeOnComputation                           N/A 1000000         13.670  |        13.580  99%      |        11.346  83% --  |        11.305 83% --
r.operators.OperatorObserveOnPerf.observeOnImmediate                             N/A       1   15245087.895  |  14962479.384  98%      |  14997783.368  98%     |  14673057.780 96%
r.operators.OperatorObserveOnPerf.observeOnImmediate                             N/A    1000     178513.653  |    173023.234  97%      |    170021.413  95%     |    164929.922 92%
r.operators.OperatorObserveOnPerf.observeOnImmediate                             N/A 1000000        157.016  |       152.469  97%      |       154.095  98%     |       144.408 92%
r.operators.OperatorObserveOnPerf.observeOnNewThread                             N/A       1      16879.424  |     15716.151  93%      |     15426.649  91%     |     14689.995 87% --
r.operators.OperatorObserveOnPerf.observeOnNewThread                             N/A    1000       7783.654  |      7765.427  100%     |      6734.649  87%     |      7173.622 92%
r.operators.OperatorObserveOnPerf.observeOnNewThread                             N/A 1000000         13.153  |        14.191  108%     |        11.650  89%     |        11.275 86% --
r.operators.OperatorRangePerf.rangeWithBackpressureRequest                       N/A       1   25332343.625  |  25065988.234  99%      |  24342054.885  96%     |  25262395.598 100%
r.operators.OperatorRangePerf.rangeWithBackpressureRequest                       N/A    1000     116819.629  |    116739.463  100%     |    113936.110  98%     |    115857.556 99%
r.operators.OperatorRangePerf.rangeWithBackpressureRequest                       N/A 1000000        135.843  |       134.416  99%      |       130.710  96%     |       134.943 99%
r.operators.OperatorRangePerf.rangeWithoutBackpressure                           N/A       1  121325903.567  | 118002649.957  97%      | 112020450.304  92%     | 104636361.664 86% --
r.operators.OperatorRangePerf.rangeWithoutBackpressure                           N/A    1000     186533.260  |    187334.419  100%     |    185695.013  100%    |    179519.340 96%
r.operators.OperatorRangePerf.rangeWithoutBackpressure                           N/A 1000000        166.914  |       169.448  102%     |       160.806  96%     |       158.203 95%
r.operators.OperatorSerializePerf.noSerializationSingleThreaded                  N/A       1   18921314.555  |  18257594.271  96%      |  18853324.397  100%    |  18123044.637 96%
r.operators.OperatorSerializePerf.noSerializationSingleThreaded                  N/A    1000     269546.032  |    228423.425  85% --   |    266431.946  99%     |    248808.733 92%
r.operators.OperatorSerializePerf.noSerializationSingleThreaded                  N/A 1000000        260.990  |       254.223  97%      |       253.943  97%     |       235.045 90% --
r.operators.OperatorSerializePerf.serializedSingleStream                         N/A       1   10736813.240  |  10037561.783  93%      |   9935098.948  93%     |   9663588.470 90% --
r.operators.OperatorSerializePerf.serializedSingleStream                         N/A    1000      79767.255  |     67068.394  84% --   |     66532.316  83% --  |     64151.866 80% --
r.operators.OperatorSerializePerf.serializedSingleStream                         N/A 1000000         70.840  |        75.120  106%     |        73.776  104%    |        72.247 102%
r.operators.OperatorSerializePerf.serializedTwoStreamsHighlyContended            N/A       1     111389.498  |    110210.985  99%      |    110774.637  99%     |    106648.746 96%
r.operators.OperatorSerializePerf.serializedTwoStreamsHighlyContended            N/A    1000       7575.618  |      7207.681  95%      |      7189.025  95%     |      7175.698 95%
r.operators.OperatorSerializePerf.serializedTwoStreamsHighlyContended            N/A 1000000          9.196  |         9.864  107%     |         8.714  95%     |         9.383 102%
r.operators.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow             N/A       1      83396.077  |     78874.484  95%      |     81866.626  98%     |     78901.924 95%
r.operators.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow             N/A    1000      36273.839  |     34303.562  95%      |     35859.978  99%     |     34513.093 95%
r.operators.OperatorSerializePerf.serializedTwoStreamsSlightlyContended          N/A       1      55607.010  |     53946.317  97%      |     55332.808  100%    |     51419.088 92%
r.operators.OperatorSerializePerf.serializedTwoStreamsSlightlyContended          N/A    1000      53468.342  |     53613.934  100%     |     53089.878  99%     |     52291.141 98%
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A       1     107083.283  |    106172.060  99%      |    109756.005  102%    |    108969.564 102%
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A      10     103095.692  |    102757.452  100%     |    102707.186  100%    |     99579.238 97%
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A     100      37921.038  |     39183.046  103%     |     34085.675  90%     |     32331.879 85% --
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A    1000       6776.178  |      7015.656  104%     |      6289.376  93%     |      6384.900 94%
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A    2000       4851.628  |      4828.700  100%     |      4212.292  87%     |      4354.948 90% --
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A    3000       3754.394  |      3778.104  101%     |      2961.396  79%     |      3207.050 85% --
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A    4000       2994.907  |      2949.000  98%      |      2489.455  83% --  |      2562.348 86% --
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A   10000       1314.721  |      1275.489  97%      |      1113.610  85% --  |      1102.961 84% --
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A  100000        136.838  |       132.305  97%      |       115.854  85% --  |       117.097 86% --
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A 1000000         13.837  |        12.820  93%      |        11.543  83% --  |        10.903 79% --
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A       1     113032.129  |    112135.558  99%      |    111731.222  99%     |    111749.315 99%
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A      10     109884.718  |    109449.819  100%     |    102844.370  94%     |    109021.256 99%
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A     100      89945.753  |     88015.871  98%      |     85771.952  95%     |     88074.845 98%
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A    1000      31680.733  |     31522.878  100%     |     31045.429  98%     |     31413.322 99%
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A    2000      18490.513  |     18352.634  99%      |     15774.641  85% --  |     16059.751 87% --
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A    3000      11490.479  |     11446.241  100%     |     11197.939  97%     |     12871.202 112% ++
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A    4000       8833.117  |     10230.066  116% ++  |      9788.883  111%    |      9889.138 112% ++
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A   10000       3888.874  |      3885.651  100%     |      3919.543  101%    |      3903.835 100%
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A  100000        446.789  |       438.143  98%      |       434.775  97%     |       432.321 97%
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A 1000000         43.228  |        42.516  98%      |        42.588  99%     |        41.833 97%
r.schedulers.IOSchedulerPerf.observeOn                                           N/A       1     107443.954  |    103885.976  97%      |    106696.574  99%     |    105420.129 98%
r.schedulers.IOSchedulerPerf.observeOn                                           N/A    1000       6835.942  |      6934.710  101%     |      6175.613  90%     |      6257.461 92%
r.schedulers.IOSchedulerPerf.observeOn                                           N/A 1000000         13.804  |        13.076  95%      |        11.489  83% --  |        11.646 84% --
r.schedulers.IOSchedulerPerf.subscribeOn                                         N/A       1     113589.713  |    112548.761  99%      |    113767.664  100%    |    112749.541 99%
r.schedulers.IOSchedulerPerf.subscribeOn                                         N/A    1000      31572.535  |     31359.484  99%      |     31566.874  100%    |     28903.326 92%
r.schedulers.IOSchedulerPerf.subscribeOn                                         N/A 1000000         43.469  |        43.259  100%     |        43.075  99%     |        42.883 99%
r.subjects.ReplaySubjectPerf.subscribeAfterEventsCount1                            1     N/A    4885450.713  |   4663855.111  95%      |   4760593.357  97%     |   4814097.168 99%
r.subjects.ReplaySubjectPerf.subscribeAfterEventsCount1                         1000     N/A      36567.449  |     34794.653  95%      |     30965.582  85% --  |     35307.030 97%
r.subjects.ReplaySubjectPerf.subscribeAfterEventsCount1                      1000000     N/A         46.758  |        44.710  96%      |        45.807  98%     |        45.132 97%
r.subjects.ReplaySubjectPerf.subscribeAfterEventsUnbounded                         1     N/A    5166859.073  |   4928418.828  95%      |   4975843.203  96%     |   4971964.310 96%
r.subjects.ReplaySubjectPerf.subscribeAfterEventsUnbounded                      1000     N/A      36677.914  |     35071.647  96%      |     34172.806  93%     |     36402.484 99%
r.subjects.ReplaySubjectPerf.subscribeAfterEventsUnbounded                   1000000     N/A         45.299  |        43.800  97%      |        44.580  98%     |        44.281 98%
r.subjects.ReplaySubjectPerf.subscribeBeforeEventsCount1                           1     N/A    3602234.043  |   3467543.784  96%      |   3555240.202  99%     |   3590503.433 100%
r.subjects.ReplaySubjectPerf.subscribeBeforeEventsCount1                        1000     N/A      47750.855  |     46718.061  98%      |     46229.640  97%     |     47580.266 100%
r.subjects.ReplaySubjectPerf.subscribeBeforeEventsCount1                     1000000     N/A         42.764  |        47.144  110% ++  |        45.690  107%    |        42.126 99%
r.subjects.ReplaySubjectPerf.subscribeBeforeEventsUnbounded                        1     N/A    3787851.518  |   3637162.254  96%      |   3527627.269  93%     |   3761304.213 99%
r.subjects.ReplaySubjectPerf.subscribeBeforeEventsUnbounded                     1000     N/A      43472.144  |     46173.174  106%     |     41008.079  94%     |     46078.493 106%
r.subjects.ReplaySubjectPerf.subscribeBeforeEventsUnbounded                  1000000     N/A         42.057  |        41.150  98%      |        40.454  96%     |        40.994 97%

@benjchristensen
Member Author

Here is a comparison of no pooling with size at 128 vs 1024:

Benchmark                                                                 (nextRuns)  (size)            1.x              No Pool (128)    No Pool (1024)     
r.internal.IndexedRingBufferPerf.indexedRingBufferAdd                            N/A     100     288851.474  |    464395.527  161% ++  |    491662.797  170% ++ 
r.internal.IndexedRingBufferPerf.indexedRingBufferAdd                            N/A   10000       1053.102  |      1122.418  107%     |      1201.884  114% ++ 
r.internal.IndexedRingBufferPerf.indexedRingBufferAddRemove                      N/A     100     130035.705  |    219530.470  169% ++  |    230427.274  177% ++ 
r.internal.IndexedRingBufferPerf.indexedRingBufferAddRemove                      N/A   10000        394.109  |       408.761  104%     |       430.168  109%    
r.internal.RxRingBufferPerf.spmcCreateUseAndDestroy1                             N/A     N/A   26400373.503  |   4011739.109  15% --   |   2066095.594  8%   -- 
r.internal.RxRingBufferPerf.spmcRingBufferAddRemove1                             N/A     N/A   41947289.772  |  40889056.084  97%      |  42517680.638  101%    
r.internal.RxRingBufferPerf.spscCreateUseAndDestroy1                             N/A     N/A   26625817.275  |   4041119.853  15% --   |   2065904.299  8%   -- 
r.internal.RxRingBufferPerf.spscRingBufferAddRemove1                             N/A     N/A   42376557.425  |  39384117.892  93%      |  42717826.799  101%    
r.operators.OperatorFlatMapPerf.flatMapIntPassthruAsync                          N/A       1     323419.496  |    314150.913  97%      |    286454.266  89%  -- 
r.operators.OperatorFlatMapPerf.flatMapIntPassthruAsync                          N/A    1000        325.685  |       320.644  98%      |       304.203  93%     
r.operators.OperatorFlatMapPerf.flatMapIntPassthruAsync                          N/A 1000000          0.318  |         0.318  100%     |         0.305  96%     
r.operators.OperatorFlatMapPerf.flatMapIntPassthruSync                           N/A       1    5544570.553  |   5634853.059  102%     |   5703716.144  103%    
r.operators.OperatorFlatMapPerf.flatMapIntPassthruSync                           N/A    1000      37510.701  |     37428.777  100%     |     37643.924  100%    
r.operators.OperatorFlatMapPerf.flatMapIntPassthruSync                           N/A 1000000         32.831  |        33.133  101%     |        33.955  103%    
r.operators.OperatorFlatMapPerf.flatMapTwoNestedSync                             N/A       1    4081082.521  |   3988577.555  98%      |   4148571.600  102%    
r.operators.OperatorFlatMapPerf.flatMapTwoNestedSync                             N/A    1000      26194.758  |     26060.788  99%      |     27231.215  104%    
r.operators.OperatorFlatMapPerf.flatMapTwoNestedSync                             N/A 1000000         24.255  |        24.638  102%     |        26.108  108%    
r.operators.OperatorMapPerf.mapPassThru                                          N/A       1   24413615.697  |  24572501.107  101%     |  25033228.781  103%    
r.operators.OperatorMapPerf.mapPassThru                                          N/A    1000     126022.367  |    126928.122  101%     |    131846.869  105%    
r.operators.OperatorMapPerf.mapPassThru                                          N/A 1000000        125.726  |       133.165  106%     |       129.763  103%    
r.operators.OperatorMapPerf.mapPassThruViaLift                                   N/A       1   26830181.693  |  26855829.721  100%     |  27593572.229  103%    
r.operators.OperatorMapPerf.mapPassThruViaLift                                   N/A    1000     126998.154  |    127883.467  101%     |    128981.592  102%    
r.operators.OperatorMapPerf.mapPassThruViaLift                                   N/A 1000000        132.249  |       132.858  100%     |       130.398  99%     
r.operators.OperatorMergePerf.merge1SyncStreamOfN                                N/A       1    5004527.084  |   5096505.217  102%     |   5198752.240  104%    
r.operators.OperatorMergePerf.merge1SyncStreamOfN                                N/A    1000      52723.842  |     51944.534  99%      |     54578.616  104%    
r.operators.OperatorMergePerf.merge1SyncStreamOfN                                N/A 1000000         49.928  |        49.643  99%      |        46.496  93%     
r.operators.OperatorMergePerf.mergeNAsyncStreamsOfN                              N/A       1      99724.243  |     96046.578  96%      |     92329.887  93%     
r.operators.OperatorMergePerf.mergeNAsyncStreamsOfN                              N/A    1000          4.661  |         5.085  109%     |         4.858  104%    
r.operators.OperatorMergePerf.mergeNSyncStreamsOf1                               N/A       1    4543776.401  |   4562256.139  100%     |   4868734.621  107%    
r.operators.OperatorMergePerf.mergeNSyncStreamsOf1                               N/A     100     482015.536  |    481347.669  100%     |    474865.976  99%     
r.operators.OperatorMergePerf.mergeNSyncStreamsOf1                               N/A    1000      52102.762  |     52212.844  100%     |     41445.112  80%  -- 
r.operators.OperatorMergePerf.mergeNSyncStreamsOfN                               N/A       1    5182887.347  |   4926496.569  95%      |   5233708.963  101%    
r.operators.OperatorMergePerf.mergeNSyncStreamsOfN                               N/A    1000         52.055  |        51.914  100%     |        52.875  102%    
r.operators.OperatorMergePerf.mergeTwoAsyncStreamsOfN                            N/A       1      76604.876  |     74224.276  97%      |     70830.676  92%     
r.operators.OperatorMergePerf.mergeTwoAsyncStreamsOfN                            N/A    1000       3166.341  |      3130.021  99%      |      3684.324  116% ++ 
r.operators.OperatorMergePerf.oneStreamOfNthatMergesIn1                          N/A       1    5276030.387  |   5048453.291  96%      |   4977866.142  94%     
r.operators.OperatorMergePerf.oneStreamOfNthatMergesIn1                          N/A    1000      37197.231  |     37647.410  101%     |     39342.369  106%    
r.operators.OperatorMergePerf.oneStreamOfNthatMergesIn1                          N/A 1000000         34.649  |        34.124  98%      |        34.793  100%    
r.operators.OperatorObserveOnPerf.observeOnComputation                           N/A       1     107207.970  |    100380.298  94%      |    103951.076  97%     
r.operators.OperatorObserveOnPerf.observeOnComputation                           N/A    1000       6517.902  |      6869.709  105%     |      4597.071  71%  -- 
r.operators.OperatorObserveOnPerf.observeOnComputation                           N/A 1000000         13.670  |        13.580  99%      |         8.769  64%  -- 
r.operators.OperatorObserveOnPerf.observeOnImmediate                             N/A       1   15245087.895  |  14962479.384  98%      |  15246618.567  100%    
r.operators.OperatorObserveOnPerf.observeOnImmediate                             N/A    1000     178513.653  |    173023.234  97%      |    175349.312  98%     
r.operators.OperatorObserveOnPerf.observeOnImmediate                             N/A 1000000        157.016  |       152.469  97%      |       158.773  101%    
r.operators.OperatorObserveOnPerf.observeOnNewThread                             N/A       1      16879.424  |     15716.151  93%      |     15436.938  91%     
r.operators.OperatorObserveOnPerf.observeOnNewThread                             N/A    1000       7783.654  |      7765.427  100%     |      4088.303  53%  -- 
r.operators.OperatorObserveOnPerf.observeOnNewThread                             N/A 1000000         13.153  |        14.191  108%     |        14.327  109%    
r.operators.OperatorRangePerf.rangeWithBackpressureRequest                       N/A       1   25332343.625  |  25065988.234  99%      |  25389267.172  100%    
r.operators.OperatorRangePerf.rangeWithBackpressureRequest                       N/A    1000     116819.629  |    116739.463  100%     |    115639.590  99%     
r.operators.OperatorRangePerf.rangeWithBackpressureRequest                       N/A 1000000        135.843  |       134.416  99%      |       135.049  99%     
r.operators.OperatorRangePerf.rangeWithoutBackpressure                           N/A       1  121325903.567  | 118002649.957  97%      | 117809961.937  97%     
r.operators.OperatorRangePerf.rangeWithoutBackpressure                           N/A    1000     186533.260  |    187334.419  100%     |    189459.713  102%    
r.operators.OperatorRangePerf.rangeWithoutBackpressure                           N/A 1000000        166.914  |       169.448  102%     |       167.025  100%    
r.operators.OperatorSerializePerf.noSerializationSingleThreaded                  N/A       1   18921314.555  |  18257594.271  96%      |  19336558.885  102%    
r.operators.OperatorSerializePerf.noSerializationSingleThreaded                  N/A    1000     269546.032  |    228423.425  85% --   |    272078.328  101%    
r.operators.OperatorSerializePerf.noSerializationSingleThreaded                  N/A 1000000        260.990  |       254.223  97%      |       263.143  101%    
r.operators.OperatorSerializePerf.serializedSingleStream                         N/A       1   10736813.240  |  10037561.783  93%      |  10329940.205  96%     
r.operators.OperatorSerializePerf.serializedSingleStream                         N/A    1000      79767.255  |     67068.394  84% --   |     82774.032  104%    
r.operators.OperatorSerializePerf.serializedSingleStream                         N/A 1000000         70.840  |        75.120  106%     |        73.439  104%    
r.operators.OperatorSerializePerf.serializedTwoStreamsHighlyContended            N/A       1     111389.498  |    110210.985  99%      |    114315.595  103%    
r.operators.OperatorSerializePerf.serializedTwoStreamsHighlyContended            N/A    1000       7575.618  |      7207.681  95%      |      6969.878  92%     
r.operators.OperatorSerializePerf.serializedTwoStreamsHighlyContended            N/A 1000000          9.196  |         9.864  107%     |        10.363  113% ++ 
r.operators.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow             N/A       1      83396.077  |     78874.484  95%      |     83311.153  100%    
r.operators.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow             N/A    1000      36273.839  |     34303.562  95%      |     35931.364  99%     
r.operators.OperatorSerializePerf.serializedTwoStreamsSlightlyContended          N/A       1      55607.010  |     53946.317  97%      |     53502.884  96%     
r.operators.OperatorSerializePerf.serializedTwoStreamsSlightlyContended          N/A    1000      53468.342  |     53613.934  100%     |     52365.494  98%     
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A       1     107083.283  |    106172.060  99%      |    103457.291  97%     
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A      10     103095.692  |    102757.452  100%     |     98744.249  96%     
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A     100      37921.038  |     39183.046  103%     |     38418.723  101%    
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A    1000       6776.178  |      7015.656  104%     |      4530.745  67%  -- 
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A    2000       4851.628  |      4828.700  100%     |      2085.208  43%  -- 
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A    3000       3754.394  |      3778.104  101%     |      1310.303  35%  -- 
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A    4000       2994.907  |      2949.000  98%      |      1008.669  34%  -- 
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A   10000       1314.721  |      1275.489  97%      |       399.229  30%  -- 
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A  100000        136.838  |       132.305  97%      |        50.999  37%  -- 
r.schedulers.ComputationSchedulerPerf.observeOn                                  N/A 1000000         13.837  |        12.820  93%      |         9.045  65%  -- 
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A       1     113032.129  |    112135.558  99%      |    113835.591  101%    
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A      10     109884.718  |    109449.819  100%     |    109171.798  99%     
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A     100      89945.753  |     88015.871  98%      |     89399.541  99%     
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A    1000      31680.733  |     31522.878  100%     |     31588.465  100%    
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A    2000      18490.513  |     18352.634  99%      |     18429.705  100%    
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A    3000      11490.479  |     11446.241  100%     |     11472.235  100%    
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A    4000       8833.117  |     10230.066  116% ++  |      9979.643  113% ++ 
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A   10000       3888.874  |      3885.651  100%     |      3517.361  90%     
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A  100000        446.789  |       438.143  98%      |       456.528  102%    
r.schedulers.ComputationSchedulerPerf.subscribeOn                                N/A 1000000         43.228  |        42.516  98%      |        44.854  104%    
r.schedulers.IOSchedulerPerf.observeOn                                           N/A       1     107443.954  |    103885.976  97%      |    107011.170  100%    
r.schedulers.IOSchedulerPerf.observeOn                                           N/A    1000       6835.942  |      6934.710  101%     |      4307.395  63%  -- 
r.schedulers.IOSchedulerPerf.observeOn                                           N/A 1000000         13.804  |        13.076  95%      |        10.003  72%  -- 
r.schedulers.IOSchedulerPerf.subscribeOn                                         N/A       1     113589.713  |    112548.761  99%      |    112666.557  99%     
r.schedulers.IOSchedulerPerf.subscribeOn                                         N/A    1000      31572.535  |     31359.484  99%      |     31608.746  100%    
r.schedulers.IOSchedulerPerf.subscribeOn                                         N/A 1000000         43.469  |        43.259  100%     |        43.581  100%    
r.subjects.ReplaySubjectPerf.subscribeAfterEventsCount1                            1     N/A    4885450.713  |   4663855.111  95%      |   4769133.206  98%     
r.subjects.ReplaySubjectPerf.subscribeAfterEventsCount1                         1000     N/A      36567.449  |     34794.653  95%      |     36690.199  100%    
r.subjects.ReplaySubjectPerf.subscribeAfterEventsCount1                      1000000     N/A         46.758  |        44.710  96%      |        49.111  105%    
r.subjects.ReplaySubjectPerf.subscribeAfterEventsUnbounded                         1     N/A    5166859.073  |   4928418.828  95%      |   5233752.980  101%    
r.subjects.ReplaySubjectPerf.subscribeAfterEventsUnbounded                      1000     N/A      36677.914  |     35071.647  96%      |     37576.169  102%    
r.subjects.ReplaySubjectPerf.subscribeAfterEventsUnbounded                   1000000     N/A         45.299  |        43.800  97%      |        46.962  104%    
r.subjects.ReplaySubjectPerf.subscribeBeforeEventsCount1                           1     N/A    3602234.043  |   3467543.784  96%      |   3700116.809  103%    
r.subjects.ReplaySubjectPerf.subscribeBeforeEventsCount1                        1000     N/A      47750.855  |     46718.061  98%      |     47680.041  100%    
r.subjects.ReplaySubjectPerf.subscribeBeforeEventsCount1                     1000000     N/A         42.764  |        47.144  110% ++  |        48.698  114% ++ 
r.subjects.ReplaySubjectPerf.subscribeBeforeEventsUnbounded                        1     N/A    3787851.518  |   3637162.254  96%      |   3705592.692  98%     
r.subjects.ReplaySubjectPerf.subscribeBeforeEventsUnbounded                     1000     N/A      43472.144  |     46173.174  106%     |     46912.710  108%    
r.subjects.ReplaySubjectPerf.subscribeBeforeEventsUnbounded                  1000000     N/A         42.057  |        41.150  98%      |        46.833  111% ++ 

Dropping the default buffer size from 1024 to 128 makes the no-pooling approach perform acceptably. At 1024 the pooling was necessary.

@benjchristensen
Copy link
Member Author

Are any of you able to check performance and object allocation behavior using #1944 so it's not just my judgment on this? Is removing the object pool okay while using SpscArrayQueue, as long as we stay at size 128?

Can you get WeakReference pooling to work better than the "no pooling" performance?

@benjchristensen
Copy link
Member Author

@akarnokd based on the perf numbers above what do you think we should do?

@akarnokd
Copy link
Member

Generally, the no-pool 128 version seems to be the best choice. Maybe the low spscCreateUseDestroy1 number can be improved by removing the padding and the complex class hierarchy from the main class.
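For anyone following along, here is a minimal sketch (hypothetical class names, not the actual RxRingBuffer/JCTools sources) of the padding-by-inheritance pattern being referred to; the pad fields isolate the hot index on its own cache line, at the cost of a larger footprint and a deeper hierarchy:

abstract class LhsPadding { long p01, p02, p03, p04, p05, p06, p07; }                            // left-side pad
abstract class ProducerIndexField extends LhsPadding { volatile long producerIndex; }            // the hot field
abstract class RhsPadding extends ProducerIndexField { long p11, p12, p13, p14, p15, p16, p17; } // right-side pad

final class PaddedProducerIndex extends RhsPadding {
    long current() { return producerIndex; }
}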

@benjchristensen
Copy link
Member Author

Should we move forward with this change in 1.0.x, or should it wait for 1.1? I have not seen any evidence in my testing to suggest that this significantly changes GC behavior, which surprised me and is why I'm hesitant and want confirmation.

@benjchristensen
Copy link
Member Author

By the way, nothing we're doing here should affect Android since we use a LinkedList without pooling on Android.

@JakeWharton
Copy link
Contributor

What versions of Android are supported? Deque/ArrayDeque are usually preferred if you are API 9 and up (which is Java 6, 99.5% of active Android devices).

@benjchristensen
Copy link
Member Author

We use the SpscRingBuffer for non-Android, since it relies on sun.misc.Unsafe, and a synchronized LinkedList for Android. Is a synchronized ArrayDeque preferred for Android, then? The max size for Android is set to 16 instead of 128. With LinkedList, nothing is allocated until it is needed. If we use ArrayDeque, would you want it set to 16 as the initial capacity, or something like 2 and let it grow if needed?
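For concreteness, a minimal sketch (assumed shape, not the actual RxRingBuffer internals) of the ArrayDeque variant being discussed, pre-sized to the Android buffer size of 16 and coarsely synchronized as a drop-in for the synchronized LinkedList:

import java.util.ArrayDeque;

final class SynchronizedArrayDequeQueue<T> {
    // allocated up front; under backpressure it never holds more than 16 elements, so it never grows
    private final ArrayDeque<T> deque = new ArrayDeque<T>(16);

    public synchronized boolean offer(T t) { return deque.offer(t); }
    public synchronized T poll() { return deque.poll(); }
    public synchronized boolean isEmpty() { return deque.isEmpty(); }
    public synchronized int size() { return deque.size(); }
}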

@benjchristensen
Copy link
Member Author

Here is the size for Android: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/util/RxRingBuffer.java#L264 and the list impl choice when Unsafe is not available: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/util/RxRingBuffer.java#L324

The conditional check for unsafe happens here: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/util/RxRingBuffer.java#L44

If you want to make a change for Android would you like to submit a PR?

@benjchristensen
Copy link
Member Author

What versions of Android are supported?

We code to Java 6, and I believe it is Android API 9+, as per the discussions in #153 and #1004.

@JakeWharton
Copy link
Contributor

On Android we generally prefer up-front allocation over lazy allocation of somewhat disposable wrapper objects (such as the nodes inside a LinkedList). This is because of the comparatively poor handling of short-lived objects, as well as the desire to avoid the GC as much as possible in our 60fps world. We also definitely wouldn't want to pay the cost of having the array expand unless it exceeded the 16-element limit.

@benjchristensen
Copy link
Member Author

It will never exceed 16 due to how the backpressure approach works; if the origin doesn't respect the requested amount, a MissingBackpressureException is thrown instead.
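To make that concrete, here is a minimal sketch (a hypothetical helper, not the actual RxRingBuffer code) of the capacity check described above, assuming rx.exceptions.MissingBackpressureException from 1.x:

import java.util.ArrayDeque;
import java.util.Queue;
import rx.exceptions.MissingBackpressureException;

final class BoundedAndroidBuffer<T> {
    private static final int SIZE = 16;                  // the Android buffer size mentioned above
    private final Queue<T> queue = new ArrayDeque<T>(SIZE);

    public synchronized void onNext(T t) throws MissingBackpressureException {
        if (queue.size() >= SIZE) {
            // the producer emitted more than the requested amount
            throw new MissingBackpressureException();
        }
        queue.offer(t);
    }

    public synchronized T poll() {
        return queue.poll();
    }
}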

Often the buffer is not needed, or it is only needed for a single value (when using an Observable like a Future). So on Android, which is better: allocating a 16-slot array and maybe only using 1 slot, or allocating linked nodes only as needed but potentially allocating more on a slow/contended stream of data?

Do you want to submit a PR with the changes done as you'd like them for Android?

@benjchristensen
Copy link
Member Author

I have added #1969 with Flight Recorder tests showing the impact of removing pooling. It's not catastrophic, but it does show signs of being a potential problem. These are exactly the kinds of metrics that pushed me to add object pooling. That said, because we've dropped from 1024 to 128, the impact is nowhere near as bad as it was before.

@benjchristensen
Copy link
Member Author

This test is particularly bad: OperatorMergePerf.mergeNAsyncStreamsOfN

This suggests we can't eliminate pooling and use array queues.

With Pooling

(Flight Recorder screenshot, with pooling)

Without

(Flight Recorder screenshot, without pooling)

With Pooling

(Flight Recorder screenshot, with pooling)

Without

(Flight Recorder screenshot, without pooling)

@benjchristensen
Copy link
Member Author

I have submitted another variant in #2189

It makes different tradeoffs: object pooling works in most use cases, and edge cases fall back to normal GC (I haven't found a consistent edge case yet, only theoretical ones, though I haven't tried hard), while maintaining more or less the same performance characteristics as the current 1.x code.

I have NOT battle-tested this and intend to sleep on it and review it again, but I wanted to post it to trigger discussion and get a review of the direction and trade-offs.

@benjchristensen
Copy link
Member Author

I tried another implementation, this time with a WriterReaderPhaser in #2243. It performs similarly to the RWLock and WIP implementations and worse than #2189.

@benjchristensen
Copy link
Member Author

Problem definition:

  • The current approach to removing the object pool occasionally results in concurrency issues because concurrent emission/unsubscribe can result in 2 instances of RxRingBuffer holding a single queue.
  • Retaining a reference to the queue to avoid a NullPointerException results in the same problem of 2 RxRingBuffer instances holding a single queue.
  • In short, synchronization is needed between emission and unsubscribe in order to safely release the queue to the pool (see the sketch right after this list).
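
A minimal sketch (assumed names; not code from any of the linked PRs) of the kind of guard this requires: a tiny state machine that releases the queue exactly once, and never while an emission is in flight, no matter which side wins the race.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class GuardedBuffer<T> {
    private static final int IDLE = 0, EMITTING = 1, RELEASED = 2;

    private final AtomicInteger state = new AtomicInteger(IDLE);
    private volatile boolean unsubscribed;
    private volatile Queue<T> queue = new ConcurrentLinkedQueue<T>(); // stand-in for the pooled SPSC queue

    /** Called serially by the producer side; returns false if the value was dropped. */
    boolean emit(T t) {
        if (!state.compareAndSet(IDLE, EMITTING)) {
            return false;                       // already released: drop the value
        }
        Queue<T> q = queue;                     // non-null while we hold EMITTING
        boolean offered = q != null && q.offer(t);
        state.set(IDLE);                        // only the emitter ever leaves EMITTING
        if (unsubscribed) {
            tryRelease();                       // unsubscribe raced with this emission
        }
        return offered;
    }

    /** May be called from any thread, concurrently with emit(). */
    void unsubscribe() {
        unsubscribed = true;
        tryRelease();                           // no-op if an emission is currently in flight
    }

    private void tryRelease() {
        if (state.compareAndSet(IDLE, RELEASED)) {
            queue = null;                       // in the real code: return the queue to the pool, exactly once
        }
    }
}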

A refresher on what has been attempted so far while exploring this:

  • All attempts at removing the object pool have resulted in performance degradation via significant increases in object allocation and GC time. This was pursued since removing the pool would eliminate the race condition.
  • Using a WIP counter, a mutex, a RW lock, or a WriterReaderPhaser makes it thread-safe, but at a significant performance penalty on the normal happy path (see the performance numbers below).
  • Attempts using WeakReference have failed so far. Everything I've tried suggests that ReferenceQueue has non-trivial overhead, and unfortunately polling it is the only way to get a "callback" that something has been released. It doesn't help that it was written in Java 1.2 and is heavily synchronized, and we can't implement our own because this code is special-cased by the JVM as part of GC. (A sketch of the polling pattern follows this list.)
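
For reference, a minimal sketch (assumed names; not the code from the failed experiments) of the WeakReference idea being ruled out above: the pool keeps a strong handle on each loaned queue, keyed by a weak reference to its borrower, and polls a ReferenceQueue to learn when a borrower was GC'd so the queue can be reused. The polling and ReferenceQueue's internal synchronization are where the measured overhead comes from.

import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

final class WeakQueuePool {
    private final Queue<Queue<Object>> pool = new ConcurrentLinkedQueue<Queue<Object>>();
    private final ReferenceQueue<Object> dead = new ReferenceQueue<Object>();
    private final Map<Reference<?>, Queue<Object>> loans =
            new ConcurrentHashMap<Reference<?>, Queue<Object>>();

    /** Borrow a queue on behalf of an owner (e.g. an RxRingBuffer instance). */
    Queue<Object> borrow(Object owner) {
        reclaim();                                        // opportunistic, no background thread
        Queue<Object> q = pool.poll();
        if (q == null) {
            q = new ConcurrentLinkedQueue<Object>();      // stand-in for the real SPSC queue
        }
        loans.put(new WeakReference<Object>(owner, dead), q);
        return q;
    }

    /** Recycle queues whose owners have been garbage collected. */
    private void reclaim() {
        Reference<?> ref;
        while ((ref = dead.poll()) != null) {             // polling ReferenceQueue is the only "callback"
            Queue<Object> q = loans.remove(ref);
            if (q != null) {
                q.clear();
                pool.offer(q);
            }
        }
    }
}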

The behavior we're trying to deal with is:

  • Emissions are all serial from the same thread. Draining may happen on a second thread. It is SPSC in behavior.
  • Unsubscribe generally comes from the same thread, but does not have to, hence the possible concurrency. This race happens only once, at the end.
  • It is okay if the unsubscribe/emission race is non-deterministic (finish emitting then unsubscribe or unsubscribe immediately and drop data).

Here are performance numbers of the various tests I've done:

Benchmark                                          (size)   Mode   Samples          1.x   Inline Volatile    + finalize       RW Lock          WIP     WRPhaser
r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  4757888.048       5264594.520   4956256.323   5288310.755  5032942.628  5147447.030
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    44877.618         42845.758     39209.439     25742.696    29025.955    27779.876
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       42.366            40.979        37.036        24.769       27.260       27.694
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    99981.127         99910.070     94307.080    103112.286   103176.289   100516.101
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.675             4.620         4.670         4.374        4.313        4.413
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4751265.583       4706441.452   4376983.062   4739418.557  4673633.614  4510099.724
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   458704.984        480075.261    427165.143    483313.588   476318.407   462373.555
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    42493.290         42178.254     39640.240     42728.480    42542.171    41354.668
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5406448.872       5375090.752   5066264.570   5628401.294  4974892.417  4986054.668
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       44.528            40.990        41.106        24.974       28.212       27.755
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76898.222         72655.377     69748.305     78283.565    78987.646    78550.912
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     3172.653          2955.854      3064.749      1858.361     2204.948     2310.804
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5157134.576       5163837.644   4846336.744   5290961.536  5139893.848  4486879.415
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    39961.491         39341.526     37312.117     40418.492    39163.267    37424.146
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       35.925            35.730        33.948        35.526       35.611       32.287

@benjchristensen
Copy link
Member Author

I'd like to move forward on something since we do currently have a correctness issue.

Unless there is some performance or functional issue I am not yet aware of, I suggest we move forward with #2189 since it seems to work while retaining performance, despite not being the most elegant of approaches.

Once we've merged and released to fix the correctness issue, other more elegant solutions can continue to be pursued. I'd love for something such as the WriterReaderPhaser to work and retain performance.

@benjchristensen
Copy link
Member Author

As a fun aside, and because this is happening over the holidays, here are some pages from a book I just read my girls the other day that totally made me think of this issue and many others like it :-)

(screenshots of three pages from the book)

Book at http://www.amazon.com/Most-Magnificent-Thing-Ashley-Spires-ebook/dp/B00IZH626G

Spoiler ... she eventually succeeds, though still with a few imperfections :-)

@nitsanw
Copy link
Contributor

nitsanw commented Dec 28, 2014

Just to let you know, I have added an SPSC queue which might fit the bill here:
https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/SpscGrowableArrayQueue.java
Small footprint, grows to its max size only if the queue needs to grow (i.e., if the consumer falls behind), and it still has some of the false-sharing protection. It seems to perform well in my benchmarks; let me know how it works for you.
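A quick usage sketch (assuming the (initialCapacity, maxCapacity) constructor this class exposes): start small and let the queue grow toward the RxRingBuffer size of 128 only if the consumer falls behind.

import org.jctools.queues.SpscGrowableArrayQueue;

public class GrowableQueueExample {
    public static void main(String[] args) {
        // start at 16, grow up to 128 only if needed
        SpscGrowableArrayQueue<Integer> queue = new SpscGrowableArrayQueue<Integer>(16, 128);

        for (int i = 0; i < 100; i++) {
            if (!queue.offer(i)) {
                break;               // offer only fails once the 128 max capacity is reached
            }
        }
        Integer v;
        while ((v = queue.poll()) != null) {
            // the single consumer drains here
        }
    }
}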

@akarnokd
Copy link
Member

Quite clever; no CAS, no tombstone, and no copying. However, correct me if I'm wrong, but it seems the code on line 135 may read beyond the buffer if the offset is at the last element. The other thing I see is that the queue should disallow offering Object[] values, because they would confuse the poll. Better yet, when a new array needs to be communicated, wrap it in a private holder class so it cannot be confused with any other type; resizing should be infrequent enough not to cause significant overhead.
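For illustration, the holder idea could look like this (a sketch of the concept, not the JCTools code):

final class NextArray {
    final Object[] next;
    NextArray(Object[] next) { this.next = next; }
}

// On resize, the producer stores `new NextArray(newBuffer)` into the old buffer's link slot
// instead of the raw Object[]. poll() then only needs `element instanceof NextArray` to detect
// the hand-off, so a user-supplied Object[] element can never be mistaken for the internal link.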

@akarnokd
Copy link
Member

Since the queues are limited to RxRingBuffer.SIZE, it might be worth considering having this growable queue jump immediately to its maximum size instead of growing in several steps. In my two-phased version I also triggered a grow after a certain number of elements had been offered; this helped eliminate the cost of the CAS for long-running queues. Here, if poll() can know that no further resize can happen, the instanceof may be avoided (replaced by a simple boolean flag).

@nitsanw
Copy link
Contributor

nitsanw commented Dec 28, 2014

Fixed as per your suggestions in JCTools/JCTools#43
I think the factor-of-2 growth is beneficial, especially since a CAS is no longer required. You save on the intermediate sizing, but suffer in the case of a transient spike.
I considered adding a 'max capacity reached' flag and am still playing with it. I think the instanceof check is not that costly if you mean to actually use the returned object.

@akarnokd
Copy link
Member

akarnokd commented Feb 5, 2015

The RxRingBuffer has been fixed and the JCTools queues have been upgraded in 1.0.5. Pooling seems to be the only way to increase the performance of single-shot merging of values.

@akarnokd akarnokd closed this as completed Feb 5, 2015