Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Dec 7, 2024
1 parent 3b91fe8 commit 293c9f7
Showing 1 changed file with 34 additions and 0 deletions.
34 changes: 34 additions & 0 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
SequenceNumber* next_sequence, bool read_only,
bool is_retry, bool* corrupted_wal_found,
RecoveryContext* recovery_ctx) {
// ------------ DBOpenLogReporter: start ------------ //
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
Expand All @@ -1131,11 +1132,14 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
fname, static_cast<int>(bytes));
}
};
// ------------ DBOpenLogReporter: end ------------ //

mutex_.AssertHeld();
Status status;
bool old_log_record = false;
std::unordered_map<int, VersionEdit> version_edits;

// ------------ SetupLogFilesRecovery: start ------------ //
// no need to refcount because iteration is under mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
VersionEdit edit;
Expand Down Expand Up @@ -1168,18 +1172,24 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
min_wal_number =
std::max(min_wal_number, versions_->MinLogNumberWithUnflushedData());
}
// ------------ SetupLogFilesRecovery: end ------------ //

// ------------ ProcessLogFiles: start ------------ //
for (auto wal_number : wal_numbers) {
// -------- ProcessLogFile: start -------- //
if (wal_number < min_wal_number) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Skipping log #%" PRIu64
" since it is older than min log to keep #%" PRIu64,
wal_number, min_wal_number);
continue;
}
// ---- SetupLogFileProcess: start ---- //
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(wal_number);
// ---- SetupLogFileProcess: end ---- //
// Open the log file
std::string fname =
LogFileName(immutable_db_options_.GetWalDir(), wal_number);
Expand All @@ -1200,6 +1210,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
continue;
}

// ---- InitializeLogReader: start ---- //
std::unique_ptr<SequentialFileReader> file_reader;
{
std::unique_ptr<FSSequentialFile> file;
Expand Down Expand Up @@ -1239,6 +1250,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
// large sequence numbers).
log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
&reporter, true /*checksum*/, wal_number);
// ---- InitializeLogReader: end ---- //

// Determine if we should tolerate incomplete records at the tail end of the
// Read all the records and add to a memtable
Expand All @@ -1256,11 +1268,13 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
immutable_db_options_.wal_recovery_mode,
&record_checksum) &&
status.ok()) {
// ---- ProcessLogRecord: start ---- //
if (record.size() < WriteBatchInternal::kHeader) {
reporter.Corruption(record.size(),
Status::Corruption("log record too small"));
continue;
}
// --- InitializeWriteBatchForLogRecord: start --- //
// We create a new batch and initialize with a valid prot_info_ to store
// the data checksums
WriteBatch batch;
Expand Down Expand Up @@ -1295,6 +1309,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
if (!status.ok()) {
return status;
}
// --- InitializeWriteBatchForLogRecord: end --- //

SequenceNumber sequence = WriteBatchInternal::Sequence(batch_to_use);
if (sequence > kMaxSequenceNumber) {
Expand All @@ -1307,13 +1322,15 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,

if (immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kPointInTimeRecovery) {
// --- MaybeReviseStopReplayForCorruption: start --- //
// In point-in-time recovery mode, if sequence id of log files are
// consecutive, we continue recovery despite corruption. This could
// happen when we open and write to a corrupted DB, where sequence id
// will start from the last sequence id we recovered.
if (sequence == *next_sequence) {
stop_replay_for_corruption = false;
}
// --- MaybeReviseStopReplayForCorruption: end --- //
if (stop_replay_for_corruption) {
logFileDropped();
break;
Expand All @@ -1328,6 +1345,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
continue;
}

// --- InsertLogRecordToMemtable: start --- //
// If column family was not found, it might mean that the WAL write
// batch references to the column family that was dropped after the
// insert. We don't want to fail the whole write batch in that case --
Expand All @@ -1339,6 +1357,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
&trim_history_scheduler_, true, wal_number, this,
false /* concurrent_memtable_writes */, next_sequence,
&has_valid_writes, seq_per_batch_, batch_per_txn_);
// --- InsertLogRecordToMemtable: end --- //
MaybeIgnoreError(&status);
if (!status.ok()) {
// We are treating this as a failure while reading since we read valid
Expand All @@ -1347,6 +1366,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
continue;
}

// --- MaybeWriteLevel0TableForRecovery: start --- //
if (has_valid_writes && !read_only) {
// we can do this because this is called before client has access to the
// DB and there is only a single thread operating on DB
Expand All @@ -1372,9 +1392,12 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
*next_sequence - 1);
}
}
// --- MaybeWriteLevel0TableForRecovery: end --- //
// ---- ProcessLogRecord: end ---- //
}

if (!status.ok() || old_log_record) {
// ---- HandleNonOkStatusOrOldLogRecord: start ---- //
if (status.IsNotSupported()) {
// We should not treat NotSupported as corruption. It is rather a clear
// sign that we are processing a WAL that is produced by an incompatible
Expand Down Expand Up @@ -1416,8 +1439,10 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
WALRecoveryMode::kAbsoluteConsistency);
return status;
}
// ---- HandleNonOkStatusOrOldLogRecord: end ---- //
}

// ---- FinishLogFileProcess: start ---- //
flush_scheduler_.Clear();
trim_history_scheduler_.Clear();
auto last_sequence = *next_sequence - 1;
Expand All @@ -1427,7 +1452,10 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
versions_->SetLastPublishedSequence(last_sequence);
versions_->SetLastSequence(last_sequence);
}
// ---- FinishLogFileProcess: end ---- //
// -------- ProcessLogFile: end -------- //
}
// -------- MaybeHandleStopReplayForCorruptionForInconsistency: start -------- //
// Compare the corrupted log number to all columnfamily's current log number.
// Abort Open() if any column family's log number is greater than
// the corrupted log number, which means CF contains data beyond the point of
Expand Down Expand Up @@ -1468,7 +1496,9 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
}
}
}
// -------- MaybeHandleStopReplayForCorruptionForInconsistency: end -------- //

// -------- MaybeFlushFinalMemtableOrRestoreActiveLogFiles: start -------- //
// True if there's any data in the WALs; if not, we can skip re-processing
// them later
bool data_seen = false;
Expand Down Expand Up @@ -1568,9 +1598,13 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
.PermitUncheckedError();
}
}
// -------- MaybeFlushFinalMemtableOrRestoreActiveLogFiles: end -------- //
// ------------ ProcessLogFiles: end ------------ //

// ------------ FinishLogFilesRecovery: start ------------ //
event_logger_.Log() << "job" << job_id << "event"
<< "recovery_finished";
// ------------ FinishLogFilesRecovery: end ------------ //

return status;
}
Expand Down

0 comments on commit 293c9f7

Please sign in to comment.