Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce initial_machine_version server config. #498

Merged
merged 1 commit into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions src/ra_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@

-include("ra.hrl").

-export([init/2,
-export([init/3,
apply/4,
tick/3,
snapshot_installed/5,
Expand All @@ -90,7 +90,9 @@
-type user_command() :: term().
%% the command type for a given machine implementation

-type machine_init_args() :: #{name := atom(), atom() => term()}.
-type machine_init_args() :: #{name := atom(),
machine_version => version(),
atom() => term()}.
%% the configuration passed to the init callback

-type machine() :: {machine, module(), AddInitArgs :: #{term() => term()}}.
Expand Down Expand Up @@ -294,15 +296,11 @@
%% @doc initialise a new machine
%% This is only called on startup only if there isn't yet a snapshot to recover
%% from. Once a snapshot has been taken this is never called again.
-spec init(machine(), atom()) -> state().
init({machine, _, Args} = Machine, Name) ->
%% init always dispatches to the first version
%% as this means every state machine in a mixed version cluster will
%% have a common starting point.
%% TODO: it should be possible to pass a lowest supported state machine
%% version flag in the init args so that old machine version can be purged
Mod = which_module(Machine, 0),
Mod:init(Args#{name => Name}).
-spec init(machine(), atom(), version()) -> state().
init({machine, _, Args} = Machine, Name, Version) ->
Mod = which_module(Machine, Version),
Mod:init(Args#{name => Name,
machine_version => Version}).
kjnilsson marked this conversation as resolved.
Show resolved Hide resolved

-spec apply(module(), command_meta_data(), command(), State) ->
{State, reply(), effects()} | {State, reply()}.
Expand Down
23 changes: 13 additions & 10 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@
log_init_args := ra_log:ra_log_init_args(),
initial_members := [ra_server_id()],
machine := machine_conf(),
initial_machine_version => ra_machine:version(),
friendly_name => unicode:chardata(),
metrics_key => term(),
% TODO: review - only really used for
Expand Down Expand Up @@ -352,24 +353,26 @@ init(#{id := Id,
VotedFor = ra_log_meta:fetch(MetaName, UId, voted_for, undefined),

LatestMacVer = ra_machine:version(Machine),
InitialMachineVersion = min(LatestMacVer,
maps:get(initial_machine_version, Config, 0)),
kjnilsson marked this conversation as resolved.
Show resolved Hide resolved

{_FirstIndex, Cluster0, MacVer, MacState,
{Cluster0, EffectiveMacVer, MacState,
{SnapshotIdx, _} = SnapshotIndexTerm} =
case ra_log:recover_snapshot(Log0) of
undefined ->
InitialMachineState = ra_machine:init(Machine, Name),
{0, make_cluster(Id, InitialNodes),
0, InitialMachineState, {0, 0}};
InitialMachineState = ra_machine:init(Machine, Name,
InitialMachineVersion),
{make_cluster(Id, InitialNodes),
InitialMachineVersion, InitialMachineState, {0, 0}};
{#{index := Idx,
term := Term,
cluster := ClusterNodes,
machine_version := MacVersion}, MacSt} ->
Clu = make_cluster(Id, ClusterNodes),
%% the snapshot is the last index before the first index
%% TODO: should this be Idx + 1?
{Idx + 1, Clu, MacVersion, MacSt, {Idx, Term}}
{Clu, MacVersion, MacSt, {Idx, Term}}
end,
MacMod = ra_machine:which_module(Machine, MacVer),
MacMod = ra_machine:which_module(Machine, EffectiveMacVer),

CommitIndex = max(LastApplied, SnapshotIdx),
Cfg = #cfg{id = Id,
Expand All @@ -378,8 +381,8 @@ init(#{id := Id,
metrics_key = MetricKey,
machine = Machine,
machine_version = LatestMacVer,
machine_versions = [{SnapshotIdx, MacVer}],
effective_machine_version = MacVer,
machine_versions = [{SnapshotIdx, EffectiveMacVer}],
effective_machine_version = EffectiveMacVer,
effective_machine_module = MacMod,
effective_handle_aux_fun = ra_machine:which_aux_fun(MacMod),
max_pipeline_count = MaxPipelineCount,
Expand All @@ -389,7 +392,7 @@ init(#{id := Id,
put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_INDEX, CommitIndex),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, SnapshotIdx),
put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, CurrentTerm),
put_counter(Cfg, ?C_RA_SVR_METRIC_EFFECTIVE_MACHINE_VERSION, MacVer),
put_counter(Cfg, ?C_RA_SVR_METRIC_EFFECTIVE_MACHINE_VERSION, EffectiveMacVer),

NonVoter = get_membership(Cluster0, Id, UId,
maps:get(membership, Config, voter)),
Expand Down
31 changes: 27 additions & 4 deletions src/ra_server_sup_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
-include("ra.hrl").

-spec start_server(System :: atom(), ra_server:ra_server_config()) ->
supervisor:startchild_ret() | {error, not_new | system_not_started} | {badrpc, term()}.
supervisor:startchild_ret() |
{error, not_new | system_not_started | invalid_initial_machine_version} |
{badrpc, term()}.
start_server(System, #{id := NodeId,
uid := UId} = Config)
when is_atom(System) ->
Expand All @@ -61,9 +63,14 @@ start_server_rpc(System, UId, Config0) ->
%% check that the server isn't already registered
case ra_directory:name_of(System, UId) of
undefined ->
case ra_system:lookup_name(System, server_sup) of
{ok, Name} ->
start_child(Name, Config);
case validate_config(Config) of
ok ->
case ra_system:lookup_name(System, server_sup) of
{ok, Name} ->
start_child(Name, Config);
Err ->
Err
end;
Err ->
Err
end;
Expand All @@ -77,6 +84,22 @@ start_server_rpc(System, UId, Config0) ->
end
end.

validate_config(#{system_config := SysConf} = Config) ->
Strat = maps:get(machine_upgrade_strategy, SysConf, all),
case Config of
#{initial_machine_version := InitMacVer,
machine := {module, Mod, Args}} when Strat == all ->
MacVer = ra_machine:version({machine, Mod, Args}),
if MacVer < InitMacVer ->
{error, invalid_initial_machine_version};
true ->
ok
end;
_ ->
ok
end.


restart_server_rpc(System, {RaName, _Node}, AddConfig)
when is_atom(System) ->
case ra_system:fetch(System) of
Expand Down
3 changes: 2 additions & 1 deletion test/ra_dbg_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ execute_state_machine() ->
%% creating a new WAL file with ra_fifo
[Srv] = Nodes = [{ra_dbg, node()}],
ClusterId = ra_dbg,
Config = #{name => ClusterId},
Config = #{name => ClusterId,
machine_version => 0},
Machine = {module, ra_fifo, Config},
ra:start(),
{ok, _, _} = ra:start_cluster(default, ClusterId, Machine, Nodes),
Expand Down
29 changes: 9 additions & 20 deletions test/ra_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@
-opaque state() :: #state{}.

-type config() :: #{name := atom(),
machine_version := ra_machine:version(),
dead_letter_handler => applied_mfa(),
become_leader_handler => applied_mfa(),
cancel_customer_handler => applied_mfa(),
Expand Down Expand Up @@ -902,7 +903,8 @@ size_test(NumMsg, NumCust) ->
EnqGen = fun(N) -> {N, {enqueue, N}} end,
CustGen = fun(N) -> {N, {checkout, {auto, 100},
spawn(fun() -> ok end)}} end,
S0 = run_log(1, NumMsg, EnqGen, init(#{name => size_test})),
S0 = run_log(1, NumMsg, EnqGen, init(#{name => size_test,
machine_version => 0})),
S = run_log(NumMsg, NumMsg + NumCust, CustGen, S0),
S2 = S#state{ra_indexes = ra_fifo_index:map(fun(_, _) -> undefined end,
S#state.ra_indexes)},
Expand All @@ -918,29 +920,13 @@ perf_test(NumMsg, NumCust) ->
{N, {settle, N - NumMsg - NumCust - 1, Pid}}
end,
S0 = run_log(1, NumMsg, EnqGen,
init(#{name => size_test})),
init(#{name => size_test,
machine_version => 0})),
S1 = run_log(NumMsg, NumMsg + NumCust, CustGen, S0),
_ = run_log(NumMsg, NumMsg + NumCust + NumMsg, SetlGen, S1),
ok
end).

% profile(File) ->
% GzFile = atom_to_list(File) ++ ".gz",
% lg:trace([ra_fifo, maps, queue, ra_fifo_index], lg_file_tracer,
% GzFile, #{running => false, mode => profile}),
% NumMsg = 10000,
% NumCust = 500,
% EnqGen = fun(N) -> {N, {enqueue, self(), N, N}} end,
% Pid = spawn(fun() -> ok end),
% CustGen = fun(N) -> {N, {checkout, {auto, NumMsg},
% {term_to_binary(N), Pid}}} end,
% SetlGen = fun(N) -> {N, {settle, N - NumMsg - NumCust - 1, Pid}} end,
% S0 = run_log(1, NumMsg, EnqGen, element(1, init(#{name => size_test}))),
% S1 = run_log(NumMsg, NumMsg + NumCust, CustGen, S0),
% _ = run_log(NumMsg, NumMsg + NumCust + NumMsg, SetlGen, S1),
% lg:stop().


run_log(Num, Num, _Gen, State) ->
State;
run_log(Num, Max, Gen, State0) ->
Expand Down Expand Up @@ -995,6 +981,7 @@ dehydrate_state(#state{messages = Messages0,

test_init(Name) ->
init(#{name => Name,
machine_version => 0,
shadow_copy_interval => 0,
metrics_handler => {?MODULE, metrics_handler, []}}).

Expand Down Expand Up @@ -1243,6 +1230,7 @@ discarded_message_without_dead_letter_handler_is_removed_test() ->
discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
Cid = {<<"completed_customer_yields_demonitor_effect_test">>, self()},
State00 = init(#{name => test,
machine_version => 0,
dead_letter_handler =>
{somemod, somefun, [somearg]}}),
{State0, _, [_, _]} = enq(1, 1, first, State00),
Expand Down Expand Up @@ -1430,6 +1418,7 @@ duplicate_delivery_test() ->
state_enter_test() ->

S0 = init(#{name => the_name,
machine_version => 0,
become_leader_handler => {m, f, [a]}}),
[{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0),
ok.
Expand Down Expand Up @@ -1505,7 +1494,7 @@ run_log(InitState, Entries) ->
aux_test() ->
_ = ra_machine_ets:start_link(),
Aux0 = init_aux(aux_test),
MacState = init(#{name => aux_test}),
MacState = init(#{name => aux_test, machine_version => 0}),
Log = undefined,
{no_reply, Aux, undefined} = handle_aux(leader, cast, active, Aux0,
Log, MacState),
Expand Down
Loading
Loading