Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement 'subscribeOn' using 'lift' #822

Merged
merged 3 commits into from
Feb 7, 2014

Conversation

zsxwing
Copy link
Member

@zsxwing zsxwing commented Feb 6, 2014

hi, this PR reimplemented the subscribeOn using lift. However, both the original and current implementation can not guarantee that unsubscribe is always called in the scheduler. An extreme example is:

    public static void main(String[] args) throws InterruptedException {
        Observable.create(new OnSubscribe<Integer>() {

            @Override
            public void call(final Subscriber<? super Integer> t1) {
                final Subscription s = Subscriptions.create(new Action0() {

                    @Override
                    public void call() {
                        System.out.println(Thread.currentThread().getName());
                    }
                });
                t1.add(s);
                new Thread(new Runnable() {

                    @Override
                    public void run() {
                        t1.unsubscribe();
                    }

                }, "test").start();
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                t1.onCompleted();
            }
        }).subscribeOn(Schedulers.newThread()).subscribe();
        Thread.sleep(10000);
    }

will output "test".

@cloudbees-pull-request-builder

RxJava-pull-requests #742 FAILURE
Looks like there's a problem with this pull request

@cloudbees-pull-request-builder

RxJava-pull-requests #743 SUCCESS
This pull request looks good


@Override
public void call() {
scheduler.schedule(new Action1<Inner>() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should use inner here I guess.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a recursive schedule. As you said, if using inner, it may prevents further scheduled actions.

@akarnokd
Copy link
Member

akarnokd commented Feb 6, 2014

I've played around with this and couldn't get the scheduled unsubscription to run because when the subscription is scheduled, it exposes the Inner.MAS, which then gets unsubscribed and prevents further scheduling on Inner completely.

@zsxwing
Copy link
Member Author

zsxwing commented Feb 6, 2014

Could you provide an example? The return value of scheduler.schedule isn't used.

@akarnokd
Copy link
Member

akarnokd commented Feb 6, 2014

This example doesn't print unsub and doesn't terminate after 100ms as it should (I know it isn't your subscribeOn, but this would ensure the unsubscription happens on the same newThread() thread):

public class SubscribeOnTest {
    static <T> Observable<T> subscribeOn1(Observable<T> source, Scheduler scheduler) {
        return Observable.create((Subscriber<? super T> s) -> {
            Subscriber<T> s0 = new Subscriber<T>() {
                @Override
                public void onCompleted() {
                    s.onCompleted();
                }
                @Override
                public void onError(Throwable e) {
                    s.onError(e);
                }
                @Override
                public void onNext(T t) {
                    s.onNext(t);
                }
            };
            MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
            s.add(mas);
            mas.set(scheduler.schedule(i -> {
                s.add(Subscriptions.create(() -> {
                    i.schedule(j -> {
                        System.out.println("Unsub: " + Thread.currentThread().getName());
                        s0.unsubscribe();
                    });
                }));
                System.out.println("Sub  : " + Thread.currentThread().getName());
                source.subscribe(s0);
            }));
        });
    }
    public static void main(String[] args) throws Exception {
        Subscription s = subscribeOn1(Observable.interval(10, TimeUnit.MILLISECONDS)
                , Schedulers.computation()).subscribe(v -> {
                    System.out.printf("%s: %s%n", Thread.currentThread().getName(), v);
                });

        Thread.sleep(105);
        s.unsubscribe();
        Thread.sleep(100);
    }
}

The problem is that the first schedule call once run should be gently removed to preserve the Inner's isUnsubscribed==false status. I had to modify MultipleAssignmentSubscription to allow setting and unsubscribing without calling unsubscribe in the inner subscription:

https://gist.github.com/akarnokd/8843694

https://gist.github.com/akarnokd/8843709

With the latter, it correctly subscribes and unsubscribes.

@zsxwing
Copy link
Member Author

zsxwing commented Feb 6, 2014

If the Subscription of scheduler.schedule is not added to the subscriber or the mas, so the unsubscribe can be always called, although there is a little performance cost.

@cloudbees-pull-request-builder

RxJava-pull-requests #745 SUCCESS
This pull request looks good

}

}));
cs.add(subscriber);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this line necessary? As I understand subscriber chains, this is what should happen:

subscriber.unsubscribe -> cs.unsubscribe -> o.unsubscribe

benjchristensen added a commit that referenced this pull request Feb 7, 2014
Reimplement 'subscribeOn' using 'lift'
@benjchristensen benjchristensen merged commit d40b684 into ReactiveX:master Feb 7, 2014
@benjchristensen
Copy link
Member

Thanks @zsxwing for this and @akarnokd for the review.

@zsxwing zsxwing deleted the subscribeOn branch February 7, 2014 04:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants