Skip to content

Commit

Permalink
draft: before subcompaction change to temp_rc_infos_
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Dec 6, 2023
1 parent acc078f commit 08b104c
Show file tree
Hide file tree
Showing 9 changed files with 325 additions and 12 deletions.
58 changes: 57 additions & 1 deletion db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include <algorithm>
#include <cinttypes>
#include <cstdio>
#include <limits>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -1109,12 +1110,67 @@ void ColumnFamilyData::CreateNewMemtable(

bool ColumnFamilyData::NeedsCompaction() const {
return !mutable_cf_options_.disable_auto_compactions &&
compaction_picker_->NeedsCompaction(current_->storage_info());
(!temp_rc_infos_.empty() ||
compaction_picker_->NeedsCompaction(current_->storage_info()));
}

Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options,
const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) {
while (!temp_rc_infos_.empty()) {
// std::vector<LiveFileMetaData> live_files;
// current_->version_set()->GetLiveFilesMetaData(&live_files);
std::vector<CompactionInputFiles> inputs;

auto rc_info = temp_rc_infos_.back();
temp_rc_infos_.pop_back();

bool error = false;
for (auto rc_input : rc_info.rc_inputs_) {
CompactionInputFiles input;
input.level = rc_input.first;

for (uint64_t rc_input_file_number : rc_input.second) {
// bool found_in_live_files = false;
// for (auto& lf : live_files) {
// if (lf.file_number == rc_input_file_number) {
// found_in_live_files = true;
// break;
// }
// }
// if (!found_in_live_files) {
// error = true;
// break;
// }
FileMetaData* file = current_->storage_info()->GetFileMetaDataByNumber(
rc_input_file_number);
if (file == nullptr) {
error = true;
break;
}
input.files.push_back(file);
}
if (error) {
break;
}
inputs.push_back(input);
}
if (!error) {
Compaction* result = new Compaction(
current_->storage_info(), ioptions_, mutable_options,
mutable_db_options, inputs, rc_info.rc_output_level_,
std::numeric_limits<uint64_t>::max(),
std::numeric_limits<uint64_t>::max(), 0,
CompressionType::kNoCompression, CompressionOptions(),
Temperature::kUnknown, std::numeric_limits<uint32_t>::max(), {},
false, "", -1, false, true, CompactionReason::kUnknown,
BlobGarbageCollectionPolicy::kUseDefault, -1,
rc_info.rc_penultimate_level_, rc_info.rc_penultimate_level_smallest_,
rc_info.rc_penultimate_level_largest_);
result->FinalizeInputInfo(current_);
return result;
}
}
auto* result = compaction_picker_->PickCompaction(
GetName(), mutable_options, mutable_db_options, current_->storage_info(),
log_buffer);
Expand Down
29 changes: 29 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,21 @@ class ColumnFamilyData {
// of its files (if missing)
void RecoverEpochNumbers();

void AddTempRCInfo(
const std::string& rc_db_session_id, uint64_t rc_job_id,
const std::vector<std::pair<int, std::vector<uint64_t>>>& rc_inputs,
int rc_output_level, int rc_penultimate_level,
const InternalKey& rc_penultimate_level_smallest,
const InternalKey& rc_penultimate_level_largest,
const std::string& rc_subcompaction_start,
const std::string& rc_subcompaction_end) {
temp_rc_infos_.push_back({rc_db_session_id, rc_job_id, rc_inputs,
rc_output_level, rc_penultimate_level,
rc_penultimate_level_smallest,
rc_penultimate_level_largest,
rc_subcompaction_start, rc_subcompaction_end});
}

private:
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
Expand Down Expand Up @@ -650,6 +665,20 @@ class ColumnFamilyData {
bool mempurge_used_;

std::atomic<uint64_t> next_epoch_number_;

struct TempRCInfo {
std::string rc_db_session_id_;
uint64_t rc_job_id_;
std::vector<std::pair<int, std::vector<uint64_t>>> rc_inputs_;
int rc_output_level_;
int rc_penultimate_level_;
InternalKey rc_penultimate_level_smallest_;
InternalKey rc_penultimate_level_largest_;
std::string rc_subcompaction_start_;
std::string rc_subcompaction_end_;
};

std::vector<TempRCInfo> temp_rc_infos_;
};

// ColumnFamilySet has interesting thread-safety requirements
Expand Down
24 changes: 17 additions & 7 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,9 @@ Compaction::Compaction(
bool _deletion_compaction, bool l0_files_might_overlap,
CompactionReason _compaction_reason,
BlobGarbageCollectionPolicy _blob_garbage_collection_policy,
double _blob_garbage_collection_age_cutoff)
double _blob_garbage_collection_age_cutoff, int penultimate_level,
InternalKey penultimate_level_smallest,
InternalKey penultimate_level_largest)
: input_vstorage_(vstorage),
start_level_(_inputs[0].level),
output_level_(_output_level),
Expand Down Expand Up @@ -334,10 +336,13 @@ Compaction::Compaction(
? mutable_cf_options()->blob_garbage_collection_age_cutoff
: _blob_garbage_collection_age_cutoff),
penultimate_level_(
// For simplicity, we don't support the concept of "penultimate level"
// with `CompactionReason::kExternalSstIngestion` and
// `CompactionReason::kRefitLevel`
_compaction_reason == CompactionReason::kExternalSstIngestion ||
penultimate_level != -1
? penultimate_level
:
// For simplicity, we don't support the concept of "penultimate
// level" with `CompactionReason::kExternalSstIngestion` and
// `CompactionReason::kRefitLevel`
_compaction_reason == CompactionReason::kExternalSstIngestion ||
_compaction_reason == CompactionReason::kRefitLevel
? Compaction::kInvalidLevel
: EvaluatePenultimateLevel(vstorage, immutable_options_,
Expand Down Expand Up @@ -395,8 +400,13 @@ Compaction::Compaction(
}
}
}

PopulatePenultimateLevelOutputRange();
if (penultimate_level_smallest.size() == 0 ||
penultimate_level_largest.size() == 0) {
penultimate_level_smallest_ = penultimate_level_smallest;
penultimate_level_largest_ = penultimate_level_largest;
} else {
PopulatePenultimateLevelOutputRange();
}
}

void Compaction::PopulatePenultimateLevelOutputRange() {
Expand Down
13 changes: 12 additions & 1 deletion db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ class Compaction {
CompactionReason compaction_reason = CompactionReason::kUnknown,
BlobGarbageCollectionPolicy blob_garbage_collection_policy =
BlobGarbageCollectionPolicy::kUseDefault,
double blob_garbage_collection_age_cutoff = -1);
double blob_garbage_collection_age_cutoff = -1,
int penultimate_level = -1,
InternalKey penultimate_level_smallest = {},
InternalKey penultimate_level_largest = {});

// The type of the penultimate level output range
enum class PenultimateOutputRangeType : int {
Expand Down Expand Up @@ -432,6 +435,14 @@ class Compaction {
const int start_level,
const int output_level);

InternalKey GetPenultimateLevelSmallestKey() const {
return penultimate_level_smallest_;
}

InternalKey GetPenultimateLevelLargestKey() const {
return penultimate_level_largest_;
}

private:
void SetInputVersion(Version* input_version);

Expand Down
37 changes: 34 additions & 3 deletions db/compaction/compaction_service_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
"[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
compaction_input.column_family.name.c_str(), job_id_,
compaction_input.output_level, input_files_oss.str().c_str());
CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
GetCompactionId(sub_compact), thread_pri_);
uint64_t compaction_id = GetCompactionId(sub_compact);
CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_, compaction_id,
thread_pri_);
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->StartV2(info, compaction_input_binary);
switch (compaction_status) {
Expand All @@ -96,6 +97,37 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
assert(false); // unknown status
break;
}
// Persist
VersionEdit vc_info;

std::vector<std::pair<int, std::vector<uint64_t>>> rc_inputs;
for (const auto& files_per_level : inputs) {
std::vector<uint64_t> files;
for (const auto& file : files_per_level.files) {
files.push_back(file->fd.GetNumber());
}
rc_inputs.emplace_back(files_per_level.level, files);
}

vc_info.AddRCInfo(
db_session_id_, compaction_id, rc_inputs, compaction->output_level(),
compaction->GetPenultimateLevel(),
compaction->GetPenultimateLevelSmallestKey(),
compaction->GetPenultimateLevelLargestKey(),
sub_compact->start.has_value() ? sub_compact->start->ToString() : "",
sub_compact->end.has_value() ? sub_compact->end->ToString() : "");

{
db_mutex_->Lock();
s = versions_->LogAndApply(
compaction->column_family_data(),
*(compaction->column_family_data()->GetLatestMutableCFOptions()),
ReadOptions(Env::IOActivity::kCompaction), &vc_info, db_mutex_,
db_directory_);
db_mutex_->Unlock();
}

assert(s.ok());

ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Waiting for remote compaction...",
Expand Down Expand Up @@ -830,4 +862,3 @@ bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,
}
#endif // NDEBUG
} // namespace ROCKSDB_NAMESPACE

3 changes: 3 additions & 0 deletions db/compaction/compaction_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ class CompactionServiceTest : public DBTestBase {

TEST_F(CompactionServiceTest, BasicCompactions) {
Options options = CurrentOptions();
options.max_subcompactions = 2;
options.target_file_size_base = 1 << 5; // 1KB
ReopenWithCompactionService(&options);

Statistics* primary_statistics = GetPrimaryStatistics();
Expand Down Expand Up @@ -323,6 +325,7 @@ TEST_F(CompactionServiceTest, BasicCompactions) {
verify_passed++;
});
Reopen(options);
dbfull()->TEST_WaitForCompact();
ASSERT_GT(verify_passed, 0);
Close();
}
Expand Down
99 changes: 99 additions & 0 deletions db/version_edit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,53 @@ bool VersionEdit::EncodeTo(std::string* dst,
char p = static_cast<char>(persist_user_defined_timestamps_);
PutLengthPrefixedSlice(dst, Slice(&p, 1));
}

if (!rc_db_session_id_.empty()) {
PutVarint32(dst, kRCDBSessionId);
PutLengthPrefixedSlice(dst, rc_db_session_id_);
}

// if rc_job_id_??
PutVarint32Varint64(dst, kRCJobId, rc_job_id_);

if (!rc_inputs_.empty()) {
for (const std::pair<int, std::vector<uint64_t>>& pair : rc_inputs_) {
PutVarint32(dst, kRCInput);
PutVarint32(dst, pair.first /* level */);
PutVarint64(dst, pair.second.size() /* length */);
for (uint64_t file_number : pair.second) {
PutVarint64(dst, file_number);
}
}
}

// if rc_output_level_ ??
PutVarint32Varint32(dst, kRCOutputLevel, rc_output_level_);

// if rc_penultimate_level_ ??
PutVarint32Varint32(dst, kRCPenultimateLevel, rc_penultimate_level_);

if (rc_penultimate_level_smallest_.size() > 0) {
PutVarint32(dst, kRCPenultimateLevelSmallest);
PutLengthPrefixedSlice(dst, rc_penultimate_level_smallest_.Encode());
}

if (rc_penultimate_level_largest_.size() > 0) {
PutVarint32(dst, kRCPenultimateLevelLargest);
PutLengthPrefixedSlice(dst, rc_penultimate_level_largest_.Encode());
}

// if rc_subcompaction_start_
if (rc_subcompaction_start_.size() > 0) {
PutVarint32(dst, kRCSubcompactionStart);
PutLengthPrefixedSlice(dst, rc_subcompaction_start_);
}

// if rc_subcompaction_end_
if (rc_subcompaction_end_.size() > 0) {
PutVarint32(dst, kRCSubcompactionEnd);
PutLengthPrefixedSlice(dst, rc_subcompaction_end_);
}
return true;
}

Expand Down Expand Up @@ -798,6 +845,58 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;

case kRCDBSessionId:
GetLengthPrefixedSlice(&input, &str);
rc_db_session_id_.assign(str.data(), str.size());
break;

case kRCJobId:
GetVarint64(&input, &rc_job_id_);
break;

case kRCInput:
uint32_t rc_input_level;
GetVarint32(&input, &rc_input_level);
rc_inputs_.push_back({rc_input_level, {}});
uint64_t rc_input_length;
GetVarint64(&input, &rc_input_length);
for (uint64_t i = 0; i < rc_input_length; ++i) {
uint64_t rc_input_file_number;
GetVarint64(&input, &rc_input_file_number);
rc_inputs_.back().second.push_back(rc_input_file_number);
}
break;

case kRCOutputLevel:
uint32_t rc_output_level;
GetVarint32(&input, &rc_output_level);
rc_output_level_ = rc_output_level;
break;

case kRCPenultimateLevel:
uint32_t rc_penultimate_level;
GetVarint32(&input, &rc_penultimate_level);
rc_penultimate_level_ = rc_penultimate_level;
break;

case kRCPenultimateLevelSmallest:
GetInternalKey(&input, &rc_penultimate_level_smallest_);
break;

case kRCPenultimateLevelLargest:
GetInternalKey(&input, &rc_penultimate_level_largest_);
break;

case kRCSubcompactionStart:
GetLengthPrefixedSlice(&input, &str);
rc_subcompaction_start_.assign(str.data(), str.size());
break;

case kRCSubcompactionEnd:
GetLengthPrefixedSlice(&input, &str);
rc_subcompaction_end_.assign(str.data(), str.size());
break;

default:
if (tag & kTagSafeIgnoreMask) {
// Tag from future which can be safely ignored.
Expand Down
Loading

0 comments on commit 08b104c

Please sign in to comment.