Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Dec 15, 2024
1 parent 2ff9f95 commit fcb7e15
Show file tree
Hide file tree
Showing 2 changed files with 596 additions and 273 deletions.
87 changes: 87 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,17 @@ class Directories {
std::unique_ptr<FSDirectory> wal_dir_;
};

struct DBOpenLogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
const char* fname;
Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
bool* old_log_record;
void Corruption(size_t bytes, const Status& s) override;

void OldLogRecord(size_t bytes) override;
};

// While DB is the public interface of RocksDB, and DBImpl is the actual
// class implementing it. It's the entrance of the core RocksdB engine.
// All other DB implementations, e.g. TransactionDB, BlobDB, etc, wrap a
Expand Down Expand Up @@ -2043,6 +2054,82 @@ class DBImpl : public DB {
bool is_retry, bool* corrupted_log_found,
RecoveryContext* recovery_ctx);

void SetupLogFilesRecovery(
const std::vector<uint64_t>& wal_numbers,
std::unordered_map<int, VersionEdit>* version_edits, int* job_id,
uint64_t* min_wal_number);

Status ProcessLogFiles(const std::vector<uint64_t>& wal_numbers,
bool read_only, bool is_retry, uint64_t min_wal_number,
int job_id, SequenceNumber* next_sequence,
std::unordered_map<int, VersionEdit>* version_edits,
bool* corrupted_wal_found,
RecoveryContext* recovery_ctx);

Status ProcessLogFile(
uint64_t wal_number, uint64_t min_wal_number, bool is_retry,
bool read_only, int job_id, SequenceNumber* next_sequence,
bool* stop_replay_for_corruption, bool* stop_replay_by_wal_filter,
uint64_t* corrupted_wal_number, bool* corrupted_wal_found,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed);

void SetupLogFileProcessing(uint64_t wal_number);

Status InitializeLogReader(uint64_t wal_number, bool is_retry,
std::string& fname, bool* const old_log_record,
Status* const reporter_status,
DBOpenLogReporter* reporter,
std::unique_ptr<log::Reader>& reader);
Status ProcessLogRecord(
Slice record, const std::unique_ptr<log::Reader>& reader,
const UnorderedMap<uint32_t, size_t>& running_ts_sz, uint64_t wal_number,
const std::string& fname, bool read_only, int job_id,
std::function<void()> logFileDropped, DBOpenLogReporter* reporter,
uint64_t* record_checksum, SequenceNumber* next_sequence,
bool* stop_replay_for_corruption, Status* status,
bool* stop_replay_by_wal_filter,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed);

Status InitializeWriteBatchForLogRecord(
Slice record, const std::unique_ptr<log::Reader>& reader,
const UnorderedMap<uint32_t, size_t>& running_ts_sz, WriteBatch* batch,
std::unique_ptr<WriteBatch>& new_batch, WriteBatch*& batch_to_use,
uint64_t* record_checksum);

void MaybeReviseStopReplayForCorruption(
SequenceNumber sequence, SequenceNumber const* const next_sequence,
bool* stop_replay_for_corruption);

Status InsertLogRecordToMemtable(WriteBatch* batch_to_use,
uint64_t wal_number,
SequenceNumber* next_sequence,
bool* has_valid_writes);

Status MaybeWriteLevel0TableForRecovery(
bool has_valid_writes, bool read_only, uint64_t wal_number, int job_id,
SequenceNumber const* const next_sequence,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed);

Status HandleNonOkStatusOrOldLogRecord(
uint64_t wal_number, SequenceNumber const* const next_sequence,
Status log_read_status, bool* old_log_record,
bool* stop_replay_for_corruption, uint64_t* corrupted_wal_number,
bool* corrupted_wal_found);

void FinishLogFileProcessing(SequenceNumber const* const next_sequence,
const Status& status);

// Return `Status::Corruption()` when `stop_replay_for_corruption == true` and
// exits inconsistency between SST and WAL data
Status MaybeHandleStopReplayForCorruptionForInconsistency(
bool stop_replay_for_corruption, uint64_t corrupted_wal_number);

Status MaybeFlushFinalMemtableOrRestoreActiveLogFiles(
const std::vector<uint64_t>& wal_numbers, bool read_only, int job_id,
bool flushed, std::unordered_map<int, VersionEdit>* version_edits,
RecoveryContext* recovery_ctx);

void FinishLogFilesRecovery(int job_id, const Status& status);
// The following two methods are used to flush a memtable to
// storage. The first one is used at database RecoveryTime (when the
// database is opened) and is heavyweight because it holds the mutex
Expand Down
Loading

0 comments on commit fcb7e15

Please sign in to comment.