Skip to content

Commit

Permalink
Breaking: switch to safer Lwt_io.establish_server
Browse files Browse the repository at this point in the history
Originally added in #258 and exposed in #260, 41053b0.
  • Loading branch information
aantron committed Apr 8, 2017
1 parent 431355a commit 502cc71
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 67 deletions.
11 changes: 7 additions & 4 deletions src/unix/lwt_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1465,14 +1465,17 @@ let establish_server_base

server, started

let establish_server ?fd ?buffer_size ?backlog sockaddr f =
(* Old, deprecated version of [establish_server]. This function has to persist
for a while, in some form, until it is no longer exposed as
[Lwt_io.Versioned.establish_server_1]. *)
let establish_server_deprecated ?fd ?buffer_size ?backlog sockaddr f =
let blocking_bind fd addr =
Lwt.return (Lwt_unix.Versioned.bind_1 fd addr) [@ocaml.warning "-3"]
in
establish_server_base blocking_bind ?fd ?buffer_size ?backlog sockaddr f
|> fst

let establish_server_safe
let establish_server
?fd ?buffer_size ?backlog ?(no_close = false) sockaddr f =
let best_effort_close channel =
(* First, check whether the channel is closed. f may have already tried to
Expand Down Expand Up @@ -1556,8 +1559,8 @@ let default_buffer_size _ = !default_buffer_size

module Versioned =
struct
let establish_server_1 = establish_server
let establish_server_2 = establish_server_safe
let establish_server_1 = establish_server_deprecated
let establish_server_2 = establish_server

let shutdown_server_1 = shutdown_server
let shutdown_server_2 = shutdown_server_2
Expand Down
89 changes: 35 additions & 54 deletions src/unix/lwt_io.mli
Original file line number Diff line number Diff line change
Expand Up @@ -417,43 +417,34 @@ val establish_server :
?fd : Lwt_unix.file_descr ->
?buffer_size : int ->
?backlog : int ->
Unix.sockaddr -> (input_channel * output_channel -> unit) -> server
[@@ocaml.deprecated
" The signature and semantics of this function will soon change:
- the callback parameter f will evaluate to a promise (-> unit Lwt.t),
- channels will be closed automatically when that promise resolves, to avoid
leaking file descriptors, and
- the result will be a promise (-> server Lwt.t).
This will be breaking change in Lwt 3.0.0. See
https://github.com/ocsigen/lwt/pull/258
To keep the current functionality, use Lwt_io.Versioned.establish_server_1
To use the safer version immediately, use Lwt_io.Versioned.establish_server_2
Both alternatives require Lwt >= 2.7.0."]
(** [establish_server ?fd ?buffer_size ?backlog sockaddr f] creates a server
which listens for incoming connections. New connections are passed to [f].
?no_close : bool ->
Unix.sockaddr -> (input_channel * output_channel -> unit Lwt.t) ->
server Lwt.t
(** [establish_server sockaddr f] creates a server which listens for incoming
connections on [sockaddr]. New connections are passed to [f]. The
connections are closed automatically as promises returned by [f] complete.
[establish_server] does not start separate threads for running [f], nor
close the connections passed to [f]. Thus, the skeleton of a practical
server based on [establish_server] might look like this:
To prevent automatic closing, apply [establish_server] with
[~no_close:true].
{[
Lwt_io.establish_server address (fun (ic, oc) ->
Lwt.async (fun () ->
[~fd] can be specified to use an existing file descriptor for listening.
Otherwise, the default is for [establish_server] to create a fresh one.
(* ... *)
[~backlog] is the argument passed to {!Lwt_unix.listen}.
Lwt.catch (fun () -> Lwt_io.close oc) (fun _ -> Lwt.return_unit) >>=
Lwt.catch (fun () -> Lwt_io.close ic) (fun _ -> Lwt.return_unit)))
]}
The server does not wait on each promise returned by [f] before accepting
more connections. It accepts connections concurrently.
If [fd] is not specified, a fresh file descriptor will be created for
listening.
If [f] raises an exception, or the promise fails, the exception is passed to
{!Lwt.async_exception_hook}. Likewise, if the automatic [close] of a
connection raises an exception, it is passed to {!Lwt.async_exception_hook}.
To robustly handle these exceptions, you should call {!close} manually
inside [f], and wrap it in your own handler.
[backlog] is the argument passed to [Lwt_unix.listen].
The returned promise (a [server Lwt.t]) resolves when the server's listening
socket is bound to [sockaddr], right before the server first calls [accept].
@deprecated Will be replaced by {!Versioned.establish_server_2}, which
closes the channels passed to [f] automatically when a thread returned
by [f] completes. *)
@since 3.0.0 *)

val shutdown_server : server -> unit
[@@ocaml.deprecated
Expand Down Expand Up @@ -590,13 +581,17 @@ sig
?fd : Lwt_unix.file_descr ->
?buffer_size : int ->
?backlog : int ->
Unix.sockaddr -> (input_channel * output_channel -> unit) -> server
[@@ocaml.deprecated
" Deprecated in favor of Lwt_io.Versioned.establish_server_2. See
Unix.sockaddr -> (input_channel * output_channel -> unit) ->
server
[@@ocaml.deprecated
" Deprecated in favor of Lwt_io.establish_server. See
https://github.com/ocsigen/lwt/pull/258"]
(** Alias for the current {!Lwt_io.establish_server}.
(** Old version of {!Lwt_io.establish_server}. The current
{!Lwt_io.establish_server} automatically closes channels passed to the
callback, and notifies the caller when the server's listening socket is
bound.
@deprecated Use {!establish_server_2}.
@deprecated Use {!Lwt_io.establish_server}.
@since 2.7.0 *)

val establish_server_2 :
Expand All @@ -606,29 +601,15 @@ sig
?no_close : bool ->
Unix.sockaddr -> (input_channel * output_channel -> unit Lwt.t) ->
server Lwt.t
(** [establish_server_2 sockaddr f] creates a server which listens for
incoming connections. New connections are passed to [f]. When threads
returned by [f] complete, the connections are closed automatically. To
prevent automatic closing, apply [establish_server_2] with
[~no_close:true].
The [?fd] and [?backlog] arguments have the same meaning as in
{!Lwt_io.establish_server}. [?buffer_size] sets the internal buffer size
of the channels passed to [f].
The server does not wait for each thread. It begins accepting new
connections immediately.
If a thread raises an exception, it is passed to
[!Lwt.async_exception_hook]. Likewise, if the automatic [close] of a
connection raises an exception, it is passed to
[!Lwt.async_exception_hook]. To robustly handle these exceptions, you
should call [close] manually inside [f], and use your own handler.
[@@ocaml.deprecated
" In Lwt >= 3.0.0, this is an alias for Lwt_io.establish_server."]
(** Since Lwt 3.0.0, this is just an alias for {!Lwt_io.establish_server}.
@deprecated Use {!Lwt_io.establish_server}.
@since 2.7.0 *)

val shutdown_server_1 : server -> unit
[@@ocaml.deprecated
[@@ocaml.deprecated
" Deprecated in favor of Lwt_io.Versioned.shutdown_server_2. See
https://github.com/ocsigen/lwt/issues/259"]
(** Alias for the current {!Lwt_io.shutdown_server}.
Expand Down
18 changes: 9 additions & 9 deletions tests/unix/test_lwt_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ let with_async_exception_hook hook f =

let local = Unix.ADDR_INET (Unix.inet_addr_loopback, 4321)

(* Helpers for [establish_server_2] tests. *)
(* Helpers for [establish_server] tests. *)
module Establish_server =
struct
let with_client f =
let handler_finished, notify_handler_finished = Lwt.wait () in

Lwt_io.Versioned.establish_server_2
Lwt_io.establish_server
local
(fun channels ->
Lwt.finalize
Expand Down Expand Up @@ -210,7 +210,7 @@ let suite = suite "lwt_io" [
Lwt_io.Versioned.shutdown_server_2 server >|= fun () ->
result);

test "establish_server_2: implicit close"
test "establish_server: implicit close"
(fun () ->
let open Establish_server in

Expand Down Expand Up @@ -248,7 +248,7 @@ let suite = suite "lwt_io" [
in_closed_after_handler &&
out_closed_after_handler);

test "establish_server_2: implicit close on exception"
test "establish_server: implicit close on exception"
(fun () ->
let open Establish_server in

Expand Down Expand Up @@ -282,7 +282,7 @@ let suite = suite "lwt_io" [
(* This does a simple double close of the channels (second close is implicit).
If something breaks, the test will finish with an exception, or
Lwt.async_exception_hook will kill the process. *)
test "establish_server_2: explicit close"
test "establish_server: explicit close"
(fun () ->
let open Establish_server in

Expand All @@ -307,7 +307,7 @@ let suite = suite "lwt_io" [
sockets again, the exception will go to Lwt.async_exception_hook and kill
the tester. The correct behavior is for implicit close to do nothing if the
user already tried to close the sockets. *)
test "establish_server_2: no duplicate exceptions"
test "establish_server: no duplicate exceptions"
~only_if:(fun () -> not Sys.win32)
(fun () ->
let open Establish_server in
Expand Down Expand Up @@ -335,7 +335,7 @@ let suite = suite "lwt_io" [
(* Screws up the open sockets so closing them fails with EBADF. Then, raises
an exception from the handler. Checks that the handler exception arrives
at Lwt.async_exception_hook before the exceptions from implicit close. *)
test "establish_server_2: order of exceptions"
test "establish_server: order of exceptions"
~only_if:(fun () -> not Sys.win32)
(fun () ->
let open Establish_server in
Expand Down Expand Up @@ -367,7 +367,7 @@ let suite = suite "lwt_io" [
let in_channel' = ref Lwt_io.stdin in
let out_channel' = ref Lwt_io.stdout in

Lwt_io.Versioned.establish_server_2 local (fun _ -> Lwt.return_unit)
Lwt_io.establish_server local (fun _ -> Lwt.return_unit)
>>= fun server ->

Lwt_io.with_connection local (fun (in_channel, out_channel) ->
Expand Down Expand Up @@ -400,7 +400,7 @@ let suite = suite "lwt_io" [

let handler_started, notify_handler_started = Lwt.wait () in
let finish_server, resume_server = Lwt.wait () in
Lwt_io.Versioned.establish_server_2 local
Lwt_io.establish_server local
(fun _ ->
Lwt.wakeup notify_handler_started ();
finish_server) >>= fun server ->
Expand Down

0 comments on commit 502cc71

Please sign in to comment.