Skip to content

Commit

Permalink
Add Lwt_stream.iter_n
Browse files Browse the repository at this point in the history
`iter_n` provides concurrent iteration over a stream with an upper bound
on the level of concurrency used.
  • Loading branch information
hcarty authored and aantron committed Feb 5, 2018
1 parent 55bede2 commit ed19c3a
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
27 changes: 27 additions & 0 deletions src/core/lwt_stream.ml
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,33 @@ let rec iter_p_rec node f s =

let iter_p f s = iter_p_rec s.node f s

let iter_n ?(max_threads = 1) f stream =
begin
if max_threads <= 0 then
let message =
Printf.sprintf "Lwt_stream.iter_n: max_threads must be > 0, %d given"
max_threads
in
invalid_arg message
end;
let rec loop running available =
begin
if available > 0 then (
Lwt.return (running, available)
)
else (
Lwt.nchoose_split running >>= fun (complete, running) ->
Lwt.return (running, available + List.length complete)
)
end >>= fun (running, available) ->
get stream >>= function
| None ->
Lwt.join running
| Some elt ->
loop (f elt :: running) (pred available)
in
loop [] max_threads

let rec find_rec node f s =
if node == !(s.last) then
feed s >>= fun () -> find_rec node f s
Expand Down
7 changes: 7 additions & 0 deletions src/core/lwt_stream.mli
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,13 @@ val iter_p : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t
val iter_s : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t
(** [iter f s] iterates over all elements of the stream. *)

val iter_n : ?max_threads:int -> ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t
(** [iter_n ?max_threads f s] iterates over all elements of the stream [s].
Iteration is performed concurrently with up to [max_threads] concurrent
instances of [f].
@param max_threads defaults to [1]. *)

val find : ('a -> bool) -> 'a t -> 'a option Lwt.t
val find_s : ('a -> bool Lwt.t) -> 'a t -> 'a option Lwt.t
(** [find f s] find an element in a stream. *)
Expand Down

0 comments on commit ed19c3a

Please sign in to comment.