Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Nov 30, 2023
1 parent acc078f commit 72f1441
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 4 deletions.
33 changes: 32 additions & 1 deletion db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include <algorithm>
#include <cinttypes>
#include <cstdio>
#include <limits>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -1109,12 +1110,42 @@ void ColumnFamilyData::CreateNewMemtable(

bool ColumnFamilyData::NeedsCompaction() const {
return !mutable_cf_options_.disable_auto_compactions &&
compaction_picker_->NeedsCompaction(current_->storage_info());
(!temp_rc_infos_.empty() ||
compaction_picker_->NeedsCompaction(current_->storage_info()));
}

Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options,
const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) {
while (!temp_rc_infos_.empty()) {
std::vector<CompactionInputFiles> inputs;

auto rc_info = temp_rc_infos_.back();
temp_rc_infos_.pop_back();

bool error = false;
for (auto rc_input : rc_info.rc_inputs_) {
CompactionInputFiles input;
input.level = rc_input.first;
for (uint64_t rc_input_file_number : rc_input.second) {
FileMetaData* file = current_->storage_info()->GetFileMetaDataByNumber(
rc_input_file_number);
if (file == nullptr) {
error = true;
break;
}
input.files.push_back(file);
}
if (error) {
break;
}
}
if (!error) {
// Compaction* result = new Compaction();
Compaction* result = nullptr;
return result;
}
}
auto* result = compaction_picker_->PickCompaction(
GetName(), mutable_options, mutable_db_options, current_->storage_info(),
log_buffer);
Expand Down
29 changes: 29 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,21 @@ class ColumnFamilyData {
// of its files (if missing)
void RecoverEpochNumbers();

void AddTempRCInfo(
const std::string& rc_db_session_id, uint64_t rc_job_id,
const std::vector<std::pair<int, std::vector<uint64_t>>>& rc_inputs,
int rc_output_level, int rc_penultimate_level,
const InternalKey& rc_penultimate_level_smallest,
const InternalKey& rc_penultimate_level_largest,
const std::string& rc_subcompaction_start,
const std::string& rc_subcompaction_end) {
temp_rc_infos_.push_back({rc_db_session_id, rc_job_id, rc_inputs,
rc_output_level, rc_penultimate_level,
rc_penultimate_level_smallest,
rc_penultimate_level_largest,
rc_subcompaction_start, rc_subcompaction_end});
}

private:
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
Expand Down Expand Up @@ -650,6 +665,20 @@ class ColumnFamilyData {
bool mempurge_used_;

std::atomic<uint64_t> next_epoch_number_;

struct TempRCInfo {
std::string rc_db_session_id_;
uint64_t rc_job_id_;
std::vector<std::pair<int, std::vector<uint64_t>>> rc_inputs_;
int rc_output_level_;
int rc_penultimate_level_;
InternalKey rc_penultimate_level_smallest_;
InternalKey rc_penultimate_level_largest_;
std::string rc_subcompaction_start_;
std::string rc_subcompaction_end_;
};

std::vector<TempRCInfo> temp_rc_infos_;
};

// ColumnFamilySet has interesting thread-safety requirements
Expand Down
8 changes: 8 additions & 0 deletions db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,14 @@ class Compaction {
const int start_level,
const int output_level);

InternalKey GetPenultimateLevelSmallestKey() const {
return penultimate_level_smallest_;
}

InternalKey GetPenultimateLevelLargestKey() const {
return penultimate_level_largest_;
}

private:
void SetInputVersion(Version* input_version);

Expand Down
37 changes: 34 additions & 3 deletions db/compaction/compaction_service_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
"[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
compaction_input.column_family.name.c_str(), job_id_,
compaction_input.output_level, input_files_oss.str().c_str());
CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
GetCompactionId(sub_compact), thread_pri_);
uint64_t compaction_id = GetCompactionId(sub_compact);
CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_, compaction_id,
thread_pri_);
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->StartV2(info, compaction_input_binary);
switch (compaction_status) {
Expand All @@ -96,6 +97,37 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
assert(false); // unknown status
break;
}
// Persist
VersionEdit vc_info;

std::vector<std::pair<int, std::vector<uint64_t>>> rc_inputs;
for (const auto& files_per_level : inputs) {
std::vector<uint64_t> files;
for (const auto& file : files_per_level.files) {
files.push_back(file->fd.GetNumber());
}
rc_inputs.emplace_back(files_per_level.level, files);
}

vc_info.AddRCInfo(
db_session_id_, compaction_id, rc_inputs, compaction->output_level(),
compaction->GetPenultimateLevel(),
compaction->GetPenultimateLevelSmallestKey(),
compaction->GetPenultimateLevelLargestKey(),
sub_compact->start.has_value() ? sub_compact->start->ToString() : "",
sub_compact->end.has_value() ? sub_compact->end->ToString() : "");

{
db_mutex_->Lock();
s = versions_->LogAndApply(
compaction->column_family_data(),
*(compaction->column_family_data()->GetLatestMutableCFOptions()),
ReadOptions(Env::IOActivity::kCompaction), &vc_info, db_mutex_,
db_directory_);
db_mutex_->Unlock();
}

assert(s.ok());

ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Waiting for remote compaction...",
Expand Down Expand Up @@ -830,4 +862,3 @@ bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,
}
#endif // NDEBUG
} // namespace ROCKSDB_NAMESPACE

99 changes: 99 additions & 0 deletions db/version_edit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,53 @@ bool VersionEdit::EncodeTo(std::string* dst,
char p = static_cast<char>(persist_user_defined_timestamps_);
PutLengthPrefixedSlice(dst, Slice(&p, 1));
}

if (!rc_db_session_id_.empty()) {
PutVarint32(dst, kRCDBSessionId);
PutLengthPrefixedSlice(dst, rc_db_session_id_);
}

// if rc_job_id_??
PutVarint32Varint64(dst, kRCJobId, rc_job_id_);

if (!rc_inputs_.empty()) {
for (const std::pair<int, std::vector<uint64_t>>& pair : rc_inputs_) {
PutVarint32(dst, kRCInput);
PutVarint32(dst, pair.first /* level */);
PutVarint64(dst, pair.second.size() /* length */);
for (uint64_t file_number : pair.second) {
PutVarint64(dst, file_number);
}
}
}

// if rc_output_level_ ??
PutVarint32Varint32(dst, kRCOutputLevel, rc_output_level_);

// if rc_penultimate_level_ ??
PutVarint32Varint32(dst, kRCPenultimateLevel, rc_penultimate_level_);

if (rc_penultimate_level_smallest_.size() > 0) {
PutVarint32(dst, kRCPenultimateLevelSmallest);
PutLengthPrefixedSlice(dst, rc_penultimate_level_smallest_.Encode());
}

if (rc_penultimate_level_largest_.size() > 0) {
PutVarint32(dst, kRCPenultimateLevelLargest);
PutLengthPrefixedSlice(dst, rc_penultimate_level_largest_.Encode());
}

// if rc_subcompaction_start_
if (rc_subcompaction_start_.size() > 0) {
PutVarint32(dst, kRCSubcompactionStart);
PutLengthPrefixedSlice(dst, rc_subcompaction_start_);
}

// if rc_subcompaction_end_
if (rc_subcompaction_end_.size() > 0) {
PutVarint32(dst, kRCSubcompactionEnd);
PutLengthPrefixedSlice(dst, rc_subcompaction_end_);
}
return true;
}

Expand Down Expand Up @@ -798,6 +845,58 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;

case kRCDBSessionId:
GetLengthPrefixedSlice(&input, &str);
rc_db_session_id_.assign(str.data(), str.size());
break;

case kRCJobId:
GetVarint64(&input, &rc_job_id_);
break;

case kRCInput:
uint32_t rc_input_level;
GetVarint32(&input, &rc_input_level);
rc_inputs_.push_back({rc_input_level, {}});
uint64_t rc_input_length;
GetVarint64(&input, &rc_input_length);
for (uint64_t i = 0; i < rc_input_length; ++i) {
uint64_t rc_input_file_number;
GetVarint64(&input, &rc_input_file_number);
rc_inputs_.back().second.push_back(rc_input_file_number);
}
break;

case kRCOutputLevel:
uint32_t rc_output_level;
GetVarint32(&input, &rc_output_level);
rc_output_level_ = rc_output_level;
break;

case kRCPenultimateLevel:
uint32_t rc_penultimate_level;
GetVarint32(&input, &rc_penultimate_level);
rc_penultimate_level_ = rc_penultimate_level;
break;

case kRCPenultimateLevelSmallest:
GetInternalKey(&input, &rc_penultimate_level_smallest_);
break;

case kRCPenultimateLevelLargest:
GetInternalKey(&input, &rc_penultimate_level_largest_);
break;

case kRCSubcompactionStart:
GetLengthPrefixedSlice(&input, &str);
rc_subcompaction_start_.assign(str.data(), str.size());
break;

case kRCSubcompactionEnd:
GetLengthPrefixedSlice(&input, &str);
rc_subcompaction_end_.assign(str.data(), str.size());
break;

default:
if (tag & kTagSafeIgnoreMask) {
// Tag from future which can be safely ignored.
Expand Down
66 changes: 66 additions & 0 deletions db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ enum Tag : uint32_t {
kWalAddition2,
kWalDeletion2,
kPersistUserDefinedTimestamps,
kRCDBSessionId,
kRCJobId,
kRCInput,
kRCOutputLevel,
kRCPenultimateLevel,
kRCPenultimateLevelSmallest,
kRCPenultimateLevelLargest,
kRCSubcompactionStart,
kRCSubcompactionEnd,
};

enum NewFileCustomTag : uint32_t {
Expand Down Expand Up @@ -531,6 +540,53 @@ class VersionEdit {
}
}

void AddRCInfo(
const std::string& rc_db_session_id, uint64_t rc_job_id,
const std::vector<std::pair<int, std::vector<uint64_t>>>& rc_inputs,
int rc_output_level, int rc_penultimate_level,
const InternalKey& rc_penultimate_level_smallest,
const InternalKey& rc_penultimate_level_largest,
const std::string& rc_subcompaction_start,
const std::string& rc_subcompaction_end) {
rc_db_session_id_ = rc_db_session_id;
rc_job_id_ = rc_job_id;
rc_inputs_ = rc_inputs;
rc_output_level_ = rc_output_level;
rc_penultimate_level_ = rc_penultimate_level;
rc_penultimate_level_smallest_ = rc_penultimate_level_smallest;
rc_penultimate_level_largest_ = rc_penultimate_level_largest;
rc_subcompaction_start_ = rc_subcompaction_start;
rc_subcompaction_end_ = rc_subcompaction_end;
}

bool HasRCDBSessionID() const { return !rc_db_session_id_.empty(); }

std::string GetRCDbSessionID() const { return rc_db_session_id_; }

uint64_t GetRCJobID() const { return rc_job_id_; }

std::vector<std::pair<int, std::vector<uint64_t>>> GetRCInputs() const {
return rc_inputs_;
}

int GetRCOutputLevel() const { return rc_output_level_; }

int GetRCPenultimateLevel() const { return rc_penultimate_level_; }

InternalKey GetRCPenultimateLevelSmallest() const {
return rc_penultimate_level_smallest_;
}

InternalKey GetRCPenultimateLevelLargest() const {
return rc_penultimate_level_largest_;
}

std::string GetRCSubcompactionStart() const {
return rc_subcompaction_start_;
}

std::string GetRCSubcompactionEnd() const { return rc_subcompaction_end_; }

// Add a new blob file.
void AddBlobFile(uint64_t blob_file_number, uint64_t total_blob_count,
uint64_t total_blob_bytes, std::string checksum_method,
Expand Down Expand Up @@ -764,6 +820,16 @@ class VersionEdit {
// Since table files and blob files share the same file number space, we just
// record the file number here.
autovector<uint64_t> files_to_quarantine_;

std::string rc_db_session_id_;
uint64_t rc_job_id_;
std::vector<std::pair<int, std::vector<uint64_t>>> rc_inputs_;
int rc_output_level_;
int rc_penultimate_level_;
InternalKey rc_penultimate_level_smallest_;
InternalKey rc_penultimate_level_largest_;
std::string rc_subcompaction_start_;
std::string rc_subcompaction_end_;
};

} // namespace ROCKSDB_NAMESPACE
8 changes: 8 additions & 0 deletions db/version_edit_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,14 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
const std::string& new_ts = edit.GetFullHistoryTsLow();
cfd->SetFullHistoryTsLow(new_ts);
}
if (edit.HasRCDBSessionID()) {
cfd->AddTempRCInfo(
edit.GetRCDbSessionID(), edit.GetRCJobID(), edit.GetRCInputs(),
edit.GetRCOutputLevel(), edit.GetRCPenultimateLevel(),
edit.GetRCPenultimateLevelSmallest(),
edit.GetRCPenultimateLevelLargest(), edit.GetRCSubcompactionStart(),
edit.GetRCSubcompactionEnd());
}
}

if (s.ok()) {
Expand Down

0 comments on commit 72f1441

Please sign in to comment.