Skip to content

Commit

Permalink
Skip processing srv_to_leave_ on replaying old configs (#110)
Browse files Browse the repository at this point in the history
* If `srv_to_leave_` is NULL on removing server in reconfigure, that
means Raft is replaying old log, and there is no current request by
user. In such case, we can skip the logic handling `srv_to_leave_`.
  • Loading branch information
greensky00 authored Mar 13, 2020
1 parent 972f02d commit 3ffe39c
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 6 deletions.
8 changes: 6 additions & 2 deletions include/libnuraft/peer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,13 @@ public:
bool need_to_reconnect() {
if (abandoned_) return false;

if (reconn_scheduled_ && reconn_timer_.timeout()) return true;
if (reconn_scheduled_ && reconn_timer_.timeout()) {
return true;
}
{ std::lock_guard<std::mutex> l(rpc_protector_);
if (!rpc_.get()) return true;
if (!rpc_.get()) {
return true;
}
}
return false;
}
Expand Down
15 changes: 13 additions & 2 deletions src/handle_commit.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -619,12 +619,23 @@ void raft_server::reconfigure(const ptr<cluster_config>& new_config) {
// of the last config is delivered to S3.
// Also we will have timeout for it. If we fail to deliver the
// commit index, S3 will be just force removed.
const ptr<peer>& pp = pit->second;

if (role_ == srv_role::leader) {
if (role_ == srv_role::leader && srv_to_leave_) {
// If leader, keep the to-be-removed server in peer list
// until 1) catch-up is done, or 2) timeout.

// However, if `srv_to_leave_` is NULL,
// it is replaying old config. We can remove it
// immediately without setting `srv_to_leave_`.

} else {
remove_peer_from_peers(pit->second);
if (!srv_to_leave_) {
p_in("srv_to_leave_ is currently empty "
"on config for removing %d",
pp->get_id());
}
remove_peer_from_peers(pp);
sprintf(temp_buf, "remove peer %d\n", srv_removed);
str_buf += temp_buf;
}
Expand Down
2 changes: 1 addition & 1 deletion src/handle_join_leave.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ void raft_server::rm_srv_from_cluster(int32 srv_id) {

void raft_server::handle_join_leave_rpc_err(msg_type t_msg, ptr<peer> p) {
if (t_msg == msg_type::leave_cluster_request) {
p_in( "rpc failed again for the removing server (%d), "
p_in( "rpc failed for removing server (%d), "
"will remove this server directly",
p->get_id() );

Expand Down
50 changes: 49 additions & 1 deletion tests/unit/failure_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,51 @@ int removed_server_late_step_down_test() {
return 0;
}

int remove_server_on_pending_configs_test() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();

std::string s1_addr = "S1";
std::string s2_addr = "S2";

RaftPkg s1(f_base, 1, s1_addr);
RaftPkg s2(f_base, 2, s2_addr);
std::vector<RaftPkg*> pkgs = {&s1, &s2};

CHK_Z( launch_servers( pkgs ) );
CHK_Z( make_group( pkgs ) );

// Make some dummy configs by setting user ctx.
s1.raftServer->set_user_ctx("a");
s1.raftServer->set_user_ctx("aa");

// Without commit & replication of above configs,
// remove S2.
s1.raftServer->remove_srv(2);

// Make failure.
s1.fNet->makeReqFailAll(s2_addr);
s1.fTimer->invoke( timer_task_type::heartbeat_timer );
s1.fNet->makeReqFailAll(s2_addr);

// Wait for bg commit for configuration change.
TestSuite::sleep_ms(COMMIT_TIME_MS);

// Adding server should succeed without error about duplicate ID.
ptr< cmd_result< ptr<buffer> > > ret =
s1.raftServer->add_srv( *s2.getTestMgr()->get_srv_config() );
CHK_Z( ret->get_result_code() );

print_stats(pkgs);

s1.raftServer->shutdown();
s2.raftServer->shutdown();

f_base->destroy();

return 0;
}

} // namespace failure_test;
using namespace failure_test;

Expand All @@ -559,7 +604,7 @@ int main(int argc, char** argv) {

ts.doTest( "remove not responding server with quorum test",
rmv_not_resp_srv_wq_test,
TestRange<bool>({false, true}));
TestRange<bool>({false, true}) );

ts.doTest( "force log compaction test",
force_log_compaction_test );
Expand All @@ -570,6 +615,9 @@ int main(int argc, char** argv) {
ts.doTest( "removed server late step down test",
removed_server_late_step_down_test );

ts.doTest( "remove server on pending configs test",
remove_server_on_pending_configs_test );

return 0;
}

5 changes: 5 additions & 0 deletions tests/unit/raft_package_fake.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ public:
_s_info(ll) << msg;
}

void localLog(const std::string& msg) {
SimpleLogger* ll = myLogWrapper->getLogger();
_s_info(ll) << msg;
}

int myId;
std::string myEndpoint;
ptr<FakeNetworkBase> fBase;
Expand Down

0 comments on commit 3ffe39c

Please sign in to comment.