-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implemented the delay
operator
#384
Conversation
RxJava-pull-requests #281 SUCCESS |
The idea is that The |
* Delays the observable sequence by the given time interval. | ||
*/ | ||
public static <T> OnSubscribeFunc<T> delay(final Observable<? extends T> source, long delay, TimeUnit unit) { | ||
return delay(source, delay, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to use Schedulers.threadPoolForComputation()
so we reuse the existing ScheduledExecutorService
rather than creating a new one for every single call to delay
.
RxJava-pull-requests #282 SUCCESS |
RxJava-pull-requests #283 SUCCESS |
Why was this decision made? Is that how Rx.Net works (the MSDN docs don't say). This means we'd ignore It seems that |
@Override | ||
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delay, TimeUnit unit) { | ||
long newDelay = unit.toNanos(delay) + this.unit.toNanos(this.delay); | ||
return underlying.schedule(state, action, newDelay, TimeUnit.NANOSECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this will inject non-determinism ... notifications will be capable of interleaving and being out of order.
I think we need to combine this with ScheduledObserver
which maintains a queue and event loop for handling each notification sequentially on the given scheduler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious, can anyone verify what Rx.Net does here related to order?
Can events become out of order when using delay
or does it retain order guarantees (as I expect it would)?
Concerning the question whether Rx.NET emits errors without delay: Here it says that they get emitted without delay. MSDN doesn't say anything about it. So I tried it out in F#, and it actually doesn't get delayed: #light
open Core.Printf
open System
open System.Reactive.Linq
module Program =
exception ObsvError
[<EntryPoint>]
let Main(args) =
let mapToError = fun (x: int64) -> if x > 4L then raise ObsvError else x
let obsv = Observable.Interval(TimeSpan.FromSeconds 1.0).Select(mapToError).Delay(TimeSpan.FromSeconds 4.5)
let sub = obsv.Subscribe(onNext = (fun x -> printfn "%d" x),
onError = (fun exn -> Console.WriteLine "Error!"))
Console.ReadLine() |> ignore
sub.Dispose()
0 Will output:
If the error weren't delayed at all, however, it should happen before |
I guess all that means that we'll have to introduce a specific timestamped queue here. |
@headinthebox Erik, can you provide guidance on what we should do here? It seems that |
Erik has confirmed that 'onError' should emit immediately and that if the 'onNext' events should not be lost then |
Based on Erik's confirmation is this good to merge? |
Unfortunately, I didn't get around to another close look here yet. I'm afraid that this might still need a queue and some synchronization in order to be clean concurrency-wise. Imho the current implementation should work for sane use cases, though. |
I must say that I am surprised that in .NET
does not behave the same as
Erik On Oct 22, 2013, at 10:05 PM, Joachim Hofer [email protected] wrote:
|
Could we implement delay with something like
|
That’s what I effectively do below; except I am not sure why you add the extra call to cache. I am digging into the .NET implementation as we speak On Oct 23, 2013, at 11:44 AM, samuelgruetter [email protected] wrote:
|
(Note that I'm making these claims without having access to a computer where I can do tests, so this risks being complete nonsense ;-) ) |
The .NET implementation has a bug. The version using the Delay operator [0] 31~~>44 If you draw the marble diagram, the correct answer is to call onError after {Exception of type 'System.Exception' was thrown.} Erik var xs = Observable.Interval(TimeSpan.FromSeconds(1))
xs.Subscribe ( x => Console.WriteLine(x) , e => Console.WriteLine("["+e.Message+"]") , () => Console.WriteLine("!") ); var zs = Observable.Interval(TimeSpan.FromSeconds(1))
x)).Concat()
zs.Subscribe ( x => Console.WriteLine(x) , e => Console.WriteLine("{"+e.Message+"}") , () => Console.WriteLine("!") ); From: erik meijer [mailto:[email protected]] That's what I effectively do below; except I am not sure why you add the I am digging into the .NET implementation as we speak since I am not sure that implementation is correct. On Oct 23, 2013, at 11:44 AM, samuelgruetter [email protected] Could we implement delay with something like xs.map(x => Observable.timer(t).map(_ => x).cache()).concat() Reply to this email directly or view it on GitHub |
@jmhofer Can you rebase this with master so it can be merged and ensure it matches the final answers we got from @headinthebox ? |
this is a +1 on this PR. |
I have rebased this onto netflix/rxjava/master and issued a PR to @jmhofer... here is my fork: https://github.com/jonnolen/RxJava/tree/delay |
Completed in #576 |
This implements the operator from #36 in all four variants.
The tests also found a bug in
interval
that I fixed.Maybe this is a bug of
map
though -map
throws exceptions inonNext
, not sure if it should be allowed to do that - theSafeObserver
that it relies on comes too late for scheduled actions. However, this can be discussed and fixed independently.