Skip to content

Commit

Permalink
Remove the Waiting_for_file_changes as a separate state
Browse files Browse the repository at this point in the history
Signed-off-by: Arseniy Alekseyev <[email protected]>
  • Loading branch information
aalekseyev committed Jun 8, 2021
1 parent 8a3bf97 commit dbf0c25
Showing 1 changed file with 150 additions and 88 deletions.
238 changes: 150 additions & 88 deletions src/dune_engine/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -553,31 +553,95 @@ end = struct
let init q = ignore (Thread.create run q : Thread.t)
end

type waiting_for_file_changes =
| Shutdown_requested
| Build_inputs_changed of Memo.Invalidation.t

type status =
| (* Ready to start the next build. Waiting for a signal from the user, the
test harness, or the polling loop. The payload is the collection of
filesystem events. *)
Standing_by of
Memo.Invalidation.t
| (* Waiting for file changes to start a new a build *)
Waiting_for_file_changes of
waiting_for_file_changes Fiber.Ivar.t
test harness, or the polling loop. *)
Standing_by
| (* Waiting for the propagation of inotify events to finish before starting a
build. *)
Waiting_for_inotify_sync of
Memo.Invalidation.t * unit Fiber.Ivar.t
Waiting_for_inotify_sync of unit Fiber.Ivar.t
| (* Running a build *)
Building
| (* Cancellation requested. Build jobs are immediately rejected in this state *)
Restarting_build of
Memo.Invalidation.t
Restarting_build
| (* Shut down requested. No new new builds will start *)
Shutting_down

(** A variable accumulating the build input changes.
If the variable is full, then some of the memoized build results need
to be invalidated and if the build is in progress, it needs to be restarted. *)
module Invalidation_accumulator_var : sig
type t

val global : t

(** The "adding" side-effect happens immediately. The fill then needs to be processed
to make sure the notification is sent to the reader. *)
val add : t -> Memo.Invalidation.t -> Fiber.fill option

(** Take the value out of this variable and replace it with an empty one. *)
val take : t -> Memo.Invalidation.t

type wait_result =
| Invalidated
| Shutdown_requested

val shutdown_requested : t -> Fiber.fill option

val wait_nonempty : t -> wait_result Fiber.t

end = struct

type wait_result =
| Invalidated
| Shutdown_requested

(* invariant: if [reader] is [Some] then [accum] is empty *)
type t = {
mutable accum : Memo.Invalidation.t;
mutable reader : wait_result Fiber.Ivar.t option;
}

let global = {
accum = Memo.Invalidation.empty;
reader = None;
}

let wait_nonempty t =
Fiber.of_thunk (fun () ->
match t.reader with
| Some _ -> failwith "multiple readers"
| None ->
match Memo.Invalidation.is_empty t.accum with
| false -> Fiber.return Invalidated
| true ->
let ivar = Fiber.Ivar.create () in
t.reader <- Some ivar;
Fiber.Ivar.read ivar)

let shutdown_requested t = match t.reader with
| None -> None
| Some ivar -> Some (Fiber.Fill (ivar, Shutdown_requested))

(* invariant violation is tolerated on pre-condition *)
let notify_if_nonempty t =
match Memo.Invalidation.is_empty t.accum, t.reader with
| false, Some reader ->
t.reader <- None;
Some (Fiber.Fill (reader, Invalidated))
| true, _ | _, None -> None

let add t x =
t.accum <- Memo.Invalidation.combine t.accum x;
notify_if_nonempty t

let take t =
let res = t.accum in
t.accum <- Memo.Invalidation.empty;
res

end

module Handler = struct
module Event = struct
type build_result =
Expand Down Expand Up @@ -629,13 +693,11 @@ let with_job_slot f =
let* t = t () in
let raise_if_cancelled () =
match t.status with
| Restarting_build _
| Shutting_down ->
raise (Memo.Non_reproducible (Failure "Build cancelled"))
| Building -> ()
| Waiting_for_file_changes _
| Shutting_down | Restarting_build ->
raise (Memo.Non_reproducible (Failure "Build cancelled"))
| Waiting_for_inotify_sync _
| Standing_by _ ->
| Standing_by ->
(* At this stage, we're not running a build, so we shouldn't be running
tasks here. *)
assert false
Expand Down Expand Up @@ -752,48 +814,45 @@ end = struct
match Event.Queue.next t.events with
| Job_completed (job, proc_info) -> Fiber.Fill (job.ivar, proc_info)
| Build_inputs_changed events -> (
let invalidation =
(handle_invalidation_events events : Memo.Invalidation.t)
in
let have_sync =
List.exists (Nonempty_list.to_list events) ~f:(function
| (Sync : Event.build_input_change) -> true
| _ -> false)
in
match Memo.Invalidation.is_empty invalidation && not have_sync with
| true -> iter t (* Ignore the event *)
| false -> (
match t.status with
| Shutting_down -> iter t
| Restarting_build prev_invalidation ->
t.status <-
Restarting_build
(Memo.Invalidation.combine prev_invalidation invalidation);
(* We're already cancelling build, so file change events don't matter *)
iter t
| Standing_by prev_invalidation ->
t.status <-
Standing_by
(Memo.Invalidation.combine prev_invalidation invalidation);
iter t
| Building ->
t.handler t.config Build_interrupted;
t.status <- Restarting_build invalidation;
Process_watcher.killall t.process_watcher Sys.sigkill;
iter t
| Waiting_for_file_changes ivar ->
Fill (ivar, Build_inputs_changed invalidation)
| Waiting_for_inotify_sync (prev_invalidation, ivar) ->
let invalidation =
Memo.Invalidation.combine prev_invalidation invalidation
in
if have_sync then (
t.status <- Standing_by invalidation;
Fill (ivar, ())
) else (
t.status <- Waiting_for_inotify_sync (invalidation, ivar);
iter t
)))
let invalidation =
(handle_invalidation_events events : Memo.Invalidation.t)
in
let have_sync =
List.exists (Nonempty_list.to_list events) ~f:(function
| (Sync : Event.build_input_change) -> true
| _ -> false)
in
match Memo.Invalidation.is_empty invalidation && not have_sync with
| true -> iter t (* Ignore the event *)
| false -> (
let notification_needed =
Invalidation_accumulator_var.add Invalidation_accumulator_var.global invalidation
in
match t.status, have_sync, notification_needed with
| Waiting_for_inotify_sync ivar, true, None ->
t.status <- Standing_by;
Fill (ivar, ())
| Waiting_for_inotify_sync _, true, Some _ ->
(* notification needed in [Waiting_for_inotify_sync] state *)
assert false
| _, true, _ ->
failwith "Got an unexpected inotify sync event"
| _ ->
let process_notifications () =
match notification_needed with
| Some fill -> fill
| None -> iter t
in
let () =
match t.status with
| Building ->
t.handler t.config Build_interrupted;
t.status <- Restarting_build;
Process_watcher.killall t.process_watcher Sys.sigkill;
| _ -> ()
in
process_notifications ()
))
| Worker_task fill -> fill
| File_system_watcher_terminated ->
filesystem_watcher_terminated ();
Expand All @@ -803,7 +862,7 @@ end = struct
raise (Abort Already_reported)
| Yield ivar -> Fill (ivar, ())

let run t f : _ result =
let run t f : _ result =
let fiber =
set t (fun () ->
Fiber.map_reduce_errors
Expand Down Expand Up @@ -884,20 +943,23 @@ module Run = struct

let poll_iter t step =
(match t.status with
| Standing_by invalidations -> Memo.reset invalidations
| _ ->
| Standing_by ->
let invalidations =
Invalidation_accumulator_var.take Invalidation_accumulator_var.global
in
Memo.reset invalidations
| _ ->
Code_error.raise "[poll_iter]: expected the build status [Standing_by]" []);
t.status <- Building;
let+ res =
let report_error exn =
match t.status with
| Building -> Dune_util.Report_error.report exn
| Shutting_down
| Restarting_build _ ->
| Restarting_build ->
()
| Standing_by _ -> assert false
| Waiting_for_file_changes _ ->
(* We are inside a build, so we aren't waiting for a file change event *)
| Standing_by ->
(* We are inside a build, so we can't be Standing_by *)
assert false
| Waiting_for_inotify_sync _ ->
(* We only use inotify sync between the builds, not in the middle of
Expand All @@ -914,14 +976,13 @@ module Run = struct
(fun () -> step ~report_error)
in
match t.status with
| Waiting_for_file_changes _
| Waiting_for_inotify_sync _
| Standing_by _ ->
| Standing_by ->
(* We just finished a build, so there's no way this was set *)
assert false
| Shutting_down -> Build_outcome.Shutdown
| Restarting_build invalidations ->
t.status <- Standing_by invalidations;
| Restarting_build ->
t.status <- Standing_by;
Build_outcome.Cancelled_due_to_file_changes
| Building ->
let build_result : Handler.Event.build_result =
Expand All @@ -930,7 +991,7 @@ module Run = struct
| Ok _ -> Success
in
t.handler t.config (Build_finish build_result);
t.status <- Standing_by Memo.Invalidation.empty;
t.status <- Standing_by;
Build_outcome.Finished res

type handle_outcome_result =
Expand All @@ -939,7 +1000,7 @@ module Run = struct
let poll_gen ~get_build_request =
let* t = t () in
(match t.status with
| Building -> t.status <- Standing_by Memo.Invalidation.empty
| Building -> t.status <- Standing_by
| _ -> assert false);
let rec loop () =
let* (build_request, handle_outcome) = get_build_request t in
Expand All @@ -962,13 +1023,12 @@ module Run = struct
| Shutdown -> Fiber.return (Shutdown)
| Cancelled_due_to_file_changes -> Fiber.return Proceed
| Finished _res ->
let ivar = Fiber.Ivar.create () in
t.status <- Waiting_for_file_changes ivar;
let* next = Fiber.Ivar.read ivar in
match next with
| Shutdown_requested -> Fiber.return Shutdown
| Build_inputs_changed invalidations ->
t.status <- Standing_by invalidations;
let* result =
Invalidation_accumulator_var.wait_nonempty Invalidation_accumulator_var.global
in
match result with
| Shutdown_requested -> Fiber.return (Shutdown)
| Invalidated ->
t.handler t.config Source_files_changed;
Fiber.return Proceed
in
Expand All @@ -977,8 +1037,8 @@ module Run = struct
let wait_for_inotify_sync t =
let ivar = Fiber.Ivar.create () in
match t.status with
| Standing_by invalidation ->
t.status <- Waiting_for_inotify_sync (invalidation, ivar);
| Standing_by ->
t.status <- Waiting_for_inotify_sync ivar;
Fiber.Ivar.read ivar
| _ -> assert false

Expand All @@ -1002,7 +1062,7 @@ module Run = struct
let handle_outcome (res : Build_outcome.t) =
let+ () = Fiber.Ivar.fill response_ivar
(match res with
| Finished (Ok _) -> Build_outcome_for_rpc.Success
| Finished (Ok _) -> Build_outcome_for_rpc.Success
| Finished (Error _)
| Cancelled_due_to_file_changes
| Shutdown ->
Expand Down Expand Up @@ -1060,9 +1120,11 @@ let wait_for_process pid =
let shutdown () =
let* t = t () in
let fill_file_changes =
match t.status with
| Waiting_for_file_changes ivar -> Fiber.Ivar.fill ivar Shutdown_requested
| _ -> Fiber.return ()
match
Invalidation_accumulator_var.shutdown_requested Invalidation_accumulator_var.global
with
| None -> Fiber.return ()
| Some (Fill (ivar, v)) -> Fiber.Ivar.fill ivar v
in
t.status <- Shutting_down;
Process_watcher.killall t.process_watcher Sys.sigkill;
Expand Down

0 comments on commit dbf0c25

Please sign in to comment.