Support parallel log appending as an experimental feature (#283)
* If this flag is set, the leader can do log replication and log
appending in parallel, which reduces the latency of the write
operation path.
greensky00 authored Feb 11, 2022
1 parent cce3af7 commit afd7397
Showing 11 changed files with 432 additions and 9 deletions.
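
For context on the flag introduced by this commit, here is a minimal sketch of enabling it from user code. The surrounding launcher setup is assumed to follow this repository's other examples and is not part of the diff below:

    #include <libnuraft/nuraft.hxx>

    using namespace nuraft;

    // Sketch only: enable the experimental flag before initializing the server.
    raft_params make_params() {
        raft_params params;                      // other fields keep their defaults
        params.parallel_log_appending_ = true;   // leader replicates and appends in parallel
        return params;
    }

The flag changes leader behavior only; followers still wait for `notify_log_append_completion` before responding, as noted in the raft_params.hxx comment below.
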
102 changes: 101 additions & 1 deletion examples/in_memory_log_store.cxx
@@ -25,13 +25,26 @@ namespace nuraft {

inmem_log_store::inmem_log_store()
: start_idx_(1)
, raft_server_bwd_pointer_(nullptr)
, disk_emul_delay(0)
, disk_emul_thread_(nullptr)
, disk_emul_thread_stop_signal_(false)
, disk_emul_last_durable_index_(0)
{
// Dummy entry for index 0.
ptr<buffer> buf = buffer::alloc(sz_ulong);
logs_[0] = cs_new<log_entry>(0, buf);
}

inmem_log_store::~inmem_log_store() {}
inmem_log_store::~inmem_log_store() {
if (disk_emul_thread_) {
disk_emul_thread_stop_signal_ = true;
disk_emul_ea_.invoke();
if (disk_emul_thread_->joinable()) {
disk_emul_thread_->join();
}
}
}

ptr<log_entry> inmem_log_store::make_clone(const ptr<log_entry>& entry) {
ptr<log_entry> clone = cs_new<log_entry>
@@ -68,6 +81,13 @@ ulong inmem_log_store::append(ptr<log_entry>& entry) {
std::lock_guard<std::mutex> l(logs_lock_);
size_t idx = start_idx_ + logs_.size() - 1;
logs_[idx] = clone;

if (disk_emul_delay) {
uint64_t cur_time = timer_helper::get_timeofday_us();
disk_emul_logs_being_written_[cur_time + disk_emul_delay * 1000] = idx;
disk_emul_ea_.invoke();
}

return idx;
}

@@ -81,6 +101,22 @@ void inmem_log_store::write_at(ulong index, ptr<log_entry>& entry) {
itr = logs_.erase(itr);
}
logs_[index] = clone;

if (disk_emul_delay) {
uint64_t cur_time = timer_helper::get_timeofday_us();
disk_emul_logs_being_written_[cur_time + disk_emul_delay * 1000] = index;

// Remove entries greater than `index`.
auto entry = disk_emul_logs_being_written_.begin();
while (entry != disk_emul_logs_being_written_.end()) {
if (entry->second > index) {
entry = disk_emul_logs_being_written_.erase(entry);
} else {
entry++;
}
}
disk_emul_ea_.invoke();
}
}

ptr< std::vector< ptr<log_entry> > >
@@ -236,7 +272,71 @@ bool inmem_log_store::compact(ulong last_log_index) {
return true;
}

bool inmem_log_store::flush() {
disk_emul_last_durable_index_ = next_slot() - 1;
return true;
}

void inmem_log_store::close() {}

void inmem_log_store::set_disk_delay(raft_server* raft, size_t delay_ms) {
disk_emul_delay = delay_ms;
raft_server_bwd_pointer_ = raft;

if (!disk_emul_thread_) {
disk_emul_thread_ =
std::unique_ptr<std::thread>(
new std::thread(&inmem_log_store::disk_emul_loop, this) );
}
}

ulong inmem_log_store::last_durable_index() {
uint64_t last_log = next_slot() - 1;
if (!disk_emul_delay) {
return last_log;
}

return disk_emul_last_durable_index_;
}

void inmem_log_store::disk_emul_loop() {
// This thread mimics async disk writes.

size_t next_sleep_us = 100 * 1000;
while (!disk_emul_thread_stop_signal_) {
disk_emul_ea_.wait_us(next_sleep_us);
disk_emul_ea_.reset();
if (disk_emul_thread_stop_signal_) break;

uint64_t cur_time = timer_helper::get_timeofday_us();
next_sleep_us = 100 * 1000;

bool call_notification = false;
{
std::lock_guard<std::mutex> l(logs_lock_);
// Remove all timestamps equal to or smaller than `cur_time`,
// and pick the greatest one among them.
auto entry = disk_emul_logs_being_written_.begin();
while (entry != disk_emul_logs_being_written_.end()) {
if (entry->first <= cur_time) {
disk_emul_last_durable_index_ = entry->second;
entry = disk_emul_logs_being_written_.erase(entry);
call_notification = true;
} else {
break;
}
}

entry = disk_emul_logs_being_written_.begin();
if (entry != disk_emul_logs_being_written_.end()) {
next_sleep_us = entry->first - cur_time;
}
}

if (call_notification) {
raft_server_bwd_pointer_->notify_log_append_completion(true);
}
}
}

}
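
As a usage note on the example log store above (not part of the diff): the emulated disk delay lets a test exercise the parallel-appending path without a real disk. A hypothetical snippet, where `server`, `store`, and `entry` are placeholders for an already-running raft_server, its inmem_log_store, and a log entry:

    // Sketch only: `server` (ptr<raft_server>), `store` (ptr<inmem_log_store>),
    // and `entry` (ptr<log_entry>) are assumed to exist already.
    store->set_disk_delay(server.get(), 50);   // each append becomes durable ~50 ms later
    ulong idx = store->append(entry);          // returns immediately; write is still in flight
    // About 50 ms later, disk_emul_loop() marks the entry durable, so
    // store->last_durable_index() catches up to `idx` and the loop calls
    // server->notify_log_append_completion(true) on its own.
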
65 changes: 64 additions & 1 deletion examples/in_memory_log_store.hxx
@@ -17,6 +17,8 @@ limitations under the License.

#pragma once

#include "event_awaiter.h"
#include "internal_timer.hxx"
#include "log_store.hxx"

#include <atomic>
@@ -25,6 +27,8 @@ limitations under the License.

namespace nuraft {

class raft_server;

class inmem_log_store : public log_store {
public:
inmem_log_store();
@@ -33,6 +37,7 @@ public:

__nocopy__(inmem_log_store);

public:
ulong next_slot() const;

ulong start_index() const;
@@ -58,16 +63,74 @@ public:

bool compact(ulong last_log_index);

bool flush() { return true; }
bool flush();

void close();

ulong last_durable_index();

void set_disk_delay(raft_server* raft, size_t delay_ms);

private:
static ptr<log_entry> make_clone(const ptr<log_entry>& entry);

void disk_emul_loop();

/**
* Map of <log index, log data>.
*/
std::map<ulong, ptr<log_entry>> logs_;

/**
* Lock for `logs_`.
*/
mutable std::mutex logs_lock_;

/**
* The index of the first log.
*/
std::atomic<ulong> start_idx_;

/**
* Backward pointer to Raft server.
*/
raft_server* raft_server_bwd_pointer_;

// Testing purpose --------------- BEGIN

/**
* If non-zero, this log store will emulate the disk write delay.
*/
std::atomic<size_t> disk_emul_delay;

/**
* Map of <timestamp, log index>, emulating logs that are being written to disk.
* Log index will be regarded as "durable" after the corresponding timestamp.
*/
std::map<uint64_t, uint64_t> disk_emul_logs_being_written_;

/**
* Thread that will update `disk_emul_last_durable_index_` and call
* `notify_log_append_completion` at the proper time.
*/
std::unique_ptr<std::thread> disk_emul_thread_;

/**
* Flag to terminate the thread.
*/
std::atomic<bool> disk_emul_thread_stop_signal_;

/**
* Event awaiter that emulates disk delay.
*/
EventAwaiter disk_emul_ea_;

/**
* Last written log index.
*/
std::atomic<uint64_t> disk_emul_last_durable_index_;

// Testing purpose --------------- END
};

}
10 changes: 10 additions & 0 deletions include/libnuraft/internal_timer.hxx
@@ -21,6 +21,8 @@ limitations under the License.
#include <mutex>
#include <thread>

#include <sys/time.h>

namespace nuraft {

struct timer_helper {
@@ -119,6 +121,14 @@ struct timer_helper {
return false;
}

static uint64_t get_timeofday_us() {
struct timeval tv;
gettimeofday(&tv, nullptr);
uint64_t ret = tv.tv_sec * 1000000UL;
ret += tv.tv_usec;
return ret;
}

std::chrono::time_point<std::chrono::system_clock> t_created_;
size_t duration_us_;
mutable bool first_event_fired_;
9 changes: 9 additions & 0 deletions include/libnuraft/log_store.hxx
@@ -173,6 +173,15 @@ public:
* @return `true` on success.
*/
virtual bool flush() = 0;

/**
* (Experimental)
* This API is used only when the `raft_params::parallel_log_appending_` flag is set.
* Please refer to the comment on the flag.
*
* @return The last durable log index.
*/
virtual ulong last_durable_index() { return next_slot() - 1; }
};

}
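
A log store that really writes asynchronously would typically override this method to report an index maintained by its background write path. A minimal sketch, with all class and member names illustrative rather than taken from this commit:

    #include <atomic>
    #include <libnuraft/log_store.hxx>

    // Sketch only: the remaining log_store overrides are omitted.
    class async_log_store : public nuraft::log_store {
    public:
        nuraft::ulong last_durable_index() override {
            // Updated by the background writer once data is actually on disk.
            return durable_idx_.load(std::memory_order_acquire);
        }
        // ... other log_store methods ...
    private:
        std::atomic<nuraft::ulong> durable_idx_{0};
    };
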
31 changes: 31 additions & 0 deletions include/libnuraft/raft_params.hxx
@@ -550,6 +550,37 @@ public:
* request for a configured time (`response_limit_`).
*/
bool use_full_consensus_among_healthy_members_;

/**
* (Experimental)
* If `true`, users can let the leader append logs in parallel with their
* replication. To implement parallel log appending, users need to make
* the `log_store::append`, `log_store::write_at`, or
* `log_store::end_of_append_batch` API trigger asynchronous disk writes
* without blocking the thread. Even while a disk write is in progress,
* the other read APIs of the log store should be able to read the log.
*
* The replication and the disk write will be executed in parallel,
* and users need to call `raft_server::notify_log_append_completion`
* when the asynchronous disk write is done. Also, users need to properly
* implement `log_store::last_durable_index` API to return the most recent
* durable log index. The leader will commit the log based on the
* result of this API.
*
* - If the disk write is done earlier than the replication,
* the commit behavior is the same as the original protocol.
*
* - If the replication is done earlier than the disk write,
* the leader will commit the log based on the quorum except
* for the leader itself. The leader can apply the log to
* the state machine even before completing the disk write
* of the log.
*
* Note that parallel log appending is available for the leader only,
* and followers will wait for the `notify_log_append_completion` call
* before returning the response.
*/
bool parallel_log_appending_;
};

}
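
To make the contract above concrete, here is a rough sketch of the user-side flow. Everything except the `log_store` and `raft_server` APIs named in the comment (the queue, the writer thread, and the member names) is an illustrative assumption:

    // Sketch only: my_log_store, batch, queue_, running_, durable_idx_, raft_,
    // and write_range_to_disk() are hypothetical; only the NuRaft APIs are real.
    void my_log_store::end_of_append_batch(ulong start, ulong cnt) {
        // Hand the appended range to a background writer and return immediately,
        // so the leader can replicate these logs while they are still being written.
        queue_.push(batch{start, cnt});
    }

    void my_log_store::writer_loop() {
        while (running_) {
            batch b = queue_.pop();                        // blocks until work arrives
            bool ok = write_range_to_disk(b.start, b.cnt); // the actual durable write
            if (ok) durable_idx_.store(b.start + b.cnt - 1);
            // last_durable_index() now reflects the new value; one notification
            // may cover the whole batch, which is the recommended usage.
            raft_->notify_log_append_completion(ok);
        }
    }

If the disk write finishes before replication, commits behave as in the original protocol; if replication finishes first, the leader commits on the remote quorum and may apply the entry before its own disk write completes, as described in the comment above.
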
27 changes: 27 additions & 0 deletions include/libnuraft/raft_server.hxx
@@ -717,6 +717,18 @@ public:
*/
bool is_state_machine_execution_paused() const;

/**
* (Experimental)
* This API is used when `raft_params::parallel_log_appending_` is set.
* Every time an asynchronous log appending job is done, users should call
* this API to notify the Raft server to handle the log.
* Note that calling this API once for multiple logs is acceptable
* and recommended.
*
* @param ok `true` if appending succeeded.
*/
void notify_log_append_completion(bool ok);

protected:
typedef std::unordered_map<int32, ptr<peer>>::const_iterator peer_itor;

@@ -914,6 +926,10 @@ protected:

void check_overall_status();

void request_append_entries_for_all();

uint64_t get_current_leader_index();

protected:
static const int default_snapshot_sync_block_size;

@@ -1393,6 +1409,17 @@ protected:
* The term when `vote_init_timer_` was reset.
*/
std::atomic<ulong> vote_init_timer_term_;

/**
* (Experimental)
* Used when `raft_params::parallel_log_appending_` is set.
* Follower will wait for the asynchronous log appending using
* this event awaiter.
*
* WARNING: We are assuming that only one thread is using this
* awaiter at a time, with the help of `lock_`.
*/
EventAwaiter* ea_follower_log_append_;
};

} // namespace nuraft;