From dbf0c251fe06adface106927cd996a215e3dc8f7 Mon Sep 17 00:00:00 2001 From: Arseniy Alekseyev Date: Tue, 8 Jun 2021 11:28:04 +0100 Subject: [PATCH] Remove the Waiting_for_file_changes as a separate state Signed-off-by: Arseniy Alekseyev --- src/dune_engine/scheduler.ml | 238 ++++++++++++++++++++++------------- 1 file changed, 150 insertions(+), 88 deletions(-) diff --git a/src/dune_engine/scheduler.ml b/src/dune_engine/scheduler.ml index d76454c22d3..8ecf11d607b 100644 --- a/src/dune_engine/scheduler.ml +++ b/src/dune_engine/scheduler.ml @@ -553,31 +553,95 @@ end = struct let init q = ignore (Thread.create run q : Thread.t) end -type waiting_for_file_changes = - | Shutdown_requested - | Build_inputs_changed of Memo.Invalidation.t - type status = | (* Ready to start the next build. Waiting for a signal from the user, the - test harness, or the polling loop. The payload is the collection of - filesystem events. *) - Standing_by of - Memo.Invalidation.t - | (* Waiting for file changes to start a new a build *) - Waiting_for_file_changes of - waiting_for_file_changes Fiber.Ivar.t + test harness, or the polling loop. *) + Standing_by | (* Waiting for the propagation of inotify events to finish before starting a build. *) - Waiting_for_inotify_sync of - Memo.Invalidation.t * unit Fiber.Ivar.t + Waiting_for_inotify_sync of unit Fiber.Ivar.t | (* Running a build *) Building | (* Cancellation requested. Build jobs are immediately rejected in this state *) - Restarting_build of - Memo.Invalidation.t + Restarting_build | (* Shut down requested. No new new builds will start *) Shutting_down +(** A variable accumulating the build input changes. + If the variable is full, then some of the memoized build results need + to be invalidated and if the build is in progress, it needs to be restarted. *) +module Invalidation_accumulator_var : sig + type t + + val global : t + + (** The "adding" side-effect happens immediately. The fill then needs to be processed + to make sure the notification is sent to the reader. *) + val add : t -> Memo.Invalidation.t -> Fiber.fill option + + (** Take the value out of this variable and replace it with an empty one. *) + val take : t -> Memo.Invalidation.t + + type wait_result = + | Invalidated + | Shutdown_requested + + val shutdown_requested : t -> Fiber.fill option + + val wait_nonempty : t -> wait_result Fiber.t + +end = struct + + type wait_result = + | Invalidated + | Shutdown_requested + + (* invariant: if [reader] is [Some] then [accum] is empty *) + type t = { + mutable accum : Memo.Invalidation.t; + mutable reader : wait_result Fiber.Ivar.t option; + } + + let global = { + accum = Memo.Invalidation.empty; + reader = None; + } + + let wait_nonempty t = + Fiber.of_thunk (fun () -> + match t.reader with + | Some _ -> failwith "multiple readers" + | None -> + match Memo.Invalidation.is_empty t.accum with + | false -> Fiber.return Invalidated + | true -> + let ivar = Fiber.Ivar.create () in + t.reader <- Some ivar; + Fiber.Ivar.read ivar) + + let shutdown_requested t = match t.reader with + | None -> None + | Some ivar -> Some (Fiber.Fill (ivar, Shutdown_requested)) + + (* invariant violation is tolerated on pre-condition *) + let notify_if_nonempty t = + match Memo.Invalidation.is_empty t.accum, t.reader with + | false, Some reader -> + t.reader <- None; + Some (Fiber.Fill (reader, Invalidated)) + | true, _ | _, None -> None + + let add t x = + t.accum <- Memo.Invalidation.combine t.accum x; + notify_if_nonempty t + + let take t = + let res = t.accum in + t.accum <- Memo.Invalidation.empty; + res + +end + module Handler = struct module Event = struct type build_result = @@ -629,13 +693,11 @@ let with_job_slot f = let* t = t () in let raise_if_cancelled () = match t.status with - | Restarting_build _ - | Shutting_down -> - raise (Memo.Non_reproducible (Failure "Build cancelled")) | Building -> () - | Waiting_for_file_changes _ + | Shutting_down | Restarting_build -> + raise (Memo.Non_reproducible (Failure "Build cancelled")) | Waiting_for_inotify_sync _ - | Standing_by _ -> + | Standing_by -> (* At this stage, we're not running a build, so we shouldn't be running tasks here. *) assert false @@ -752,48 +814,45 @@ end = struct match Event.Queue.next t.events with | Job_completed (job, proc_info) -> Fiber.Fill (job.ivar, proc_info) | Build_inputs_changed events -> ( - let invalidation = - (handle_invalidation_events events : Memo.Invalidation.t) - in - let have_sync = - List.exists (Nonempty_list.to_list events) ~f:(function - | (Sync : Event.build_input_change) -> true - | _ -> false) - in - match Memo.Invalidation.is_empty invalidation && not have_sync with - | true -> iter t (* Ignore the event *) - | false -> ( - match t.status with - | Shutting_down -> iter t - | Restarting_build prev_invalidation -> - t.status <- - Restarting_build - (Memo.Invalidation.combine prev_invalidation invalidation); - (* We're already cancelling build, so file change events don't matter *) - iter t - | Standing_by prev_invalidation -> - t.status <- - Standing_by - (Memo.Invalidation.combine prev_invalidation invalidation); - iter t - | Building -> - t.handler t.config Build_interrupted; - t.status <- Restarting_build invalidation; - Process_watcher.killall t.process_watcher Sys.sigkill; - iter t - | Waiting_for_file_changes ivar -> - Fill (ivar, Build_inputs_changed invalidation) - | Waiting_for_inotify_sync (prev_invalidation, ivar) -> - let invalidation = - Memo.Invalidation.combine prev_invalidation invalidation - in - if have_sync then ( - t.status <- Standing_by invalidation; - Fill (ivar, ()) - ) else ( - t.status <- Waiting_for_inotify_sync (invalidation, ivar); - iter t - ))) + let invalidation = + (handle_invalidation_events events : Memo.Invalidation.t) + in + let have_sync = + List.exists (Nonempty_list.to_list events) ~f:(function + | (Sync : Event.build_input_change) -> true + | _ -> false) + in + match Memo.Invalidation.is_empty invalidation && not have_sync with + | true -> iter t (* Ignore the event *) + | false -> ( + let notification_needed = + Invalidation_accumulator_var.add Invalidation_accumulator_var.global invalidation + in + match t.status, have_sync, notification_needed with + | Waiting_for_inotify_sync ivar, true, None -> + t.status <- Standing_by; + Fill (ivar, ()) + | Waiting_for_inotify_sync _, true, Some _ -> + (* notification needed in [Waiting_for_inotify_sync] state *) + assert false + | _, true, _ -> + failwith "Got an unexpected inotify sync event" + | _ -> + let process_notifications () = + match notification_needed with + | Some fill -> fill + | None -> iter t + in + let () = + match t.status with + | Building -> + t.handler t.config Build_interrupted; + t.status <- Restarting_build; + Process_watcher.killall t.process_watcher Sys.sigkill; + | _ -> () + in + process_notifications () + )) | Worker_task fill -> fill | File_system_watcher_terminated -> filesystem_watcher_terminated (); @@ -803,7 +862,7 @@ end = struct raise (Abort Already_reported) | Yield ivar -> Fill (ivar, ()) - let run t f : _ result = + let run t f : _ result = let fiber = set t (fun () -> Fiber.map_reduce_errors @@ -884,8 +943,12 @@ module Run = struct let poll_iter t step = (match t.status with - | Standing_by invalidations -> Memo.reset invalidations - | _ -> + | Standing_by -> + let invalidations = + Invalidation_accumulator_var.take Invalidation_accumulator_var.global + in + Memo.reset invalidations + | _ -> Code_error.raise "[poll_iter]: expected the build status [Standing_by]" []); t.status <- Building; let+ res = @@ -893,11 +956,10 @@ module Run = struct match t.status with | Building -> Dune_util.Report_error.report exn | Shutting_down - | Restarting_build _ -> + | Restarting_build -> () - | Standing_by _ -> assert false - | Waiting_for_file_changes _ -> - (* We are inside a build, so we aren't waiting for a file change event *) + | Standing_by -> + (* We are inside a build, so we can't be Standing_by *) assert false | Waiting_for_inotify_sync _ -> (* We only use inotify sync between the builds, not in the middle of @@ -914,14 +976,13 @@ module Run = struct (fun () -> step ~report_error) in match t.status with - | Waiting_for_file_changes _ | Waiting_for_inotify_sync _ - | Standing_by _ -> + | Standing_by -> (* We just finished a build, so there's no way this was set *) assert false | Shutting_down -> Build_outcome.Shutdown - | Restarting_build invalidations -> - t.status <- Standing_by invalidations; + | Restarting_build -> + t.status <- Standing_by; Build_outcome.Cancelled_due_to_file_changes | Building -> let build_result : Handler.Event.build_result = @@ -930,7 +991,7 @@ module Run = struct | Ok _ -> Success in t.handler t.config (Build_finish build_result); - t.status <- Standing_by Memo.Invalidation.empty; + t.status <- Standing_by; Build_outcome.Finished res type handle_outcome_result = @@ -939,7 +1000,7 @@ module Run = struct let poll_gen ~get_build_request = let* t = t () in (match t.status with - | Building -> t.status <- Standing_by Memo.Invalidation.empty + | Building -> t.status <- Standing_by | _ -> assert false); let rec loop () = let* (build_request, handle_outcome) = get_build_request t in @@ -962,13 +1023,12 @@ module Run = struct | Shutdown -> Fiber.return (Shutdown) | Cancelled_due_to_file_changes -> Fiber.return Proceed | Finished _res -> - let ivar = Fiber.Ivar.create () in - t.status <- Waiting_for_file_changes ivar; - let* next = Fiber.Ivar.read ivar in - match next with - | Shutdown_requested -> Fiber.return Shutdown - | Build_inputs_changed invalidations -> - t.status <- Standing_by invalidations; + let* result = + Invalidation_accumulator_var.wait_nonempty Invalidation_accumulator_var.global + in + match result with + | Shutdown_requested -> Fiber.return (Shutdown) + | Invalidated -> t.handler t.config Source_files_changed; Fiber.return Proceed in @@ -977,8 +1037,8 @@ module Run = struct let wait_for_inotify_sync t = let ivar = Fiber.Ivar.create () in match t.status with - | Standing_by invalidation -> - t.status <- Waiting_for_inotify_sync (invalidation, ivar); + | Standing_by -> + t.status <- Waiting_for_inotify_sync ivar; Fiber.Ivar.read ivar | _ -> assert false @@ -1002,7 +1062,7 @@ module Run = struct let handle_outcome (res : Build_outcome.t) = let+ () = Fiber.Ivar.fill response_ivar (match res with - | Finished (Ok _) -> Build_outcome_for_rpc.Success + | Finished (Ok _) -> Build_outcome_for_rpc.Success | Finished (Error _) | Cancelled_due_to_file_changes | Shutdown -> @@ -1060,9 +1120,11 @@ let wait_for_process pid = let shutdown () = let* t = t () in let fill_file_changes = - match t.status with - | Waiting_for_file_changes ivar -> Fiber.Ivar.fill ivar Shutdown_requested - | _ -> Fiber.return () + match + Invalidation_accumulator_var.shutdown_requested Invalidation_accumulator_var.global + with + | None -> Fiber.return () + | Some (Fill (ivar, v)) -> Fiber.Ivar.fill ivar v in t.status <- Shutting_down; Process_watcher.killall t.process_watcher Sys.sigkill;