#488: ENG-3917: Occasional delay in deletion of compacted files.
Summary:
Here is an excerpt from the RocksDB wiki describing the "Version" data structure:
> The list of files in an LSM tree is kept in a data structure called version. At the end of a compaction or a mem table flush, a new version is created for the updated LSM tree. At one time, there is only one "current" version that represents the files in the up-to-date LSM tree. New get requests or new iterators will use the current version through the whole read process or life cycle of the iterator. All versions used by gets or iterators need to be kept. An out-of-date version that is not used by any get or iterator needs to be dropped. All files not used by any other version need to be deleted.
> ...
> Both an SST file and a version have a reference count. When we create a version, we increment the reference counts for all files. If a version is not needed, all files of the version have their reference counts decremented. If a file's reference count drops to 0, the file can be deleted.
> In a similar way, each version has a reference count. When a version is created, it is an up-to-date one, so it has reference count 1. If the version is not up-to-date anymore, its reference count is decremented. Anyone who needs to work on the version has its reference count incremented by 1, and decremented by 1 when finishing using it. When a version's reference count is 0, it should be removed. As long as a version is either up-to-date or in use, its reference count is not 0, so it will be kept.
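
For illustration, here is a minimal C++ sketch of the reference-counting scheme the excerpt describes; the names are simplified stand-ins, and the real `rocksdb::Version` and `FileMetaData` carry far more state:

```cpp
#include <utility>
#include <vector>

struct FileMetaData {
  int refs = 0;  // How many versions (and jobs) still reference this file.
};

// Simplified stand-in for rocksdb::Version: pins one snapshot of the LSM tree.
struct Version {
  std::vector<FileMetaData*> files;
  int refs;

  explicit Version(std::vector<FileMetaData*> f)
      : files(std::move(f)), refs(1) {  // Starts as the "current" version.
    for (auto* file : files) ++file->refs;  // A new version pins all its files.
  }

  void Ref() { ++refs; }  // A get or iterator starts using this version.

  // Called when a user finishes with the version, or when it stops being
  // current. Returns the files whose reference count dropped to zero;
  // only those can be physically deleted.
  std::vector<FileMetaData*> Unref() {
    std::vector<FileMetaData*> deletable;
    if (--refs == 0) {
      for (auto* file : files) {
        if (--file->refs == 0) deletable.push_back(file);
      }
    }
    return deletable;
  }
};
```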

A compaction job doesn't simply delete its input files. Instead, it finds obsolete files (independently of its list of input files) and deletes them, skipping live SST files and pending output files.

There were several cases in which deletion of compacted files was delayed:
1) A concurrent flush job holds its input version, and therefore all files from that version.
2) At the end of a flush job, RocksDB can schedule a compaction, which then holds its input version together with all files from that version (not only the input files of the scheduled compaction).
3) The `DBImpl::FindObsoleteFiles` and `DBImpl::PurgeObsoleteFiles` functions don't delete unreferenced SST files whose number is greater than or equal to `min_pending_output`. So while some job is still writing file #4, already compacted and unused files #5, #6, #7 cannot be deleted until the next compaction triggers another obsolete-file scan (see the sketch after this list).
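
To make case 3 concrete, here is a hypothetical simplification of the pre-fix deletion check; `FindDeletableFiles` and its parameters are illustrative stand-ins, not the actual `DBImpl` code:

```cpp
#include <cstdint>
#include <vector>

using FileNumber = uint64_t;

// Pre-fix behavior: every unreferenced SST file with a number >= the single
// min_pending_output lower bound is kept, because *some* job might still be
// writing a file in that range.
std::vector<FileNumber> FindDeletableFiles(
    const std::vector<FileNumber>& unreferenced_sst_files,
    FileNumber min_pending_output) {
  std::vector<FileNumber> deletable;
  for (FileNumber number : unreferenced_sst_files) {
    if (number >= min_pending_output) {
      // While a job is still writing file #4 (min_pending_output == 4),
      // compacted-away files #5, #6, #7 survive here even though nothing
      // references them anymore.
      continue;
    }
    deletable.push_back(number);
  }
  return deletable;
}
```

A single lower bound is a blunt filter: one slow job writing a low-numbered file pins every unreferenced file above it.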

This diff includes the following changes to address the issue:
1) Don't hold a version during flush.
2) In the case of universal compaction, we don't actually need to hold the whole input version, so we now hold only the input files and store some minimal information from the input version.
3) Instead of relying on `min_pending_output`, the new utility classes `FileNumbersHolder` and `FileNumbersProvider` track the exact set of pending output files, so deletion of other unreferenced SST files is no longer blocked (a sketch of the idea follows this list).
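
Below is a minimal sketch of the `FileNumbersProvider`/`FileNumbersHolder` idea, assuming an RAII holder that releases its numbers when the owning job finishes; the actual classes in `db/file_numbers.cc` differ in detail (for example, the diff below also reserves holder capacity per subcompaction thread):

```cpp
#include <cstdint>
#include <mutex>
#include <set>

using FileNumber = uint64_t;

class FileNumbersProvider;

// RAII holder: numbers allocated through it stay "pending" until the holder
// is destroyed, i.e. until the flush/compaction job that owns it finishes.
class FileNumbersHolder {
 public:
  explicit FileNumbersHolder(FileNumbersProvider* provider)
      : provider_(provider) {}
  FileNumbersHolder(FileNumbersHolder&& other)
      : provider_(other.provider_), numbers_(std::move(other.numbers_)) {
    other.provider_ = nullptr;  // A moved-from holder releases nothing.
  }
  ~FileNumbersHolder();

 private:
  friend class FileNumbersProvider;
  FileNumbersProvider* provider_;
  std::set<FileNumber> numbers_;
};

class FileNumbersProvider {
 public:
  FileNumbersHolder CreateHolder() { return FileNumbersHolder(this); }

  // Allocates a fresh file number and records it as a pending output.
  FileNumber NewFileNumber(FileNumbersHolder* holder) {
    std::lock_guard<std::mutex> lock(mutex_);
    FileNumber number = next_file_number_++;
    pending_.insert(number);
    holder->numbers_.insert(number);
    return number;
  }

  // The obsolete-file scan checks the exact pending set instead of comparing
  // every candidate against a single min_pending_output lower bound.
  bool HasFileNumber(FileNumber number) const {
    std::lock_guard<std::mutex> lock(mutex_);
    return pending_.count(number) > 0;
  }

 private:
  friend class FileNumbersHolder;

  void Release(const std::set<FileNumber>& numbers) {
    std::lock_guard<std::mutex> lock(mutex_);
    for (FileNumber n : numbers) pending_.erase(n);
  }

  mutable std::mutex mutex_;
  FileNumber next_file_number_ = 1;
  std::set<FileNumber> pending_;
};

FileNumbersHolder::~FileNumbersHolder() {
  if (provider_ != nullptr) provider_->Release(numbers_);
}
```

With an exact pending set, the scan can delete any unreferenced file whose number is not currently held, regardless of what other jobs are writing.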

Test Plan:
- Jenkins.
- Long-running test with CassandraKeyValue workload.
- Used debug checks and logs to make sure SST files are deleted no later than 1 second after they are compacted.
- Added unit tests for all 3 cases.

Reviewers: mikhail, venkatesh, amitanand, sergei

Reviewed By: sergei

Subscribers: kannan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D5526
ttyusupov committed Dec 16, 2018
1 parent c73c885 commit e7ec5c2
Showing 33 changed files with 1,073 additions and 520 deletions.
@@ -49,12 +49,6 @@ std::string TsNameForIndex(int idx) {
return Format("ts-$0", idx + 1);
}

void AssertLoggedWaitFor(std::function<Result<bool>()> condition, MonoDelta timeout,
const string& description, MonoDelta initial_delay) {
LOG(INFO) << description;
ASSERT_OK(WaitFor(condition, timeout, description, initial_delay));
}

} // namespace

class KVTableTsFailoverWriteIfTest : public integration_tests::YBTableTestBase {
2 changes: 2 additions & 0 deletions src/yb/rocksdb/CMakeLists.txt
@@ -44,6 +44,7 @@ set(ROCKSDB_SRCS
db/experimental.cc
db/event_helpers.cc
db/file_indexer.cc
db/file_numbers.cc
db/filename.cc
db/flush_job.cc
db/flush_scheduler.cc
@@ -347,6 +348,7 @@ ADD_YB_TEST(db/db_tailing_iter_test)
ADD_YB_TEST(db/db_test)
ADD_YB_TEST(db/db_test2)
ADD_YB_TEST(db/db_universal_compaction_test)
ADD_YB_TEST(db/db_universal_compaction_deletion_test)
ADD_YB_TEST(db/db_wal_test)
ADD_YB_TEST(db/deletefile_test)
ADD_YB_TEST(db/fault_injection_test)
56 changes: 42 additions & 14 deletions src/yb/rocksdb/db/compaction.cc
@@ -37,6 +37,7 @@
#include "yb/rocksdb/db/version_set.h"
#include "yb/rocksdb/util/logging.h"
#include "yb/rocksdb/util/sync_point.h"
#include "yb/util/logging.h"

namespace rocksdb {

@@ -73,12 +74,30 @@ uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
return sum;
}

void Compaction::SetInputVersion(Version* _input_version) {
input_version_ = _input_version;
cfd_ = input_version_->cfd();
bool Compaction::IsCompactionStyleUniversal() const {
return cfd_->ioptions()->compaction_style == kCompactionStyleUniversal;
}

void Compaction::SetInputVersion(Version* input_version) {
input_version_number_ = input_version->GetVersionNumber();
input_version_level0_non_overlapping_ = input_version->storage_info()->level0_non_overlapping();
vset_ = input_version->version_set();
cfd_ = input_version->cfd();
cfd_->Ref();
input_version_->Ref();

if (IsCompactionStyleUniversal()) {
// We don't need to lock the whole input version for universal compaction, only need input
// files.
for (auto& input_level : inputs_) {
for (auto* f : input_level.files) {
++f->refs;
}
}
} else {
input_version_ = input_version;
input_version_->Ref();
}

edit_.SetColumnFamily(cfd_->GetID());
}

@@ -239,6 +258,13 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
Compaction::~Compaction() {
if (input_version_ != nullptr) {
input_version_->Unref();
} else if (cfd_ != nullptr) {
// If we don't hold input_version_, unref each input file separately.
for (auto& input_level : inputs_) {
for (auto f : input_level.files) {
vset_->UnrefFile(cfd_, f);
}
}
}
if (cfd_ != nullptr) {
if (cfd_->Unref()) {
@@ -248,7 +274,9 @@ Compaction::~Compaction() {
}

bool Compaction::InputCompressionMatchesOutput() const {
int base_level = input_version_->storage_info()->base_level();
int base_level = IsCompactionStyleUniversal()
? -1
: input_version_->storage_info()->base_level();
bool matches = (GetCompressionType(*cfd_->ioptions(), start_level_,
base_level) == output_compression_);
if (matches) {
Expand All @@ -267,8 +295,7 @@ bool Compaction::IsTrivialMove() const {
// filter to be applied to that level, and thus cannot be a trivial move.

// Check if start level have files with overlapping ranges
if (start_level_ == 0 &&
input_version_->storage_info()->level0_non_overlapping() == false) {
if (start_level_ == 0 && !input_version_level0_non_overlapping_) {
// We cannot move files from L0 to L1 if the files are overlapping
return false;
}
@@ -304,13 +331,13 @@ void Compaction::AddInputDeletions(VersionEdit* out_edit) {

bool Compaction::KeyNotExistsBeyondOutputLevel(
const Slice& user_key, std::vector<size_t>* level_ptrs) const {
assert(input_version_ != nullptr);
assert(level_ptrs != nullptr);
assert(level_ptrs->size() == static_cast<size_t>(number_levels_));
assert(cfd_->ioptions()->compaction_style != kCompactionStyleFIFO);
if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
if (IsCompactionStyleUniversal()) {
return bottommost_level_;
}
DCHECK_ONLY_NOTNULL(input_version_);
// Maybe use binary search to find right entry instead of linear search?
const Comparator* user_cmp = cfd_->user_comparator();
for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) {
@@ -412,8 +439,10 @@ void Compaction::ReleaseCompactionFiles(Status status) {
}

void Compaction::ResetNextCompactionIndex() {
assert(input_version_ != nullptr);
input_version_->storage_info()->ResetNextCompactionIndex(start_level_);
if (!IsCompactionStyleUniversal()) {
DCHECK_ONLY_NOTNULL(input_version_);
input_version_->storage_info()->ResetNextCompactionIndex(start_level_);
}
}

namespace {
@@ -440,8 +469,7 @@ void Compaction::Summary(char* output, int len) {
int write =
snprintf(output, len, "Base version %" PRIu64
" Base level %d, inputs: [",
input_version_->GetVersionNumber(),
start_level_);
input_version_number_, start_level_);
if (write < 0 || write >= len) {
return;
}
@@ -505,7 +533,7 @@ bool Compaction::ShouldFormSubcompactions() const {
}
if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
return start_level_ == 0 && !IsOutputLevelEmpty();
} else if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
} else if (IsCompactionStyleUniversal()) {
return number_levels_ > 1 && output_level_ > 0;
} else {
return false;
22 changes: 15 additions & 7 deletions src/yb/rocksdb/db/compaction.h
@@ -21,8 +21,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifndef ROCKSDB_DB_COMPACTION_H
#define ROCKSDB_DB_COMPACTION_H
#ifndef YB_ROCKSDB_DB_COMPACTION_H
#define YB_ROCKSDB_DB_COMPACTION_H

#pragma once

@@ -32,9 +32,12 @@
#include "yb/rocksdb/util/autovector.h"
#include "yb/rocksdb/util/mutable_cf_options.h"
#include "yb/rocksdb/db/version_edit.h"
#include "yb/util/result.h"

namespace rocksdb {

using yb::Result;

// The structure that manages compaction input files associated
// with the same physical level.
struct CompactionInputFiles {
@@ -143,8 +146,7 @@ class Compaction {
return 0;
}

// Returns input version of the compaction
Version* input_version() const { return input_version_; }
uint64_t input_version_number() const { return input_version_number_; }

// Returns the ColumnFamilyData associated with the compaction.
ColumnFamilyData* column_family_data() const { return cfd_; }
@@ -258,8 +260,9 @@

uint64_t CalculateTotalInputSize() const;

// In case of compaction error, reset the nextIndex that is used
// to pick up the next file to be compacted from files_by_size_
// In case of compaction error, reset the nextIndex that is used to pick up the next file to be
// compacted from files_by_size_. Does nothing for universal compaction, since nextIndex is not
// used in this case.
void ResetNextCompactionIndex();

// Create a CompactionFilter from compaction_filter_factory
@@ -313,6 +316,9 @@
uint64_t max_grandparent_overlap_bytes_;
MutableCFOptions mutable_cf_options_;
Version* input_version_;
uint64_t input_version_number_ = 0;
bool input_version_level0_non_overlapping_ = false;
VersionSet* vset_ = nullptr;
VersionEdit edit_;
const int number_levels_;
ColumnFamilyData* cfd_;
@@ -350,6 +356,8 @@
// compaction
bool is_trivial_move_;

bool IsCompactionStyleUniversal() const;

// Does input compression match the output compression?
bool InputCompressionMatchesOutput() const;

@@ -368,4 +376,4 @@ extern uint64_t TotalFileSize(const std::vector<FileMetaData*>& files);

} // namespace rocksdb

#endif // ROCKSDB_DB_COMPACTION_H
#endif // YB_ROCKSDB_DB_COMPACTION_H
35 changes: 17 additions & 18 deletions src/yb/rocksdb/db/compaction_job.cc
@@ -42,6 +42,7 @@
#include "yb/rocksdb/db/dbformat.h"
#include "yb/rocksdb/db/event_helpers.h"
#include "yb/rocksdb/db/filename.h"
#include "yb/rocksdb/db/file_numbers.h"
#include "yb/rocksdb/db/log_reader.h"
#include "yb/rocksdb/db/log_writer.h"
#include "yb/rocksdb/db/memtable.h"
@@ -232,6 +233,7 @@ CompactionJob::CompactionJob(
InstrumentedMutex* db_mutex, Status* db_bg_error,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
FileNumbersProvider* file_numbers_provider,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
CompactionJobStats* compaction_job_stats)
@@ -253,6 +255,7 @@
db_bg_error_(db_bg_error),
existing_snapshots_(std::move(existing_snapshots)),
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
file_numbers_provider_(file_numbers_provider),
table_cache_(std::move(table_cache)),
event_logger_(event_logger),
paranoid_file_checks_(paranoid_file_checks),
@@ -475,7 +478,7 @@ void CompactionJob::GenSubcompactionBoundaries() {
}
}

Status CompactionJob::Run() {
Result<FileNumbersHolder> CompactionJob::Run() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_RUN);
TEST_SYNC_POINT("CompactionJob::Run():Start");
@@ -489,14 +492,16 @@ Status CompactionJob::Run() {
// Launch a thread for each of subcompactions 1...num_threads-1
std::vector<std::thread> thread_pool;
thread_pool.reserve(num_threads - 1);
FileNumbersHolder file_numbers_holder(file_numbers_provider_->CreateHolder());
file_numbers_holder.Reserve(num_threads);
for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this, &file_numbers_holder,
&compact_->sub_compact_states[i]);
}

// Always schedule the first subcompaction (whether or not there are also
// others) in the current thread to be efficient with resources
ProcessKeyValueCompaction(&compact_->sub_compact_states[0]);
ProcessKeyValueCompaction(&file_numbers_holder, &compact_->sub_compact_states[0]);

// Wait for all other threads (if there are any) to finish execution
for (auto& thread : thread_pool) {
@@ -537,7 +542,7 @@ Status CompactionJob::Run() {
TEST_SYNC_POINT("CompactionJob::Run():End");

compact_->status = status;
return status;
return std::move(file_numbers_holder);
}

Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
@@ -614,7 +619,8 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
return status;
}

void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
void CompactionJob::ProcessKeyValueCompaction(
FileNumbersHolder* holder, SubcompactionState* sub_compact) {
assert(sub_compact != nullptr);
std::unique_ptr<InternalIterator> input(
versions_->MakeInputIterator(sub_compact->compaction));
@@ -705,7 +711,7 @@

// Open output file if necessary
if (sub_compact->builder == nullptr) {
status = OpenCompactionOutputFile(sub_compact);
status = OpenCompactionOutputFile(holder, sub_compact);
if (!status.ok()) {
break;
}
@@ -991,11 +997,10 @@ Status CompactionJob::OpenFile(const std::string table_name, uint64_t file_numbe
}

Status CompactionJob::OpenCompactionOutputFile(
SubcompactionState* sub_compact) {
FileNumbersHolder* holder, SubcompactionState* sub_compact) {
assert(sub_compact != nullptr);
assert(sub_compact->builder == nullptr);
// no need to lock because VersionSet::next_file_number_ is atomic
uint64_t file_number = versions_->NewFileNumber();
FileNumber file_number = file_numbers_provider_->NewFileNumber(holder);

// Make the output file
unique_ptr<WritableFile> base_writable_file;
@@ -1004,14 +1009,8 @@
sub_compact->compaction->output_path_id());
const std::string data_fname = TableBaseToDataFileName(base_fname);
const std::string table_name = sub_compact->compaction->column_family_data()->GetName();
Status s = OpenFile(table_name, file_number, "base", base_fname, &base_writable_file);
if (!s.ok()) {
return s;
}
s = OpenFile(table_name, file_number, "data", data_fname, &data_writable_file);
if (!s.ok()) {
return s;
}
RETURN_NOT_OK(OpenFile(table_name, file_number, "base", base_fname, &base_writable_file));
RETURN_NOT_OK(OpenFile(table_name, file_number, "data", data_fname, &data_writable_file));

SubcompactionState::Output out;
out.meta.fd =
@@ -1063,7 +1062,7 @@
sub_compact->compaction->output_compression(), cfd->ioptions()->compression_opts,
skip_filters));
LogFlush(db_options_.info_log);
return s;
return Status::OK();
}

void CompactionJob::CleanupCompaction() {