diff --git a/db/column_family.cc b/db/column_family.cc index 280533993196..b8afc168e58d 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1109,21 +1109,111 @@ void ColumnFamilyData::CreateNewMemtable( bool ColumnFamilyData::NeedsCompaction() const { return !mutable_cf_options_.disable_auto_compactions && - compaction_picker_->NeedsCompaction(current_->storage_info()); + (!resumable_compactions_of_all_db_sessions_.empty() || + compaction_picker_->NeedsCompaction(current_->storage_info())); } Compaction* ColumnFamilyData::PickCompaction( const MutableCFOptions& mutable_options, const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) { - auto* result = compaction_picker_->PickCompaction( - GetName(), mutable_options, mutable_db_options, current_->storage_info(), - log_buffer); + Compaction* result = + PickResumableCompaction(mutable_options, mutable_db_options); + if (result == nullptr) { + result = compaction_picker_->PickCompaction( + GetName(), mutable_options, mutable_db_options, + current_->storage_info(), log_buffer); + } if (result != nullptr) { result->FinalizeInputInfo(current_); } return result; } +Compaction* ColumnFamilyData::PickResumableCompaction( + const MutableCFOptions& mutable_options, + const MutableDBOptions& mutable_db_options) { + Compaction* result = nullptr; + + while (!resumable_compactions_of_all_db_sessions_.empty()) { + auto iter_1 = resumable_compactions_of_all_db_sessions_.begin(); + auto db_session_id = iter_1->first; + ResumableCompactionsPerDBSession resumable_compactions_per_db_session = + iter_1->second; + while (!resumable_compactions_per_db_session.empty()) { + auto iter_2 = resumable_compactions_per_db_session.begin(); + auto compaction_job_id = iter_2->first; + ResumableCompaction resumable_compaction = iter_2->second; + + resumable_compactions_per_db_session.erase(iter_2); + if (resumable_compactions_per_db_session.empty()) { + resumable_compactions_of_all_db_sessions_.erase(iter_1); + } + + result = 
GetCompactionFromResumableCompaction( + db_session_id, compaction_job_id, resumable_compaction, + mutable_options, mutable_db_options); + + if (result != nullptr) { + return result; + } + } + } + + return result; +} + +Compaction* ColumnFamilyData::GetCompactionFromResumableCompaction( + std::string& db_session_id, uint64_t compaction_job_id, + ResumableCompaction& resumable_compaction, + const MutableCFOptions& mutable_options, + const MutableDBOptions& mutable_db_options) const { + std::vector inputs; + + for (const auto& resumable_compaction_inputs_per_level : + resumable_compaction.inputs) { + CompactionInputFiles compaction_input_files; + compaction_input_files.level = resumable_compaction_inputs_per_level.first; + if (resumable_compaction_inputs_per_level.second.empty()) { + return nullptr; + } + for (auto compaction_input_file : + resumable_compaction_inputs_per_level.second) { + FileMetaData* file = current_->storage_info()->GetFileMetaDataByNumber( + compaction_input_file); + if (file == nullptr || file->being_compacted) { + return nullptr; + } else { + compaction_input_files.files.push_back(file); + } + } + inputs.push_back(compaction_input_files); + } + + if (compaction_picker_->FilesRangeOverlapWithCompaction( + inputs, resumable_compaction.output_level, + resumable_compaction.penultimate_output_level != -1 + ? 
resumable_compaction.penultimate_output_level + : Compaction::EvaluatePenultimateLevel( + current_->storage_info(), ioptions_, inputs[0].level, + resumable_compaction.output_level))) { + return nullptr; + } + + Compaction* result = new Compaction( + current_->storage_info(), ioptions_, mutable_options, mutable_db_options, + inputs, resumable_compaction.output_level, + std::numeric_limits::max(), + std::numeric_limits::max(), 0, CompressionType::kNoCompression, + CompressionOptions(), Temperature::kUnknown, + std::numeric_limits::max(), {}, false, "", -1, false, true, + CompactionReason::kUnknown, BlobGarbageCollectionPolicy::kUseDefault, -1, + {db_session_id, compaction_job_id, resumable_compaction}); + + compaction_picker_->RegisterCompaction(result); + + return result; +} + bool ColumnFamilyData::RangeOverlapWithCompaction( const Slice& smallest_user_key, const Slice& largest_user_key, int level) const { @@ -1578,6 +1668,48 @@ void ColumnFamilyData::RecoverEpochNumbers() { vstorage->RecoverEpochNumbers(this); } +void ColumnFamilyData::AddResumableCompaction( + const ResumableCompactionInfo& info) { + const uint64_t compaction_job_id = + GetCompactionJobIdFromCompactionID(info.compaction_id); + const uint64_t subcompaction_job_id = + GetSubCompactionJobIdFromCompactionID(info.compaction_id); + + auto iter_1 = + resumable_compactions_of_all_db_sessions_.find(info.db_session_id); + if (iter_1 == resumable_compactions_of_all_db_sessions_.end()) { + resumable_compactions_of_all_db_sessions_.insert({info.db_session_id, {}}); + } + ResumableCompactionsPerDBSession& resumable_compactions_per_db_session = + resumable_compactions_of_all_db_sessions_[info.db_session_id]; + + auto iter_2 = resumable_compactions_per_db_session.find(compaction_job_id); + if (iter_2 == resumable_compactions_per_db_session.end()) { + ResumableCompaction resumable_compaction = { + info.inputs, + info.output_level, + info.penultimate_output_level, + info.penultimate_output_level_smallest, + 
info.penultimate_output_level_largest, + {}}; + resumable_compactions_per_db_session.insert( + {compaction_job_id, resumable_compaction}); + } + + const bool has_subcompaction = info.subcompaction_start.size() != 0 || + info.subcompaction_end.size() != 0; + if (has_subcompaction) { + ResumableCompaction& resumable_compaction = + resumable_compactions_per_db_session[compaction_job_id]; + assert(resumable_compaction.subcompaction_job_id_to_range.find( + subcompaction_job_id) == + resumable_compaction.subcompaction_job_id_to_range.end()); + resumable_compaction.subcompaction_job_id_to_range.insert( + {subcompaction_job_id, + {info.subcompaction_start, info.subcompaction_end}}); + } +} + ColumnFamilySet::ColumnFamilySet(const std::string& dbname, const ImmutableDBOptions* db_options, const FileOptions& file_options, diff --git a/db/column_family.h b/db/column_family.h index 2a38feb73107..b356fc53f0ad 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -50,6 +50,23 @@ class BlobFileCache; class BlobSource; extern const double kIncSlowdownRatio; + +struct ResumableCompaction { + std::vector>> inputs; + int output_level; + int penultimate_output_level; + InternalKey penultimate_output_level_smallest; + InternalKey penultimate_output_level_largest; + std::map> + subcompaction_job_id_to_range; +}; + +using ResumableCompactionsPerDBSession = + std::map; + +using ResumableCompactionsOfAllDBSessions = + std::map; + // This file contains a list of data structures for managing column family // level metadata. 
// @@ -549,6 +566,8 @@ class ColumnFamilyData { // of its files (if missing) void RecoverEpochNumbers(); + void AddResumableCompaction(const ResumableCompactionInfo& info); + private: friend class ColumnFamilySet; ColumnFamilyData(uint32_t id, const std::string& name, @@ -564,6 +583,16 @@ class ColumnFamilyData { std::vector GetDbPaths() const; + Compaction* PickResumableCompaction( + const MutableCFOptions& mutable_options, + const MutableDBOptions& mutable_db_options); + + Compaction* GetCompactionFromResumableCompaction( + std::string& db_session_id, uint64_t compaction_job_id, + ResumableCompaction& resumable_compaction, + const MutableCFOptions& mutable_options, + const MutableDBOptions& mutable_db_options) const; + uint32_t id_; const std::string name_; Version* dummy_versions_; // Head of circular doubly-linked list of versions. @@ -650,6 +679,8 @@ class ColumnFamilyData { bool mempurge_used_; std::atomic next_epoch_number_; + + ResumableCompactionsOfAllDBSessions resumable_compactions_of_all_db_sessions_; }; // ColumnFamilySet has interesting thread-safety requirements diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index bbab8f79fb56..ba9702ea55b4 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -285,7 +285,9 @@ Compaction::Compaction( bool _deletion_compaction, bool l0_files_might_overlap, CompactionReason _compaction_reason, BlobGarbageCollectionPolicy _blob_garbage_collection_policy, - double _blob_garbage_collection_age_cutoff) + double _blob_garbage_collection_age_cutoff, + std::tuple + resumable_compaction_info) : input_vstorage_(vstorage), start_level_(_inputs[0].level), output_level_(_output_level), @@ -333,11 +335,15 @@ Compaction::Compaction( _blob_garbage_collection_age_cutoff > 1 ? 
mutable_cf_options()->blob_garbage_collection_age_cutoff : _blob_garbage_collection_age_cutoff), + resumable_compaction_info_(resumable_compaction_info), penultimate_level_( - // For simplicity, we don't support the concept of "penultimate level" - // with `CompactionReason::kExternalSstIngestion` and - // `CompactionReason::kRefitLevel` - _compaction_reason == CompactionReason::kExternalSstIngestion || + IsResumableCompaction() + ? GetResumableCompaction().penultimate_output_level + : + // For simplicity, we don't support the concept of "penultimate + // level" with `CompactionReason::kExternalSstIngestion` and + // `CompactionReason::kRefitLevel` + _compaction_reason == CompactionReason::kExternalSstIngestion || _compaction_reason == CompactionReason::kRefitLevel ? Compaction::kInvalidLevel : EvaluatePenultimateLevel(vstorage, immutable_options_, @@ -396,7 +402,15 @@ Compaction::Compaction( } } - PopulatePenultimateLevelOutputRange(); + if (IsResumableCompaction()) { + const ResumableCompaction& resumable_compaction = GetResumableCompaction(); + penultimate_level_smallest_ = + resumable_compaction.penultimate_output_level_smallest; + penultimate_level_largest_ = + resumable_compaction.penultimate_output_level_largest; + } else { + PopulatePenultimateLevelOutputRange(); + } } void Compaction::PopulatePenultimateLevelOutputRange() { diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h index 50c75f70b22c..d4524265588b 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -49,6 +49,20 @@ int sstableKeyCompare(const Comparator* user_cmp, const InternalKey* a, int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a, const InternalKey* b); +inline uint64_t CalculateCompactionId(uint64_t compaction_job_id, + uint64_t sub_compaction_job_id) { + return compaction_job_id << 32 | sub_compaction_job_id; +} + +inline uint64_t GetCompactionJobIdFromCompactionID(uint64_t compaction_id) { + return compaction_id >> 32; +} + 
+inline uint64_t GetSubCompactionJobIdFromCompactionID(uint64_t compaction_id) { + constexpr uint32_t LAST_32_BITS_MASK = 0xffffffffU; + return compaction_id & LAST_32_BITS_MASK; +} + // An AtomicCompactionUnitBoundary represents a range of keys [smallest, // largest] that exactly spans one ore more neighbouring SSTs on the same // level. Every pair of SSTs in this range "overlap" (i.e., the largest @@ -96,7 +110,12 @@ class Compaction { CompactionReason compaction_reason = CompactionReason::kUnknown, BlobGarbageCollectionPolicy blob_garbage_collection_policy = BlobGarbageCollectionPolicy::kUseDefault, - double blob_garbage_collection_age_cutoff = -1); + double blob_garbage_collection_age_cutoff = -1, + std::tuple + resumable_compaction_info = + std::tuple( + "", std::numeric_limits::max(), + ResumableCompaction())); // The type of the penultimate level output range enum class PenultimateOutputRangeType : int { @@ -432,6 +451,30 @@ class Compaction { const int start_level, const int output_level); + InternalKey GetPenultimateLevelSmallestKey() const { + return penultimate_level_smallest_; + } + + InternalKey GetPenultimateLevelLargestKey() const { + return penultimate_level_largest_; + } + + bool IsResumableCompaction() const { + return !std::get<0>(resumable_compaction_info_).empty(); + } + + const std::string& GetResumableCompactionDBSessionID() const { + return std::get<0>(resumable_compaction_info_); + } + + uint64_t GetResumableCompactionJobId() const { + return std::get<1>(resumable_compaction_info_); + } + + const ResumableCompaction& GetResumableCompaction() const { + return std::get<2>(resumable_compaction_info_); + } + private: void SetInputVersion(Version* input_version); @@ -558,6 +601,9 @@ class Compaction { // Blob garbage collection age cutoff. double blob_garbage_collection_age_cutoff_; + const std::tuple + resumable_compaction_info_; + // only set when per_key_placement feature is enabled, -1 (kInvalidLevel) // means not supported. 
const int penultimate_level_; @@ -596,5 +642,4 @@ struct PerKeyPlacementContext { // Return sum of sizes of all files in `files`. extern uint64_t TotalFileSize(const std::vector& files); - } // namespace ROCKSDB_NAMESPACE diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 99b099759db9..56516e0c2ad7 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -256,28 +256,33 @@ void CompactionJob::Prepare() { write_hint_ = cfd->CalculateSSTWriteHint(c->output_level()); bottommost_level_ = c->bottommost_level(); - if (c->ShouldFormSubcompactions()) { - StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME); - GenSubcompactionBoundaries(); - } - if (boundaries_.size() >= 1) { - for (size_t i = 0; i <= boundaries_.size(); i++) { - compact_->sub_compact_states.emplace_back( - c, (i != 0) ? std::optional(boundaries_[i - 1]) : std::nullopt, - (i != boundaries_.size()) ? std::optional(boundaries_[i]) - : std::nullopt, - static_cast(i)); - // assert to validate that boundaries don't have same user keys (without - // timestamp part). - assert(i == 0 || i == boundaries_.size() || - cfd->user_comparator()->CompareWithoutTimestamp( - boundaries_[i - 1], boundaries_[i]) < 0); - } - RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED, - compact_->sub_compact_states.size()); + if (c->IsResumableCompaction()) { + FormResumableCompactionSubcompactionStates(); } else { - compact_->sub_compact_states.emplace_back(c, std::nullopt, std::nullopt, - /*sub_job_id*/ 0); + if (c->ShouldFormSubcompactions()) { + StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME); + GenSubcompactionBoundaries(); + } + if (boundaries_.size() >= 1) { + for (size_t i = 0; i <= boundaries_.size(); i++) { + compact_->sub_compact_states.emplace_back( + c, + (i != 0) ? std::optional(boundaries_[i - 1]) : std::nullopt, + (i != boundaries_.size()) ? 
std::optional(boundaries_[i]) + : std::nullopt, + static_cast(i)); + // assert to validate that boundaries don't have same user keys (without + // timestamp part). + assert(i == 0 || i == boundaries_.size() || + cfd->user_comparator()->CompareWithoutTimestamp( + boundaries_[i - 1], boundaries_[i]) < 0); + } + RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED, + compact_->sub_compact_states.size()); + } else { + compact_->sub_compact_states.emplace_back(c, std::nullopt, std::nullopt, + /*sub_job_id*/ 0); + } } // collect all seqno->time information from the input files which will be used @@ -450,6 +455,34 @@ struct RangeWithSize { : range(a, b), size(s) {} }; +void CompactionJob::FormResumableCompactionSubcompactionStates() { + auto* c = compact_->compaction; + assert(c->IsResumableCompaction()); + + auto& resumable_compaction_subcompactions = + c->GetResumableCompaction().subcompaction_job_id_to_range; + + if (resumable_compaction_subcompactions.empty()) { + compact_->sub_compact_states.emplace_back(c, std::nullopt, std::nullopt, + /*sub_job_id*/ 0); + } else { + uint64_t num_actual_subcompactions = + resumable_compaction_subcompactions.size(); + uint64_t max_subcompactions_limit = GetSubcompactionsLimit(); + AcquireSubcompactionResources( + (int)(num_actual_subcompactions - max_subcompactions_limit)); + for (const auto& subcompaction_job_id_and_range : + resumable_compaction_subcompactions) { + compact_->sub_compact_states.emplace_back( + c, std::optional(subcompaction_job_id_and_range.second.first), + std::optional(subcompaction_job_id_and_range.second.second), + subcompaction_job_id_and_range.first); + } + RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED, + compact_->sub_compact_states.size()); + } +} + void CompactionJob::GenSubcompactionBoundaries() { // The goal is to find some boundary keys so that we can evenly partition // the compaction input data into max_subcompactions ranges. 
@@ -1492,7 +1525,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { } uint64_t CompactionJob::GetCompactionId(SubcompactionState* sub_compact) const { - return (uint64_t)job_id_ << 32 | sub_compact->sub_job_id; + return CalculateCompactionId((uint64_t)job_id_, + (uint64_t)sub_compact->sub_job_id); } void CompactionJob::RecordDroppedKeys( diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index e812cfc72a30..bd4e3b1908ba 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -244,6 +244,8 @@ class CompactionJob { // consecutive groups such that each group has a similar size. void GenSubcompactionBoundaries(); + void FormResumableCompactionSubcompactionStates(); + // Get the number of planned subcompactions based on max_subcompactions and // extra reserved resources uint64_t GetSubcompactionsLimit(); @@ -267,6 +269,8 @@ class CompactionJob { CompactionServiceJobStatus ProcessKeyValueCompactionWithCompactionService( SubcompactionState* sub_compact); + Status PersistResumableCompactionToManifest(SubcompactionState* sub_compact); + // update the thread status for starting a compaction. 
void ReportStartedCompaction(Compaction* compaction); @@ -506,6 +510,7 @@ class CompactionServiceCompactionJob : private CompactionJob { private: // Get table file name in output_path std::string GetTableFileName(uint64_t file_number) override; + // Specific the compaction output path, otherwise it uses default DB path const std::string output_path_; diff --git a/db/compaction/compaction_picker.h b/db/compaction/compaction_picker.h index 0556e992754e..cad45ac6faf7 100644 --- a/db/compaction/compaction_picker.h +++ b/db/compaction/compaction_picker.h @@ -15,6 +15,7 @@ #include #include +#include "db/column_family.h" #include "db/compaction/compaction.h" #include "db/version_set.h" #include "options/cf_options.h" @@ -86,13 +87,13 @@ class CompactionPicker { virtual bool NeedsCompaction(const VersionStorageInfo* vstorage) const = 0; -// Sanitize the input set of compaction input files. -// When the input parameters do not describe a valid compaction, the -// function will try to fix the input_files by adding necessary -// files. If it's not possible to conver an invalid input_files -// into a valid one by adding more files, the function will return a -// non-ok status with specific reason. -// + // Sanitize the input set of compaction input files. + // When the input parameters do not describe a valid compaction, the + // function will try to fix the input_files by adding necessary + // files. If it's not possible to conver an invalid input_files + // into a valid one by adding more files, the function will return a + // non-ok status with specific reason. 
+ // Status SanitizeCompactionInputFiles(std::unordered_set* input_files, const ColumnFamilyMetaData& cf_meta, const int output_level) const; diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc index 3149bb500258..8470d698c521 100644 --- a/db/compaction/compaction_service_job.cc +++ b/db/compaction/compaction_service_job.cc @@ -11,6 +11,7 @@ #include "db/compaction/compaction_job.h" #include "db/compaction/compaction_state.h" #include "logging/logging.h" +#include "monitoring/instrumented_mutex.h" #include "monitoring/iostats_context_imp.h" #include "monitoring/thread_status_util.h" #include "options/options_helper.h" @@ -53,50 +54,70 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( compaction_input.end = compaction_input.has_end ? sub_compact->end->ToString() : ""; - std::string compaction_input_binary; - Status s = compaction_input.Write(&compaction_input_binary); - if (!s.ok()) { - sub_compact->status = s; - return CompactionServiceJobStatus::kFailure; - } + CompactionServiceJobInfo info; + CompactionServiceJobStatus compaction_status; + Status s; + + if (sub_compact->compaction->IsResumableCompaction()) { + uint64_t resumable_compaction_id = CalculateCompactionId( + sub_compact->compaction->GetResumableCompactionJobId(), + (uint64_t)(sub_compact->sub_job_id)); + info = CompactionServiceJobInfo( + dbname_, db_id_, + sub_compact->compaction->GetResumableCompactionDBSessionID(), + resumable_compaction_id, thread_pri_); + } else { + std::string compaction_input_binary; + s = compaction_input.Write(&compaction_input_binary); + if (!s.ok()) { + sub_compact->status = s; + return CompactionServiceJobStatus::kFailure; + } - std::ostringstream input_files_oss; - bool is_first_one = true; - for (const auto& file : compaction_input.input_files) { - input_files_oss << (is_first_one ? 
"" : ", ") << file; - is_first_one = false; - } + std::ostringstream input_files_oss; + bool is_first_one_in_input = true; + for (const auto& file : compaction_input.input_files) { + input_files_oss << (is_first_one_in_input ? "" : ", ") << file; + is_first_one_in_input = false; + } - ROCKS_LOG_INFO( - db_options_.info_log, - "[%s] [JOB %d] Starting remote compaction (output level: %d): %s", - compaction_input.column_family.name.c_str(), job_id_, - compaction_input.output_level, input_files_oss.str().c_str()); - CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_, - GetCompactionId(sub_compact), thread_pri_); - CompactionServiceJobStatus compaction_status = - db_options_.compaction_service->StartV2(info, compaction_input_binary); - switch (compaction_status) { - case CompactionServiceJobStatus::kSuccess: - break; - case CompactionServiceJobStatus::kFailure: - sub_compact->status = Status::Incomplete( - "CompactionService failed to start compaction job."); - ROCKS_LOG_WARN(db_options_.info_log, - "[%s] [JOB %d] Remote compaction failed to start.", - compaction_input.column_family.name.c_str(), job_id_); - return compaction_status; - case CompactionServiceJobStatus::kUseLocal: - ROCKS_LOG_INFO( - db_options_.info_log, - "[%s] [JOB %d] Remote compaction fallback to local by API Start.", - compaction_input.column_family.name.c_str(), job_id_); - return compaction_status; - default: - assert(false); // unknown status - break; - } + ROCKS_LOG_INFO( + db_options_.info_log, + "[%s] [JOB %d] Starting remote compaction (output level: %d): %s", + compaction_input.column_family.name.c_str(), job_id_, + compaction_input.output_level, input_files_oss.str().c_str()); + uint64_t compaction_id = GetCompactionId(sub_compact); + info = CompactionServiceJobInfo(dbname_, db_id_, db_session_id_, + compaction_id, thread_pri_); + compaction_status = + db_options_.compaction_service->StartV2(info, compaction_input_binary); + switch (compaction_status) { + case 
CompactionServiceJobStatus::kSuccess: + break; + case CompactionServiceJobStatus::kFailure: + sub_compact->status = Status::Incomplete( + "CompactionService failed to start compaction job."); + ROCKS_LOG_WARN(db_options_.info_log, + "[%s] [JOB %d] Remote compaction failed to start.", + compaction_input.column_family.name.c_str(), job_id_); + return compaction_status; + case CompactionServiceJobStatus::kUseLocal: + ROCKS_LOG_INFO( + db_options_.info_log, + "[%s] [JOB %d] Remote compaction fallback to local by API Start.", + compaction_input.column_family.name.c_str(), job_id_); + return compaction_status; + default: + assert(false); // unknown status + break; + } + s = PersistResumableCompactionToManifest(sub_compact); + if (!s.ok()) { + sub_compact->status = s; + return CompactionServiceJobStatus::kFailure; + } + } ROCKS_LOG_INFO(db_options_.info_log, "[%s] [JOB %d] Waiting for remote compaction...", compaction_input.column_family.name.c_str(), job_id_); @@ -147,10 +168,10 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( sub_compact->status = compaction_result.status; std::ostringstream output_files_oss; - is_first_one = true; + bool is_first_one_in_ouput = true; for (const auto& file : compaction_result.output_files) { - output_files_oss << (is_first_one ? "" : ", ") << file.file_name; - is_first_one = false; + output_files_oss << (is_first_one_in_ouput ? 
"" : ", ") << file.file_name; + is_first_one_in_ouput = false; } ROCKS_LOG_INFO(db_options_.info_log, @@ -208,6 +229,46 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( return CompactionServiceJobStatus::kSuccess; } +Status CompactionJob::PersistResumableCompactionToManifest( + SubcompactionState* sub_compact) { + const Compaction* compaction = sub_compact->compaction; + const std::vector& inputs = + *(compact_->compaction->inputs()); + assert(!inputs.empty()); + + Status s; + VersionEdit edit; + std::vector>> + resumable_compaction_inputs; + + for (const auto& files_per_level : inputs) { + assert(!files_per_level.empty()); + std::vector files; + for (const auto& file : files_per_level.files) { + files.push_back(file->fd.GetNumber()); + } + resumable_compaction_inputs.emplace_back(files_per_level.level, files); + } + + edit.AddResumableCompactionInfo( + db_session_id_, GetCompactionId(sub_compact), resumable_compaction_inputs, + compaction->output_level(), compaction->GetPenultimateLevel(), + compaction->GetPenultimateLevelSmallestKey(), + compaction->GetPenultimateLevelLargestKey(), + sub_compact->start.has_value() ? sub_compact->start->ToString() : "", + sub_compact->end.has_value() ? 
sub_compact->end->ToString() : ""); + + { + InstrumentedMutexLock l(db_mutex_); + s = versions_->LogAndApply( + compaction->column_family_data(), + *(compaction->column_family_data()->GetLatestMutableCFOptions()), + ReadOptions(Env::IOActivity::kCompaction), &edit, db_mutex_, + db_directory_); + } + return s; +} + std::string CompactionServiceCompactionJob::GetTableFileName( uint64_t file_number) { return MakeTableFileName(output_path_, file_number); @@ -830,4 +891,3 @@ bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other, } #endif // NDEBUG } // namespace ROCKSDB_NAMESPACE - diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index 7c87f88d1be0..83224cf3239c 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -3,6 +3,7 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). +#include #include "db/db_test_util.h" #include "port/stack_trace.h" @@ -31,9 +32,11 @@ class MyTestCompactionService : public CompactionService { const char* Name() const override { return kClassName(); } + int start_count = 0; CompactionServiceJobStatus StartV2( const CompactionServiceJobInfo& info, const std::string& compaction_service_input) override { + start_count++; InstrumentedMutexLock l(&mutex_); start_info_ = info; assert(info.db_name == db_path_); @@ -57,8 +60,10 @@ class MyTestCompactionService : public CompactionService { if (i == jobs_.end()) { return CompactionServiceJobStatus::kFailure; } - compaction_input = std::move(i->second); - jobs_.erase(i); + // Hack for resumable remote host + // compaction_input = std::move(i->second); + // jobs_.erase(i); + compaction_input = i->second; } if (is_override_wait_status_) { @@ -227,8 +232,109 @@ class CompactionServiceTest : public DBTestBase { std::shared_ptr compaction_service_; }; +TEST_F(CompactionServiceTest, ResumeBasic1) { + for (bool resume : 
{true, false}) { + Options options = CurrentOptions(); + options.target_file_size_base = 1 << 5; // 1KB + ReopenWithCompactionService(&options); + + ASSERT_OK(Put("k1", "old")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k1", "new")); + ASSERT_OK(Flush()); + + std::vector metadata; + + SyncPoint::GetInstance()->SetCallBack( + "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) { + // override job status + auto s = static_cast(status); + *s = Status::Aborted("MyTestCompactionService failed to compact!"); + }); + SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_NOK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + Statistics* compactor_statistics = GetCompactorStatistics(); + compactor_statistics->Reset(); + ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 0); + options.resume_compaction = resume; + SyncPoint::GetInstance()->SetCallBack( + "DBImplSecondary::CompactWithoutInstallation::HackTest", [&](void*) { + // Check overlap + ColumnFamilyMetaData meta; + db_->GetColumnFamilyMetaData(&meta); + for (const auto& file : meta.levels[0].files) { + std::string fname = file.db_path + "/" + file.name; + Status s = db_->CompactFiles(CompactionOptions(), {fname}, 1); + ASSERT_NOK(s); + } + }); + auto my_cs = GetCompactionService(); + my_cs->start_count = 0; + Reopen(options); + dbfull()->TEST_WaitForCompact(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + if (resume) { + ASSERT_EQ(my_cs->start_count, 0); + ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 1); + } else { + ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 0); + } + } +} + +TEST_F(CompactionServiceTest, ResumeBasic2) { + Options options = CurrentOptions(); + options.target_file_size_base = 1 << 5; // 1KB + ReopenWithCompactionService(&options); + + ASSERT_OK(Put("k1", "old")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k1", "new")); + ASSERT_OK(Flush()); + + std::vector 
metadata; + + SyncPoint::GetInstance()->SetCallBack( + "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) { + // override job status + auto s = static_cast(status); + *s = Status::Aborted("MyTestCompactionService failed to compact!"); + }); + SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_NOK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + Statistics* compactor_statistics = GetCompactorStatistics(); + compactor_statistics->Reset(); + ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 0); + int count = 0; + port::Thread thread1; + SyncPoint::GetInstance()->SetCallBack( + "BackgroundCallCompaction:0", [&](void*) { + if (count > 0) { + return; + } + thread1 = port::Thread([&] { + Status s = db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + s.PermitUncheckedError(); + }); + count++; + SystemClock::Default()->SleepForMicroseconds(10000000); + }); + auto my_cs = GetCompactionService(); + my_cs->start_count = 0; + Reopen(options); + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(my_cs->start_count, 1); + thread1.join(); +} + TEST_F(CompactionServiceTest, BasicCompactions) { Options options = CurrentOptions(); + options.max_subcompactions = 2; + options.target_file_size_base = 1 << 5; // 1KB ReopenWithCompactionService(&options); Statistics* primary_statistics = GetPrimaryStatistics(); @@ -322,7 +428,10 @@ TEST_F(CompactionServiceTest, BasicCompactions) { assert(*id != kNullUniqueId64x2); verify_passed++; }); + SyncPoint::GetInstance()->ClearCallBack( + "DBImplSecondary::CompactWithoutInstallation::End"); Reopen(options); + dbfull()->TEST_WaitForCompact(); ASSERT_GT(verify_passed, 0); Close(); } diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 08812a35bd48..88cd955dcf0f 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -3364,6 +3364,7 @@ void 
DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_)); Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer, prepicked_compaction, bg_thread_pri); + TEST_SYNC_POINT("BackgroundCallCompaction:1"); if (s.IsBusy()) { bg_cv_.SignalAll(); // In case a waiter can proceed despite the error diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 074fa86214ab..f877f76a3287 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -206,6 +206,13 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src, "file size check will be skipped during open."); } + if (result.compaction_service == nullptr && !result.resume_compaction) { + result.resume_compaction = false; + ROCKS_LOG_WARN(result.info_log, + "resume_compaction is disabled since it's only supported " + "when remote compaction is used"); + } + return result; } diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 235a528ba08a..a35964d747bc 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -914,6 +914,8 @@ Status DBImplSecondary::CompactWithoutInstallation( input.db_id, db_session_id_, secondary_path_, input, result); mutex_.Unlock(); + TEST_SYNC_POINT_CALLBACK( + "DBImplSecondary::CompactWithoutInstallation::HackTest", nullptr); s = compaction_job.Run(); mutex_.Lock(); diff --git a/db/version_edit.cc b/db/version_edit.cc index 6459c2ff863f..c55d9ff0e130 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -318,6 +318,15 @@ bool VersionEdit::EncodeTo(std::string* dst, char p = static_cast(persist_user_defined_timestamps_); PutLengthPrefixedSlice(dst, Slice(&p, 1)); } + + if (has_resumable_compaction_info_) { + PutVarint32(dst, kResumableCompactionInfo); + Status s = resumable_compaction_info_.EncodeTo(dst); + if (!s.ok()) { + return false; + } + } + return true; } @@ -798,6 +807,17 @@ 
Status VersionEdit::DecodeFrom(const Slice& src) { } break; + case kResumableCompactionInfo: { + ResumableCompactionInfo resumable_compaction_info; + const Status s = resumable_compaction_info.DecodeFrom(&input); + if (!s.ok()) { + return s; + } + resumable_compaction_info_ = std::move(resumable_compaction_info); + has_resumable_compaction_info_ = true; + break; + } + default: if (tag & kTagSafeIgnoreMask) { // Tag from future which can be safely ignored. @@ -1118,4 +1138,122 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const { return jw.Get(); } +Status ResumableCompactionInfo::DecodeFrom(Slice* input) { + while (true) { + uint32_t custom_tag = 0; + Slice slice_field; + if (!GetVarint32(input, &custom_tag)) { + return Status::Corruption( + "Error decoding custom tag for resumable compaction info"); + } + if (custom_tag == ResumableCompactionCustomTag::kRCTerminate) { + break; + } + switch (custom_tag) { + case ResumableCompactionCustomTag::kDBSessionId: + if (!GetLengthPrefixedSlice(input, &slice_field)) { + return Status::Corruption( + "Error decoding db session id for resumable compaction info"); + + } else { + db_session_id.assign(slice_field.data(), slice_field.size()); + } + break; + case ResumableCompactionCustomTag::kCompactionId: + if (!GetVarint64(input, &compaction_id)) { + return Status::Corruption( + "Error decoding compaction id for resumable compaction info"); + } + break; + case ResumableCompactionCustomTag::kInput: { + uint32_t input_level; + std::vector<uint64_t> input_file_numbers; + if (!GetVarint32(input, &input_level)) { + return Status::Corruption( + "Error decoding input level for resumable compaction info"); + } + uint64_t input_length; + if (!GetVarint64(input, &input_length)) { + return Status::Corruption( + "Error decoding input length for resumable compaction info"); + } + if (input_length == 0) { + return Status::Corruption("There is no input file for input level " + + std::to_string(input_level) + + " for resumable 
compaction info"); + } + for (uint64_t i = 0; i < input_length; ++i) { + uint64_t input_file_number; + if (!GetVarint64(input, &input_file_number)) { + return Status::Corruption( + "Error decoding input file number for resumable compaction " + "info"); + } + input_file_numbers.push_back(input_file_number); + } + inputs.push_back({input_level, input_file_numbers}); + break; + } + case ResumableCompactionCustomTag::kOutputLevel: + uint32_t output_level_decoded; + if (!GetVarint32(input, &output_level_decoded)) { + return Status::Corruption( + "Error decoding output level for resumable compaction info"); + } else { + output_level = output_level_decoded; + } + break; + case ResumableCompactionCustomTag::kPenultimateOutputLevel: + uint32_t penultimate_output_level_decoded; + if (!GetVarint32(input, &penultimate_output_level_decoded)) { + return Status::Corruption( + "Error decoding penultimate output level for resumable " + "compaction info"); + } else { + penultimate_output_level = penultimate_output_level_decoded; + } + break; + case ResumableCompactionCustomTag::kPenultimateOutputLevelSmallest: + if (penultimate_output_level != -1 && + !GetInternalKey(input, &penultimate_output_level_smallest)) { + return Status::Corruption( + "Error decoding penultimate output level smallest for " + "resumable compaction info"); + } + break; + case ResumableCompactionCustomTag::kPenultimateOutputLevelLargest: + if (penultimate_output_level != -1 && + !GetInternalKey(input, &penultimate_output_level_largest)) { + return Status::Corruption( + "Error decoding penultimate output level largest for resumable " + "compaction info"); + } + break; + case ResumableCompactionCustomTag::kSubcompactionStart: + if (!GetLengthPrefixedSlice(input, &slice_field)) { + return Status::Corruption( + "Error decoding subcompaction start for resumable compaction " + "info"); + } else { + subcompaction_start.assign(slice_field.data(), slice_field.size()); + } + break; + case 
ResumableCompactionCustomTag::kSubcompactionEnd: + if (!GetLengthPrefixedSlice(input, &slice_field)) { + return Status::Corruption( + "Error decoding subcompaction end for resumable compaction " + "info"); + + } else { + subcompaction_end.assign(slice_field.data(), slice_field.size()); + } + break; + default: + // Ignore unknown tags + break; + } + } + return Status::OK(); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_edit.h b/db/version_edit.h index 8e14e76da932..a0d7ff945500 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -9,6 +9,7 @@ #pragma once #include +#include #include #include #include @@ -23,12 +24,12 @@ #include "port/malloc.h" #include "rocksdb/advanced_cache.h" #include "rocksdb/advanced_options.h" +#include "rocksdb/status.h" #include "table/table_reader.h" #include "table/unique_id_impl.h" #include "util/autovector.h" namespace ROCKSDB_NAMESPACE { - // Tag numbers for serialized VersionEdit. These numbers are written to // disk and should not be changed. The number should be forward compatible so // users can down-grade RocksDB safely. 
A future Tag is ignored by doing '&' @@ -72,6 +73,7 @@ enum Tag : uint32_t { kWalAddition2, kWalDeletion2, kPersistUserDefinedTimestamps, + kResumableCompactionInfo, }; enum NewFileCustomTag : uint32_t { @@ -103,6 +105,19 @@ enum NewFileCustomTag : uint32_t { kPathId, }; +enum ResumableCompactionCustomTag : uint32_t { + kRCTerminate = 1, // The end of customized fields + kDBSessionId, + kCompactionId, + kInput, + kOutputLevel, + kPenultimateOutputLevel, + kPenultimateOutputLevelSmallest, + kPenultimateOutputLevelLargest, + kSubcompactionStart, + kSubcompactionEnd, +}; + class VersionSet; constexpr uint64_t kFileNumberMask = 0x3FFFFFFFFFFFFFFF; @@ -383,6 +398,88 @@ struct LevelFilesBrief { } }; +class ResumableCompactionInfo { + public: + Status EncodeTo(std::string* output) const { + if (db_session_id.empty()) { + return Status::Incomplete("db session id should not be empty"); + } + if (output_level == -1) { + return Status::Incomplete( + "Output level should be greater than or equal to 0"); + } + if (penultimate_output_level != -1 && + (!penultimate_output_level_smallest.Valid() || + !penultimate_output_level_largest.Valid())) { + return Status::Incomplete( + "Penultimate output level range should be valid when penultimate " + "output level is valid"); + } + + PutVarint32(output, ResumableCompactionCustomTag::kDBSessionId); + PutLengthPrefixedSlice(output, db_session_id); + + PutVarint32Varint64(output, ResumableCompactionCustomTag::kCompactionId, + compaction_id); + + for (const std::pair>& level_and_input_files : + inputs) { + if (level_and_input_files.second.empty()) { + continue; + } + PutVarint32(output, ResumableCompactionCustomTag::kInput); + PutVarint32(output, level_and_input_files.first /* level */); + PutVarint64(output, level_and_input_files.second.size() /* length */); + for (uint64_t file_number : level_and_input_files.second) { + PutVarint64(output, file_number); + } + } + + PutVarint32Varint32(output, ResumableCompactionCustomTag::kOutputLevel, + 
output_level); + + PutVarint32Varint32(output, + ResumableCompactionCustomTag::kPenultimateOutputLevel, + penultimate_output_level); + + PutVarint32(output, + ResumableCompactionCustomTag::kPenultimateOutputLevelSmallest); + PutLengthPrefixedSlice(output, + penultimate_output_level_smallest.Valid() + ? penultimate_output_level_smallest.Encode() + : ""); + + PutVarint32(output, + ResumableCompactionCustomTag::kPenultimateOutputLevelLargest); + PutLengthPrefixedSlice(output, + penultimate_output_level_largest.Valid() + ? penultimate_output_level_largest.Encode() + : ""); + + PutVarint32(output, ResumableCompactionCustomTag::kSubcompactionStart); + PutLengthPrefixedSlice(output, subcompaction_start); + + PutVarint32(output, ResumableCompactionCustomTag::kSubcompactionEnd); + PutLengthPrefixedSlice(output, subcompaction_end); + + PutVarint32(output, ResumableCompactionCustomTag::kRCTerminate); + + return Status::OK(); + } + + Status DecodeFrom(Slice* input); + + std::string db_session_id = ""; + uint64_t compaction_id = std::numeric_limits::max(); + std::vector>> inputs = {}; + int output_level = -1; + int penultimate_output_level = -1; + InternalKey penultimate_output_level_smallest = {}; + InternalKey penultimate_output_level_largest = {}; + std::string subcompaction_start = ""; + std::string subcompaction_end = ""; +}; + // The state of a DB at any given time is referred to as a Version. // Any modification to the Version is considered a Version Edit. A Version is // constructed by joining a sequence of Version Edits. 
Version Edits are written @@ -531,6 +628,40 @@ class VersionEdit { } } + bool HasResumableCompactionInfo() const { + return has_resumable_compaction_info_; + } + + void AddResumableCompactionInfo( + const std::string& db_session_id, uint64_t compaction_id, + const std::vector>>& inputs, + int output_level, int penultimate_output_level, + const InternalKey& penultimate_output_level_smallest, + const InternalKey& penultimate_output_level_largest, + const std::string& subcompaction_start, + const std::string& subcompaction_end) { + if (has_resumable_compaction_info_) { + return; + } + resumable_compaction_info_.db_session_id = db_session_id; + resumable_compaction_info_.compaction_id = compaction_id; + resumable_compaction_info_.inputs = inputs; + resumable_compaction_info_.output_level = output_level; + resumable_compaction_info_.penultimate_output_level = + penultimate_output_level; + resumable_compaction_info_.penultimate_output_level_smallest = + penultimate_output_level_smallest; + resumable_compaction_info_.penultimate_output_level_largest = + penultimate_output_level_largest; + resumable_compaction_info_.subcompaction_start = subcompaction_start; + resumable_compaction_info_.subcompaction_end = subcompaction_end; + has_resumable_compaction_info_ = true; + } + + const ResumableCompactionInfo& GetResumableCompactionInfo() const { + return resumable_compaction_info_; + } + // Add a new blob file. void AddBlobFile(uint64_t blob_file_number, uint64_t total_blob_count, uint64_t total_blob_bytes, std::string checksum_method, @@ -764,6 +895,9 @@ class VersionEdit { // Since table files and blob files share the same file number space, we just // record the file number here. 
autovector files_to_quarantine_; + + bool has_resumable_compaction_info_ = false; + ResumableCompactionInfo resumable_compaction_info_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 90afc0938c9e..daffb75e63be 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -633,6 +633,10 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, const std::string& new_ts = edit.GetFullHistoryTsLow(); cfd->SetFullHistoryTsLow(new_ts); } + if (cfd->ioptions()->resume_compaction && + edit.HasResumableCompactionInfo()) { + cfd->AddResumableCompaction(edit.GetResumableCompactionInfo()); + } } if (s.ok()) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index be4eb8fba27f..3ad9bcad056e 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -417,7 +417,7 @@ struct CompactionServiceJobInfo { // different DBs and sessions. Env::Priority priority; - + CompactionServiceJobInfo() {} CompactionServiceJobInfo(std::string db_name_, std::string db_id_, std::string db_session_id_, uint64_t job_id_, Env::Priority priority_) @@ -563,6 +563,8 @@ struct DBOptions { // Default: true bool verify_sst_unique_id_in_manifest = true; + bool resume_compaction = true; + // Use the specified object to interact with the environment, // e.g. to read/write files, schedule background work, etc. 
In the near // future, support for doing storage operations such as read/write files diff --git a/options/db_options.cc b/options/db_options.cc index ca72404dd21a..c67f09d39c3d 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -692,6 +692,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) options.track_and_verify_wals_in_manifest), verify_sst_unique_id_in_manifest( options.verify_sst_unique_id_in_manifest), + resume_compaction(options.resume_compaction), env(options.env), rate_limiter(options.rate_limiter), sst_file_manager(options.sst_file_manager), diff --git a/options/db_options.h b/options/db_options.h index 701a83febb0c..17a3377ad920 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -28,6 +28,7 @@ struct ImmutableDBOptions { bool compaction_verify_record_count; bool track_and_verify_wals_in_manifest; bool verify_sst_unique_id_in_manifest; + bool resume_compaction; Env* env; std::shared_ptr rate_limiter; std::shared_ptr sst_file_manager;