Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Dec 21, 2023
1 parent acc078f commit 63c89af
Show file tree
Hide file tree
Showing 18 changed files with 781 additions and 89 deletions.
20 changes: 15 additions & 5 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

#include <algorithm>
#include <cinttypes>
#include <cstdio>
#include <iostream>
#include <limits>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -40,7 +42,6 @@
#include "util/autovector.h"
#include "util/cast_util.h"
#include "util/compression.h"

namespace ROCKSDB_NAMESPACE {

ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
Expand Down Expand Up @@ -1109,15 +1110,24 @@ void ColumnFamilyData::CreateNewMemtable(

bool ColumnFamilyData::NeedsCompaction() const {
return !mutable_cf_options_.disable_auto_compactions &&
compaction_picker_->NeedsCompaction(current_->storage_info());
(!db_session_id_to_resumable_compactions.empty() ||
compaction_picker_->NeedsCompaction(current_->storage_info()));
}

Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options,
const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) {
auto* result = compaction_picker_->PickCompaction(
GetName(), mutable_options, mutable_db_options, current_->storage_info(),
log_buffer);
Compaction* result = nullptr;
if (!db_session_id_to_resumable_compactions.empty()) {
result = compaction_picker_->PickResumableCompaction(
db_session_id_to_resumable_compactions, mutable_options,
mutable_db_options, current_->storage_info());
}
if (result == nullptr) {
result = compaction_picker_->PickCompaction(
GetName(), mutable_options, mutable_db_options,
current_->storage_info(), log_buffer);
}
if (result != nullptr) {
result->FinalizeInputInfo(current_);
}
Expand Down
58 changes: 53 additions & 5 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "db/memtable_list.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/version_edit.h"
#include "db/write_batch_internal.h"
#include "db/write_controller.h"
#include "options/cf_options.h"
Expand Down Expand Up @@ -549,6 +550,49 @@ class ColumnFamilyData {
// of its files (if missing)
void RecoverEpochNumbers();

void AddResumableCompaction(const ResumableCompactionInfo& info) {
auto iter_1 =
db_session_id_to_resumable_compactions.find(info.db_session_id);
if (iter_1 == db_session_id_to_resumable_compactions.end()) {
db_session_id_to_resumable_compactions.insert({info.db_session_id, {}});
}
ResumableCompactions& resumable_compactions =
db_session_id_to_resumable_compactions[info.db_session_id];

uint64_t compaction_id = info.job_id >> 32;
auto iter_2 = resumable_compactions.find(compaction_id);

bool has_subcompaction = info.subcompaction_start.size() != 0 ||
info.subcompaction_end.size() != 0;
constexpr uint32_t LAST_32_BITS_MASK = 0xffffffffU;
uint64_t subcompaction_id = info.job_id & LAST_32_BITS_MASK;

if (iter_2 == resumable_compactions.end()) {
ResumableCompaction resumable_compaction = {
info.inputs,
info.output_level,
info.penultimate_output_level,
info.penultimate_output_level_smallest,
info.penultimate_output_level_largest,
{}};
if (has_subcompaction) {
resumable_compaction.subcompaction_id_to_range.insert(
{subcompaction_id,
{info.subcompaction_start, info.subcompaction_end}});
}
resumable_compactions.insert({compaction_id, resumable_compaction});
} else {
assert(has_subcompaction);
ResumableCompaction& resumable_compaction = iter_2->second;
assert(resumable_compaction.subcompaction_id_to_range.find(
subcompaction_id) ==
resumable_compaction.subcompaction_id_to_range.end());
resumable_compaction.subcompaction_id_to_range.insert(
{subcompaction_id,
{info.subcompaction_start, info.subcompaction_end}});
}
}

private:
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
Expand Down Expand Up @@ -604,8 +648,8 @@ class ColumnFamilyData {
std::unique_ptr<ThreadLocalPtr> local_sv_;

// pointers for a circular linked list. we use it to support iterations over
// all column families that are alive (note: dropped column families can also
// be alive as long as client holds a reference)
// all column families that are alive (note: dropped column families can
// also be alive as long as client holds a reference)
ColumnFamilyData* next_;
ColumnFamilyData* prev_;

Expand All @@ -622,7 +666,8 @@ class ColumnFamilyData {

std::unique_ptr<WriteControllerToken> write_controller_token_;

// If true --> this ColumnFamily is currently present in DBImpl::flush_queue_
// If true --> this ColumnFamily is currently present in
// DBImpl::flush_queue_
bool queued_for_flush_;

// If true --> this ColumnFamily is currently present in
Expand All @@ -644,12 +689,15 @@ class ColumnFamilyData {

std::string full_history_ts_low_;

// For charging memory usage of file metadata created for newly added files to
// a Version associated with this CFD
// For charging memory usage of file metadata created for newly added files
// to a Version associated with this CFD
std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr_;
bool mempurge_used_;

std::atomic<uint64_t> next_epoch_number_;

std::map<std::string, ResumableCompactions>
db_session_id_to_resumable_compactions;
};

// ColumnFamilySet has interesting thread-safety requirements
Expand Down
34 changes: 26 additions & 8 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,13 @@ Compaction::Compaction(
bool _deletion_compaction, bool l0_files_might_overlap,
CompactionReason _compaction_reason,
BlobGarbageCollectionPolicy _blob_garbage_collection_policy,
double _blob_garbage_collection_age_cutoff)
double _blob_garbage_collection_age_cutoff, int penultimate_level,
InternalKey penultimate_level_smallest,
InternalKey penultimate_level_largest,
std::string resumable_compaction_db_session_id,
uint64_t resumable_compaction_compaction_id,
std::map<uint64_t, std::pair<std::string, std::string>>
resumable_compaction_subcompactions)
: input_vstorage_(vstorage),
start_level_(_inputs[0].level),
output_level_(_output_level),
Expand Down Expand Up @@ -334,14 +340,21 @@ Compaction::Compaction(
? mutable_cf_options()->blob_garbage_collection_age_cutoff
: _blob_garbage_collection_age_cutoff),
penultimate_level_(
// For simplicity, we don't support the concept of "penultimate level"
// with `CompactionReason::kExternalSstIngestion` and
// `CompactionReason::kRefitLevel`
_compaction_reason == CompactionReason::kExternalSstIngestion ||
penultimate_level != -1
? penultimate_level
:
// For simplicity, we don't support the concept of "penultimate
// level" with `CompactionReason::kExternalSstIngestion` and
// `CompactionReason::kRefitLevel`
_compaction_reason == CompactionReason::kExternalSstIngestion ||
_compaction_reason == CompactionReason::kRefitLevel
? Compaction::kInvalidLevel
: EvaluatePenultimateLevel(vstorage, immutable_options_,
start_level_, output_level_)) {
start_level_, output_level_)),
resumable_compaction_db_session_id_(resumable_compaction_db_session_id),
resumable_compaction_compaction_id_(resumable_compaction_compaction_id),
resumable_compaction_subcompactions_(
resumable_compaction_subcompactions) {
MarkFilesBeingCompacted(true);
if (is_manual_compaction_) {
compaction_reason_ = CompactionReason::kManualCompaction;
Expand Down Expand Up @@ -395,8 +408,13 @@ Compaction::Compaction(
}
}
}

PopulatePenultimateLevelOutputRange();
if (penultimate_level_smallest.size() != 0 ||
penultimate_level_largest.size() != 0) {
penultimate_level_smallest_ = penultimate_level_smallest;
penultimate_level_largest_ = penultimate_level_largest;
} else {
PopulatePenultimateLevelOutputRange();
}
}

void Compaction::PopulatePenultimateLevelOutputRange() {
Expand Down
35 changes: 34 additions & 1 deletion db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,15 @@ class Compaction {
CompactionReason compaction_reason = CompactionReason::kUnknown,
BlobGarbageCollectionPolicy blob_garbage_collection_policy =
BlobGarbageCollectionPolicy::kUseDefault,
double blob_garbage_collection_age_cutoff = -1);
double blob_garbage_collection_age_cutoff = -1,
int penultimate_level = -1,
InternalKey penultimate_level_smallest = {},
InternalKey penultimate_level_largest = {},
std::string resumable_compaction_db_session_id = "",
uint64_t resumable_compaction_compaction_id =
std::numeric_limits<uint64_t>::max(),
std::map<uint64_t, std::pair<std::string, std::string>>
resumable_compaction_subcmpactions = {});

// The type of the penultimate level output range
enum class PenultimateOutputRangeType : int {
Expand Down Expand Up @@ -432,6 +440,27 @@ class Compaction {
const int start_level,
const int output_level);

InternalKey GetPenultimateLevelSmallestKey() const {
return penultimate_level_smallest_;
}

InternalKey GetPenultimateLevelLargestKey() const {
return penultimate_level_largest_;
}

std::string GetResumableCompactionDbSessionID() const {
return resumable_compaction_db_session_id_;
}

uint64_t GetResumableCompactionCompactionId() const {
return resumable_compaction_compaction_id_;
}

std::map<uint64_t, std::pair<std::string, std::string>>
GetResumableCompactionSubcompactions() const {
return resumable_compaction_subcompactions_;
}

private:
void SetInputVersion(Version* input_version);

Expand Down Expand Up @@ -569,6 +598,10 @@ class Compaction {
InternalKey penultimate_level_largest_;
PenultimateOutputRangeType penultimate_output_range_type_ =
PenultimateOutputRangeType::kNotSupported;
std::string resumable_compaction_db_session_id_;
uint64_t resumable_compaction_compaction_id_;
std::map<uint64_t, std::pair<std::string, std::string>>
resumable_compaction_subcompactions_;
};

#ifndef NDEBUG
Expand Down
69 changes: 67 additions & 2 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include <algorithm>
#include <cinttypes>
#include <iostream>
#include <memory>
#include <optional>
#include <set>
Expand Down Expand Up @@ -145,7 +146,11 @@ CompactionJob::CompactionJob(
const std::string& db_id, const std::string& db_session_id,
std::string full_history_ts_low, std::string trim_ts,
BlobFileCompletionCallback* blob_callback, int* bg_compaction_scheduled,
int* bg_bottom_compaction_scheduled)
int* bg_bottom_compaction_scheduled,
std::string resumable_compaction_db_session_id,
uint64_t resumable_compaction_compaction_id,
std::map<uint64_t, std::pair<std::string, std::string>>
resumable_compaction_subcompactions)
: compact_(new CompactionState(compaction)),
compaction_stats_(compaction->compaction_reason(), 1),
db_options_(db_options),
Expand All @@ -156,6 +161,9 @@ CompactionJob::CompactionJob(
bottommost_level_(false),
write_hint_(Env::WLTH_NOT_SET),
compaction_job_stats_(compaction_job_stats),
resumable_compaction_db_session_id_(resumable_compaction_db_session_id),
resumable_compaction_compaction_id_(resumable_compaction_compaction_id),
resumable_compaction_subcompactions_(resumable_compaction_subcompactions),
job_id_(job_id),
dbname_(dbname),
db_id_(db_id),
Expand Down Expand Up @@ -256,7 +264,64 @@ void CompactionJob::Prepare() {
write_hint_ = cfd->CalculateSSTWriteHint(c->output_level());
bottommost_level_ = c->bottommost_level();

if (c->ShouldFormSubcompactions()) {
if (!resumable_compaction_db_session_id_.empty()) {
if (resumable_compaction_subcompactions_.size() > 0) {
// resource
uint64_t max_subcompactions_limit = GetSubcompactionsLimit();
uint64_t num_actual_subcompactions =
resumable_compaction_subcompactions_.size();
AcquireSubcompactionResources(
(int)(num_actual_subcompactions - max_subcompactions_limit));
for (auto const& sc : resumable_compaction_subcompactions_) {
compact_->sub_compact_states.emplace_back(
c, std::optional<Slice>(sc.second.first),
std::optional<Slice>(sc.second.second), sc.first);
}

// // states
// // copy
// std::map<uint64_t, std::pair<std::string, std::string>>
// resumable_compaction_subcompactions_copy =
// resumable_compaction_subcompactions_;
// // sort - TODO
// // std::sort(
// // resumable_compaction_subcompactions_copy.begin(),
// // resumable_compaction_subcompactions_copy.end(),
// // [&c](const std::pair<uint64_t, std::pair<std::string,
// // std::string>>&
// // sc1,
// // const std::pair<uint64_t, std::pair<std::string,
// // std::string>>&
// // sc2) {
// // if (sc1.second.first.empty() || sc2.second.second.empty()) {
// // return true;
// // } else if (sc1.second.second.empty() ||
// sc2.second.first.empty())
// // {
// // return false;
// // }
// // return (c->column_family_data()
// // ->user_comparator()
// // ->CompareWithoutTimestamp(sc1.second.second,
// // sc2.second.second)
// // < 0
// // ? true
// // : false);
// // });
// // state
// for (auto const& sc : resumable_compaction_subcompactions_copy) {
// compact_->sub_compact_states.emplace_back(
// c, std::optional<Slice>(sc.second.first),
// std::optional<Slice>(sc.second.second), sc.first);
// }
// stats
RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
compact_->sub_compact_states.size());
} else {
compact_->sub_compact_states.emplace_back(c, std::nullopt, std::nullopt,
/*sub_job_id*/ 0);
}
} else if (c->ShouldFormSubcompactions()) {
StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
GenSubcompactionBoundaries();
}
Expand Down
12 changes: 11 additions & 1 deletion db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,12 @@ class CompactionJob {
std::string full_history_ts_low = "", std::string trim_ts = "",
BlobFileCompletionCallback* blob_callback = nullptr,
int* bg_compaction_scheduled = nullptr,
int* bg_bottom_compaction_scheduled = nullptr);
int* bg_bottom_compaction_scheduled = nullptr,
std::string resumable_compaction_db_session_id = "",
uint64_t resumable_compaction_compaction_id =
std::numeric_limits<uint64_t>::max(),
std::map<uint64_t, std::pair<std::string, std::string>>
resumable_compaction_subcompactions = {});

virtual ~CompactionJob();

Expand Down Expand Up @@ -234,6 +239,11 @@ class CompactionJob {

CompactionJobStats* compaction_job_stats_;

std::string resumable_compaction_db_session_id_;
uint64_t resumable_compaction_compaction_id_;
std::map<uint64_t, std::pair<std::string, std::string>>
resumable_compaction_subcompactions_;

private:
friend class CompactionJobTestBase;

Expand Down
Loading

0 comments on commit 63c89af

Please sign in to comment.