Skip to content

Commit

Permalink
Switch to using nonblocking fds with lwt
Browse files Browse the repository at this point in the history
Summary:
This is the result of a week long investigation into Flow
occasionally hanging. Here's what you should know:

* A read/write of a blocking fd will block until the fd is readable/writable
* A read/write of a nonblocking fd will return -1 immediately if the fd is not readable/writable
* Lwt super extra hates running blocking system calls. It will create system threads and run the blocking system call on them
* There's a bug in Lwt in scheduling the system threads: ocsigen/lwt#569

From our point of view, there's very little difference in using blocking
or non-blocking fds. We had just been using blocking fds since I
couldn't really tell if one was better than the other,

However, using non-blocking keeps us from needing system threads and
works around the bug, so let's use them!

Reviewed By: samwgoldman

Differential Revision: D7464034

fbshipit-source-id: e0ba602381a8bef7dd374ee1cd5fb0fdef9ad7d9
  • Loading branch information
gabelevi authored and facebook-github-bot committed Apr 3, 2018
1 parent b55ba90 commit e314362
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 11 deletions.
4 changes: 2 additions & 2 deletions hack/dfind/dfindLibLwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ struct
let return = Lwt.return
let (>>=) = Lwt.(>>=)

let descr_of_in_channel ic = Lwt_unix.of_unix_file_descr (Daemon.descr_of_in_channel ic)
let descr_of_out_channel oc = Lwt_unix.of_unix_file_descr (Daemon.descr_of_out_channel oc)
let descr_of_in_channel ic = Lwt_unix.of_unix_file_descr ~blocking:false ~set_flags:true (Daemon.descr_of_in_channel ic)
let descr_of_out_channel oc = Lwt_unix.of_unix_file_descr ~blocking:false ~set_flags:true (Daemon.descr_of_out_channel oc)

let to_fd_with_preamble ?timeout ?flags fd v =
if timeout <> None
Expand Down
4 changes: 2 additions & 2 deletions hack/procs/workerControllerLwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ let call w (type a) (type b) (f : a -> b) (x : a) : b Lwt.t =

let infd = Daemon.descr_of_in_channel inc in
let outfd = Daemon.descr_of_out_channel outc in
let infd_lwt = Lwt_unix.of_unix_file_descr ~blocking:true infd in
let outfd_lwt = Lwt_unix.of_unix_file_descr ~blocking:true outfd in
let infd_lwt = Lwt_unix.of_unix_file_descr ~blocking:false ~set_flags:true infd in
let outfd_lwt = Lwt_unix.of_unix_file_descr ~blocking:false ~set_flags:true outfd in

let request = wrap_request w f x in

Expand Down
8 changes: 5 additions & 3 deletions src/monitor/flowServerMonitor.ml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ let internal_start ~is_daemon ?waiting_fd monitor_options =
let handle_waiting_start_command = match waiting_fd with
| None -> Lwt.return_unit
| Some fd ->
let fd = Lwt_unix.of_unix_file_descr ~blocking:true fd in
let fd = Lwt_unix.of_unix_file_descr ~blocking:false ~set_flags:true fd in
handle_waiting_start_command fd
in

Expand All @@ -139,11 +139,13 @@ let internal_start ~is_daemon ?waiting_fd monitor_options =
(* We can start up the socket acceptor even before the server starts *)
Lwt.async (fun () ->
SocketAcceptor.run
(Lwt_unix.of_unix_file_descr ~blocking:true monitor_socket_fd)
(Lwt_unix.of_unix_file_descr ~blocking:false ~set_flags:true monitor_socket_fd)
monitor_options.FlowServerMonitorOptions.autostop
);
Lwt.async (fun () ->
SocketAcceptor.run_legacy (Lwt_unix.of_unix_file_descr ~blocking:true legacy_socket_fd)
SocketAcceptor.run_legacy (
Lwt_unix.of_unix_file_descr ~blocking:false ~set_flags:true legacy_socket_fd
)
);

(* Wait forever! Mwhahahahahaha *)
Expand Down
4 changes: 2 additions & 2 deletions src/monitor/flowServerMonitorServer.ml
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,11 @@ end = struct
let in_fd =
ic
|> Daemon.descr_of_in_channel
|> Lwt_unix.of_unix_file_descr ~blocking:true in
|> Lwt_unix.of_unix_file_descr ~blocking:false ~set_flags:true in
let out_fd =
oc
|> Daemon.descr_of_out_channel
|> Lwt_unix.of_unix_file_descr ~blocking:true in
|> Lwt_unix.of_unix_file_descr ~blocking:false ~set_flags:true in

let close_if_open fd =
try Lwt_unix.close fd
Expand Down
2 changes: 1 addition & 1 deletion src/monitor/logger/flowServerMonitorLogger.ml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ let init_logger log_fd =

let template = "$(date).$(milliseconds) [$(level)] $(message)" in

let log_fd = Option.map log_fd ~f:Lwt_unix.of_unix_file_descr in
let log_fd = Option.map log_fd ~f:(Lwt_unix.of_unix_file_descr ~blocking:false ~set_flags:true) in

let fds = Lwt_unix.stderr :: (Option.value_map log_fd ~default:[] ~f:(fun fd -> [fd])) in
Lwt.async (fun () -> WriteLoop.run fds);
Expand Down
3 changes: 2 additions & 1 deletion src/monitor/monitorRPC.ml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ let with_outfd ~on_disabled ~f = with_channel snd ~on_disabled ~f

(* The main server process will initialize this with the channels to the monitor process *)
let init ~channels:(ic,oc) =
let infd = Lwt_unix.of_unix_file_descr (Daemon.descr_of_in_channel ic) in
let infd =
Lwt_unix.of_unix_file_descr ~blocking:false ~set_flags:true (Daemon.descr_of_in_channel ic) in
let outfd = Daemon.descr_of_out_channel oc in
state := Initialized {infd; outfd}

Expand Down

0 comments on commit e314362

Please sign in to comment.