Skip to content

Commit

Permalink
Lwt_io.shutdown_server: wait for close to finish
Browse files Browse the repository at this point in the history
Resolves #259.
  • Loading branch information
aantron committed Nov 25, 2016
1 parent 41053b0 commit 45ce74d
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 23 deletions.
21 changes: 15 additions & 6 deletions src/unix/lwt_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1412,10 +1412,12 @@ let with_connection ?fd ?in_buffer ?out_buffer sockaddr f =
(fun () -> close_if_not_closed ic <&> close_if_not_closed oc)

type server = {
shutdown : unit Lazy.t;
shutdown : unit Lwt.t Lazy.t;
}

let shutdown_server server = Lazy.force server.shutdown
let shutdown_server_2 server = Lazy.force server.shutdown

let shutdown_server server = Lwt.async (fun () -> shutdown_server_2 server)

let establish_server ?fd ?(buffer_size = !default_buffer_size) ?(backlog=5) sockaddr f =
let sock = match fd with
Expand All @@ -1426,7 +1428,9 @@ let establish_server ?fd ?(buffer_size = !default_buffer_size) ?(backlog=5) sock
Lwt_unix.bind sock sockaddr;
Lwt_unix.listen sock backlog;
let abort_waiter, abort_wakener = Lwt.wait () in
let abort_waiter = abort_waiter >>= fun _ -> Lwt.return `Shutdown in
let abort_waiter = abort_waiter >>= fun () -> Lwt.return `Shutdown in
(* Signals that the listening socket has been closed. *)
let closed_waiter, closed_wakener = Lwt.wait () in
let rec loop () =
Lwt.pick [Lwt_unix.accept sock >|= (fun x -> `Accept x); abort_waiter] >>= function
| `Accept(fd, addr) ->
Expand All @@ -1439,15 +1443,17 @@ let establish_server ?fd ?(buffer_size = !default_buffer_size) ?(backlog=5) sock
loop ()
| `Shutdown ->
Lwt_unix.close sock >>= fun () ->
match sockaddr with
(match sockaddr with
| Unix.ADDR_UNIX path when path <> "" && path.[0] <> '\x00' ->
Unix.unlink path;
Lwt.return_unit
| _ ->
Lwt.return_unit
Lwt.return_unit) >>= fun () ->
Lwt.wakeup closed_wakener ();
Lwt.return_unit
in
ignore (loop ());
{ shutdown = lazy(Lwt.wakeup abort_wakener `Shutdown) }
{ shutdown = lazy (Lwt.wakeup abort_wakener (); closed_waiter) }

let establish_server_safe ?fd ?buffer_size ?backlog sockaddr f =
let best_effort_close channel =
Expand Down Expand Up @@ -1526,4 +1532,7 @@ module Versioned =
struct
let establish_server_1 = establish_server
let establish_server_2 = establish_server_safe

let shutdown_server_1 = shutdown_server
let shutdown_server_2 = shutdown_server_2
end
33 changes: 29 additions & 4 deletions src/unix/lwt_io.mli
Original file line number Diff line number Diff line change
Expand Up @@ -454,10 +454,19 @@ To use the safer version immediately, use Lwt_io.Versioned.establish_server_2"]
by [f] completes. *)

val shutdown_server : server -> unit
(** Close the given server's listening socket. This function does not wait for
the close operation to actually complete. It does not affect the sockets
of connections that have already been accepted, i.e. passed to [f] by
[establish_server]. *)
[@@ocaml.deprecated
"This function will soon evaluate to a thread that waits for the close system
call to complete. This will be a breaking change for some builds. See
https://github.com/ocsigen/lwt/issues/259
To keep the current signature, use Lwt_io.Versioned.shutdown_server_1
To use the new version immediately, use Lwt_io.Versioned.shutdown_server_2"]
(** Closes the given server's listening socket. This function does not wait
for the [close] operation to actually complete. It does not affect the
sockets of connections that have already been accepted, i.e. passed to [f]
by [establish_server].
@deprecated Will be replaced by {!Versioned.shutdown_server_2}, which
evaluates to a thread that waits for [close] to complete. *)

val lines_of_file : file_name -> string Lwt_stream.t
(** [lines_of_file name] returns a stream of all lines of the file
Expand Down Expand Up @@ -604,4 +613,20 @@ sig
connection raises an exception, it is passed to
[!Lwt.async_exception_hook]. To handle exceptions raised by [close], call
it manually inside [f]. *)

val shutdown_server_1 : server -> unit
[@@ocaml.deprecated
"Deprecated in favor of Lwt_io.Versioned.shutdown_server_2. See
https://github.com/ocsigen/lwt/issues/259"]
(** Alias for the current {!Lwt_io.shutdown_server}.
@deprecated Use {!shutdown_server_2}. *)

val shutdown_server_2 : server -> unit Lwt.t
(** Closes the given server's listening socket. The thread returned by this
function waits for the underlying [close] system call to complete.
This function does not affect sockets of connections that have already
been accepted by the server, i.e. those passed by [establish_server] to
its callback [f]. *)
end
27 changes: 14 additions & 13 deletions tests/unix/test_lwt_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,6 @@ let with_async_exception_hook hook f =

let local = Unix.ADDR_INET (Unix.inet_addr_loopback, 4321)

(* Add small delay to help ensure [close] system calls are issued on listening
sockets as a result of calling [Lwt_io.shutdown_server], before proceeding.
In the future, it would be better if [Lwt_io.shutdown_server] produced a
thread that could be waited on. *)
let shutdown_server_and_wait server =
Lwt_io.shutdown_server server;
Lwt_unix.sleep 0.05

(* Helpers for [establish_server_2] tests. *)
module Establish_server =
struct
Expand All @@ -67,7 +59,7 @@ struct
in

client_finished >>= fun () ->
shutdown_server_and_wait server
Lwt_io.Versioned.shutdown_server_2 server

(* Dirty hack for forcing [Lwt_io.close] to fail, to test response to [close]
exceptions. Impolitely closes the [n]th last file descriptor allocated by
Expand Down Expand Up @@ -192,7 +184,7 @@ let suite = suite "lwt_io" [

with_connection local (fun _ -> Lwt.return_unit) >>= fun () ->
Lwt.wakeup client_finished ();
shutdown_server_and_wait server >>= fun () ->
Lwt_io.Versioned.shutdown_server_2 server >>= fun () ->
handler);

(* Counterpart to establish_server: shutdown test. Confirms that shutdown is
Expand All @@ -216,7 +208,7 @@ let suite = suite "lwt_io" [

>>= fun result ->

shutdown_server_and_wait server >|= fun () ->
Lwt_io.Versioned.shutdown_server_2 server >|= fun () ->
result);

test "establish_server_2: implicit close"
Expand All @@ -243,6 +235,12 @@ let suite = suite "lwt_io" [
in

run >>= fun () ->
(* Give a little time for the close system calls on the connection sockets
to complete. The Lwt_io and Lwt_unix APIs do not currently allow
binding on the implicit closes of these sockets, so resorting to a
delay. *)
Lwt_unix.sleep 0.05 >>= fun () ->

is_closed_in !in_channel' >>= fun in_closed_after_handler ->
is_closed_out !out_channel' >|= fun out_closed_after_handler ->

Expand Down Expand Up @@ -274,9 +272,12 @@ let suite = suite "lwt_io" [
run

>>= fun () ->
(* See comment in other implicit close test. *)
Lwt_unix.sleep 0.05 >>= fun () ->

is_closed_in !in_channel' >>= fun in_closed_after_handler ->
is_closed_out !out_channel' >|= fun out_closed_after_handler ->

in_closed_after_handler && out_closed_after_handler);

(* This does a simple double close of the channels (second close is implicit).
Expand Down Expand Up @@ -376,7 +377,7 @@ let suite = suite "lwt_io" [
Lwt.return_unit)

>>= fun () ->
shutdown_server_and_wait server >>= fun () ->
Lwt_io.Versioned.shutdown_server_2 server >>= fun () ->
is_closed_in !in_channel' >>= fun in_closed ->
is_closed_out !out_channel' >|= fun out_closed ->
in_closed && out_closed);
Expand Down Expand Up @@ -416,6 +417,6 @@ let suite = suite "lwt_io" [

>>= fun () ->
Lwt.wakeup resume_server ();
shutdown_server_and_wait server >|= fun () ->
Lwt_io.Versioned.shutdown_server_2 server >|= fun () ->
!exceptions_observed = 2);
]

0 comments on commit 45ce74d

Please sign in to comment.