From 4854792d24372ad6676468700ef66f359b8aa43d Mon Sep 17 00:00:00 2001 From: Jonathan Ohlrich Date: Sun, 5 Nov 2017 21:14:29 -0500 Subject: [PATCH 1/2] failing test for blocking calls --- .../AsyncSeqTests.fs | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs index e0ef8a2..cc8766c 100644 --- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs +++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs @@ -470,6 +470,48 @@ let ``AsyncSeq.bufferByTime`` () = Assert.True ((actual = expected)) +[] +let ``AsyncSeq.bufferByCountAndTime should not block`` () = + let op = + asyncSeq { + while true do + do! Async.Sleep 1000 + yield 0 + } + |> AsyncSeq.bufferByCountAndTime 10 1000 + |> AsyncSeq.take 3 + |> AsyncSeq.iter (ignore) + + // should return immediately + // while a blocking call would take > 3sec + let watch = System.Diagnostics.Stopwatch.StartNew() + let cts = new CancellationTokenSource() + Async.StartWithContinuations(op, ignore, ignore, ignore, cts.Token) + watch.Stop() + cts.Cancel(false) + Assert.Less (watch.ElapsedMilliseconds, 1000L) + +[] +let ``AsyncSeq.bufferByTime should not block`` () = + let op = + asyncSeq { + while true do + do! Async.Sleep 1000 + yield 0 + } + |> AsyncSeq.bufferByTime 1000 + |> AsyncSeq.take 3 + |> AsyncSeq.iter (ignore) + + // should return immediately + // while a blocking call would take > 3sec + let watch = System.Diagnostics.Stopwatch.StartNew() + let cts = new CancellationTokenSource() + Async.StartWithContinuations(op, ignore, ignore, ignore, cts.Token) + watch.Stop() + cts.Cancel(false) + Assert.Less (watch.ElapsedMilliseconds, 1000L) + [] let ``try finally works no exception``() = let x = ref 0 From 23450c25f27449397f8bb1f06a0305f00df542ce Mon Sep 17 00:00:00 2001 From: Jonathan Ohlrich Date: Sun, 5 Nov 2017 21:30:15 -0500 Subject: [PATCH 2/2] replace use of WaitAny for WhenAny --- src/FSharp.Control.AsyncSeq/AsyncSeq.fs | 34 ++++++++++++------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index 221548b..4c872b0 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -104,19 +104,19 @@ module internal Utils = static member internal chooseTasks (a:Task<'T>) (b:Task<'U>) : Async, 'U * Task<'T>>> = async { - let! ct = Async.CancellationToken - let i = Task.WaitAny( [| (a :> Task);(b :> Task) |],ct) - if i = 0 then return (Choice1Of2 (a.Result, b)) - elif i = 1 then return (Choice2Of2 (b.Result, a)) - else return! failwith (sprintf "unreachable, i = %d" i) } + let ta, tb = a :> Task, b :> Task + let! i = Task.WhenAny( ta, tb ) |> Async.AwaitTask + if i = ta then return (Choice1Of2 (a.Result, b)) + elif i = tb then return (Choice2Of2 (b.Result, a)) + else return! failwith "unreachable" } static member internal chooseTasks2 (a:Task<'T>) (b:Task) : Async>> = async { - let! ct = Async.CancellationToken - let i = Task.WaitAny( [| (a :> Task);(b) |],ct) - if i = 0 then return (Choice1Of2 (a.Result, b)) - elif i = 1 then return (Choice2Of2 (a)) - else return! failwith (sprintf "unreachable, i = %d" i) } + let ta = a :> Task + let! i = Task.WhenAny( ta, b ) |> Async.AwaitTask + if i = ta then return (Choice1Of2 (a.Result, b)) + elif i = b then return (Choice2Of2 (a)) + else return! failwith "unreachable" } type MailboxProcessor<'Msg> with member __.PostAndAsyncReplyTask (f:TaskCompletionSource<'a> -> 'Msg) : Task<'a> = @@ -1493,20 +1493,20 @@ module AsyncSeq = let tasks = Array.zeroCreate n for i in 0 .. ss.Length - 1 do let! task = Async.StartChildAsTask (ies.[i].MoveNext()) - do tasks.[i] <- (task :> Task) + do tasks.[i] <- task let fin = ref n while fin.Value > 0 do - let! ct = Async.CancellationToken - let i = Task.WaitAny(tasks, ct) - let v = (tasks.[i] :?> Task<'T option>).Result + let! ti = Task.WhenAny (tasks) |> Async.AwaitTask + let i = Array.IndexOf (tasks, ti) + let v = ti.Result match v with | Some res -> yield res let! task = Async.StartChildAsTask (ies.[i].MoveNext()) - do tasks.[i] <- (task :> Task) - | None -> + do tasks.[i] <- task + | None -> let t = System.Threading.Tasks.TaskCompletionSource() - tasks.[i] <- (t.Task :> Task) // result never gets set + tasks.[i] <- t.Task // result never gets set fin := fin.Value - 1 }