-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
save allocation in OperatorSkipTimed #4239
Conversation
Current coverage is 84.06% (diff: 100%)@@ 1.x #4239 diff @@
==========================================
Files 265 265
Lines 17313 17319 +6
Methods 0 0
Messages 0 0
Branches 2627 2627
==========================================
- Hits 14578 14560 -18
- Misses 1885 1901 +16
- Partials 850 858 +8
|
@@ -77,4 +72,15 @@ public void onCompleted() { | |||
} | |||
}; | |||
} | |||
|
|||
final static class AtomicBooleanAction extends AtomicBoolean implements Action0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you subclass Subscriber
, implement Action0
, and use a volatile field for the boolean then you can save even more!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is better. You don't use CAS on the boolean thus a regular volatile field suffices. Then you have 1 allocation. Also switching to OnSubscribe saves another.
Good point, thanks @JakeWharton, I'll do that. |
Ah, the trouble with that idea @JakeWharton is that I need to create a non-anonymous class to subclass |
happy to switch to |
3bf68ff
to
9648079
Compare
Got rid of |
@JakeWharton upon reflection I realize there's no cost moving from anonymous to non-anonymous class because the anonymous class would have included at least one implicit external reference to access the |
final Worker worker = scheduler.createWorker(); | ||
child.add(worker); | ||
SkipTimedSubscriber<T> subscriber = new SkipTimedSubscriber<T>(child); | ||
worker.schedule(subscriber, time, unit); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
child.add(subscriber)
missing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah oops, thank you @akarnokd . I just need to add super(child) in the SkipTimedSubscriber
constructor. You mentioning that makes me realize that there is probably a backpressure related bug in the existing operator in as much as when we don't emit to the child we should request another (like filter
). I'll write a unit test for unsubscription as penance and another to demonstrate the backpressure problem (if it exists).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's wrong because it calls unsubscribe
in the onError
/onCompleted
path and unsubscribes the child. The worker
should be added to subscriber
instead and subscriber
added to the child
. No constructor forwarding.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't support backpressure:
/**
* Returns an Observable that skips values emitted by the source Observable before a specified time window
* elapses.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/skip.t.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't support backpressure as it uses time to skip arbitrary number of elements and
* thus has to consume the source {@code Observable} in an unbounded manner (i.e., no backpressure applied to it).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code skip} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @param time
* the length of the time window to skip
* @param unit
* the time unit of {@code time}
* @return an Observable that skips values emitted by the source Observable before the time window defined
* by {@code time} elapses and the emits the remainder
* @see <a href="http://reactivex.io/documentation/operators/skip.html">ReactiveX operators documentation: Skip</a>
*/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Constructor forwarding handles requests for us too though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would lead to hot spinning for the duration of the time skip.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah I'm with you, thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually maybe I'm not with you yet. The alternative is to request Long.MAX_VALUE
and it might still spin like crazy in an upstream operator. It just doesn't seem different from filter
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Requesting 1 each time incurs an atomic increment costing 22-45 cycle overhead. For filter, the element count is the main deciding factor. For skip its time which can translate to arbitrary element count. The problem is you can't know what the source is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For filter, the element count is the main deciding factor
That's assuming that the predicate doesn't use time.
For skip its time which can translate to arbitrary element count. The problem is you can't know what the source is.
I'm not sure why we wouldn't accept the atomic increment cost as the price to have this operator support backpressure.
👍 |
as per title, combined
AtomicBoolean
with anAction
.