Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Dec 6, 2024
1 parent 5fb3f47 commit f90ae55
Show file tree
Hide file tree
Showing 16 changed files with 691 additions and 35 deletions.
5 changes: 4 additions & 1 deletion db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,10 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
blob_callback_(immutable_db_options_.sst_file_manager.get(), &mutex_,
&error_handler_, &event_logger_,
immutable_db_options_.listeners, dbname_),
lock_wal_count_(0) {
lock_wal_count_(0),
predecessor_wal_log_num_(0),
predecessor_wal_size_bytes_(0),
predecessor_wal_last_seqno_recorded_(kMaxSequenceNumber) {
// !batch_per_trx_ implies seq_per_batch_ because it is only unset for
// WriteUnprepared, which should use seq_per_batch_.
assert(batch_per_txn_ || seq_per_batch_);
Expand Down
11 changes: 9 additions & 2 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2040,7 +2040,9 @@ class DBImpl : public DB {
bool read_only, int job_id, SequenceNumber* next_sequence,
bool* stop_replay_for_corruption, bool* stop_replay_by_wal_filter,
uint64_t* corrupted_wal_number, bool* corrupted_wal_found,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed);
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed,
uint64_t* prev_log_number, SequenceNumber* prev_log_last_seqno_recorded,
uint64_t* prev_log_size);

void SetupLogFileProcess(uint64_t wal_number);

Expand All @@ -2057,7 +2059,8 @@ class DBImpl : public DB {
uint64_t* record_checksum, SequenceNumber* next_sequence,
bool* stop_replay_for_corruption, Status* status,
bool* stop_replay_by_wal_filter,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed);
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed,
SequenceNumber* prev_log_last_seqno_recorded);

Status InitializeWriteBatchForLogRecord(
Slice record, const std::unique_ptr<log::Reader>& reader,
Expand Down Expand Up @@ -3066,6 +3069,10 @@ class DBImpl : public DB {
// The number of LockWAL called without matching UnlockWAL call.
// See also lock_wal_write_token_
uint32_t lock_wal_count_;

uint64_t predecessor_wal_log_num_;
uint64_t predecessor_wal_size_bytes_;
SequenceNumber predecessor_wal_last_seqno_recorded_;
};

class GetWithTimestampReadCallback : public ReadCallback {
Expand Down
53 changes: 44 additions & 9 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,11 @@ Status DBImpl::Recover(
}
}

// Optional - not for repaired db
// if (!is_new_db && wal_files.empty()) {
// return Status::Corruption("Opening an existing DB with no WAL files");
// }

if (immutable_db_options_.track_and_verify_wals_in_manifest) {
if (!immutable_db_options_.best_efforts_recovery) {
// Verify WALs in MANIFEST.
Expand Down Expand Up @@ -1191,13 +1196,18 @@ Status DBImpl::ProcessLogFiles(
bool flushed = false;
uint64_t corrupted_wal_number = kMaxSequenceNumber;

uint64_t prev_log_number = 0;
SequenceNumber prev_log_last_seqno_recorded = kMaxSequenceNumber;
uint64_t prev_log_size = 0;

for (auto wal_number : wal_numbers) {
if (status.ok()) {
status =
ProcessLogFile(wal_number, min_wal_number, is_retry, read_only,
job_id, next_sequence, &stop_replay_for_corruption,
&stop_replay_by_wal_filter, &corrupted_wal_number,
corrupted_wal_found, version_edits, &flushed);
status = ProcessLogFile(
wal_number, min_wal_number, is_retry, read_only, job_id,
next_sequence, &stop_replay_for_corruption,
&stop_replay_by_wal_filter, &corrupted_wal_number,
corrupted_wal_found, version_edits, &flushed, &prev_log_number,
&prev_log_last_seqno_recorded, &prev_log_size);
}
}

Expand All @@ -1218,7 +1228,9 @@ Status DBImpl::ProcessLogFile(
int job_id, SequenceNumber* next_sequence, bool* stop_replay_for_corruption,
bool* stop_replay_by_wal_filter, uint64_t* corrupted_wal_number,
bool* corrupted_wal_found,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed) {
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed,
uint64_t* prev_log_number, SequenceNumber* prev_log_last_seqno_recorded,
uint64_t* prev_log_size) {
assert(stop_replay_by_wal_filter);

// Variable initialization starts
Expand Down Expand Up @@ -1284,7 +1296,8 @@ Status DBImpl::ProcessLogFile(

bool read_record = reader->ReadRecord(
&record, &scratch, immutable_db_options_.wal_recovery_mode,
&record_checksum);
&record_checksum, stop_replay_for_corruption, &min_wal_number,
prev_log_number, prev_log_last_seqno_recorded, prev_log_size);

// FIXME(hx235): consolidate `read_record` and `status`
if (!read_record || !status.ok()) {
Expand All @@ -1296,7 +1309,7 @@ Status DBImpl::ProcessLogFile(
record, reader, running_ts_sz, wal_number, fname, read_only, job_id,
logFileDropped, &reporter, &record_checksum, next_sequence,
stop_replay_for_corruption, &status, stop_replay_by_wal_filter,
version_edits, flushed);
version_edits, flushed, prev_log_last_seqno_recorded);

if (!process_status.ok()) {
return process_status;
Expand All @@ -1312,6 +1325,16 @@ Status DBImpl::ProcessLogFile(
}

FinishLogFileProcess(next_sequence, status);

if (status.ok()) {
*prev_log_number = wal_number;
assert(*prev_log_last_seqno_recorded);
// *prev_log_last_seqno_recorded = *next_sequence - 1;
uint64_t bytes;
if (env_->GetFileSize(fname, &bytes).ok()) {
*prev_log_size = bytes;
}
}
return status;
}

Expand Down Expand Up @@ -1383,11 +1406,13 @@ Status DBImpl::ProcessLogRecord(
uint64_t* record_checksum, SequenceNumber* next_sequence,
bool* stop_replay_for_corruption, Status* status,
bool* stop_replay_by_wal_filter,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed) {
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed,
SequenceNumber* prev_log_last_seqno_recorded) {
assert(reporter);
assert(stop_replay_for_corruption);
assert(status);
assert(stop_replay_by_wal_filter);
assert(prev_log_last_seqno_recorded);

Status process_status;
bool has_valid_writes = false;
Expand Down Expand Up @@ -1462,6 +1487,10 @@ Status DBImpl::ProcessLogRecord(
has_valid_writes, read_only, wal_number, job_id, next_sequence,
version_edits, flushed);

if (process_status.ok()) {
*prev_log_last_seqno_recorded = sequence;
}

return process_status;
}

Expand Down Expand Up @@ -2238,7 +2267,13 @@ IOStatus DBImpl::CreateWAL(const WriteOptions& write_options,
immutable_db_options_.manual_wal_flush,
immutable_db_options_.wal_compression);
io_s = (*new_log)->AddCompressionTypeRecord(write_options);
if (io_s.ok() && immutable_db_options_.track_predecessor_wal) {
io_s = (*new_log)->MaybeAddPredecessorWALInfoRecord(
write_options, predecessor_wal_log_num_, predecessor_wal_size_bytes_,
predecessor_wal_last_seqno_recorded_);
}
}

return io_s;
}

Expand Down
11 changes: 10 additions & 1 deletion db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1406,7 +1406,8 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
if (!io_s.ok()) {
return io_s;
}
io_s = log_writer->AddRecord(write_options, log_entry);
io_s = log_writer->AddRecord(write_options, log_entry,
WriteBatchInternal::Sequence(&merged_batch));

if (UNLIKELY(needs_locking)) {
log_write_mutex_.Unlock();
Expand Down Expand Up @@ -2269,6 +2270,14 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
if (creating_new_log) {
// TODO: Write buffer size passed in should be max of all CF's instead
// of mutable_cf_options.write_buffer_size.
if (!logs_.empty()) {
log::Writer* cur_log_writer = logs_.back().writer;
predecessor_wal_log_num_ = cur_log_writer->get_log_number();
predecessor_wal_size_bytes_ = cur_log_writer->file()->GetFileSize();
predecessor_wal_last_seqno_recorded_ =
cur_log_writer->GetLastSeqnoRecorded();
}

io_s = CreateWAL(write_options, new_log_number, recycle_log_number,
preallocate_block_size, &new_log);
if (s.ok()) {
Expand Down
Loading

0 comments on commit f90ae55

Please sign in to comment.