Skip to content

Commit

Permalink
Improve db recovery
Browse files Browse the repository at this point in the history
Summary: Avoid creating unnecessary sst files while db opening

Test Plan: make all check

Reviewers: sdong, igor

Reviewed By: igor

Subscribers: zagfox, yhchiang, ljin, leveldb

Differential Revision: https://reviews.facebook.net/D20661
  • Loading branch information
StanislavGlebik committed Sep 9, 2014
1 parent 6bb7e3e commit d343c3f
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 103 deletions.
211 changes: 113 additions & 98 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1219,14 +1219,16 @@ Status DBImpl::Recover(
"flag but a log file already exists");
}

// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
for (const auto& log : logs) {
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(log);
s = RecoverLogFile(log, &max_sequence, read_only);
if (!logs.empty()) {
// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
s = RecoverLogFiles(logs, &max_sequence, read_only);
if (!s.ok()) {
// Clear memtables if recovery failed
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->CreateNewMemtable();
}
}
}
SetTickerCount(stats_, SEQUENCE_NUMBER, versions_->LastSequence());
}
Expand All @@ -1239,8 +1241,9 @@ Status DBImpl::Recover(
return s;
}

Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
bool read_only) {
// REQUIRES: log_numbers are sorted in ascending order
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* max_sequence, bool read_only) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
Expand All @@ -1256,7 +1259,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
};

mutex_.AssertHeld();

Status status;
std::unordered_map<int, VersionEdit> version_edits;
// no need to refcount because iteration is under mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
Expand All @@ -1265,102 +1268,113 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
version_edits.insert({cfd->GetID(), edit});
}

// Open the log file
std::string fname = LogFileName(db_options_.wal_dir, log_number);
unique_ptr<SequentialFile> file;
Status status = env_->NewSequentialFile(fname, &file, env_options_);
if (!status.ok()) {
MaybeIgnoreError(&status);
return status;
}

// Create the log reader.
LogReporter reporter;
reporter.env = env_;
reporter.info_log = db_options_.info_log.get();
reporter.fname = fname.c_str();
reporter.status = (db_options_.paranoid_checks &&
!db_options_.skip_log_error_on_recovery ? &status
: nullptr);
// We intentially make log::Reader do checksumming even if
// paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
log::Reader reader(std::move(file), &reporter, true/*checksum*/,
0/*initial_offset*/);
Log(db_options_.info_log, "Recovering log #%" PRIu64 "", log_number);

// Read all the records and add to a memtable
std::string scratch;
Slice record;
WriteBatch batch;
while (reader.ReadRecord(&record, &scratch)) {
if (record.size() < 12) {
reporter.Corruption(record.size(),
Status::Corruption("log record too small"));
continue;
for (auto log_number : log_numbers) {
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(log_number);
// Open the log file
std::string fname = LogFileName(db_options_.wal_dir, log_number);
unique_ptr<SequentialFile> file;
status = env_->NewSequentialFile(fname, &file, env_options_);
if (!status.ok()) {
MaybeIgnoreError(&status);
if (!status.ok()) {
return status;
} else {
// Fail with one log file, but that's ok.
// Try next one.
continue;
}
}
WriteBatchInternal::SetContents(&batch, record);

// If column family was not found, it might mean that the WAL write
// batch references to the column family that was dropped after the
// insert. We don't want to fail the whole write batch in that case -- we
// just ignore the update. That's why we set ignore missing column families
// to true
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(),
true /* ignore missing column families */, log_number);
// Create the log reader.
LogReporter reporter;
reporter.env = env_;
reporter.info_log = db_options_.info_log.get();
reporter.fname = fname.c_str();
reporter.status =
(db_options_.paranoid_checks && !db_options_.skip_log_error_on_recovery
? &status
: nullptr);
// We intentially make log::Reader do checksumming even if
// paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
log::Reader reader(std::move(file), &reporter, true /*checksum*/,
0 /*initial_offset*/);
Log(db_options_.info_log, "Recovering log #%" PRIu64 "", log_number);

// Read all the records and add to a memtable
std::string scratch;
Slice record;
WriteBatch batch;
while (reader.ReadRecord(&record, &scratch)) {
if (record.size() < 12) {
reporter.Corruption(record.size(),
Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);

MaybeIgnoreError(&status);
if (!status.ok()) {
return status;
}
const SequenceNumber last_seq =
WriteBatchInternal::Sequence(&batch) +
WriteBatchInternal::Count(&batch) - 1;
if (last_seq > *max_sequence) {
*max_sequence = last_seq;
}
// If column family was not found, it might mean that the WAL write
// batch references to the column family that was dropped after the
// insert. We don't want to fail the whole write batch in that case --
// we just ignore the update.
// That's why we set ignore missing column families to true
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), true, log_number);

if (!read_only) {
// no need to refcount since client still doesn't have access
// to the DB and can not drop column families while we iterate
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->mem()->ShouldFlush()) {
// If this asserts, it means that InsertInto failed in
// filtering updates to already-flushed column families
assert(cfd->GetLogNumber() <= log_number);
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
VersionEdit* edit = &iter->second;
status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit);
// we still want to clear the memtable, even if the recovery failed
cfd->CreateNewMemtable();
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
return status;
MaybeIgnoreError(&status);
if (!status.ok()) {
return status;
}
const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
WriteBatchInternal::Count(&batch) - 1;
if (last_seq > *max_sequence) {
*max_sequence = last_seq;
}

if (!read_only) {
// no need to refcount since client still doesn't have access
// to the DB and can not drop column families while we iterate
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->mem()->ShouldFlush()) {
// If this asserts, it means that InsertInto failed in
// filtering updates to already-flushed column families
assert(cfd->GetLogNumber() <= log_number);
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
VersionEdit* edit = &iter->second;
status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit);
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
return status;
}
cfd->CreateNewMemtable();
}
}
}
}
}

if (versions_->LastSequence() < *max_sequence) {
versions_->SetLastSequence(*max_sequence);
if (versions_->LastSequence() < *max_sequence) {
versions_->SetLastSequence(*max_sequence);
}
}

if (!read_only) {
// no need to refcount since client still doesn't have access
// to the DB and can not drop column families while we iterate
auto max_log_number = log_numbers.back();
for (auto cfd : *versions_->GetColumnFamilySet()) {
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
VersionEdit* edit = &iter->second;

if (cfd->GetLogNumber() > log_number) {
if (cfd->GetLogNumber() > max_log_number) {
// Column family cfd has already flushed the data
// from log_number. Memtable has to be empty because
// from all logs. Memtable has to be empty because
// we filter the updates based on log_number
// (in WriteBatch::InsertInto)
assert(cfd->mem()->GetFirstSequenceNumber() == 0);
Expand All @@ -1371,28 +1385,29 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
// flush the final memtable (if non-empty)
if (cfd->mem()->GetFirstSequenceNumber() != 0) {
status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit);
}
// we still want to clear the memtable, even if the recovery failed
cfd->CreateNewMemtable();
if (!status.ok()) {
return status;
if (!status.ok()) {
// Recovery failed
break;
}
cfd->CreateNewMemtable();
}

// write MANIFEST with update
// writing log number in the manifest means that any log file
// writing log_number in the manifest means that any log file
// with number strongly less than (log_number + 1) is already
// recovered and should be ignored on next reincarnation.
// Since we already recovered log_number, we want all logs
// with numbers `<= log_number` (includes this one) to be ignored
edit->SetLogNumber(log_number + 1);
// Since we already recovered max_log_number, we want all logs
// with numbers `<= max_log_number` (includes this one) to be ignored
edit->SetLogNumber(max_log_number + 1);
// we must mark the next log number as used, even though it's
// not actually used. that is because VersionSet assumes
// VersionSet::next_file_number_ always to be strictly greater than any
// log number
versions_->MarkFileNumberUsed(log_number + 1);
versions_->MarkFileNumberUsed(max_log_number + 1);
status = versions_->LogAndApply(cfd, edit, &mutex_);
if (!status.ok()) {
return status;
// Recovery failed
break;
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,9 @@ class DBImpl : public DB {
DeletionState& deletion_state,
LogBuffer* log_buffer);

Status RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
bool read_only);
// REQUIRES: log_numbers are sorted in ascending order
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* max_sequence, bool read_only);

// The following two methods are used to flush a memtable to
// storage. The first one is used atdatabase RecoveryTime (when the
Expand Down
Loading

0 comments on commit d343c3f

Please sign in to comment.