Skip to content

Commit

Permalink
fix: disallow multiple build commands (ocaml#6360)
Browse files Browse the repository at this point in the history
Running dune concurrently is no longer allowed because we create the rpc
socket file eagerly and treat it as a lock

Signed-off-by: Rudi Grinberg <[email protected]>
  • Loading branch information
rgrinberg authored Nov 8, 2022
1 parent a814a69 commit 833e99a
Show file tree
Hide file tree
Showing 27 changed files with 558 additions and 66 deletions.
7 changes: 6 additions & 1 deletion bin/build_cmd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,12 @@ let run_build_command_poll_passive ~(common : Common.t) ~config ~request:_ :
(* CR-someday aalekseyev: It would've been better to complain if [request] is
non-empty, but we can't check that here because [request] is a function.*)
let open Fiber.O in
let rpc = Common.rpc common in
let rpc =
match Common.rpc common with
| `Allow server -> server
| `Forbid_builds ->
Code_error.raise "rpc server must be allowed in passive mode" []
in
Scheduler.go_with_rpc_server_and_console_status_reporting ~common ~config
(fun () ->
Scheduler.Run.poll_passive
Expand Down
1 change: 1 addition & 0 deletions bin/clean.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ let command =
useless but with some FS this also causes [dune clean] to fail (cf
https://github.com/ocaml/dune/issues/2964). *)
let _config = Common.init common ~log_file:No_log_file in
Dune_util.Global_lock.lock_exn ~timeout:None;
Dune_engine.Target_promotion.files_in_source_tree_to_delete ()
|> Path.Set.iter ~f:Path.unlink_no_err;
Path.rm_rf Path.build_dir
Expand Down
26 changes: 23 additions & 3 deletions bin/common.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type t =
; build_dir : string
; no_print_directory : bool
; store_orig_src_dir : bool
; rpc : Dune_rpc_impl.Server.t Lazy.t
; rpc : [ `Allow of Dune_rpc_impl.Server.t Lazy.t | `Forbid_builds ]
; default_target : Arg.Dep.t (* For build & runtest only *)
; watch : Watch_mode_config.t
; print_metrics : bool
Expand Down Expand Up @@ -75,7 +75,12 @@ let default_target t = t.default_target

let prefix_target t s = t.root.reach_from_root_prefix ^ s

let rpc t = Lazy.force t.rpc
let rpc t =
match t.rpc with
| `Forbid_builds -> `Forbid_builds
| `Allow rpc -> `Allow (Lazy.force rpc)

let forbid_builds t = { t with rpc = `Forbid_builds }

let stats t = t.stats

Expand Down Expand Up @@ -1012,7 +1017,22 @@ let term ~default_root_is_cwd =
at_exit (fun () -> Dune_stats.close stats);
stats)
in
let rpc = lazy (Dune_rpc_impl.Server.create ~root:root.dir stats) in
let rpc =
`Allow
(lazy
(let registry =
match watch with
| Yes _ -> `Add
| No -> `Skip
in
let lock_timeout =
match watch with
| Yes Passive -> Some 1.0
| _ -> None
in
Dune_rpc_impl.Server.create ~lock_timeout ~registry ~root:root.dir
stats))
in
if store_digest_preimage then Dune_engine.Reversible_digest.enable ();
if print_metrics then (
Memo.Perf_counters.enable ();
Expand Down
10 changes: 9 additions & 1 deletion bin/common.mli
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@ val capture_outputs : t -> bool

val root : t -> Workspace_root.t

val rpc : t -> Dune_rpc_impl.Server.t
val rpc :
t
-> [ `Allow of Dune_rpc_impl.Server.t
(** Will run rpc if in watch mode and acquire the build lock *)
| `Forbid_builds
(** Promise not to build anything. For now, this isn't checked *)
]

val forbid_builds : t -> t

val stats : t -> Dune_stats.t option

Expand Down
24 changes: 22 additions & 2 deletions bin/import.ml
Original file line number Diff line number Diff line change
Expand Up @@ -139,27 +139,47 @@ module Scheduler = struct
(Constant
(Pp.seq message (Pp.verbatim ", waiting for filesystem changes...")))

let rpc server =
{ Dune_engine.Rpc.run = Dune_rpc_impl.Server.run server
; stop = Dune_rpc_impl.Server.stop server
; ready = Dune_rpc_impl.Server.ready server
}

let go ~(common : Common.t) ~config:dune_config f =
let stats = Common.stats common in
let config =
let insignificant_changes = Common.insignificant_changes common in
Dune_config.for_scheduler dune_config stats ~insignificant_changes
~signal_watcher:`Yes
in
let f =
match Common.rpc common with
| `Allow server ->
fun () -> Dune_engine.Rpc.with_background_rpc (rpc server) f
| `Forbid_builds -> f
in
Run.go config ~on_event:(on_event dune_config) f

let go_with_rpc_server_and_console_status_reporting ~(common : Common.t)
~config:dune_config run =
let server =
match Common.rpc common with
| `Allow server -> rpc server
| `Forbid_builds ->
Code_error.raise "rpc must be enabled in polling mode" []
in
let stats = Common.stats common in
let config =
let insignificant_changes = Common.insignificant_changes common in
Dune_config.for_scheduler dune_config stats ~insignificant_changes
~signal_watcher:`Yes
in
let file_watcher = Common.file_watcher common in
let rpc = Common.rpc common in
let run () =
Fiber.fork_and_join_unit (fun () -> Dune_rpc_impl.Server.run rpc) run
let open Fiber.O in
Dune_engine.Rpc.with_background_rpc server @@ fun () ->
let* () = Dune_engine.Rpc.ensure_ready () in
run ()
in
Run.go config ~file_watcher ~on_event:(on_event dune_config) run
end
Expand Down
1 change: 1 addition & 0 deletions bin/install_uninstall.ml
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ let install_uninstall ~what =
"Select context to install from. By default, install files from \
all defined contexts.")
and+ sections = Sections.term in
let common = Common.forbid_builds common in
let config = Common.init ~log_file:No_log_file common in
Scheduler.go ~common ~config (fun () ->
let open Fiber.O in
Expand Down
4 changes: 3 additions & 1 deletion bin/ocaml_merlin.ml
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ let term =
debugging purposes only and should not be considered as a stable \
output.")
in
let common = Common.set_print_directory common false in
let common =
Common.set_print_directory common false |> Common.forbid_builds
in
let config = Common.init common ~log_file:No_log_file in
Scheduler.go ~common ~config (fun () ->
match dump_config with
Expand Down
3 changes: 2 additions & 1 deletion bin/rpc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ let active_server () =
| None -> User_error.raise [ Pp.text "rpc server not running" ]

let client_term common f =
let common = Common.forbid_builds common in
let common = Common.set_print_directory common false in
let config = Common.init ~log_file:No_log_file common in
Scheduler.go ~common ~config f
Expand Down Expand Up @@ -74,7 +75,7 @@ let establish_client_session ~wait =
match connection with
| Ok conn -> Some conn
| Error message ->
Console.print_user_message message;
if not wait then Console.print_user_message message;
None)
in
establish_connection_or_raise ~wait once
Expand Down
16 changes: 11 additions & 5 deletions src/csexp_rpc/csexp_rpc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,6 @@ module Server = struct
}

let create fd sockaddr ~backlog =
Unix.set_nonblock fd;
Unix.setsockopt fd Unix.SO_REUSEADDR true;
Socket.bind fd sockaddr;
Unix.listen fd backlog;
let r_interrupt_accept, w_interrupt_accept = Unix.pipe ~cloexec:true () in
Unix.set_nonblock r_interrupt_accept;
Expand Down Expand Up @@ -268,6 +265,7 @@ module Server = struct
[ `Init of Unix.file_descr | `Running of Transport.t | `Closed ]
; backlog : int
; sockaddr : Unix.sockaddr
; ready : unit Fiber.Ivar.t
}

let create sockaddr ~backlog =
Expand All @@ -276,19 +274,27 @@ module Server = struct
(Unix.domain_of_sockaddr sockaddr)
Unix.SOCK_STREAM 0
in
{ sockaddr; backlog; state = `Init fd }
Unix.set_nonblock fd;
Unix.setsockopt fd Unix.SO_REUSEADDR true;
match Socket.bind fd sockaddr with
| exception Unix.Unix_error (EADDRINUSE, _, _) -> Error `Already_in_use
| () ->
Ok { sockaddr; backlog; state = `Init fd; ready = Fiber.Ivar.create () }

let ready t = Fiber.Ivar.read t.ready

let serve (t : t) =
let* async = Worker.create () in
match t.state with
| `Closed -> Code_error.raise "already closed" []
| `Running _ -> Code_error.raise "already running" []
| `Init fd ->
let+ transport =
let* transport =
Worker.task_exn async ~f:(fun () ->
Transport.create fd t.sockaddr ~backlog:t.backlog)
in
t.state <- `Running transport;
let+ () = Fiber.Ivar.fill t.ready () in
let accept () =
Worker.task async ~f:(fun () ->
Transport.accept transport
Expand Down
6 changes: 5 additions & 1 deletion src/csexp_rpc/csexp_rpc.mli
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ module Server : sig
(** RPC Server *)
type t

val create : Unix.sockaddr -> backlog:int -> t
val create : Unix.sockaddr -> backlog:int -> (t, [ `Already_in_use ]) result

(** [ready t] returns a fiber that completes when clients can start connecting
to the server *)
val ready : t -> unit Fiber.t

val stop : t -> unit

Expand Down
1 change: 1 addition & 0 deletions src/dune_engine/dune_engine.ml
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,4 @@ module Report_errors_config = Report_errors_config
module Compound_user_error = Compound_user_error
module Reflection = Reflection
module No_io = No_io
module Rpc = Rpc
49 changes: 49 additions & 0 deletions src/dune_engine/rpc.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
open Import
open Fiber.O

type server =
{ run : unit Fiber.t
; stop : unit Fiber.t
; ready : unit Fiber.t
}

type t =
{ server : server
; pool : Fiber.Pool.t
; mutable state : [ `Awaiting_start | `Running | `Stopped ]
}

let t = Fiber.Var.create ()

let stop ({ state; server; pool } as t) =
let* () = Fiber.return () in
match state with
| `Stopped -> Fiber.return ()
| `Awaiting_start -> Fiber.Pool.stop pool
| `Running ->
t.state <- `Stopped;
Fiber.fork_and_join_unit
(fun () -> Fiber.Pool.stop pool)
(fun () -> server.stop)

let with_background_rpc server f =
let pool = Fiber.Pool.create () in
let v = { state = `Awaiting_start; server; pool } in
Fiber.Var.set t v (fun () ->
Fiber.fork_and_join_unit
(fun () -> Fiber.Pool.run pool)
(fun () -> Fiber.finalize f ~finally:(fun () -> stop v)))

let ensure_ready () =
let* ({ state; server; pool } as t) = Fiber.Var.get_exn t in
match state with
| `Stopped -> Code_error.raise "server already stopped" []
| `Running -> Fiber.return ()
| `Awaiting_start ->
t.state <- `Running;
let* () = Fiber.Pool.task pool ~f:(fun () -> server.run) in
server.ready

let stop () =
let* t = Fiber.Var.get_exn t in
stop t
11 changes: 11 additions & 0 deletions src/dune_engine/rpc.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
type server =
{ run : unit Fiber.t
; stop : unit Fiber.t
; ready : unit Fiber.t
}

val with_background_rpc : server -> (unit -> 'a Fiber.t) -> 'a Fiber.t

val ensure_ready : unit -> unit Fiber.t

val stop : unit -> unit Fiber.t
3 changes: 2 additions & 1 deletion src/dune_rpc_impl/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ module Connection = struct
| Error exn ->
Error
(User_error.make
[ Pp.text "failed to connect to RPC server %s"
[ Pp.textf "failed to connect to RPC server %s"
(Where.to_string where)
; Exn_with_backtrace.pp exn
])

Expand Down
Loading

0 comments on commit 833e99a

Please sign in to comment.