Flaws of Lwt_stream #250
Please tell why `Lwt_stream` doesn't work for you, ideally at the level of semantics (i.e. prefer "lazy streams aren't right for our use case because blahblah" to "`Lwt_stream` is missing a specific function that can easily be added"). Hopefully we can move `Lwt_stream` towards deprecation.

cc @c-cube @hcarty @seliopou @Drup @diml

Please pull in anyone else who has interest/expertise.

Some existing anguish: #239, #151.

EDIT: Despite the issue title, thank you to @diml.

EDIT: Note also #155.

Comments
I don't have the solution(s), but issues with
I think
Thanks. Agreed.
I would like to move this along. Of course,
If
An example for a potential `map_n`:

```ocaml
open Lwt.Infix

(* Apply [f] to each stream element, keeping at most [max_threads]
   calls running concurrently. The workers are joined at the end and
   their results discarded, so this behaves as a bounded-concurrency
   iterator. *)
let map_n ?(max_threads = 1) f stream =
  begin
    if max_threads <= 0 then
      Lwt.fail_invalid_arg "map_n: max_threads must be > 0"
    else
      Lwt.return_unit
  end >>= fun () ->
  let rec loop running available =
    (* If no worker slot is free, wait for at least one running
       thread to complete before dequeuing the next element. *)
    begin
      if available > 0 then
        Lwt.return (running, available)
      else
        Lwt.nchoose_split running >>= fun (complete, running) ->
        Lwt.return (running, available + List.length complete)
    end >>= fun (running, available) ->
    Lwt_stream.get stream >>= function
    | None -> Lwt.join running
    | Some elt -> loop (f elt :: running) (pred available)
  in
  loop [] max_threads
```
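A hypothetical usage sketch (my addition, assuming the definition above; since `map_n` joins its workers and returns `unit Lwt.t`, the callback must itself return `unit Lwt.t`):

```ocaml
open Lwt.Infix

let () =
  Lwt_main.run begin
    let items = Lwt_stream.of_list [1; 2; 3; 4; 5] in
    map_n ~max_threads:2
      (fun i ->
         (* Simulate some I/O-bound work per element. *)
         Lwt_unix.sleep 0.1 >|= fun () ->
         Printf.printf "finished %d\n" i)
      items
  end
```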
I agree with @hcarty, it doesn't suck, but it has shortcomings. A small suggestion on the
@c-cube That was my first attempt, but
Consider a finite stream that yields some elements, with several readers attached. Two behaviors would make sense: either (1) every reader sees every element, or (2) each element is delivered to exactly one reader.
Presently, neither of these is the case: the number of dequeuing events depends on the timing of the readers. Operationally, if there is an element available in the stream, the following read returns it immediately. In operational terms, this makes a certain sense. Semantically, it makes no sense whatsoever, and it makes streams impossible to reason about, and plain useless, in situations with multiple readers. This, purely semantic, hitch is my greatest gripe about streams, and it's the one I complained about in #151. The fix proposed there really only addressed the semantics, but it can both lead to thread starvation and cause needless work.

I have a draft of an updated stream implementation which takes care of explicitly queuing the readers. It guarantees behavior 2 without these downsides. In addition, it makes the implementation shorter. If there is any interest, I can flesh it out, benchmark it against the current version, and post it for review. Alternatively, I was playing with the idea of publishing this implementation as a separate package.

A further issue is that push-based streams are conflated with pull-based streams. It can be argued that push-streams should be more like channels, while pull-streams should be more like lists, but I can live with the two being lumped into one, as this didn't bite me in practice.

Edit: Forgot to add that the test suite passes when the concurrent-readers behavior is changed, and I could not provoke other programs to misbehave with that code. So changing this is totally feasible.
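For concreteness, here is a small repro sketch (my illustration, not code from the thread) with two concurrent readers on one stream. Under behavior 1 both readers would print every element; under behavior 2 each element would be printed exactly once:

```ocaml
let () =
  Lwt_main.run begin
    let stream = Lwt_stream.of_list [1; 2; 3; 4] in
    let reader name =
      Lwt_stream.iter
        (fun x -> Printf.printf "reader %s got %d\n" name x)
        stream
    in
    (* With the current implementation, the elements end up split
       between the readers in a timing-dependent way. *)
    Lwt.join [reader "A"; reader "B"]
  end
```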
That's an interesting analysis. I like this semantics of "dequeuing events". Note that, for the pull-based case, a lazy list type works:

```ocaml
type 'a llist = 'a llist_cell Lwt.t lazy_t
and 'a llist_cell =
  | Nil
  | Cons of 'a * 'a llist
```
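To illustrate the idea (a sketch under my own naming, repeating the type for self-containedness): laziness memoizes each cell, so every consumer that forces the list sees the same elements, which matches the first behavior above.

```ocaml
open Lwt.Infix

type 'a llist = 'a llist_cell Lwt.t lazy_t
and 'a llist_cell =
  | Nil
  | Cons of 'a * 'a llist

(* Build a lazy list whose cells are produced by [f], called at most
   once per cell thanks to lazy memoization. *)
let rec unfold f : 'a llist =
  lazy begin
    f () >|= function
    | None -> Nil
    | Some x -> Cons (x, unfold f)
  end

(* Sequentially apply [f] to every element. *)
let rec iter_s f (l : 'a llist) =
  Lazy.force l >>= function
  | Nil -> Lwt.return_unit
  | Cons (x, tl) -> f x >>= fun () -> iter_s f tl
```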
@pqwy A desire to have your second case with
I'd like to agree with @c-cube and @hcarty above that
@aantron Are you interested in a PR for the
@hcarty If you could wait, and we discuss it later, that would be best. I am still pretty far off in the backlog from being able to give this topic the consideration it deserves. It wouldn't be fair to ask you to do work when I can't respond to it properly. But PRs are always appreciated in principle, so thank you for offering :)
I was experimenting with alternate implementations for streams and, after some discussions with @aantron (#257), created this:
This uses the most basic primitive of Lwt as its data structure, so I don't think there could be an implementation that uses fewer resources. Each element of the stream is a thread (a bare Lwt promise). When the push function goes out of scope, the final node in the stream becomes the end-of-stream marker.

@pqwy: This implementation gives your first behavior: every reader sees every element. More complicated requirements (e.g. bounded streams) could be built from this. I find, however, the simplicity of this example appealing. As is, it appears to be a very efficient way to broadcast data to multiple threads.
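Since the gist link above is not included, here is my own minimal sketch of this kind of design (names and details are my assumptions, not @rneswold's actual code): each node is a promise, pushing resolves the current tail with a new pending tail, and reading is just awaiting promises, which broadcasts naturally.

```ocaml
type 'a node =
  | Cons of 'a * 'a stream
  | Nil
and 'a stream = 'a node Lwt.t

(* [create ()] returns the stream head and a push function. Pushing
   [Some x] resolves the current tail with a new pending tail;
   pushing [None] terminates the stream. *)
let create () =
  let head, resolver = Lwt.wait () in
  let current = ref resolver in
  let push = function
    | Some x ->
        let next, next_resolver = Lwt.wait () in
        Lwt.wakeup !current (Cons (x, next));
        current := next_resolver
    | None ->
        Lwt.wakeup !current Nil
  in
  (head, push)
```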
@rneswold just a nitpick: to me, handling non-memory resources using the GC is quite bad. If you wait until a value is finalized to close the stream (and release the underlying resources such as file descriptors), it will probably be too late, and the program will fail with "too many file descriptors are open". Similarly, requiring the stream to be exhausted before it closes is really inefficient: what if I read the first lines of a 100 MB file, then no longer need the line iterator? Reading the whole file before closing it is quite wasteful. I'd argue that a
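The sentence above is cut off, but it reads like an argument for a bracket-style API. A sketch of what that could look like (my illustration; `with_lines` is a hypothetical name, the `Lwt_io` calls are real):

```ocaml
open Lwt.Infix

(* Tie the file handle's lifetime to a scope: the channel is closed
   as soon as [f] finishes, however much of the stream it consumed. *)
let with_lines path f =
  Lwt_io.open_file ~mode:Lwt_io.Input path >>= fun ic ->
  Lwt.finalize
    (fun () -> f (Lwt_io.read_lines ic))
    (fun () -> Lwt_io.close ic)
```

With this shape, reading only the first lines of a 100 MB file and then leaving the scope closes the descriptor immediately, instead of waiting for the GC or for stream exhaustion.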
@c-cube In general, I agree with you. Many types of resources should be reclaimed as soon as possible. In the case of streams, however, we can make an exception:
I've been reading about ways to write software so that many runtime errors can be caught at compile time (by designing the API in such a way that it's impossible to get into the runtime error state). My stream implementation follows this in that you can only add items to the stream. When the push function goes out of scope, the stream is closed. There's no need to check at run time whether you're pushing values onto a closed stream, because it's impossible.

This stream model is very useful and efficient for broadcasting data to Lwt threads. I think your concern is that, if we read a file and convert it to a stream of lines, the file handle is tied to the lifetime of the stream. This is a valid concern and I'm glad you brought it up. In fact, I encourage criticism. But rather than write off this implementation, I'd like to see if there's a way to support your use cases with minimal change to the core design.
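A usage sketch building on the `create`/`push` sketch earlier (again my illustration): two readers both hold the same head, so both see every element. Closing is explicit here via `push None`; in the design being described, that final push would instead happen when the push function becomes unreachable.

```ocaml
open Lwt.Infix

(* Consume a stream by awaiting each node in turn. *)
let rec iter f (s : 'a stream) =
  s >>= function
  | Nil -> Lwt.return_unit
  | Cons (x, rest) -> f x; iter f rest

let () =
  Lwt_main.run begin
    let s, push = create () in
    (* Both readers share the head promise, so the data is
       broadcast rather than split between them. *)
    let reader name =
      iter (fun x -> Printf.printf "%s: %d\n" name x) s
    in
    let readers = Lwt.join [reader "A"; reader "B"] in
    push (Some 1);
    push (Some 2);
    push None;
    readers
  end
```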
Wow. My reasoning and math sounded good until I started measuring. The current bench.ml: https://gist.github.com/rneswold/b47e3ce141ae12e10695972de2fba68f. I set
I am working a lot with NodeJS and
Here's an updated functional stream implementation I've been playing with: https://gist.github.com/rneswold/0d80560a80314ce3f1aa57a64ee406dd

@aantron pointed out to me the equivalency between this and the lazy-list formulation. Since the implementation is based on eagerly resolved promises, it still doesn't address @c-cube's concerns about lazy consumption. I tried writing

I provide this example to further discussion of this issue. At the very least, I think I'll package this up as a separate OPAM package once I finish the remaining functions (to make it closely compatible with `Lwt_stream`).
I disagree. The GC runs depending on how many resources you allocate. If you use a custom block with a finalizer, you can fine-tune how often the GC should run. For anything holding file descriptors, I would say that should be at least every 100 file descriptors. That way you can have 900 files open and reachable and still never run out of file descriptors: as soon as you get to 1000 open file descriptors, the GC runs and gets you back to the 900 still-reachable ones, so you never hit the 1024 limit.

If I see a problem here, it's more that standard file descriptors are not custom blocks but simply ints, so they aren't GC'ed at all. They are not counted as a resource beyond the 4/8 bytes of memory they take up in some record or tuple. And then the GC happily creates 1 million of them before the minor heap gets processed; obviously then you run out of file descriptors. The above only works if you wrap them into a custom block at a higher level.
@mrvn I didn't know you could do such things in custom blocks. But as things stand now, the GC will not protect you against this very annoying "too many file descriptors" error, and it is not clear (to me) how Lwt can change that.
@c-cube You would have to wrap every data structure containing a file descriptor in Lwt with a custom block. Unix.file_descr should have been a custom block in the first place, closing itself when it becomes unreachable (unless already closed explicitly). That is the only way I know of to tell the GC about the cost of some external resource, i.e. to make it run more often.
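A sketch of the wrapping idea from pure OCaml (my illustration): `Gc.finalise` can close an abandoned descriptor, but note that, unlike a C-level custom block allocated with a used/max ratio, it does not make the GC run more often, which is the point @mrvn is making.

```ocaml
type managed_fd = {
  fd : Unix.file_descr;
  mutable closed : bool;
}

let openfile_managed path flags perm =
  let m = { fd = Unix.openfile path flags perm; closed = false } in
  (* Safety net: close the descriptor if the wrapper is collected
     while still open. Explicit close remains the right default. *)
  Gc.finalise
    (fun m ->
       if not m.closed then
         try Unix.close m.fd with Unix.Unix_error _ -> ())
    m;
  m

let close_managed m =
  if not m.closed then begin
    m.closed <- true;
    Unix.close m.fd
  end
```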
That is why I prefer RAII-like approaches, especially
@aantron should Lwt_stream be used for production code at all? On the documentation page (http://ocsigen.org/lwt/dev/api/Lwt_stream) you suggest looking at @c-cube's lwt-pipe, but that is also marked as incomplete, while the functionality I tried seems to work fine. I like the CML approach to concurrency, and it would be nice to have something we could rely upon with Lwt. The other aspect of channels that I like is that they enforce sharing nothing, which should make it easier to write code that will run properly on OCaml multicore. What is your recommendation? Thanks very much in advance! A+
@kydos yep. The idea of that message is to avoid people using
Thanks for the prompt response @aantron. I agree that the note in the documentation should be updated, as it is perhaps a bit too discouraging :-) Keep up the great work on Lwt! A+
@aantron, to give you some more context, we are using OCaml and Lwt to build a secure and high-performance broker for the zenoh protocol. If you are familiar with NDN (Named Data Networking), zenoh addresses the problems of NDN while taking into account the new requirements posed by IoT. Anyway, I'd like to keep the "impure" code well isolated and use channels to compose the different processing stages. OCaml requires some discipline to ensure that no mutable state is ever transmitted across a channel, but let's say we enforce that currently by design and careful coding. As you can imagine, we need the channels (or streams) connecting the various stages to be quite performant. Thus I did some quick benchmarking, and my findings give the following ranking in terms of raw throughput as well as stability of the throughput:
Thus, going back to the reflection on your statement from yesterday, it looks like Lwt_stream is the way to go when using Lwt. That said, we want to use Lwt not only because we like it but also because of MirageOS. If you are interested, I can contribute the test code. Ciao,

P.S. Have you done much experimenting with Lwt on ocaml-multicore? I saw it is one of the open issues. Once you have something working, if you want us to do some benchmarking on throughput, we would be happy to do so.
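The ranking list and test code are not included above, so here is a minimal sketch of the kind of throughput measurement described (structure and numbers are my assumptions): push N integers through an `Lwt_stream` and time the drain.

```ocaml
open Lwt.Infix

let n = 1_000_000

let () =
  Lwt_main.run begin
    let stream, push = Lwt_stream.create () in
    (* Fill the stream eagerly, then time how fast one consumer
       can drain it. *)
    for i = 1 to n do push (Some i) done;
    push None;
    let t0 = Unix.gettimeofday () in
    Lwt_stream.iter (fun _ -> ()) stream >|= fun () ->
    let dt = Unix.gettimeofday () -. t0 in
    Printf.printf "%.0f msgs/s\n" (float_of_int n /. dt)
  end
```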
That's interesting. I'd be curious to know by how much
@kydos I've done only some rudimentary experiments with Lwt on multicore. I'll be sure to get back when there is something concrete.
@c-cube, on the inbound processing path the writer is the transport plug-in, which may be reading data from TCP/IP, UDP/IP, raw ETH, etc., and producing protocol frames. Next is our protocol engine. Thus in this case the reader may be slower than the writer, especially on high-throughput networks or with large numbers of entries in our routing tables. In any case, the test I did was synthetic. The ultimate truth would be provided by running throughput tests on our broker and seeing what the difference really is. Perhaps I'll have somebody on our team look at this, but it probably won't be before a month from now. I'll let you know.
@c-cube I'll run the test and will let you know what I get. A+
Closing, as there are no plans to do anything with/about `Lwt_stream`.