diff --git a/include/libnuraft/peer.hxx b/include/libnuraft/peer.hxx index 036dff8e..7b42b417 100644 --- a/include/libnuraft/peer.hxx +++ b/include/libnuraft/peer.hxx @@ -60,6 +60,13 @@ public: */ static const int32 LEAVE_LIMIT = 5; + /** + * For 2-node cluster, if the other peer is not responding for + * pre-vote more than this limit, adjust quorum size. + * Enabled only when `auto_adjust_quorum_for_small_cluster_` is on. + */ + static const int32 VOTE_LIMIT = 10; + peer( ptr& config, const context& ctx, timer_task::executor& hb_exec, diff --git a/include/libnuraft/raft_params.hxx b/include/libnuraft/raft_params.hxx index 7509032d..e83d0a11 100644 --- a/include/libnuraft/raft_params.hxx +++ b/include/libnuraft/raft_params.hxx @@ -86,6 +86,7 @@ struct raft_params { , auto_forwarding_(false) , use_bg_thread_for_urgent_commit_(true) , exclude_snp_receiver_from_quorum_(false) + , auto_adjust_quorum_for_small_cluster_(false) , locking_method_type_(dual_mutex) , return_method_(blocking) {} @@ -455,6 +456,13 @@ public: */ bool exclude_snp_receiver_from_quorum_; + /** + * If `true` and the size of the cluster is 2, the quorum size + * will be adjusted to 1 automatically, once one of two nodes + * becomes offline. + */ + bool auto_adjust_quorum_for_small_cluster_; + /** * Choose the type of lock that will be used by user threads. */ diff --git a/include/libnuraft/raft_server.hxx b/include/libnuraft/raft_server.hxx index 96927c2b..d05b53b7 100644 --- a/include/libnuraft/raft_server.hxx +++ b/include/libnuraft/raft_server.hxx @@ -482,6 +482,7 @@ protected: struct pre_vote_status_t { pre_vote_status_t() : quorum_reject_count_(0) + , failure_count_(0) { reset(0); } void reset(ulong _term) { term_ = _term; @@ -493,7 +494,16 @@ protected: std::atomic live_; std::atomic dead_; std::atomic abandoned_; + + /** + * Number of pre-vote rejections by quorum. + */ std::atomic quorum_reject_count_; + + /** + * Number of pre-vote failures due to not-responding peers. + */ + std::atomic failure_count_; }; protected: diff --git a/src/handle_append_entries.cxx b/src/handle_append_entries.cxx index 60bc4f60..f7801be1 100644 --- a/src/handle_append_entries.cxx +++ b/src/handle_append_entries.cxx @@ -77,6 +77,8 @@ void raft_server::request_append_entries() { } bool raft_server::request_append_entries(ptr p) { + static timer_helper chk_timer(1000*1000); + // Checking the validity of role first. if (role_ != srv_role::leader) { // WARNING: We should allow `write_paused_` state for @@ -93,6 +95,34 @@ bool raft_server::request_append_entries(ptr p) { ptr params = ctx_->get_params(); + if ( params->auto_adjust_quorum_for_small_cluster_ && + get_num_voting_members() == 2 && + chk_timer.timeout_and_reset() ) { + // If auto adjust mode is on for 2-node cluster, and + // the follower is not responding, adjust the quorum. + size_t num_not_responding_peers = get_not_responding_peers(); + size_t cur_quorum_size = get_quorum_for_commit(); + if ( num_not_responding_peers > 0 && + cur_quorum_size >= 1 ) { + p_wn("2-node cluster's follower is not responding long time, " + "adjust quorum to 1"); + ptr clone = cs_new(*params); + clone->custom_commit_quorum_size_ = 1; + clone->custom_election_quorum_size_ = 1; + ctx_->set_params(clone); + + } else if ( num_not_responding_peers == 0 && + cur_quorum_size == 0 ) { + // Recovered. + p_wn("2-node cluster's follower is responding now, " + "restore quorum with default value"); + ptr clone = cs_new(*params); + clone->custom_commit_quorum_size_ = 0; + clone->custom_election_quorum_size_ = 0; + ctx_->set_params(clone); + } + } + bool need_to_reconnect = p->need_to_reconnect(); int32 last_active_time_ms = p->get_active_timer_us() / 1000; if ( last_active_time_ms > diff --git a/src/handle_vote.cxx b/src/handle_vote.cxx index 87f06801..713abe5e 100644 --- a/src/handle_vote.cxx +++ b/src/handle_vote.cxx @@ -83,6 +83,34 @@ void raft_server::request_prevote() { } } + int quorum_size = get_quorum_for_election(); + if ( pre_vote_.live_ + pre_vote_.dead_ > 0 && + pre_vote_.live_ + pre_vote_.dead_ < quorum_size + 1) { + // Pre-vote failed due to non-responding voters. + pre_vote_.failure_count_++; + p_wn("total %zu nodes (including this node) responded for pre-vote " + "(term %zu, live %zu, dead %zu), at least %zu nodes should " + "respond. failure count %zu", + pre_vote_.live_.load() + pre_vote_.dead_.load(), + pre_vote_.term_, + pre_vote_.live_.load(), + pre_vote_.dead_.load(), + quorum_size + 1, + pre_vote_.failure_count_.load()); + } + int num_voting_members = get_num_voting_members(); + if ( params->auto_adjust_quorum_for_small_cluster_ && + num_voting_members == 2 && + pre_vote_.failure_count_ > peer::VOTE_LIMIT ) { + // 2-node cluster's pre-vote failed due to offline node. + p_wn("2-node cluster's pre-vote is failing long time, " + "adjust quorum to 1"); + ptr clone = cs_new(*params); + clone->custom_commit_quorum_size_ = 1; + clone->custom_election_quorum_size_ = 1; + ctx_->set_params(clone); + } + hb_alive_ = false; leader_ = -1; pre_vote_.reset(state_->get_term()); diff --git a/src/raft_server.cxx b/src/raft_server.cxx index 772904a6..8d31e3a4 100644 --- a/src/raft_server.cxx +++ b/src/raft_server.cxx @@ -905,6 +905,7 @@ void raft_server::become_leader() { next_leader_candidate_ = -1; initialized_ = true; pre_vote_.quorum_reject_count_ = 0; + pre_vote_.failure_count_ = 0; data_fresh_ = true; request_append_entries(); @@ -1035,6 +1036,7 @@ void raft_server::become_follower() { initialized_ = true; uncommitted_config_.reset(); pre_vote_.quorum_reject_count_ = 0; + pre_vote_.failure_count_ = 0; // Drain all pending callback functions. drop_all_pending_commit_elems(); diff --git a/tests/unit/asio_service_test.cxx b/tests/unit/asio_service_test.cxx index d697e993..3de7ade7 100644 --- a/tests/unit/asio_service_test.cxx +++ b/tests/unit/asio_service_test.cxx @@ -752,6 +752,216 @@ int async_append_handler_test() { return 0; } +int auto_quorum_size_test() { + reset_log_files(); + + std::string s1_addr = "tcp://127.0.0.1:20010"; + std::string s2_addr = "tcp://127.0.0.1:20020"; + + RaftAsioPkg s1(1, s1_addr); + std::shared_ptr s2 = std::make_shared(2, s2_addr); + std::vector pkgs = {&s1, s2.get()}; + + _msg("launching asio-raft servers\n"); + CHK_Z( launch_servers(pkgs, false) ); + + _msg("organizing raft group\n"); + CHK_Z( make_group(pkgs) ); + TestSuite::sleep_sec(1, "wait for Raft group ready"); + + // Set custom term counter, and enable auto quorum size mode. + auto custom_inc_term = [](uint64_t cur_term) -> uint64_t { + return (cur_term / 10) + 10; + }; + s1.raftServer->set_inc_term_func(custom_inc_term); + s2->raftServer->set_inc_term_func(custom_inc_term); + + raft_params params = s1.raftServer->get_current_params(); + params.auto_adjust_quorum_for_small_cluster_ = true; + s1.raftServer->update_params(params); + s2->raftServer->update_params(params); + + CHK_TRUE( s1.raftServer->is_leader() ); + CHK_EQ(1, s1.raftServer->get_leader()); + CHK_EQ(1, s2->raftServer->get_leader()); + + // Replication. + for (size_t ii=0; ii<10; ++ii) { + std::string msg_str = std::to_string(ii); + ptr msg = buffer::alloc(sizeof(uint32_t) + msg_str.size()); + buffer_serializer bs(msg); + bs.put_str(msg_str); + s1.raftServer->append_entries( {msg} ); + } + TestSuite::sleep_sec(1, "wait for replication"); + uint64_t committed_idx = s1.raftServer->get_committed_log_idx(); + + // State machine should be identical. + CHK_OK( s2->getTestSm()->isSame( *s1.getTestSm() ) ); + + // Shutdown S2. + s2->raftServer->shutdown(); + s2.reset(); + + TestSuite::sleep_ms( RaftAsioPkg::HEARTBEAT_MS * 30, + "wait for quorum adjust" ); + + // More replication. + for (size_t ii=10; ii<11; ++ii) { + std::string msg_str = std::to_string(ii); + ptr msg = buffer::alloc(sizeof(uint32_t) + msg_str.size()); + buffer_serializer bs(msg); + bs.put_str(msg_str); + s1.raftServer->append_entries( {msg} ); + } + + // Replication should succeed: committed index should be moved forward. + TestSuite::sleep_sec(1, "wait for replication"); + CHK_EQ( committed_idx + 1, + s1.raftServer->get_committed_log_idx() ); + + // Restart S2. + _msg("launching S2 again\n"); + RaftAsioPkg s2_new(2, s2_addr); + CHK_Z( launch_servers({&s2_new}, false) ); + TestSuite::sleep_sec(1, "wait for S2 ready"); + CHK_EQ( committed_idx + 1, + s2_new.raftServer->get_committed_log_idx() ); + + // More replication. + for (size_t ii=11; ii<12; ++ii) { + std::string msg_str = std::to_string(ii); + ptr msg = buffer::alloc(sizeof(uint32_t) + msg_str.size()); + buffer_serializer bs(msg); + bs.put_str(msg_str); + s1.raftServer->append_entries( {msg} ); + } + + // Both of them should have the same commit number. + TestSuite::sleep_sec(1, "wait for replication"); + CHK_EQ( committed_idx + 2, + s1.raftServer->get_committed_log_idx() ); + CHK_EQ( committed_idx + 2, + s2_new.raftServer->get_committed_log_idx() ); + + s1.raftServer->shutdown(); + s2_new.raftServer->shutdown(); + TestSuite::sleep_sec(1, "shutting down"); + + SimpleLogger::shutdown(); + return 0; +} + +int auto_quorum_size_election_test() { + reset_log_files(); + + std::string s1_addr = "tcp://127.0.0.1:20010"; + std::string s2_addr = "tcp://127.0.0.1:20020"; + + std::shared_ptr s1 = std::make_shared(1, s1_addr); + std::shared_ptr s2 = std::make_shared(2, s2_addr); + std::vector pkgs = {s1.get(), s2.get()}; + + _msg("launching asio-raft servers\n"); + CHK_Z( launch_servers(pkgs, false) ); + + _msg("organizing raft group\n"); + CHK_Z( make_group(pkgs) ); + TestSuite::sleep_sec(1, "wait for Raft group ready"); + + // Set custom term counter, and enable auto quorum size mode. + auto custom_inc_term = [](uint64_t cur_term) -> uint64_t { + return (cur_term / 10) + 10; + }; + s1->raftServer->set_inc_term_func(custom_inc_term); + s2->raftServer->set_inc_term_func(custom_inc_term); + + raft_params params = s1->raftServer->get_current_params(); + params.auto_adjust_quorum_for_small_cluster_ = true; + s1->raftServer->update_params(params); + s2->raftServer->update_params(params); + + CHK_TRUE( s1->raftServer->is_leader() ); + CHK_EQ(1, s1->raftServer->get_leader()); + CHK_EQ(1, s2->raftServer->get_leader()); + + // Replication. + for (size_t ii=0; ii<10; ++ii) { + std::string msg_str = std::to_string(ii); + ptr msg = buffer::alloc(sizeof(uint32_t) + msg_str.size()); + buffer_serializer bs(msg); + bs.put_str(msg_str); + s1->raftServer->append_entries( {msg} ); + } + TestSuite::sleep_sec(1, "wait for replication"); + + // State machine should be identical. + CHK_OK( s2->getTestSm()->isSame( *s1->getTestSm() ) ); + + // Shutdown S1. + s1->raftServer->shutdown(); + s1.reset(); + + // Wait for adjust quorum and self election. + TestSuite::sleep_ms( RaftAsioPkg::HEARTBEAT_MS * 50, + "wait for quorum adjust" ); + + // S2 should be a leader. + CHK_TRUE( s2->raftServer->is_leader() ); + CHK_EQ(2, s2->raftServer->get_leader()); + uint64_t committed_idx = s2->raftServer->get_committed_log_idx(); + + // More replication. + for (size_t ii=10; ii<11; ++ii) { + std::string msg_str = std::to_string(ii); + ptr msg = buffer::alloc(sizeof(uint32_t) + msg_str.size()); + buffer_serializer bs(msg); + bs.put_str(msg_str); + s2->raftServer->append_entries( {msg} ); + } + + // Replication should succeed: committed index should be moved forward. + TestSuite::sleep_sec(1, "wait for replication"); + CHK_EQ( committed_idx + 1, + s2->raftServer->get_committed_log_idx() ); + + // Restart S1. + _msg("launching S1 again\n"); + RaftAsioPkg s1_new(1, s1_addr); + CHK_Z( launch_servers({&s1_new}, false) ); + TestSuite::sleep_sec(1, "wait for S2 ready"); + CHK_EQ( committed_idx + 1, + s1_new.raftServer->get_committed_log_idx() ); + + // S2 should remain as a leader. + CHK_TRUE( s2->raftServer->is_leader() ); + CHK_EQ(2, s1_new.raftServer->get_leader()); + CHK_EQ(2, s2->raftServer->get_leader()); + + // More replication. + for (size_t ii=11; ii<12; ++ii) { + std::string msg_str = std::to_string(ii); + ptr msg = buffer::alloc(sizeof(uint32_t) + msg_str.size()); + buffer_serializer bs(msg); + bs.put_str(msg_str); + s2->raftServer->append_entries( {msg} ); + } + + // Both of them should have the same commit number. + TestSuite::sleep_sec(1, "wait for replication"); + CHK_EQ( committed_idx + 2, + s1_new.raftServer->get_committed_log_idx() ); + CHK_EQ( committed_idx + 2, + s2->raftServer->get_committed_log_idx() ); + + s2->raftServer->shutdown(); + s1_new.raftServer->shutdown(); + TestSuite::sleep_sec(1, "shutting down"); + + SimpleLogger::shutdown(); + return 0; +} + } // namespace asio_service_test; using namespace asio_service_test; @@ -788,6 +998,12 @@ int main(int argc, char** argv) { ts.doTest( "async append handler test", async_append_handler_test ); + ts.doTest( "auto quorum size test", + auto_quorum_size_test ); + + ts.doTest( "auto quorum size for election test", + auto_quorum_size_election_test ); + #ifdef ENABLE_RAFT_STATS _msg("raft stats: ENABLED\n"); #else