Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support parallel log appending as an experimental feature #283

Merged
merged 1 commit into from
Feb 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 101 additions & 1 deletion examples/in_memory_log_store.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,26 @@ namespace nuraft {

// Construct an empty log store whose first real log index is 1.
// Disk-delay emulation members start disabled (delay 0, no thread);
// they are armed later by `set_disk_delay()`.
inmem_log_store::inmem_log_store()
    : start_idx_(1)                         // first valid log index
    , raft_server_bwd_pointer_(nullptr)     // set by set_disk_delay()
    , disk_emul_delay(0)                    // 0 => emulation disabled
    , disk_emul_thread_(nullptr)
    , disk_emul_thread_stop_signal_(false)
    , disk_emul_last_durable_index_(0)
{
    // Dummy entry for index 0, so `logs_` is never empty and index
    // arithmetic such as `start_idx_ + logs_.size() - 1` stays valid.
    ptr<buffer> buf = buffer::alloc(sz_ulong);
    logs_[0] = cs_new<log_entry>(0, buf);
}

inmem_log_store::~inmem_log_store() {}
// Stop and join the disk-delay emulation thread (if `set_disk_delay()`
// ever started one) before the members it touches are destroyed.
inmem_log_store::~inmem_log_store() {
    if (disk_emul_thread_) {
        // Raise the stop flag first, then wake the thread out of its
        // event-awaiter wait so it can observe the flag and exit.
        disk_emul_thread_stop_signal_ = true;
        disk_emul_ea_.invoke();
        if (disk_emul_thread_->joinable()) {
            disk_emul_thread_->join();
        }
    }
}

ptr<log_entry> inmem_log_store::make_clone(const ptr<log_entry>& entry) {
ptr<log_entry> clone = cs_new<log_entry>
Expand Down Expand Up @@ -68,6 +81,13 @@ ulong inmem_log_store::append(ptr<log_entry>& entry) {
std::lock_guard<std::mutex> l(logs_lock_);
size_t idx = start_idx_ + logs_.size() - 1;
logs_[idx] = clone;

if (disk_emul_delay) {
uint64_t cur_time = timer_helper::get_timeofday_us();
disk_emul_logs_being_written_[cur_time + disk_emul_delay * 1000] = idx;
disk_emul_ea_.invoke();
}

return idx;
}

Expand All @@ -81,6 +101,22 @@ void inmem_log_store::write_at(ulong index, ptr<log_entry>& entry) {
itr = logs_.erase(itr);
}
logs_[index] = clone;

if (disk_emul_delay) {
uint64_t cur_time = timer_helper::get_timeofday_us();
disk_emul_logs_being_written_[cur_time + disk_emul_delay * 1000] = index;

// Remove entries greater than `index`.
auto entry = disk_emul_logs_being_written_.begin();
while (entry != disk_emul_logs_being_written_.end()) {
if (entry->second > index) {
entry = disk_emul_logs_being_written_.erase(entry);
} else {
entry++;
}
}
disk_emul_ea_.invoke();
}
}

ptr< std::vector< ptr<log_entry> > >
Expand Down Expand Up @@ -236,7 +272,71 @@ bool inmem_log_store::compact(ulong last_log_index) {
return true;
}

// Emulated flush: once flushed, every entry appended so far is treated
// as durable, so the durable index catches up to the last appended index.
bool inmem_log_store::flush() {
    const uint64_t last_appended_idx = next_slot() - 1;
    disk_emul_last_durable_index_ = last_appended_idx;
    return true;
}

// No-op: an in-memory log store holds no external resources to release.
void inmem_log_store::close() {}

// Enable disk-write-delay emulation: each appended entry becomes durable
// only `delay_ms` milliseconds later, and `raft` is notified through
// `notify_log_append_completion()` by a background thread.
void inmem_log_store::set_disk_delay(raft_server* raft, size_t delay_ms) {
    disk_emul_delay = delay_ms;
    raft_server_bwd_pointer_ = raft;

    // Spawn the emulation thread lazily, on the first call only.
    if (disk_emul_thread_ == nullptr) {
        std::thread* loop_thread =
            new std::thread(&inmem_log_store::disk_emul_loop, this);
        disk_emul_thread_.reset(loop_thread);
    }
}

// Return the most recent durable log index. Without disk-delay emulation
// every appended entry is durable immediately; with it, durability lags
// behind the last appended entry by the emulated delay.
ulong inmem_log_store::last_durable_index() {
    const uint64_t last_appended_idx = next_slot() - 1;
    return disk_emul_delay ? disk_emul_last_durable_index_.load()
                           : last_appended_idx;
}

void inmem_log_store::disk_emul_loop() {
    // This thread mimics async disk writes.
    //
    // Appends record (completion timestamp -> log index) into
    // `disk_emul_logs_being_written_`. This loop wakes up either when a
    // new entry arrives (the event awaiter is invoked) or when the
    // sleep interval elapses, marks all due entries durable, and
    // notifies the Raft server.

    size_t next_sleep_us = 100 * 1000;  // default wake-up interval: 100 ms
    while (!disk_emul_thread_stop_signal_) {
        disk_emul_ea_.wait_us(next_sleep_us);
        disk_emul_ea_.reset();
        // Re-check after waking: destructor may have invoked the awaiter.
        if (disk_emul_thread_stop_signal_) break;

        uint64_t cur_time = timer_helper::get_timeofday_us();
        next_sleep_us = 100 * 1000;

        bool call_notification = false;
        {
            std::lock_guard<std::mutex> l(logs_lock_);
            // Remove all timestamps equal to or smaller than `cur_time`,
            // and pick the greatest one among them.
            // (std::map iterates in ascending timestamp order, so the
            //  last assignment below is the greatest due index.)
            auto entry = disk_emul_logs_being_written_.begin();
            while (entry != disk_emul_logs_being_written_.end()) {
                if (entry->first <= cur_time) {
                    disk_emul_last_durable_index_ = entry->second;
                    entry = disk_emul_logs_being_written_.erase(entry);
                    call_notification = true;
                } else {
                    break;
                }
            }

            // If entries remain, sleep only until the earliest one is due.
            entry = disk_emul_logs_being_written_.begin();
            if (entry != disk_emul_logs_being_written_.end()) {
                next_sleep_us = entry->first - cur_time;
            }
        }

        if (call_notification) {
            // Outside the lock: tell the Raft server new durable logs exist.
            raft_server_bwd_pointer_->notify_log_append_completion(true);
        }
    }
}

}
65 changes: 64 additions & 1 deletion examples/in_memory_log_store.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.

#pragma once

#include "event_awaiter.h"
#include "internal_timer.hxx"
#include "log_store.hxx"

#include <atomic>
Expand All @@ -25,6 +27,8 @@ limitations under the License.

namespace nuraft {

class raft_server;

class inmem_log_store : public log_store {
public:
inmem_log_store();
Expand All @@ -33,6 +37,7 @@ public:

__nocopy__(inmem_log_store);

public:
ulong next_slot() const;

ulong start_index() const;
Expand All @@ -58,16 +63,74 @@ public:

bool compact(ulong last_log_index);

bool flush() { return true; }
bool flush();

void close();

ulong last_durable_index();

void set_disk_delay(raft_server* raft, size_t delay_ms);

private:
static ptr<log_entry> make_clone(const ptr<log_entry>& entry);

void disk_emul_loop();

/**
* Map of <log index, log data>.
*/
std::map<ulong, ptr<log_entry>> logs_;

/**
* Lock for `logs_`.
*/
mutable std::mutex logs_lock_;

/**
* The index of the first log.
*/
std::atomic<ulong> start_idx_;

/**
* Backward pointer to Raft server.
*/
raft_server* raft_server_bwd_pointer_;

// Testing purpose --------------- BEGIN

/**
* If non-zero, this log store will emulate the disk write delay.
*/
std::atomic<size_t> disk_emul_delay;

/**
* Map of <timestamp, log index>, emulating logs that are being written to disk.
* Log index will be regarded as "durable" after the corresponding timestamp.
*/
std::map<uint64_t, uint64_t> disk_emul_logs_being_written_;

/**
* Thread that will update `disk_emul_last_durable_index_` and call
* `notify_log_append_completion` at proper time.
*/
std::unique_ptr<std::thread> disk_emul_thread_;

/**
* Flag to terminate the thread.
*/
std::atomic<bool> disk_emul_thread_stop_signal_;

/**
* Event awaiter that emulates disk delay.
*/
EventAwaiter disk_emul_ea_;

/**
* Last written log index.
*/
std::atomic<uint64_t> disk_emul_last_durable_index_;

// Testing purpose --------------- END
};

}
Expand Down
10 changes: 10 additions & 0 deletions include/libnuraft/internal_timer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ limitations under the License.
#include <mutex>
#include <thread>

#include <sys/time.h>

namespace nuraft {

struct timer_helper {
Expand Down Expand Up @@ -119,6 +121,14 @@ struct timer_helper {
return false;
}

/**
 * Get the current wall-clock time in microseconds since the Unix epoch.
 *
 * @return Microseconds since the epoch.
 */
static uint64_t get_timeofday_us() {
    struct timeval tv;
    gettimeofday(&tv, nullptr);
    // Widen to 64-bit BEFORE multiplying: `tv.tv_sec * 1000000UL`
    // overflows on platforms where `unsigned long` is 32 bits
    // (32-bit Linux, Windows), truncating the result.
    uint64_t ret = static_cast<uint64_t>(tv.tv_sec) * 1000000ULL;
    ret += tv.tv_usec;
    return ret;
}

std::chrono::time_point<std::chrono::system_clock> t_created_;
size_t duration_us_;
mutable bool first_event_fired_;
Expand Down
9 changes: 9 additions & 0 deletions include/libnuraft/log_store.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ public:
* @return `true` on success.
*/
virtual bool flush() = 0;

/**
* (Experimental)
* This API is used only when `raft_params::parallel_log_appending_` flag is set.
* Please refer to the comment of the flag.
*
* @return The last durable log index.
*/
virtual ulong last_durable_index() { return next_slot() - 1; }
};

}
Expand Down
31 changes: 31 additions & 0 deletions include/libnuraft/raft_params.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,37 @@ public:
* request for a configured time (`response_limit_`).
*/
bool use_full_consensus_among_healthy_members_;

/**
* (Experimental)
* If `true`, users can let the leader append logs parallel with their
* replication. To implement parallel log appending, users need to make
* `log_store::append`, `log_store::write_at`, or
* `log_store::end_of_append_batch` API triggers asynchronous disk writes
* without blocking the thread. Even while the disk write is in progress,
* the other read APIs of log store should be able to read the log.
*
* The replication and the disk write will be executed in parallel,
* and users need to call `raft_server::notify_log_append_completion`
* when the asynchronous disk write is done. Also, users need to properly
* implement `log_store::last_durable_index` API to return the most recent
* durable log index. The leader will commit the log based on the
* result of this API.
*
* - If the disk write is done earlier than the replication,
* the commit behavior is the same as the original protocol.
*
* - If the replication is done earlier than the disk write,
* the leader will commit the log based on the quorum except
* for the leader itself. The leader can apply the log to
* the state machine even before completing the disk write
* of the log.
*
* Note that parallel log appending is available for the leader only,
* and followers will wait for `notify_log_append_completion` call
* before returning the response.
*/
bool parallel_log_appending_;
};

}
Expand Down
27 changes: 27 additions & 0 deletions include/libnuraft/raft_server.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,18 @@ public:
*/
bool is_state_machine_execution_paused() const;

/**
* (Experimental)
* This API is used when `raft_params::parallel_log_appending_` is set.
* Every time an asynchronous log appending job is done, users should call
* this API to notify Raft server to handle the log.
* Note that calling this API once for multiple logs is acceptable
* and recommended.
*
* @param ok `true` if appending succeeded.
*/
void notify_log_append_completion(bool ok);

protected:
typedef std::unordered_map<int32, ptr<peer>>::const_iterator peer_itor;

Expand Down Expand Up @@ -914,6 +926,10 @@ protected:

void check_overall_status();

void request_append_entries_for_all();

uint64_t get_current_leader_index();

protected:
static const int default_snapshot_sync_block_size;

Expand Down Expand Up @@ -1393,6 +1409,17 @@ protected:
* The term when `vote_init_timer_` was reset.
*/
std::atomic<ulong> vote_init_timer_term_;

/**
* (Experimental)
* Used when `raft_params::parallel_log_appending_` is set.
* Follower will wait for the asynchronous log appending using
* this event awaiter.
*
* WARNING: We are assuming that only one thread is using this
* awaiter at a time, by the help of `lock_`.
*/
EventAwaiter* ea_follower_log_append_;
};

} // namespace nuraft;
Expand Down
Loading