diff --git a/src/rebar_compiler.erl b/src/rebar_compiler.erl index 5ff934bd9..3d7c16ad5 100644 --- a/src/rebar_compiler.erl +++ b/src/rebar_compiler.erl @@ -46,7 +46,7 @@ %% @doc analysis by the caller, in order to let an OTP app %% find and resolve all its dependencies as part of compile_all's new %% API, which presumes a partial analysis is done ahead of time --spec analyze_all(DAG, [App, ...]) -> ok when +-spec analyze_all(DAG, [App, ...]) -> {map(), [App]} when DAG :: {module(), digraph:graph()}, App :: rebar_app_info:t(). analyze_all({Compiler, G}, Apps) -> @@ -146,7 +146,7 @@ compile_all(Compilers, AppInfo) -> % =< 3.13.0 interface; plugins use this! lists:foreach(fun(Compiler) -> OutDir = rebar_app_info:out_dir(AppInfo), G = rebar_compiler_dag:init(OutDir, Compiler, undefined, []), - Ctx = analyze_all({Compiler, G}, [AppInfo]), + {Ctx, _} = analyze_all({Compiler, G}, [AppInfo]), compile_analyzed({Compiler, G}, AppInfo, Ctx), rebar_compiler_dag:maybe_store(G, OutDir, Compiler, undefined, []), rebar_compiler_dag:terminate(G) @@ -188,7 +188,9 @@ run(G, CompilerMod, AppInfo, Contexts) -> ++ case RestFiles of {Sequential, Parallel} -> % parallelizable form compile_each(Sequential, Opts, BaseOpts, Mappings, CompilerMod) ++ - compile_parallel(Parallel, Opts, BaseOpts, Mappings, CompilerMod); + lists:append( + compile_parallel(Parallel, Opts, BaseOpts, Mappings, CompilerMod) + ); _ when is_list(RestFiles) -> % traditional sequential build compile_each(RestFiles, Opts, BaseOpts, Mappings, CompilerMod) end, @@ -250,75 +252,47 @@ store_artifacts(G, [{Source, Target, Meta}|Rest]) -> rebar_compiler_dag:store_artifact(G, Source, Target, Meta), store_artifacts(G, Rest). -compile_worker(QueuePid, Opts, Config, Outs, CompilerMod) -> - QueuePid ! self(), - receive - {compile, Source} -> - Result = - case erlang:function_exported(CompilerMod, compile_and_track, 4) of - false -> - CompilerMod:compile(Source, Outs, Config, Opts); - true -> - CompilerMod:compile_and_track(Source, Outs, Config, Opts) - end, - QueuePid ! {Result, Source}, - compile_worker(QueuePid, Opts, Config, Outs, CompilerMod); - empty -> - ok - end. - -compile_parallel([], _Opts, _BaseOpts, _Mappings, _CompilerMod) -> - []; compile_parallel(Targets, Opts, BaseOpts, Mappings, CompilerMod) -> - Self = self(), - F = fun() -> compile_worker(Self, Opts, BaseOpts, Mappings, CompilerMod) end, - Jobs = min(length(Targets), erlang:system_info(schedulers)), - ?DEBUG("Starting ~B compile worker(s)", [Jobs]), - Pids = [spawn_monitor(F) || _I <- lists:seq(1, Jobs)], - compile_queue(Targets, Pids, Opts, BaseOpts, Mappings, CompilerMod). - -compile_queue([], [], _Opts, _Config, _Outs, _CompilerMod) -> - []; -compile_queue(Targets, Pids, Opts, Config, Outs, CompilerMod) -> Tracking = erlang:function_exported(CompilerMod, compile_and_track, 4), - receive - Worker when is_pid(Worker), Targets =:= [] -> - Worker ! empty, - compile_queue(Targets, Pids, Opts, Config, Outs, CompilerMod); - Worker when is_pid(Worker) -> - Worker ! {compile, hd(Targets)}, - compile_queue(tl(Targets), Pids, Opts, Config, Outs, CompilerMod); - {ok, Source} -> - ?DEBUG("~sCompiled ~s", [rebar_utils:indent(1), Source]), - compile_queue(Targets, Pids, Opts, Config, Outs, CompilerMod); - {{ok, Tracked}, Source} when Tracking -> - ?DEBUG("~sCompiled ~s", [rebar_utils:indent(1), Source]), - Tracked ++ - compile_queue(Targets, Pids, Opts, Config, Outs, CompilerMod); - {{ok, Warnings}, Source} when not Tracking -> - report(Warnings), - ?DEBUG("~sCompiled ~s", [rebar_utils:indent(1), Source]), - compile_queue(Targets, Pids, Opts, Config, Outs, CompilerMod); - {{ok, Tracked, Warnings}, Source} when Tracking -> - report(Warnings), - ?DEBUG("~sCompiled ~s", [rebar_utils:indent(1), Source]), - Tracked ++ - compile_queue(Targets, Pids, Opts, Config, Outs, CompilerMod); - {skipped, Source} -> - ?DEBUG("~sSkipped ~s", [rebar_utils:indent(1), Source]), - compile_queue(Targets, Pids, Opts, Config, Outs, CompilerMod); - {Error, Source} -> - NewSource = format_error_source(Source, Config), - ?ERROR("Compiling ~ts failed", [NewSource]), - maybe_report(Error), - ?FAIL; - {'DOWN', Mref, _, Pid, normal} -> - Pids2 = lists:delete({Pid, Mref}, Pids), - compile_queue(Targets, Pids2, Opts, Config, Outs, CompilerMod); - {'DOWN', _Mref, _, _Pid, Info} -> - ?ERROR("Compilation failed: ~p", [Info]), - ?FAIL - end. + rebar_parallel:queue( + Targets, + fun compile_worker/2, [Opts, BaseOpts, Mappings, CompilerMod], + fun compile_handler/2, [BaseOpts, Tracking] + ). + +compile_worker(Source, [Opts, Config, Outs, CompilerMod]) -> + Result = case erlang:function_exported(CompilerMod, compile_and_track, 4) of + false -> + CompilerMod:compile(Source, Outs, Config, Opts); + true -> + CompilerMod:compile_and_track(Source, Outs, Config, Opts) + end, + %% Bundle the source to allow proper reporting in the handler: + {Result, Source}. + +compile_handler({ok, Source}, _Args) -> + ?DEBUG("~sCompiled ~s", [rebar_utils:indent(1), Source]), + ok; +compile_handler({{ok, Tracked}, Source}, [_, Tracking]) when Tracking -> + ?DEBUG("~sCompiled ~s", [rebar_utils:indent(1), Source]), + {ok, Tracked}; +compile_handler({{ok, Warnings}, Source}, _Args) -> + report(Warnings), + ?DEBUG("~sCompiled ~s", [rebar_utils:indent(1), Source]), + ok; +compile_handler({{ok, Tracked, Warnings}, Source}, [_, Tracking]) when Tracking -> + report(Warnings), + ?DEBUG("~sCompiled ~s", [rebar_utils:indent(1), Source]), + {ok, Tracked}; +compile_handler({skipped, Source}, _Args) -> + ?DEBUG("~sSkipped ~s", [rebar_utils:indent(1), Source]), + ok; +compile_handler({Error, Source}, [Config | _Rest]) -> + NewSource = format_error_source(Source, Config), + ?ERROR("Compiling ~ts failed", [NewSource]), + maybe_report(Error), + ?FAIL. + %% @doc remove compiled artifacts from an AppDir. -spec clean([module()], rebar_app_info:t()) -> 'ok'. diff --git a/src/rebar_compiler_dag.erl b/src/rebar_compiler_dag.erl index 6eb21fb84..04b4e8c7a 100644 --- a/src/rebar_compiler_dag.erl +++ b/src/rebar_compiler_dag.erl @@ -405,8 +405,6 @@ find_app_(Path, [{AppPath, AppName}|Rest]) -> find_app_(Path, Rest) end. - - %% @private Return what should be the base name of an erl file, relocated to the %% target directory. For example: %% target_base("ebin/", "src/my_module.erl", ".erl", ".beam") -> "ebin/my_module.beam" diff --git a/src/rebar_parallel.erl b/src/rebar_parallel.erl new file mode 100644 index 000000000..e443619ca --- /dev/null +++ b/src/rebar_parallel.erl @@ -0,0 +1,56 @@ +%%% @doc +%%% This module contains a small parallel dispatch queue that allows +%%% to take a list of jobs and run as many of them in parallel as there +%%% are schedulers ongoing. +%%% +%%% Original design by Max Fedorov in the rebar compiler, then generalised +%%% and extracted here to be reused in other circumstances. +%%% @end +-module(rebar_parallel). +-export([queue/5]). +-include("rebar.hrl"). + +queue(Tasks, WorkF, WArgs, Handler, HArgs) -> + Parent = self(), + Worker = fun() -> worker(Parent, WorkF, WArgs) end, + Jobs = min(length(Tasks), erlang:system_info(schedulers)), + ?DEBUG("Starting ~B worker(s)", [Jobs]), + Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, Jobs)], + parallel_dispatch(Tasks, Pids, Handler, HArgs). + +parallel_dispatch([], [], _, _) -> + []; +parallel_dispatch(Targets, Pids, Handler, Args) -> + receive + {ready, Worker} when is_pid(Worker), Targets =:= [] -> + Worker ! empty, + parallel_dispatch(Targets, Pids, Handler, Args); + {ready, Worker} when is_pid(Worker) -> + [Task|Tasks] = Targets, + Worker ! {task, Task}, + parallel_dispatch(Tasks, Pids, Handler, Args); + {'DOWN', Mref, _, Pid, normal} -> + NewPids = lists:delete({Pid, Mref}, Pids), + parallel_dispatch(Targets, NewPids, Handler, Args); + {'DOWN', _Mref, _, _Pid, Info} -> + ?ERROR("Task failed: ~p", [Info]), + ?FAIL; + {result, Result} -> + case Handler(Result, Args) of + ok -> + parallel_dispatch(Targets, Pids, Handler, Args); + {ok, Acc} -> + [Acc | parallel_dispatch(Targets, Pids, Handler, Args)] + end + end. + +worker(QueuePid, F, Args) -> + QueuePid ! {ready, self()}, + receive + {task, Task} -> + QueuePid ! {result, F(Task, Args)}, + worker(QueuePid, F, Args); + empty -> + ok + end. +