From 3f3068ed9d7e45a653307861d1225cd06c240197 Mon Sep 17 00:00:00 2001 From: "Hezekiah M. Carty" Date: Fri, 13 Oct 2017 23:42:11 -0600 Subject: [PATCH] Support disposal of Lwt_pool.t elements --- src/core/lwt_pool.ml | 35 +++++++++++++++++++++++++++-------- src/core/lwt_pool.mli | 38 ++++++++++++++++++++++++++------------ test/core/test_lwt_pool.ml | 20 +++++++++++++++++--- 3 files changed, 70 insertions(+), 23 deletions(-) diff --git a/src/core/lwt_pool.ml b/src/core/lwt_pool.ml index 9631197ca6..4c3cf1f737 100644 --- a/src/core/lwt_pool.ml +++ b/src/core/lwt_pool.ml @@ -34,6 +34,8 @@ type 'a t = { (* Check a member when its use failed. *) validate : 'a -> bool Lwt.t; (* Validate old pool members. *) + dispose : 'a -> unit Lwt.t; + (* Dispose of a pool member. *) max : int; (* Size of the pool. *) mutable count : int; @@ -44,11 +46,12 @@ type 'a t = { (* Threads waiting for a member. *) } -let create m ?(check = fun _ f -> f true) ?(validate = fun _ -> Lwt.return_true) create = +let create m ?(check = fun _ f -> f true) ?(validate = fun _ -> Lwt.return_true) ?(dispose = fun _ -> Lwt.return_unit) create = { max = m; create = create; validate = validate; check = check; + dispose = dispose; count = 0; list = Queue.create (); waiters = Lwt_sequence.create () } @@ -104,10 +107,12 @@ let check_elt p c = | false -> (* Remove this member and create a new one. *) p.count <- p.count - 1; + p.dispose c >>= fun () -> create_member p) (fun e -> (* Validation failed: create a new member if at least one thread is waiting. *) + p.dispose c >>= fun () -> replace_acquired p; Lwt.fail e) @@ -127,12 +132,19 @@ let acquire p = (* Release a member when its use failed. *) let checked_release p c = - p.check c begin fun ok -> - if ok then - release p c - else - replace_acquired p - end + let ok = ref false in + p.check c (fun result -> ok := result); + if !ok then ( + (* Element is ok - release it back to the pool *) + release p c; + Lwt.return_unit + ) + else ( + (* Element is not ok - dispose of it and replace with a new one *) + p.dispose c >>= fun () -> + replace_acquired p; + Lwt.return_unit + ) let use p f = acquire p >>= fun c -> @@ -143,5 +155,12 @@ let use p f = release p c; t) (fun e -> - checked_release p c; + checked_release p c >>= fun () -> Lwt.fail e) + +let clear p = + Queue.fold ( + fun promise element -> + promise >>= fun () -> + p.dispose element + ) Lwt.return_unit p.list diff --git a/src/core/lwt_pool.mli b/src/core/lwt_pool.mli index a6238878fe..35dd617f15 100644 --- a/src/core/lwt_pool.mli +++ b/src/core/lwt_pool.mli @@ -27,31 +27,45 @@ The pool also provides a limit on the number of connections that can simultaneously be open. - Note that pools are not for keeping Lwt threads. Lwt threads are very cheap - to create and are pure. It is neither desirable nor possible to reuse them. - If you want to have a pool of {e system} threads, consider module + Note that pools are not for keeping Lwt promises. Lwt promises are very + cheap to create. It is neither desirable nor possible to reuse them. If + you want to have a pool of {e system} threads, consider using [Lwt_preemptive]. *) type 'a t - (** Pools containing values of type ['a]. *) + (** Pools containing elements of type ['a]. *) val create : int -> ?check : ('a -> (bool -> unit) -> unit) -> ?validate : ('a -> bool Lwt.t) -> + ?dispose : ('a -> unit Lwt.t) -> (unit -> 'a Lwt.t) -> 'a t - (** [create n ?check ?validate f] creates a new pool with at most - [n] elements. [f] is the function to use to create a new element + (** [create n ?check ?validate ?dispose f] creates a new pool with at most + [n] elements. [f] is the function to use to create a new element. Elements are created on demand. - An element of the pool is validated by the optional [validate] - function before its {!use}. Invalid elements are re-created. + An element of the pool is validated by the optional [validate] function + before it is accessed by {!use}. Invalid elements are passed to [dispose] + and then re-created with [f]. - The optional function [check] is called after a [use] of an - element failed. It must call its argument exactly once with - [true] if the element is still valid and [false] otherwise. *) + If a call to {!use} fails with a pool element that element will be passed + to the optional function [check] as [check element callback]. [check] + must call [callback] exactly once with [true] if [element] is still valid + and [false] otherwise. If [check] calls [callback false] then [dispose] + will be run on [element]. + + Note that [dispose] is {b not} guaranteed to be called on the elements in + a pool when the pool is garbage collected. The {!clear} function should + be used if the elements of the pool need to be explicitly disposed of. *) val use : 'a t -> ('a -> 'b Lwt.t) -> 'b Lwt.t (** [use p f] takes one free element of the pool [p] and gives it to the function [f]. The element is put back into the pool after the - thread created by [f] completes. *) + promise created by [f] completes. *) + +val clear : 'a t -> unit Lwt.t + (** [clear p] will call the [dispose] function associated with [p] on every + element in [p] if [dispose] was defined. Otherwise it is a no-op. + + Disposals are performed sequentially in an undefined order. *) diff --git a/test/core/test_lwt_pool.ml b/test/core/test_lwt_pool.ml index 22c2d65523..efed530830 100644 --- a/test/core/test_lwt_pool.ml +++ b/test/core/test_lwt_pool.ml @@ -18,6 +18,7 @@ * 02111-1307, USA. *) +open Lwt.Infix open Test exception Dummy_error @@ -100,15 +101,28 @@ let suite = Lwt.return (Lwt.state u2 = Lwt.Return 2) ); - test "on check, bad elements are replaced" + test "on check, bad elements are disposed of and replaced" (fun () -> let gen = (fun () -> let n = ref 1 in Lwt.return n) in let c = (fun n f -> f (!n > 0)) in - let p = Lwt_pool.create 1 ~check: c gen in + let disposed = ref false in + let d _ = disposed := true; Lwt.return_unit in + let p = Lwt_pool.create 1 ~check: c ~dispose:d gen in let task = (fun n -> n := !n + 1; Lwt.return !n) in let _ = Lwt_pool.use p (fun n -> n := 0; Lwt.fail Dummy_error) in let u2 = Lwt_pool.use p task in - Lwt.return (Lwt.state u2 = Lwt.Return 2) + Lwt.return (Lwt.state u2 = Lwt.Return 2 && !disposed) + ); + + test "clear disposes of all elements" + (fun () -> + let gen = (fun () -> let n = ref 1 in Lwt.return n) in + let count = ref 0 in + let d _ = incr count; Lwt.return_unit in + let p = Lwt_pool.create 1 ~dispose:d gen in + let _ = Lwt_pool.use p (fun _ -> Lwt.return_unit) in + Lwt_pool.clear p >>= fun () -> + Lwt.return (!count = 1) ); test "waiter are notified on replacement"