-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
OperatorDelay #1082
OperatorDelay #1082
Conversation
RxJava-pull-requests #993 SUCCESS |
Isn't this simpler if we just use a scheduler inside an Operator? The previous implementation seemed wrong to me... it feels like it should just be this (ignoring subscriptions in this pseudo-code): public void onNext(T t) {
schedulerWorker.schedule(() -> {
s.onNext(t);
}, delayTime, delayUnit);
} |
I've noticed that in the original but since the Worker is stateful, unsubscribing one would prevent any other clients from running scheduled work. Therefore, I create workers per child Subscribers instead of one per operator. Edit: sorry, I misread your comment. It would work until an onCompleted event is fired and the last onNext is still in the schedule. Making sure the onCompleted is delayed is complicated (I had implemented in the original selector version), but when using the concat trick, it will be emitted at the right time since both the source and the last single observable complete. |
If the entire subscription is unsubscribed, we want the worker to shut down. Each It's very similar to |
Yes, one Worker per Subscriber. As for the simplified onNext you are proposing, It would work until an onCompleted event is fired and the last onNext is still in the schedule. Making sure the onCompleted is delayed is complicated (I had implemented in the original selector version), but when using the concat trick, it will be emitted at the right time since both the source and the last single observable complete. |
Hmmm... if the |
We could only delay onCompleted the same amount as every onNext, which might be unwanted. The second concern is that the worker might get unsubscribed right after the onCompleted got scheduled. |
Got it. So can we use the scheduler.worker for onNext and then your concat trick for the onComplete? The reason is that it seems overly complicated and heavy right now using a multicast |
I've been fiddling with this and came up with a less weighted version: Observable<Observable<T>> seqs = observable.map(new Func1<T, Observable<T>>() {
public Observable<T> call(final T x) {
final AsyncSubject<T> result = AsyncSubject.create();
scheduler.createWorker().schedule(new Action0() {
@Override
public void call() {
result.onNext(x);
result.onCompleted();
}
}, delay, unit);
return result;
}
});
return Observable.concat(seqs); We need the AsyncSubject because the x emission might happen before or after the concat subscribes to the observable representing the future x. If it is still too heavy, I could create an AsyncSubject variant that has simplified subscription management limited to one client (the concat). |
I was worried about the workers but they can be shared properly: https://gist.github.com/akarnokd/11269808 |
I'll create a new PR due to the changes in master. |
Operator Delay
Issue #1060
I've applied the same item-delaying logic found in the timed delay to the selector-based delay. It looks elegant but adds a few extra layers layers and thus increases the per item delivery overhead.