-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimized observeOn/subscribeOn #2603
Conversation
New benchmark results (i7 920, Java 1.8u31, Windows 7 x64)
There are 2x-3x througput improvements. I've verified there aren't any exceptions thrown and the last emitted value is always the size - 1 (all values should have been delivered). Note that these improvements are due to overhead reduction. With
Most likely, the improvement comes from the change to isUnusubscribed reading a volatile field instead of always entering the unbiasable synchronized blocks. |
@akarnokd great stuff, I'll try it out early next week and report some numbers :) |
Now on: i7 4770k, Java 1.8u31, Windows 7 x64
|
I see these improvements ... still reviewing the code:
|
|
||
public abstract class ObjectPool<T> { | ||
private Queue<T> pool; | ||
private final int maxSize; | ||
|
||
private Scheduler.Worker schedulerWorker; | ||
private final ScheduledExecutorService schedulerWorker; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should combine all periodic maintenance-related threads into a single service: ObjectPool, IO scheduler's cleanup pool, JDK 6 computation scheduler's purge.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does that affect memory, caches, etc to have an extra thread doing this stuff instead of just being the existing event loops?
I ask because we accidentally were running extra threads in a test with Netty+RxJava and it decreased throughput by 40%. That was far higher workload because metrics were being captured on the other threads, but it makes me less certain about choices involving putting work on other threads.
final Subscriber<? super T> child = this.child; | ||
final NotificationLite<T> on = this.on; | ||
@SuppressWarnings("rawtypes") | ||
final AtomicLongFieldUpdater<ObserveOnSubscriber> counter = COUNTER_UPDATER; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious, what benefit does this give to assign this reference? Why use counter
instead of COUNTER_UPDATER
directly?
Same for the other assignments here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Due to the volatile access in the loop, the JIT optimization that would hoist them into registers is not allowed (i.e., field access couldn't be moved before a volatile read) and would just re-read them all the time.
@benjchristensen what is the verdict on this PR? |
I think it's only the discussion about exposing |
The optimization is built upon the direct access which is not possible if the class is package private. I could create a Schedulers.scheduleSingle(Scheduler, Action0) that calls schedule direct and since they are both in the same package, no need to expose EventLoopScheduler. |
So why don't you just move it to the rx.internal packages as you suggested before? |
Can you please rebase this, and move that file to rx.internal so it is not made part of the public API? |
Okay. |
Doing observeOn/subscribeOn on these is essentially the same operation.
Benchmark results: (i7 4770k, Java 1.8u31, Windows 7 x64)
Unfortunately, the benchmark results were quite hectic even with more warmup and iteration. I'd say the changes give +10% for the size = 1 case, but running the same code twice (observeOn 1, subscribeOn 1) gives inconsistent values. I suspect the main cause is the GC.