From afd73979e190192e87cbfeb53f26cc6ab3c6118c Mon Sep 17 00:00:00 2001 From: Jung-Sang Ahn Date: Fri, 11 Feb 2022 14:58:39 -0800 Subject: [PATCH] Support parallel log appending as an experimental feature (#283) * If this flag is set, the leader can do log replication and log appending in parallel, thus it can reduce the latency of write operation path. --- examples/in_memory_log_store.cxx | 102 +++++++++++++++++++++++++- examples/in_memory_log_store.hxx | 65 +++++++++++++++- include/libnuraft/internal_timer.hxx | 10 +++ include/libnuraft/log_store.hxx | 9 +++ include/libnuraft/raft_params.hxx | 31 ++++++++ include/libnuraft/raft_server.hxx | 27 +++++++ src/handle_append_entries.cxx | 84 ++++++++++++++++++++- src/handle_client_request.cxx | 8 +- src/raft_server.cxx | 9 ++- tests/unit/asio_service_test.cxx | 92 +++++++++++++++++++++++ tests/unit/raft_functional_common.hxx | 4 + 11 files changed, 432 insertions(+), 9 deletions(-) diff --git a/examples/in_memory_log_store.cxx b/examples/in_memory_log_store.cxx index 50e7d168..8f963bc1 100644 --- a/examples/in_memory_log_store.cxx +++ b/examples/in_memory_log_store.cxx @@ -25,13 +25,26 @@ namespace nuraft { inmem_log_store::inmem_log_store() : start_idx_(1) + , raft_server_bwd_pointer_(nullptr) + , disk_emul_delay(0) + , disk_emul_thread_(nullptr) + , disk_emul_thread_stop_signal_(false) + , disk_emul_last_durable_index_(0) { // Dummy entry for index 0. 
ptr buf = buffer::alloc(sz_ulong); logs_[0] = cs_new(0, buf); } -inmem_log_store::~inmem_log_store() {} +inmem_log_store::~inmem_log_store() { + if (disk_emul_thread_) { + disk_emul_thread_stop_signal_ = true; + disk_emul_ea_.invoke(); + if (disk_emul_thread_->joinable()) { + disk_emul_thread_->join(); + } + } +} ptr inmem_log_store::make_clone(const ptr& entry) { ptr clone = cs_new @@ -68,6 +81,13 @@ ulong inmem_log_store::append(ptr& entry) { std::lock_guard l(logs_lock_); size_t idx = start_idx_ + logs_.size() - 1; logs_[idx] = clone; + + if (disk_emul_delay) { + uint64_t cur_time = timer_helper::get_timeofday_us(); + disk_emul_logs_being_written_[cur_time + disk_emul_delay * 1000] = idx; + disk_emul_ea_.invoke(); + } + return idx; } @@ -81,6 +101,22 @@ void inmem_log_store::write_at(ulong index, ptr& entry) { itr = logs_.erase(itr); } logs_[index] = clone; + + if (disk_emul_delay) { + uint64_t cur_time = timer_helper::get_timeofday_us(); + disk_emul_logs_being_written_[cur_time + disk_emul_delay * 1000] = index; + + // Remove entries greater than `index`. 
+ auto entry = disk_emul_logs_being_written_.begin(); + while (entry != disk_emul_logs_being_written_.end()) { + if (entry->second > index) { + entry = disk_emul_logs_being_written_.erase(entry); + } else { + entry++; + } + } + disk_emul_ea_.invoke(); + } } ptr< std::vector< ptr > > @@ -236,7 +272,71 @@ bool inmem_log_store::compact(ulong last_log_index) { return true; } +bool inmem_log_store::flush() { + disk_emul_last_durable_index_ = next_slot() - 1; + return true; +} + void inmem_log_store::close() {} +void inmem_log_store::set_disk_delay(raft_server* raft, size_t delay_ms) { + disk_emul_delay = delay_ms; + raft_server_bwd_pointer_ = raft; + + if (!disk_emul_thread_) { + disk_emul_thread_ = + std::unique_ptr( + new std::thread(&inmem_log_store::disk_emul_loop, this) ); + } } +ulong inmem_log_store::last_durable_index() { + uint64_t last_log = next_slot() - 1; + if (!disk_emul_delay) { + return last_log; + } + + return disk_emul_last_durable_index_; +} + +void inmem_log_store::disk_emul_loop() { + // This thread mimics async disk writes. + + size_t next_sleep_us = 100 * 1000; + while (!disk_emul_thread_stop_signal_) { + disk_emul_ea_.wait_us(next_sleep_us); + disk_emul_ea_.reset(); + if (disk_emul_thread_stop_signal_) break; + + uint64_t cur_time = timer_helper::get_timeofday_us(); + next_sleep_us = 100 * 1000; + + bool call_notification = false; + { + std::lock_guard l(logs_lock_); + // Remove all timestamps equal to or smaller than `cur_time`, + // and pick the greatest one among them. 
+ auto entry = disk_emul_logs_being_written_.begin(); + while (entry != disk_emul_logs_being_written_.end()) { + if (entry->first <= cur_time) { + disk_emul_last_durable_index_ = entry->second; + entry = disk_emul_logs_being_written_.erase(entry); + call_notification = true; + } else { + break; + } + } + + entry = disk_emul_logs_being_written_.begin(); + if (entry != disk_emul_logs_being_written_.end()) { + next_sleep_us = entry->first - cur_time; + } + } + + if (call_notification) { + raft_server_bwd_pointer_->notify_log_append_completion(true); + } + } +} + +} diff --git a/examples/in_memory_log_store.hxx b/examples/in_memory_log_store.hxx index 20fd1120..2cd6ebe9 100644 --- a/examples/in_memory_log_store.hxx +++ b/examples/in_memory_log_store.hxx @@ -17,6 +17,8 @@ limitations under the License. #pragma once +#include "event_awaiter.h" +#include "internal_timer.hxx" #include "log_store.hxx" #include @@ -25,6 +27,8 @@ limitations under the License. namespace nuraft { +class raft_server; + class inmem_log_store : public log_store { public: inmem_log_store(); @@ -33,6 +37,7 @@ public: __nocopy__(inmem_log_store); +public: ulong next_slot() const; ulong start_index() const; @@ -58,16 +63,74 @@ public: bool compact(ulong last_log_index); - bool flush() { return true; } + bool flush(); void close(); + ulong last_durable_index(); + + void set_disk_delay(raft_server* raft, size_t delay_ms); + private: static ptr make_clone(const ptr& entry); + void disk_emul_loop(); + + /** + * Map of <log index, log entry>. + */ std::map> logs_; + + /** + * Lock for `logs_`. + */ mutable std::mutex logs_lock_; + + /** + * The index of the first log. + */ std::atomic start_idx_; + + /** + * Backward pointer to Raft server. + */ + raft_server* raft_server_bwd_pointer_; + + // Testing purpose --------------- BEGIN + + /** + * If non-zero, this log store will emulate the disk write delay. + */ + std::atomic disk_emul_delay; + + /** + * Map of <timestamp, log index>, emulating logs that are being written to disk. 
+ * Log index will be regarded as "durable" after the corresponding timestamp. + */ + std::map disk_emul_logs_being_written_; + + /** + * Thread that will update `last_durable_index_` and call + * `notify_log_append_completion` at proper time. + */ + std::unique_ptr disk_emul_thread_; + + /** + * Flag to terminate the thread. + */ + std::atomic disk_emul_thread_stop_signal_; + + /** + * Event awaiter that emulates disk delay. + */ + EventAwaiter disk_emul_ea_; + + /** + * Last written log index. + */ + std::atomic disk_emul_last_durable_index_; + + // Testing purpose --------------- END }; } diff --git a/include/libnuraft/internal_timer.hxx b/include/libnuraft/internal_timer.hxx index ee5956cd..115bd751 100644 --- a/include/libnuraft/internal_timer.hxx +++ b/include/libnuraft/internal_timer.hxx @@ -21,6 +21,8 @@ limitations under the License. #include #include +#include + namespace nuraft { struct timer_helper { @@ -119,6 +121,14 @@ struct timer_helper { return false; } + static uint64_t get_timeofday_us() { + struct timeval tv; + gettimeofday(&tv, nullptr); + uint64_t ret = tv.tv_sec * 1000000UL; + ret += tv.tv_usec; + return ret; + } + std::chrono::time_point t_created_; size_t duration_us_; mutable bool first_event_fired_; diff --git a/include/libnuraft/log_store.hxx b/include/libnuraft/log_store.hxx index e91b6443..429298a1 100644 --- a/include/libnuraft/log_store.hxx +++ b/include/libnuraft/log_store.hxx @@ -173,6 +173,15 @@ public: * @return `true` on success. */ virtual bool flush() = 0; + + /** + * (Experimental) + * This API is used only when `raft_params::parallel_log_appending_` flag is set. + * Please refer to the comment of the flag. + * + * @return The last durable log index. 
+ */ + virtual ulong last_durable_index() { return next_slot() - 1; } }; } diff --git a/include/libnuraft/raft_params.hxx b/include/libnuraft/raft_params.hxx index 0934385d..ea9cb8b1 100644 --- a/include/libnuraft/raft_params.hxx +++ b/include/libnuraft/raft_params.hxx @@ -550,6 +550,37 @@ public: * request for a configured time (`response_limit_`). */ bool use_full_consensus_among_healthy_members_; + + /** + * (Experimental) + * If `true`, users can let the leader append logs in parallel with their + * replication. To implement parallel log appending, users need to make + * `log_store::append`, `log_store::write_at`, or + * `log_store::end_of_append_batch` API trigger asynchronous disk writes + * without blocking the thread. Even while the disk write is in progress, + * the other read APIs of log store should be able to read the log. + * + * The replication and the disk write will be executed in parallel, + * and users need to call `raft_server::notify_log_append_completion` + * when the asynchronous disk write is done. Also, users need to properly + * implement `log_store::last_durable_index` API to return the most recent + * durable log index. The leader will commit the log based on the + * result of this API. + * + * - If the disk write is done earlier than the replication, + * the commit behavior is the same as the original protocol. + * + * - If the replication is done earlier than the disk write, + * the leader will commit the log based on the quorum except + * for the leader itself. The leader can apply the log to + * the state machine even before completing the disk write + * of the log. + * + * Note that parallel log appending is available for the leader only, + * and followers will wait for `notify_log_append_completion` call + * before returning the response. 
+ */ + bool parallel_log_appending_; }; } diff --git a/include/libnuraft/raft_server.hxx b/include/libnuraft/raft_server.hxx index 70d1a040..77b21f58 100644 --- a/include/libnuraft/raft_server.hxx +++ b/include/libnuraft/raft_server.hxx @@ -717,6 +717,18 @@ public: */ bool is_state_machine_execution_paused() const; + /** + * (Experimental) + * This API is used when `raft_params::parallel_log_appending_` is set. + * Every time an asynchronous log appending job is done, users should call + * this API to notify Raft server to handle the log. + * Note that calling this API once for multiple logs is acceptable + * and recommended. + * + * @param ok `true` if appending succeeded. + */ + void notify_log_append_completion(bool ok); + protected: typedef std::unordered_map>::const_iterator peer_itor; @@ -914,6 +926,10 @@ protected: void check_overall_status(); + void request_append_entries_for_all(); + + uint64_t get_current_leader_index(); + protected: static const int default_snapshot_sync_block_size; @@ -1393,6 +1409,17 @@ protected: * The term when `vote_init_timer_` was reset. */ std::atomic vote_init_timer_term_; + + /** + * (Experimental) + * Used when `raft_params::parallel_log_appending_` is set. + * Follower will wait for the asynchronous log appending using + * this event awaiter. + * + * WARNING: We are assuming that only one thread is using this + * awaiter at a time, by the help of `lock_`. + */ + EventAwaiter* ea_follower_log_append_; }; } // namespace nuraft; diff --git a/src/handle_append_entries.cxx b/src/handle_append_entries.cxx index 2d5498f4..e9e7af48 100644 --- a/src/handle_append_entries.cxx +++ b/src/handle_append_entries.cxx @@ -18,6 +18,8 @@ See the License for the specific language governing permissions and limitations under the License. 
**************************************************************************/ +#include "pp_util.hxx" +#include "raft_params.hxx" #include "raft_server.hxx" #include "cluster_config.hxx" @@ -71,7 +73,8 @@ void raft_server::request_append_entries() { // We should call it here. if ( peers_.size() == 0 || get_quorum_for_commit() == 0 ) { - commit(precommit_index_.load()); + uint64_t leader_index = get_current_leader_index(); + commit(leader_index); return; } @@ -737,6 +740,24 @@ ptr raft_server::handle_append_entries(req_msg& req) // End of batch. log_store_->end_of_append_batch( req.get_last_log_idx() + 1, req.log_entries().size() ); + + ptr params = ctx_->get_params(); + if (params->parallel_log_appending_) { + uint64_t last_durable_index = log_store_->last_durable_index(); + while ( last_durable_index < + req.get_last_log_idx() + req.log_entries().size() ) { + // Some logs are not durable yet, wait here and block the thread. + p_tr( "durable idnex %lu, sleep and wait for log appending completion", + last_durable_index ); + ea_follower_log_append_->wait_ms(params->heart_beat_interval_); + + // --- `notify_log_append_completion` API will wake it up. --- + + ea_follower_log_append_->reset(); + last_durable_index = log_store_->last_durable_index(); + p_tr( "wake up, durable index %lu", last_durable_index ); + } + } } leader_ = req.get_src(); @@ -1009,15 +1030,29 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) { } } +uint64_t raft_server::get_current_leader_index() { + uint64_t leader_index = precommit_index_; + ptr params = ctx_->get_params(); + if (params->parallel_log_appending_) { + // For parallel appending, take the smaller one. 
+ uint64_t durable_index = log_store_->last_durable_index(); + p_tr("last durable index %lu, precommit index %lu", + durable_index, precommit_index_.load()); + leader_index = std::min(precommit_index_.load(), durable_index); + } + return leader_index; +} + ulong raft_server::get_expected_committed_log_idx() { std::vector matched_indexes; state_machine::adjust_commit_index_params aci_params; matched_indexes.reserve(16); aci_params.peer_index_map_.reserve(16); - // Leader itself. - matched_indexes.push_back( precommit_index_ ); - aci_params.peer_index_map_[id_] = precommit_index_; + // Put the index of leader itself. + uint64_t leader_index = get_current_leader_index(); + matched_indexes.push_back( leader_index ); + aci_params.peer_index_map_[id_] = leader_index; for (auto& entry: peers_) { ptr& p = entry.second; @@ -1074,5 +1109,46 @@ ulong raft_server::get_expected_committed_log_idx() { return adjusted_commit_index; } +void raft_server::notify_log_append_completion(bool ok) { + p_tr("got log append completion notification: %s", ok ? "OK" : "FAILED"); + + if (role_ == srv_role::leader) { + recur_lock(lock_); + if (!ok) { + // If log appending fails, leader should resign immediately. + p_er("log appending failed, resign immediately"); + leader_ = -1; + become_follower(); + + // Clear this flag to avoid pre-vote rejection. + hb_alive_ = false; + return; + } + + // Leader: commit the log and send append_entries request, if needed. + uint64_t prev_committed_index = quick_commit_index_.load(); + uint64_t committed_index = get_expected_committed_log_idx(); + commit( committed_index ); + + if (quick_commit_index_ > prev_committed_index) { + // Commit index has been changed as a result of log appending. + // Send replication messages. + request_append_entries_for_all(); + } + } else { + if (!ok) { + // If log appending fails for follower, there is no way to proceed it. + // We should stop the server immediately. 
+ recur_lock(lock_); + p_ft("log appending failed, stop this server"); + ctx_->state_mgr_->system_exit(N21_log_flush_failed); + return; + } + + // Follower: wake up the waiting thread. + ea_follower_log_append_->invoke(); + } +} + } // namespace nuraft; diff --git a/src/handle_client_request.cxx b/src/handle_client_request.cxx index e5bd9fc3..cf2cd01e 100644 --- a/src/handle_client_request.cxx +++ b/src/handle_client_request.cxx @@ -55,6 +55,13 @@ ptr raft_server::handle_cli_req_prelock(req_msg& req, } // Urgent commit, so that the commit will not depend on hb. + request_append_entries_for_all(); + + return resp; +} + +void raft_server::request_append_entries_for_all() { + ptr params = ctx_->get_params(); if (params->use_bg_thread_for_urgent_commit_) { // Let background generate request (some delay may happen). nuraft_global_mgr* mgr = nuraft_global_mgr::get_instance(); @@ -70,7 +77,6 @@ ptr raft_server::handle_cli_req_prelock(req_msg& req, recur_lock(lock_); request_append_entries(); } - return resp; } ptr raft_server::handle_cli_req(req_msg& req, diff --git a/src/raft_server.cxx b/src/raft_server.cxx index 4bbda1fc..c84f6a58 100644 --- a/src/raft_server.cxx +++ b/src/raft_server.cxx @@ -23,6 +23,7 @@ limitations under the License. 
#include "cluster_config.hxx" #include "context.hxx" #include "error_code.hxx" +#include "event_awaiter.h" #include "global_mgr.hxx" #include "handle_client_request.hxx" #include "handle_custom_notification.hxx" @@ -102,6 +103,7 @@ raft_server::raft_server(context* ctx, const init_options& opt) std::placeholders::_1, std::placeholders::_2 ) ) , last_snapshot_(ctx->state_machine_->last_snapshot()) + , ea_follower_log_append_(new EventAwaiter()) { char temp_buf[4096]; std::string print_msg; @@ -309,6 +311,7 @@ raft_server::~raft_server() { ready_to_stop_cv_.wait_for(lock, std::chrono::milliseconds(10)); cancel_schedulers(); delete bg_append_ea_; + delete ea_follower_log_append_; } void raft_server::update_rand_timeout() { @@ -358,7 +361,8 @@ void raft_server::apply_and_log_current_params() { "snapshot receiver %s, " "leadership transfer wait time %d, " "grace period of lagging state machine %d, " - "snapshot IO: %s", + "snapshot IO: %s, " + "parallel log appending: %s", params->election_timeout_lower_bound_, params->election_timeout_upper_bound_, params->heart_beat_interval_, @@ -377,7 +381,8 @@ void raft_server::apply_and_log_current_params() { params->exclude_snp_receiver_from_quorum_ ? "EXCLUDED" : "INCLUDED", params->leadership_transfer_min_wait_time_, params->grace_period_of_lagging_state_machine_, - params->use_bg_thread_for_snapshot_io_ ? "ASYNC" : "BLOCKING" ); + params->use_bg_thread_for_snapshot_io_ ? "ASYNC" : "BLOCKING", + params->parallel_log_appending_ ? 
"ON" : "OFF" ); status_check_timer_.set_duration_ms(params->heart_beat_interval_); status_check_timer_.reset(); diff --git a/tests/unit/asio_service_test.cxx b/tests/unit/asio_service_test.cxx index d3d878ef..7975e529 100644 --- a/tests/unit/asio_service_test.cxx +++ b/tests/unit/asio_service_test.cxx @@ -2357,6 +2357,95 @@ int custom_commit_condition_test() { return 0; } +int parallel_log_append_test() { + reset_log_files(); + + std::string s1_addr = "tcp://127.0.0.1:20010"; + std::string s2_addr = "tcp://127.0.0.1:20020"; + std::string s3_addr = "tcp://127.0.0.1:20030"; + + RaftAsioPkg s1(1, s1_addr); + RaftAsioPkg s2(2, s2_addr); + RaftAsioPkg s3(3, s3_addr); + std::vector pkgs = {&s1, &s2, &s3}; + + _msg("launching asio-raft servers\n"); + CHK_Z( launch_servers(pkgs, false) ); + + _msg("organizing raft group\n"); + CHK_Z( make_group(pkgs) ); + + // Set disk delay (2s for S1, 10ms for S2 and S3). + s1.getTestMgr()->set_disk_delay(s1.raftServer.get(), 2000); + s2.getTestMgr()->set_disk_delay(s2.raftServer.get(), 10); + s3.getTestMgr()->set_disk_delay(s3.raftServer.get(), 10); + + // Set async mode. + for (auto& entry: pkgs) { + RaftAsioPkg* pp = entry; + raft_params param = pp->raftServer->get_current_params(); + param.return_method_ = raft_params::async_handler; + param.parallel_log_appending_ = true; + pp->raftServer->update_params(param); + } + + // Append messages asynchronously. 
+ const size_t NUM = 10; + std::list< ptr< cmd_result< ptr > > > handlers; + std::list idx_list; + std::mutex idx_list_lock; + auto do_async_append = [&]() { + handlers.clear(); + idx_list.clear(); + for (size_t ii=0; ii msg = buffer::alloc(test_msg.size() + 1); + msg->put(test_msg); + ptr< cmd_result< ptr > > ret = + s1.raftServer->append_entries( {msg} ); + + cmd_result< ptr >::handler_type my_handler = + std::bind( async_handler, + &idx_list, + &idx_list_lock, + std::placeholders::_1, + std::placeholders::_2 ); + ret->when_ready( my_handler ); + + handlers.push_back(ret); + } + }; + do_async_append(); + + TestSuite::sleep_sec(1, "wait for replication"); + + // Still durable index is smaller than the last index. + CHK_SM( s1.getTestMgr()->load_log_store()->last_durable_index(), + s1.getTestMgr()->load_log_store()->next_slot() - 1 ); + + // All servers should have the same log index. + CHK_EQ( s1.getTestMgr()->load_log_store()->next_slot() - 1, + s2.getTestMgr()->load_log_store()->next_slot() - 1 ); + CHK_EQ( s1.getTestMgr()->load_log_store()->next_slot() - 1, + s3.getTestMgr()->load_log_store()->next_slot() - 1 ); + + // Even with disk delay, logs should have been committed by S2 and S3. 
+ CHK_EQ( s1.getTestMgr()->load_log_store()->next_slot() - 1, + s1.raftServer->get_committed_log_idx() ); + + TestSuite::sleep_ms(1500, "wait for disk delay"); + CHK_EQ( s1.getTestMgr()->load_log_store()->last_durable_index(), + s1.getTestMgr()->load_log_store()->next_slot() - 1 ); + + s1.raftServer->shutdown(); + s2.raftServer->shutdown(); + s3.raftServer->shutdown(); + TestSuite::sleep_sec(1, "shutting down"); + + SimpleLogger::shutdown(); + return 0; +} + } // namespace asio_service_test; using namespace asio_service_test; @@ -2456,6 +2545,9 @@ int main(int argc, char** argv) { ts.doTest( "custom commit condition test", custom_commit_condition_test ); + ts.doTest( "parallel log append test", + parallel_log_append_test ); + #ifdef ENABLE_RAFT_STATS _msg("raft stats: ENABLED\n"); #else diff --git a/tests/unit/raft_functional_common.hxx b/tests/unit/raft_functional_common.hxx index 50e55ce3..b0749301 100644 --- a/tests/unit/raft_functional_common.hxx +++ b/tests/unit/raft_functional_common.hxx @@ -444,6 +444,10 @@ public: ptr get_srv_config() const { return mySrvConfig; } + void set_disk_delay(raft_server* raft, size_t delay_ms) { + curLogStore->set_disk_delay(raft, delay_ms); + } + private: int myId; std::string myEndpoint;