From 08b104c08e3b54e2dcd99ac570bc690a2f7802bd Mon Sep 17 00:00:00 2001 From: Hui Xiao Date: Wed, 6 Dec 2023 14:13:03 -0800 Subject: [PATCH] draft: before subcompaction change to temp_rc_infos_ --- db/column_family.cc | 58 +++++++++++++- db/column_family.h | 29 +++++++ db/compaction/compaction.cc | 24 ++++-- db/compaction/compaction.h | 13 +++- db/compaction/compaction_service_job.cc | 37 ++++++++- db/compaction/compaction_service_test.cc | 3 + db/version_edit.cc | 99 ++++++++++++++++++++++++ db/version_edit.h | 66 ++++++++++++++++ db/version_edit_handler.cc | 8 ++ 9 files changed, 325 insertions(+), 12 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 280533993196..158bd6a26813 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -1109,12 +1110,67 @@ void ColumnFamilyData::CreateNewMemtable( bool ColumnFamilyData::NeedsCompaction() const { return !mutable_cf_options_.disable_auto_compactions && - compaction_picker_->NeedsCompaction(current_->storage_info()); + (!temp_rc_infos_.empty() || + compaction_picker_->NeedsCompaction(current_->storage_info())); } Compaction* ColumnFamilyData::PickCompaction( const MutableCFOptions& mutable_options, const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) { + while (!temp_rc_infos_.empty()) { + // std::vector live_files; + // current_->version_set()->GetLiveFilesMetaData(&live_files); + std::vector inputs; + + auto rc_info = temp_rc_infos_.back(); + temp_rc_infos_.pop_back(); + + bool error = false; + for (auto rc_input : rc_info.rc_inputs_) { + CompactionInputFiles input; + input.level = rc_input.first; + + for (uint64_t rc_input_file_number : rc_input.second) { + // bool found_in_live_files = false; + // for (auto& lf : live_files) { + // if (lf.file_number == rc_input_file_number) { + // found_in_live_files = true; + // break; + // } + // } + // if (!found_in_live_files) { + // error = true; + // break; + // 
} + FileMetaData* file = current_->storage_info()->GetFileMetaDataByNumber( + rc_input_file_number); + if (file == nullptr) { + error = true; + break; + } + input.files.push_back(file); + } + if (error) { + break; + } + inputs.push_back(input); + } + if (!error) { + Compaction* result = new Compaction( + current_->storage_info(), ioptions_, mutable_options, + mutable_db_options, inputs, rc_info.rc_output_level_, + std::numeric_limits::max(), + std::numeric_limits::max(), 0, + CompressionType::kNoCompression, CompressionOptions(), + Temperature::kUnknown, std::numeric_limits::max(), {}, + false, "", -1, false, true, CompactionReason::kUnknown, + BlobGarbageCollectionPolicy::kUseDefault, -1, + rc_info.rc_penultimate_level_, rc_info.rc_penultimate_level_smallest_, + rc_info.rc_penultimate_level_largest_); + result->FinalizeInputInfo(current_); + return result; + } + } auto* result = compaction_picker_->PickCompaction( GetName(), mutable_options, mutable_db_options, current_->storage_info(), log_buffer); diff --git a/db/column_family.h b/db/column_family.h index 2a38feb73107..8bbc87d8b4cc 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -549,6 +549,21 @@ class ColumnFamilyData { // of its files (if missing) void RecoverEpochNumbers(); + void AddTempRCInfo( + const std::string& rc_db_session_id, uint64_t rc_job_id, + const std::vector>>& rc_inputs, + int rc_output_level, int rc_penultimate_level, + const InternalKey& rc_penultimate_level_smallest, + const InternalKey& rc_penultimate_level_largest, + const std::string& rc_subcompaction_start, + const std::string& rc_subcompaction_end) { + temp_rc_infos_.push_back({rc_db_session_id, rc_job_id, rc_inputs, + rc_output_level, rc_penultimate_level, + rc_penultimate_level_smallest, + rc_penultimate_level_largest, + rc_subcompaction_start, rc_subcompaction_end}); + } + private: friend class ColumnFamilySet; ColumnFamilyData(uint32_t id, const std::string& name, @@ -650,6 +665,20 @@ class ColumnFamilyData { bool 
mempurge_used_; std::atomic next_epoch_number_; + + struct TempRCInfo { + std::string rc_db_session_id_; + uint64_t rc_job_id_; + std::vector>> rc_inputs_; + int rc_output_level_; + int rc_penultimate_level_; + InternalKey rc_penultimate_level_smallest_; + InternalKey rc_penultimate_level_largest_; + std::string rc_subcompaction_start_; + std::string rc_subcompaction_end_; + }; + + std::vector temp_rc_infos_; }; // ColumnFamilySet has interesting thread-safety requirements diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index bbab8f79fb56..00b82dbca419 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -285,7 +285,9 @@ Compaction::Compaction( bool _deletion_compaction, bool l0_files_might_overlap, CompactionReason _compaction_reason, BlobGarbageCollectionPolicy _blob_garbage_collection_policy, - double _blob_garbage_collection_age_cutoff) + double _blob_garbage_collection_age_cutoff, int penultimate_level, + InternalKey penultimate_level_smallest, + InternalKey penultimate_level_largest) : input_vstorage_(vstorage), start_level_(_inputs[0].level), output_level_(_output_level), @@ -334,10 +336,13 @@ Compaction::Compaction( ? mutable_cf_options()->blob_garbage_collection_age_cutoff : _blob_garbage_collection_age_cutoff), penultimate_level_( - // For simplicity, we don't support the concept of "penultimate level" - // with `CompactionReason::kExternalSstIngestion` and - // `CompactionReason::kRefitLevel` - _compaction_reason == CompactionReason::kExternalSstIngestion || + penultimate_level != -1 + ? penultimate_level + : + // For simplicity, we don't support the concept of "penultimate + // level" with `CompactionReason::kExternalSstIngestion` and + // `CompactionReason::kRefitLevel` + _compaction_reason == CompactionReason::kExternalSstIngestion || _compaction_reason == CompactionReason::kRefitLevel ? 
Compaction::kInvalidLevel : EvaluatePenultimateLevel(vstorage, immutable_options_, @@ -395,8 +400,13 @@ Compaction::Compaction( } } } - - PopulatePenultimateLevelOutputRange(); + if (penultimate_level_smallest.size() == 0 || + penultimate_level_largest.size() == 0) { + penultimate_level_smallest_ = penultimate_level_smallest; + penultimate_level_largest_ = penultimate_level_largest; + } else { + PopulatePenultimateLevelOutputRange(); + } } void Compaction::PopulatePenultimateLevelOutputRange() { diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h index 50c75f70b22c..be7a00fbbe11 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -96,7 +96,10 @@ class Compaction { CompactionReason compaction_reason = CompactionReason::kUnknown, BlobGarbageCollectionPolicy blob_garbage_collection_policy = BlobGarbageCollectionPolicy::kUseDefault, - double blob_garbage_collection_age_cutoff = -1); + double blob_garbage_collection_age_cutoff = -1, + int penultimate_level = -1, + InternalKey penultimate_level_smallest = {}, + InternalKey penultimate_level_largest = {}); // The type of the penultimate level output range enum class PenultimateOutputRangeType : int { @@ -432,6 +435,14 @@ class Compaction { const int start_level, const int output_level); + InternalKey GetPenultimateLevelSmallestKey() const { + return penultimate_level_smallest_; + } + + InternalKey GetPenultimateLevelLargestKey() const { + return penultimate_level_largest_; + } + private: void SetInputVersion(Version* input_version); diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc index 3149bb500258..3d0e31660500 100644 --- a/db/compaction/compaction_service_job.cc +++ b/db/compaction/compaction_service_job.cc @@ -72,8 +72,9 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( "[%s] [JOB %d] Starting remote compaction (output level: %d): %s", compaction_input.column_family.name.c_str(), job_id_, 
compaction_input.output_level, input_files_oss.str().c_str()); - CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_, - GetCompactionId(sub_compact), thread_pri_); + uint64_t compaction_id = GetCompactionId(sub_compact); + CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_, compaction_id, + thread_pri_); CompactionServiceJobStatus compaction_status = db_options_.compaction_service->StartV2(info, compaction_input_binary); switch (compaction_status) { @@ -96,6 +97,37 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( assert(false); // unknown status break; } + // Persist this remote compaction's input files, levels and key bounds in a VersionEdit + VersionEdit vc_info; + + std::vector>> rc_inputs; + for (const auto& files_per_level : inputs) { + std::vector files; + for (const auto& file : files_per_level.files) { + files.push_back(file->fd.GetNumber()); + } + rc_inputs.emplace_back(files_per_level.level, files); + } + + vc_info.AddRCInfo( + db_session_id_, compaction_id, rc_inputs, compaction->output_level(), + compaction->GetPenultimateLevel(), + compaction->GetPenultimateLevelSmallestKey(), + compaction->GetPenultimateLevelLargestKey(), + sub_compact->start.has_value() ? sub_compact->start->ToString() : "", + sub_compact->end.has_value() ? 
sub_compact->end->ToString() : ""); + + { + db_mutex_->Lock(); + s = versions_->LogAndApply( + compaction->column_family_data(), + *(compaction->column_family_data()->GetLatestMutableCFOptions()), + ReadOptions(Env::IOActivity::kCompaction), &vc_info, db_mutex_, + db_directory_); + db_mutex_->Unlock(); + } + + assert(s.ok()); ROCKS_LOG_INFO(db_options_.info_log, "[%s] [JOB %d] Waiting for remote compaction...", @@ -830,4 +862,3 @@ bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other, } #endif // NDEBUG } // namespace ROCKSDB_NAMESPACE - diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index 7c87f88d1be0..bf29d4b14998 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -229,6 +229,8 @@ class CompactionServiceTest : public DBTestBase { TEST_F(CompactionServiceTest, BasicCompactions) { Options options = CurrentOptions(); + options.max_subcompactions = 2; + options.target_file_size_base = 1 << 5; // 32 bytes ReopenWithCompactionService(&options); Statistics* primary_statistics = GetPrimaryStatistics(); @@ -323,6 +325,7 @@ TEST_F(CompactionServiceTest, BasicCompactions) { verify_passed++; }); Reopen(options); + dbfull()->TEST_WaitForCompact(); ASSERT_GT(verify_passed, 0); Close(); } diff --git a/db/version_edit.cc b/db/version_edit.cc index 6459c2ff863f..500d403db4be 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -318,6 +318,53 @@ bool VersionEdit::EncodeTo(std::string* dst, char p = static_cast(persist_user_defined_timestamps_); PutLengthPrefixedSlice(dst, Slice(&p, 1)); } + + if (!rc_db_session_id_.empty()) { + PutVarint32(dst, kRCDBSessionId); + PutLengthPrefixedSlice(dst, rc_db_session_id_); + } + + // if rc_job_id_?? 
+ PutVarint32Varint64(dst, kRCJobId, rc_job_id_); + + if (!rc_inputs_.empty()) { + for (const std::pair>& pair : rc_inputs_) { + PutVarint32(dst, kRCInput); + PutVarint32(dst, pair.first /* level */); + PutVarint64(dst, pair.second.size() /* length */); + for (uint64_t file_number : pair.second) { + PutVarint64(dst, file_number); + } + } + } + + // if rc_output_level_ ?? + PutVarint32Varint32(dst, kRCOutputLevel, rc_output_level_); + + // if rc_penultimate_level_ ?? + PutVarint32Varint32(dst, kRCPenultimateLevel, rc_penultimate_level_); + + if (rc_penultimate_level_smallest_.size() > 0) { + PutVarint32(dst, kRCPenultimateLevelSmallest); + PutLengthPrefixedSlice(dst, rc_penultimate_level_smallest_.Encode()); + } + + if (rc_penultimate_level_largest_.size() > 0) { + PutVarint32(dst, kRCPenultimateLevelLargest); + PutLengthPrefixedSlice(dst, rc_penultimate_level_largest_.Encode()); + } + + // if rc_subcompaction_start_ + if (rc_subcompaction_start_.size() > 0) { + PutVarint32(dst, kRCSubcompactionStart); + PutLengthPrefixedSlice(dst, rc_subcompaction_start_); + } + + // if rc_subcompaction_end_ + if (rc_subcompaction_end_.size() > 0) { + PutVarint32(dst, kRCSubcompactionEnd); + PutLengthPrefixedSlice(dst, rc_subcompaction_end_); + } return true; } @@ -798,6 +845,58 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; + case kRCDBSessionId: + GetLengthPrefixedSlice(&input, &str); + rc_db_session_id_.assign(str.data(), str.size()); + break; + + case kRCJobId: + GetVarint64(&input, &rc_job_id_); + break; + + case kRCInput: + uint32_t rc_input_level; + GetVarint32(&input, &rc_input_level); + rc_inputs_.push_back({rc_input_level, {}}); + uint64_t rc_input_length; + GetVarint64(&input, &rc_input_length); + for (uint64_t i = 0; i < rc_input_length; ++i) { + uint64_t rc_input_file_number; + GetVarint64(&input, &rc_input_file_number); + rc_inputs_.back().second.push_back(rc_input_file_number); + } + break; + + case kRCOutputLevel: + uint32_t rc_output_level; + 
GetVarint32(&input, &rc_output_level); + rc_output_level_ = rc_output_level; + break; + + case kRCPenultimateLevel: + uint32_t rc_penultimate_level; + GetVarint32(&input, &rc_penultimate_level); + rc_penultimate_level_ = rc_penultimate_level; + break; + + case kRCPenultimateLevelSmallest: + GetInternalKey(&input, &rc_penultimate_level_smallest_); + break; + + case kRCPenultimateLevelLargest: + GetInternalKey(&input, &rc_penultimate_level_largest_); + break; + + case kRCSubcompactionStart: + GetLengthPrefixedSlice(&input, &str); + rc_subcompaction_start_.assign(str.data(), str.size()); + break; + + case kRCSubcompactionEnd: + GetLengthPrefixedSlice(&input, &str); + rc_subcompaction_end_.assign(str.data(), str.size()); + break; + default: if (tag & kTagSafeIgnoreMask) { // Tag from future which can be safely ignored. diff --git a/db/version_edit.h b/db/version_edit.h index 8e14e76da932..034ee3e10aee 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -72,6 +72,15 @@ enum Tag : uint32_t { kWalAddition2, kWalDeletion2, kPersistUserDefinedTimestamps, + kRCDBSessionId, + kRCJobId, + kRCInput, + kRCOutputLevel, + kRCPenultimateLevel, + kRCPenultimateLevelSmallest, + kRCPenultimateLevelLargest, + kRCSubcompactionStart, + kRCSubcompactionEnd, }; enum NewFileCustomTag : uint32_t { @@ -531,6 +540,53 @@ class VersionEdit { } } + void AddRCInfo( + const std::string& rc_db_session_id, uint64_t rc_job_id, + const std::vector>>& rc_inputs, + int rc_output_level, int rc_penultimate_level, + const InternalKey& rc_penultimate_level_smallest, + const InternalKey& rc_penultimate_level_largest, + const std::string& rc_subcompaction_start, + const std::string& rc_subcompaction_end) { + rc_db_session_id_ = rc_db_session_id; + rc_job_id_ = rc_job_id; + rc_inputs_ = rc_inputs; + rc_output_level_ = rc_output_level; + rc_penultimate_level_ = rc_penultimate_level; + rc_penultimate_level_smallest_ = rc_penultimate_level_smallest; + rc_penultimate_level_largest_ = 
rc_penultimate_level_largest; + rc_subcompaction_start_ = rc_subcompaction_start; + rc_subcompaction_end_ = rc_subcompaction_end; + } + + bool HasRCDBSessionID() const { return !rc_db_session_id_.empty(); } + + std::string GetRCDbSessionID() const { return rc_db_session_id_; } + + uint64_t GetRCJobID() const { return rc_job_id_; } + + std::vector>> GetRCInputs() const { + return rc_inputs_; + } + + int GetRCOutputLevel() const { return rc_output_level_; } + + int GetRCPenultimateLevel() const { return rc_penultimate_level_; } + + InternalKey GetRCPenultimateLevelSmallest() const { + return rc_penultimate_level_smallest_; + } + + InternalKey GetRCPenultimateLevelLargest() const { + return rc_penultimate_level_largest_; + } + + std::string GetRCSubcompactionStart() const { + return rc_subcompaction_start_; + } + + std::string GetRCSubcompactionEnd() const { return rc_subcompaction_end_; } + // Add a new blob file. void AddBlobFile(uint64_t blob_file_number, uint64_t total_blob_count, uint64_t total_blob_bytes, std::string checksum_method, @@ -764,6 +820,16 @@ class VersionEdit { // Since table files and blob files share the same file number space, we just // record the file number here. 
autovector files_to_quarantine_; + + std::string rc_db_session_id_; + uint64_t rc_job_id_; + std::vector>> rc_inputs_; + int rc_output_level_; + int rc_penultimate_level_; + InternalKey rc_penultimate_level_smallest_; + InternalKey rc_penultimate_level_largest_; + std::string rc_subcompaction_start_; + std::string rc_subcompaction_end_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 90afc0938c9e..d1dfd11e5bb5 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -633,6 +633,14 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, const std::string& new_ts = edit.GetFullHistoryTsLow(); cfd->SetFullHistoryTsLow(new_ts); } + if (edit.HasRCDBSessionID()) { + cfd->AddTempRCInfo( + edit.GetRCDbSessionID(), edit.GetRCJobID(), edit.GetRCInputs(), + edit.GetRCOutputLevel(), edit.GetRCPenultimateLevel(), + edit.GetRCPenultimateLevelSmallest(), + edit.GetRCPenultimateLevelLargest(), edit.GetRCSubcompactionStart(), + edit.GetRCSubcompactionEnd()); + } } if (s.ok()) {