Skip to content

Commit

Permalink
Extract & Generalize parallel queue from compiler
Browse files Browse the repository at this point in the history
Into a standalone module where it can be reused for optimisation work
later on.
  • Loading branch information
ferd committed May 12, 2020
1 parent 90bead2 commit 235543d
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 72 deletions.
114 changes: 44 additions & 70 deletions src/rebar_compiler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
%% @doc analysis by the caller, in order to let an OTP app
%% find and resolve all its dependencies as part of compile_all's new
%% API, which presumes a partial analysis is done ahead of time
-spec analyze_all(DAG, [App, ...]) -> ok when
-spec analyze_all(DAG, [App, ...]) -> {map(), [App]} when
DAG :: {module(), digraph:graph()},
App :: rebar_app_info:t().
analyze_all({Compiler, G}, Apps) ->
Expand Down Expand Up @@ -146,7 +146,7 @@ compile_all(Compilers, AppInfo) -> % =< 3.13.0 interface; plugins use this!
lists:foreach(fun(Compiler) ->
OutDir = rebar_app_info:out_dir(AppInfo),
G = rebar_compiler_dag:init(OutDir, Compiler, undefined, []),
Ctx = analyze_all({Compiler, G}, [AppInfo]),
{Ctx, _} = analyze_all({Compiler, G}, [AppInfo]),
compile_analyzed({Compiler, G}, AppInfo, Ctx),
rebar_compiler_dag:maybe_store(G, OutDir, Compiler, undefined, []),
rebar_compiler_dag:terminate(G)
Expand Down Expand Up @@ -188,7 +188,9 @@ run(G, CompilerMod, AppInfo, Contexts) ->
++ case RestFiles of
{Sequential, Parallel} -> % parallelizable form
compile_each(Sequential, Opts, BaseOpts, Mappings, CompilerMod) ++
compile_parallel(Parallel, Opts, BaseOpts, Mappings, CompilerMod);
lists:append(
compile_parallel(Parallel, Opts, BaseOpts, Mappings, CompilerMod)
);
_ when is_list(RestFiles) -> % traditional sequential build
compile_each(RestFiles, Opts, BaseOpts, Mappings, CompilerMod)
end,
Expand Down Expand Up @@ -250,75 +252,47 @@ store_artifacts(G, [{Source, Target, Meta}|Rest]) ->
rebar_compiler_dag:store_artifact(G, Source, Target, Meta),
store_artifacts(G, Rest).

compile_worker(QueuePid, Opts, Config, Outs, CompilerMod) ->
QueuePid ! self(),
receive
{compile, Source} ->
Result =
case erlang:function_exported(CompilerMod, compile_and_track, 4) of
false ->
CompilerMod:compile(Source, Outs, Config, Opts);
true ->
CompilerMod:compile_and_track(Source, Outs, Config, Opts)
end,
QueuePid ! {Result, Source},
compile_worker(QueuePid, Opts, Config, Outs, CompilerMod);
empty ->
ok
end.

compile_parallel([], _Opts, _BaseOpts, _Mappings, _CompilerMod) ->
[];
compile_parallel(Targets, Opts, BaseOpts, Mappings, CompilerMod) ->
Self = self(),
F = fun() -> compile_worker(Self, Opts, BaseOpts, Mappings, CompilerMod) end,
Jobs = min(length(Targets), erlang:system_info(schedulers)),
?DEBUG("Starting ~B compile worker(s)", [Jobs]),
Pids = [spawn_monitor(F) || _I <- lists:seq(1, Jobs)],
compile_queue(Targets, Pids, Opts, BaseOpts, Mappings, CompilerMod).

compile_queue([], [], _Opts, _Config, _Outs, _CompilerMod) ->
[];
compile_queue(Targets, Pids, Opts, Config, Outs, CompilerMod) ->
Tracking = erlang:function_exported(CompilerMod, compile_and_track, 4),
receive
Worker when is_pid(Worker), Targets =:= [] ->
Worker ! empty,
compile_queue(Targets, Pids, Opts, Config, Outs, CompilerMod);
Worker when is_pid(Worker) ->
Worker ! {compile, hd(Targets)},
compile_queue(tl(Targets), Pids, Opts, Config, Outs, CompilerMod);
{ok, Source} ->
?DEBUG("~sCompiled ~s", [rebar_utils:indent(1), Source]),
compile_queue(Targets, Pids, Opts, Config, Outs, CompilerMod);
{{ok, Tracked}, Source} when Tracking ->
?DEBUG("~sCompiled ~s", [rebar_utils:indent(1), Source]),
Tracked ++
compile_queue(Targets, Pids, Opts, Config, Outs, CompilerMod);
{{ok, Warnings}, Source} when not Tracking ->
report(Warnings),
?DEBUG("~sCompiled ~s", [rebar_utils:indent(1), Source]),
compile_queue(Targets, Pids, Opts, Config, Outs, CompilerMod);
{{ok, Tracked, Warnings}, Source} when Tracking ->
report(Warnings),
?DEBUG("~sCompiled ~s", [rebar_utils:indent(1), Source]),
Tracked ++
compile_queue(Targets, Pids, Opts, Config, Outs, CompilerMod);
{skipped, Source} ->
?DEBUG("~sSkipped ~s", [rebar_utils:indent(1), Source]),
compile_queue(Targets, Pids, Opts, Config, Outs, CompilerMod);
{Error, Source} ->
NewSource = format_error_source(Source, Config),
?ERROR("Compiling ~ts failed", [NewSource]),
maybe_report(Error),
?FAIL;
{'DOWN', Mref, _, Pid, normal} ->
Pids2 = lists:delete({Pid, Mref}, Pids),
compile_queue(Targets, Pids2, Opts, Config, Outs, CompilerMod);
{'DOWN', _Mref, _, _Pid, Info} ->
?ERROR("Compilation failed: ~p", [Info]),
?FAIL
end.
rebar_parallel:queue(
Targets,
fun compile_worker/2, [Opts, BaseOpts, Mappings, CompilerMod],
fun compile_handler/2, [BaseOpts, Tracking]
).

compile_worker(Source, [Opts, Config, Outs, CompilerMod]) ->
Result = case erlang:function_exported(CompilerMod, compile_and_track, 4) of
false ->
CompilerMod:compile(Source, Outs, Config, Opts);
true ->
CompilerMod:compile_and_track(Source, Outs, Config, Opts)
end,
%% Bundle the source to allow proper reporting in the handler:
{Result, Source}.

compile_handler({ok, Source}, _Args) ->
?DEBUG("~sCompiled ~s", [rebar_utils:indent(1), Source]),
ok;
compile_handler({{ok, Tracked}, Source}, [_, Tracking]) when Tracking ->
?DEBUG("~sCompiled ~s", [rebar_utils:indent(1), Source]),
{ok, Tracked};
compile_handler({{ok, Warnings}, Source}, _Args) ->
report(Warnings),
?DEBUG("~sCompiled ~s", [rebar_utils:indent(1), Source]),
ok;
compile_handler({{ok, Tracked, Warnings}, Source}, [_, Tracking]) when Tracking ->
report(Warnings),
?DEBUG("~sCompiled ~s", [rebar_utils:indent(1), Source]),
{ok, Tracked};
compile_handler({skipped, Source}, _Args) ->
?DEBUG("~sSkipped ~s", [rebar_utils:indent(1), Source]),
ok;
compile_handler({Error, Source}, [Config | _Rest]) ->
NewSource = format_error_source(Source, Config),
?ERROR("Compiling ~ts failed", [NewSource]),
maybe_report(Error),
?FAIL.


%% @doc remove compiled artifacts from an AppDir.
-spec clean([module()], rebar_app_info:t()) -> 'ok'.
Expand Down
2 changes: 0 additions & 2 deletions src/rebar_compiler_dag.erl
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,6 @@ find_app_(Path, [{AppPath, AppName}|Rest]) ->
find_app_(Path, Rest)
end.



%% @private Return what should be the base name of an erl file, relocated to the
%% target directory. For example:
%% target_base("ebin/", "src/my_module.erl", ".erl", ".beam") -> "ebin/my_module.beam"
Expand Down
56 changes: 56 additions & 0 deletions src/rebar_parallel.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
%%% @doc
%%% This module contains a small parallel dispatch queue that allows
%%% to take a list of jobs and run as many of them in parallel as there
%%% are schedulers ongoing.
%%%
%%% Original design by Max Fedorov in the rebar compiler, then generalised
%%% and extracted here to be reused in other circumstances.
%%% @end
-module(rebar_parallel).
-export([queue/5]).
-include("rebar.hrl").

queue(Tasks, WorkF, WArgs, Handler, HArgs) ->
Parent = self(),
Worker = fun() -> worker(Parent, WorkF, WArgs) end,
Jobs = min(length(Tasks), erlang:system_info(schedulers)),
?DEBUG("Starting ~B worker(s)", [Jobs]),
Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, Jobs)],
parallel_dispatch(Tasks, Pids, Handler, HArgs).

parallel_dispatch([], [], _, _) ->
[];
parallel_dispatch(Targets, Pids, Handler, Args) ->
receive
{ready, Worker} when is_pid(Worker), Targets =:= [] ->
Worker ! empty,
parallel_dispatch(Targets, Pids, Handler, Args);
{ready, Worker} when is_pid(Worker) ->
[Task|Tasks] = Targets,
Worker ! {task, Task},
parallel_dispatch(Tasks, Pids, Handler, Args);
{'DOWN', Mref, _, Pid, normal} ->
NewPids = lists:delete({Pid, Mref}, Pids),
parallel_dispatch(Targets, NewPids, Handler, Args);
{'DOWN', _Mref, _, _Pid, Info} ->
?ERROR("Task failed: ~p", [Info]),
?FAIL;
{result, Result} ->
case Handler(Result, Args) of
ok ->
parallel_dispatch(Targets, Pids, Handler, Args);
{ok, Acc} ->
[Acc | parallel_dispatch(Targets, Pids, Handler, Args)]
end
end.

worker(QueuePid, F, Args) ->
QueuePid ! {ready, self()},
receive
{task, Task} ->
QueuePid ! {result, F(Task, Args)},
worker(QueuePid, F, Args);
empty ->
ok
end.

0 comments on commit 235543d

Please sign in to comment.