Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new APIs to pause/resume state machine execution #234

Merged
merged 2 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions include/libnuraft/raft_server.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,32 @@ public:
*/
void set_inc_term_func(srv_state::inc_term_func func);

/**
* Pause the background execution of the state machine.
* If an operation execution is currently happening, the state
* machine may not be paused immediately.
*
* @param timeout_ms If non-zero, this function will be blocked until
* either it completely pauses the state machine execution
* or reaches the given time limit in milliseconds.
* Otherwise, this function will return immediately, and
* there is a possibility that the state machine execution
* is still happening.
*/
void pause_state_machine_exeuction(size_t timeout_ms = 0);

/**
* Resume the background execution of state machine.
*/
void resume_state_machine_execution();

/**
* Check if the state machine execution is paused.
*
* @return `true` if paused.
*/
bool is_state_machine_execution_paused() const;

protected:
typedef std::unordered_map<int32, ptr<peer>>::const_iterator peer_itor;

Expand Down Expand Up @@ -996,6 +1022,16 @@ protected:
*/
std::atomic<bool> write_paused_;

/**
* If `true`, state machine commit will be paused.
*/
std::atomic<bool> sm_commit_paused_;

/**
* If `true`, the background thread is doing state machine execution.
*/
std::atomic<bool> sm_commit_exec_in_progress_;

/**
* Server ID indicates the candidate for the next leader,
* as a part of leadership takeover task.
Expand Down
26 changes: 23 additions & 3 deletions src/global_mgr.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -305,15 +305,29 @@ void nuraft_global_mgr::commit_worker_loop(ptr<worker_handle> handle) {
}
if (!target) continue;

ptr<logger>& l_ = target->l_;

// Whenever we find a task to execute, skip next sleeping for any tasks
// that can be queued in the meantime.
skip_sleeping = true;

p_tr("execute commit for %p", target.get());

if (target->sm_commit_paused_) {
p_tr("commit of this server has been paused");
// Since there can be other Raft server waiting for being served,
// need to skip nest sleep.
continue;
}

if ( target->quick_commit_index_ <= target->sm_commit_index_ ||
target->log_store_->next_slot() - 1 <= target->sm_commit_index_ ) {
// State machine's commit index is large enough not to execute commit
// (see the comment in `commit_in_bg()`).
continue;
}

ptr<logger>& l_ = target->l_;
p_tr("executed commit by global worker, queue length %zu", queue_length);
p_tr("execute commit by global worker, queue length %zu", queue_length);
bool finished_in_time =
target->commit_in_bg_exec(config_.max_scheduling_unit_ms_);
if (!finished_in_time) {
Expand All @@ -322,7 +336,8 @@ void nuraft_global_mgr::commit_worker_loop(ptr<worker_handle> handle) {
p_tr("couldn't finish in time (%zu ms), re-push to queue",
config_.max_scheduling_unit_ms_);
request_commit(target);
skip_sleeping = true;
} else {
p_tr("executed in time");
}
}
}
Expand Down Expand Up @@ -367,6 +382,11 @@ void nuraft_global_mgr::append_worker_loop(ptr<worker_handle> handle) {
if (!target) continue;

ptr<logger>& l_ = target->l_;

// Whenever we find a task to execute, skip next sleeping for any tasks
// that can be queued in the meantime.
skip_sleeping = true;

p_tr("executed append_entries by global worker, queue length %zu",
queue_length);
target->append_entries_in_bg_exec();
Expand Down
71 changes: 68 additions & 3 deletions src/handle_commit.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ See the License for the specific language governing permissions and
limitations under the License.
**************************************************************************/

#include "internal_timer.hxx"
#include "raft_server.hxx"

#include "cluster_config.hxx"
Expand Down Expand Up @@ -107,9 +108,16 @@ void raft_server::commit_in_bg() {
sm_commit_index_ >= log_store_->next_slot() - 1 ) {
std::unique_lock<std::mutex> lock(commit_cv_lock_);

auto wait_check = [this] () {
return (log_store_->next_slot() - 1 > sm_commit_index_ &&
quick_commit_index_ > sm_commit_index_) || stopping_;
auto wait_check = [this]() {
if (stopping_) {
// WARNING: `stopping_` flag should have the highest priority.
return true;
}
if (sm_commit_paused_) {
return false;
}
return ( log_store_->next_slot() - 1 > sm_commit_index_ &&
quick_commit_index_ > sm_commit_index_ );
};
p_tr("commit_cv_ sleep\n");
commit_cv_.wait(lock, wait_check);
Expand Down Expand Up @@ -161,6 +169,14 @@ bool raft_server::commit_in_bg_exec(size_t timeout_ms) {
return true;
}

sm_commit_exec_in_progress_ = true;
// Clear the flag automatically once we exit this function.
struct ExecCommitAutoCleaner {
ExecCommitAutoCleaner(std::function<void()> func) : clean_func_(func) {}
~ExecCommitAutoCleaner() { clean_func_(); }
std::function<void()> clean_func_;
} exec_auto_cleaner([this](){ sm_commit_exec_in_progress_ = false; });

p_db( "commit upto %ld, curruent idx %ld\n",
quick_commit_index_.load(), sm_commit_index_.load() );

Expand Down Expand Up @@ -193,6 +209,11 @@ bool raft_server::commit_in_bg_exec(size_t timeout_ms) {
}
first_loop_exec = false;

// Break the loop if state machine commit is paused.
if (sm_commit_paused_) {
break;
}

ulong index_to_commit = sm_commit_index_ + 1;
ptr<log_entry> le = log_store_->entry_at(index_to_commit);
p_tr( "commit upto %llu, curruent idx %llu\n",
Expand Down Expand Up @@ -776,5 +797,49 @@ void raft_server::remove_peer_from_peers(const ptr<peer>& pp) {
peers_.erase(pp->get_id());
}

void raft_server::pause_state_machine_exeuction(size_t timeout_ms) {
p_in( "pause state machine execution, previously %s, state machine %s, "
"timeout %zu ms",
sm_commit_paused_ ? "PAUSED" : "ACTIVE",
sm_commit_exec_in_progress_ ? "RUNNING" : "SLEEPING",
timeout_ms );
sm_commit_paused_ = true;

if (!timeout_ms) {
return;
}
timer_helper timer(timeout_ms * 1000);
while (sm_commit_exec_in_progress_ && !timer.timeout()) {
timer_helper::sleep_ms(10);
}
p_in( "waited %zu ms, state machine %s",
timer.get_ms(),
sm_commit_exec_in_progress_ ? "RUNNING" : "SLEEPING" );
}

void raft_server::resume_state_machine_execution() {
p_in( "pause state machine execution, previously %s, state machine %s",
sm_commit_paused_ ? "PAUSED" : "ACTIVE",
sm_commit_exec_in_progress_ ? "RUNNING" : "SLEEPING" );
sm_commit_paused_ = false;

nuraft_global_mgr* mgr = nuraft_global_mgr::get_instance();
if (mgr) {
// Global mgr.
mgr->request_commit( this->shared_from_this() );
} else {
// Local commit thread.
std::unique_lock<std::mutex> l(commit_cv_lock_);
commit_cv_.notify_one();
}
}

bool raft_server::is_state_machine_execution_paused() const {
if (sm_commit_paused_ && !sm_commit_exec_in_progress_) {
return true;
}
return false;
}

} // namespace nuraft;

2 changes: 2 additions & 0 deletions src/raft_server.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ raft_server::raft_server(context* ctx, const init_options& opt)
, commit_bg_stopped_(false)
, append_bg_stopped_(false)
, write_paused_(false)
, sm_commit_paused_(false)
, sm_commit_exec_in_progress_(false)
, next_leader_candidate_(-1)
, im_learner_(false)
, serving_req_(false)
Expand Down
97 changes: 97 additions & 0 deletions tests/unit/asio_service_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1981,6 +1981,100 @@ int snapshot_context_timeout_removed_server_test() {
return 0;
}

int pause_state_machine_execution_test(bool use_global_mgr) {
reset_log_files();

if (use_global_mgr) {
nuraft_global_mgr::init();
}

std::string s1_addr = "tcp://127.0.0.1:20010";
std::string s2_addr = "tcp://127.0.0.1:20020";
std::string s3_addr = "tcp://127.0.0.1:20030";

RaftAsioPkg s1(1, s1_addr);
RaftAsioPkg s2(2, s2_addr);
RaftAsioPkg s3(3, s3_addr);
std::vector<RaftAsioPkg*> pkgs = {&s1, &s2, &s3};

_msg("launching asio-raft servers\n");
CHK_Z( launch_servers(pkgs, false) );

_msg("organizing raft group\n");
CHK_Z( make_group(pkgs) );

// Set async.
for (auto& entry: pkgs) {
RaftAsioPkg* pp = entry;
raft_params param = pp->raftServer->get_current_params();
param.return_method_ = raft_params::async_handler;
pp->raftServer->update_params(param);
}

// Append messages asynchronously.
const size_t NUM = 10;
std::list< ptr< cmd_result< ptr<buffer> > > > handlers;
std::list<ulong> idx_list;
std::mutex idx_list_lock;
for (size_t ii=0; ii<NUM; ++ii) {
std::string test_msg = "test" + std::to_string(ii);
ptr<buffer> msg = buffer::alloc(test_msg.size() + 1);
msg->put(test_msg);
ptr< cmd_result< ptr<buffer> > > ret =
s1.raftServer->append_entries( {msg} );

cmd_result< ptr<buffer> >::handler_type my_handler =
std::bind( async_handler,
&idx_list,
&idx_list_lock,
std::placeholders::_1,
std::placeholders::_2 );
ret->when_ready( my_handler );

handlers.push_back(ret);
}

// Pause S3's state machine.
s3.raftServer->pause_state_machine_exeuction(100);

CHK_TRUE( s3.raftServer->is_state_machine_execution_paused() );

TestSuite::sleep_sec(1, "replication");

// Now all async handlers should have result.
{
std::lock_guard<std::mutex> l(idx_list_lock);
CHK_EQ(NUM, idx_list.size());
}

// The state machines of S1 and S2 should be identical, but not S3.
CHK_OK( s2.getTestSm()->isSame( *s1.getTestSm() ) );
CHK_FALSE( s3.getTestSm()->isSame( *s1.getTestSm() ) );

// Resume the state machine.
s3.raftServer->resume_state_machine_execution();
TestSuite::sleep_sec(1, "resuming state machine execution");

// Now it should have the same data.
CHK_OK( s3.getTestSm()->isSame( *s1.getTestSm() ) );

// Pause again.
s3.raftServer->pause_state_machine_exeuction(100);

s1.raftServer->shutdown();
s2.raftServer->shutdown();

// Even with paused state machine, shutdown should work.
s3.raftServer->shutdown();
TestSuite::sleep_sec(1, "shutting down");

SimpleLogger::shutdown();
if (use_global_mgr) {
nuraft_global_mgr::shutdown();
}
return 0;
}

} // namespace asio_service_test;
using namespace asio_service_test;

Expand Down Expand Up @@ -2070,6 +2164,9 @@ int main(int argc, char** argv) {
snapshot_context_timeout_removed_server_test );
}

ts.doTest( "pause state machine execution test",
pause_state_machine_execution_test,
TestRange<bool>( {false, true} ) );

#ifdef ENABLE_RAFT_STATS
_msg("raft stats: ENABLED\n");
Expand Down