Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: observeOn now replenishes with constant rate #3795

Merged
merged 1 commit into from
Apr 8, 2016

Conversation

akarnokd
Copy link
Member

This PR makes sure observeOn requests replenishments in a fixed and predictable quantity of 75% of the bufferSize, that is, if an emission counter reaches 0.75 * bufferSize, that amount is requested and the emission counter is reset to zero. This requires saving the emission count between drain runs. If the bufferSize is 1 or 2, the replenishment will trigger after every 1 or 2 items.

Note that there is only one sensitive operator-builder, AsyncOnSubscribe, which is mostly affected by the request pattern as it facilitates user code to respond with an Observable sequence of the requested amount.

In addition, since observeOn now supports setting the buffer size, it can act as a rebatching operator via the help of Schedulers.immediate().

@artem-zinnatullin
Copy link
Contributor

👍

@@ -105,12 +109,15 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boo
this.recursiveScheduler = scheduler.createWorker();
this.delayError = delayError;
this.on = NotificationLite.instance();
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
this.limit = calculatedSize - (calculatedSize >> 2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about a less efficient (but more readable): (int) (calculatedSize * 0.75) ?

@stevegury
Copy link
Member

Other than the previous comment, 👍

@akarnokd akarnokd force-pushed the ObserveOnStableFetch branch from 23862c1 to e2331e3 Compare April 7, 2016 06:55
@akarnokd
Copy link
Member Author

akarnokd commented Apr 7, 2016

That formula doesn't work if calculatedSize == 1. I've updated the PR with a comment on the calculation.

@stevegury
Copy link
Member

👍

@akarnokd akarnokd merged commit 53c31cd into ReactiveX:1.x Apr 8, 2016
@akarnokd akarnokd deleted the ObserveOnStableFetch branch April 8, 2016 20:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants