diff --git a/src/khepri_machine.erl b/src/khepri_machine.erl index 4bdaddfd..1eb05c98 100644 --- a/src/khepri_machine.erl +++ b/src/khepri_machine.erl @@ -1030,40 +1030,36 @@ add_applied_condition2(StoreId, Options, Timeout) -> end. add_applied_condition3(StoreId, Options, LeaderId, Timeout) -> - %% We query the leader to know the last index it committed. We also - %% double-check it is still the leader; if it is not, we recurse. + %% We query the leader to know the last index it committed in which term. T0 = khepri_utils:start_timeout_window(Timeout), - case ra:member_overview(LeaderId, Timeout) of - {ok, Overview, LeaderId} -> - NewTimeout = khepri_utils:end_timeout_window(Timeout, T0), - - %% Now that we know the last committed index of the leader, we can - %% perform an arbitrary query on the local server. The query will - %% wait for that same index to be applied locally before it is - %% executed. - %% - %% We don't care about the result of that query. We just want to - %% block until the latest commands are applied locally. - #{log := #{last_index := LastIndex}, - current_term := CurrentTerm} = Overview, - Condition = {applied, {LastIndex, CurrentTerm}}, - Options1 = Options#{condition => Condition, - timeout => NewTimeout}, - {ok, Options1}; - {ok, _Overview, NewLeaderId} -> - NewTimeout = khepri_utils:end_timeout_window(Timeout, T0), - add_applied_condition3(StoreId, Options, NewLeaderId, NewTimeout); - {timeout, _LeaderId} -> + try + case ra:key_metrics(LeaderId, Timeout) of + #{last_index := LastIndex, term := Term} -> + NewTimeout1 = khepri_utils:end_timeout_window(Timeout, T0), + + %% Now that we know the last committed index of the leader, we + %% can perform an arbitrary query on the local server. The + %% query will wait for that same index to be applied locally + %% before it is executed. + %% + %% We don't care about the result of that query. We just want + %% to block until the latest commands are applied locally. + Condition = {applied, {LastIndex, Term}}, + Options1 = Options#{condition => Condition, + timeout => NewTimeout1}, + {ok, Options1}; + _ -> + timer:sleep(200), + NewTimeout = khepri_utils:end_timeout_window(Timeout, T0), + add_applied_condition1(StoreId, Options, NewTimeout) + end + catch + error:{erpc, timeout} -> {error, timeout}; - {error, Reason} - when ?HAS_TIME_LEFT(Timeout) andalso - (Reason == noproc orelse Reason == nodedown orelse - Reason == shutdown) -> + error:{erpc, noconnection} -> timer:sleep(200), - NewTimeout = khepri_utils:end_timeout_window(Timeout, T0), - add_applied_condition1(StoreId, Options, NewTimeout); - Error -> - Error + NewTimeout2 = khepri_utils:end_timeout_window(Timeout, T0), + add_applied_condition1(StoreId, Options, NewTimeout2) end. -spec get_timeout(Options) -> Timeout when