Skip to content

Commit

Permalink
Rpc higher priority than build jobs (ocaml#4743)
Browse files Browse the repository at this point in the history
* refactor scheduler event sources

* actually switch the order

Signed-off-by: Arseniy Alekseyev <[email protected]>
  • Loading branch information
aalekseyev authored Jun 17, 2021
1 parent 26f5e57 commit 7310d10
Showing 1 changed file with 103 additions and 50 deletions.
153 changes: 103 additions & 50 deletions src/dune_engine/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ end = struct
end

module Queue = struct
type event = t

type t =
{ jobs_completed : (job * Proc.Process_info.t) Queue.t
; mutable invalidation_events : Invalidation_event.t list
Expand Down Expand Up @@ -265,60 +267,111 @@ end = struct
q.yield <- Some ivar;
Fiber.Ivar.read ivar

module Event_source : sig
type queue := t

type t

val signal : t

val invalidation : t

val jobs_completed : t

val worker_tasks_completed : t

val yield : t

val chain : t list -> t

val run : t -> queue -> event option
end = struct
type queue = t

type t = queue -> event option

let run t q = t q

let signal : t =
fun q ->
Option.map (Signal.Set.choose q.signals) ~f:(fun signal ->
q.signals <- Signal.Set.remove q.signals signal;
Signal signal)

let invalidation q =
match q.invalidation_events with
| [] -> None
| events -> (
q.invalidation_events <- [];
let terminated = ref false in
let events =
List.filter_map events ~f:(function
| Filesystem_event Sync -> Some (Sync : build_input_change)
| Invalidation invalidation ->
Some (Invalidation invalidation : build_input_change)
| Filesystem_event Watcher_terminated ->
terminated := true;
None
| Filesystem_event (File_changed path) ->
let abs_path = Path.to_absolute_filename path in
if Table.mem q.ignored_files abs_path then (
(* only use ignored record once *)
Table.remove q.ignored_files abs_path;
None
) else
(* CR-soon amokhov: Generate more precise events. *)
Some (Fs_event (Fs_memo.Event.create ~kind:Unknown ~path)))
in
match !terminated with
| true -> Some File_system_watcher_terminated
| false ->
Option.map (Nonempty_list.of_list events) ~f:(fun events ->
Build_inputs_changed events))

let jobs_completed q =
Option.map (Queue.pop q.jobs_completed) ~f:(fun (job, proc_info) ->
q.pending_jobs <- q.pending_jobs - 1;
assert (q.pending_jobs >= 0);
Job_completed (job, proc_info))

let worker_tasks_completed q =
Option.map (Queue.pop q.worker_tasks_completed) ~f:(fun fill ->
q.pending_worker_tasks <- q.pending_worker_tasks - 1;
Worker_task fill)

let yield q =
Option.map q.yield ~f:(fun ivar ->
q.yield <- None;
Yield ivar)

let chain list q = List.find_map list ~f:(fun f -> f q)
end

let next q =
Option.iter q.stats ~f:Dune_stats.record_gc_and_fd;
Mutex.lock q.mutex;
let rec loop () =
match Signal.Set.choose q.signals with
| Some signal ->
q.signals <- Signal.Set.remove q.signals signal;
Signal signal
| None -> (
match q.invalidation_events with
| [] -> (
match Queue.pop q.jobs_completed with
| None -> (
match Queue.pop q.worker_tasks_completed with
| Some fill ->
q.pending_worker_tasks <- q.pending_worker_tasks - 1;
Worker_task fill
| None -> (
match q.yield with
| Some ivar ->
q.yield <- None;
Yield ivar
| None -> wait ()))
| Some (job, proc_info) ->
q.pending_jobs <- q.pending_jobs - 1;
assert (q.pending_jobs >= 0);
Job_completed (job, proc_info))
| events -> (
q.invalidation_events <- [];
let terminated = ref false in
let events =
List.filter_map events ~f:(function
| Filesystem_event Sync -> Some (Sync : build_input_change)
| Invalidation invalidation ->
Some (Invalidation invalidation : build_input_change)
| Filesystem_event Watcher_terminated ->
terminated := true;
None
| Filesystem_event (File_changed path) ->
let abs_path = Path.to_absolute_filename path in
if Table.mem q.ignored_files abs_path then (
(* only use ignored record once *)
Table.remove q.ignored_files abs_path;
None
) else
(* CR-soon amokhov: Generate more precise events. *)
Some (Fs_event (Fs_memo.Event.create ~kind:Unknown ~path)))
in
match !terminated with
| true -> File_system_watcher_terminated
| false -> (
match Nonempty_list.of_list events with
| None -> loop ()
| Some events -> Build_inputs_changed events)))
match
Event_source.(
run
(chain
(* Event sources are listed in priority order. Signals are the
highest priority to maximize responsiveness to Ctrl+C.
[worker_tasks_completed] and [invalidation] is used for
reacting to user input, so their latency is also important.
[jobs_completed] and [yield] are where the bulk of the work
is done, so they are the lowest priority to avoid starving
other things. *)
[ signal
; invalidation
; worker_tasks_completed
; jobs_completed
; yield
]))
q
with
| None -> wait ()
| Some event -> event
and wait () =
q.got_event <- false;
Condition.wait q.cond q.mutex;
Expand Down

0 comments on commit 7310d10

Please sign in to comment.