Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Dec 22, 2023
1 parent acc078f commit 87cae81
Show file tree
Hide file tree
Showing 17 changed files with 792 additions and 88 deletions.
139 changes: 135 additions & 4 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_source.h"
#include "db/compaction/compaction.h"
#include "db/compaction/compaction_picker.h"
#include "db/compaction/compaction_picker_fifo.h"
#include "db/compaction/compaction_picker_level.h"
Expand Down Expand Up @@ -1109,21 +1110,111 @@ void ColumnFamilyData::CreateNewMemtable(

bool ColumnFamilyData::NeedsCompaction() const {
return !mutable_cf_options_.disable_auto_compactions &&
compaction_picker_->NeedsCompaction(current_->storage_info());
(!resumable_compactions_of_all_db_sessions_.empty() ||
compaction_picker_->NeedsCompaction(current_->storage_info()));
}

Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options,
const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) {
auto* result = compaction_picker_->PickCompaction(
GetName(), mutable_options, mutable_db_options, current_->storage_info(),
log_buffer);
Compaction* result =
PickResumableCompaction(mutable_options, mutable_db_options);
if (result == nullptr) {
result = compaction_picker_->PickCompaction(
GetName(), mutable_options, mutable_db_options,
current_->storage_info(), log_buffer);
}
if (result != nullptr) {
result->FinalizeInputInfo(current_);
}
return result;
}

Compaction* ColumnFamilyData::PickResumableCompaction(
const MutableCFOptions& mutable_options,
const MutableDBOptions& mutable_db_options) {
Compaction* result = nullptr;

while (!resumable_compactions_of_all_db_sessions_.empty()) {
auto iter_1 = resumable_compactions_of_all_db_sessions_.begin();
auto db_session_id = iter_1->first;
ResumableCompactionsPerDBSession resumable_compactions_per_db_session =
iter_1->second;
while (!resumable_compactions_per_db_session.empty()) {
auto iter_2 = resumable_compactions_per_db_session.begin();
auto compaction_id = iter_2->first;
ResumableCompaction resumable_compaction = iter_2->second;

resumable_compactions_per_db_session.erase(iter_2);
if (resumable_compactions_per_db_session.empty()) {
resumable_compactions_of_all_db_sessions_.erase(iter_1);
}

result = GetCompactionFromResumableCompaction(
db_session_id, compaction_id, resumable_compaction, mutable_options,
mutable_db_options);

if (result != nullptr) {
return result;
}
}
}

return result;
}

Compaction* ColumnFamilyData::GetCompactionFromResumableCompaction(
std::string& db_session_id, uint64_t compaction_id,
ResumableCompaction& resumable_compaction,
const MutableCFOptions& mutable_options,
const MutableDBOptions& mutable_db_options) const {
std::vector<CompactionInputFiles> inputs;

for (const auto& resumable_compaction_inputs_per_level :
resumable_compaction.inputs) {
CompactionInputFiles compaction_input_files;
compaction_input_files.level = resumable_compaction_inputs_per_level.first;
if (resumable_compaction_inputs_per_level.second.empty()) {
return nullptr;
}
for (auto compaction_input_file :
resumable_compaction_inputs_per_level.second) {
FileMetaData* file = current_->storage_info()->GetFileMetaDataByNumber(
compaction_input_file);
if (file == nullptr || file->being_compacted) {
return nullptr;
} else {
compaction_input_files.files.push_back(file);
}
}
inputs.push_back(compaction_input_files);
}

if (compaction_picker_->FilesRangeOverlapWithCompaction(
inputs, resumable_compaction.output_level,
resumable_compaction.penultimate_output_level != -1
? resumable_compaction.penultimate_output_level
: Compaction::EvaluatePenultimateLevel(
current_->storage_info(), ioptions_, inputs[0].level,
resumable_compaction.output_level))) {
return nullptr;
}

Compaction* result = new Compaction(
current_->storage_info(), ioptions_, mutable_options, mutable_db_options,
inputs, resumable_compaction.output_level,
std::numeric_limits<uint64_t>::max(),
std::numeric_limits<uint64_t>::max(), 0, CompressionType::kNoCompression,
CompressionOptions(), Temperature::kUnknown,
std::numeric_limits<uint32_t>::max(), {}, false, "", -1, false, true,
CompactionReason::kUnknown, BlobGarbageCollectionPolicy::kUseDefault, -1,
{db_session_id, compaction_id, resumable_compaction});

compaction_picker_->RegisterCompaction(result);

return result;
}

bool ColumnFamilyData::RangeOverlapWithCompaction(
const Slice& smallest_user_key, const Slice& largest_user_key,
int level) const {
Expand Down Expand Up @@ -1578,6 +1669,46 @@ void ColumnFamilyData::RecoverEpochNumbers() {
vstorage->RecoverEpochNumbers(this);
}

void ColumnFamilyData::AddResumableCompaction(
const ResumableCompactionInfo& info) {
const uint64_t compaction_id = info.job_id >> 32;
constexpr uint32_t LAST_32_BITS_MASK = 0xffffffffU;
const uint64_t subcompaction_id = info.job_id & LAST_32_BITS_MASK;

auto iter_1 =
resumable_compactions_of_all_db_sessions_.find(info.db_session_id);
if (iter_1 == resumable_compactions_of_all_db_sessions_.end()) {
resumable_compactions_of_all_db_sessions_.insert({info.db_session_id, {}});
}
ResumableCompactionsPerDBSession& resumable_compactions_per_db_session =
resumable_compactions_of_all_db_sessions_[info.db_session_id];

auto iter_2 = resumable_compactions_per_db_session.find(compaction_id);
if (iter_2 == resumable_compactions_per_db_session.end()) {
ResumableCompaction resumable_compaction = {
info.inputs,
info.output_level,
info.penultimate_output_level,
info.penultimate_output_level_smallest,
info.penultimate_output_level_largest,
{}};
resumable_compactions_per_db_session.insert(
{compaction_id, resumable_compaction});
}

const bool has_subcompaction = info.subcompaction_start.size() != 0 ||
info.subcompaction_end.size() != 0;
if (has_subcompaction) {
ResumableCompaction& resumable_compaction =
resumable_compactions_per_db_session[compaction_id];
assert(
resumable_compaction.subcompaction_id_to_range.find(subcompaction_id) ==
resumable_compaction.subcompaction_id_to_range.end());
resumable_compaction.subcompaction_id_to_range.insert(
{subcompaction_id, {info.subcompaction_start, info.subcompaction_end}});
}
}

ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
const ImmutableDBOptions* db_options,
const FileOptions& file_options,
Expand Down
31 changes: 31 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,23 @@ class BlobFileCache;
class BlobSource;

extern const double kIncSlowdownRatio;

struct ResumableCompaction {
std::vector<std::pair<int, std::vector<uint64_t>>> inputs;
int output_level;
int penultimate_output_level;
InternalKey penultimate_output_level_smallest;
InternalKey penultimate_output_level_largest;
std::map<uint64_t, std::pair<std::string, std::string>>
subcompaction_id_to_range;
};

using ResumableCompactionsPerDBSession =
std::map<uint64_t /* compaction id */, ResumableCompaction>;

using ResumableCompactionsOfAllDBSessions =
std::map<std::string /* db session id */, ResumableCompactionsPerDBSession>;

// This file contains a list of data structures for managing column family
// level metadata.
//
Expand Down Expand Up @@ -549,6 +566,8 @@ class ColumnFamilyData {
// of its files (if missing)
void RecoverEpochNumbers();

void AddResumableCompaction(const ResumableCompactionInfo& info);

private:
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
Expand All @@ -564,6 +583,16 @@ class ColumnFamilyData {

std::vector<std::string> GetDbPaths() const;

Compaction* PickResumableCompaction(
const MutableCFOptions& mutable_options,
const MutableDBOptions& mutable_db_options);

Compaction* GetCompactionFromResumableCompaction(
std::string& db_session_id, uint64_t compaction_id,
ResumableCompaction& resumable_compaction,
const MutableCFOptions& mutable_options,
const MutableDBOptions& mutable_db_options) const;

uint32_t id_;
const std::string name_;
Version* dummy_versions_; // Head of circular doubly-linked list of versions.
Expand Down Expand Up @@ -650,6 +679,8 @@ class ColumnFamilyData {
bool mempurge_used_;

std::atomic<uint64_t> next_epoch_number_;

ResumableCompactionsOfAllDBSessions resumable_compactions_of_all_db_sessions_;
};

// ColumnFamilySet has interesting thread-safety requirements
Expand Down
26 changes: 20 additions & 6 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,9 @@ Compaction::Compaction(
bool _deletion_compaction, bool l0_files_might_overlap,
CompactionReason _compaction_reason,
BlobGarbageCollectionPolicy _blob_garbage_collection_policy,
double _blob_garbage_collection_age_cutoff)
double _blob_garbage_collection_age_cutoff,
std::tuple<std::string, uint64_t, ResumableCompaction>
resumable_compaction_info)
: input_vstorage_(vstorage),
start_level_(_inputs[0].level),
output_level_(_output_level),
Expand Down Expand Up @@ -333,11 +335,15 @@ Compaction::Compaction(
_blob_garbage_collection_age_cutoff > 1
? mutable_cf_options()->blob_garbage_collection_age_cutoff
: _blob_garbage_collection_age_cutoff),
resumable_compaction_info_(resumable_compaction_info),
penultimate_level_(
// For simplicity, we don't support the concept of "penultimate level"
// with `CompactionReason::kExternalSstIngestion` and
// `CompactionReason::kRefitLevel`
_compaction_reason == CompactionReason::kExternalSstIngestion ||
IsResumableCompaction()
? GetResumableCompaction().penultimate_output_level
:
// For simplicity, we don't support the concept of "penultimate
// level" with `CompactionReason::kExternalSstIngestion` and
// `CompactionReason::kRefitLevel`
_compaction_reason == CompactionReason::kExternalSstIngestion ||
_compaction_reason == CompactionReason::kRefitLevel
? Compaction::kInvalidLevel
: EvaluatePenultimateLevel(vstorage, immutable_options_,
Expand Down Expand Up @@ -396,7 +402,15 @@ Compaction::Compaction(
}
}

PopulatePenultimateLevelOutputRange();
if (IsResumableCompaction()) {
const ResumableCompaction& resumable_compaction = GetResumableCompaction();
penultimate_level_smallest_ =
resumable_compaction.penultimate_output_level_smallest;
penultimate_level_largest_ =
resumable_compaction.penultimate_output_level_largest;
} else {
PopulatePenultimateLevelOutputRange();
}
}

void Compaction::PopulatePenultimateLevelOutputRange() {
Expand Down
37 changes: 36 additions & 1 deletion db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once
#include <limits>

#include "db/column_family.h"
#include "db/version_set.h"
#include "memory/arena.h"
#include "options/cf_options.h"
Expand Down Expand Up @@ -96,7 +99,12 @@ class Compaction {
CompactionReason compaction_reason = CompactionReason::kUnknown,
BlobGarbageCollectionPolicy blob_garbage_collection_policy =
BlobGarbageCollectionPolicy::kUseDefault,
double blob_garbage_collection_age_cutoff = -1);
double blob_garbage_collection_age_cutoff = -1,
std::tuple<std::string, uint64_t, ResumableCompaction>
resumable_compaction_info =
std::tuple<std::string, uint64_t, ResumableCompaction>(
"", std::numeric_limits<uint64_t>::max(),
ResumableCompaction()));

// The type of the penultimate level output range
enum class PenultimateOutputRangeType : int {
Expand Down Expand Up @@ -432,6 +440,30 @@ class Compaction {
const int start_level,
const int output_level);

InternalKey GetPenultimateLevelSmallestKey() const {
return penultimate_level_smallest_;
}

InternalKey GetPenultimateLevelLargestKey() const {
return penultimate_level_largest_;
}

bool IsResumableCompaction() const {
return !std::get<0>(resumable_compaction_info_).empty();
}

const std::string& GetResumableCompactionDBSession() const {
return std::get<0>(resumable_compaction_info_);
}

uint64_t GetResumableCompactionJobId() const {
return std::get<1>(resumable_compaction_info_);
}

const ResumableCompaction& GetResumableCompaction() const {
return std::get<2>(resumable_compaction_info_);
}

private:
void SetInputVersion(Version* input_version);

Expand Down Expand Up @@ -558,6 +590,9 @@ class Compaction {
// Blob garbage collection age cutoff.
double blob_garbage_collection_age_cutoff_;

const std::tuple<std::string, uint64_t, ResumableCompaction>
resumable_compaction_info_;

// only set when per_key_placement feature is enabled, -1 (kInvalidLevel)
// means not supported.
const int penultimate_level_;
Expand Down
Loading

0 comments on commit 87cae81

Please sign in to comment.