Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
draft 3
Browse files Browse the repository at this point in the history
hx235 committed Dec 16, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent b3ad90c commit 52df1d3
Showing 11 changed files with 426 additions and 401 deletions.
58 changes: 27 additions & 31 deletions db/column_family.cc
Original file line number Diff line number Diff line change
@@ -1110,65 +1110,61 @@ void ColumnFamilyData::CreateNewMemtable(

// Reports whether this column family currently has compaction work to do.
// The result is always false when `disable_auto_compactions` is set in the
// mutable CF options. Otherwise it is true if any resumable compaction is
// still queued, or if the compaction picker says the current version's
// storage info needs compaction.
// NOTE(review): the two adjacent `.empty()` checks below look like the pre-
// and post-rename sides of the same diff line — confirm only one is kept.
bool ColumnFamilyData::NeedsCompaction() const {
return !mutable_cf_options_.disable_auto_compactions &&
(!rc_db_session_id_to_rc_compactions_map_.empty() ||
(!db_session_id_to_resumable_compactions.empty() ||
compaction_picker_->NeedsCompaction(current_->storage_info()));
}

Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options,
const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) {
while (!rc_db_session_id_to_rc_compactions_map_.empty()) {
auto rc_db_session_id_to_rc_compactions_map_it =
rc_db_session_id_to_rc_compactions_map_.begin();
auto rc_db_session_id = rc_db_session_id_to_rc_compactions_map_it->first;
auto rc_compaction_id_to_info_map =
rc_db_session_id_to_rc_compactions_map_it->second;
rc_db_session_id_to_rc_compactions_map_.erase(
rc_db_session_id_to_rc_compactions_map_it);
while (!rc_compaction_id_to_info_map.empty()) {
auto rc_compaction_id_to_info_map_it =
rc_compaction_id_to_info_map.begin();
auto rc_compaction_id = rc_compaction_id_to_info_map_it->first;
auto rc_info = rc_compaction_id_to_info_map_it->second;
rc_compaction_id_to_info_map.erase(rc_compaction_id_to_info_map_it);
while (!db_session_id_to_resumable_compactions.empty()) {
auto iter_1 = db_session_id_to_resumable_compactions.begin();
auto db_session_id = iter_1->first;
ResumableCompactions resumable_compactions = iter_1->second;
db_session_id_to_resumable_compactions.erase(iter_1);
while (!resumable_compactions.empty()) {
auto iter_2 = resumable_compactions.begin();
auto compaction_id = iter_2->first;
ResumableCompaction resumable_compaction = iter_2->second;
resumable_compactions.erase(iter_2);

// Verify the input files still exist, in which case the compaction needs to be resumed
std::vector<CompactionInputFiles> inputs;
bool has_error = false;
for (auto& rc_input : rc_info.rc_inputs_) {
CompactionInputFiles input;
input.level = rc_input.first;
assert(rc_input.second.size() > 0);
for (uint64_t rc_input_file_number : rc_input.second) {
for (auto& input : resumable_compaction.inputs) {
CompactionInputFiles input_files;
input_files.level = input.first;
for (uint64_t input_file_number : input.second) {
FileMetaData* file =
current_->storage_info()->GetFileMetaDataByNumber(
rc_input_file_number);
input_file_number);
if (file == nullptr) {
has_error = true;
break;
}
input.files.push_back(file);
input_files.files.push_back(file);
}
if (has_error) {
break;
}
inputs.push_back(input);
inputs.push_back(input_files);
}

if (!has_error && !inputs.empty()) {
std::vector<std::pair<std::string, std::string>> rc_subcompactions =
rc_info.rc_subcompactions_;
Compaction* result = new Compaction(
current_->storage_info(), ioptions_, mutable_options,
mutable_db_options, inputs, rc_info.rc_output_level_,
mutable_db_options, inputs, resumable_compaction.output_level,
std::numeric_limits<uint64_t>::max(),
std::numeric_limits<uint64_t>::max(), 0,
CompressionType::kNoCompression, CompressionOptions(),
Temperature::kUnknown, std::numeric_limits<uint32_t>::max(), {},
false, "", -1, false, true, CompactionReason::kUnknown,
BlobGarbageCollectionPolicy::kUseDefault, -1,
rc_info.rc_penultimate_level_,
rc_info.rc_penultimate_level_smallest_,
rc_info.rc_penultimate_level_largest_, rc_db_session_id,
rc_compaction_id, rc_info.rc_subcompaction_ids_,
rc_info.rc_subcompactions_);
resumable_compaction.penultimate_output_level,
resumable_compaction.penultimate_output_level_smallest,
resumable_compaction.penultimate_output_level_largest,
db_session_id, compaction_id,
resumable_compaction.subcompaction_id_to_range);
result->FinalizeInputInfo(current_);
return result;
}
131 changes: 49 additions & 82 deletions db/column_family.h
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
#include "db/memtable_list.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/version_edit.h"
#include "db/write_batch_internal.h"
#include "db/write_controller.h"
#include "options/cf_options.h"
@@ -549,83 +550,45 @@ class ColumnFamilyData {
// of its files (if missing)
void RecoverEpochNumbers();

void AddTempRCInfo(
const std::string& rc_db_session_id, uint64_t rc_job_id,
const std::vector<std::pair<int, std::vector<uint64_t>>>& rc_inputs,
int rc_output_level, int rc_penultimate_level,
const InternalKey& rc_penultimate_level_smallest,
const InternalKey& rc_penultimate_level_largest,
const std::string& rc_subcompaction_start,
const std::string& rc_subcompaction_end) {
assert(!rc_inputs.empty());
for (auto& input : rc_inputs) {
assert(input.second.size() > 0);
void AddResumableCompaction(const ResumableCompactionInfo& info) {
auto iter_1 =
db_session_id_to_resumable_compactions.find(info.db_session_id);
if (iter_1 == db_session_id_to_resumable_compactions.end()) {
db_session_id_to_resumable_compactions.insert({info.db_session_id, {}});
}
ResumableCompactions& resumable_compactions = iter_1->second;

uint64_t rc_compaction_id = rc_job_id >> 32;
constexpr uint32_t LAST_32_BITS_MASK = 0xffffffffU;
uint64_t rc_subcompaction_id = rc_job_id & LAST_32_BITS_MASK;
uint64_t compaction_id = info.job_id >> 32;
auto iter_2 = resumable_compactions.find(compaction_id);

auto iter_1 =
rc_db_session_id_to_rc_compactions_map_.find(rc_db_session_id);
if (iter_1 != rc_db_session_id_to_rc_compactions_map_.end()) {
std::map<uint64_t, TempRCInfo>& rc_compaction_id_to_info_map =
iter_1->second;
auto iter_2 = rc_compaction_id_to_info_map.find(rc_compaction_id);
if (iter_2 != rc_compaction_id_to_info_map.end()) {
assert(rc_subcompaction_start.size() != 0 ||
rc_subcompaction_end.size() != 0);
TempRCInfo& rc_compaction_info = iter_2->second;
rc_compaction_info.rc_subcompaction_ids_.push_back(rc_subcompaction_id);
rc_compaction_info.rc_subcompactions_.push_back(
{rc_subcompaction_start, rc_subcompaction_end});
} else {
TempRCInfo rc_compaction_info;
if (rc_subcompaction_start.size() == 0 &&
rc_subcompaction_end.size() == 0) {
rc_compaction_info = {rc_inputs,
rc_output_level,
rc_penultimate_level,
rc_penultimate_level_smallest,
rc_penultimate_level_largest,
{},
{}};
} else {
rc_compaction_info = {
rc_inputs,
rc_output_level,
rc_penultimate_level,
rc_penultimate_level_smallest,
rc_penultimate_level_largest,
{rc_subcompaction_id},
{{rc_subcompaction_start, rc_subcompaction_end}}};
}
rc_compaction_id_to_info_map.insert(
{rc_compaction_id, rc_compaction_info});
bool has_subcompaction = info.subcompaction_start.size() != 0 ||
info.subcompaction_end.size() != 0;
constexpr uint32_t LAST_32_BITS_MASK = 0xffffffffU;
uint64_t subcompaction_id = info.job_id & LAST_32_BITS_MASK;

if (iter_2 == resumable_compactions.end()) {
ResumableCompaction resumable_compaction = {
info.inputs,
info.output_level,
info.penultimate_output_level,
info.penultimate_output_level_smallest,
info.penultimate_output_level_largest,
{}};
if (has_subcompaction) {
resumable_compaction.subcompaction_id_to_range.insert(
{subcompaction_id,
{info.subcompaction_start, info.subcompaction_end}});
}
resumable_compactions.insert({compaction_id, resumable_compaction});
} else {
rc_db_session_id_to_rc_compactions_map_.insert({rc_db_session_id, {}});
TempRCInfo rc_compaction_info;
if (rc_subcompaction_start.size() == 0 &&
rc_subcompaction_end.size() == 0) {
rc_compaction_info = {rc_inputs,
rc_output_level,
rc_penultimate_level,
rc_penultimate_level_smallest,
rc_penultimate_level_largest,
{},
{}};
} else {
rc_compaction_info = {rc_inputs,
rc_output_level,
rc_penultimate_level,
rc_penultimate_level_smallest,
rc_penultimate_level_largest,
{rc_subcompaction_id},
{{rc_subcompaction_start, rc_subcompaction_end}}};
}
rc_db_session_id_to_rc_compactions_map_[rc_db_session_id].insert(
{rc_compaction_id, rc_compaction_info});
assert(has_subcompaction);
ResumableCompaction& resumable_compaction = iter_2->second;
assert(resumable_compaction.subcompaction_id_to_range.find(
subcompaction_id) ==
resumable_compaction.subcompaction_id_to_range.end());
resumable_compaction.subcompaction_id_to_range.insert(
{subcompaction_id,
{info.subcompaction_start, info.subcompaction_end}});
}
}

@@ -732,17 +695,21 @@ class ColumnFamilyData {

std::atomic<uint64_t> next_epoch_number_;

// NOTE(review): TempRCInfo and rc_db_session_id_to_rc_compactions_map_ appear
// to be the pre-rename side of this diff hunk, with ResumableCompaction and
// db_session_id_to_resumable_compactions as their replacements — confirm
// against the final header.
struct TempRCInfo {
std::vector<std::pair<int, std::vector<uint64_t>>> rc_inputs_;
int rc_output_level_;
int rc_penultimate_level_;
InternalKey rc_penultimate_level_smallest_;
InternalKey rc_penultimate_level_largest_;
std::vector<uint64_t> rc_subcompaction_ids_;
std::vector<std::pair<std::string, std::string>> rc_subcompactions_;
// Everything recorded about one compaction that may be resumed. Filled in by
// AddResumableCompaction() and consumed by PickCompaction().
struct ResumableCompaction {
// (level, file numbers) pairs. PickCompaction() looks each file number up in
// the current version's storage info and gives up on this compaction if any
// file is missing.
std::vector<std::pair<int, std::vector<uint64_t>>> inputs;
// Passed to the Compaction constructor as its output level.
int output_level;
// The three fields below are forwarded to the Compaction constructor as its
// penultimate level and that level's smallest/largest keys.
int penultimate_output_level;
InternalKey penultimate_output_level_smallest;
InternalKey penultimate_output_level_largest;
// Subcompaction id (low 32 bits of the job id) -> (start, end) range of that
// subcompaction. Left empty when no entry carried a non-empty range.
std::map<uint64_t, std::pair<std::string, std::string>>
subcompaction_id_to_range;
};
std::map<std::string, std::map<uint64_t, TempRCInfo>>
rc_db_session_id_to_rc_compactions_map_;

// Resumable compactions keyed by compaction id (the high 32 bits of the job
// id, see AddResumableCompaction()).
using ResumableCompactions =
std::map<uint64_t /* compaction id */, ResumableCompaction>;

// DB session id -> resumable compactions recorded under that session id.
// PickCompaction() removes entries from this map as it tries to resume them.
std::map<std::string, ResumableCompactions>
db_session_id_to_resumable_compactions;
};

// ColumnFamilySet has interesting thread-safety requirements
16 changes: 9 additions & 7 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
@@ -287,9 +287,11 @@ Compaction::Compaction(
BlobGarbageCollectionPolicy _blob_garbage_collection_policy,
double _blob_garbage_collection_age_cutoff, int penultimate_level,
InternalKey penultimate_level_smallest,
InternalKey penultimate_level_largest, std::string rc_db_session_id,
uint64_t rc_compaction_id, std::vector<uint64_t> rc_subcompaction_ids,
std::vector<std::pair<std::string, std::string>> rc_subcompactions)
InternalKey penultimate_level_largest,
std::string resumable_compaction_db_session_id,
uint64_t resumable_compaction_compaction_id,
std::map<uint64_t, std::pair<std::string, std::string>>
resumable_compaction_subcompactions)
: input_vstorage_(vstorage),
start_level_(_inputs[0].level),
output_level_(_output_level),
@@ -349,10 +351,10 @@ Compaction::Compaction(
? Compaction::kInvalidLevel
: EvaluatePenultimateLevel(vstorage, immutable_options_,
start_level_, output_level_)),
rc_db_session_id_(rc_db_session_id),
rc_compaction_id_(rc_compaction_id),
rc_subcompaction_ids_(rc_subcompaction_ids),
rc_subcompactions_(rc_subcompactions) {
resumable_compaction_db_session_id_(resumable_compaction_db_session_id),
resumable_compaction_compaction_id_(resumable_compaction_compaction_id),
resumable_compaction_subcompactions_(
resumable_compaction_subcompactions) {
MarkFilesBeingCompacted(true);
if (is_manual_compaction_) {
compaction_reason_ = CompactionReason::kManualCompaction;
71 changes: 37 additions & 34 deletions db/compaction/compaction.h
Original file line number Diff line number Diff line change
@@ -80,29 +80,31 @@ class CompactionFilter;
// A Compaction encapsulates metadata about a compaction.
class Compaction {
public:
// Constructs a Compaction over `inputs` that writes to `output_level`.
//
// The trailing three parameters describe a compaction that is being resumed
// and are stored as-is for later retrieval through the
// GetResumableCompaction*() accessors:
// - resumable_compaction_db_session_id: DB session id under which the
//   resumable compaction was recorded; defaults to "".
// - resumable_compaction_compaction_id: id of the resumable compaction;
//   defaults to the maximum uint64_t value.
// - resumable_compaction_subcompactions: subcompaction id -> (start, end)
//   range; defaults to empty.
Compaction(VersionStorageInfo* input_version,
           const ImmutableOptions& immutable_options,
           const MutableCFOptions& mutable_cf_options,
           const MutableDBOptions& mutable_db_options,
           std::vector<CompactionInputFiles> inputs, int output_level,
           uint64_t target_file_size, uint64_t max_compaction_bytes,
           uint32_t output_path_id, CompressionType compression,
           CompressionOptions compression_opts,
           Temperature output_temperature, uint32_t max_subcompactions,
           std::vector<FileMetaData*> grandparents,
           bool manual_compaction = false, const std::string& trim_ts = "",
           double score = -1, bool deletion_compaction = false,
           bool l0_files_might_overlap = true,
           CompactionReason compaction_reason = CompactionReason::kUnknown,
           BlobGarbageCollectionPolicy blob_garbage_collection_policy =
               BlobGarbageCollectionPolicy::kUseDefault,
           double blob_garbage_collection_age_cutoff = -1,
           int penultimate_level = -1,
           InternalKey penultimate_level_smallest = {},
           InternalKey penultimate_level_largest = {},
           std::string resumable_compaction_db_session_id = "",
           uint64_t resumable_compaction_compaction_id =
               std::numeric_limits<uint64_t>::max(),
           // Spelled the same as in the definition in compaction.cc.
           std::map<uint64_t, std::pair<std::string, std::string>>
               resumable_compaction_subcompactions = {});

// The type of the penultimate level output range
enum class PenultimateOutputRangeType : int {
@@ -446,16 +448,17 @@ class Compaction {
return penultimate_level_largest_;
}

std::string GetRCDbSessionID() const { return rc_db_session_id_; }

uint64_t GetRCCompactionId() const { return rc_compaction_id_; }
std::string GetResumableCompactionDbSessionID() const {
return resumable_compaction_db_session_id_;
}

std::vector<uint64_t> GetRCSubompactionIds() const {
return rc_subcompaction_ids_;
uint64_t GetResumableCompactionCompactionId() const {
return resumable_compaction_compaction_id_;
}

std::vector<std::pair<std::string, std::string>> GetRCSubcompactions() const {
return rc_subcompactions_;
std::map<uint64_t, std::pair<std::string, std::string>>
GetResumableCompactionSubcompactions() const {
return resumable_compaction_subcompactions_;
}

private:
@@ -595,10 +598,10 @@ class Compaction {
InternalKey penultimate_level_largest_;
PenultimateOutputRangeType penultimate_output_range_type_ =
PenultimateOutputRangeType::kNotSupported;
std::string rc_db_session_id_;
uint64_t rc_compaction_id_;
std::vector<uint64_t> rc_subcompaction_ids_;
std::vector<std::pair<std::string, std::string>> rc_subcompactions_;
std::string resumable_compaction_db_session_id_;
uint64_t resumable_compaction_compaction_id_;
std::map<uint64_t, std::pair<std::string, std::string>>
resumable_compaction_subcompactions_;
};

#ifndef NDEBUG
Loading

0 comments on commit 52df1d3

Please sign in to comment.