Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix to potential pre-commit order inversion #140

Merged
merged 1 commit into from
Sep 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/libnuraft/raft_server.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,8 @@ protected:
void handle_reconnect_resp(resp_msg& resp);
void handle_custom_notification_resp(resp_msg& resp);

bool try_update_precommit_index(ulong desired, const size_t MAX_ATTEMPTS = 10);

void handle_ext_resp(ptr<resp_msg>& resp, ptr<rpc_exception>& err);
void handle_ext_resp_err(rpc_exception& err);
void handle_join_leave_rpc_err(msg_type t_msg, ptr<peer> p);
Expand Down
42 changes: 25 additions & 17 deletions src/handle_append_entries.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -720,25 +720,10 @@ ptr<resp_msg> raft_server::handle_append_entries(req_msg& req)
// be greater than the last log index this server already has. We should
// always compare the target index with current precommit index, and take
// it only when it is greater than the previous one.
const size_t MAX_ATTEMPTS = 10;
size_t num_attempts = 0;
ulong prev_precommit_index = precommit_index_;
while ( prev_precommit_index < target_precommit_index &&
num_attempts < MAX_ATTEMPTS ) {
if ( precommit_index_.compare_exchange_strong( prev_precommit_index,
target_precommit_index ) ) {
break;
}
// Otherwise: retry until `precommit_index_` is equal to or greater than
// `target_precommit_index`.
num_attempts++;
}
if (num_attempts >= MAX_ATTEMPTS) {
bool pc_updated = try_update_precommit_index(target_precommit_index);
if (!pc_updated) {
// If updating `precommit_index_` failed, we SHOULD NOT update
// commit index as well.
p_er("updating precommit_index_ failed after %zu attempts, "
"last seen precommit_index_ %zu, target %zu",
num_attempts, prev_precommit_index, target_precommit_index);
} else {
commit( std::min( req.get_commit_idx(), target_precommit_index ) );
}
Expand Down Expand Up @@ -773,6 +758,29 @@ ptr<resp_msg> raft_server::handle_append_entries(req_msg& req)
return resp;
}

bool raft_server::try_update_precommit_index(ulong desired, const size_t MAX_ATTEMPTS) {
// If `MAX_ATTEMPTS == 0`, try forever.
size_t num_attempts = 0;
ulong prev_precommit_index = precommit_index_;
while ( prev_precommit_index < desired &&
num_attempts < MAX_ATTEMPTS ) {
if ( precommit_index_.compare_exchange_strong( prev_precommit_index,
desired ) ) {
return true;
}
// Otherwise: retry until `precommit_index_` is equal to or greater than
// `desired`.
num_attempts++;
}
if (precommit_index_ >= desired) {
return true;
}
p_er("updating precommit_index_ failed after %zu/%zu attempts, "
"last seen precommit_index_ %zu, target %zu",
num_attempts, MAX_ATTEMPTS, prev_precommit_index, desired);
return false;
}

void raft_server::handle_append_entries_resp(resp_msg& resp) {
peer_itor it = peers_.find(resp.get_src());
if (it == peers_.end()) {
Expand Down
2 changes: 1 addition & 1 deletion src/handle_client_request.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ ptr<resp_msg> raft_server::handle_cli_req(req_msg& req) {
if (num_entries) {
log_store_->end_of_append_batch(last_idx - num_entries, num_entries);
}
precommit_index_ = last_idx;
try_update_precommit_index(last_idx);
resp_idx = log_store_->next_slot();

// Finished appending logs and pre_commit of itself.
Expand Down
4 changes: 1 addition & 3 deletions src/raft_server.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1429,9 +1429,7 @@ ulong raft_server::store_log_entry(ptr<log_entry>& entry, ulong index) {

if ( role_ == srv_role::leader ) {
// Need to progress precommit index for config.
if (precommit_index_ < log_index) {
precommit_index_ = log_index;
}
try_update_precommit_index(log_index);
}
}

Expand Down