From 85398df143fd229a8804d797dff91ac6236843ef Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 31 Jan 2025 13:06:49 +0000 Subject: [PATCH 1/2] Fix log/2 effect regression. the {log, Indexes, Fun} effect should only be evaluated by the leader but this was regressed in the removal of the filter_follower_effects function --- src/ra_server_proc.erl | 71 ++++++++++++++++++----------------- test/ra_machine_int_SUITE.erl | 26 +++++++++---- 2 files changed, 55 insertions(+), 42 deletions(-) diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index e9ecb46e..11180edd 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -1383,16 +1383,6 @@ handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff, ok end, {State, Actions}; -handle_effect(RaftState, {LogOrLogExt, Idxs, Fun, {local, Node}}, EvtType, - State, Actions) - when LogOrLogExt == log orelse LogOrLogExt == log_ext -> - case can_execute_locally(RaftState, Node, State) of - true -> - handle_effect(RaftState, {LogOrLogExt, Idxs, Fun}, EvtType, - State, Actions); - false -> - {State, Actions} - end; handle_effect(leader, {append, Cmd}, _EvtType, State, Actions) -> Evt = {command, normal, {'$usr', Cmd, noreply}}, {State, [{next_event, cast, Evt} | Actions]}; @@ -1405,33 +1395,29 @@ handle_effect(_RaftState, {try_append, Cmd, ReplyMode}, _EvtType, State, Actions %% limited to the leader Evt = {command, normal, {'$usr', Cmd, ReplyMode}}, {State, [{next_event, cast, Evt} | Actions]}; -handle_effect(RaftState, {log, Idxs, Fun}, EvtType, - State = #state{server_state = SS0}, Actions) - when is_list(Idxs) -> - %% Useful to implement a batch send of data obtained from the log. - %% 1) Retrieve all data from the list of indexes - {ok, Cmds, SS} = ra_server:log_read(Idxs, SS0), - %% 2) Apply the fun to the list of commands as a whole and deal with any effects - case Fun(Cmds) of - [] -> - {State#state{server_state = SS}, Actions}; - Effects -> - %% recurse with the new effects - handle_effects(RaftState, Effects, EvtType, - State#state{server_state = SS}, Actions) - end; -handle_effect(RaftState, {log_ext, Idxs, Fun}, EvtType, - State = #state{server_state = SS0}, Actions) - when is_list(Idxs) -> - ReadState = ra_server:log_partial_read(Idxs, SS0), - case Fun(ReadState) of - [] -> - {State, Actions}; - Effects -> - %% recurse with the new effects +handle_effect(RaftState, {LogOrLogExt, Idxs, Fun, {local, Node}}, EvtType, + State0, Actions) + when LogOrLogExt == log orelse + LogOrLogExt == log_ext -> + case can_execute_locally(RaftState, Node, State0) of + true -> + {Effects, State} = handle_log_effect(LogOrLogExt, Idxs, Fun, + State0), handle_effects(RaftState, Effects, EvtType, - State, Actions) + State, Actions); + false -> + {State0, Actions} end; +handle_effect(leader, {LogOrLogExt, Idxs, Fun}, EvtType, State0, Actions) + when is_list(Idxs) andalso + (LogOrLogExt == log orelse + LogOrLogExt == log_ext) -> + %% Useful to implement a batch send of data obtained from the log. + %% 1) Retrieve all data from the list of indexes + {Effects, State} = handle_log_effect(LogOrLogExt, Idxs, Fun, + State0), + handle_effects(leader, Effects, EvtType, + State, Actions); handle_effect(RaftState, {aux, Cmd}, EventType, State0, Actions0) -> {_, ServerState, Effects} = ra_server:handle_aux(RaftState, cast, Cmd, State0#state.server_state), @@ -2122,3 +2108,18 @@ schedule_command_flush(Delayed) -> _ -> ok = gen_statem:cast(self(), flush_commands) end. + + +handle_log_effect(log, Idxs, Fun, + #state{server_state = SS0} = State) + when is_list(Idxs) -> + %% Useful to implement a batch send of data obtained from the log. + %% 1) Retrieve all data from the list of indexes + {ok, Cmds, SS} = ra_server:log_read(Idxs, SS0), + %% 2) Apply the fun to the list of commands as a whole and deal with any effects + {Fun(Cmds), State#state{server_state = SS}}; +handle_log_effect(log_ext, Idxs, Fun, + #state{server_state = SS0} = State) + when is_list(Idxs) -> + ReadState = ra_server:log_partial_read(Idxs, SS0), + {Fun(ReadState), State}. diff --git a/test/ra_machine_int_SUITE.erl b/test/ra_machine_int_SUITE.erl index 518bfca9..8c584a19 100644 --- a/test/ra_machine_int_SUITE.erl +++ b/test/ra_machine_int_SUITE.erl @@ -610,19 +610,31 @@ log_effect(Config) -> {log, lists:reverse(Idxs), fun (Cmds) -> Datas = [D || {_, D} <- Cmds], - [{send_msg, Self, - {datas, Datas}}] + %% using a plain send here to + %% ensure this effect is only + %% evaluated on leader + Self ! {datas, Datas}, + [] end}} end), ClusterName = ?config(cluster_name, Config), - ServerId = ?config(server_id, Config), - ok = start_cluster(ClusterName, {module, Mod, #{}}, [ServerId]), - {ok, _, ServerId} = ra:process_command(ServerId, {cmd, <<"hi1">>}), + ServerId1 = ?config(server_id, Config), + ServerId2 = ?config(server_id2, Config), + ServerId3 = ?config(server_id3, Config), + ok = start_cluster(ClusterName, {module, Mod, #{}}, + [ServerId1, ServerId2, ServerId3]), + {ok, _, ServerId} = ra:process_command(ServerId1, {cmd, <<"hi1">>}), {ok, _, ServerId} = ra:process_command(ServerId, {cmd, <<"hi2">>}), - {ok, _, ServerId} = ra:process_command(ServerId, get_data), + {ok, ok, ServerId} = ra:process_command(ServerId, get_data), receive {datas, [<<"hi1">>, <<"hi2">>]} -> - ok + receive + {datas, [<<"hi1">>, <<"hi2">>]} -> + ct:fail("unexpected second log effect execution"), + ok + after 100 -> + ok + end after 5000 -> flush(), exit(data_timeout) From 51523d6365e5908a0e45cfdf7b07bc9862aa5318 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 31 Jan 2025 14:12:21 +0000 Subject: [PATCH 2/2] Remove overly chatty debug log message And pass the right start index in segment writer. --- src/ra_log_segment_writer.erl | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/ra_log_segment_writer.erl b/src/ra_log_segment_writer.erl index 5f789895..f6ab0478 100644 --- a/src/ra_log_segment_writer.erl +++ b/src/ra_log_segment_writer.erl @@ -179,7 +179,8 @@ handle_cast({mem_tables, Ranges, WalFile}, #state{data_dir = Dir, ok = prim_file:delete(filename:join(Dir, WalFile)), T2 = erlang:monotonic_time(), Diff = erlang:convert_time_unit(T2 - T1, native, millisecond), - ?DEBUG("segment_writer in '~w': completed flush of ~b writers from wal file ~s in ~bms", + ?DEBUG("segment_writer in '~w': completed flush of ~b writers from wal file " + "~s in ~bms", [System, length(RangesList), WalFile, Diff]), {noreply, State}; handle_cast({truncate_segments, Who, {_Range, Name} = SegRef}, @@ -315,8 +316,6 @@ flush_mem_table_range(ServerUId, {Tid, {StartIdx0, EndIdx}}, segment_conf = SegConf} = State) -> Dir = filename:join(DataDir, binary_to_list(ServerUId)), StartIdx = start_index(ServerUId, StartIdx0), - ?DEBUG("~s ~s ~b:~b to ~b", - [?FUNCTION_NAME, ServerUId, StartIdx0, StartIdx, EndIdx]), case open_file(Dir, SegConf) of enoent -> ?DEBUG("segment_writer: skipping segment as directory ~ts does " @@ -325,7 +324,7 @@ flush_mem_table_range(ServerUId, {Tid, {StartIdx0, EndIdx}}, %% clean up the tables for this process []; Segment0 -> - case append_to_segment(ServerUId, Tid, StartIdx0, EndIdx, + case append_to_segment(ServerUId, Tid, StartIdx, EndIdx, Segment0, State) of undefined -> ?WARN("segment_writer: skipping segments for ~w as