Support auto-adjusting quorum size for 2-node cluster
* The quorum size of a 2-node cluster is 2, which means that if either
node goes offline, the cluster becomes unavailable immediately.

* Added a configurable option: when it is on, the quorum size of a
2-node cluster is automatically adjusted to 1 once one of the two
nodes goes offline. The remaining node can elect itself as the leader
and commit incoming requests. The quorum size is restored to the
default value once the offline node comes back online.

* This is not entirely safe, and dealing with log divergence is the
user's responsibility. Using a custom term counter to avoid duplicate
terms may be helpful; see the sketch below.
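
A minimal usage sketch (mirroring the unit test added below), assuming two
already-running `raft_server` instances; the helper name is illustrative:

    #include "libnuraft/nuraft.hxx"
    using namespace nuraft;

    // Illustrative helper: call this on every member of the 2-node cluster.
    void enable_auto_quorum_adjust(ptr<raft_server> server) {
        // Custom term counter: round the term up to the next multiple of 10,
        // reducing the chance of duplicate terms after a quorum adjustment.
        server->set_inc_term_func( [](uint64_t cur_term) -> uint64_t {
            return (cur_term / 10) * 10 + 10;
        } );

        raft_params params = server->get_current_params();
        params.auto_adjust_quorum_for_small_cluster_ = true;
        server->update_params(params);
    }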
greensky00 committed Jun 10, 2020
1 parent 498cc30 commit c8b077a
Showing 7 changed files with 301 additions and 0 deletions.
7 changes: 7 additions & 0 deletions include/libnuraft/peer.hxx
@@ -60,6 +60,13 @@ public:
*/
static const int32 LEAVE_LIMIT = 5;

/**
 * For a 2-node cluster: if the other peer does not respond to pre-vote
 * requests more than this many times, adjust the quorum size.
 * Effective only when `auto_adjust_quorum_for_small_cluster_` is on.
 */
static const int32 VOTE_LIMIT = 10;

peer( ptr<srv_config>& config,
const context& ctx,
timer_task<int32>::executor& hb_exec,
8 changes: 8 additions & 0 deletions include/libnuraft/raft_params.hxx
@@ -86,6 +86,7 @@ struct raft_params {
, auto_forwarding_(false)
, use_bg_thread_for_urgent_commit_(true)
, exclude_snp_receiver_from_quorum_(false)
, auto_adjust_quorum_for_small_cluster_(false)
, locking_method_type_(dual_mutex)
, return_method_(blocking)
{}
@@ -455,6 +456,13 @@
*/
bool exclude_snp_receiver_from_quorum_;

/**
 * If `true` and the cluster size is 2, the quorum size will
 * automatically be adjusted to 1 once one of the two nodes
 * goes offline.
 */
bool auto_adjust_quorum_for_small_cluster_;

/**
* Choose the type of lock that will be used by user threads.
*/
10 changes: 10 additions & 0 deletions include/libnuraft/raft_server.hxx
@@ -482,6 +482,7 @@ protected:
struct pre_vote_status_t {
pre_vote_status_t()
: quorum_reject_count_(0)
, failure_count_(0)
{ reset(0); }
void reset(ulong _term) {
term_ = _term;
@@ -493,7 +494,16 @@ protected:
std::atomic<int32> live_;
std::atomic<int32> dead_;
std::atomic<int32> abandoned_;

/**
* Number of pre-vote rejections by quorum.
*/
std::atomic<int32> quorum_reject_count_;

/**
 * Number of pre-vote failures caused by unresponsive peers.
 */
std::atomic<int32> failure_count_;
};

protected:
30 changes: 30 additions & 0 deletions src/handle_append_entries.cxx
@@ -77,6 +77,8 @@ void raft_server::request_append_entries() {
}

bool raft_server::request_append_entries(ptr<peer> p) {
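    // Static local timer: throttles the quorum-size check below to at most
    // once per second (1000*1000, assuming the interval is in microseconds).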
static timer_helper chk_timer(1000*1000);

// Checking the validity of role first.
if (role_ != srv_role::leader) {
// WARNING: We should allow `write_paused_` state for
@@ -93,6 +95,34 @@ bool raft_server::request_append_entries(ptr<peer> p) {

ptr<raft_params> params = ctx_->get_params();

if ( params->auto_adjust_quorum_for_small_cluster_ &&
get_num_voting_members() == 2 &&
chk_timer.timeout_and_reset() ) {
// If auto-adjust mode is on for a 2-node cluster and
// the follower is not responding, adjust the quorum.
size_t num_not_responding_peers = get_not_responding_peers();
size_t cur_quorum_size = get_quorum_for_commit();
if ( num_not_responding_peers > 0 &&
cur_quorum_size >= 1 ) {
p_wn("2-node cluster's follower is not responding long time, "
"adjust quorum to 1");
ptr<raft_params> clone = cs_new<raft_params>(*params);
clone->custom_commit_quorum_size_ = 1;
clone->custom_election_quorum_size_ = 1;
ctx_->set_params(clone);

} else if ( num_not_responding_peers == 0 &&
cur_quorum_size == 0 ) {
// Recovered.
p_wn("2-node cluster's follower is responding now, "
"restore quorum with default value");
ptr<raft_params> clone = cs_new<raft_params>(*params);
clone->custom_commit_quorum_size_ = 0;
clone->custom_election_quorum_size_ = 0;
ctx_->set_params(clone);
}
}

bool need_to_reconnect = p->need_to_reconnect();
int32 last_active_time_ms = p->get_active_timer_us() / 1000;
if ( last_active_time_ms >
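For context on the `get_quorum_for_commit()` checks above: a return value of
N means N responsive peers (excluding the leader itself) are needed to
commit, and a custom quorum size of 0 means "use the default majority rule".
The following is a hypothetical sketch of that arithmetic, not the library's
actual helper: under the default rule a 2-node cluster needs 1 peer
(`cur_quorum_size >= 1`, not yet adjusted), while after the adjustment the
custom size of 1 yields 0 peers (`cur_quorum_size == 0`, already adjusted).

    #include <cstddef>
    #include <cstdint>

    // Hypothetical sketch of the commit-quorum arithmetic assumed above.
    size_t quorum_for_commit(size_t num_voting_members,
                             int32_t custom_commit_quorum_size) {
        if (custom_commit_quorum_size <= 0 ||
            custom_commit_quorum_size > (int32_t)num_voting_members) {
            // Default majority rule: e.g. 2 voting members -> 1 peer needed.
            return num_voting_members / 2;
        }
        // A custom quorum size counts this node too, so one fewer peer is
        // needed: custom size 1 -> 0 peers, i.e. the leader commits alone.
        return (size_t)custom_commit_quorum_size - 1;
    }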
28 changes: 28 additions & 0 deletions src/handle_vote.cxx
@@ -83,6 +83,34 @@ void raft_server::request_prevote() {
}
}

int quorum_size = get_quorum_for_election();
if ( pre_vote_.live_ + pre_vote_.dead_ > 0 &&
pre_vote_.live_ + pre_vote_.dead_ < quorum_size + 1) {
// Pre-vote failed due to non-responding voters.
pre_vote_.failure_count_++;
p_wn("total %zu nodes (including this node) responded for pre-vote "
"(term %zu, live %zu, dead %zu), at least %zu nodes should "
"respond. failure count %zu",
pre_vote_.live_.load() + pre_vote_.dead_.load(),
pre_vote_.term_,
pre_vote_.live_.load(),
pre_vote_.dead_.load(),
quorum_size + 1,
pre_vote_.failure_count_.load());
}
int num_voting_members = get_num_voting_members();
if ( params->auto_adjust_quorum_for_small_cluster_ &&
num_voting_members == 2 &&
pre_vote_.failure_count_ > peer::VOTE_LIMIT ) {
// 2-node cluster's pre-vote failed due to offline node.
p_wn("2-node cluster's pre-vote is failing long time, "
"adjust quorum to 1");
ptr<raft_params> clone = cs_new<raft_params>(*params);
clone->custom_commit_quorum_size_ = 1;
clone->custom_election_quorum_size_ = 1;
ctx_->set_params(clone);
}

hb_alive_ = false;
leader_ = -1;
pre_vote_.reset(state_->get_term());
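To make the counting above concrete: in a 2-node cluster the default
election quorum is 1, so `quorum_size + 1 == 2` nodes must respond to a
pre-vote. With the peer offline, only this node responds
(`live + dead == 1`), so `failure_count_` grows by one per round; once it
exceeds `peer::VOTE_LIMIT` (10), the quorum drops to 1 and the survivor can
elect itself. A self-contained sketch of that counting, with hypothetical
plain types standing in for the server's atomics:

    #include <cassert>
    #include <cstdint>

    // Hypothetical stand-ins for the state used by request_prevote().
    struct pre_vote_status {
        int32_t live = 0;
        int32_t dead = 0;
        int32_t failure_count = 0;
    };
    static const int32_t VOTE_LIMIT = 10;  // same value as peer::VOTE_LIMIT

    // Returns true once the quorum should be adjusted to 1.
    bool should_adjust_quorum(pre_vote_status& pv, int quorum_size,
                              int num_voting_members, bool auto_adjust) {
        int responded = pv.live + pv.dead;
        if (responded > 0 && responded < quorum_size + 1) {
            // Not enough voters responded: count this round as a failure.
            pv.failure_count++;
        }
        return auto_adjust && num_voting_members == 2 &&
               pv.failure_count > VOTE_LIMIT;
    }

    int main() {
        pre_vote_status pv;
        // Peer offline: only this node responds in each pre-vote round.
        for (int round = 0; round < 12; ++round) {
            pv.live = 1;
            pv.dead = 0;
            bool adjust = should_adjust_quorum(pv, /*quorum_size=*/1,
                                               /*num_voting_members=*/2, true);
            // failure_count exceeds VOTE_LIMIT from the 11th round onward.
            assert(adjust == (round >= 10));
        }
        return 0;
    }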
2 changes: 2 additions & 0 deletions src/raft_server.cxx
@@ -905,6 +905,7 @@ void raft_server::become_leader() {
next_leader_candidate_ = -1;
initialized_ = true;
pre_vote_.quorum_reject_count_ = 0;
pre_vote_.failure_count_ = 0;
data_fresh_ = true;

request_append_entries();
@@ -1035,6 +1036,7 @@ void raft_server::become_follower() {
initialized_ = true;
uncommitted_config_.reset();
pre_vote_.quorum_reject_count_ = 0;
pre_vote_.failure_count_ = 0;

// Drain all pending callback functions.
drop_all_pending_commit_elems();
216 changes: 216 additions & 0 deletions tests/unit/asio_service_test.cxx
@@ -752,6 +752,216 @@ int async_append_handler_test() {
return 0;
}

int auto_quorum_size_test() {
reset_log_files();

std::string s1_addr = "tcp://127.0.0.1:20010";
std::string s2_addr = "tcp://127.0.0.1:20020";

RaftAsioPkg s1(1, s1_addr);
std::shared_ptr<RaftAsioPkg> s2 = std::make_shared<RaftAsioPkg>(2, s2_addr);
std::vector<RaftAsioPkg*> pkgs = {&s1, s2.get()};

_msg("launching asio-raft servers\n");
CHK_Z( launch_servers(pkgs, false) );

_msg("organizing raft group\n");
CHK_Z( make_group(pkgs) );
TestSuite::sleep_sec(1, "wait for Raft group ready");

// Set custom term counter, and enable auto quorum size mode.
auto custom_inc_term = [](uint64_t cur_term) -> uint64_t {
// Round the term up to the next multiple of 10 on each increment.
return (cur_term / 10) * 10 + 10;
};
s1.raftServer->set_inc_term_func(custom_inc_term);
s2->raftServer->set_inc_term_func(custom_inc_term);

raft_params params = s1.raftServer->get_current_params();
params.auto_adjust_quorum_for_small_cluster_ = true;
s1.raftServer->update_params(params);
s2->raftServer->update_params(params);

CHK_TRUE( s1.raftServer->is_leader() );
CHK_EQ(1, s1.raftServer->get_leader());
CHK_EQ(1, s2->raftServer->get_leader());

// Replication.
for (size_t ii=0; ii<10; ++ii) {
std::string msg_str = std::to_string(ii);
ptr<buffer> msg = buffer::alloc(sizeof(uint32_t) + msg_str.size());
buffer_serializer bs(msg);
bs.put_str(msg_str);
s1.raftServer->append_entries( {msg} );
}
TestSuite::sleep_sec(1, "wait for replication");
uint64_t committed_idx = s1.raftServer->get_committed_log_idx();

// State machine should be identical.
CHK_OK( s2->getTestSm()->isSame( *s1.getTestSm() ) );

// Shutdown S2.
s2->raftServer->shutdown();
s2.reset();

TestSuite::sleep_ms( RaftAsioPkg::HEARTBEAT_MS * 30,
"wait for quorum adjust" );

// More replication.
for (size_t ii=10; ii<11; ++ii) {
std::string msg_str = std::to_string(ii);
ptr<buffer> msg = buffer::alloc(sizeof(uint32_t) + msg_str.size());
buffer_serializer bs(msg);
bs.put_str(msg_str);
s1.raftServer->append_entries( {msg} );
}

// Replication should succeed: committed index should be moved forward.
TestSuite::sleep_sec(1, "wait for replication");
CHK_EQ( committed_idx + 1,
s1.raftServer->get_committed_log_idx() );

// Restart S2.
_msg("launching S2 again\n");
RaftAsioPkg s2_new(2, s2_addr);
CHK_Z( launch_servers({&s2_new}, false) );
TestSuite::sleep_sec(1, "wait for S2 ready");
CHK_EQ( committed_idx + 1,
s2_new.raftServer->get_committed_log_idx() );

// More replication.
for (size_t ii=11; ii<12; ++ii) {
std::string msg_str = std::to_string(ii);
ptr<buffer> msg = buffer::alloc(sizeof(uint32_t) + msg_str.size());
buffer_serializer bs(msg);
bs.put_str(msg_str);
s1.raftServer->append_entries( {msg} );
}

// Both of them should have the same commit number.
TestSuite::sleep_sec(1, "wait for replication");
CHK_EQ( committed_idx + 2,
s1.raftServer->get_committed_log_idx() );
CHK_EQ( committed_idx + 2,
s2_new.raftServer->get_committed_log_idx() );

s1.raftServer->shutdown();
s2_new.raftServer->shutdown();
TestSuite::sleep_sec(1, "shutting down");

SimpleLogger::shutdown();
return 0;
}

int auto_quorum_size_election_test() {
reset_log_files();

std::string s1_addr = "tcp://127.0.0.1:20010";
std::string s2_addr = "tcp://127.0.0.1:20020";

std::shared_ptr<RaftAsioPkg> s1 = std::make_shared<RaftAsioPkg>(1, s1_addr);
std::shared_ptr<RaftAsioPkg> s2 = std::make_shared<RaftAsioPkg>(2, s2_addr);
std::vector<RaftAsioPkg*> pkgs = {s1.get(), s2.get()};

_msg("launching asio-raft servers\n");
CHK_Z( launch_servers(pkgs, false) );

_msg("organizing raft group\n");
CHK_Z( make_group(pkgs) );
TestSuite::sleep_sec(1, "wait for Raft group ready");

// Set custom term counter, and enable auto quorum size mode.
auto custom_inc_term = [](uint64_t cur_term) -> uint64_t {
// Round the term up to the next multiple of 10 on each increment.
return (cur_term / 10) * 10 + 10;
};
s1->raftServer->set_inc_term_func(custom_inc_term);
s2->raftServer->set_inc_term_func(custom_inc_term);

raft_params params = s1->raftServer->get_current_params();
params.auto_adjust_quorum_for_small_cluster_ = true;
s1->raftServer->update_params(params);
s2->raftServer->update_params(params);

CHK_TRUE( s1->raftServer->is_leader() );
CHK_EQ(1, s1->raftServer->get_leader());
CHK_EQ(1, s2->raftServer->get_leader());

// Replication.
for (size_t ii=0; ii<10; ++ii) {
std::string msg_str = std::to_string(ii);
ptr<buffer> msg = buffer::alloc(sizeof(uint32_t) + msg_str.size());
buffer_serializer bs(msg);
bs.put_str(msg_str);
s1->raftServer->append_entries( {msg} );
}
TestSuite::sleep_sec(1, "wait for replication");

// State machine should be identical.
CHK_OK( s2->getTestSm()->isSame( *s1->getTestSm() ) );

// Shutdown S1.
s1->raftServer->shutdown();
s1.reset();

// Wait for quorum adjustment and self-election.
TestSuite::sleep_ms( RaftAsioPkg::HEARTBEAT_MS * 50,
"wait for quorum adjust" );

// S2 should be the leader.
CHK_TRUE( s2->raftServer->is_leader() );
CHK_EQ(2, s2->raftServer->get_leader());
uint64_t committed_idx = s2->raftServer->get_committed_log_idx();

// More replication.
for (size_t ii=10; ii<11; ++ii) {
std::string msg_str = std::to_string(ii);
ptr<buffer> msg = buffer::alloc(sizeof(uint32_t) + msg_str.size());
buffer_serializer bs(msg);
bs.put_str(msg_str);
s2->raftServer->append_entries( {msg} );
}

// Replication should succeed: committed index should be moved forward.
TestSuite::sleep_sec(1, "wait for replication");
CHK_EQ( committed_idx + 1,
s2->raftServer->get_committed_log_idx() );

// Restart S1.
_msg("launching S1 again\n");
RaftAsioPkg s1_new(1, s1_addr);
CHK_Z( launch_servers({&s1_new}, false) );
TestSuite::sleep_sec(1, "wait for S2 ready");
CHK_EQ( committed_idx + 1,
s1_new.raftServer->get_committed_log_idx() );

// S2 should remain the leader.
CHK_TRUE( s2->raftServer->is_leader() );
CHK_EQ(2, s1_new.raftServer->get_leader());
CHK_EQ(2, s2->raftServer->get_leader());

// More replication.
for (size_t ii=11; ii<12; ++ii) {
std::string msg_str = std::to_string(ii);
ptr<buffer> msg = buffer::alloc(sizeof(uint32_t) + msg_str.size());
buffer_serializer bs(msg);
bs.put_str(msg_str);
s2->raftServer->append_entries( {msg} );
}

// Both of them should have the same commit number.
TestSuite::sleep_sec(1, "wait for replication");
CHK_EQ( committed_idx + 2,
s1_new.raftServer->get_committed_log_idx() );
CHK_EQ( committed_idx + 2,
s2->raftServer->get_committed_log_idx() );

s2->raftServer->shutdown();
s1_new.raftServer->shutdown();
TestSuite::sleep_sec(1, "shutting down");

SimpleLogger::shutdown();
return 0;
}

} // namespace asio_service_test
using namespace asio_service_test;

@@ -788,6 +998,12 @@ int main(int argc, char** argv) {
ts.doTest( "async append handler test",
async_append_handler_test );

ts.doTest( "auto quorum size test",
auto_quorum_size_test );

ts.doTest( "auto quorum size for election test",
auto_quorum_size_election_test );

#ifdef ENABLE_RAFT_STATS
_msg("raft stats: ENABLED\n");
#else
