Skip to content

Commit

Permalink
Proper testing for pool config
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Nov 29, 2024
1 parent b1c3f76 commit 32bc5eb
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 48 deletions.
104 changes: 63 additions & 41 deletions src/throttle/amoc_throttle_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
-define(DEFAULT_STEP_INTERVAL, 100). %% every 100ms

-export([verify_config/1, verify_gradual_config/1, pool_config/2, process_pool_config/2]).
-export([no_of_processes/0]).
-export_type([config/0, gradual_plan/0, pool_config/0]).

-type process_number() :: non_neg_integer().
Expand Down Expand Up @@ -58,60 +59,81 @@ verify_gradual_config(Config) ->
-spec pool_config(amoc_throttle:rate(), amoc_throttle:interval()) -> pool_config().
pool_config(infinity, _) ->
Config = #{max_n => infinity, delay => 0, status => active, pid => undefined},
maps:from_keys(lists:seq(1, no_of_processes()), Config);
maps:from_keys(lists:seq(1, ?MODULE:no_of_processes()), Config);
pool_config(0, _) ->
Config = #{max_n => 0, delay => infinity, status => active, pid => undefined},
maps:from_keys(lists:seq(1, no_of_processes()), Config);
maps:from_keys(lists:seq(1, ?MODULE:no_of_processes()), Config);
pool_config(Rate, 0) ->
Config = #{max_n => Rate, delay => 0, status => inactive, pid => undefined},
PoolConfig = #{1 := First} = maps:from_keys(lists:seq(1, no_of_processes()), Config),
PoolConfig = #{1 := First} = maps:from_keys(lists:seq(1, ?MODULE:no_of_processes()), Config),
PoolConfig#{1 := First#{status => active}};
pool_config(Rate, Interval) ->
NoOfProcesses = no_of_processes(),
RatePerMinutePerProcess = (60000 * Rate div Interval) div NoOfProcesses,
DelayPerProcess = (NoOfProcesses * Interval) div Rate,
Rem = ((60000 * Rate div Interval) rem NoOfProcesses)
+ ((NoOfProcesses * Interval) rem Rate),
calculate_availability(RatePerMinutePerProcess, DelayPerProcess, NoOfProcesses, Rem).
pool_config(Rate, Interval) when ?POS_INT(Rate), ?POS_INT(Interval) ->
NoOfProcesses = ?MODULE:no_of_processes(),
RatesPerProcess = calculate_rate_per_process(NoOfProcesses, Rate, Interval, +0.0, []),
#{} = lists:foldl(fun assign_process/2, #{}, RatesPerProcess).

-define(THRESHOLD, 10).
calculate_rate_per_process(1, Rate, Interval, RoundingError, Acc) ->
case delay(RoundingError, Rate, Interval) of
{Delay, Remaining} when Delay =:= infinity; Remaining < 0.5 ->
[{1, Rate, Delay} | Acc];
{Delay, _} ->
[{1, Rate, Delay + 1} | Acc]
end;
calculate_rate_per_process(N, Rate, Interval, RoundingError, Acc) when is_integer(N), N > 1 ->
ProcessRate = Rate div N,
case ProcessRate of
_ when ProcessRate =< ?THRESHOLD, Rate =< ?THRESHOLD ->
{Delay, RoundingError1} = delay(RoundingError, Rate, Interval),
Acc1 = [{N, Rate, Delay} | Acc],
calculate_rate_per_process(N - 1, 0, Interval, RoundingError1, Acc1);
_ when ProcessRate =< ?THRESHOLD ->
{Delay, RoundingError1} = delay(RoundingError, ?THRESHOLD, Interval),
Acc1 = [{N, ?THRESHOLD, Delay} | Acc],
calculate_rate_per_process(N - 1, Rate - ?THRESHOLD, Interval, RoundingError1, Acc1);
_ ->
{Delay, RoundingError1} = delay(RoundingError, ProcessRate, Interval),
Acc1 = [{N, ProcessRate, Delay} | Acc],
calculate_rate_per_process(N - 1, Rate - ProcessRate, Interval, RoundingError1, Acc1)
end.

delay(RemainingError, 0, _Interval) ->
{infinity, RemainingError};
delay(RemainingError, Rate, Interval) when RemainingError >= 1.0 ->
case {Interval div Rate, Interval rem Rate} of
{DelayBetweenExecutions, 0} ->
{DelayBetweenExecutions + 1, RemainingError - 1.0};
{DelayBetweenExecutions, Remaining} ->
{DelayBetweenExecutions + 1, RemainingError + (Remaining / Rate) - 1.0}
end;
delay(RemainingError, Rate, Interval) ->
case {Interval div Rate, Interval rem Rate} of
{DelayBetweenExecutions, 0} ->
{DelayBetweenExecutions, RemainingError};
{DelayBetweenExecutions, Remaining} ->
{DelayBetweenExecutions, RemainingError + (Remaining / Rate)}
end.

assign_process({N, RatePerProcess, infinity}, Config) ->
Config#{N => #{max_n => RatePerProcess,
delay => infinity,
status => inactive,
pid => undefined}};
assign_process({N, RatePerProcess, Delay}, Config) ->
Config#{N => #{max_n => RatePerProcess,
delay => Delay,
status => active,
pid => undefined}}.

-spec process_pool_config(pid(), pool_config()) -> pool_config().
process_pool_config(PoolSup, PoolConfig) ->
Workers = amoc_throttle_pool:get_workers(PoolSup),
Fun1 = fun(N, Config) -> Config#{pid => maps:get(N, Workers)} end,
maps:map(Fun1, PoolConfig).

-spec calculate_availability(integer(), integer(), pos_integer(), integer()) -> pool_config().
calculate_availability(RatePerMinutePerProcess, DelayPerProcess, NoOfProcesses, Rem) ->
Fun = fun(N, {Acc, R}) ->
case {RatePerMinutePerProcess < NoOfProcesses, R} of
{true, 0} ->
Config = #{max_n => RatePerMinutePerProcess,
delay => DelayPerProcess + 1,
status => inactive, pid => undefined},
{Acc#{N => Config}, R};
{true, R} ->
Config = #{max_n => RatePerMinutePerProcess,
delay => DelayPerProcess,
status => active, pid => undefined},
{Acc#{N => Config}, R - 1};
{false, 0} ->
Config = #{max_n => RatePerMinutePerProcess,
delay => DelayPerProcess,
status => active, pid => undefined},
{Acc#{N => Config}, R};
{false, R} ->
Config = #{max_n => RatePerMinutePerProcess,
delay => DelayPerProcess + 1,
status => active, pid => undefined},
{Acc#{N => Config}, R - 1}
end
end,
{#{} = PoolConfig, _} = lists:foldl(Fun, {#{}, Rem}, lists:seq(1, NoOfProcesses)),
PoolConfig.

-spec no_of_processes() -> non_neg_integer().
no_of_processes() ->
3 * erlang:system_info(schedulers_online).
min(30, 2 * erlang:system_info(schedulers_online)).

-spec do_verify_gradual_config(amoc_throttle:gradual_plan()) -> gradual_plan().
do_verify_gradual_config(
Expand Down Expand Up @@ -159,4 +181,4 @@ do_verify_gradual_config(
calculate_step(N, N, _, _, To) -> To;
calculate_step(0, _, _, From, _) -> From;
calculate_step(N, _, StepRate, From, _) ->
From + floor(StepRate * N).
From + round(StepRate * N).
101 changes: 94 additions & 7 deletions test/throttle_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

all() ->
[
{group, api}
{group, api},
{group, properties}
].

groups() ->
Expand All @@ -35,14 +36,32 @@ groups() ->
change_rate_gradually,
change_interarrival_gradually,
change_rate_gradually_verify_descriptions,
change_rate_gradually_verify_descriptions_properties,
just_wait,
wait_for_process_to_die_sends_a_kill,
async_runner_dies_while_waiting_raises_exit,
async_runner_dies_when_throttler_dies,
pause_and_resume,
get_state
]}
]},
{properties, [],
[
change_rate_gradually_verify_descriptions_properties,
% Note that the smallest delay possible for a process is 1ms (receive operations),
% hence if we give for example 10 workers 1ms delays, we get 600_000 ticks per minute.
% and if we give for example 48 workers 1ms delays, we get 2_880_000 ticks per minute.
% That means, that is realistically the maximum rate we could possibly manage
% with a static pool of such number of workers.
pool_config_is_precise_for_rates_1,
pool_config_is_precise_for_rates_2,
pool_config_is_precise_for_rates_3,
pool_config_is_precise_for_rates_4,
pool_config_is_precise_for_rates_5,
pool_config_is_precise_for_rates_6,
pool_config_is_precise_for_rates_7,
pool_config_is_precise_for_rates_8,
pool_config_is_precise_for_rates_9,
pool_config_is_precise_for_rates_10
]}
].

init_per_suite(Config) ->
Expand All @@ -57,6 +76,18 @@ end_per_suite(_) ->
telemetry_helpers:stop(),
ok.

init_per_group(properties, Config) ->
meck:new(amoc_throttle_config, [passthrough, non_strict, no_link]),
meck:expect(amoc_throttle_config, no_of_processes, [], 100),
Config;
init_per_group(_, Config) ->
Config.

end_per_group(properties, _Config) ->
meck:unload(amoc_throttle_config);
end_per_group(_, _Config) ->
ok.

init_per_testcase(_, Config) ->
Config.

Expand Down Expand Up @@ -351,7 +382,41 @@ change_rate_gradually_verify_descriptions_properties(_) ->
integer(1, 1 bsl 8),
integer(1, 1 bsl 24)},
Fun(From, To, Interval, StepInterval, StepCount)),
run_prop(?FUNCTION_NAME, Prop, 1 bsl 16, 3).
run_prop(?FUNCTION_NAME, Prop, 1 bsl 16, 1).

pool_config_is_precise_for_rates_1(_) ->
pool_config_property_tests(lists:seq(1, 100_000), timer:minutes(1)).

pool_config_is_precise_for_rates_2(_) ->
pool_config_property_tests(lists:seq(100_000, 200_000), timer:minutes(1)).

pool_config_is_precise_for_rates_3(_) ->
pool_config_property_tests(lists:seq(200_000, 300_000), timer:minutes(1)).

pool_config_is_precise_for_rates_4(_) ->
pool_config_property_tests(lists:seq(300_000, 400_000), timer:minutes(1)).

pool_config_is_precise_for_rates_5(_) ->
pool_config_property_tests(lists:seq(400_000, 500_000), timer:minutes(1)).

pool_config_is_precise_for_rates_6(_) ->
pool_config_property_tests(lists:seq(500_000, 600_000), timer:minutes(1)).

pool_config_is_precise_for_rates_7(_) ->
pool_config_property_tests(lists:seq(600_000, 700_000), timer:minutes(1)).

pool_config_is_precise_for_rates_8(_) ->
pool_config_property_tests(lists:seq(700_000, 800_000), timer:minutes(1)).

pool_config_is_precise_for_rates_9(_) ->
pool_config_property_tests(lists:seq(800_000, 900_000), timer:minutes(1)).

pool_config_is_precise_for_rates_10(_) ->
pool_config_property_tests(lists:seq(900_000, 1_000_000), timer:minutes(1)).

% TODO: introduce dynamically sized pools in order to manage higher rates.
pool_config_is_precise_for_high_rates(_) ->
pool_config_property_tests(lists:seq(1 bsl 24, 1 bsl 32), timer:minutes(1)).

just_wait(_) ->
%% it fails if the throttle wasn't started yet
Expand Down Expand Up @@ -431,9 +496,31 @@ assert_telemetry_event(Name, Measurement, Throttle, Rate, Interval) ->
end,
?assert(lists:any(IsLowRateEventFn, TelemetryEvents)).

run_prop(PropName, Property, NumTests, WorkersPerScheduler) ->
Opts = [noshrink, {start_size, 1}, {numtests, NumTests},
{numworkers, WorkersPerScheduler * erlang:system_info(schedulers_online)}],
pool_config_property_tests(RateGen, IntervalGen) ->
Fun = fun(Rate, Interval) ->
R1 = amoc_throttle_config:pool_config(Rate, Interval),
accumulated_is_requested(Rate, Interval, R1)
end,
[ Fun(Rate, IntervalGen) || Rate <- RateGen].

accumulated_is_requested(Rate, Interval, Res) ->
Fold = fun(_N, #{status := active, delay := D}, Acc) ->
Acc + (1 / D);
(_, _, Acc) ->
Acc
end,
NumberOfActionsPerMs = maps:fold(Fold, +0.0, Res),
Expected = Rate / Interval,
Error = (abs(NumberOfActionsPerMs - Expected) * 100),
Expected >= Error
orelse throw(#{throttle => #{rate => Rate, interval => Interval},
expected_rate_per_minute => Expected,
returned_aggregated_rate_per_minute => NumberOfActionsPerMs,
error_percentage => Error/Expected,
config => maps:filter(fun(_, #{status := Status}) -> Status =:= active end, Res)}).

run_prop(PropName, Property, NumTests, Workers) ->
Opts = [noshrink, {start_size, 1}, {numtests, NumTests}, {numworkers, Workers}],
Res = proper:counterexample(proper:conjunction([{PropName, Property}]), Opts),
?assertEqual(true, Res).

Expand Down

0 comments on commit 32bc5eb

Please sign in to comment.