Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add some information structures #16

Merged
merged 1 commit into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions db/art/filter_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ void FilterCacheManager::inherit_count_recorder(std::vector<uint32_t>& merged_se

for (uint32_t& new_segment_id : new_segment_ids) {
auto last_it = last_count_recorder_.find(new_segment_id);
uint32_t new_last_count = level_0_base_count;
uint32_t new_last_count = level_0_base_count; // level 0 segments init
if (new_last_count_recorder.count(new_segment_id) > 0) {
new_last_count = new_last_count_recorder[new_segment_id];
}
Expand All @@ -253,7 +253,7 @@ void FilterCacheManager::inherit_count_recorder(std::vector<uint32_t>& merged_se
}

auto current_it = current_count_recorder_.find(new_segment_id);
uint32_t new_current_count = level_0_base_count;
uint32_t new_current_count = level_0_base_count; // level 0 segments init
if (new_current_count_recorder.count(new_segment_id) > 0) {
new_current_count = new_current_count_recorder[new_segment_id];
}
Expand Down Expand Up @@ -297,14 +297,15 @@ void FilterCacheManager::try_retrain_model(std::map<uint32_t, uint16_t>& level_r
// we should guarantee these 3 external recorders share the same keys set
// we need to do this job outside FilterCacheManager
assert(level_recorder.size() == segment_ranges_recorder.size());
assert(level_recorder.size() == unit_size_recorder.size());
// assert(level_recorder.size() == unit_size_recorder.size());
if (train_signal_ == false) {
return;
}

// solve programming problem
std::map<uint32_t, uint16_t> label_recorder;
std::map<uint32_t, SegmentAlgoInfo> algo_infos;
/*
auto get_cnt_it = last_count_recorder_.begin();
auto unit_size_it = unit_size_recorder.begin();
while (unit_size_it != unit_size_recorder.end() && get_cnt_it != last_count_recorder_.end()) {
Expand All @@ -318,6 +319,14 @@ void FilterCacheManager::try_retrain_model(std::map<uint32_t, uint16_t>& level_r
}
}
greedy_algo_.solve(algo_infos, label_recorder, filter_cache_.cache_size_except_level_0());
*/
assert(unit_size_recorder.size() == 0);
auto get_cnt_it = last_count_recorder_.begin();
while (get_cnt_it != last_count_recorder_.end()) {
algo_infos.insert(std::make_pair(get_cnt_it->first, SegmentAlgoInfo(get_cnt_it->second, DEFAULT_UNIT_SIZE)));
get_cnt_it ++;
}
greedy_algo_.solve(algo_infos, label_recorder, filter_cache_.cache_size_except_level_0());

// programming problem may include some merged segments, we need to ignore them
auto old_level_it = level_recorder.begin();
Expand Down
60 changes: 40 additions & 20 deletions db/art/filter_cache_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,59 @@ bool FilterCacheClient::prepare_heat_buckets(const std::string& key, std::unorde
heat_buckets_ready_ = filter_cache_manager_.heat_buckets_ready();
if (!heat_buckets_ready_) {
// if heat_buckets_ready_ false
pool_.submit_detach(do_prepare_heat_buckets, key, segment_info_recorder);
assert(segment_info_recorder->size() == 0); // should always empty
heat_buckets_ready_ = filter_cache_manager_.heat_buckets_ready();
if (!heat_buckets_ready_) {
pool_.submit_detach(do_prepare_heat_buckets, key, segment_info_recorder);
heat_buckets_ready_ = filter_cache_manager_.heat_buckets_ready();
}
}
return heat_buckets_ready_;
}

// Background worker (submitted detached from retrain_or_keep_model): waits for
// the filter cache manager to become ready, trains the classifier model once,
// then loops forever, retraining / refreshing the filter cache and heap at the
// end of every long period.
// The recorders are passed as pointers on purpose: the caller keeps mutating
// the pointed-to maps, and this thread re-reads them on every period.
// NOTE(diff): this page renders a unified diff without +/- markers; where two
// near-identical lines appear in a row (e.g. the two signatures below), the
// first is the removed old line and the second is the newly added line.
void FilterCacheClient::do_retrain_or_keep_model(std::vector<uint16_t>* const features_nums,
void FilterCacheClient::do_retrain_or_keep_model(std::vector<uint16_t>* const features_nums_except_level_0,
std::map<uint32_t, uint16_t>* const level_recorder,
std::map<uint32_t, std::vector<RangeRatePair>>* const segment_ranges_recorder,
std::map<uint32_t, uint32_t>* const unit_size_recorder) {
// local snapshots of the recorders, taken only when their sizes agree
std::map<uint32_t, uint16_t> level_copy;
std::map<uint32_t, std::vector<RangeRatePair>> segment_ranges_copy;
// if this func background monitor signal, how can it receive latest argument? input pointer!
// NOTE(review): the spin-waits below busy-burn a CPU core, and size() is read
// here while other threads may be mutating the recorders -- a data race unless
// the callers synchronize externally. TODO confirm; a condition variable (or at
// least a sleep inside the loop) would be safer.
while (!filter_cache_manager_.heat_buckets_ready());
while (!filter_cache_manager_.ready_work()); // wait for manager ready
assert(filter_cache_manager_.heat_buckets_ready()); // must guarantee that heat buckets ready before we make filter cache manager ready
// actually we will load data before we test, so we can ensure that heat buckets ready first
filter_cache_manager_.make_clf_model_ready(*features_nums);
filter_cache_manager_.try_retrain_model(*level_recorder, *segment_ranges_recorder, *unit_size_recorder);
filter_cache_manager_.update_cache_and_heap(*level_recorder, *segment_ranges_recorder);
filter_cache_manager_.make_clf_model_ready(*features_nums_except_level_0);
// wait until both recorders describe the same segment set, then snapshot them
// into the local copies; the if-block re-checks after copying because the
// recorders may change between the wait and the copy (best-effort consistency
// only -- see the race note above)
while (level_recorder->size() != segment_ranges_recorder->size());
level_copy = *level_recorder; segment_ranges_copy = *segment_ranges_recorder;
if (level_copy.size() != segment_ranges_copy.size()) {
while (level_recorder->size() != segment_ranges_recorder->size());
level_copy = *level_recorder; segment_ranges_copy = *segment_ranges_recorder;
}
assert(unit_size_recorder->size() == 0); // should be empty; then we use the default unit size DEFAULT_UNIT_SIZE
filter_cache_manager_.try_retrain_model(level_copy, segment_ranges_copy, *unit_size_recorder);
filter_cache_manager_.update_cache_and_heap(level_copy, segment_ranges_copy);

while (true) {
// in one long period
while (!filter_cache_manager_.need_retrain()); // wait for long period end
filter_cache_manager_.try_retrain_model(*level_recorder, *segment_ranges_recorder, *unit_size_recorder);
// python lgb_model server will decide on whether keep model or retrain model
filter_cache_manager_.update_cache_and_heap(*level_recorder, *segment_ranges_recorder);
// same snapshot-with-recheck pattern as above, once per long period
while (level_recorder->size() != segment_ranges_recorder->size());
level_copy = *level_recorder; segment_ranges_copy = *segment_ranges_recorder;
if (level_copy.size() != segment_ranges_copy.size()) {
while (level_recorder->size() != segment_ranges_recorder->size());
level_copy = *level_recorder; segment_ranges_copy = *segment_ranges_recorder;
}
assert(unit_size_recorder->size() == 0); // should be empty; then we use the default unit size DEFAULT_UNIT_SIZE
filter_cache_manager_.try_retrain_model(level_copy, segment_ranges_copy, *unit_size_recorder);
filter_cache_manager_.update_cache_and_heap(level_copy, segment_ranges_copy);
}
// this loop never ends: the worker runs for the process lifetime
}

void FilterCacheClient::retrain_or_keep_model(std::vector<uint16_t>* const features_nums,
void FilterCacheClient::retrain_or_keep_model(std::vector<uint16_t>* const features_nums_except_level_0,
std::map<uint32_t, uint16_t>* const level_recorder,
std::map<uint32_t, std::vector<RangeRatePair>>* const segment_ranges_recorder,
std::map<uint32_t, uint32_t>* const unit_size_recorder) {
pool_.submit_detach(do_retrain_or_keep_model, features_nums, level_recorder, segment_ranges_recorder, unit_size_recorder);
pool_.submit_detach(do_retrain_or_keep_model, features_nums_except_level_0, level_recorder, segment_ranges_recorder, unit_size_recorder);
// if first model training not end, python lgb_model server still return default units num
// then retrain model when every long period end. if model still work well, keep this model instead
// no need to return any value
Expand Down Expand Up @@ -81,18 +101,18 @@ void FilterCacheClient::make_adjustment() {
pool_.submit_detach(do_make_adjustment);
}

// Background-thread half of batch_insert_segments: forwards the batched
// segment bookkeeping (merged/new segment ids, inheritance infos, levels and
// key-range rates) into FilterCacheManager::insert_segments.
// NOTE(diff): the old pointer-parameter version (removed) is shown first,
// followed by the new by-value version. Taking the containers by value means
// the detached pool task owns its own copies, so it cannot dangle when the
// caller's locals die before the task actually runs.
void FilterCacheClient::do_batch_insert_segments(std::vector<uint32_t>* const merged_segment_ids, std::vector<uint32_t>* const new_segment_ids,
std::map<uint32_t, std::unordered_map<uint32_t, double>>* const inherit_infos_recorder,
std::map<uint32_t, uint16_t>* const level_recorder, const uint32_t& level_0_base_count,
std::map<uint32_t, std::vector<RangeRatePair>>* const segment_ranges_recorder) {
filter_cache_manager_.insert_segments(*merged_segment_ids, *new_segment_ids, *inherit_infos_recorder,
*level_recorder, level_0_base_count, *segment_ranges_recorder);
// NOTE(review): level_0_base_count is still taken by const reference in the
// new version; if the thread pool does not store a decayed copy of the
// argument, the reference could dangle once the submitting caller returns.
// Presumably safe because callers pass it on to submit_detach (which likely
// copies), but a plain by-value uint32_t would be unambiguous -- TODO confirm.
void FilterCacheClient::do_batch_insert_segments(std::vector<uint32_t> merged_segment_ids, std::vector<uint32_t> new_segment_ids,
std::map<uint32_t, std::unordered_map<uint32_t, double>> inherit_infos_recorder,
std::map<uint32_t, uint16_t> level_recorder, const uint32_t& level_0_base_count,
std::map<uint32_t, std::vector<RangeRatePair>> segment_ranges_recorder) {
filter_cache_manager_.insert_segments(merged_segment_ids, new_segment_ids, inherit_infos_recorder,
level_recorder, level_0_base_count, segment_ranges_recorder);
}

void FilterCacheClient::batch_insert_segments(std::vector<uint32_t>* const merged_segment_ids, std::vector<uint32_t>* const new_segment_ids,
std::map<uint32_t, std::unordered_map<uint32_t, double>>* const inherit_infos_recorder,
std::map<uint32_t, uint16_t>* const level_recorder, const uint32_t& level_0_base_count,
std::map<uint32_t, std::vector<RangeRatePair>>* const segment_ranges_recorder) {
void FilterCacheClient::batch_insert_segments(std::vector<uint32_t> merged_segment_ids, std::vector<uint32_t> new_segment_ids,
std::map<uint32_t, std::unordered_map<uint32_t, double>> inherit_infos_recorder,
std::map<uint32_t, uint16_t> level_recorder, const uint32_t& level_0_base_count,
std::map<uint32_t, std::vector<RangeRatePair>> segment_ranges_recorder) {
if (level_0_base_count == 0) {
pool_.submit_detach(do_batch_insert_segments, merged_segment_ids, new_segment_ids, inherit_infos_recorder, level_recorder, INIT_LEVEL_0_COUNT, segment_ranges_recorder);
} else {
Expand Down
20 changes: 10 additions & 10 deletions db/art/filter_cache_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class FilterCacheClient {
static void do_prepare_heat_buckets(const std::string& key, std::unordered_map<uint32_t, std::vector<std::string>>* const segment_info_recorder);

// background thread part of retrain_or_keep_model
static void do_retrain_or_keep_model(std::vector<uint16_t>* const features_nums,
static void do_retrain_or_keep_model(std::vector<uint16_t>* const features_nums_except_level_0,
std::map<uint32_t, uint16_t>* const level_recorder,
std::map<uint32_t, std::vector<RangeRatePair>>* const segment_ranges_recorder,
std::map<uint32_t, uint32_t>* const unit_size_recorder);
Expand All @@ -38,10 +38,10 @@ class FilterCacheClient {
static void do_make_adjustment();

// background thread part of batch_insert_segments
static void do_batch_insert_segments(std::vector<uint32_t>* const merged_segment_ids, std::vector<uint32_t>* const new_segment_ids,
std::map<uint32_t, std::unordered_map<uint32_t, double>>* const inherit_infos_recorder,
std::map<uint32_t, uint16_t>* const level_recorder, const uint32_t& level_0_base_count,
std::map<uint32_t, std::vector<RangeRatePair>>* const segment_ranges_recorder);
static void do_batch_insert_segments(std::vector<uint32_t> merged_segment_ids, std::vector<uint32_t> new_segment_ids,
std::map<uint32_t, std::unordered_map<uint32_t, double>> inherit_infos_recorder,
std::map<uint32_t, uint16_t> level_recorder, const uint32_t& level_0_base_count,
std::map<uint32_t, std::vector<RangeRatePair>> segment_ranges_recorder);
public:
FilterCacheClient() {
heat_buckets_ready_ = false;
Expand All @@ -63,7 +63,7 @@ class FilterCacheClient {
// please ensure that 3 recorders need to keep the same segments set, or error will occur in train func
// you can use mutex in compaction and flushing to guarantee this
// then when every long period end, try to retrain a new model or keep last model
void retrain_or_keep_model(std::vector<uint16_t>* const features_nums,
void retrain_or_keep_model(std::vector<uint16_t>* const features_nums_except_level_0,
std::map<uint32_t, uint16_t>* const level_recorder,
std::map<uint32_t, std::vector<RangeRatePair>>* const segment_ranges_recorder,
std::map<uint32_t, uint32_t>* const unit_size_recorder);
Expand All @@ -79,10 +79,10 @@ class FilterCacheClient {
void make_adjustment();

// batch insert segments into filter cache manager
void batch_insert_segments(std::vector<uint32_t>* const merged_segment_ids, std::vector<uint32_t>* const new_segment_ids,
std::map<uint32_t, std::unordered_map<uint32_t, double>>* const inherit_infos_recorder,
std::map<uint32_t, uint16_t>* const level_recorder, const uint32_t& level_0_base_count,
std::map<uint32_t, std::vector<RangeRatePair>>* const segment_ranges_recorder);
void batch_insert_segments(std::vector<uint32_t> merged_segment_ids, std::vector<uint32_t> new_segment_ids,
std::map<uint32_t, std::unordered_map<uint32_t, double>> inherit_infos_recorder,
std::map<uint32_t, uint16_t> level_recorder, const uint32_t& level_0_base_count,
std::map<uint32_t, std::vector<RangeRatePair>> segment_ranges_recorder);
};

}
4 changes: 3 additions & 1 deletion db/art/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ namespace ROCKSDB_NAMESPACE {
// also we need to multiple key range rate by RATE_SIGNIFICANT_DIGITS_FACTOR
#define HOTNESS_SIGNIFICANT_DIGITS_FACTOR 1e6
#define RATE_SIGNIFICANT_DIGITS_FACTOR 1e3
// model feature num max limit : 3 * 30 + 1
// model feature num max limit : 2 * 45 + 1
#define MAX_FEATURES_NUM 91

// config micro connecting to LightGBM server
Expand Down Expand Up @@ -205,6 +205,8 @@ namespace ROCKSDB_NAMESPACE {
#define READY_RATE 0.70
// default init L0 counts
#define INIT_LEVEL_0_COUNT 0
// default size of one filter unit (bits)
#define DEFAULT_UNIT_SIZE 0

// filter cache client background threads num
#define FILTER_CACHE_THREADS_NUM 10
Expand Down
21 changes: 20 additions & 1 deletion db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,26 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
period_cnt_ = 0;
last_train_period_ = 0;
*/
segment_info_recorder_ = new std::unordered_map<uint32_t, std::vector<std::string>>;
level_recorder_ = new std::map<uint32_t, uint16_t>;
level_0_base_count_ = 0;

features_nums_except_level_0_ = new std::vector<uint16_t>;
uint16_t features_num = MAX_FEATURES_NUM;
if (features_num > 0) {
features_nums_except_level_0_->emplace_back(features_num);
}

segment_ranges_recorder_ = new std::map<uint32_t, std::vector<RangeRatePair>>;

unit_size_recorder_ = new std::map<uint32_t, uint32_t>;

filter_cache_.retrain_or_keep_model(features_nums_except_level_0_,
level_recorder_,
segment_ranges_recorder_,
unit_size_recorder_);
filter_cache_.make_adjustment();
#endif

// !batch_per_trx_ implies seq_per_batch_ because it is only unset for
// WriteUnprepared, which should use seq_per_batch_.
assert(batch_per_txn_ || seq_per_batch_);
Expand Down Expand Up @@ -1758,6 +1776,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
// WaLSM+: add hotness estimating
#ifdef ART
std::string art_key(key.data(), key.size());
filter_cache_.get_updating_work(art_key);
#ifdef ART_PLUS
// ready to estimate hotness, update heat buckets
/*
Expand Down
34 changes: 34 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1909,6 +1909,40 @@ class DBImpl : public DB {
// TODO: add necessary filter cache info structures
FilterCacheClient filter_cache_; // already contain FilterCacheManager

// segment_info_recorder saves every segment's min key and max key
// but we only need to pass an empty segment_info_recorder now
// TODO: it should contain all levels' segments' min key and max key, then be passed to the filter cache client
// this recorder helps decide the key ranges' num, but it doesn't take effect in the current implementation
// you can try to modify the macro DEFAULT_BUCKETS_NUM to decide the key ranges' num
std::unordered_map<uint32_t, std::vector<std::string>>* segment_info_recorder_;

// record every alive segments' level
// TODO: need to be latest all the time
std::map<uint32_t, uint16_t>* level_recorder_;

// record the features num of every segment
// we choose the max features num to define the model feature num
// if you want to use a default features num, set MAX_FEATURES_NUM to a non-zero value
// and do not insert any entry into this vector later
// TODO: we don't use this vector, so we set MAX_FEATURES_NUM to a non-zero value
std::vector<uint16_t>* features_nums_except_level_0_;

// should be the base level 0 visit cnt over a whole long period
// simply we set level_0_base_count to 0, and use macro INIT_LEVEL_0_COUNT
// we can set this macro to ( PERIOD_COUNT * TRAIN_PERIODS ) * ( level 0 sorted runs num ) / ( max level 0 segments num )
// TODO: modify INIT_LEVEL_0_COUNT to proper value
uint32_t level_0_base_count_;

// record interacting ranges and their rates of alive segments
// TODO: should be latest all the time
std::map<uint32_t, std::vector<RangeRatePair>>* segment_ranges_recorder_;

// every segment's filter unit size is the same
// this recorder should hold all alive segments
// simply, you can also use the default macro DEFAULT_UNIT_SIZE for all segments, and just leave this recorder empty
// TODO: modify DEFAULT_UNIT_SIZE
std::map<uint32_t, uint32_t>* unit_size_recorder_;

/*
HeatBuckets heat_buckets_;

Expand Down
9 changes: 3 additions & 6 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@ Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
// WaLSM+: first sample put keys into pool, then generate key ranges for computing hotness
#ifdef ART_PLUS
// heat_buckets not ready, still sample into pool
/*
if (!heat_buckets_.is_ready()) {
std::string art_key(key.data(), key.size());
heat_buckets_.sample(art_key, segments_info_);
}
*/
// if heat buckets are already ready, the prepare func returns immediately and does nothing
std::string art_key(key.data(), key.size());
filter_cache_.prepare_heat_buckets(art_key, segment_info_recorder_);
#endif
return DB::Put(o, column_family, key, val);
}
Expand Down