Skip to content

Commit

Permalink
draft 3
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Dec 16, 2023
1 parent b3ad90c commit 776f06f
Show file tree
Hide file tree
Showing 11 changed files with 436 additions and 405 deletions.
58 changes: 27 additions & 31 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1110,65 +1110,61 @@ void ColumnFamilyData::CreateNewMemtable(

bool ColumnFamilyData::NeedsCompaction() const {
return !mutable_cf_options_.disable_auto_compactions &&
(!rc_db_session_id_to_rc_compactions_map_.empty() ||
(!db_session_id_to_resumable_compactions.empty() ||
compaction_picker_->NeedsCompaction(current_->storage_info()));
}

Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options,
const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) {
while (!rc_db_session_id_to_rc_compactions_map_.empty()) {
auto rc_db_session_id_to_rc_compactions_map_it =
rc_db_session_id_to_rc_compactions_map_.begin();
auto rc_db_session_id = rc_db_session_id_to_rc_compactions_map_it->first;
auto rc_compaction_id_to_info_map =
rc_db_session_id_to_rc_compactions_map_it->second;
rc_db_session_id_to_rc_compactions_map_.erase(
rc_db_session_id_to_rc_compactions_map_it);
while (!rc_compaction_id_to_info_map.empty()) {
auto rc_compaction_id_to_info_map_it =
rc_compaction_id_to_info_map.begin();
auto rc_compaction_id = rc_compaction_id_to_info_map_it->first;
auto rc_info = rc_compaction_id_to_info_map_it->second;
rc_compaction_id_to_info_map.erase(rc_compaction_id_to_info_map_it);
while (!db_session_id_to_resumable_compactions.empty()) {
auto iter_1 = db_session_id_to_resumable_compactions.begin();
auto db_session_id = iter_1->first;
ResumableCompactions resumable_compactions = iter_1->second;
db_session_id_to_resumable_compactions.erase(iter_1);
while (!resumable_compactions.empty()) {
auto iter_2 = resumable_compactions.begin();
auto compaction_id = iter_2->first;
ResumableCompaction resumable_compaction = iter_2->second;
resumable_compactions.erase(iter_2);

// Verify input files stil exist thus the compaction needs resumed
std::vector<CompactionInputFiles> inputs;
bool has_error = false;
for (auto& rc_input : rc_info.rc_inputs_) {
CompactionInputFiles input;
input.level = rc_input.first;
assert(rc_input.second.size() > 0);
for (uint64_t rc_input_file_number : rc_input.second) {
for (auto& input : resumable_compaction.inputs) {
CompactionInputFiles input_files;
input_files.level = input.first;
for (uint64_t input_file_number : input.second) {
FileMetaData* file =
current_->storage_info()->GetFileMetaDataByNumber(
rc_input_file_number);
input_file_number);
if (file == nullptr) {
has_error = true;
break;
}
input.files.push_back(file);
input_files.files.push_back(file);
}
if (has_error) {
break;
}
inputs.push_back(input);
inputs.push_back(input_files);
}

if (!has_error && !inputs.empty()) {
std::vector<std::pair<std::string, std::string>> rc_subcompactions =
rc_info.rc_subcompactions_;
Compaction* result = new Compaction(
current_->storage_info(), ioptions_, mutable_options,
mutable_db_options, inputs, rc_info.rc_output_level_,
mutable_db_options, inputs, resumable_compaction.output_level,
std::numeric_limits<uint64_t>::max(),
std::numeric_limits<uint64_t>::max(), 0,
CompressionType::kNoCompression, CompressionOptions(),
Temperature::kUnknown, std::numeric_limits<uint32_t>::max(), {},
false, "", -1, false, true, CompactionReason::kUnknown,
BlobGarbageCollectionPolicy::kUseDefault, -1,
rc_info.rc_penultimate_level_,
rc_info.rc_penultimate_level_smallest_,
rc_info.rc_penultimate_level_largest_, rc_db_session_id,
rc_compaction_id, rc_info.rc_subcompaction_ids_,
rc_info.rc_subcompactions_);
resumable_compaction.penultimate_output_level,
resumable_compaction.penultimate_output_level_smallest,
resumable_compaction.penultimate_output_level_largest,
db_session_id, compaction_id,
resumable_compaction.subcompaction_id_to_range);
result->FinalizeInputInfo(current_);
return result;
}
Expand Down
131 changes: 49 additions & 82 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "db/memtable_list.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/version_edit.h"
#include "db/write_batch_internal.h"
#include "db/write_controller.h"
#include "options/cf_options.h"
Expand Down Expand Up @@ -549,83 +550,45 @@ class ColumnFamilyData {
// of its files (if missing)
void RecoverEpochNumbers();

void AddTempRCInfo(
const std::string& rc_db_session_id, uint64_t rc_job_id,
const std::vector<std::pair<int, std::vector<uint64_t>>>& rc_inputs,
int rc_output_level, int rc_penultimate_level,
const InternalKey& rc_penultimate_level_smallest,
const InternalKey& rc_penultimate_level_largest,
const std::string& rc_subcompaction_start,
const std::string& rc_subcompaction_end) {
assert(!rc_inputs.empty());
for (auto& input : rc_inputs) {
assert(input.second.size() > 0);
void AddResumableCompaction(const ResumableCompactionInfo& info) {
auto iter_1 =
db_session_id_to_resumable_compactions.find(info.db_session_id);
if (iter_1 == db_session_id_to_resumable_compactions.end()) {
db_session_id_to_resumable_compactions.insert({info.db_session_id, {}});
}
ResumableCompactions& resumable_compactions = iter_1->second;

uint64_t rc_compaction_id = rc_job_id >> 32;
constexpr uint32_t LAST_32_BITS_MASK = 0xffffffffU;
uint64_t rc_subcompaction_id = rc_job_id & LAST_32_BITS_MASK;
uint64_t compaction_id = info.job_id >> 32;
auto iter_2 = resumable_compactions.find(compaction_id);

auto iter_1 =
rc_db_session_id_to_rc_compactions_map_.find(rc_db_session_id);
if (iter_1 != rc_db_session_id_to_rc_compactions_map_.end()) {
std::map<uint64_t, TempRCInfo>& rc_compaction_id_to_info_map =
iter_1->second;
auto iter_2 = rc_compaction_id_to_info_map.find(rc_compaction_id);
if (iter_2 != rc_compaction_id_to_info_map.end()) {
assert(rc_subcompaction_start.size() != 0 ||
rc_subcompaction_end.size() != 0);
TempRCInfo& rc_compaction_info = iter_2->second;
rc_compaction_info.rc_subcompaction_ids_.push_back(rc_subcompaction_id);
rc_compaction_info.rc_subcompactions_.push_back(
{rc_subcompaction_start, rc_subcompaction_end});
} else {
TempRCInfo rc_compaction_info;
if (rc_subcompaction_start.size() == 0 &&
rc_subcompaction_end.size() == 0) {
rc_compaction_info = {rc_inputs,
rc_output_level,
rc_penultimate_level,
rc_penultimate_level_smallest,
rc_penultimate_level_largest,
{},
{}};
} else {
rc_compaction_info = {
rc_inputs,
rc_output_level,
rc_penultimate_level,
rc_penultimate_level_smallest,
rc_penultimate_level_largest,
{rc_subcompaction_id},
{{rc_subcompaction_start, rc_subcompaction_end}}};
}
rc_compaction_id_to_info_map.insert(
{rc_compaction_id, rc_compaction_info});
bool has_subcompaction = info.subcompaction_start.size() != 0 ||
info.subcompaction_end.size() != 0;
constexpr uint32_t LAST_32_BITS_MASK = 0xffffffffU;
uint64_t subcompaction_id = info.job_id & LAST_32_BITS_MASK;

if (iter_2 == resumable_compactions.end()) {
ResumableCompaction resumable_compaction = {
info.inputs,
info.output_level,
info.penultimate_output_level,
info.penultimate_output_level_smallest,
info.penultimate_output_level_largest,
{}};
if (has_subcompaction) {
resumable_compaction.subcompaction_id_to_range.insert(
{subcompaction_id,
{info.subcompaction_start, info.subcompaction_end}});
}
resumable_compactions.insert({compaction_id, resumable_compaction});
} else {
rc_db_session_id_to_rc_compactions_map_.insert({rc_db_session_id, {}});
TempRCInfo rc_compaction_info;
if (rc_subcompaction_start.size() == 0 &&
rc_subcompaction_end.size() == 0) {
rc_compaction_info = {rc_inputs,
rc_output_level,
rc_penultimate_level,
rc_penultimate_level_smallest,
rc_penultimate_level_largest,
{},
{}};
} else {
rc_compaction_info = {rc_inputs,
rc_output_level,
rc_penultimate_level,
rc_penultimate_level_smallest,
rc_penultimate_level_largest,
{rc_subcompaction_id},
{{rc_subcompaction_start, rc_subcompaction_end}}};
}
rc_db_session_id_to_rc_compactions_map_[rc_db_session_id].insert(
{rc_compaction_id, rc_compaction_info});
assert(has_subcompaction);
ResumableCompaction& resumable_compaction = iter_2->second;
assert(resumable_compaction.subcompaction_id_to_range.find(
subcompaction_id) ==
resumable_compaction.subcompaction_id_to_range.end());
resumable_compaction.subcompaction_id_to_range.insert(
{subcompaction_id,
{info.subcompaction_start, info.subcompaction_end}});
}
}

Expand Down Expand Up @@ -732,17 +695,21 @@ class ColumnFamilyData {

std::atomic<uint64_t> next_epoch_number_;

struct TempRCInfo {
std::vector<std::pair<int, std::vector<uint64_t>>> rc_inputs_;
int rc_output_level_;
int rc_penultimate_level_;
InternalKey rc_penultimate_level_smallest_;
InternalKey rc_penultimate_level_largest_;
std::vector<uint64_t> rc_subcompaction_ids_;
std::vector<std::pair<std::string, std::string>> rc_subcompactions_;
struct ResumableCompaction {
std::vector<std::pair<int, std::vector<uint64_t>>> inputs;
int output_level;
int penultimate_output_level;
InternalKey penultimate_output_level_smallest;
InternalKey penultimate_output_level_largest;
std::map<uint64_t, std::pair<std::string, std::string>>
subcompaction_id_to_range;
};
std::map<std::string, std::map<uint64_t, TempRCInfo>>
rc_db_session_id_to_rc_compactions_map_;

using ResumableCompactions =
std::map<uint64_t /* compaction id */, ResumableCompaction>;

std::map<std::string, ResumableCompactions>
db_session_id_to_resumable_compactions;
};

// ColumnFamilySet has interesting thread-safety requirements
Expand Down
16 changes: 9 additions & 7 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,11 @@ Compaction::Compaction(
BlobGarbageCollectionPolicy _blob_garbage_collection_policy,
double _blob_garbage_collection_age_cutoff, int penultimate_level,
InternalKey penultimate_level_smallest,
InternalKey penultimate_level_largest, std::string rc_db_session_id,
uint64_t rc_compaction_id, std::vector<uint64_t> rc_subcompaction_ids,
std::vector<std::pair<std::string, std::string>> rc_subcompactions)
InternalKey penultimate_level_largest,
std::string resumable_compaction_db_session_id,
uint64_t resumable_compaction_compaction_id,
std::map<uint64_t, std::pair<std::string, std::string>>
resumable_compaction_subcompactions)
: input_vstorage_(vstorage),
start_level_(_inputs[0].level),
output_level_(_output_level),
Expand Down Expand Up @@ -349,10 +351,10 @@ Compaction::Compaction(
? Compaction::kInvalidLevel
: EvaluatePenultimateLevel(vstorage, immutable_options_,
start_level_, output_level_)),
rc_db_session_id_(rc_db_session_id),
rc_compaction_id_(rc_compaction_id),
rc_subcompaction_ids_(rc_subcompaction_ids),
rc_subcompactions_(rc_subcompactions) {
resumable_compaction_db_session_id_(resumable_compaction_db_session_id),
resumable_compaction_compaction_id_(resumable_compaction_compaction_id),
resumable_compaction_subcompactions_(
resumable_compaction_subcompactions) {
MarkFilesBeingCompacted(true);
if (is_manual_compaction_) {
compaction_reason_ = CompactionReason::kManualCompaction;
Expand Down
71 changes: 37 additions & 34 deletions db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,29 +80,31 @@ class CompactionFilter;
// A Compaction encapsulates metadata about a compaction.
class Compaction {
public:
Compaction(
VersionStorageInfo* input_version,
const ImmutableOptions& immutable_options,
const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options,
std::vector<CompactionInputFiles> inputs, int output_level,
uint64_t target_file_size, uint64_t max_compaction_bytes,
uint32_t output_path_id, CompressionType compression,
CompressionOptions compression_opts, Temperature output_temperature,
uint32_t max_subcompactions, std::vector<FileMetaData*> grandparents,
bool manual_compaction = false, const std::string& trim_ts = "",
double score = -1, bool deletion_compaction = false,
bool l0_files_might_overlap = true,
CompactionReason compaction_reason = CompactionReason::kUnknown,
BlobGarbageCollectionPolicy blob_garbage_collection_policy =
BlobGarbageCollectionPolicy::kUseDefault,
double blob_garbage_collection_age_cutoff = -1,
int penultimate_level = -1, InternalKey penultimate_level_smallest = {},
InternalKey penultimate_level_largest = {},
std::string rc_db_session_id = "",
uint64_t rc_compaction_id = std::numeric_limits<uint64_t>::max(),
std::vector<uint64_t> rc_subcompaction_ids = {},
std::vector<std::pair<std::string, std::string>> rc_subcompactions = {});
Compaction(VersionStorageInfo* input_version,
const ImmutableOptions& immutable_options,
const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options,
std::vector<CompactionInputFiles> inputs, int output_level,
uint64_t target_file_size, uint64_t max_compaction_bytes,
uint32_t output_path_id, CompressionType compression,
CompressionOptions compression_opts,
Temperature output_temperature, uint32_t max_subcompactions,
std::vector<FileMetaData*> grandparents,
bool manual_compaction = false, const std::string& trim_ts = "",
double score = -1, bool deletion_compaction = false,
bool l0_files_might_overlap = true,
CompactionReason compaction_reason = CompactionReason::kUnknown,
BlobGarbageCollectionPolicy blob_garbage_collection_policy =
BlobGarbageCollectionPolicy::kUseDefault,
double blob_garbage_collection_age_cutoff = -1,
int penultimate_level = -1,
InternalKey penultimate_level_smallest = {},
InternalKey penultimate_level_largest = {},
std::string resumable_compaction_db_session_id = "",
uint64_t resumable_compaction_compaction_id =
std::numeric_limits<uint64_t>::max(),
std::map<uint64_t, std::pair<std::string, std::string>>
resumable_compaction_subcmpactions = {});

// The type of the penultimate level output range
enum class PenultimateOutputRangeType : int {
Expand Down Expand Up @@ -446,16 +448,17 @@ class Compaction {
return penultimate_level_largest_;
}

std::string GetRCDbSessionID() const { return rc_db_session_id_; }

uint64_t GetRCCompactionId() const { return rc_compaction_id_; }
std::string GetResumableCompactionDbSessionID() const {
return resumable_compaction_db_session_id_;
}

std::vector<uint64_t> GetRCSubompactionIds() const {
return rc_subcompaction_ids_;
uint64_t GetResumableCompactionCompactionId() const {
return resumable_compaction_compaction_id_;
}

std::vector<std::pair<std::string, std::string>> GetRCSubcompactions() const {
return rc_subcompactions_;
std::map<uint64_t, std::pair<std::string, std::string>>
GetResumableCompactionSubcompactions() const {
return resumable_compaction_subcompactions_;
}

private:
Expand Down Expand Up @@ -595,10 +598,10 @@ class Compaction {
InternalKey penultimate_level_largest_;
PenultimateOutputRangeType penultimate_output_range_type_ =
PenultimateOutputRangeType::kNotSupported;
std::string rc_db_session_id_;
uint64_t rc_compaction_id_;
std::vector<uint64_t> rc_subcompaction_ids_;
std::vector<std::pair<std::string, std::string>> rc_subcompactions_;
std::string resumable_compaction_db_session_id_;
uint64_t resumable_compaction_compaction_id_;
std::map<uint64_t, std::pair<std::string, std::string>>
resumable_compaction_subcompactions_;
};

#ifndef NDEBUG
Expand Down
Loading

0 comments on commit 776f06f

Please sign in to comment.