Skip to content

Commit

Permalink
Fix to bug of sync log file inversion
Browse files Browse the repository at this point in the history
* If flush happens with `beyondLastSync=true` and `numFilesLimit`,
it may overwrite the last synced log file number with a smaller number,
which may result in sync order inversion.
  • Loading branch information
greensky00 committed Jan 27, 2025
1 parent cba9b96 commit bcf3716
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 1 deletion.
16 changes: 16 additions & 0 deletions include/libjungle/jungle.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,14 @@ class DB {
*/
Status getLastFlushedSeqNum(uint64_t& seq_num_out);

/**
* Get the last flushed log file number.
*
* Counterpart of `getLastFlushedSeqNum()` that reports a log file number
* instead of a sequence number.
*
* @param[out] log_file_num_out Reference to log file number as a result.
* @return OK on success.
*/
Status getLastFlushedLogFileNum(uint64_t& log_file_num_out);

/**
* Get the last synced (written to file) sequence number.
*
Expand All @@ -378,6 +386,14 @@ class DB {
*/
Status getLastSyncedSeqNum(uint64_t& seq_num_out);

/**
* Get the last synced (written to file) log file number.
*
* Counterpart of `getLastSyncedSeqNum()` that reports a log file number
* instead of a sequence number.
*
* @param[out] log_file_num_out Reference to log file number as a result.
* @return OK on success.
*/
Status getLastSyncedLogFileNum(uint64_t& log_file_num_out);

/**
* Get the list of checkpoint markers.
*
Expand Down
12 changes: 12 additions & 0 deletions src/jungle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -397,12 +397,24 @@ Status DB::getLastFlushedSeqNum(uint64_t& seq_num_out) {
return p->logMgr->getLastFlushedSeqNum(seq_num_out);
}

// Public entry point for querying the last flushed log file number.
// Runs the handle validity check through the `EP` macro, then forwards the
// request (and the output reference) to the log manager.
// NOTE(review): `EP` presumably stores the result in `s` and returns early
// on failure, which would explain the otherwise unused `s` -- confirm
// against the macro definition.
Status DB::getLastFlushedLogFileNum(uint64_t& log_file_num_out) {
Status s;
EP( p->checkHandleValidity() );
return p->logMgr->getLastFlushedLogFileNum(log_file_num_out);
}

// Public entry point for querying the last synced sequence number.
// Runs the handle validity check through the `EP` macro, then forwards the
// request (and the output reference) to the log manager.
// NOTE(review): `EP` presumably stores the result in `s` and returns early
// on failure, which would explain the otherwise unused `s` -- confirm
// against the macro definition.
Status DB::getLastSyncedSeqNum(uint64_t& seq_num_out) {
Status s;
EP( p->checkHandleValidity() );
return p->logMgr->getLastSyncedSeqNum(seq_num_out);
}

// Public entry point for querying the last synced log file number.
// Runs the handle validity check through the `EP` macro, then forwards the
// request (and the output reference) to the log manager.
// NOTE(review): `EP` presumably stores the result in `s` and returns early
// on failure, which would explain the otherwise unused `s` -- confirm
// against the macro definition.
Status DB::getLastSyncedLogFileNum(uint64_t& log_file_num_out) {
Status s;
EP( p->checkHandleValidity() );
return p->logMgr->getLastSyncedLogFileNum(log_file_num_out);
}

Status DB::getCheckpoints(std::list<uint64_t>& chk_out) {
Status s;
EP( p->checkHandleValidity() );
Expand Down
26 changes: 25 additions & 1 deletion src/log_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1488,7 +1488,15 @@ Status LogMgr::flush(const FlushOptions& options,
_seq_str(seq_num).c_str(), seq_num_local, ln_from, ln_to);

if (options.beyondLastSync) {
mani->setLastSyncedLog(ln_to);
// WARNING:
// Update last synced log only if it is beyond the last sync.
// Due to `FlushOptions::numFilesLimit`, `ln_to` could be
// less than `last_synced_log`.
uint64_t last_synced_log = 0;
mani->getLastSyncedLog(last_synced_log);
if (ln_to > last_synced_log) {
mani->setLastSyncedLog(ln_to);
}
}
mani->setLastFlushedLog(ln_to);
// Remove log file except for ln_to.
Expand Down Expand Up @@ -1917,6 +1925,22 @@ Status LogMgr::getLastSyncedSeqNum(uint64_t& seq_num_out) {
return Status();
}

/**
 * Report the last flushed log file number as provided by `mani`.
 *
 * @param[out] log_file_num_out Receives the value obtained from
 *                              `mani->getLastFlushedLog()`.
 * @return A default-constructed `Status` once the value has been copied out.
 */
Status LogMgr::getLastFlushedLogFileNum(uint64_t& log_file_num_out) {
    Status s;

    // Fetch into a scratch variable first; the caller's reference is only
    // written after the `EP`-wrapped lookup.
    // NOTE(review): `EP` presumably returns early on a failed status --
    // confirm against the macro definition.
    uint64_t flushed_log_num = 0;
    EP( mani->getLastFlushedLog(flushed_log_num) );

    log_file_num_out = flushed_log_num;
    return Status();
}

/**
 * Report the last synced log file number as provided by `mani`.
 *
 * @param[out] log_file_num_out Receives the value obtained from
 *                              `mani->getLastSyncedLog()`.
 * @return A default-constructed `Status` once the value has been copied out.
 */
Status LogMgr::getLastSyncedLogFileNum(uint64_t& log_file_num_out) {
    Status s;

    // Fetch into a scratch variable first; the caller's reference is only
    // written after the `EP`-wrapped lookup.
    // NOTE(review): `EP` presumably returns early on a failed status --
    // confirm against the macro definition.
    uint64_t synced_log_num = 0;
    EP( mani->getLastSyncedLog(synced_log_num) );

    log_file_num_out = synced_log_num;
    return Status();
}

bool LogMgr::checkTimeToFlush(const GlobalConfig& config) {
Status s;
uint64_t l_last_flush = 0;
Expand Down
3 changes: 3 additions & 0 deletions src/log_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ class LogMgr {
Status getLastFlushedSeqNum(uint64_t& seq_num_out);
Status getLastSyncedSeqNum(uint64_t& seq_num_out);

// Fetch the last flushed log file number via `mani->getLastFlushedLog()`.
Status getLastFlushedLogFileNum(uint64_t& log_file_num_out);
// Fetch the last synced log file number via `mani->getLastSyncedLog()`.
Status getLastSyncedLogFileNum(uint64_t& log_file_num_out);

bool checkTimeToFlush(const GlobalConfig& config);
Status close();

Expand Down
53 changes: 53 additions & 0 deletions tests/jungle/basic_op_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3188,6 +3188,58 @@ int disabling_auto_flush_test() {
return 0;
}


// Regression test: a flush issued with `beyondLastSync = true` together with
// a `numFilesLimit` must leave the last synced log file number exactly as it
// was right after `sync()`.
// Returns 0 when the end of the test body is reached.
int sync_flush_race_test() {
std::string filename;
TEST_SUITE_PREPARE_PATH(filename);

jungle::Status s;
jungle::DBConfig config;
TEST_CUSTOM_DB_CONFIG(config);
jungle::DB* db = nullptr;

// Global settings: a single flusher thread and a trigger threshold of
// 100 records.
jungle::GlobalConfig g_config;
g_config.numFlusherThreads = 1;
g_config.flusherMinRecordsToTrigger = 100;
jungle::init(g_config);

// Cap each log file at 10 entries so the inserts below span several files.
config.maxEntriesInLogFile = 10;
std::string db_file = filename + "_db1";
CHK_Z(jungle::DB::open(&db, db_file, config));

// Inserts one key-value pair per index in [from, to); key and value are
// "key"/"val" followed by `TestSuite::lzStr(5, ii)`.
// Returns 0 after the loop completes.
// NOTE(review): `CHK_Z` presumably returns a non-zero value from the lambda
// when `db->set()` fails -- confirm against the macro definition.
auto insert_keys = [&](size_t from, size_t to) {
for (size_t ii = from; ii < to; ++ii) {
std::string key_str = "key" + TestSuite::lzStr(5, ii);
std::string val_str = "val" + TestSuite::lzStr(5, ii);
CHK_Z( db->set( jungle::KV(key_str, val_str) ) );
}
return 0;
};

// Write 100 keys.
// NOTE(review): with `maxEntriesInLogFile = 10` this is presumably about
// 10 log files, not the 20 originally stated here -- confirm.
CHK_Z( insert_keys(0, 100) );

// Sync, then remember the last synced log file number as the baseline.
CHK_Z( db->sync(false) );
uint64_t last_synced_log_file_num = 0;
CHK_Z( db->getLastSyncedLogFileNum(last_synced_log_file_num) );

// Flush beyond the last sync, but limited to 5 files.
jungle::FlushOptions f_opt;
f_opt.beyondLastSync = true;
f_opt.numFilesLimit = 5;
CHK_Z( db->flushLogs(f_opt) );

// The limited flush must not have changed the last synced log file number.
uint64_t last_synced_log_file_num2 = 0;
CHK_Z( db->getLastSyncedLogFileNum(last_synced_log_file_num2) );
CHK_EQ(last_synced_log_file_num, last_synced_log_file_num2);

CHK_Z(jungle::DB::close(db));
CHK_Z(jungle::shutdown());

TEST_SUITE_CLEANUP_PATH();
return 0;
}

int main(int argc, char** argv) {
TestSuite ts(argc, argv);

Expand Down Expand Up @@ -3244,6 +3296,7 @@ int main(int argc, char** argv) {
ts.doTest("log flush add new file race test", log_flush_add_new_file_race_test);
ts.doTest("serialized sync and flush test", serialized_sync_and_flush_test);
ts.doTest("disabling auto flush test", disabling_auto_flush_test);
ts.doTest("sync flush race test", sync_flush_race_test);

return 0;
}
Expand Down

0 comments on commit bcf3716

Please sign in to comment.