Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Dec 18, 2023
1 parent acc078f commit a37e06b
Show file tree
Hide file tree
Showing 13 changed files with 577 additions and 79 deletions.
63 changes: 61 additions & 2 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

#include <algorithm>
#include <cinttypes>
#include <cstdio>
#include <iostream>
#include <limits>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -40,7 +42,6 @@
#include "util/autovector.h"
#include "util/cast_util.h"
#include "util/compression.h"

namespace ROCKSDB_NAMESPACE {

ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
Expand Down Expand Up @@ -1109,12 +1110,70 @@ void ColumnFamilyData::CreateNewMemtable(

bool ColumnFamilyData::NeedsCompaction() const {
return !mutable_cf_options_.disable_auto_compactions &&
compaction_picker_->NeedsCompaction(current_->storage_info());
(!rc_db_session_id_to_rc_compactions_map_.empty() ||
compaction_picker_->NeedsCompaction(current_->storage_info()));
}

Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options,
const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) {
while (!rc_db_session_id_to_rc_compactions_map_.empty()) {
auto rc_db_session_id_to_rc_compactions_map_it =
rc_db_session_id_to_rc_compactions_map_.begin();
auto rc_db_session_id = rc_db_session_id_to_rc_compactions_map_it->first;
auto rc_compaction_id_to_info_map =
rc_db_session_id_to_rc_compactions_map_it->second;
rc_db_session_id_to_rc_compactions_map_.erase(
rc_db_session_id_to_rc_compactions_map_it);
while (!rc_compaction_id_to_info_map.empty()) {
auto rc_compaction_id_to_info_map_it =
rc_compaction_id_to_info_map.begin();
auto rc_compaction_id = rc_compaction_id_to_info_map_it->first;
auto rc_info = rc_compaction_id_to_info_map_it->second;
rc_compaction_id_to_info_map.erase(rc_compaction_id_to_info_map_it);
std::vector<CompactionInputFiles> inputs;
bool has_error = false;
for (auto& rc_input : rc_info.rc_inputs_) {
CompactionInputFiles input;
input.level = rc_input.first;
assert(rc_input.second.size() > 0);
for (uint64_t rc_input_file_number : rc_input.second) {
FileMetaData* file =
current_->storage_info()->GetFileMetaDataByNumber(
rc_input_file_number);
if (file == nullptr) {
has_error = true;
break;
}
input.files.push_back(file);
}
if (has_error) {
break;
}
inputs.push_back(input);
}
if (!has_error && !inputs.empty()) {
std::vector<std::pair<std::string, std::string>> rc_subcompactions =
rc_info.rc_subcompactions_;
Compaction* result = new Compaction(
current_->storage_info(), ioptions_, mutable_options,
mutable_db_options, inputs, rc_info.rc_output_level_,
std::numeric_limits<uint64_t>::max(),
std::numeric_limits<uint64_t>::max(), 0,
CompressionType::kNoCompression, CompressionOptions(),
Temperature::kUnknown, std::numeric_limits<uint32_t>::max(), {},
false, "", -1, false, true, CompactionReason::kUnknown,
BlobGarbageCollectionPolicy::kUseDefault, -1,
rc_info.rc_penultimate_level_,
rc_info.rc_penultimate_level_smallest_,
rc_info.rc_penultimate_level_largest_, rc_db_session_id,
rc_compaction_id, rc_info.rc_subcompaction_ids_,
rc_info.rc_subcompactions_);
result->FinalizeInputInfo(current_);
return result;
}
}
}
auto* result = compaction_picker_->PickCompaction(
GetName(), mutable_options, mutable_db_options, current_->storage_info(),
log_buffer);
Expand Down
103 changes: 98 additions & 5 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,86 @@ class ColumnFamilyData {
// of its files (if missing)
void RecoverEpochNumbers();

void AddTempRCInfo(
const std::string& rc_db_session_id, uint64_t rc_job_id,
const std::vector<std::pair<int, std::vector<uint64_t>>>& rc_inputs,
int rc_output_level, int rc_penultimate_level,
const InternalKey& rc_penultimate_level_smallest,
const InternalKey& rc_penultimate_level_largest,
const std::string& rc_subcompaction_start,
const std::string& rc_subcompaction_end) {
assert(!rc_inputs.empty());
for (auto& input : rc_inputs) {
assert(input.second.size() > 0);
}

uint64_t rc_compaction_id = rc_job_id >> 32;
constexpr uint32_t LAST_32_BITS_MASK = 0xffffffffU;
uint64_t rc_subcompaction_id = rc_job_id & LAST_32_BITS_MASK;

auto iter_1 =
rc_db_session_id_to_rc_compactions_map_.find(rc_db_session_id);
if (iter_1 != rc_db_session_id_to_rc_compactions_map_.end()) {
std::map<uint64_t, TempRCInfo>& rc_compaction_id_to_info_map =
iter_1->second;
auto iter_2 = rc_compaction_id_to_info_map.find(rc_compaction_id);
if (iter_2 != rc_compaction_id_to_info_map.end()) {
assert(rc_subcompaction_start.size() != 0 ||
rc_subcompaction_end.size() != 0);
TempRCInfo& rc_compaction_info = iter_2->second;
rc_compaction_info.rc_subcompaction_ids_.push_back(rc_subcompaction_id);
rc_compaction_info.rc_subcompactions_.push_back(
{rc_subcompaction_start, rc_subcompaction_end});
} else {
TempRCInfo rc_compaction_info;
if (rc_subcompaction_start.size() == 0 &&
rc_subcompaction_end.size() == 0) {
rc_compaction_info = {rc_inputs,
rc_output_level,
rc_penultimate_level,
rc_penultimate_level_smallest,
rc_penultimate_level_largest,
{},
{}};
} else {
rc_compaction_info = {
rc_inputs,
rc_output_level,
rc_penultimate_level,
rc_penultimate_level_smallest,
rc_penultimate_level_largest,
{rc_subcompaction_id},
{{rc_subcompaction_start, rc_subcompaction_end}}};
}
rc_compaction_id_to_info_map.insert(
{rc_compaction_id, rc_compaction_info});
}
} else {
rc_db_session_id_to_rc_compactions_map_.insert({rc_db_session_id, {}});
TempRCInfo rc_compaction_info;
if (rc_subcompaction_start.size() == 0 &&
rc_subcompaction_end.size() == 0) {
rc_compaction_info = {rc_inputs,
rc_output_level,
rc_penultimate_level,
rc_penultimate_level_smallest,
rc_penultimate_level_largest,
{},
{}};
} else {
rc_compaction_info = {rc_inputs,
rc_output_level,
rc_penultimate_level,
rc_penultimate_level_smallest,
rc_penultimate_level_largest,
{rc_subcompaction_id},
{{rc_subcompaction_start, rc_subcompaction_end}}};
}
rc_db_session_id_to_rc_compactions_map_[rc_db_session_id].insert(
{rc_compaction_id, rc_compaction_info});
}
}

private:
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
Expand Down Expand Up @@ -604,8 +684,8 @@ class ColumnFamilyData {
std::unique_ptr<ThreadLocalPtr> local_sv_;

// pointers for a circular linked list. we use it to support iterations over
// all column families that are alive (note: dropped column families can also
// be alive as long as client holds a reference)
// all column families that are alive (note: dropped column families can
// also be alive as long as client holds a reference)
ColumnFamilyData* next_;
ColumnFamilyData* prev_;

Expand All @@ -622,7 +702,8 @@ class ColumnFamilyData {

std::unique_ptr<WriteControllerToken> write_controller_token_;

// If true --> this ColumnFamily is currently present in DBImpl::flush_queue_
// If true --> this ColumnFamily is currently present in
// DBImpl::flush_queue_
bool queued_for_flush_;

// If true --> this ColumnFamily is currently present in
Expand All @@ -644,12 +725,24 @@ class ColumnFamilyData {

std::string full_history_ts_low_;

// For charging memory usage of file metadata created for newly added files to
// a Version associated with this CFD
// For charging memory usage of file metadata created for newly added files
// to a Version associated with this CFD
std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr_;
bool mempurge_used_;

std::atomic<uint64_t> next_epoch_number_;

struct TempRCInfo {
std::vector<std::pair<int, std::vector<uint64_t>>> rc_inputs_;
int rc_output_level_;
int rc_penultimate_level_;
InternalKey rc_penultimate_level_smallest_;
InternalKey rc_penultimate_level_largest_;
std::vector<uint64_t> rc_subcompaction_ids_;
std::vector<std::pair<std::string, std::string>> rc_subcompactions_;
};
std::map<std::string, std::map<uint64_t, TempRCInfo>>
rc_db_session_id_to_rc_compactions_map_;
};

// ColumnFamilySet has interesting thread-safety requirements
Expand Down
32 changes: 24 additions & 8 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,11 @@ Compaction::Compaction(
bool _deletion_compaction, bool l0_files_might_overlap,
CompactionReason _compaction_reason,
BlobGarbageCollectionPolicy _blob_garbage_collection_policy,
double _blob_garbage_collection_age_cutoff)
double _blob_garbage_collection_age_cutoff, int penultimate_level,
InternalKey penultimate_level_smallest,
InternalKey penultimate_level_largest, std::string rc_db_session_id,
uint64_t rc_compaction_id, std::vector<uint64_t> rc_subcompaction_ids,
std::vector<std::pair<std::string, std::string>> rc_subcompactions)
: input_vstorage_(vstorage),
start_level_(_inputs[0].level),
output_level_(_output_level),
Expand Down Expand Up @@ -334,14 +338,21 @@ Compaction::Compaction(
? mutable_cf_options()->blob_garbage_collection_age_cutoff
: _blob_garbage_collection_age_cutoff),
penultimate_level_(
// For simplicity, we don't support the concept of "penultimate level"
// with `CompactionReason::kExternalSstIngestion` and
// `CompactionReason::kRefitLevel`
_compaction_reason == CompactionReason::kExternalSstIngestion ||
penultimate_level != -1
? penultimate_level
:
// For simplicity, we don't support the concept of "penultimate
// level" with `CompactionReason::kExternalSstIngestion` and
// `CompactionReason::kRefitLevel`
_compaction_reason == CompactionReason::kExternalSstIngestion ||
_compaction_reason == CompactionReason::kRefitLevel
? Compaction::kInvalidLevel
: EvaluatePenultimateLevel(vstorage, immutable_options_,
start_level_, output_level_)) {
start_level_, output_level_)),
rc_db_session_id_(rc_db_session_id),
rc_compaction_id_(rc_compaction_id),
rc_subcompaction_ids_(rc_subcompaction_ids),
rc_subcompactions_(rc_subcompactions) {
MarkFilesBeingCompacted(true);
if (is_manual_compaction_) {
compaction_reason_ = CompactionReason::kManualCompaction;
Expand Down Expand Up @@ -395,8 +406,13 @@ Compaction::Compaction(
}
}
}

PopulatePenultimateLevelOutputRange();
if (penultimate_level_smallest.size() == 0 ||
penultimate_level_largest.size() == 0) {
penultimate_level_smallest_ = penultimate_level_smallest;
penultimate_level_largest_ = penultimate_level_largest;
} else {
PopulatePenultimateLevelOutputRange();
}
}

void Compaction::PopulatePenultimateLevelOutputRange() {
Expand Down
64 changes: 47 additions & 17 deletions db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,29 @@ class CompactionFilter;
// A Compaction encapsulates metadata about a compaction.
class Compaction {
public:
Compaction(VersionStorageInfo* input_version,
const ImmutableOptions& immutable_options,
const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options,
std::vector<CompactionInputFiles> inputs, int output_level,
uint64_t target_file_size, uint64_t max_compaction_bytes,
uint32_t output_path_id, CompressionType compression,
CompressionOptions compression_opts,
Temperature output_temperature, uint32_t max_subcompactions,
std::vector<FileMetaData*> grandparents,
bool manual_compaction = false, const std::string& trim_ts = "",
double score = -1, bool deletion_compaction = false,
bool l0_files_might_overlap = true,
CompactionReason compaction_reason = CompactionReason::kUnknown,
BlobGarbageCollectionPolicy blob_garbage_collection_policy =
BlobGarbageCollectionPolicy::kUseDefault,
double blob_garbage_collection_age_cutoff = -1);
Compaction(
VersionStorageInfo* input_version,
const ImmutableOptions& immutable_options,
const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options,
std::vector<CompactionInputFiles> inputs, int output_level,
uint64_t target_file_size, uint64_t max_compaction_bytes,
uint32_t output_path_id, CompressionType compression,
CompressionOptions compression_opts, Temperature output_temperature,
uint32_t max_subcompactions, std::vector<FileMetaData*> grandparents,
bool manual_compaction = false, const std::string& trim_ts = "",
double score = -1, bool deletion_compaction = false,
bool l0_files_might_overlap = true,
CompactionReason compaction_reason = CompactionReason::kUnknown,
BlobGarbageCollectionPolicy blob_garbage_collection_policy =
BlobGarbageCollectionPolicy::kUseDefault,
double blob_garbage_collection_age_cutoff = -1,
int penultimate_level = -1, InternalKey penultimate_level_smallest = {},
InternalKey penultimate_level_largest = {},
std::string rc_db_session_id = "",
uint64_t rc_compaction_id = std::numeric_limits<uint64_t>::max(),
std::vector<uint64_t> rc_subcompaction_ids = {},
std::vector<std::pair<std::string, std::string>> rc_subcompactions = {});

// The type of the penultimate level output range
enum class PenultimateOutputRangeType : int {
Expand Down Expand Up @@ -432,6 +438,26 @@ class Compaction {
const int start_level,
const int output_level);

InternalKey GetPenultimateLevelSmallestKey() const {
return penultimate_level_smallest_;
}

InternalKey GetPenultimateLevelLargestKey() const {
return penultimate_level_largest_;
}

std::string GetRCDbSessionID() const { return rc_db_session_id_; }

uint64_t GetRCCompactionId() const { return rc_compaction_id_; }

std::vector<uint64_t> GetRCSubompactionIds() const {
return rc_subcompaction_ids_;
}

std::vector<std::pair<std::string, std::string>> GetRCSubcompactions() const {
return rc_subcompactions_;
}

private:
void SetInputVersion(Version* input_version);

Expand Down Expand Up @@ -569,6 +595,10 @@ class Compaction {
InternalKey penultimate_level_largest_;
PenultimateOutputRangeType penultimate_output_range_type_ =
PenultimateOutputRangeType::kNotSupported;
std::string rc_db_session_id_;
uint64_t rc_compaction_id_;
std::vector<uint64_t> rc_subcompaction_ids_;
std::vector<std::pair<std::string, std::string>> rc_subcompactions_;
};

#ifndef NDEBUG
Expand Down
Loading

0 comments on commit a37e06b

Please sign in to comment.