Skip to content

Commit

Permalink
khepri_machine: Skip extra query before a query with a fence
Browse files Browse the repository at this point in the history
... if it is unneeded.

[Why]
We do that extra query to ensure that previous async commands were
handled by the local Ra server before we proceed with the query with a
fence. This comes with a performance penalty of course.

We don't need that extra query if the previous command or query made by
the calling process was synchronous.

[How]
We now keep a flag in the calling process' dictionary to indicate whether
its last message was a synchronous command or a query. The flag is cleared
by an async command.

When we have to perform a query with a fence, we look at this flag to
determine if the extra query is needed.
  • Loading branch information
dumbbell committed Aug 13, 2024
1 parent 7033ded commit 73f219c
Showing 1 changed file with 69 additions and 14 deletions.
83 changes: 69 additions & 14 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -865,8 +865,12 @@ do_process_sync_command(StoreId, Command, Options) ->
CommandOptions = #{timeout => Timeout, reply_from => ReplyFrom},
T0 = khepri_utils:start_timeout_window(Timeout),
Dest = case ra_leaderboard:lookup_leader(StoreId) of
LeaderId when LeaderId =/= undefined -> LeaderId;
undefined -> RaServer
LeaderId when LeaderId =/= undefined ->
last_message_was_async(StoreId),
LeaderId;
undefined ->
last_message_was_sync(StoreId),
RaServer
end,
case ra:process_command(Dest, Command, CommandOptions) of
{ok, Ret, _LeaderId} ->
Expand Down Expand Up @@ -903,9 +907,11 @@ process_async_command(
StoreId, Command, ?DEFAULT_RA_COMMAND_CORRELATION = Correlation, Priority) ->
ThisNode = node(),
RaServer = khepri_cluster:node_to_member(StoreId, ThisNode),
last_message_was_async(StoreId),
ra:pipeline_command(RaServer, Command, Correlation, Priority);
process_async_command(
StoreId, Command, Correlation, Priority) ->
last_message_was_async(StoreId),
case ra_leaderboard:lookup_leader(StoreId) of
LeaderId when LeaderId =/= undefined ->
ra:pipeline_command(LeaderId, Command, Correlation, Priority);
Expand Down Expand Up @@ -983,6 +989,7 @@ process_query(StoreId, QueryFun, Options) ->
end.

process_query1(StoreId, QueryFun, Options) ->
last_message_was_sync(StoreId),
LocalServerId = {StoreId, node()},
case ra:local_query(LocalServerId, QueryFun, Options) of
{ok, {_RaIdxTerm, Ret}, _NewLeaderId} ->
Expand All @@ -1009,9 +1016,10 @@ add_applied_condition1(StoreId, Options, Timeout) ->
%% the order of operations between updates and queries. We have to follow
%% several steps to prepare that condition.
%%
%% We first send an arbitrary query to the local Ra server. This is to
%% make sure that previously submitted pipelined commands were processed
%% by that server.
%% If the last message from the calling process to the local Ra server was
%% an async command or if it never sent a command yet, we first send an
%% arbitrary query to the local Ra server. This is to make sure that
%% previously submitted pipelined commands were processed by that server.
%%
%% For instance, if there was a pipelined command without any correlation
%% ID, it ensures it was forwarded to the leader. Likewise for a
Expand All @@ -1020,23 +1028,33 @@ add_applied_condition1(StoreId, Options, Timeout) ->
%% We can't have this guaranty for pipelined commands with a correlation
%% because the caller is responsible for receiving the rejection from the
%% follower and handle the redirect to the leader.
T0 = khepri_utils:start_timeout_window(Timeout),
QueryFun = fun erlang:is_tuple/1,
case process_query1(StoreId, QueryFun, Timeout) of
case was_last_message_sync(StoreId) of
true ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition2(StoreId, Options, NewTimeout);
Other when Other =/= false ->
Other
add_applied_condition2(StoreId, Options, Timeout);
false ->
T0 = khepri_utils:start_timeout_window(Timeout),
QueryFun = fun erlang:is_tuple/1,
case process_query1(StoreId, QueryFun, Timeout) of
true ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition2(StoreId, Options, NewTimeout);
Other when Other =/= false ->
Other
end
end.

add_applied_condition2(StoreId, Options, Timeout) ->
    %% The local query or the synchronous command that preceded this step
    %% (if there was one) most likely populated the leader cache, but this
    %% is not 100% guaranteed.
    MaybeLeaderId = ra_leaderboard:lookup_leader(StoreId),
    case MaybeLeaderId of
        undefined ->
            %% The leader is still unknown. Pretend the previous message
            %% was async: this makes the retry below go through a new
            %% local query, after which the leader should be known.
            last_message_was_async(StoreId),
            add_applied_condition1(StoreId, Options, Timeout);
        LeaderId ->
            add_applied_condition3(StoreId, Options, LeaderId, Timeout)
    end.

Expand Down Expand Up @@ -1091,6 +1109,43 @@ get_timeout(_) -> khepri_app:get_default_timeout().
clear_cache(_StoreId) ->
ok.

%% Process dictionary key, specific to this module and to the given store,
%% under which the calling process keeps its "last message was synchronous"
%% flag. It is written by `last_message_was_sync/1', erased by
%% `last_message_was_async/1' and read by `was_last_message_sync/1'.
-define(LAST_MSG_WAS_SYNC_KEY(StoreId),
{?MODULE, last_message_was_sync, StoreId}).

-spec last_message_was_sync(StoreId) -> ok when
      StoreId :: khepri:store_id().
%% @doc Remembers that the calling process just made a synchronous command
%% or a query against the given store.
%%
%% The information is stored as a flag set to `true' in the dictionary of
%% the calling process, under a key specific to `StoreId'.
%%
%% @returns `ok'.

last_message_was_sync(StoreId) ->
    %% The previous value of the flag, if any, is irrelevant.
    _Previous = erlang:put(?LAST_MSG_WAS_SYNC_KEY(StoreId), true),
    ok.

-spec last_message_was_async(StoreId) -> ok when
      StoreId :: khepri:store_id().
%% @doc Remembers that the calling process just made an asynchronous command
%% against the given store.
%%
%% This is recorded by erasing the "last message was synchronous" flag from
%% the dictionary of the calling process.
%%
%% @returns `ok'.

last_message_was_async(StoreId) ->
    %% The previous value of the flag, if any, is irrelevant.
    _Previous = erlang:erase(?LAST_MSG_WAS_SYNC_KEY(StoreId)),
    ok.

-spec was_last_message_sync(StoreId) -> LastMsgWasSync when
      StoreId :: khepri:store_id(),
      LastMsgWasSync :: boolean().
%% @doc Indicates if the last message recorded by the calling process for the
%% given store was a synchronous command or a query.
%%
%% @returns `true' if the "last message was synchronous" flag is set to
%% `true' in the dictionary of the calling process for the given store,
%% `false' otherwise, i.e. if the flag was never set or if it was erased
%% after an asynchronous command.

was_last_message_sync(StoreId) ->
    case erlang:get(?LAST_MSG_WAS_SYNC_KEY(StoreId)) of
        true -> true;
        _ -> false
    end.

%% -------------------------------------------------------------------
%% ra_machine callbacks.
%% -------------------------------------------------------------------
Expand Down

0 comments on commit 73f219c

Please sign in to comment.