Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Dec 20, 2023
1 parent acc078f commit 1751c78
Show file tree
Hide file tree
Showing 15 changed files with 655 additions and 76 deletions.
59 changes: 57 additions & 2 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

#include <algorithm>
#include <cinttypes>
#include <cstdio>
#include <iostream>
#include <limits>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -40,7 +42,6 @@
#include "util/autovector.h"
#include "util/cast_util.h"
#include "util/compression.h"

namespace ROCKSDB_NAMESPACE {

ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
Expand Down Expand Up @@ -1109,12 +1110,66 @@ void ColumnFamilyData::CreateNewMemtable(

bool ColumnFamilyData::NeedsCompaction() const {
return !mutable_cf_options_.disable_auto_compactions &&
compaction_picker_->NeedsCompaction(current_->storage_info());
(!db_session_id_to_resumable_compactions.empty() ||
compaction_picker_->NeedsCompaction(current_->storage_info()));
}

Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options,
const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) {
while (!db_session_id_to_resumable_compactions.empty()) {
auto iter_1 = db_session_id_to_resumable_compactions.begin();
auto db_session_id = iter_1->first;
ResumableCompactions resumable_compactions = iter_1->second;
db_session_id_to_resumable_compactions.erase(iter_1);
while (!resumable_compactions.empty()) {
auto iter_2 = resumable_compactions.begin();
auto compaction_id = iter_2->first;
ResumableCompaction resumable_compaction = iter_2->second;
resumable_compactions.erase(iter_2);

// Verify input files stil exist thus the compaction needs resumed
std::vector<CompactionInputFiles> inputs;
bool has_error = false;
for (auto& input : resumable_compaction.inputs) {
CompactionInputFiles input_files;
input_files.level = input.first;
for (uint64_t input_file_number : input.second) {
FileMetaData* file =
current_->storage_info()->GetFileMetaDataByNumber(
input_file_number);
if (file == nullptr) {
has_error = true;
break;
}
input_files.files.push_back(file);
}
if (has_error) {
break;
}
inputs.push_back(input_files);
}

if (!has_error && !inputs.empty()) {
Compaction* result = new Compaction(
current_->storage_info(), ioptions_, mutable_options,
mutable_db_options, inputs, resumable_compaction.output_level,
std::numeric_limits<uint64_t>::max(),
std::numeric_limits<uint64_t>::max(), 0,
CompressionType::kNoCompression, CompressionOptions(),
Temperature::kUnknown, std::numeric_limits<uint32_t>::max(), {},
false, "", -1, false, true, CompactionReason::kUnknown,
BlobGarbageCollectionPolicy::kUseDefault, -1,
resumable_compaction.penultimate_output_level,
resumable_compaction.penultimate_output_level_smallest,
resumable_compaction.penultimate_output_level_largest,
db_session_id, compaction_id,
resumable_compaction.subcompaction_id_to_range);
result->FinalizeInputInfo(current_);
return result;
}
}
}
auto* result = compaction_picker_->PickCompaction(
GetName(), mutable_options, mutable_db_options, current_->storage_info(),
log_buffer);
Expand Down
71 changes: 66 additions & 5 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "db/memtable_list.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/version_edit.h"
#include "db/write_batch_internal.h"
#include "db/write_controller.h"
#include "options/cf_options.h"
Expand Down Expand Up @@ -549,6 +550,49 @@ class ColumnFamilyData {
// of its files (if missing)
void RecoverEpochNumbers();

void AddResumableCompaction(const ResumableCompactionInfo& info) {
auto iter_1 =
db_session_id_to_resumable_compactions.find(info.db_session_id);
if (iter_1 == db_session_id_to_resumable_compactions.end()) {
db_session_id_to_resumable_compactions.insert({info.db_session_id, {}});
}
ResumableCompactions& resumable_compactions =
db_session_id_to_resumable_compactions[info.db_session_id];

uint64_t compaction_id = info.job_id >> 32;
auto iter_2 = resumable_compactions.find(compaction_id);

bool has_subcompaction = info.subcompaction_start.size() != 0 ||
info.subcompaction_end.size() != 0;
constexpr uint32_t LAST_32_BITS_MASK = 0xffffffffU;
uint64_t subcompaction_id = info.job_id & LAST_32_BITS_MASK;

if (iter_2 == resumable_compactions.end()) {
ResumableCompaction resumable_compaction = {
info.inputs,
info.output_level,
info.penultimate_output_level,
info.penultimate_output_level_smallest,
info.penultimate_output_level_largest,
{}};
if (has_subcompaction) {
resumable_compaction.subcompaction_id_to_range.insert(
{subcompaction_id,
{info.subcompaction_start, info.subcompaction_end}});
}
resumable_compactions.insert({compaction_id, resumable_compaction});
} else {
assert(has_subcompaction);
ResumableCompaction& resumable_compaction = iter_2->second;
assert(resumable_compaction.subcompaction_id_to_range.find(
subcompaction_id) ==
resumable_compaction.subcompaction_id_to_range.end());
resumable_compaction.subcompaction_id_to_range.insert(
{subcompaction_id,
{info.subcompaction_start, info.subcompaction_end}});
}
}

private:
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
Expand Down Expand Up @@ -604,8 +648,8 @@ class ColumnFamilyData {
std::unique_ptr<ThreadLocalPtr> local_sv_;

// pointers for a circular linked list. we use it to support iterations over
// all column families that are alive (note: dropped column families can also
// be alive as long as client holds a reference)
// all column families that are alive (note: dropped column families can
// also be alive as long as client holds a reference)
ColumnFamilyData* next_;
ColumnFamilyData* prev_;

Expand All @@ -622,7 +666,8 @@ class ColumnFamilyData {

std::unique_ptr<WriteControllerToken> write_controller_token_;

// If true --> this ColumnFamily is currently present in DBImpl::flush_queue_
// If true --> this ColumnFamily is currently present in
// DBImpl::flush_queue_
bool queued_for_flush_;

// If true --> this ColumnFamily is currently present in
Expand All @@ -644,12 +689,28 @@ class ColumnFamilyData {

std::string full_history_ts_low_;

// For charging memory usage of file metadata created for newly added files to
// a Version associated with this CFD
// For charging memory usage of file metadata created for newly added files
// to a Version associated with this CFD
std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr_;
bool mempurge_used_;

std::atomic<uint64_t> next_epoch_number_;

struct ResumableCompaction {
std::vector<std::pair<int, std::vector<uint64_t>>> inputs;
int output_level;
int penultimate_output_level;
InternalKey penultimate_output_level_smallest;
InternalKey penultimate_output_level_largest;
std::map<uint64_t, std::pair<std::string, std::string>>
subcompaction_id_to_range;
};

using ResumableCompactions =
std::map<uint64_t /* compaction id */, ResumableCompaction>;

std::map<std::string, ResumableCompactions>
db_session_id_to_resumable_compactions;
};

// ColumnFamilySet has interesting thread-safety requirements
Expand Down
34 changes: 26 additions & 8 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,13 @@ Compaction::Compaction(
bool _deletion_compaction, bool l0_files_might_overlap,
CompactionReason _compaction_reason,
BlobGarbageCollectionPolicy _blob_garbage_collection_policy,
double _blob_garbage_collection_age_cutoff)
double _blob_garbage_collection_age_cutoff, int penultimate_level,
InternalKey penultimate_level_smallest,
InternalKey penultimate_level_largest,
std::string resumable_compaction_db_session_id,
uint64_t resumable_compaction_compaction_id,
std::map<uint64_t, std::pair<std::string, std::string>>
resumable_compaction_subcompactions)
: input_vstorage_(vstorage),
start_level_(_inputs[0].level),
output_level_(_output_level),
Expand Down Expand Up @@ -334,14 +340,21 @@ Compaction::Compaction(
? mutable_cf_options()->blob_garbage_collection_age_cutoff
: _blob_garbage_collection_age_cutoff),
penultimate_level_(
// For simplicity, we don't support the concept of "penultimate level"
// with `CompactionReason::kExternalSstIngestion` and
// `CompactionReason::kRefitLevel`
_compaction_reason == CompactionReason::kExternalSstIngestion ||
penultimate_level != -1
? penultimate_level
:
// For simplicity, we don't support the concept of "penultimate
// level" with `CompactionReason::kExternalSstIngestion` and
// `CompactionReason::kRefitLevel`
_compaction_reason == CompactionReason::kExternalSstIngestion ||
_compaction_reason == CompactionReason::kRefitLevel
? Compaction::kInvalidLevel
: EvaluatePenultimateLevel(vstorage, immutable_options_,
start_level_, output_level_)) {
start_level_, output_level_)),
resumable_compaction_db_session_id_(resumable_compaction_db_session_id),
resumable_compaction_compaction_id_(resumable_compaction_compaction_id),
resumable_compaction_subcompactions_(
resumable_compaction_subcompactions) {
MarkFilesBeingCompacted(true);
if (is_manual_compaction_) {
compaction_reason_ = CompactionReason::kManualCompaction;
Expand Down Expand Up @@ -395,8 +408,13 @@ Compaction::Compaction(
}
}
}

PopulatePenultimateLevelOutputRange();
if (penultimate_level_smallest.size() == 0 ||
penultimate_level_largest.size() == 0) {
penultimate_level_smallest_ = penultimate_level_smallest;
penultimate_level_largest_ = penultimate_level_largest;
} else {
PopulatePenultimateLevelOutputRange();
}
}

void Compaction::PopulatePenultimateLevelOutputRange() {
Expand Down
35 changes: 34 additions & 1 deletion db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,15 @@ class Compaction {
CompactionReason compaction_reason = CompactionReason::kUnknown,
BlobGarbageCollectionPolicy blob_garbage_collection_policy =
BlobGarbageCollectionPolicy::kUseDefault,
double blob_garbage_collection_age_cutoff = -1);
double blob_garbage_collection_age_cutoff = -1,
int penultimate_level = -1,
InternalKey penultimate_level_smallest = {},
InternalKey penultimate_level_largest = {},
std::string resumable_compaction_db_session_id = "",
uint64_t resumable_compaction_compaction_id =
std::numeric_limits<uint64_t>::max(),
std::map<uint64_t, std::pair<std::string, std::string>>
resumable_compaction_subcmpactions = {});

// The type of the penultimate level output range
enum class PenultimateOutputRangeType : int {
Expand Down Expand Up @@ -432,6 +440,27 @@ class Compaction {
const int start_level,
const int output_level);

InternalKey GetPenultimateLevelSmallestKey() const {
return penultimate_level_smallest_;
}

InternalKey GetPenultimateLevelLargestKey() const {
return penultimate_level_largest_;
}

std::string GetResumableCompactionDbSessionID() const {
return resumable_compaction_db_session_id_;
}

uint64_t GetResumableCompactionCompactionId() const {
return resumable_compaction_compaction_id_;
}

std::map<uint64_t, std::pair<std::string, std::string>>
GetResumableCompactionSubcompactions() const {
return resumable_compaction_subcompactions_;
}

private:
void SetInputVersion(Version* input_version);

Expand Down Expand Up @@ -569,6 +598,10 @@ class Compaction {
InternalKey penultimate_level_largest_;
PenultimateOutputRangeType penultimate_output_range_type_ =
PenultimateOutputRangeType::kNotSupported;
std::string resumable_compaction_db_session_id_;
uint64_t resumable_compaction_compaction_id_;
std::map<uint64_t, std::pair<std::string, std::string>>
resumable_compaction_subcompactions_;
};

#ifndef NDEBUG
Expand Down
Loading

0 comments on commit 1751c78

Please sign in to comment.