Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rewrite OnSubscribeRefCount to handle synchronous source #1695

Merged
merged 4 commits into from
Oct 14, 2014

Conversation

davidmoten
Copy link
Collaborator

Resubmitting this one having stuffed up the rebase.

Rewrite of OnSubscribeRefCount because original didn't handle synchronous source (#1688) and as far as I could see the original was fundamentally broken by this use case. Please note also includes a fix in OperatorMulticast where a null check was required. I've done the rewrite using locks rather than submit a lock-free version just to ensure that the approach is valid with the experts first.

// need to use this overload of connect to ensure that
// baseSubscription is set in the case that source is a synchronous
// Observable
source.connect(new Action1<Subscription>() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should put unlock in a finally clause because Action1 may be ignored if already unsubscribed. E.g.,

try {
            source.connect(new Action1<Subscription>() {
                @Override
                public void call(Subscription subscription) {
                    baseSubscription = subscription;

                    // handle unsubscribing from the base subscription
                    subscriber.add(disconnect());

                    // ready to subscribe to source so do it
                    source.unsafeSubscribe(subscriber);
                }
            });
}
finally {
      // release the write lock
      lock.writeLock().unlock();
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that approach won't work because source may be synchronous and the finally block doesn't get executed till the source completes preventing other subscription. I agree though that the finally is needed, just need to get the locking right.

@davidmoten
Copy link
Collaborator Author

would appreciate review of latest commit which incorporates @zsxwing suggestions

@Override
public void call() {
if (subscriptionCount.decrementAndGet() == 0) {
baseSubscription.unsubscribe();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could disconnect the source immediately if one thread reaches L125 while another thread reaches L81 (because the counter was decremented to 0 and then incremented to 1 in the latter thread), i.e. prevent the reconnect.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks David, I'll review given that fact.

@davidmoten
Copy link
Collaborator Author

Righto, over to you concurrency switched on people again. I'm not convinced that given the holes that @akarnokd pointed out that a dual lock arrangement is going to work so I've wound back to using a single lock for all subscriptions and disconnect.

@benjchristensen
Copy link
Member

I don't quite understand these changes yet. Partly I'm confused because the newly added unit tests pass even if the code changes aren't applied.

// need to use this overload of connect to ensure that
// baseSubscription is set in the case that source is a
// synchronous Observable
source.connect(onSubscribe(subscriber, writeLocked));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this block right here on a synchronous source and then deadlock any other subscriber trying to subscribe since the lock is now held?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No because connect is called with an action that allows the subscription to
be read and in our case recorded. The action also unlocks.
On 3 Oct 2014 11:38, "Ben Christensen" [email protected] wrote:

In
rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRefCount.java:

  •            queue = null;
    
  •            if (localQueue == null) {
    
  •                emitting = false;
    
  •                return;
    
  • public void call(final Subscriber<? super T> subscriber) {
  •    lock.lock();
    
  •    if (subscriptionCount.incrementAndGet() == 1) {
    
  •        final AtomicBoolean writeLocked = new AtomicBoolean(true);
    
  •        try {
    
  •            // need to use this overload of connect to ensure that
    
  •            // baseSubscription is set in the case that source is a
    
  •            // synchronous Observable
    
  •            source.connect(onSubscribe(subscriber, writeLocked));
    

Won't this block right here on a synchronous source and then deadlock any
other subscriber trying to subscribe since the lock is now held?


Reply to this email directly or view it on GitHub
https://github.com/ReactiveX/RxJava/pull/1695/files#r18372521.

@benjchristensen benjchristensen added this to the 1.0 milestone Oct 7, 2014
@benjchristensen
Copy link
Member

Spent time reviewing this again and I think it's good so merging.

Thanks @davidmoten for the fix and @zsxwing and @akarnokd for the reviews.

I considered alternatives to the single-lock approach but am not confident enough in any of them to attempt right now. Correctness first, we can attempt enhancements later.

benjchristensen added a commit that referenced this pull request Oct 14, 2014
rewrite OnSubscribeRefCount to handle synchronous source
@benjchristensen benjchristensen merged commit 8c2986d into ReactiveX:0.20.x Oct 14, 2014
@davidmoten davidmoten deleted the refcount-1688 branch April 10, 2015 04:40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants