Lwt_stream with several consumers not lwt-threadsafe #151

Closed
sergezloto opened this issue May 6, 2015 · 6 comments

Comments

@sergezloto

Running this:

let (>>=) = Lwt.(>>=)
let (>|=) = Lwt.(>|=)                
let stream, push = Lwt_stream.create ()
(* seed the stream *)
let () = for i = 0 to 100 do push (Some i) done
let rec p i =
  Lwt_stream.next stream >>= fun n ->
  Lwt_log.log_f ~level:Lwt_log.Error "lwt %2d: using %d" i n >>= fun () ->
  Lwt.async (fun () -> Lwt_unix.sleep 2. >|= fun () -> push @@ Some n);
  p i
let main () =
  let lwts = Array.init 25 p |> Array.to_list in
  Lwt.join lwts
let () = Lwt_main.run @@ main ()

results in the same item from Lwt_stream being returned to all waiting threads:

tst.native: main: lwt 22: using 97
tst.native: main: lwt 23: using 98
tst.native: main: lwt 24: using 99
tst.native: main: lwt  0: using 100
tst.native: main: lwt  0: using 0
tst.native: main: lwt 24: using 0
tst.native: main: lwt 23: using 0
tst.native: main: lwt 22: using 0
tst.native: main: lwt 21: using 0
tst.native: main: lwt 20: using 0
...
@pqwy
Contributor

pqwy commented Feb 18, 2016

I was very curious about this too. All the threads that are simultaneously sleeping on a single stream will awake with the same result; quite unexpected.

Except it seems to be the intended semantics. The very first test in the test-suite checks this property:

let mvar = Lwt_mvar.create_empty () in
let stream = Lwt_stream.from (fun () ->
  Lwt_mvar.take mvar >>= fun x ->
    return (Some x)) in
let t1 = Lwt_stream.next stream in
let t2 = Lwt_stream.next stream in
let t3 = Lwt_stream.next stream in
Lwt_mvar.put mvar 1 >>= fun () ->
  t1 >>= fun x1 ->
  t2 >>= fun x2 ->
  t3 >>= fun x3 ->
  return ([x1; x2; x3] = [1; 1; 1]));

And the push-streams exhibit the same behavior:

let main () =
  let stream, push = Lwt_stream.create () in
  let (t1, t2) = Lwt_stream.(next stream, next stream) in
  push (Some 1); push (Some 700);
  t1 >>= fun x -> t2 >|= fun y -> assert (x = y)

The culprit is here; notice how in the positive branch consumers enter a wait (feed), but then recur with the same node.

So changing this is straightforward:

diff --git a/src/core/lwt_stream.ml b/src/core/lwt_stream.ml
index fafe2d4..5e5e78a 100644
--- a/src/core/lwt_stream.ml
+++ b/src/core/lwt_stream.ml
@@ -534,7 +534,7 @@ let get_while_s f s = get_while_s_rec s.node [] f s

 let rec next_rec s node =
   if node == !(s.last) then
-    feed s >>= fun () -> next_rec s node
+    feed s >>= fun () -> next_rec s s.node
   else
     match node.data with
       | Some x ->

... but the real question is: what are the intended semantics? I personally find multi-pop unexpected. It's relatively racy, in that if I want to distribute (unique) items from a single stream, I need to create my own locking around it.
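
The "own locking" mentioned above could look roughly like the following. This is a minimal sketch, assuming the `lwt` package; `next_exclusive` is a hypothetical helper name, not part of `Lwt_stream`.

```ocaml
(* Sketch only: [next_exclusive] is a hypothetical helper, not an Lwt API. *)

(* Hold [mutex] for the whole duration of [Lwt_stream.next], so that at most
   one consumer sleeps on the stream at any time. Other consumers wait on the
   mutex instead of on the stream, so they cannot wake up on the same node. *)
let next_exclusive mutex stream =
  Lwt_mutex.with_lock mutex (fun () -> Lwt_stream.next stream)
```

Every consumer has to share the same mutex and go through this wrapper; a single direct call to `Lwt_stream.next` elsewhere would bypass the lock.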

Conversely, if every sleeper dequeues a distinct node, the current behavior can be restored via Lwt_condition.broadcast.
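
A rough sketch of that broadcast idea, assuming the `lwt` package; `relay` is a hypothetical helper name. Consumers that want the "everyone sees the same element" behavior wait on a condition instead of on the stream.

```ocaml
(* Sketch only: [relay] is a hypothetical helper, not an Lwt API. *)

(* A single reader drains [stream] and broadcasts each element to every thread
   currently blocked in [Lwt_condition.wait cond]. Elements that arrive while
   nobody is waiting are dropped. The returned thread completes when the
   stream is closed. *)
let relay stream cond =
  Lwt_stream.iter (fun x -> Lwt_condition.broadcast cond x) stream
```

Consumers would then call `Lwt_condition.wait cond` rather than `Lwt_stream.next stream`.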

Nothing in the test-suite breaks if this is changed (other than the topmost Lwt_stream test 😄), but I don't understand what the intention was here, or whether anything else depends on the current behavior. At the very least, this should be clearly documented.

@diml, @Drup, @vbmithr; a penny for your thoughts?

@sergezloto
Author

It could be that Lwt_stream was intended for internal Lwt use only. At the least, one of the following should happen:

  • the behaviour is documented
  • the behaviour is changed to match what users expect
  • or this public API is removed altogether

@Drup
Member

Drup commented Feb 18, 2016

Removing the API is obviously out of the question; it's used by numerous projects. I have no idea what the intended semantics were, since it was added well before my time. @diml ?

@sergezloto
Author

I meant: removed from the public API documentation.

@ghost

ghost commented Feb 19, 2016

Well, I always thought that having several consumers for the same stream was a bit fishy and it wasn't worth making Lwt_stream more complex just to give it clear semantics. For instance, what is expected if you do several Lwt_stream.find simultaneously on the same stream?

Note that the current documentation of Lwt_stream.next matches the current semantics: if you call Lwt_stream.next you do get the next element of the stream.

@pqwy the change you suggest is not optimal: if you have 100 loops reading from the stream concurrently, every time an element is pushed the 100 threads will wake up but only one will get the element. Worse, you can easily build a pattern where one loop gets everything and the others get nothing. I think this would be the case with this example:

let rec loop n st =
  Lwt_stream.next st >>= fun x ->
  Printf.printf "loop %d got %d\n%!" n x;
  Lwt_main.yield () >>= fun () ->
  loop n st

let main () =
  let st, push = Lwt_stream.create () in
  for i = 1 to 100 do
    Lwt.async (fun () -> loop i st)
  done;
  let rec loop () =
    push (Some 1);
    Lwt_unix.sleep 0.1 >>= loop
  in
  loop ()

(* main is a function, so it must be applied before being passed to run *)
let () = Lwt_main.run (main ())

You need to queue waiters manually to be sure this doesn't happen.
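
One way to queue waiters manually might look like this. It is a minimal sketch, assuming the `lwt` package; the `dispatcher` type and the `create`, `offer`, `take` and `pump` names are all hypothetical and not part of Lwt. Each element wakes exactly one consumer, in arrival order, so no thundering herd occurs and no single loop can starve the others.

```ocaml
(* Sketch only: every name below is hypothetical, not an Lwt API. *)

type 'a dispatcher = {
  items : 'a Queue.t;          (* elements nobody has asked for yet *)
  waiters : 'a Lwt.u Queue.t;  (* resolvers of sleeping consumers, oldest first *)
}

let create () = { items = Queue.create (); waiters = Queue.create () }

(* Hand [x] to the oldest waiting consumer, or buffer it if nobody is waiting. *)
let offer d x =
  if Queue.is_empty d.waiters then Queue.add x d.items
  else Lwt.wakeup_later (Queue.take d.waiters) x

(* Take a buffered element if there is one; otherwise enqueue a waiter. *)
let take d =
  if Queue.is_empty d.items then begin
    let t, w = Lwt.wait () in
    Queue.add w d.waiters;
    t
  end else
    Lwt.return (Queue.take d.items)

(* A single reader moves elements from [stream] into the dispatcher. *)
let pump d stream = Lwt_stream.iter (offer d) stream
```

Consumers call `take d` instead of `Lwt_stream.next`, and only `pump` ever reads from the stream. The sketch does not handle cancellation or stream closure on the consumer side.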

@aantron aantron modified the milestone: Lwt_stream Jun 29, 2016
@aantron
Copy link
Collaborator

aantron commented Dec 6, 2016

Will be addressed as part of #250. This issue is linked from there. Discussion partially continued there.

@aantron aantron closed this as completed Dec 6, 2016