From 22b6a9912f887416bff5a1df7476ae8051766258 Mon Sep 17 00:00:00 2001
From: Jung-Sang Ahn
Date: Mon, 19 Jul 2021 21:01:44 -0700
Subject: [PATCH 1/2] Add new APIs to pause/resume state machine execution

* The state machine execution is done asynchronously in the background.
  We need an API to control the execution flow for various purposes.
---
 include/libnuraft/raft_server.hxx | 36 ++++++++++++
 src/global_mgr.cxx                | 26 ++++++++-
 src/handle_commit.cxx             | 69 +++++++++++++++++++++-
 src/raft_server.cxx               |  2 +
 tests/unit/asio_service_test.cxx  | 95 +++++++++++++++++++++++++++++++
 5 files changed, 222 insertions(+), 6 deletions(-)

diff --git a/include/libnuraft/raft_server.hxx b/include/libnuraft/raft_server.hxx
index 581b6d49..4b756f5c 100644
--- a/include/libnuraft/raft_server.hxx
+++ b/include/libnuraft/raft_server.hxx
@@ -638,6 +638,32 @@ public:
      */
     void set_inc_term_func(srv_state::inc_term_func func);
 
+    /**
+     * Pause the background execution of the state machine.
+     * If an operation execution is currently happening, the state
+     * machine may not be paused immediately.
+     *
+     * @param timeout_ms If non-zero, this function will block until it
+     *                   either completely pauses the state machine execution
+     *                   or reaches the given time limit in milliseconds.
+     *                   Otherwise, this function will return immediately,
+     *                   and there is a possibility that the state machine
+     *                   execution is still happening.
+     */
+    void pause_state_machine_exeuction(size_t timeout_ms = 0);
+
+    /**
+     * Resume the background execution of the state machine.
+     */
+    void resume_state_machine_execution();
+
+    /**
+     * Check if the state machine execution is paused.
+     *
+     * @return `true` if paused.
+     */
+    bool is_state_machine_execution_paused() const;
+
 protected:
     typedef std::unordered_map<int32, ptr<peer>>::const_iterator peer_itor;
 
@@ -996,6 +1022,16 @@
      */
     std::atomic<bool> write_paused_;
 
+    /**
+     * If `true`, state machine commit will be paused.
+     */
+    std::atomic<bool> sm_commit_paused_;
+
+    /**
+     * If `true`, the background thread is doing state machine execution.
+     */
+    std::atomic<bool> sm_commit_exec_in_progress_;
+
     /**
      * Server ID indicates the candidate for the next leader,
      * as a part of leadership takeover task.
diff --git a/src/global_mgr.cxx b/src/global_mgr.cxx
index 38a55761..348110a4 100644
--- a/src/global_mgr.cxx
+++ b/src/global_mgr.cxx
@@ -305,6 +305,21 @@ void nuraft_global_mgr::commit_worker_loop(ptr<worker_handle> handle) {
         }
         if (!target) continue;
 
+        ptr<logger>& l_ = target->l_;
+
+        // Whenever we find a task to execute, skip the next sleep for any
+        // tasks that may be queued in the meantime.
+        skip_sleeping = true;
+
+        p_tr("execute commit for %p", target.get());
+
+        if (target->sm_commit_paused_) {
+            p_tr("commit of this server has been paused");
+            // Since there can be other Raft servers waiting to be served,
+            // we need to skip the next sleep.
+ continue; + } + if ( target->quick_commit_index_ <= target->sm_commit_index_ || target->log_store_->next_slot() - 1 <= target->sm_commit_index_ ) { // State machine's commit index is large enough not to execute commit @@ -312,8 +327,7 @@ void nuraft_global_mgr::commit_worker_loop(ptr handle) { continue; } - ptr& l_ = target->l_; - p_tr("executed commit by global worker, queue length %zu", queue_length); + p_tr("execute commit by global worker, queue length %zu", queue_length); bool finished_in_time = target->commit_in_bg_exec(config_.max_scheduling_unit_ms_); if (!finished_in_time) { @@ -322,7 +336,8 @@ void nuraft_global_mgr::commit_worker_loop(ptr handle) { p_tr("couldn't finish in time (%zu ms), re-push to queue", config_.max_scheduling_unit_ms_); request_commit(target); - skip_sleeping = true; + } else { + p_tr("executed in time"); } } } @@ -367,6 +382,11 @@ void nuraft_global_mgr::append_worker_loop(ptr handle) { if (!target) continue; ptr& l_ = target->l_; + + // Whenever we find a task to execute, skip next sleeping for any tasks + // that can be queued in the meantime. + skip_sleeping = true; + p_tr("executed append_entries by global worker, queue length %zu", queue_length); target->append_entries_in_bg_exec(); diff --git a/src/handle_commit.cxx b/src/handle_commit.cxx index e780dc7b..f1d9e907 100644 --- a/src/handle_commit.cxx +++ b/src/handle_commit.cxx @@ -18,6 +18,7 @@ See the License for the specific language governing permissions and limitations under the License. **************************************************************************/ +#include "internal_timer.hxx" #include "raft_server.hxx" #include "cluster_config.hxx" @@ -107,9 +108,16 @@ void raft_server::commit_in_bg() { sm_commit_index_ >= log_store_->next_slot() - 1 ) { std::unique_lock lock(commit_cv_lock_); - auto wait_check = [this] () { - return (log_store_->next_slot() - 1 > sm_commit_index_ && - quick_commit_index_ > sm_commit_index_) || stopping_; + auto wait_check = [this]() { + if (stopping_) { + // WARNING: `stopping_` flag should have the highest priority. + return true; + } + if (sm_commit_paused_) { + return false; + } + return ( log_store_->next_slot() - 1 > sm_commit_index_ && + quick_commit_index_ > sm_commit_index_ ); }; p_tr("commit_cv_ sleep\n"); commit_cv_.wait(lock, wait_check); @@ -161,6 +169,14 @@ bool raft_server::commit_in_bg_exec(size_t timeout_ms) { return true; } + sm_commit_exec_in_progress_ = true; + // Clear the flag automatically once we exit this function. + struct ExecCommitAutoCleaner { + ExecCommitAutoCleaner(std::function func) : clean_func_(func) {} + ~ExecCommitAutoCleaner() { clean_func_(); } + std::function clean_func_; + } exec_auto_cleaner([this](){ sm_commit_exec_in_progress_ = false; }); + p_db( "commit upto %ld, curruent idx %ld\n", quick_commit_index_.load(), sm_commit_index_.load() ); @@ -193,6 +209,11 @@ bool raft_server::commit_in_bg_exec(size_t timeout_ms) { } first_loop_exec = false; + // Break the loop if state machine commit is paused. + if (sm_commit_paused_) { + break; + } + ulong index_to_commit = sm_commit_index_ + 1; ptr le = log_store_->entry_at(index_to_commit); p_tr( "commit upto %llu, curruent idx %llu\n", @@ -776,5 +797,47 @@ void raft_server::remove_peer_from_peers(const ptr& pp) { peers_.erase(pp->get_id()); } +void raft_server::pause_state_machine_exeuction(size_t timeout_ms) { + p_in( "pause state machine execution, previously %s, state machine %s", + sm_commit_paused_ ? "PAUSED" : "ACTIVE", + sm_commit_exec_in_progress_ ? 
"RUNNING" : "SLEEPING" ); + sm_commit_paused_ = true; + + if (!timeout_ms) { + return; + } + timer_helper timer(timeout_ms * 1000); + while (sm_commit_exec_in_progress_ && !timer.timeout()) { + timer_helper::sleep_ms(10); + } + p_in( "waited %zu ms, state machine %s", + timer.get_ms(), + sm_commit_exec_in_progress_ ? "RUNNING" : "SLEEPING" ); +} + +void raft_server::resume_state_machine_execution() { + p_in( "pause state machine execution, previously %s, state machine %s", + sm_commit_paused_ ? "PAUSED" : "ACTIVE", + sm_commit_exec_in_progress_ ? "RUNNING" : "SLEEPING" ); + sm_commit_paused_ = false; + + nuraft_global_mgr* mgr = nuraft_global_mgr::get_instance(); + if (mgr) { + // Global mgr. + mgr->request_commit( this->shared_from_this() ); + } else { + // Local commit thread. + std::unique_lock l(commit_cv_lock_); + commit_cv_.notify_one(); + } +} + +bool raft_server::is_state_machine_execution_paused() const { + if (sm_commit_paused_ && !sm_commit_exec_in_progress_) { + return true; + } + return false; +} + } // namespace nuraft; diff --git a/src/raft_server.cxx b/src/raft_server.cxx index 42e26b90..94244e35 100644 --- a/src/raft_server.cxx +++ b/src/raft_server.cxx @@ -67,6 +67,8 @@ raft_server::raft_server(context* ctx, const init_options& opt) , commit_bg_stopped_(false) , append_bg_stopped_(false) , write_paused_(false) + , sm_commit_paused_(false) + , sm_commit_exec_in_progress_(false) , next_leader_candidate_(-1) , im_learner_(false) , serving_req_(false) diff --git a/tests/unit/asio_service_test.cxx b/tests/unit/asio_service_test.cxx index a80f1b7b..52241edd 100644 --- a/tests/unit/asio_service_test.cxx +++ b/tests/unit/asio_service_test.cxx @@ -1981,6 +1981,98 @@ int snapshot_context_timeout_removed_server_test() { return 0; } +int pause_state_machine_execution_test(bool use_global_mgr) { + reset_log_files(); + + if (use_global_mgr) { + nuraft_global_mgr::init(); + } + + std::string s1_addr = "tcp://127.0.0.1:20010"; + std::string s2_addr = "tcp://127.0.0.1:20020"; + std::string s3_addr = "tcp://127.0.0.1:20030"; + + RaftAsioPkg s1(1, s1_addr); + RaftAsioPkg s2(2, s2_addr); + RaftAsioPkg s3(3, s3_addr); + std::vector pkgs = {&s1, &s2, &s3}; + + _msg("launching asio-raft servers\n"); + CHK_Z( launch_servers(pkgs, false) ); + + _msg("organizing raft group\n"); + CHK_Z( make_group(pkgs) ); + + // Set async. + for (auto& entry: pkgs) { + RaftAsioPkg* pp = entry; + raft_params param = pp->raftServer->get_current_params(); + param.return_method_ = raft_params::async_handler; + pp->raftServer->update_params(param); + } + + // Append messages asynchronously. + const size_t NUM = 10; + std::list< ptr< cmd_result< ptr > > > handlers; + std::list idx_list; + std::mutex idx_list_lock; + for (size_t ii=0; ii msg = buffer::alloc(test_msg.size() + 1); + msg->put(test_msg); + ptr< cmd_result< ptr > > ret = + s1.raftServer->append_entries( {msg} ); + + cmd_result< ptr >::handler_type my_handler = + std::bind( async_handler, + &idx_list, + &idx_list_lock, + std::placeholders::_1, + std::placeholders::_2 ); + ret->when_ready( my_handler ); + + handlers.push_back(ret); + } + + // Pause S3's state machine. + s3.raftServer->pause_state_machine_exeuction(100); + + TestSuite::sleep_sec(1, "replication"); + + // Now all async handlers should have result. + { + std::lock_guard l(idx_list_lock); + CHK_EQ(NUM, idx_list.size()); + } + + // The state machines of S1 and S2 should be identical, but not S3. 
+    CHK_OK( s2.getTestSm()->isSame( *s1.getTestSm() ) );
+    CHK_FALSE( s3.getTestSm()->isSame( *s1.getTestSm() ) );
+
+    // Resume the state machine.
+    s3.raftServer->resume_state_machine_execution();
+    TestSuite::sleep_sec(1, "resuming state machine execution");
+
+    // Now it should have the same data.
+    CHK_OK( s3.getTestSm()->isSame( *s1.getTestSm() ) );
+
+    // Pause again.
+    s3.raftServer->pause_state_machine_exeuction(100);
+
+    s1.raftServer->shutdown();
+    s2.raftServer->shutdown();
+
+    // Even with paused state machine, shutdown should work.
+    s3.raftServer->shutdown();
+    TestSuite::sleep_sec(1, "shutting down");
+
+    SimpleLogger::shutdown();
+    if (use_global_mgr) {
+        nuraft_global_mgr::shutdown();
+    }
+    return 0;
+}
+
 } // namespace asio_service_test;
 
 using namespace asio_service_test;
@@ -2070,6 +2162,9 @@ int main(int argc, char** argv) {
                 snapshot_context_timeout_removed_server_test );
     }
 
+    ts.doTest( "pause state machine execution test",
+               pause_state_machine_execution_test,
+               TestRange<bool>( {false, true} ) );
 
 #ifdef ENABLE_RAFT_STATS
     _msg("raft stats: ENABLED\n");

From d66b96b5be1fc196f3c4b161dc489fd7c1e44660 Mon Sep 17 00:00:00 2001
From: Jung-Sang Ahn
Date: Tue, 20 Jul 2021 16:16:47 -0700
Subject: [PATCH 2/2] [Update PR] Log and test

---
 src/handle_commit.cxx            | 6 ++++--
 tests/unit/asio_service_test.cxx | 2 ++
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/handle_commit.cxx b/src/handle_commit.cxx
index f1d9e907..97318237 100644
--- a/src/handle_commit.cxx
+++ b/src/handle_commit.cxx
@@ -798,9 +798,11 @@ void raft_server::remove_peer_from_peers(const ptr<peer>& pp) {
 }
 
 void raft_server::pause_state_machine_exeuction(size_t timeout_ms) {
-    p_in( "pause state machine execution, previously %s, state machine %s",
+    p_in( "pause state machine execution, previously %s, state machine %s, "
+          "timeout %zu ms",
           sm_commit_paused_ ? "PAUSED" : "ACTIVE",
-          sm_commit_exec_in_progress_ ? "RUNNING" : "SLEEPING" );
+          sm_commit_exec_in_progress_ ? "RUNNING" : "SLEEPING",
+          timeout_ms );
     sm_commit_paused_ = true;
 
     if (!timeout_ms) {
diff --git a/tests/unit/asio_service_test.cxx b/tests/unit/asio_service_test.cxx
index 52241edd..a970fa0d 100644
--- a/tests/unit/asio_service_test.cxx
+++ b/tests/unit/asio_service_test.cxx
@@ -2037,6 +2037,8 @@ int pause_state_machine_execution_test(bool use_global_mgr) {
     // Pause S3's state machine.
     s3.raftServer->pause_state_machine_exeuction(100);
 
+    CHK_TRUE( s3.raftServer->is_state_machine_execution_paused() );
+
    TestSuite::sleep_sec(1, "replication");
 
     // Now all async handlers should have result.
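
Usage sketch (not part of the patches above): `my_server` and the 500 ms timeout are
illustrative placeholders, and the server is assumed to be an already-initialized
ptr<raft_server>. Since pause_state_machine_exeuction() may return before an in-flight
commit batch has finished, a caller should confirm the pause via
is_state_machine_execution_paused() before touching the state machine directly.

    #include "libnuraft/nuraft.hxx"

    using namespace nuraft;

    void access_state_machine_safely(ptr<raft_server> my_server) {
        // Ask the background commit thread to stop applying new log entries,
        // waiting up to 500 ms for any in-flight commit batch to finish.
        my_server->pause_state_machine_exeuction(500);

        if (my_server->is_state_machine_execution_paused()) {
            // The commit thread will not touch the state machine here,
            // so e.g. a consistent backup of user data can be taken.
        }

        // Re-enable background commits. This wakes up the local commit thread,
        // or re-queues this server on the global manager if one is in use.
        my_server->resume_state_machine_execution();
    }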