-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New Implementation of SerializedObserver #999
New Implementation of SerializedObserver #999
Conversation
#### JMH Benchmarks 0.17.3 Benchmark (size) Mode Samples Mean Mean error Units r.operators.OperatorSerializePerf.noSerializationSingleThreaded 1024 avgt 5 45.504 1.710 ns/op r.operators.OperatorSerializePerf.noSerializationSingleThreaded 1048576 avgt 5 58.600 5.647 ns/op r.operators.OperatorSerializePerf.serializedSingleStream 1024 avgt 5 68.610 4.596 ns/op r.operators.OperatorSerializePerf.serializedSingleStream 1048576 avgt 5 71.313 2.318 ns/op r.operators.OperatorSerializePerf.synchronizedSingleStream 1024 avgt 5 73.322 3.666 ns/op r.operators.OperatorSerializePerf.synchronizedSingleStream 1048576 avgt 5 76.518 1.355 ns/op 0.17.2 Benchmark (size) Mode Samples Mean Mean error Units r.operators.OperatorSerializePerf.noSerializationSingleThreaded 1024 avgt 5 45.790 1.184 ns/op r.operators.OperatorSerializePerf.noSerializationSingleThreaded 1048576 avgt 5 58.518 3.788 ns/op r.operators.OperatorSerializePerf.serializedSingleStream 1024 avgt 5 72.665 7.851 ns/op r.operators.OperatorSerializePerf.serializedSingleStream 1048576 avgt 5 74.788 2.946 ns/op r.operators.OperatorSerializePerf.synchronizedSingleStream 1024 avgt 5 73.661 3.499 ns/op r.operators.OperatorSerializePerf.synchronizedSingleStream 1048576 avgt 5 78.386 5.036 ns/op #### Manual Benchmarks /** * 0.17.3: * * Run: 10 - 9,746,505 ops/sec * Run: 11 - 9,956,019 ops/sec * Run: 12 - 10,053,770 ops/sec * Run: 13 - 10,076,958 ops/sec * Run: 14 - 9,983,319 ops/sec * * 0.17.2: * * Run: 10 - 9,851,999 ops/sec * Run: 11 - 9,726,975 ops/sec * Run: 12 - 9,719,762 ops/sec * Run: 13 - 9,668,141 ops/sec * Run: 14 - 9,799,700 ops/sec * * @param input */ public void serializedSingleStream(Input input) { for (int i = 0; i < reps; i++) { input.observable.serialize().subscribe(input.subscriber); } }
Unit test showing delays. Fails when MAX_DRAIN_ITERATION set to 1, passes as currently configured. Added a thread starvation unit test and marked as ignored for now. Doesn't pass even with MAX_DRAIN_ITERATION set to 1. Probably needs backpressure solution.
RxJava-pull-requests #932 FAILURE |
synchronized (this) { | ||
list = queue; | ||
queue = null; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If list is null here, there is a window between this sync block and the finally sync block where emitting is still true and events are queued and not replayed until a subsequent event appears. A better way would be:
synchronized (this) {
list = queue;
queue = null;
if (list = null) {
emitting = false;
break;
}
}
But then the finally block should be changed to avoid setting emitting to false.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see what you mean, but the intricacies with onComplete
/onError
and the race for terminated
then becomes quite complicated.
If I understand correctly, the finally
block would not only need to not touch emitting
in this case when not terminated, but it would also have to check if terminated and !emitting
to reclaim the right to drain the queue, correct? Otherwise the terminal state could result in duplicate emission.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, something like that. I'll post the proposed correction within a day.
New Implementation of SerializedObserver
RxJava-pull-requests #936 FAILURE |
Rewrite of
SerializedObserver
by @akarnokd discussed at #962 (comment) from this Gist: https://gist.github.com/akarnokd/9545150JMH Benchmarks
0.17.3
0.17.2
Manual Benchmarks