-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reimplement 'subscribeOn' using 'lift' #822
Conversation
RxJava-pull-requests #742 FAILURE |
RxJava-pull-requests #743 SUCCESS |
|
||
@Override | ||
public void call() { | ||
scheduler.schedule(new Action1<Inner>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should use inner
here I guess.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not a recursive schedule. As you said, if using inner, it may prevents further scheduled actions.
I've played around with this and couldn't get the scheduled unsubscription to run because when the subscription is scheduled, it exposes the Inner.MAS, which then gets unsubscribed and prevents further scheduling on Inner completely. |
Could you provide an example? The return value of |
This example doesn't print unsub and doesn't terminate after 100ms as it should (I know it isn't your subscribeOn, but this would ensure the unsubscription happens on the same newThread() thread): public class SubscribeOnTest {
static <T> Observable<T> subscribeOn1(Observable<T> source, Scheduler scheduler) {
return Observable.create((Subscriber<? super T> s) -> {
Subscriber<T> s0 = new Subscriber<T>() {
@Override
public void onCompleted() {
s.onCompleted();
}
@Override
public void onError(Throwable e) {
s.onError(e);
}
@Override
public void onNext(T t) {
s.onNext(t);
}
};
MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
s.add(mas);
mas.set(scheduler.schedule(i -> {
s.add(Subscriptions.create(() -> {
i.schedule(j -> {
System.out.println("Unsub: " + Thread.currentThread().getName());
s0.unsubscribe();
});
}));
System.out.println("Sub : " + Thread.currentThread().getName());
source.subscribe(s0);
}));
});
}
public static void main(String[] args) throws Exception {
Subscription s = subscribeOn1(Observable.interval(10, TimeUnit.MILLISECONDS)
, Schedulers.computation()).subscribe(v -> {
System.out.printf("%s: %s%n", Thread.currentThread().getName(), v);
});
Thread.sleep(105);
s.unsubscribe();
Thread.sleep(100);
}
} The problem is that the first schedule call once run should be gently removed to preserve the Inner's isUnsubscribed==false status. I had to modify MultipleAssignmentSubscription to allow setting and unsubscribing without calling unsubscribe in the inner subscription: https://gist.github.com/akarnokd/8843694 https://gist.github.com/akarnokd/8843709 With the latter, it correctly subscribes and unsubscribes. |
If the Subscription of |
RxJava-pull-requests #745 SUCCESS |
} | ||
|
||
})); | ||
cs.add(subscriber); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this line necessary? As I understand subscriber chains, this is what should happen:
subscriber.unsubscribe -> cs.unsubscribe -> o.unsubscribe
Reimplement 'subscribeOn' using 'lift'
hi, this PR reimplemented the
subscribeOn
usinglift
. However, both the original and current implementation can not guarantee thatunsubscribe
is always called in thescheduler
. An extreme example is:will output "test".