Add Lwt_stream.iter_n #312
`iter_n` provides concurrent iteration over a stream with an upper bound on the level of concurrency used.
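For readers who want a concrete picture of the behaviour described above, here is a minimal sketch of one way such a function could be written. It is not the code from this PR: the name `iter_n`, the `?max_concurrency` argument and its default are assumptions made for illustration. It relies only on `Lwt_stream.get`, `Lwt.bind` and `Lwt.join`.

```ocaml
(* Hypothetical sketch, not the implementation proposed in this PR.
   Runs [f] over the elements of [stream] with at most [max_concurrency]
   calls to [f] pending at the same time. *)
let iter_n ?(max_concurrency = 1) f stream =
  if max_concurrency <= 0 then
    invalid_arg "iter_n: max_concurrency must be positive";
  (* Each worker takes the next element only after its previous call to [f]
     has resolved, so the number of workers bounds the concurrency. *)
  let rec worker () =
    Lwt.bind (Lwt_stream.get stream) (function
      | None -> Lwt.return_unit              (* stream exhausted *)
      | Some x -> Lwt.bind (f x) worker)
  in
  (* [Lwt.join] resolves once every worker has finished. *)
  Lwt.join (List.init max_concurrency (fun _ -> worker ()))
```

With this shape, `iter_n ~max_concurrency:1` behaves like a sequential iteration, and larger values trade ordering guarantees for throughput.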
See #250 and #250 (comment) in the comment thread. @aantron For what it's worth, I use this all over my code and its absence from |
Async defines:

    type how = [
      | `Parallel
      | `Sequential
      | `Max_concurrent_jobs of int
    ]

and iter operations are defined like this:

    val Deferred.List.iter : ?how:how -> 'a list -> f:('a -> unit Deferred.t) -> unit Deferred.t

Maybe it would be useful here. |
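To illustrate how the Async-style `?how` argument might carry over, here is a rough sketch of a wrapper around `Lwt_list`. `Lwt_list` has no such argument; the function name `iter` and the `` `Sequential `` default are assumptions for the sake of the example.

```ocaml
type how = [ `Parallel | `Sequential | `Max_concurrent_jobs of int ]

(* Hypothetical wrapper: Lwt_list itself does not take [?how]. *)
let iter ?(how : how = `Sequential) f l =
  match how with
  | `Sequential -> Lwt_list.iter_s f l
  | `Parallel -> Lwt_list.iter_p f l
  | `Max_concurrent_jobs n ->
    if n <= 0 then invalid_arg "iter: job count must be positive";
    (* [n] workers share the remaining elements; each one takes the next
       element only after its current call to [f] has resolved. *)
    let remaining = ref l in
    let rec worker () =
      match !remaining with
      | [] -> Lwt.return_unit
      | x :: rest ->
        remaining := rest;
        (* [Lwt.apply] turns a synchronous exception into a rejection. *)
        Lwt.bind (Lwt.apply f x) worker
    in
    Lwt.join (List.init n (fun _ -> worker ()))
```

A call such as ``iter ~how:(`Max_concurrent_jobs 4) fetch urls`` (with `fetch` and `urls` supplied by the caller) would then cover the `_s`, `_p` and `_n` variants with a single entry point.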
@agarwal Thanks - that seems promising. Could be worth adding for 3.0.0 or 4.0.0 to make this functionality more discoverable. If we are going to have another 2.x release I'd like to see |
The CI failures look like they are unrelated to this PR. |
CI failures indeed unrelated to the PR. I restarted the failed builds, but the AppVeyor one might fail again. This seems desirable, but I will have to delay taking a thorough look at it (including @agarwal's suggestion) for some time (weeks or months), due to other Lwt tasks in my queue. |
If we take @agarwal's suggestion then it would ideally apply throughout at least Lwt_stream and Lwt_list. |
Yes. Indeed, in all cases, it would be nice to maintain some degree of consistency between |
On Fri, Feb 10, 2017 at 04:46:31PM -0800, Anton wrote:
Yes. Indeed, in all cases, it would be nice to maintain some degree of consistency between `Lwt_stream` and `Lwt_list`, as well as the availability of the various `_s`, `_p`, and `_n` functions (if any).
The motivating example (performing concurrent HTTP requests over a list/stream
of URLs while limiting concurrency) sounds like a task for a... pool.
However, the common idiom
Lwt_list.iter_p (fun x -> Lwt_pool.use pool (fun resource -> foo resource x)) l
is unsatisfactory because it creates lots of thunks that will block on
Lwt_pool.use. Lwt is able to cope with many threads, but this is clearly no
good when working with Lwt_stream.iter_p.
So here's an idea: what if, instead of adding an iter_n function to each
iterable thing, we provide a function in the pool/region with the right
semantics, of type
val enter_p : t -> int -> (unit -> unit Lwt.t) -> unit Lwt.t
that (1) blocks when the region is full, yet (2) spawns the promises in the
background (like iter_p), and (3) if any promise fails, fails with the same
error.
This would allow turning any iter_s into a bounded-concurrency iter_n by
doing
let reg = Region.make concurrency in
iter_s (fun x -> Region.enter_p reg 1 (fun () -> f x)) >>= fun () ->
Region.wait reg (* needed to wait for the last [concurrency] tasks to end,
* not required in the infinite Lwt_stream use-case *)
(when working with pooled resources, the region would be attached to a pool on
creation)
It is more verbose than having an iter_n per structure, but it only needs
to be implemented once in Lwt_pool and would be more powerful than a mere
iter_n because it can work with resource pools (so you limit concurrency
across concurrent iter_s-turned-into-iter_n invocations). It would also allow
adjusting concurrency dynamically by resizing the pool/region (e.g.
congestion control when performing many concurrent requests against a server).
If the general mechanism is implemented, the more convenient
Lwt_list.iter_n and Lwt_stream.iter_n could be built trivially atop
it.
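As a rough illustration of the region idea (this is not the code from the gist mentioned later in the thread), the three properties listed above could be sketched as follows. The module `Region`, its fields, and the helper `iter_n` are all hypothetical; only `Lwt`, `Lwt_condition` and `Lwt_list` calls are real library functions.

```ocaml
(* Hypothetical sketch of a region with the proposed [enter_p] semantics. *)
module Region = struct
  type t = {
    capacity : int;
    mutable used : int;               (* units currently held *)
    changed : unit Lwt_condition.t;   (* signalled when [used] decreases *)
    mutable failure : exn option;     (* first error raised by a task *)
  }

  let make capacity =
    if capacity <= 0 then invalid_arg "Region.make";
    { capacity; used = 0; changed = Lwt_condition.create (); failure = None }

  let rec enter_p t n thunk =
    match t.failure with
    | Some e -> Lwt.fail e                      (* (3) propagate the error *)
    | None ->
      (* A request larger than the capacity is let in when the region is
         empty, so it cannot block forever. *)
      if t.used + n > t.capacity && t.used > 0 then
        (* (1) block while the region is full, then retry *)
        Lwt.bind (Lwt_condition.wait t.changed) (fun () -> enter_p t n thunk)
      else begin
        t.used <- t.used + n;
        (* (2) run the task in the background *)
        Lwt.async (fun () ->
          Lwt.finalize
            (fun () ->
              Lwt.catch thunk (fun e ->
                (match t.failure with
                 | None -> t.failure <- Some e
                 | Some _ -> ());
                Lwt.return_unit))
            (fun () ->
              t.used <- t.used - n;
              Lwt_condition.broadcast t.changed ();
              Lwt.return_unit));
        Lwt.return_unit
      end

  (* Resolves once no task is running; fails if any task failed. *)
  let rec wait t =
    if t.used > 0 then
      Lwt.bind (Lwt_condition.wait t.changed) (fun () -> wait t)
    else
      match t.failure with
      | Some e -> Lwt.fail e
      | None -> Lwt.return_unit
end

(* Bounded-concurrency iteration built from [iter_s], as described above. *)
let iter_n ~concurrency f l =
  let reg = Region.make concurrency in
  Lwt.bind
    (Lwt_list.iter_s (fun x -> Region.enter_p reg 1 (fun () -> f x)) l)
    (fun () -> Region.wait reg)
```

In this sketch, tasks already running when a failure is recorded are left to finish in the background; a fuller version would have to decide whether to cancel them.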
…--
Mauricio Fernández
|
On Tue, Feb 21, 2017 at 03:03:25PM +0100, Mauricio Fernández wrote:
This would allow turning any iter_s into a bounded-concurrency iter_n by
doing
let reg = Region.make concurrency in
iter_s (fun x -> Region.enter_p reg 1 (fun () -> f x)) >>= fun () ->
Region.wait reg (* needed to wait for the last [concurrency] tasks to end,
* not required in the Lwt_stream use-case *)
I have adapted the long-discarded (Lwt_util IIRC) region code in this POC:
https://gist.github.com/mfp/0359cd3ad6494648ae63d2b49819b27e
…--
Mauricio Fernández
|
On Thu, Mar 02, 2017 at 03:25:40AM -0800, Gabriel Radanne wrote:
@mfp Additionally, this looks very much like an lwt version of @c-cube's [sequence](https://github.com/c-cube/sequence/).
That's an interesting parallel I hadn't noticed.
So we have the trivial parallel execution combinator and the bounded
concurrency combinator I POC'ed above. Do you think any of Sequence's
combinators would be of practical use with a "Lwt-styled sequence"
[('a -> unit Lwt.t) -> unit Lwt.t]?
I feel the issue is getting entangled with the more general #250.
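To make the "Lwt-styled sequence" type concrete, here is a small sketch of what a few Sequence-like combinators could look like over `('a -> unit Lwt.t) -> unit Lwt.t`. The type name `lwt_seq` and every function below are hypothetical and exist only for illustration.

```ocaml
(* A sequence is a function that feeds each element to a consumer [k]. *)
type 'a lwt_seq = ('a -> unit Lwt.t) -> unit Lwt.t

(* Sources: sequential iteration over a list or a stream. *)
let of_list (l : 'a list) : 'a lwt_seq = fun k -> Lwt_list.iter_s k l
let of_stream (s : 'a Lwt_stream.t) : 'a lwt_seq =
  fun k -> Lwt_stream.iter_s k s

(* Transformers wrap the consumer rather than building a new collection. *)
let map (f : 'a -> 'b) (seq : 'a lwt_seq) : 'b lwt_seq =
  fun k -> seq (fun x -> k (f x))

let filter (p : 'a -> bool) (seq : 'a lwt_seq) : 'a lwt_seq =
  fun k -> seq (fun x -> if p x then k x else Lwt.return_unit)

(* Sink: collect the elements in the order they were produced. *)
let to_list (seq : 'a lwt_seq) : 'a list Lwt.t =
  let acc = ref [] in
  Lwt.map
    (fun () -> List.rev !acc)
    (seq (fun x -> acc := x :: !acc; Lwt.return_unit))
```

Because the producer drives the consumer, the iteration strategy (sequential, parallel, or bounded) is chosen by the source, which is the difference in control from `Lwt_stream` raised in the replies.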
…--
Mauricio Fernández
|
@mfp Honestly, I have no idea. My gut feeling is "probably", but I never used any of them, really. There is a clear distinction between Lwt_stream and "Sequence-like for lwt": the amount of control is not at all the same. I feel like these kinds of combinators should wait a bit for the new iterators that might/should land in OCaml itself. |
(branch force-pushed from b8b3168 to 0773186)
I think we will merge this into |
@Drup not sure the standard iterator type (if it ever gets merged) will be suitable for Lwt… A proper blocking iterator would have to return a |
Thank you :) |