Unsubscribing does not work when using subscribeOn(Schedulers.newThread()) #431

Closed
oasalonen opened this issue Oct 14, 2013 · 9 comments

@oasalonen

I'm having trouble unsubscribing whenever I use Observable.subscribeOn(Schedulers.newThread()) on Android. I'm using RxJava 0.14.3. My observer class does the following:

Subscription sub = myObject.getData().subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(this);

The getData() method creates an observable, and the onSubscribeFunc generates some mock data using a new thread:

final BooleanSubscription subscription = new BooleanSubscription();
Thread t = new Thread(new Runnable() {
    public void run() {
        // generate data, call onNext()
        // check subscription.isUnsubscribed()
        // sleep a bit, repeat last two steps
    }
});
t.start();
return subscription;

When I unsubscribe the (Composite) subscription I got in the observer, isUnsubscribed() never returns true in the worker thread and BooleanSubscription.unsubscribe() never gets called (breakpoint never triggered). I stepped through the CompositeSubscription.unsubscribe() method, and it seems that it doesn't even contain the BooleanSubscription anywhere in the keySet.

The interesting thing is that if I call subscribeOn(Schedulers.threadPoolForComputation()) or call it using AndroidSchedulers.mainThread(), the unsubscription works correctly. Am I misusing the Schedulers.newThread() method or does the NewThreadScheduler have a bug?
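
For readers following along, the contract the report expects can be sketched without RxJava: a producer thread polls a shared flag, and setting the flag from the consumer side must stop the producer. This is a minimal sketch under stated assumptions, not the library's implementation. CancelFlag, startProducer and producerStopsAfterUnsubscribe are hypothetical names, and CancelFlag only stands in for BooleanSubscription.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class CancellableWorker {
    /** Hypothetical stand-in for BooleanSubscription: a flag the producer polls. */
    static final class CancelFlag {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
        void unsubscribe() { unsubscribed.set(true); }
        boolean isUnsubscribed() { return unsubscribed.get(); }
    }

    /** Starts a producer thread that "emits" until the flag is set. */
    static Thread startProducer(final CancelFlag flag, final AtomicInteger emitted) {
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!flag.isUnsubscribed()) {
                    emitted.incrementAndGet(); // stands in for observer.onNext(...)
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
        t.setDaemon(true); // do not keep the JVM alive if cancellation is missed
        t.start();
        return t;
    }

    /** Runs the producer briefly, unsubscribes, and reports whether it stopped. */
    static boolean producerStopsAfterUnsubscribe() {
        CancelFlag flag = new CancelFlag();
        AtomicInteger emitted = new AtomicInteger();
        Thread producer = startProducer(flag, emitted);
        try {
            Thread.sleep(100);
            flag.unsubscribe();
            producer.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !producer.isAlive() && emitted.get() > 0;
    }

    public static void main(String[] args) {
        System.out.println("producer stopped: " + producerStopsAfterUnsubscribe());
    }
}
```

The sketch only works because the code that calls unsubscribe() holds the same flag object the producer polls. The symptom described above, where the BooleanSubscription is missing from the CompositeSubscription, would correspond to the consumer holding a different object from the one the worker checks.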

@zsxwing
Member

zsxwing commented Oct 17, 2013

Could you provide a unit test that reproduces this issue? I tried similar code but could not reproduce the bug you described.

@peter-tackage
Contributor

I've observed similar behaviour using RxJava 0.14.6, also on Android, when attempting to unsubscribe from a Subscription created using Schedulers.newThread().schedulePeriodically(...).

The failure is fairly easy to reproduce in the context of the application, although it is intermittent: often the subscriptions are unsubscribed successfully.

@rupertbates

This test demonstrates the problem. The Observable never receives the unsubscribe call, even though the Observer stops receiving notifications:

package tests;

import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.util.functions.Func1;

import java.util.concurrent.TimeUnit;

public class UnsubscribeTest {
    @Test
    public void testUnsubscribe() throws InterruptedException {
        Subscription sub = Observable.interval(1, TimeUnit.SECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        System.out.println("generated " + aLong);
                        return aLong;
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.currentThread())
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(Long args) {
                        System.out.println("Received " + args);
                    }
                });

        Thread.sleep(3000);
        System.out.println("Calling unsubscribe");
        sub.unsubscribe();
        Thread.sleep(3000);
    }
}

Output is:

generated 0
Received 0
generated 1
Received 1
Calling unsubscribe
generated 2
generated 3
generated 4
generated 5

If I change the subscribeOn scheduler to Schedulers.threadPoolForIO(), the output is:

generated 0
Received 0
generated 1
Received 1
Calling unsubscribe

@benjchristensen
Member

I'll take a look.

@benjchristensen
Member

I think this is fixed in #472

Can someone confirm?

@rupertbates

Yes, looks fixed to me.

@benjchristensen
Member

Great, I'll release today or tomorrow.

@edisaverio

Hello all, I'm on 0.15.1 and still see the issue with any scheduler passed to subscribeOn.
My case is slightly different: onNext keeps being called on the observer even after unsubscribe(). The only difference is that onCompleted is not called.

@akarnokd
Member

The test program above prints the expected values when run against the current master, but the program does not exit afterwards. I see three RxNewThreadScheduler threads (on a 2-core machine), all of them waiting in ThreadPoolExecutor.workQueue.take(). These threads aren't marked as daemon threads. See NewThreadScheduler L55.

Edit: Maybe they should remain regular threads, but the executor running them should be allowed to time out its single core thread, since NewThreadScheduler will start a new pool anyway.

private static class EventLoopScheduler extends Scheduler {
    private final ExecutorService executor;

    private EventLoopScheduler() {
        ThreadPoolExecutor e = (ThreadPoolExecutor)Executors.newFixedThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet());
            }
        });
        e.setKeepAliveTime(1, TimeUnit.SECONDS);
        e.allowCoreThreadTimeOut(true);
        executor = e;
    }
    ...
}
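
The effect of the two executor settings above can be checked in isolation with plain java.util.concurrent. The following is a sketch under stated assumptions rather than the scheduler's actual code. CoreTimeoutDemo, newSingleThreadPool and poolSizeAfterIdle are hypothetical names, and the pool is built with the ThreadPoolExecutor constructor instead of casting the result of Executors.newFixedThreadPool.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CoreTimeoutDemo {
    private static final AtomicInteger COUNT = new AtomicInteger();

    /** Builds a one-thread pool whose single core thread may exit when idle. */
    static ThreadPoolExecutor newSingleThreadPool(long keepAliveMillis) {
        ThreadPoolExecutor e = new ThreadPoolExecutor(
                1, 1, keepAliveMillis, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        // Regular (non-daemon) thread, as in the snippet above.
                        return new Thread(r, "DemoScheduler-" + COUNT.incrementAndGet());
                    }
                });
        // Requires a keep-alive time greater than zero, otherwise it throws.
        e.allowCoreThreadTimeOut(true);
        return e;
    }

    /** Runs one task, idles, and returns how many worker threads remain. */
    static int poolSizeAfterIdle(long keepAliveMillis, long idleMillis) {
        ThreadPoolExecutor e = newSingleThreadPool(keepAliveMillis);
        try {
            e.submit(new Runnable() {
                @Override
                public void run() { /* no-op task, just forces the worker to start */ }
            }).get();
            Thread.sleep(idleMillis);
            return e.getPoolSize();
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        } finally {
            e.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("workers left after idling: " + poolSizeAfterIdle(100, 1000));
    }
}
```

Once the idle worker has timed out, no non-daemon pool thread is left blocked on the work queue, which is what kept the test program from exiting. The order of calls in the original snippet matters for the same reason: newFixedThreadPool starts with a keep-alive of zero, so setKeepAliveTime has to run before allowCoreThreadTimeOut(true).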

benjchristensen added a commit to benjchristensen/RxJava that referenced this issue Dec 17, 2013
This matches the behavior of Schedulers.COMPUTATION_EXECUTOR and Schedulers.IO_EXECUTOR.

See https://groups.google.com/forum/#!topic/rxjava/Qe1qi0aHtnE and ReactiveX#431 (comment)
rickbw pushed a commit to rickbw/RxJava that referenced this issue Jan 9, 2014
This matches the behavior of Schedulers.COMPUTATION_EXECUTOR and Schedulers.IO_EXECUTOR.

See https://groups.google.com/forum/#!topic/rxjava/Qe1qi0aHtnE and ReactiveX#431 (comment)