Skip to content

Commit

Permalink
Merge pull request eBay#47 from ClickHouse/pause-commit-while-snap-sync
Browse files Browse the repository at this point in the history
Pause state machine while snapshot is synced
  • Loading branch information
antonio2368 authored Aug 29, 2022
2 parents 33f60f9 + 91b5301 commit bdba298
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
13 changes: 11 additions & 2 deletions src/handle_commit.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,19 @@ bool raft_server::commit_in_bg_exec(size_t timeout_ms, bool initial_commit_exec)
}

ulong index_to_commit = sm_commit_index_ + 1;
ptr<log_entry> le = log_store_->entry_at(index_to_commit);
p_tr( "commit upto %llu, current idx %llu\n",
quick_commit_index_.load(), index_to_commit );

ptr<log_entry> le = log_store_->entry_at(index_to_commit);
if (!le)
{
// LCOV_EXCL_START
p_ft( "failed to get log entry with idx %llu", index_to_commit );
ctx_->state_mgr_->system_exit(raft_err::N19_bad_log_idx_for_term);
::exit(-1);
// LCOV_EXCL_STOP
}

if (le->get_term() == 0) {
// LCOV_EXCL_START
// Zero term means that log store is corrupted
Expand Down Expand Up @@ -873,7 +882,7 @@ void raft_server::pause_state_machine_exeuction(size_t timeout_ms) {
}

void raft_server::resume_state_machine_execution() {
p_in( "pause state machine execution, previously %s, state machine %s",
p_in( "resume state machine execution, previously %s, state machine %s",
sm_commit_paused_ ? "PAUSED" : "ACTIVE",
sm_commit_exec_in_progress_ ? "RUNNING" : "SLEEPING" );
sm_commit_paused_ = false;
Expand Down
13 changes: 13 additions & 0 deletions src/handle_snapshot_sync.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,19 @@ bool raft_server::handle_snapshot_sync_req(snapshot_sync_req& req) {
}

if (is_last_obj) {
// let's pause committing in background so it doesn't access logs
// while they are being compacted
pause_state_machine_exeuction();
while (sm_commit_exec_in_progress_)
std::this_thread::sleep_for(std::chrono::milliseconds(500));

// Scope guard: invokes the supplied callback when the guard object is
// destroyed. The instance declared below calls
// resume_state_machine_execution() when it goes out of scope.
struct ExecAutoResume {
    explicit ExecAutoResume(std::function<void()> func) : clean_func_(func) {}

    // Skip the call if the callback is empty: invoking an empty
    // std::function throws, and throwing from a destructor terminates.
    ~ExecAutoResume() { if (clean_func_) clean_func_(); }

    // Non-copyable: a copy would run the cleanup callback a second time.
    ExecAutoResume(const ExecAutoResume&) = delete;
    ExecAutoResume& operator=(const ExecAutoResume&) = delete;

    std::function<void()> clean_func_;
} exec_auto_resume([this](){ resume_state_machine_execution(); });


receiving_snapshot_ = false;

// Only follower will run this piece of code, but let's check it again
Expand Down

0 comments on commit bdba298

Please sign in to comment.