Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OperatorDelay #1082

Closed
wants to merge 1 commit into from
Closed

OperatorDelay #1082

wants to merge 1 commit into from

Conversation

akarnokd
Copy link
Member

Operator Delay

Issue #1060

I've applied the same item-delaying logic found in the timed delay to the selector-based delay. It looks elegant but adds a few extra layers layers and thus increases the per item delivery overhead.

@akarnokd akarnokd mentioned this pull request Apr 24, 2014
57 tasks
@cloudbees-pull-request-builder

RxJava-pull-requests #993 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member

Isn't this simpler if we just use a scheduler inside an Operator? The previous implementation seemed wrong to me... it feels like it should just be this (ignoring subscriptions in this pseudo-code):

public void onNext(T t) {
   schedulerWorker.schedule(() -> { 
         s.onNext(t);
   }, delayTime, delayUnit);
}

@akarnokd
Copy link
Member Author

I've noticed that in the original but since the Worker is stateful, unsubscribing one would prevent any other clients from running scheduled work. Therefore, I create workers per child Subscribers instead of one per operator.

Edit: sorry, I misread your comment. It would work until an onCompleted event is fired and the last onNext is still in the schedule. Making sure the onCompleted is delayed is complicated (I had implemented in the original selector version), but when using the concat trick, it will be emitted at the right time since both the source and the last single observable complete.

@benjchristensen
Copy link
Member

unsubscribing one would prevent any other clients from running scheduled work

If the entire subscription is unsubscribed, we want the worker to shut down. Each Subscriber will have its own Scheduler.Worker.

It's very similar to observeOn (https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java#L69) just delayed instead of immediate.

@akarnokd
Copy link
Member Author

Yes, one Worker per Subscriber.

As for the simplified onNext you are proposing, It would work until an onCompleted event is fired and the last onNext is still in the schedule. Making sure the onCompleted is delayed is complicated (I had implemented in the original selector version), but when using the concat trick, it will be emitted at the right time since both the source and the last single observable complete.

@benjchristensen
Copy link
Member

Hmmm... if the onCompleted is also being scheduled wouldn't it be fine?

@akarnokd
Copy link
Member Author

We could only delay onCompleted the same amount as every onNext, which might be unwanted. The second concern is that the worker might get unsubscribed right after the onCompleted got scheduled.

@benjchristensen
Copy link
Member

Got it. So can we use the scheduler.worker for onNext and then your concat trick for the onComplete?

The reason is that it seems overly complicated and heavy right now using a multicast ReplaySubject to handle this.

@akarnokd
Copy link
Member Author

I've been fiddling with this and came up with a less weighted version:

     Observable<Observable<T>> seqs = observable.map(new Func1<T, Observable<T>>() {
            public Observable<T> call(final T x) {
                final AsyncSubject<T> result = AsyncSubject.create();
                scheduler.createWorker().schedule(new Action0() {
                    @Override
                    public void call() {
                        result.onNext(x);
                        result.onCompleted();
                    }
                }, delay, unit);
                return result;
            }
        });
        return Observable.concat(seqs);

We need the AsyncSubject because the x emission might happen before or after the concat subscribes to the observable representing the future x. If it is still too heavy, I could create an AsyncSubject variant that has simplified subscription management limited to one client (the concat).

@akarnokd
Copy link
Member Author

I was worried about the workers but they can be shared properly: https://gist.github.com/akarnokd/11269808

@akarnokd
Copy link
Member Author

I'll create a new PR due to the changes in master.

@akarnokd akarnokd closed this Apr 25, 2014
@akarnokd akarnokd mentioned this pull request Apr 25, 2014
@akarnokd akarnokd deleted the OperatorDelay branch May 6, 2014 13:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants