Lwt_stream with several consumers not lwt-threadsafe #151
Comments
I was very curious about this too. All the threads that are simultaneously sleeping on a single stream will awake with the same result, which is quite unexpected. Except it seems to be the intended semantics. The very first test in the test-suite checks this property:

    (* Excerpt from the test-suite; return is Lwt.return. *)
    let mvar = Lwt_mvar.create_empty () in
    let stream = Lwt_stream.from (fun () ->
      Lwt_mvar.take mvar >>= fun x ->
      return (Some x)) in
    let t1 = Lwt_stream.next stream in
    let t2 = Lwt_stream.next stream in
    let t3 = Lwt_stream.next stream in
    Lwt_mvar.put mvar 1 >>= fun () ->
    t1 >>= fun x1 ->
    t2 >>= fun x2 ->
    t3 >>= fun x3 ->
    return ([x1; x2; x3] = [1; 1; 1])

And the push-streams exhibit the same behavior:

    let main () =
      let stream, push = Lwt_stream.create () in
      let (t1, t2) = Lwt_stream.(next stream, next stream) in
      push (Some 1); push (Some 700);
      t1 >>= fun x -> t2 >|= fun y -> assert (x = y)

The culprit is here; notice how in the positive branch consumers enter a wait ( So changing this is straightforward:
... but the real question is: what is the intended semantics? I personally find multi-pop unexpected. It's relatively racy, in that if I want to distribute (unique) items from a single stream, I need to create my own locking around it. Conversely, if every sleeper dequeues a distinct node, the current behavior can be restored via Nothing in the test-suite breaks if this is changed (other than the topmost |
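To make the "own locking" workaround concrete, here is a minimal sketch. It assumes that serializing calls to `Lwt_stream.next` behind an `Lwt_mutex` is acceptable for the use case; `make_exclusive_next` and `demo` are hypothetical names, not part of Lwt. Only one consumer at a time ever sleeps on the stream, so each one should dequeue a distinct element regardless of how the stream treats simultaneous sleepers.

```ocaml
open Lwt.Infix

(* Hypothetical helper (not part of Lwt): wrap a stream so that at most one
   consumer sleeps on it at a time. Lwt_mutex queues the other consumers in
   FIFO order, so each of them ends up with its own element. *)
let make_exclusive_next stream =
  let lock = Lwt_mutex.create () in
  fun () -> Lwt_mutex.with_lock lock (fun () -> Lwt_stream.next stream)

(* Same shape as the push-stream example above, but going through the lock. *)
let demo () =
  let stream, push = Lwt_stream.create () in
  let next = make_exclusive_next stream in
  let t1 = next () in
  let t2 = next () in
  push (Some 1);
  push (Some 700);
  t1 >>= fun x ->
  t2 >|= fun y ->
  (x, y)
```

The cost of this approach is that every consumer has to go through the wrapper; a direct call to `Lwt_stream.next` on the same stream bypasses the lock.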
It could be that
|
Removing the API is obviously out of the question; it's used by numerous projects. I have no idea what the intended semantics were, since it was added well before my time. @diml ? |
Meant: removed from public api documentation. |
Well, I always thought that having several consumers for the same stream was a bit fishy and it wasn't worth making Note that the current documentation of @pqwy the change you suggest is not optimal: if you have 100 loops reading from the stream concurrently, every time an element is pushed the 100 threads will wake up but only one will get the element. Worse, you can easily build a pattern where one loop gets everything and the others get nothing. I think this would be the case with this example:

    open Lwt.Infix

    (* Reader: take one element, print it, yield, repeat. *)
    let rec loop n st =
      Lwt_stream.next st >>= fun x ->
      Printf.printf "loop %d got %d\n%!" n x;
      Lwt_main.yield () >>= fun () ->
      loop n st

    let main () =
      let st, push = Lwt_stream.create () in
      (* Start 100 concurrent readers on the same stream. *)
      for i = 1 to 100 do
        Lwt.async (fun () -> loop i st)
      done;
      (* Producer: push one element every 0.1 s, forever. *)
      let rec loop () =
        push (Some 1);
        Lwt_unix.sleep 0.1 >>= loop
      in
      loop ()

    let () = Lwt_main.run (main ())

You need to queue waiters manually to be sure this doesn't happen |
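One way to read "queue waiters manually" is sketched below. This is an illustration under stated assumptions, not how `Lwt_stream` is implemented: the `fair` type and the `create`, `push` and `take` functions are hypothetical names. Each blocked consumer registers its own resolver in a FIFO queue, and a push wakes exactly one of them (the one that has waited longest), so 100 readers do not all wake up for a single element.

```ocaml
(* Hypothetical fair channel, not part of Lwt. *)
type 'a fair = {
  items : 'a Queue.t;          (* pushed elements nobody has taken yet *)
  waiters : 'a Lwt.u Queue.t;  (* resolvers of blocked consumers, oldest first *)
}

let create () = { items = Queue.create (); waiters = Queue.create () }

(* Hand the element to the oldest waiter if there is one, else buffer it. *)
let push t x =
  match Queue.take_opt t.waiters with
  | Some resolver -> Lwt.wakeup_later resolver x
  | None -> Queue.add x t.items

(* Return a buffered element if available, else register as a waiter. *)
let take t =
  match Queue.take_opt t.items with
  | Some x -> Lwt.return x
  | None ->
    let promise, resolver = Lwt.wait () in
    Queue.add resolver t.waiters;
    promise
```

The sketch uses `Lwt.wait`, so the returned promises are not cancelable; supporting cancellation would require removing the resolver of a canceled consumer from `waiters`, which is left out here.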
Will be addressed as part of #250. This issue is linked from there. Discussion partially continued there. |
Running this:
results in the same item from Lwt_stream being returned to all waiting threads: