Skip to content

Commit

Permalink
rabbit_feature_flags: Retry after erpc:call() fails with noconnection
Browse files Browse the repository at this point in the history
[Why]
There could be a transient network issue. Let's give a few more chances
to perform the requested RPC call.

[How]
We retry until the given timeout is reached, if any.

To honor that timeout, we measure the time taken by the RPC call itself.
We also sleep between retries. Before each retry, the timeout is reduced
by the total of the time taken by the RPC call and the sleep.

References #8346.

V2: Treat `infinity` timeout differently. In this case, we never retry
    following a `noconnection` error. The reason is that this timeout is
    used specifically for callbacks executed remotely. We don't know how
    long they take (for instance if there is a lot of data to migrate).
    We don't want an infinite retry loop either, so in this case, we
    don't retry.
(cherry picked from commit 8749c60)
  • Loading branch information
dumbbell authored and mergify[bot] committed Jun 6, 2023
1 parent caf6423 commit 47b1596
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
23 changes: 23 additions & 0 deletions deps/rabbit/src/rabbit_ff_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1186,9 +1186,32 @@ this_node_first(Nodes) ->
Ret :: term() | {error, term()}.

rpc_call(Node, Module, Function, Args, Timeout) ->
SleepBetweenRetries = 5000,
T0 = erlang:monotonic_time(),
try
erpc:call(Node, Module, Function, Args, Timeout)
catch
%% In case of `noconnection' with `Timeout'=infinity, we don't retry
%% at all. This is because the infinity "timeout" is used to run
%% callbacks on remote node and they can last an indefinite amount of
%% time, for instance, if there is a lot of data to migrate.
error:{erpc, noconnection} = Reason
when is_integer(Timeout) andalso Timeout > SleepBetweenRetries ->
?LOG_ERROR(
"Feature flags: no connection to node `~ts`; "
"retrying in ~b milliseconds",
[Node, SleepBetweenRetries],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
timer:sleep(SleepBetweenRetries),
T1 = erlang:monotonic_time(),
TDiff = erlang:convert_time_unit(T1 - T0, native, millisecond),
Remaining = Timeout - TDiff,
Timeout1 = erlang:max(Remaining, 0),
case Timeout1 of
0 -> {error, Reason};
_ -> rpc_call(Node, Module, Function, Args, Timeout1)
end;

Class:Reason:Stacktrace ->
Message0 = erl_error:format_exception(Class, Reason, Stacktrace),
Message1 = lists:flatten(Message0),
Expand Down
25 changes: 25 additions & 0 deletions deps/rabbit/test/feature_flags_v2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
mf_wait_and_count_runs_v2_post_enable/1,
mf_crash_on_joining_node/1,

rpc_calls/1,
enable_unknown_feature_flag_on_a_single_node/1,
enable_supported_feature_flag_on_a_single_node/1,
enable_unknown_feature_flag_in_a_3node_cluster/1,
Expand All @@ -36,6 +37,7 @@
enable_unsupported_feature_flag_in_a_3node_cluster/1,
enable_feature_flag_in_cluster_and_add_member_after/1,
enable_feature_flag_in_cluster_and_add_member_concurrently_mfv2/1,
enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2/0,
enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2/1,
enable_feature_flag_with_post_enable/1,
have_required_feature_flag_in_cluster_and_add_member_with_it_disabled/1,
Expand All @@ -54,6 +56,10 @@ all() ->
groups() ->
Groups =
[
{direct, [parallel],
[
rpc_calls
]},
{cluster_size_1, [parallel],
[
enable_unknown_feature_flag_on_a_single_node,
Expand Down Expand Up @@ -94,6 +100,8 @@ end_per_suite(Config) ->

init_per_group(feature_flags_v2, Config) ->
rabbit_ct_helpers:set_config(Config, {enable_feature_flags_v2, true});
init_per_group(direct, Config) ->
Config;
init_per_group(cluster_size_1, Config) ->
rabbit_ct_helpers:set_config(Config, {nodes_count, 1});
init_per_group(cluster_size_3, Config) ->
Expand All @@ -104,11 +112,15 @@ init_per_group(_Group, Config) ->
end_per_group(_Group, Config) ->
Config.

init_per_testcase(rpc_calls, Config) ->
Config;
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:run_steps(
Config,
[fun(Cfg) -> start_slave_nodes(Cfg, Testcase) end]).

end_per_testcase(rpc_calls, Config) ->
Config;
end_per_testcase(_Testcase, Config) ->
rabbit_ct_helpers:run_steps(
Config,
Expand Down Expand Up @@ -328,6 +340,16 @@ get_peer_proc() ->
%% Testcases.
%% -------------------------------------------------------------------

rpc_calls(_Config) ->
List = [1, 2, 3],
?assertEqual(
lists:sum(List),
rabbit_ff_controller:rpc_call(node(), lists, sum, [List], 11000)),
?assertEqual(
{error, {erpc, noconnection}},
rabbit_ff_controller:rpc_call(
nonode@non_existing_host, lists, sum, [List], 11000)).

enable_unknown_feature_flag_on_a_single_node(Config) ->
[Node] = ?config(nodes, Config),
ok = run_on_node(
Expand Down Expand Up @@ -865,6 +887,9 @@ enable_feature_flag_in_cluster_and_add_member_concurrently_mfv2(Config) ->
|| Node <- AllNodes],
ok.

enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2() ->
[{timetrap, {minutes, 3}}].

enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2(Config) ->
AllNodes = [LeavingNode | [FirstNode | _] = Nodes] = ?config(
nodes, Config),
Expand Down

0 comments on commit 47b1596

Please sign in to comment.