Skip to content

Commit

Permalink
Handle the case when logs do not exist (#37)
Browse files Browse the repository at this point in the history
* If there is no snapshot and the logs have already been compacted, the leader
should protect itself rather than crash.

* Update logger to fix recursive crash handler issue.
  • Loading branch information
greensky00 authored Sep 1, 2019
1 parent 81472da commit 9d08c11
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 5 deletions.
2 changes: 2 additions & 0 deletions examples/in_memory_log_store.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ ptr< std::vector< ptr<log_entry> > >
auto entry = logs_.find(ii);
if (entry == logs_.end()) {
entry = logs_.find(0);
assert(0);
}
src = entry->second;
}
Expand All @@ -120,6 +121,7 @@ ptr<std::vector<ptr<log_entry>>>
auto entry = logs_.find(ii);
if (entry == logs_.end()) {
entry = logs_.find(0);
assert(0);
}
src = entry->second;
}
Expand Down
6 changes: 3 additions & 3 deletions examples/logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Author/Developer(s): Jung-Sang Ahn
Original Copyright 2017 Jung-Sang Ahn
See URL: https://github.com/greensky00/simple_logger
(v0.3.22)
(v0.3.24)
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -584,8 +584,8 @@ SimpleLoggerMgr::~SimpleLoggerMgr() {
termination = true;

#if defined(__linux__) || defined(__APPLE__)
if (oldSigSegvHandler) signal(SIGSEGV, oldSigSegvHandler);
if (oldSigAbortHandler) signal(SIGABRT, oldSigAbortHandler);
signal(SIGSEGV, oldSigSegvHandler);
signal(SIGABRT, oldSigAbortHandler);
#endif
{ std::unique_lock<std::mutex> l(cvFlusherLock);
cvFlusher.notify_all();
Expand Down
53 changes: 52 additions & 1 deletion examples/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Author/Developer(s): Jung-Sang Ahn
Original Copyright 2017 Jung-Sang Ahn
See URL: https://github.com/greensky00/simple_logger
(v0.3.22)
(v0.3.24)
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@ limitations under the License.
#pragma once

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <fstream>
#include <list>
Expand Down Expand Up @@ -79,6 +80,55 @@ limitations under the License.
#define _s_trace(l) _stream_(SimpleLogger::TRACE, l)


// Printf-style logging with rate-limited escalation: logs at level `lv1`
// during normal operation, but once per `interval_ms` milliseconds logs at
// level `lv2` instead. The very first log is printed at level `lv2`.
//
// The timer state is declared `static` inside the expanded block, so
// multiple threads executing this expansion share the same interval.
//
// NOTE: this expands to a plain `{ ... }` block, not a single statement;
// `if (x) _timed_log_g(...); else ...` will not compile because of the
// extra `;` after the closing brace.
#define _timed_log_g(l, interval_ms, lv1, lv2, ...) \
{ \
_timed_log_definition(static); \
_timed_log_body(l, interval_ms, lv1, lv2, __VA_ARGS__); \
}

// Same as `_timed_log_g`, but the timer state is declared `thread_local`,
// so each thread keeps its own interval and its own "first log" flag.
#define _timed_log_t(l, interval_ms, lv1, lv2, ...) \
{ \
_timed_log_definition(thread_local); \
_timed_log_body(l, interval_ms, lv1, lv2, __VA_ARGS__); \
}

// Declares the state consumed by `_timed_log_body`, using `prefix` as the
// storage-class specifier (`static` or `thread_local` in this header):
//   - `timer_lock`:        mutex locked by `_timed_log_body`.
//   - `first_event_fired`: false until the first `lv2` log has been chosen.
//   - `last_timeout`:      time of the last `lv2` log; starts at the time
//                          this state is first initialized.
#define _timed_log_definition(prefix) \
prefix std::mutex timer_lock; \
prefix bool first_event_fired = false; \
prefix std::chrono::system_clock::time_point last_timeout = \
std::chrono::system_clock::now();

// Shared body of `_timed_log_g` / `_timed_log_t`.
//
// Expects `timer_lock`, `first_event_fired`, and `last_timeout` (as declared
// by `_timed_log_definition`) to be in scope. Decides whether this call is
// the first one, or whether more than `interval_ms` milliseconds have passed
// since the last escalated log. If so, it logs at `lv2` and restarts the
// interval; otherwise it logs at `lv1`.
//
// Implementation notes:
//   - All timer state is read and written only while `timer_lock` is held,
//     so the `static` variant has no unsynchronized access to
//     `last_timeout`.
//   - The lock guard and scratch variables use `timed_log_*_` names rather
//     than short names such as `l`, `cur` or `timeout`. A guard named `l`
//     would be textually replaced by the logger argument (macro parameter
//     `l`), which breaks as soon as the logger is not a plain identifier.
//   - `interval_ms` is parenthesized so that any expression may be passed.
#define _timed_log_body(l, interval_ms, lv1, lv2, ...) \
    bool timed_log_escalate_ = false; \
    { \
        std::lock_guard<std::mutex> timed_log_guard_(timer_lock); \
        const std::chrono::system_clock::time_point timed_log_now_ = \
            std::chrono::system_clock::now(); \
        const std::chrono::duration<double> timed_log_elapsed_ = \
            timed_log_now_ - last_timeout; \
        if ( timed_log_elapsed_.count() * 1000 > (interval_ms) || \
             !first_event_fired ) { \
            timed_log_escalate_ = first_event_fired = true; \
            last_timeout = timed_log_now_; \
        } \
    } \
    if (timed_log_escalate_) { \
        _log_(lv2, l, __VA_ARGS__); \
    } else { \
        _log_(lv1, l, __VA_ARGS__); \
    }


class SimpleLoggerMgr;
class SimpleLogger {
friend class SimpleLoggerMgr;
Expand All @@ -94,6 +144,7 @@ class SimpleLogger {
INFO = 4,
DEBUG = 5,
TRACE = 6,
UNKNOWN = 99,
};

class LoggerStream : public std::ostream {
Expand Down
20 changes: 20 additions & 0 deletions src/handle_append_entries.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,26 @@ ptr<req_msg> raft_server::create_append_entries_req(peer& p) {
return create_sync_snapshot_req(p, last_log_idx, term, commit_idx);
}

if (last_log_idx + 1 < starting_idx) {
static timer_helper msg_timer(5000000);
static bool first_event_fired = false;

// Neither snapshot nor log exists, it should not happen
// and probably user did something wrong.
// Return here to protect leader itself.
int log_lv = L_TRACE;
if (!first_event_fired || msg_timer.timeout()) {
msg_timer.reset();
first_event_fired = true;
log_lv = L_ERROR;
}
p_lv(log_lv,
"neither snapshot nor log exists, peer %d, last log %zu, "
"leader's start log %zu",
p.get_id(), last_log_idx, starting_idx);
return ptr<req_msg>();
}

ulong last_log_term = term_for_log(last_log_idx);
ulong end_idx = std::min( cur_nxt_idx,
last_log_idx + 1 +
Expand Down
2 changes: 1 addition & 1 deletion src/handle_timeout.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void raft_server::handle_hb_timeout(int32 srv_id) {

if (!check_leadership_validity()) return;

p_db("Heartbeat timeout for %d", p->get_id());
p_db("heartbeat timeout for %d", p->get_id());
if (role_ == srv_role::leader) {
update_target_priority();
request_append_entries(p);
Expand Down
66 changes: 66 additions & 0 deletions tests/unit/failure_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,69 @@ int rmv_not_resp_srv_wq_test(bool explicit_failure) {
return 0;
}

int force_log_compaction_test() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();

std::string s1_addr = "S1";
std::string s2_addr = "S2";
std::string s3_addr = "S3";

RaftPkg s1(f_base, 1, s1_addr);
RaftPkg s2(f_base, 2, s2_addr);
RaftPkg s3(f_base, 3, s3_addr);
std::vector<RaftPkg*> pkgs = {&s1, &s2, &s3};

CHK_Z( launch_servers( pkgs ) );
CHK_Z( make_group( pkgs ) );

for (auto& entry: pkgs) {
RaftPkg* pp = entry;
raft_params param = pp->raftServer->get_current_params();
param.return_method_ = raft_params::async_handler;
// Do not create snapshot.
param.snapshot_distance_ = 0;
pp->raftServer->update_params(param);
}

const size_t NUM = 10;

// Append messages asynchronously.
for (size_t ii=0; ii<NUM; ++ii) {
std::string test_msg = "test" + std::to_string(ii);
ptr<buffer> msg = buffer::alloc(test_msg.size() + 1);
msg->put(test_msg);
s1.raftServer->append_entries( {msg} );
}

// Send it to S2 only.
s1.fNet->execReqResp(s2_addr);
s1.fNet->execReqResp(s2_addr);
s1.fNet->makeReqFailAll(s3_addr);

// Force log compaction.
s1.sMgr->load_log_store()->compact(NUM / 2);

// Trigger heartbeat, it should be ok, without any crash.
s1.fTimer->invoke( timer_task_type::heartbeat_timer );
s1.fNet->execReqResp(s2_addr);

// One more time, after 100ms.
TestSuite::sleep_ms(100);
s1.fTimer->invoke( timer_task_type::heartbeat_timer );
s1.fNet->execReqResp(s2_addr);

print_stats(pkgs);

s1.raftServer->shutdown();
s2.raftServer->shutdown();
s3.raftServer->shutdown();

f_base->destroy();

return 0;
}

} // namespace failure_test;
using namespace failure_test;

Expand All @@ -240,6 +303,9 @@ int main(int argc, char** argv) {
rmv_not_resp_srv_wq_test,
TestRange<bool>({false, true}));

ts.doTest( "force log compaction test",
force_log_compaction_test );

return 0;
}

0 comments on commit 9d08c11

Please sign in to comment.