diff --git a/cmake/CodeCoverage.cmake b/cmake/CodeCoverage.cmake index ecd1010..8e75e30 100644 --- a/cmake/CodeCoverage.cmake +++ b/cmake/CodeCoverage.cmake @@ -162,7 +162,7 @@ function(SETUP_TARGET_FOR_COVERAGE) COMMAND ${LCOV_PATH} --directory . --capture --output-file ${PROJECT_BINARY_DIR}/${Coverage_NAME}.info # add baseline counters COMMAND ${LCOV_PATH} -a ${PROJECT_BINARY_DIR}/${Coverage_NAME}.base -a ${PROJECT_BINARY_DIR}/${Coverage_NAME}.info --output-file ${PROJECT_BINARY_DIR}/${Coverage_NAME}.total - COMMAND ${LCOV_PATH} --remove ${Coverage_NAME}.total ${COVERAGE_EXCLUDES} --output-file ${PROJECT_BINARY_DIR}/${Coverage_NAME}.info.cleaned + COMMAND ${LCOV_PATH} --ignore-errors unused --remove ${Coverage_NAME}.total ${COVERAGE_EXCLUDES} --output-file ${PROJECT_BINARY_DIR}/${Coverage_NAME}.info.cleaned COMMAND ${GENHTML_PATH} -o ${PROJECT_BINARY_DIR}/${Coverage_NAME} ${PROJECT_BINARY_DIR}/${Coverage_NAME}.info.cleaned #COMMAND ${CMAKE_COMMAND} -E remove ${Coverage_NAME}.base ${Coverage_NAME}.info ${Coverage_NAME}.total ${PROJECT_BINARY_DIR}/${Coverage_NAME}.info.cleaned diff --git a/include/libjungle/db_config.h b/include/libjungle/db_config.h index 76fbfce..70b4112 100644 --- a/include/libjungle/db_config.h +++ b/include/libjungle/db_config.h @@ -104,6 +104,7 @@ class DBConfig { DBConfig() : allowOverwriteSeqNum(false) , logSectionOnly(false) + , autoLogFlush(true) , truncateInconsecutiveLogs(true) , logFileTtl_sec(0) , maxKeepingMemtables(0) @@ -143,6 +144,8 @@ class DBConfig { , fastIndexScan(false) , seqLoadingDelayFactor(0) , safeMode(false) + , serializeMultiThreadedLogFlush(false) + , skipManifestSync(false) { tableSizeRatio.push_back(2.5); levelSizeRatio.push_back(10.0); @@ -178,11 +181,21 @@ class DBConfig { */ bool allowOverwriteSeqNum; - /* + /** * Disable table section and use logging part only. 
*/ bool logSectionOnly; + /** + * If it is normal DB instance (`readOnly = false` and `logSectionOnly = false`), + * background flusher thread will automatically flush logs to L0 tables. + * + * WARNING: + * If it is set to `false`, users should manually call `flushLogs()`. + * The more unflushed logs, the more memory consumption. + */ + bool autoLogFlush; + /* * (Only when `logSectionOnly == true`) * Truncate tail logs if they are inconsecutive, @@ -575,6 +588,25 @@ class DBConfig { * real production environment. */ bool safeMode; + + /** + * If `true`, `sync` and `flushLogs` calls by multiple threads will be serialized, + * and executed one by one. + * If `false`, only one thread will execute `sync` and `flushLogs` calls, while + * the other concurrent threads will get `OPERATION_IN_PROGRESS` status. + */ + bool serializeMultiThreadedLogFlush; + + /** + * [EXPERIMENTAL] + * If `true`, when `sync()` is invoked, only the actual log files will be synced, + * not the manifest file. The manifest file is synced when 1) a new log file is + * added, or 2) the DB is closed. + * + * Even without syncing the manifest file, Jungle can recover the last synced + * data by scanning the log files. + */ + bool skipManifestSync; }; class GlobalConfig { diff --git a/include/libjungle/params.h b/include/libjungle/params.h index c9e36e1..ac222aa 100644 --- a/include/libjungle/params.h +++ b/include/libjungle/params.h @@ -205,6 +205,7 @@ struct DebugParams { , newLogBatchCb(nullptr) , getLogFileInfoBySeqCb(nullptr) , logFlushCb(nullptr) + , syncCb(nullptr) , forceMerge(false) {} @@ -299,6 +300,12 @@ struct DebugParams { */ std::function< void(const GenericCbParams&) > logFlushCb; + /** + * Callback function that will be invoked at the beginning of log sync + * (reading memtable data and writing them into log files). + */ + std::function< void(const GenericCbParams&) > syncCb; + /** * If true, merge will proceed the task even with the small number * of tables in the level. 
diff --git a/include/libjungle/sized_buf.h b/include/libjungle/sized_buf.h index 6d15558..d849ba4 100644 --- a/include/libjungle/sized_buf.h +++ b/include/libjungle/sized_buf.h @@ -16,6 +16,7 @@ limitations under the License. #pragma once +#include #include #include #include diff --git a/src/cmd_handler.cc b/src/cmd_handler.cc index 12cf591..6b160d8 100644 --- a/src/cmd_handler.cc +++ b/src/cmd_handler.cc @@ -291,7 +291,7 @@ std::string CmdHandler::hDumpKv(DBWrap* target_dbw, ss << "[" << count << "]" << std::endl; ss << "key: " << HexDump::toString(rec_out.kv.key) << std::endl; ss << "sequence number: " << rec_out.seqNum << std::endl; - ss << "type: " << rec_out.type << std::endl; + ss << "type: " << (int)rec_out.type << std::endl; ss << "meta: " << HexDump::toString(rec_out.meta) << std::endl; if (tokens[0] == "getmeta") { ss << "value: " << rec_out.kv.value.size << " bytes" << std::endl; diff --git a/src/generic_bitmap.h b/src/generic_bitmap.h index 0b277bf..4a0fbfa 100644 --- a/src/generic_bitmap.h +++ b/src/generic_bitmap.h @@ -20,6 +20,7 @@ limitations under the License. 
#pragma once +#include #include #include diff --git a/src/log_file.cc b/src/log_file.cc index 23ef414..e49eb11 100644 --- a/src/log_file.cc +++ b/src/log_file.cc @@ -401,7 +401,7 @@ Status LogFile::getPrefix(const uint64_t chk, return Status(); } -Status LogFile::flushMemTable(uint64_t upto) { +Status LogFile::flushMemTable(uint64_t upto, uint64_t& flushed_seq_out) { touch(); // Skip unnecessary flushing if (immutable && !fHandle && isSynced()) { @@ -509,7 +509,7 @@ Status LogFile::flushMemTable(uint64_t upto) { RwSerializer rws(fOps, fHandle, true); - TC( mTable->flush(rws, upto) ); + TC( mTable->flush(rws, upto, flushed_seq_out) ); TC( mTable->appendFlushMarker(rws) ); TC( fOps->flush(fHandle) ); diff --git a/src/log_file.h b/src/log_file.h index 0e775d6..7386ee5 100644 --- a/src/log_file.h +++ b/src/log_file.h @@ -93,7 +93,7 @@ class LogFile { bool allow_flushed_log, bool allow_tombstone); - Status flushMemTable(uint64_t upto = NOT_INITIALIZED); + Status flushMemTable(uint64_t upto, uint64_t& flushed_seq_out); Status purgeMemTable(); diff --git a/src/log_manifest.cc b/src/log_manifest.cc index 01e2295..e762b72 100644 --- a/src/log_manifest.cc +++ b/src/log_manifest.cc @@ -480,8 +480,9 @@ Status LogManifest::clone(const std::string& dst_path) { Status LogManifest::store(bool call_fsync) { if (mFileName.empty() || !fOps) return Status::NOT_INITIALIZED; - if (call_fsync) { - // `fsync` is required: calls by multiple threads should be serialized. + if (call_fsync || logMgr->getDbConfig()->serializeMultiThreadedLogFlush) { + // `fsync` is required, or serialize option is on: + // calls by multiple threads should be serialized. 
std::unique_lock l(mFileWriteLock); return storeInternal(call_fsync); } else { @@ -600,7 +601,7 @@ Status LogManifest::storeInternal(bool call_fsync) { if (need_truncate) { fOps->ftruncate(mFile, ss.pos()); } - + bool backup_done = false; if (call_fsync) { s = fOps->fsync(mFile); diff --git a/src/log_mgr.cc b/src/log_mgr.cc index 75c4281..fd1bf06 100644 --- a/src/log_mgr.cc +++ b/src/log_mgr.cc @@ -43,6 +43,7 @@ LogMgr::LogMgr(DB* parent_db, const LogMgrOptions& lm_opt) , fwdVisibleSeqBarrier(0) , globalBatchStatus(nullptr) , numMemtables(0) + , needSkippedManiSync(false) , myLog(nullptr) , vlSync(VERBOSE_LOG_SUPPRESS_MS) {} @@ -182,6 +183,7 @@ Status LogMgr::init(const LogMgrOptions& lm_opt) { } } + needSkippedManiSync = false; initialized = true; return Status(); @@ -202,10 +204,16 @@ void LogMgr::logMgrSettings() { _log_info( myLog, "initialized log manager, memtable flush buffer %zu, " - "direct-IO %s, custom hash length function %s", + "direct-IO %s, custom hash length function %s, " + "sync multi-threaded log flush %s, " + "skip manifest sync %s, " + "auto log flush %s", g_conf->memTableFlushBufferSize, get_on_off_str(opt.dbConfig->directIoOpt.enabled), - get_on_off_str((bool)opt.dbConfig->customLenForHash) ); + get_on_off_str((bool)opt.dbConfig->customLenForHash), + get_on_off_str(opt.dbConfig->serializeMultiThreadedLogFlush), + get_on_off_str(opt.dbConfig->skipManifestSync), + get_on_off_str(opt.dbConfig->autoLogFlush) ); } Status LogMgr::rollback(uint64_t seq_upto) { @@ -215,7 +223,8 @@ Status LogMgr::rollback(uint64_t seq_upto) { DebugParams d_params = mgr->getDebugParams(); // Return error in read-only mode. - if (getDbConfig()->readOnly) return Status::WRITE_VIOLATION; + const DBConfig* db_config = getDbConfig(); + if (db_config->readOnly) return Status::WRITE_VIOLATION; // WARNING: // Both syncing (memtable -> log) and flushing (log -> table) @@ -225,7 +234,7 @@ Status LogMgr::rollback(uint64_t seq_upto) { const size_t MAX_RETRY_MS = 1000; // 1 second. 
tt.setDurationMs(MAX_RETRY_MS); - OpSemaWrapper ow_sync(&syncSema); + OpSemaWrapper ow_sync(&syncSema, db_config->serializeMultiThreadedLogFlush); while (!ow_sync.acquire()) { if (tt.timeout()) { _log_err(myLog, "rollback timeout due to sync"); @@ -233,9 +242,9 @@ Status LogMgr::rollback(uint64_t seq_upto) { } Timer::sleepMs(10); } - assert(ow_sync.op_sema->enabled); + assert(ow_sync.opSema->enabled); - OpSemaWrapper ow_flush(&flushSema); + OpSemaWrapper ow_flush(&flushSema, db_config->serializeMultiThreadedLogFlush); while (!ow_flush.acquire()) { if (tt.timeout()) { _log_err(myLog, "rollback timeout due to flush"); @@ -243,9 +252,9 @@ Status LogMgr::rollback(uint64_t seq_upto) { } Timer::sleepMs(10); } - assert(ow_flush.op_sema->enabled); + assert(ow_flush.opSema->enabled); - OpSemaWrapper ow_reclaim(&reclaimSema); + OpSemaWrapper ow_reclaim(&reclaimSema, db_config->serializeMultiThreadedLogFlush); while (!ow_reclaim.acquire()) { if (tt.timeout()) { _log_err(myLog, "rollback timeout due to reclaim"); @@ -253,7 +262,7 @@ Status LogMgr::rollback(uint64_t seq_upto) { } Timer::sleepMs(10); } - assert(ow_reclaim.op_sema->enabled); + assert(ow_reclaim.opSema->enabled); _log_info(myLog, "[ROLLBACK] upto %zu", seq_upto); @@ -473,6 +482,8 @@ Status LogMgr::cloneManifest(DB* snap_handle, Status LogMgr::addNewLogFile(LogFileInfoGuard& cur_log_file_info, LogFileInfoGuard& new_log_file_info) { + const DBConfig* db_config = getDbConfig(); + std::unique_lock ll(addNewLogFileMutex); Status s; @@ -508,8 +519,10 @@ Status LogMgr::addNewLogFile(LogFileInfoGuard& cur_log_file_info, _log_info(myLog, "moved to a new log file %ld, start seq %s", new_log_num, _seq_str(start_seqnum).c_str()); - // Sync manifest file. - mani->store(false); + if (!db_config->skipManifestSync) { + // Sync manifest file. + mani->store(false); + } } else { // Otherwise, other thread already added a new log file. 
@@ -524,6 +537,8 @@ Status LogMgr::addNewLogFile(LogFileInfoGuard& cur_log_file_info, new_log_file_info = LogFileInfoGuard(lf_info); + // Next `sync()` call should sync manifest file, regardless of `skipManifestSync`. + needSkippedManiSync = true; return Status(); } @@ -1124,21 +1139,32 @@ Status LogMgr::sync(bool call_fsync) { Status LogMgr::syncNoWait(bool call_fsync) { // Return error in read-only mode. - if (getDbConfig()->readOnly) return Status::WRITE_VIOLATION; + const DBConfig* db_config = getDbConfig(); + if (db_config->readOnly) return Status::WRITE_VIOLATION; // Only one sync operation at a time. - OpSemaWrapper ow(&syncSema); + OpSemaWrapper ow(&syncSema, db_config->serializeMultiThreadedLogFlush); if (!ow.acquire()) { _log_debug(myLog, "Sync failed. Other thread is working on it."); return Status::OPERATION_IN_PROGRESS; } - assert(ow.op_sema->enabled); + assert(ow.opSema->enabled); return syncInternal(call_fsync); } Status LogMgr::syncInternal(bool call_fsync) { Status s; uint64_t ln_from, ln_to; + + DBMgr* dbm = DBMgr::getWithoutInit(); + if (dbm && dbm->isDebugCallbackEffective()) { + DebugParams dp = dbm->getDebugParams(); + if (dp.syncCb) { + DebugParams::GenericCbParams p; + dp.syncCb(p); + } + } + s = mani->getMaxLogFileNum(ln_to); if (!s) { // No log, do nothing. @@ -1176,8 +1202,20 @@ Status LogMgr::syncInternal(bool call_fsync) { if (li.empty() || li.ptr->isRemoved()) continue; uint64_t before_sync = li->file->getSyncedSeqNum(); - EP( li->file->flushMemTable( seq_barrier ? seq_barrier : NOT_INITIALIZED ) ); - uint64_t after_sync = li->file->getSyncedSeqNum(); + uint64_t after_sync = NOT_INITIALIZED; + EP( li->file->flushMemTable( (seq_barrier ? seq_barrier : NOT_INITIALIZED), + after_sync ) ); + if (call_fsync) { + EP( li->file->sync() ); + } + + // WARNING: + // We should update the syncedSeqNum after the sync() operation. 
+ // If sync() fails and we update syncedSeqNum beforehand, there's a + // risk that the manifest might write an incorrect syncedSeqNum in + // another call stack (e.g., addNewLogFile()). This could lead to + // data loss in the event of a crash. + li->file->setSyncedSeqNum(after_sync); _log_( log_level, myLog, "synced log file %zu, min seq %s, " "flush seq %s, sync seq %s -> %s, max seq %s", ii, @@ -1186,9 +1224,7 @@ Status LogMgr::syncInternal(bool call_fsync) { _seq_str( before_sync ).c_str(), _seq_str( after_sync ).c_str(), _seq_str( li->file->getMaxSeqNum() ).c_str() ); - if (call_fsync) { - EP( li->file->sync() ); - } + if (valid_number(after_sync)) { last_synced_log = ii; } @@ -1200,23 +1236,28 @@ Status LogMgr::syncInternal(bool call_fsync) { // Sync up manifest file next mani->setLastSyncedLog(last_synced_log); - EP( mani->store(call_fsync) ); - _log_(log_level, myLog, "updated log manifest file."); + const DBConfig* db_config = getDbConfig(); + if (!db_config->skipManifestSync || needSkippedManiSync) { + EP( mani->store(call_fsync) ); + _log_(log_level, myLog, "updated log manifest file."); + needSkippedManiSync = false; + } return Status(); } Status LogMgr::discardDirty(uint64_t seq_begin) { // Return error in read-only mode. - if (getDbConfig()->readOnly) return Status::WRITE_VIOLATION; + const DBConfig* db_config = getDbConfig(); + if (db_config->readOnly) return Status::WRITE_VIOLATION; // Should not race with sync. - OpSemaWrapper ow(&syncSema); + OpSemaWrapper ow(&syncSema, db_config->serializeMultiThreadedLogFlush); if (!ow.acquire()) { _log_debug(myLog, "discard failed. 
Other thread is working on it."); return Status::OPERATION_IN_PROGRESS; } - assert(ow.op_sema->enabled); + assert(ow.opSema->enabled); Status s; uint64_t ln_from, ln_to; @@ -1263,12 +1304,13 @@ Status LogMgr::flush(const FlushOptions& options, return Status::INVALID_SEQNUM; } - OpSemaWrapper ow(&flushSema); + const DBConfig* db_config = getDbConfig(); + OpSemaWrapper ow(&flushSema, db_config->serializeMultiThreadedLogFlush); if (!ow.acquire()) { _log_debug(myLog, "Flush skipped. Other thread is working on it."); return Status::OPERATION_IN_PROGRESS; } - assert(ow.op_sema->enabled); + assert(ow.opSema->enabled); Status s; Timer tt; @@ -1599,12 +1641,13 @@ void LogMgr::adjustThrottlingExtreme() { } Status LogMgr::doLogReclaim() { - OpSemaWrapper ow(&reclaimSema); + const DBConfig* db_config = getDbConfig(); + OpSemaWrapper ow(&reclaimSema, db_config->serializeMultiThreadedLogFlush); if (!ow.acquire()) { _log_debug(myLog, "Reclaim skipped. Other thread is working on it."); return Status::OPERATION_IN_PROGRESS; } - assert(ow.op_sema->enabled); + assert(ow.opSema->enabled); mani->reclaimExpiredLogFiles(); return Status(); @@ -1867,7 +1910,6 @@ Status LogMgr::getLastSyncedSeqNum(uint64_t& seq_num_out) { return Status(); } - bool LogMgr::checkTimeToFlush(const GlobalConfig& config) { Status s; uint64_t l_last_flush = 0; @@ -1875,10 +1917,12 @@ bool LogMgr::checkTimeToFlush(const GlobalConfig& config) { uint64_t seq_last_flush = NOT_INITIALIZED; uint64_t seq_max = NOT_INITIALIZED; - if (getDbConfig()->readOnly) return false; + const DBConfig* db_config = getDbConfig(); + if (db_config->readOnly) return false; if (syncSema.grabbed) return false; if (flushSema.grabbed) return false; - if (getDbConfig()->logSectionOnly) return false; + if (db_config->logSectionOnly) return false; + if (!db_config->autoLogFlush) return false; const size_t MAX_TRY = 10; size_t num_try = 0; @@ -1926,7 +1970,8 @@ Status LogMgr::close() { // If sync() or flush() is running, // wait until they 
finish their jobs. - OpSemaWrapper op_sync(&syncSema); + const DBConfig* db_config = getDbConfig(); + OpSemaWrapper op_sync(&syncSema, db_config->serializeMultiThreadedLogFlush); _log_info(myLog, "Wait for on-going sync operation."); uint64_t ticks = 0; @@ -1937,15 +1982,16 @@ Status LogMgr::close() { syncSema.enabled = false; _log_info(myLog, "Disabled syncing for %p, %zu ticks", this, ticks); - if (!getDbConfig()->readOnly) { + if (!db_config->readOnly) { // Last sync before close (not in read-only mode). + needSkippedManiSync = true; syncInternal(false); _log_info(myLog, "Last sync done"); } else { _log_info(myLog, "read-only mode: skip the last sync"); } - OpSemaWrapper op_flush(&flushSema); + OpSemaWrapper op_flush(&flushSema, db_config->serializeMultiThreadedLogFlush); _log_info(myLog, "Wait for on-going flush operation."); ticks = 0; while (!op_flush.acquire()) { @@ -1956,7 +2002,7 @@ Status LogMgr::close() { flushSema.enabled = false; _log_info(myLog, "Disabled flushing for %p, %zu ticks", this, ticks); - OpSemaWrapper op_reclaim(&reclaimSema); + OpSemaWrapper op_reclaim(&reclaimSema, db_config->serializeMultiThreadedLogFlush); _log_info(myLog, "Wait for on-going log reclaim operation."); ticks = 0; while (!op_reclaim.acquire()) { diff --git a/src/log_mgr.h b/src/log_mgr.h index 743aaec..6cd8d87 100644 --- a/src/log_mgr.h +++ b/src/log_mgr.h @@ -87,30 +87,44 @@ struct OpSema { OpSema() : enabled(true), grabbed(false) {} std::atomic enabled; std::atomic grabbed; + + // Used only when `serializeMultiThreadedLogFlush` is on. 
+ std::mutex lock; }; struct OpSemaWrapper { - OpSemaWrapper(OpSema* _op_sema) : op_sema(_op_sema), acquired(false) {} + OpSemaWrapper(OpSema* op_sema, bool lock_mode = false) + : opSema(op_sema), acquired(false), lockMode(lock_mode) {} ~OpSemaWrapper() { if (acquired) { - op_sema->grabbed = false; + opSema->grabbed = false; + if (lockMode) { + opSema->lock.unlock(); + } } - op_sema = nullptr; + opSema = nullptr; acquired = false; } bool acquire() { - bool expected = false; - bool val = true; - if ( op_sema->enabled && - op_sema->grabbed.compare_exchange_weak(expected, val) ) { + if (lockMode) { + opSema->lock.lock(); + opSema->grabbed = true; acquired = true; + } else { + bool expected = false; + bool val = true; + if ( opSema->enabled && + opSema->grabbed.compare_exchange_weak(expected, val) ) { + acquired = true; + } } return acquired; } - OpSema* op_sema; + OpSema* opSema; bool acquired; + bool lockMode; }; namespace checker { class Checker; } @@ -444,6 +458,11 @@ class LogMgr { */ std::atomic numMemtables; + /** + * If `true`, the sync of manifest is a must in the next `sync()` call. + */ + std::atomic needSkippedManiSync; + /** * Logger. */ diff --git a/src/memtable.cc b/src/memtable.cc index 394bfd3..10c2195 100644 --- a/src/memtable.cc +++ b/src/memtable.cc @@ -1233,7 +1233,7 @@ Status MemTable::findOffsetOfSeq(SimpleLogger* logger, } // MemTable flush: skiplist (memory) -> log file. (disk) -Status MemTable::flush(RwSerializer& rws, uint64_t upto) +Status MemTable::flush(RwSerializer& rws, uint64_t upto, uint64_t& flushed_seq_out) { if (minSeqNum == NOT_INITIALIZED) { // No log in this file. Just do nothing and return OK. 
@@ -1450,7 +1450,7 @@ Status MemTable::flush(RwSerializer& rws, uint64_t upto) skiplist_get_size(idxBySeq), start_seqnum, seqnum_upto); - syncedSeqNum = seqnum_upto; + flushed_seq_out = seqnum_upto; return Status(); } catch (Status s) { diff --git a/src/memtable.h b/src/memtable.h index 203ae32..bf975f4 100644 --- a/src/memtable.h +++ b/src/memtable.h @@ -97,7 +97,7 @@ class MemTable { uint64_t& offset_out, uint64_t* padding_start_pos_out = nullptr); - Status flush(RwSerializer& rws, uint64_t upto = NOT_INITIALIZED); + Status flush(RwSerializer& rws, uint64_t upto, uint64_t& flushed_seq_out); Status checkpoint(uint64_t& seq_num_out); Status getLogsToFlush(const uint64_t seq_num, std::list& list_out, diff --git a/src/table_file.cc b/src/table_file.cc index 26231c9..e35f832 100644 --- a/src/table_file.cc +++ b/src/table_file.cc @@ -903,7 +903,7 @@ Status TableFile::setSingle(uint32_t key_hash_val, // Immediate purging option, // only for the bottom-most non-zero level. InternalMeta i_meta_from_rec; - readInternalMeta(rec.meta, i_meta_from_rec); + readInternalMeta(SizedBuf(doc.metalen, doc.meta), i_meta_from_rec); if (i_meta_from_rec.isTombstone || force_delete) { fs = fdb_del(kvs_db, &doc); deletion_executed = true; diff --git a/tests/jungle/basic_op_test.cc b/tests/jungle/basic_op_test.cc index 3f6c1cc..36a62bc 100644 --- a/tests/jungle/basic_op_test.cc +++ b/tests/jungle/basic_op_test.cc @@ -2980,7 +2980,7 @@ int log_flush_add_new_file_race_test() { EventAwaiter ea_add_new_file; EventAwaiter ea_main; - const size_t WAIT_TIME_MS = 3600 * 1000; + const size_t WAIT_TIME_MS = 10 * 1000; // Enable debugging hook for new log file and log flush. 
jungle::DebugParams dp; @@ -3040,6 +3040,156 @@ int log_flush_add_new_file_race_test() { return 0; } +int serialized_sync_and_flush_test() { + std::string filename; + TEST_SUITE_PREPARE_PATH(filename); + + jungle::Status s; + jungle::DBConfig config; + TEST_CUSTOM_DB_CONFIG(config); + jungle::DB* db; + + config.serializeMultiThreadedLogFlush = true; + CHK_Z(jungle::DB::open(&db, filename, config)); + + auto insert_keys = [&](size_t from, size_t to) { + for (size_t ii = from; ii < to; ++ii) { + std::string key_str = "key" + TestSuite::lzStr(5, ii); + std::string val_str = "val" + TestSuite::lzStr(5, ii); + CHK_Z( db->set( jungle::KV(key_str, val_str) ) ); + } + return 0; + }; + + CHK_Z( insert_keys(0, 10) ); + + const size_t WAIT_TIME_MS = 3600 * 1000; + EventAwaiter ea_main; + + // Enable debugging hook for new log file and log flush. + jungle::DebugParams dp; + std::thread* t1(nullptr); + std::thread* t2(nullptr); + jungle::Status t1_res(jungle::Status::ERROR), t2_res(jungle::Status::ERROR); + std::atomic sync_cb_invoked(false), flush_cb_invoked(false); + + dp.syncCb = [&](const jungle::DebugParams::GenericCbParams& pp) { + bool exp = false; + bool des = true; + // This CB is executed only once, so only 2 threads are racing. + if (sync_cb_invoked.compare_exchange_strong(exp, des)) { + t1 = new std::thread([&]() { + // Let another thread sync simultaneously. + // This thread should be blocked by the main thread. + t1_res = db->sync(false); + + // Once it's done, resume the main thread flow. + ea_main.invoke(); + }); + } + }; + + dp.logFlushCb = [&](const jungle::DebugParams::GenericCbParams& pp) { + bool exp = false; + bool des = true; + // This CB is executed only once, so only 2 threads are racing. + if (flush_cb_invoked.compare_exchange_strong(exp, des)) { + t2 = new std::thread([&]() { + // Let another thread flush simultaneously. + // This thread should be blocked by the main thread. 
+ jungle::FlushOptions f_opt; + t2_res = db->flushLogs(f_opt); + + // Once it's done, resume the main thread flow. + ea_main.invoke(); + }); + } + }; + jungle::DB::setDebugParams(dp); + jungle::DB::enableDebugCallbacks(true); + + // Do sync. + CHK_Z(db->sync(false)); + ea_main.wait_ms(WAIT_TIME_MS); + ea_main.reset(); + // Both thread should succeed without `OPERATION_IN_PROGRESS`. + CHK_Z(t1_res); + + // Do flush. + jungle::FlushOptions f_opt; + CHK_Z(db->flushLogs(f_opt)); + ea_main.wait_ms(WAIT_TIME_MS); + ea_main.reset(); + // Both thread should succeed without `OPERATION_IN_PROGRESS`. + CHK_Z(t2_res.getValue()); + + CHK_Z(jungle::DB::close(db)); + CHK_Z(jungle::shutdown()); + + if (t1->joinable()) { + t1->join(); + } + delete t1; + if (t2->joinable()) { + t2->join(); + } + delete t2; + + TEST_SUITE_CLEANUP_PATH(); + return 0; +} + +int disabling_auto_flush_test() { + std::string filename; + TEST_SUITE_PREPARE_PATH(filename); + + jungle::Status s; + jungle::DBConfig config; + TEST_CUSTOM_DB_CONFIG(config); + jungle::DB* db1 = nullptr; + jungle::DB* db2 = nullptr; + + jungle::GlobalConfig g_config; + g_config.numFlusherThreads = 1; + g_config.flusherMinRecordsToTrigger = 100; + jungle::init(g_config); + + config.maxEntriesInLogFile = 100; + std::string db1_file = filename + "_db1"; + CHK_Z(jungle::DB::open(&db1, db1_file, config)); + + config.autoLogFlush = false; + std::string db2_file = filename + "_db2"; + CHK_Z(jungle::DB::open(&db2, db2_file, config)); + + auto insert_keys = [&](size_t from, size_t to) { + for (size_t ii = from; ii < to; ++ii) { + std::string key_str = "key" + TestSuite::lzStr(5, ii); + std::string val_str = "val" + TestSuite::lzStr(5, ii); + CHK_Z( db1->set( jungle::KV(key_str, val_str) ) ); + CHK_Z( db2->set( jungle::KV(key_str, val_str) ) ); + } + return 0; + }; + + CHK_Z( insert_keys(0, 200) ); + + // Wait longer than flusher sleep time. 
+ TestSuite::sleep_ms(g_config.flusherSleepDuration_ms * 2, "wait for flusher.."); + + uint64_t db1_flushed_seqnum = 0, db2_flushed_seqnum = 0; + CHK_Z(db1->getLastFlushedSeqNum(db1_flushed_seqnum)); + CHK_EQ(jungle::Status::INVALID_SEQNUM, + db2->getLastFlushedSeqNum(db2_flushed_seqnum).getValue()); + + CHK_Z(jungle::DB::close(db1)); + CHK_Z(jungle::DB::close(db2)); + CHK_Z(jungle::shutdown()); + + TEST_SUITE_CLEANUP_PATH(); + return 0; +} + int main(int argc, char** argv) { TestSuite ts(argc, argv); @@ -3094,6 +3244,8 @@ int main(int argc, char** argv) { key_length_limit_for_hash_test, TestRange( {24, 8, 7, 0} )); ts.doTest("sample key test", sample_key_test); ts.doTest("log flush add new file race test", log_flush_add_new_file_race_test); + ts.doTest("serialized sync and flush test", serialized_sync_and_flush_test); + ts.doTest("disabling auto flush test", disabling_auto_flush_test); return 0; } diff --git a/tests/jungle/log_reclaim_test.cc b/tests/jungle/log_reclaim_test.cc index 74260c7..e66f332 100644 --- a/tests/jungle/log_reclaim_test.cc +++ b/tests/jungle/log_reclaim_test.cc @@ -1660,6 +1660,68 @@ int dedicated_flusher_test() { return 0; } +int sync_wo_manifest_test() { + std::string filename; + TEST_SUITE_PREPARE_PATH(filename); + + jungle::Status s; + jungle::DBConfig config; + TEST_CUSTOM_DB_CONFIG(config); + jungle::DB* db = nullptr; + + jungle::GlobalConfig g_config; + g_config.numFlusherThreads = 1; + jungle::init(g_config); + + config.maxEntriesInLogFile = 10; + config.skipManifestSync = true; + CHK_Z(jungle::DB::open(&db, filename, config)); + + auto insert_keys = [&](size_t from, size_t to) { + for (size_t ii = from; ii < to; ++ii) { + std::string key_str = "key" + TestSuite::lzStr(5, ii); + std::string val_str = "val" + TestSuite::lzStr(5, ii); + CHK_Z( db->set( jungle::KV(key_str, val_str) ) ); + } + return 0; + }; + auto verify = [&](size_t upto) { + for (size_t ii = 0; ii < upto; ++ii) { + TestSuite::setInfo("ii=%zu", ii); + jungle::SizedBuf 
value_out; + jungle::SizedBuf::Holder h(value_out); + std::string key_str = "key" + TestSuite::lzStr(5, ii); + std::string val_str = "val" + TestSuite::lzStr(5, ii); + CHK_Z( db->get(jungle::SizedBuf(key_str), value_out) ); + CHK_EQ(val_str, value_out.toString()); + } + return 0; + }; + + CHK_Z(insert_keys(0, 11)); + CHK_Z(db->sync(true)); + + CHK_Z(insert_keys(11, 15)); + CHK_Z(db->sync(true)); + + // Copy file at this moment to mimic crash. + std::string clone_path = filename + "_clone"; + TestSuite::copyfile(filename, clone_path); + + CHK_Z(jungle::DB::close(db)); + + // Open clone. + // Even with crash without manifest sync, all 15 logs should be there. + CHK_Z(jungle::DB::open(&db, clone_path, config)); + CHK_Z(verify(15)); + CHK_Z(jungle::DB::close(db)); + + CHK_Z(jungle::shutdown()); + + TEST_SUITE_CLEANUP_PATH(); + return 0; +} + } using namespace log_reclaim_test; @@ -1741,10 +1803,12 @@ int main(int argc, char** argv) { ts.doTest("snapshot on purged memtable test", snapshot_on_purged_memtable_test); - ts.doTest("dedicated flusher test", dedicated_flusher_test); + ts.doTest("sync without manifest test", + sync_wo_manifest_test); + #if 0 ts.doTest("reload empty files test", reload_with_empty_files_test_load);