diff --git a/db/column_family.cc b/db/column_family.cc index 280533993196..409cada27264 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -11,6 +11,8 @@ #include #include +#include +#include #include #include #include @@ -40,7 +42,6 @@ #include "util/autovector.h" #include "util/cast_util.h" #include "util/compression.h" - namespace ROCKSDB_NAMESPACE { ColumnFamilyHandleImpl::ColumnFamilyHandleImpl( @@ -1109,12 +1110,66 @@ void ColumnFamilyData::CreateNewMemtable( bool ColumnFamilyData::NeedsCompaction() const { return !mutable_cf_options_.disable_auto_compactions && - compaction_picker_->NeedsCompaction(current_->storage_info()); + (!db_session_id_to_resumable_compactions.empty() || + compaction_picker_->NeedsCompaction(current_->storage_info())); } Compaction* ColumnFamilyData::PickCompaction( const MutableCFOptions& mutable_options, const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) { + while (!db_session_id_to_resumable_compactions.empty()) { + auto iter_1 = db_session_id_to_resumable_compactions.begin(); + auto db_session_id = iter_1->first; + ResumableCompactions resumable_compactions = iter_1->second; + db_session_id_to_resumable_compactions.erase(iter_1); + while (!resumable_compactions.empty()) { + auto iter_2 = resumable_compactions.begin(); + auto compaction_id = iter_2->first; + ResumableCompaction resumable_compaction = iter_2->second; + resumable_compactions.erase(iter_2); + + // Verify input files stil exist thus the compaction needs resumed + std::vector inputs; + bool has_error = false; + for (auto& input : resumable_compaction.inputs) { + CompactionInputFiles input_files; + input_files.level = input.first; + for (uint64_t input_file_number : input.second) { + FileMetaData* file = + current_->storage_info()->GetFileMetaDataByNumber( + input_file_number); + if (file == nullptr) { + has_error = true; + break; + } + input_files.files.push_back(file); + } + if (has_error) { + break; + } + inputs.push_back(input_files); + } + + if (!has_error && !inputs.empty()) { + Compaction* result = new Compaction( + current_->storage_info(), ioptions_, mutable_options, + mutable_db_options, inputs, resumable_compaction.output_level, + std::numeric_limits::max(), + std::numeric_limits::max(), 0, + CompressionType::kNoCompression, CompressionOptions(), + Temperature::kUnknown, std::numeric_limits::max(), {}, + false, "", -1, false, true, CompactionReason::kUnknown, + BlobGarbageCollectionPolicy::kUseDefault, -1, + resumable_compaction.penultimate_output_level, + resumable_compaction.penultimate_output_level_smallest, + resumable_compaction.penultimate_output_level_largest, + db_session_id, compaction_id, + resumable_compaction.subcompaction_id_to_range); + result->FinalizeInputInfo(current_); + return result; + } + } + } auto* result = compaction_picker_->PickCompaction( GetName(), mutable_options, mutable_db_options, current_->storage_info(), log_buffer); diff --git a/db/column_family.h b/db/column_family.h index 2a38feb73107..2f9beed04bf9 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -18,6 +18,7 @@ #include "db/memtable_list.h" #include "db/table_cache.h" #include "db/table_properties_collector.h" +#include "db/version_edit.h" #include "db/write_batch_internal.h" #include "db/write_controller.h" #include "options/cf_options.h" @@ -549,6 +550,49 @@ class ColumnFamilyData { // of its files (if missing) void RecoverEpochNumbers(); + void AddResumableCompaction(const ResumableCompactionInfo& info) { + auto iter_1 = + db_session_id_to_resumable_compactions.find(info.db_session_id); + if (iter_1 == db_session_id_to_resumable_compactions.end()) { + db_session_id_to_resumable_compactions.insert({info.db_session_id, {}}); + } + ResumableCompactions& resumable_compactions = + db_session_id_to_resumable_compactions[info.db_session_id]; + + uint64_t compaction_id = info.job_id >> 32; + auto iter_2 = resumable_compactions.find(compaction_id); + + bool has_subcompaction = info.subcompaction_start.size() != 0 || + info.subcompaction_end.size() != 0; + constexpr uint32_t LAST_32_BITS_MASK = 0xffffffffU; + uint64_t subcompaction_id = info.job_id & LAST_32_BITS_MASK; + + if (iter_2 == resumable_compactions.end()) { + ResumableCompaction resumable_compaction = { + info.inputs, + info.output_level, + info.penultimate_output_level, + info.penultimate_output_level_smallest, + info.penultimate_output_level_largest, + {}}; + if (has_subcompaction) { + resumable_compaction.subcompaction_id_to_range.insert( + {subcompaction_id, + {info.subcompaction_start, info.subcompaction_end}}); + } + resumable_compactions.insert({compaction_id, resumable_compaction}); + } else { + assert(has_subcompaction); + ResumableCompaction& resumable_compaction = iter_2->second; + assert(resumable_compaction.subcompaction_id_to_range.find( + subcompaction_id) == + resumable_compaction.subcompaction_id_to_range.end()); + resumable_compaction.subcompaction_id_to_range.insert( + {subcompaction_id, + {info.subcompaction_start, info.subcompaction_end}}); + } + } + private: friend class ColumnFamilySet; ColumnFamilyData(uint32_t id, const std::string& name, @@ -604,8 +648,8 @@ class ColumnFamilyData { std::unique_ptr local_sv_; // pointers for a circular linked list. we use it to support iterations over - // all column families that are alive (note: dropped column families can also - // be alive as long as client holds a reference) + // all column families that are alive (note: dropped column families can + // also be alive as long as client holds a reference) ColumnFamilyData* next_; ColumnFamilyData* prev_; @@ -622,7 +666,8 @@ class ColumnFamilyData { std::unique_ptr write_controller_token_; - // If true --> this ColumnFamily is currently present in DBImpl::flush_queue_ + // If true --> this ColumnFamily is currently present in + // DBImpl::flush_queue_ bool queued_for_flush_; // If true --> this ColumnFamily is currently present in @@ -644,12 +689,28 @@ class ColumnFamilyData { std::string full_history_ts_low_; - // For charging memory usage of file metadata created for newly added files to - // a Version associated with this CFD + // For charging memory usage of file metadata created for newly added files + // to a Version associated with this CFD std::shared_ptr file_metadata_cache_res_mgr_; bool mempurge_used_; std::atomic next_epoch_number_; + + struct ResumableCompaction { + std::vector>> inputs; + int output_level; + int penultimate_output_level; + InternalKey penultimate_output_level_smallest; + InternalKey penultimate_output_level_largest; + std::map> + subcompaction_id_to_range; + }; + + using ResumableCompactions = + std::map; + + std::map + db_session_id_to_resumable_compactions; }; // ColumnFamilySet has interesting thread-safety requirements diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index bbab8f79fb56..10367fbb3bb3 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -285,7 +285,13 @@ Compaction::Compaction( bool _deletion_compaction, bool l0_files_might_overlap, CompactionReason _compaction_reason, BlobGarbageCollectionPolicy _blob_garbage_collection_policy, - double _blob_garbage_collection_age_cutoff) + double _blob_garbage_collection_age_cutoff, int penultimate_level, + InternalKey penultimate_level_smallest, + InternalKey penultimate_level_largest, + std::string resumable_compaction_db_session_id, + uint64_t resumable_compaction_compaction_id, + std::map> + resumable_compaction_subcompactions) : input_vstorage_(vstorage), start_level_(_inputs[0].level), output_level_(_output_level), @@ -334,14 +340,21 @@ Compaction::Compaction( ? mutable_cf_options()->blob_garbage_collection_age_cutoff : _blob_garbage_collection_age_cutoff), penultimate_level_( - // For simplicity, we don't support the concept of "penultimate level" - // with `CompactionReason::kExternalSstIngestion` and - // `CompactionReason::kRefitLevel` - _compaction_reason == CompactionReason::kExternalSstIngestion || + penultimate_level != -1 + ? penultimate_level + : + // For simplicity, we don't support the concept of "penultimate + // level" with `CompactionReason::kExternalSstIngestion` and + // `CompactionReason::kRefitLevel` + _compaction_reason == CompactionReason::kExternalSstIngestion || _compaction_reason == CompactionReason::kRefitLevel ? Compaction::kInvalidLevel : EvaluatePenultimateLevel(vstorage, immutable_options_, - start_level_, output_level_)) { + start_level_, output_level_)), + resumable_compaction_db_session_id_(resumable_compaction_db_session_id), + resumable_compaction_compaction_id_(resumable_compaction_compaction_id), + resumable_compaction_subcompactions_( + resumable_compaction_subcompactions) { MarkFilesBeingCompacted(true); if (is_manual_compaction_) { compaction_reason_ = CompactionReason::kManualCompaction; @@ -395,8 +408,13 @@ Compaction::Compaction( } } } - - PopulatePenultimateLevelOutputRange(); + if (penultimate_level_smallest.size() == 0 || + penultimate_level_largest.size() == 0) { + penultimate_level_smallest_ = penultimate_level_smallest; + penultimate_level_largest_ = penultimate_level_largest; + } else { + PopulatePenultimateLevelOutputRange(); + } } void Compaction::PopulatePenultimateLevelOutputRange() { diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h index 50c75f70b22c..de0b16bfe495 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -96,7 +96,15 @@ class Compaction { CompactionReason compaction_reason = CompactionReason::kUnknown, BlobGarbageCollectionPolicy blob_garbage_collection_policy = BlobGarbageCollectionPolicy::kUseDefault, - double blob_garbage_collection_age_cutoff = -1); + double blob_garbage_collection_age_cutoff = -1, + int penultimate_level = -1, + InternalKey penultimate_level_smallest = {}, + InternalKey penultimate_level_largest = {}, + std::string resumable_compaction_db_session_id = "", + uint64_t resumable_compaction_compaction_id = + std::numeric_limits::max(), + std::map> + resumable_compaction_subcmpactions = {}); // The type of the penultimate level output range enum class PenultimateOutputRangeType : int { @@ -432,6 +440,27 @@ class Compaction { const int start_level, const int output_level); + InternalKey GetPenultimateLevelSmallestKey() const { + return penultimate_level_smallest_; + } + + InternalKey GetPenultimateLevelLargestKey() const { + return penultimate_level_largest_; + } + + std::string GetResumableCompactionDbSessionID() const { + return resumable_compaction_db_session_id_; + } + + uint64_t GetResumableCompactionCompactionId() const { + return resumable_compaction_compaction_id_; + } + + std::map> + GetResumableCompactionSubcompactions() const { + return resumable_compaction_subcompactions_; + } + private: void SetInputVersion(Version* input_version); @@ -569,6 +598,10 @@ class Compaction { InternalKey penultimate_level_largest_; PenultimateOutputRangeType penultimate_output_range_type_ = PenultimateOutputRangeType::kNotSupported; + std::string resumable_compaction_db_session_id_; + uint64_t resumable_compaction_compaction_id_; + std::map> + resumable_compaction_subcompactions_; }; #ifndef NDEBUG diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 99b099759db9..0ddd1301270c 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -145,7 +146,11 @@ CompactionJob::CompactionJob( const std::string& db_id, const std::string& db_session_id, std::string full_history_ts_low, std::string trim_ts, BlobFileCompletionCallback* blob_callback, int* bg_compaction_scheduled, - int* bg_bottom_compaction_scheduled) + int* bg_bottom_compaction_scheduled, + std::string resumable_compaction_db_session_id, + uint64_t resumable_compaction_compaction_id, + std::map> + resumable_compaction_subcompactions) : compact_(new CompactionState(compaction)), compaction_stats_(compaction->compaction_reason(), 1), db_options_(db_options), @@ -156,6 +161,9 @@ CompactionJob::CompactionJob( bottommost_level_(false), write_hint_(Env::WLTH_NOT_SET), compaction_job_stats_(compaction_job_stats), + resumable_compaction_db_session_id_(resumable_compaction_db_session_id), + resumable_compaction_compaction_id_(resumable_compaction_compaction_id), + resumable_compaction_subcompactions_(resumable_compaction_subcompactions), job_id_(job_id), dbname_(dbname), db_id_(db_id), @@ -256,7 +264,64 @@ void CompactionJob::Prepare() { write_hint_ = cfd->CalculateSSTWriteHint(c->output_level()); bottommost_level_ = c->bottommost_level(); - if (c->ShouldFormSubcompactions()) { + if (!resumable_compaction_db_session_id_.empty()) { + if (resumable_compaction_subcompactions_.size() > 0) { + // resource + uint64_t max_subcompactions_limit = GetSubcompactionsLimit(); + uint64_t num_actual_subcompactions = + resumable_compaction_subcompactions_.size(); + AcquireSubcompactionResources( + (int)(num_actual_subcompactions - max_subcompactions_limit)); + for (auto const& sc : resumable_compaction_subcompactions_) { + compact_->sub_compact_states.emplace_back( + c, std::optional(sc.second.first), + std::optional(sc.second.second), sc.first); + } + + // // states + // // copy + // std::map> + // resumable_compaction_subcompactions_copy = + // resumable_compaction_subcompactions_; + // // sort - TODO + // // std::sort( + // // resumable_compaction_subcompactions_copy.begin(), + // // resumable_compaction_subcompactions_copy.end(), + // // [&c](const std::pair>& + // // sc1, + // // const std::pair>& + // // sc2) { + // // if (sc1.second.first.empty() || sc2.second.second.empty()) { + // // return true; + // // } else if (sc1.second.second.empty() || + // sc2.second.first.empty()) + // // { + // // return false; + // // } + // // return (c->column_family_data() + // // ->user_comparator() + // // ->CompareWithoutTimestamp(sc1.second.second, + // // sc2.second.second) + // // < 0 + // // ? true + // // : false); + // // }); + // // state + // for (auto const& sc : resumable_compaction_subcompactions_copy) { + // compact_->sub_compact_states.emplace_back( + // c, std::optional(sc.second.first), + // std::optional(sc.second.second), sc.first); + // } + // stats + RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED, + compact_->sub_compact_states.size()); + } else { + compact_->sub_compact_states.emplace_back(c, std::nullopt, std::nullopt, + /*sub_job_id*/ 0); + } + } else if (c->ShouldFormSubcompactions()) { StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME); GenSubcompactionBoundaries(); } diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index e812cfc72a30..c3571083c995 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -166,7 +166,12 @@ class CompactionJob { std::string full_history_ts_low = "", std::string trim_ts = "", BlobFileCompletionCallback* blob_callback = nullptr, int* bg_compaction_scheduled = nullptr, - int* bg_bottom_compaction_scheduled = nullptr); + int* bg_bottom_compaction_scheduled = nullptr, + std::string resumable_compaction_db_session_id = "", + uint64_t resumable_compaction_compaction_id = + std::numeric_limits::max(), + std::map> + resumable_compaction_subcompactions = {}); virtual ~CompactionJob(); @@ -234,6 +239,11 @@ class CompactionJob { CompactionJobStats* compaction_job_stats_; + std::string resumable_compaction_db_session_id_; + uint64_t resumable_compaction_compaction_id_; + std::map> + resumable_compaction_subcompactions_; + private: friend class CompactionJobTestBase; diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc index 3149bb500258..6a77b87ee8eb 100644 --- a/db/compaction/compaction_service_job.cc +++ b/db/compaction/compaction_service_job.cc @@ -8,6 +8,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include + #include "db/compaction/compaction_job.h" #include "db/compaction/compaction_state.h" #include "logging/logging.h" @@ -53,48 +55,96 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( compaction_input.end = compaction_input.has_end ? sub_compact->end->ToString() : ""; - std::string compaction_input_binary; - Status s = compaction_input.Write(&compaction_input_binary); - if (!s.ok()) { - sub_compact->status = s; - return CompactionServiceJobStatus::kFailure; - } + CompactionServiceJobInfo info; + CompactionServiceJobStatus compaction_status; + Status s; + bool is_first_one; - std::ostringstream input_files_oss; - bool is_first_one = true; - for (const auto& file : compaction_input.input_files) { - input_files_oss << (is_first_one ? "" : ", ") << file; - is_first_one = false; - } + if (sub_compact->compaction->GetResumableCompactionDbSessionID().size() == + 0) { + std::string compaction_input_binary; + s = compaction_input.Write(&compaction_input_binary); + if (!s.ok()) { + sub_compact->status = s; + return CompactionServiceJobStatus::kFailure; + } - ROCKS_LOG_INFO( - db_options_.info_log, - "[%s] [JOB %d] Starting remote compaction (output level: %d): %s", - compaction_input.column_family.name.c_str(), job_id_, - compaction_input.output_level, input_files_oss.str().c_str()); - CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_, - GetCompactionId(sub_compact), thread_pri_); - CompactionServiceJobStatus compaction_status = - db_options_.compaction_service->StartV2(info, compaction_input_binary); - switch (compaction_status) { - case CompactionServiceJobStatus::kSuccess: - break; - case CompactionServiceJobStatus::kFailure: - sub_compact->status = Status::Incomplete( - "CompactionService failed to start compaction job."); - ROCKS_LOG_WARN(db_options_.info_log, - "[%s] [JOB %d] Remote compaction failed to start.", - compaction_input.column_family.name.c_str(), job_id_); - return compaction_status; - case CompactionServiceJobStatus::kUseLocal: - ROCKS_LOG_INFO( - db_options_.info_log, - "[%s] [JOB %d] Remote compaction fallback to local by API Start.", - compaction_input.column_family.name.c_str(), job_id_); - return compaction_status; - default: - assert(false); // unknown status - break; + std::ostringstream input_files_oss; + is_first_one = true; + for (const auto& file : compaction_input.input_files) { + input_files_oss << (is_first_one ? "" : ", ") << file; + is_first_one = false; + } + + ROCKS_LOG_INFO( + db_options_.info_log, + "[%s] [JOB %d] Starting remote compaction (output level: %d): %s", + compaction_input.column_family.name.c_str(), job_id_, + compaction_input.output_level, input_files_oss.str().c_str()); + uint64_t compaction_id = GetCompactionId(sub_compact); + info = CompactionServiceJobInfo(dbname_, db_id_, db_session_id_, + compaction_id, thread_pri_); + compaction_status = + db_options_.compaction_service->StartV2(info, compaction_input_binary); + switch (compaction_status) { + case CompactionServiceJobStatus::kSuccess: + break; + case CompactionServiceJobStatus::kFailure: + sub_compact->status = Status::Incomplete( + "CompactionService failed to start compaction job."); + ROCKS_LOG_WARN(db_options_.info_log, + "[%s] [JOB %d] Remote compaction failed to start.", + compaction_input.column_family.name.c_str(), job_id_); + return compaction_status; + case CompactionServiceJobStatus::kUseLocal: + ROCKS_LOG_INFO( + db_options_.info_log, + "[%s] [JOB %d] Remote compaction fallback to local by API Start.", + compaction_input.column_family.name.c_str(), job_id_); + return compaction_status; + default: + assert(false); // unknown status + break; + } + + VersionEdit resumable_compaction_info_edit; + + std::vector>> + resumable_compaction_inputs; + for (const auto& level_and_files : inputs) { + std::vector files; + for (const auto& file : level_and_files.files) { + files.push_back(file->fd.GetNumber()); + } + resumable_compaction_inputs.emplace_back(level_and_files.level, files); + } + resumable_compaction_info_edit.AddResumableCompactionInfo( + db_session_id_, compaction_id, resumable_compaction_inputs, + compaction->output_level(), compaction->GetPenultimateLevel(), + compaction->GetPenultimateLevelSmallestKey(), + compaction->GetPenultimateLevelLargestKey(), + sub_compact->start.has_value() ? sub_compact->start->ToString() : "", + sub_compact->end.has_value() ? sub_compact->end->ToString() : ""); + { + db_mutex_->Lock(); + s = versions_->LogAndApply( + compaction->column_family_data(), + *(compaction->column_family_data()->GetLatestMutableCFOptions()), + ReadOptions(Env::IOActivity::kCompaction), + &resumable_compaction_info_edit, db_mutex_, db_directory_); + db_mutex_->Unlock(); + } + + assert(s.ok()); + + } else { + uint64_t resumable_compaction_id = + sub_compact->compaction->GetResumableCompactionCompactionId() << 32 | + sub_compact->sub_job_id; + info = CompactionServiceJobInfo( + dbname_, db_id_, + sub_compact->compaction->GetResumableCompactionDbSessionID(), + resumable_compaction_id, thread_pri_); } ROCKS_LOG_INFO(db_options_.info_log, @@ -830,4 +880,3 @@ bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other, } #endif // NDEBUG } // namespace ROCKSDB_NAMESPACE - diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index 7c87f88d1be0..c9ee6b20ec05 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -57,8 +57,10 @@ class MyTestCompactionService : public CompactionService { if (i == jobs_.end()) { return CompactionServiceJobStatus::kFailure; } - compaction_input = std::move(i->second); - jobs_.erase(i); + // Hack for resumable remote host + // compaction_input = std::move(i->second); + // jobs_.erase(i); + compaction_input = i->second; } if (is_override_wait_status_) { @@ -227,8 +229,50 @@ class CompactionServiceTest : public DBTestBase { std::shared_ptr compaction_service_; }; +TEST_F(CompactionServiceTest, ResumeBasic1) { + for (bool resume : {true, false}) { + Options options = CurrentOptions(); + options.target_file_size_base = 1 << 5; // 1KB + ReopenWithCompactionService(&options); + + ASSERT_OK(Put("k1", "old")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k1", "new")); + ASSERT_OK(Flush()); + + std::vector metadata; + + SyncPoint::GetInstance()->SetCallBack( + "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) { + // override job status + auto s = static_cast(status); + *s = Status::Aborted("MyTestCompactionService failed to compact!"); + }); + SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_NOK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); + + Statistics* compactor_statistics = GetCompactorStatistics(); + compactor_statistics->Reset(); + ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 0); + options.resume_compaction = resume; + Reopen(options); + dbfull()->TEST_WaitForCompact(); + if (resume) { + // (1) sync point -> compact range failed + // (2) sync point -> wait for compaction failed (reset stat) + ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 1); + } else { + ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 0); + } + } +} + TEST_F(CompactionServiceTest, BasicCompactions) { Options options = CurrentOptions(); + options.max_subcompactions = 2; + options.target_file_size_base = 1 << 5; // 1KB ReopenWithCompactionService(&options); Statistics* primary_statistics = GetPrimaryStatistics(); @@ -322,7 +366,10 @@ TEST_F(CompactionServiceTest, BasicCompactions) { assert(*id != kNullUniqueId64x2); verify_passed++; }); + SyncPoint::GetInstance()->ClearCallBack( + "DBImplSecondary::CompactWithoutInstallation::End"); Reopen(options); + dbfull()->TEST_WaitForCompact(); ASSERT_GT(verify_passed, 0); Close(); } diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 08812a35bd48..20d0e33e0de5 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -3853,7 +3853,10 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, : kManualCompactionCanceledFalse_, db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(), &blob_callback_, &bg_compaction_scheduled_, - &bg_bottom_compaction_scheduled_); + &bg_bottom_compaction_scheduled_, + c->GetResumableCompactionDbSessionID(), + c->GetResumableCompactionCompactionId(), + c->GetResumableCompactionSubcompactions()); compaction_job.Prepare(); NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, diff --git a/db/version_edit.cc b/db/version_edit.cc index 6459c2ff863f..7bb6c2dc9aba 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -9,6 +9,8 @@ #include "db/version_edit.h" +#include + #include "db/blob/blob_index.h" #include "db/version_set.h" #include "logging/event_logger.h" @@ -17,7 +19,6 @@ #include "test_util/sync_point.h" #include "util/coding.h" #include "util/string_util.h" - namespace ROCKSDB_NAMESPACE { namespace {} // anonymous namespace @@ -318,17 +319,13 @@ bool VersionEdit::EncodeTo(std::string* dst, char p = static_cast(persist_user_defined_timestamps_); PutLengthPrefixedSlice(dst, Slice(&p, 1)); } - return true; -} -static bool GetInternalKey(Slice* input, InternalKey* dst) { - Slice str; - if (GetLengthPrefixedSlice(input, &str)) { - dst->DecodeFrom(str); - return dst->Valid(); - } else { - return false; + if (has_resumable_compaction_info_) { + PutVarint32(dst, kResumableCompactionInfo); + resumable_compaction_info_.EncodeTo(dst); } + + return true; } bool VersionEdit::GetLevel(Slice* input, int* level, const char** /*msg*/) { @@ -798,6 +795,17 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; + case kResumableCompactionInfo: { + ResumableCompactionInfo resumable_compaction_info; + const Status s = resumable_compaction_info.DecodeFrom(&input); + if (!s.ok()) { + return s; + } + resumable_compaction_info_ = std::move(resumable_compaction_info); + has_resumable_compaction_info_ = true; + break; + } + default: if (tag & kTagSafeIgnoreMask) { // Tag from future which can be safely ignored. diff --git a/db/version_edit.h b/db/version_edit.h index 8e14e76da932..45c6e8928fe7 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -23,12 +23,21 @@ #include "port/malloc.h" #include "rocksdb/advanced_cache.h" #include "rocksdb/advanced_options.h" +#include "rocksdb/status.h" #include "table/table_reader.h" #include "table/unique_id_impl.h" #include "util/autovector.h" namespace ROCKSDB_NAMESPACE { - +static bool GetInternalKey(Slice* input, InternalKey* dst) { + Slice str; + if (GetLengthPrefixedSlice(input, &str)) { + dst->DecodeFrom(str); + return dst->Valid(); + } else { + return false; + } +} // Tag numbers for serialized VersionEdit. These numbers are written to // disk and should not be changed. The number should be forward compatible so // users can down-grade RocksDB safely. A future Tag is ignored by doing '&' @@ -72,6 +81,7 @@ enum Tag : uint32_t { kWalAddition2, kWalDeletion2, kPersistUserDefinedTimestamps, + kResumableCompactionInfo, }; enum NewFileCustomTag : uint32_t { @@ -103,6 +113,19 @@ enum NewFileCustomTag : uint32_t { kPathId, }; +enum ResumableCompactionCustomTag : uint32_t { + kRCTerminate = 1, // The end of customized fields + kDBSessionId, + kJobId, + kInput, + kOutputLevel, + kPenultimateOutputLevel, + kPenultimateOutputLevelSmallest, + kPenultimateOutputLevelLargest, + kSubcompactionStart, + kSubcompactionEnd, +}; + class VersionSet; constexpr uint64_t kFileNumberMask = 0x3FFFFFFFFFFFFFFF; @@ -383,6 +406,166 @@ struct LevelFilesBrief { } }; +class ResumableCompactionInfo { + public: + void EncodeTo(std::string* output) const { + PutVarint32(output, ResumableCompactionCustomTag::kDBSessionId); + PutLengthPrefixedSlice(output, db_session_id); + PutVarint32Varint64(output, ResumableCompactionCustomTag::kJobId, job_id); + for (const std::pair>& pair : inputs) { + PutVarint32(output, ResumableCompactionCustomTag::kInput); + PutVarint32(output, pair.first /* level */); + PutVarint64(output, pair.second.size() /* length */); + for (uint64_t file_number : pair.second) { + PutVarint64(output, file_number); + } + } + PutVarint32Varint32(output, ResumableCompactionCustomTag::kOutputLevel, + output_level); + PutVarint32Varint32(output, + ResumableCompactionCustomTag::kPenultimateOutputLevel, + penultimate_output_level); + + if (penultimate_output_level_smallest.size() > 0) { + PutVarint32( + output, + ResumableCompactionCustomTag::kPenultimateOutputLevelSmallest); + PutLengthPrefixedSlice(output, + penultimate_output_level_smallest.Encode()); + } + + if (penultimate_output_level_largest.size() > 0) { + PutVarint32(output, + ResumableCompactionCustomTag::kPenultimateOutputLevelLargest); + PutLengthPrefixedSlice(output, penultimate_output_level_largest.Encode()); + } + + PutVarint32(output, ResumableCompactionCustomTag::kSubcompactionStart); + PutLengthPrefixedSlice(output, subcompaction_start); + PutVarint32(output, ResumableCompactionCustomTag::kSubcompactionEnd); + PutLengthPrefixedSlice(output, subcompaction_end); + PutVarint32(output, ResumableCompactionCustomTag::kRCTerminate); + } + + Status DecodeFrom(Slice* input) { + while (true) { + uint32_t custom_tag = 0; + Slice field; + if (!GetVarint32(input, &custom_tag)) { + return Status::Corruption( + "Error decoding custom tag for resumable compaction info"); + } + if (custom_tag == ResumableCompactionCustomTag::kRCTerminate) { + break; + } + switch (custom_tag) { + case ResumableCompactionCustomTag::kDBSessionId: + if (GetLengthPrefixedSlice(input, &field)) { + db_session_id.assign(field.data(), field.size()); + } else { + return Status::Corruption( + "Error decoding db dession id for resumable compaction info"); + } + break; + case ResumableCompactionCustomTag::kJobId: + if (!GetVarint64(input, &job_id)) { + return Status::Corruption( + "Error decoding job id for resumable compaction info"); + } + break; + case ResumableCompactionCustomTag::kInput: { + std::vector inputs_decoded; + uint32_t input_level; + if (!GetVarint32(input, &input_level)) { + return Status::Corruption( + "Error decoding input level for resumable compaction info"); + } + uint64_t input_length; + if (!GetVarint64(input, &input_length)) { + return Status::Corruption( + "Error decoding input length for resumable compaction info"); + } + for (uint64_t i = 0; i < input_length; ++i) { + uint64_t input_file_number; + if (!GetVarint64(input, &input_file_number)) { + return Status::Corruption( + "Error decoding input file number for resumable compaction " + "info"); + } + inputs_decoded.push_back(input_file_number); + } + inputs.push_back({input_level, inputs_decoded}); + break; + } + case ResumableCompactionCustomTag::kOutputLevel: + uint32_t output_level_decoded; + if (GetVarint32(input, &output_level_decoded)) { + output_level = output_level_decoded; + } else { + return Status::Corruption( + "Error decoding output level for resumable compaction info"); + } + break; + case ResumableCompactionCustomTag::kPenultimateOutputLevel: + uint32_t penultimate_output_level_decoded; + if (GetVarint32(input, &penultimate_output_level_decoded)) { + penultimate_output_level = penultimate_output_level_decoded; + } else { + return Status::Corruption( + "Error decoding penultimate output level for resumable " + "compaction info"); + } + break; + case ResumableCompactionCustomTag::kPenultimateOutputLevelSmallest: + if (!GetInternalKey(input, &penultimate_output_level_smallest)) { + return Status::Corruption( + "Error decoding penultimate output level smallest for " + "resumable compaction info"); + } + break; + case ResumableCompactionCustomTag::kPenultimateOutputLevelLargest: + if (!GetInternalKey(input, &penultimate_output_level_largest)) { + return Status::Corruption( + "Error decoding penultimate output level largest for resumable " + "compaction info"); + } + break; + case ResumableCompactionCustomTag::kSubcompactionStart: + if (GetLengthPrefixedSlice(input, &field)) { + subcompaction_start.assign(field.data(), field.size()); + } else { + return Status::Corruption( + "Error decoding subcompaction start for resumable compaction " + "info"); + } + break; + case ResumableCompactionCustomTag::kSubcompactionEnd: + if (GetLengthPrefixedSlice(input, &field)) { + subcompaction_end.assign(field.data(), field.size()); + } else { + return Status::Corruption( + "Error decoding subcompaction end for resumable compaction " + "info"); + } + break; + default: + // Ignore unknown tags + break; + } + } + return Status::OK(); + } + std::string db_session_id; + uint64_t job_id; + std::vector>> inputs; + int output_level; + int penultimate_output_level; + InternalKey penultimate_output_level_smallest; + InternalKey penultimate_output_level_largest; + std::string subcompaction_start; + std::string subcompaction_end; +}; + // The state of a DB at any given time is referred to as a Version. // Any modification to the Version is considered a Version Edit. A Version is // constructed by joining a sequence of Version Edits. Version Edits are written @@ -531,6 +714,42 @@ class VersionEdit { } } + void AddResumableCompactionInfo( + const std::string& db_session_id, uint64_t job_id, + const std::vector>>& inputs, + int output_level, int penultimate_output_level, + const InternalKey& penultimate_output_level_smallest, + const InternalKey& penultimate_output_level_largest, + const std::string& subcompaction_start, + const std::string& subcompaction_end) { + assert(!db_session_id.empty()); + assert(inputs.size() > 0); + for (const auto& input : inputs) { + assert(input.second.size() > 0); + } + has_resumable_compaction_info_ = true; + resumable_compaction_info_.db_session_id = db_session_id; + resumable_compaction_info_.job_id = job_id; + resumable_compaction_info_.inputs = inputs; + resumable_compaction_info_.output_level = output_level; + resumable_compaction_info_.penultimate_output_level = + penultimate_output_level; + resumable_compaction_info_.penultimate_output_level_smallest = + penultimate_output_level_smallest; + resumable_compaction_info_.penultimate_output_level_largest = + penultimate_output_level_largest; + resumable_compaction_info_.subcompaction_start = subcompaction_start; + resumable_compaction_info_.subcompaction_end = subcompaction_end; + } + + bool HasResumableCompactionInfo() const { + return has_resumable_compaction_info_; + } + + const ResumableCompactionInfo& GetResumableCompactionInfo() const { + return resumable_compaction_info_; + } + // Add a new blob file. void AddBlobFile(uint64_t blob_file_number, uint64_t total_blob_count, uint64_t total_blob_bytes, std::string checksum_method, @@ -764,6 +983,8 @@ class VersionEdit { // Since table files and blob files share the same file number space, we just // record the file number here. autovector files_to_quarantine_; -}; + bool has_resumable_compaction_info_ = false; + ResumableCompactionInfo resumable_compaction_info_; +}; } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 90afc0938c9e..c7585a0833fd 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -633,6 +633,11 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, const std::string& new_ts = edit.GetFullHistoryTsLow(); cfd->SetFullHistoryTsLow(new_ts); } + if (cfd->ioptions()->resume_compaction && + edit.HasResumableCompactionInfo() && + cfd->ioptions()->compaction_service != nullptr) { + cfd->AddResumableCompaction(edit.GetResumableCompactionInfo()); + } } if (s.ok()) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index be4eb8fba27f..3ad9bcad056e 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -417,7 +417,7 @@ struct CompactionServiceJobInfo { // different DBs and sessions. Env::Priority priority; - + CompactionServiceJobInfo() {} CompactionServiceJobInfo(std::string db_name_, std::string db_id_, std::string db_session_id_, uint64_t job_id_, Env::Priority priority_) @@ -563,6 +563,8 @@ struct DBOptions { // Default: true bool verify_sst_unique_id_in_manifest = true; + bool resume_compaction = true; + // Use the specified object to interact with the environment, // e.g. to read/write files, schedule background work, etc. In the near // future, support for doing storage operations such as read/write files diff --git a/options/db_options.cc b/options/db_options.cc index ca72404dd21a..c67f09d39c3d 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -692,6 +692,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) options.track_and_verify_wals_in_manifest), verify_sst_unique_id_in_manifest( options.verify_sst_unique_id_in_manifest), + resume_compaction(options.resume_compaction), env(options.env), rate_limiter(options.rate_limiter), sst_file_manager(options.sst_file_manager), diff --git a/options/db_options.h b/options/db_options.h index 701a83febb0c..17a3377ad920 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -28,6 +28,7 @@ struct ImmutableDBOptions { bool compaction_verify_record_count; bool track_and_verify_wals_in_manifest; bool verify_sst_unique_id_in_manifest; + bool resume_compaction; Env* env; std::shared_ptr rate_limiter; std::shared_ptr sst_file_manager;