Skip to content

Commit

Permalink
khepri_machine: Use ra:key_metrics/2 instead of ra:member_overview/2
Browse files Browse the repository at this point in the history
[Why]
`ra:member_overview/2` is a very expensive call.

[How]
We only need the last index and the current term from the leader, and
`ra:key_metrics/2` provides both pieces of information too.

The difference is huge: in my benchmark, the query rate goes from 15
queries per second to 100k queries per second. This result was measured
in combination with a related change in Ra; see rabbitmq/ra#462.
  • Loading branch information
dumbbell committed Jul 31, 2024
1 parent 4ab83d8 commit 137bacb
Showing 1 changed file with 27 additions and 31 deletions.
58 changes: 27 additions & 31 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1030,40 +1030,36 @@ add_applied_condition2(StoreId, Options, Timeout) ->
end.

add_applied_condition3(StoreId, Options, LeaderId, Timeout) ->
%% We query the leader to know the last index it committed. We also
%% double-check it is still the leader; if it is not, we recurse.
%% We query the leader to know the last index it committed in which term.
T0 = khepri_utils:start_timeout_window(Timeout),
case ra:member_overview(LeaderId, Timeout) of
{ok, Overview, LeaderId} ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),

%% Now that we know the last committed index of the leader, we can
%% perform an arbitrary query on the local server. The query will
%% wait for that same index to be applied locally before it is
%% executed.
%%
%% We don't care about the result of that query. We just want to
%% block until the latest commands are applied locally.
#{log := #{last_index := LastIndex},
current_term := CurrentTerm} = Overview,
Condition = {applied, {LastIndex, CurrentTerm}},
Options1 = Options#{condition => Condition,
timeout => NewTimeout},
{ok, Options1};
{ok, _Overview, NewLeaderId} ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition3(StoreId, Options, NewLeaderId, NewTimeout);
{timeout, _LeaderId} ->
try
case ra:key_metrics(LeaderId, Timeout) of
#{last_index := LastIndex, term := Term} ->
NewTimeout1 = khepri_utils:end_timeout_window(Timeout, T0),

%% Now that we know the last committed index of the leader, we
%% can perform an arbitrary query on the local server. The
%% query will wait for that same index to be applied locally
%% before it is executed.
%%
%% We don't care about the result of that query. We just want
%% to block until the latest commands are applied locally.
Condition = {applied, {LastIndex, Term}},
Options1 = Options#{condition => Condition,
timeout => NewTimeout1},
{ok, Options1};
_ ->
timer:sleep(200),
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition1(StoreId, Options, NewTimeout)
end
catch
error:{erpc, timeout} ->
{error, timeout};
{error, Reason}
when ?HAS_TIME_LEFT(Timeout) andalso
(Reason == noproc orelse Reason == nodedown orelse
Reason == shutdown) ->
error:{erpc, noconnection} ->
timer:sleep(200),
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition1(StoreId, Options, NewTimeout);
Error ->
Error
NewTimeout2 = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition1(StoreId, Options, NewTimeout2)
end.

-spec get_timeout(Options) -> Timeout when
Expand Down

0 comments on commit 137bacb

Please sign in to comment.