From 3923612ae2352b5f614d75cd727c06c214ec3347 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Mon, 20 Jan 2025 15:18:16 +0000 Subject: [PATCH] Enable async_dist when processing send_msg effects for remote nodes. This allows users to simplify their Ra client implementations, as they no longer have to handle the lost messages that could occur when the distribution buffer fills. --- src/ra_server_proc.erl | 30 ++++++++++++++++++++---------- test/ra_SUITE.erl | 9 +++------ 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 8f4731e0..e9ecb46e 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -72,6 +72,19 @@ -define(HANDLE_EFFECTS(Effects, EvtType, State0), handle_effects(?FUNCTION_NAME, Effects, EvtType, State0)). +-define(ASYNC_DIST(Node, Send), + case Node == node() of + true -> + Send, + ok; + false -> + %% use async_dist for remote sends + process_flag(async_dist, true), + Send, + process_flag(async_dist, false), + ok + end). + -type query_fun() :: ra:query_fun(). -type query_options() :: #{condition => ra:query_condition()}. @@ -1349,8 +1362,8 @@ handle_effect(_, {next_event, _, _} = Next, _, State, Actions) -> {State, [Next | Actions]}; handle_effect(leader, {send_msg, To, Msg}, _, State, Actions) -> %% default is to send without any wrapping - %% TODO: handle send failure? how? 
- _ = send(To, Msg, State#state.conf), + ToNode = get_node(To), + ?ASYNC_DIST(ToNode, _ = send(To, Msg, State#state.conf)), {State, Actions}; handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff, _, State, Actions) -> @@ -1359,13 +1372,13 @@ handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff, true -> case can_execute_locally(RaftState, ToNode, State) of true -> - send_msg(Eff, State); + ?ASYNC_DIST(ToNode, send_msg(Eff, State)); false -> ok end; false when RaftState == leader -> %% the effect got here so we can execute - send_msg(Eff, State); + ?ASYNC_DIST(ToNode, send_msg(Eff, State)); false -> ok end, @@ -1978,11 +1991,7 @@ handle_tick_metrics(State) -> can_execute_locally(RaftState, TargetNode, #state{server_state = ServerState} = State) -> - Membership = ra_server:get_membership(ServerState), case RaftState of - _ when RaftState =/= leader andalso - Membership == voter -> - TargetNode == node(); leader when TargetNode =/= node() -> %% We need to evaluate whether to send the message. %% Only send if there isn't a local node for the target pid. @@ -1990,8 +1999,9 @@ can_execute_locally(RaftState, TargetNode, not lists:any(fun ({_, N}) -> N == TargetNode end, Members); leader -> true; - _ -> - false + _ when RaftState =/= leader -> + TargetNode == node() andalso + voter == ra_server:get_membership(ServerState) end. can_execute_on_member(_RaftState, Member, diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index 2d8cb621..30c70fc7 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -128,19 +128,16 @@ end_per_testcase(_TestCase, Config) -> Config. 
single_server_processes_command(Config) -> - % ok = logger:set_primary_config(level, all), Name = ?config(test_name, Config), - N1 = nth_server_name(Config, 1), + {_RaName, _} = N1 = nth_server_name(Config, 1), ok = ra:start_server(default, Name, N1, add_machine(), []), ok = ra:trigger_election(N1), monitor(process, element(1, N1)), % index is 2 as leaders commit a no-op entry on becoming leaders - % debugger:start(), - % int:i(ra_server_proc), - % int:break(ra_server_proc, 440), {ok, 5, _} = ra:process_command(N1, 5, 2000), {ok, 10, _} = ra:process_command(N1, 5, 2000), - terminate_cluster([N1]). + terminate_cluster([N1]), + ok. pipeline_commands(Config) -> Name = ?config(test_name, Config),