Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

save allocation in OperatorSkipTimed #4239

Merged
merged 2 commits into from
Jul 27, 2016

Conversation

davidmoten
Copy link
Collaborator

as per title, combined AtomicBoolean with an Action.

@codecov-io
Copy link

codecov-io commented Jul 25, 2016

Current coverage is 84.06% (diff: 100%)

Merging #4239 into 1.x will decrease coverage by 0.13%

@@                1.x      #4239   diff @@
==========================================
  Files           265        265          
  Lines         17313      17319     +6   
  Methods           0          0          
  Messages          0          0          
  Branches       2627       2627          
==========================================
- Hits          14578      14560    -18   
- Misses         1885       1901    +16   
- Partials        850        858     +8   

Powered by Codecov. Last update 45f6072...8d4eff8

@@ -77,4 +72,15 @@ public void onCompleted() {
}
};
}

final static class AtomicBooleanAction extends AtomicBoolean implements Action0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you subclass Subscriber, implement Action0, and use a volatile field for the boolean then you can save even more!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is better. You don't use CAS on the boolean thus a regular volatile field suffices. Then you have 1 allocation. Also switching to OnSubscribe saves another.

@davidmoten
Copy link
Collaborator Author

Good point, thanks @JakeWharton, I'll do that.

@davidmoten
Copy link
Collaborator Author

Ah, the trouble with that idea @JakeWharton is that I need to create a non-anonymous class to subclass Subscriber and implement Action0 and that class needs to hold a reference to the child subscriber. Thus I end up with the same number of allocations.

@davidmoten
Copy link
Collaborator Author

happy to switch to onSubscribe while I'm here. I'll do that.

@davidmoten davidmoten force-pushed the skip-timed-reduce-alloc branch from 3bf68ff to 9648079 Compare July 25, 2016 21:00
@davidmoten
Copy link
Collaborator Author

Got rid of AtomicBoolean, subscriber implements Action0, converted to OnSubscribe

@davidmoten
Copy link
Collaborator Author

@JakeWharton upon reflection I realize there's no cost moving from anonymous to non-anonymous class because the anonymous class would have included at least one implicit external reference to access the child object. So thanks for the suggestion.

final Worker worker = scheduler.createWorker();
child.add(worker);
SkipTimedSubscriber<T> subscriber = new SkipTimedSubscriber<T>(child);
worker.schedule(subscriber, time, unit);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

child.add(subscriber) missing.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah oops, thank you @akarnokd . I just need to add super(child) in the SkipTimedSubscriber constructor. You mentioning that makes me realize that there is probably a backpressure related bug in the existing operator in as much as when we don't emit to the child we should request another (like filter). I'll write a unit test for unsubscription as penance and another to demonstrate the backpressure problem (if it exists).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's wrong because it calls unsubscribe in the onError/onCompleted path and unsubscribes the child. The worker should be added to subscriber instead and subscriber added to the child. No constructor forwarding.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't support backpressure:

/**
 * Returns an Observable that skips values emitted by the source Observable before a specified time window
 * elapses.
 * <p>
 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/skip.t.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator doesn't support backpressure as it uses time to skip arbitrary number of elements and
 *  thus has to consume the source {@code Observable} in an unbounded manner (i.e., no backpressure applied to it).</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>This version of {@code skip} operates by default on the {@code computation} {@link Scheduler}.</dd>
 * </dl>
 * 
 * @param time
 *            the length of the time window to skip
 * @param unit
 *            the time unit of {@code time}
 * @return an Observable that skips values emitted by the source Observable before the time window defined
 *         by {@code time} elapses and the emits the remainder
 * @see <a href="http://reactivex.io/documentation/operators/skip.html">ReactiveX operators documentation: Skip</a>
 */

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constructor forwarding handles requests for us too though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would lead to hot spinning for the duration of the time skip.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I'm with you, thanks

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually maybe I'm not with you yet. The alternative is to request Long.MAX_VALUE and it might still spin like crazy in an upstream operator. It just doesn't seem different from filter...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Requesting 1 each time incurs an atomic increment costing 22-45 cycle overhead. For filter, the element count is the main deciding factor. For skip its time which can translate to arbitrary element count. The problem is you can't know what the source is.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For filter, the element count is the main deciding factor

That's assuming that the predicate doesn't use time.

For skip its time which can translate to arbitrary element count. The problem is you can't know what the source is.

I'm not sure why we wouldn't accept the atomic increment cost as the price to have this operator support backpressure.

@akarnokd
Copy link
Member

👍

@akarnokd akarnokd merged commit 3467685 into ReactiveX:1.x Jul 27, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants