From 458122f3a30def3037831a6ac6932bf8923e67ee Mon Sep 17 00:00:00 2001 From: Guo Teng Date: Sun, 15 Sep 2024 02:44:08 +0800 Subject: [PATCH] add some information structures --- db/art/filter_cache.cc | 15 +++++++-- db/art/filter_cache_client.cc | 60 +++++++++++++++++++++++------------ db/art/filter_cache_client.h | 20 ++++++------ db/art/macros.h | 4 ++- db/db_impl/db_impl.cc | 21 +++++++++++- db/db_impl/db_impl.h | 34 ++++++++++++++++++++ db/db_impl/db_impl_write.cc | 9 ++---- 7 files changed, 122 insertions(+), 41 deletions(-) diff --git a/db/art/filter_cache.cc b/db/art/filter_cache.cc index 85dff016a..6d71cbfb6 100644 --- a/db/art/filter_cache.cc +++ b/db/art/filter_cache.cc @@ -242,7 +242,7 @@ void FilterCacheManager::inherit_count_recorder(std::vector& merged_se for (uint32_t& new_segment_id : new_segment_ids) { auto last_it = last_count_recorder_.find(new_segment_id); - uint32_t new_last_count = level_0_base_count; + uint32_t new_last_count = level_0_base_count; // level 0 segments init if (new_last_count_recorder.count(new_segment_id) > 0) { new_last_count = new_last_count_recorder[new_segment_id]; } @@ -253,7 +253,7 @@ void FilterCacheManager::inherit_count_recorder(std::vector& merged_se } auto current_it = current_count_recorder_.find(new_segment_id); - uint32_t new_current_count = level_0_base_count; + uint32_t new_current_count = level_0_base_count; // level 0 segments init if (new_current_count_recorder.count(new_segment_id) > 0) { new_current_count = new_current_count_recorder[new_segment_id]; } @@ -297,7 +297,7 @@ void FilterCacheManager::try_retrain_model(std::map& level_r // we should guarantee these 3 external recorder share the same keys set // we need to do this job outside FilterCacheManager assert(level_recorder.size() == segment_ranges_recorder.size()); - assert(level_recorder.size() == unit_size_recorder.size()); + // assert(level_recorder.size() == unit_size_recorder.size()); if (train_signal_ == false) { return; } @@ -305,6 +305,7 @@ void FilterCacheManager::try_retrain_model(std::map& level_r // solve programming problem std::map label_recorder; std::map algo_infos; + /* auto get_cnt_it = last_count_recorder_.begin(); auto unit_size_it = unit_size_recorder.begin(); while (unit_size_it != unit_size_recorder.end() && get_cnt_it != last_count_recorder_.end()) { @@ -318,6 +319,14 @@ void FilterCacheManager::try_retrain_model(std::map& level_r } } greedy_algo_.solve(algo_infos, label_recorder, filter_cache_.cache_size_except_level_0()); + */ + assert(unit_size_recorder.size() == 0); + auto get_cnt_it = last_count_recorder_.begin(); + while (get_cnt_it != last_count_recorder_.end()) { + algo_infos.insert(std::make_pair(get_cnt_it->first, SegmentAlgoInfo(get_cnt_it->second, DEFAULT_UNIT_SIZE))); + get_cnt_it ++; + } + greedy_algo_.solve(algo_infos, label_recorder, filter_cache_.cache_size_except_level_0()); // programming problem may include some merged segments, we need to ignore them auto old_level_it = level_recorder.begin(); diff --git a/db/art/filter_cache_client.cc b/db/art/filter_cache_client.cc index ac77d0b0b..f2e053412 100644 --- a/db/art/filter_cache_client.cc +++ b/db/art/filter_cache_client.cc @@ -14,39 +14,59 @@ bool FilterCacheClient::prepare_heat_buckets(const std::string& key, std::unorde heat_buckets_ready_ = filter_cache_manager_.heat_buckets_ready(); if (!heat_buckets_ready_) { // if heat_buckets_ready_ false - pool_.submit_detach(do_prepare_heat_buckets, key, segment_info_recorder); + assert(segment_info_recorder->size() == 0); // should always empty heat_buckets_ready_ = filter_cache_manager_.heat_buckets_ready(); + if (!heat_buckets_ready_) { + pool_.submit_detach(do_prepare_heat_buckets, key, segment_info_recorder); + heat_buckets_ready_ = filter_cache_manager_.heat_buckets_ready(); + } } return heat_buckets_ready_; } -void FilterCacheClient::do_retrain_or_keep_model(std::vector* const features_nums, +void FilterCacheClient::do_retrain_or_keep_model(std::vector* const features_nums_except_level_0, std::map* const level_recorder, std::map>* const segment_ranges_recorder, std::map* const unit_size_recorder) { + std::map level_copy; + std::map> segment_ranges_copy; // if this func background monitor signal, how can it receive latest argument? input pointer! + while (!filter_cache_manager_.heat_buckets_ready()); while (!filter_cache_manager_.ready_work()); // wait for manager ready assert(filter_cache_manager_.heat_buckets_ready()); // must guarantee that heat buckets ready before we make filter cache manager ready // actually we will load data before we test, so we can ensure that heat buckets ready first - filter_cache_manager_.make_clf_model_ready(*features_nums); - filter_cache_manager_.try_retrain_model(*level_recorder, *segment_ranges_recorder, *unit_size_recorder); - filter_cache_manager_.update_cache_and_heap(*level_recorder, *segment_ranges_recorder); + filter_cache_manager_.make_clf_model_ready(*features_nums_except_level_0); + while (level_recorder->size() != segment_ranges_recorder->size()); + level_copy = *level_recorder; segment_ranges_copy = *segment_ranges_recorder; + if (level_copy.size() != segment_ranges_copy.size()) { + while (level_recorder->size() != segment_ranges_recorder->size()); + level_copy = *level_recorder; segment_ranges_copy = *segment_ranges_recorder; + } + assert(unit_size_recorder->size() == 0); // should be empty, then we use default unit size DEFAULT_UNIT_SIZE + filter_cache_manager_.try_retrain_model(level_copy, segment_ranges_copy, *unit_size_recorder); + filter_cache_manager_.update_cache_and_heap(level_copy, segment_ranges_copy); while (true) { // in one long period while (!filter_cache_manager_.need_retrain()); // wait for long period end - filter_cache_manager_.try_retrain_model(*level_recorder, *segment_ranges_recorder, *unit_size_recorder); - // python lgb_model server will decide on whether keep model or retrain model - filter_cache_manager_.update_cache_and_heap(*level_recorder, *segment_ranges_recorder); + while (level_recorder->size() != segment_ranges_recorder->size()); + level_copy = *level_recorder; segment_ranges_copy = *segment_ranges_recorder; + if (level_copy.size() != segment_ranges_copy.size()) { + while (level_recorder->size() != segment_ranges_recorder->size()); + level_copy = *level_recorder; segment_ranges_copy = *segment_ranges_recorder; + } + assert(unit_size_recorder->size() == 0); // should be empty, then we use default unit size DEFAULT_UNIT_SIZE + filter_cache_manager_.try_retrain_model(level_copy, segment_ranges_copy, *unit_size_recorder); + filter_cache_manager_.update_cache_and_heap(level_copy, segment_ranges_copy); } // this loop never end } -void FilterCacheClient::retrain_or_keep_model(std::vector* const features_nums, +void FilterCacheClient::retrain_or_keep_model(std::vector* const features_nums_except_level_0, std::map* const level_recorder, std::map>* const segment_ranges_recorder, std::map* const unit_size_recorder) { - pool_.submit_detach(do_retrain_or_keep_model, features_nums, level_recorder, segment_ranges_recorder, unit_size_recorder); + pool_.submit_detach(do_retrain_or_keep_model, features_nums_except_level_0, level_recorder, segment_ranges_recorder, unit_size_recorder); // if first model training not end, python lgb_model server still return default units num // then retrain model when every long period end. if model still work well, keep this model instead // no need to return any value @@ -81,18 +101,18 @@ void FilterCacheClient::make_adjustment() { pool_.submit_detach(do_make_adjustment); } -void FilterCacheClient::do_batch_insert_segments(std::vector* const merged_segment_ids, std::vector* const new_segment_ids, - std::map>* const inherit_infos_recorder, - std::map* const level_recorder, const uint32_t& level_0_base_count, - std::map>* const segment_ranges_recorder) { - filter_cache_manager_.insert_segments(*merged_segment_ids, *new_segment_ids, *inherit_infos_recorder, - *level_recorder, level_0_base_count, *segment_ranges_recorder); +void FilterCacheClient::do_batch_insert_segments(std::vector merged_segment_ids, std::vector new_segment_ids, + std::map> inherit_infos_recorder, + std::map level_recorder, const uint32_t& level_0_base_count, + std::map> segment_ranges_recorder) { + filter_cache_manager_.insert_segments(merged_segment_ids, new_segment_ids, inherit_infos_recorder, + level_recorder, level_0_base_count, segment_ranges_recorder); } -void FilterCacheClient::batch_insert_segments(std::vector* const merged_segment_ids, std::vector* const new_segment_ids, - std::map>* const inherit_infos_recorder, - std::map* const level_recorder, const uint32_t& level_0_base_count, - std::map>* const segment_ranges_recorder) { +void FilterCacheClient::batch_insert_segments(std::vector merged_segment_ids, std::vector new_segment_ids, + std::map> inherit_infos_recorder, + std::map level_recorder, const uint32_t& level_0_base_count, + std::map> segment_ranges_recorder) { if (level_0_base_count == 0) { pool_.submit_detach(do_batch_insert_segments, merged_segment_ids, new_segment_ids, inherit_infos_recorder, level_recorder, INIT_LEVEL_0_COUNT, segment_ranges_recorder); } else { diff --git a/db/art/filter_cache_client.h b/db/art/filter_cache_client.h index 52b05663c..aa86016c3 100644 --- a/db/art/filter_cache_client.h +++ b/db/art/filter_cache_client.h @@ -23,7 +23,7 @@ class FilterCacheClient { static void do_prepare_heat_buckets(const std::string& key, std::unordered_map>* const segment_info_recorder); // background thread part of retrain_or_keep_model - static void do_retrain_or_keep_model(std::vector* const features_nums, + static void do_retrain_or_keep_model(std::vector* const features_nums_except_level_0, std::map* const level_recorder, std::map>* const segment_ranges_recorder, std::map* const unit_size_recorder); @@ -38,10 +38,10 @@ class FilterCacheClient { static void do_make_adjustment(); // background thread part of batch_insert_segments - static void do_batch_insert_segments(std::vector* const merged_segment_ids, std::vector* const new_segment_ids, - std::map>* const inherit_infos_recorder, - std::map* const level_recorder, const uint32_t& level_0_base_count, - std::map>* const segment_ranges_recorder); + static void do_batch_insert_segments(std::vector merged_segment_ids, std::vector new_segment_ids, + std::map> inherit_infos_recorder, + std::map level_recorder, const uint32_t& level_0_base_count, + std::map> segment_ranges_recorder); public: FilterCacheClient() { heat_buckets_ready_ = false; @@ -63,7 +63,7 @@ class FilterCacheClient { // please ensure that 3 recorders need to keep the same segments set, or error will occur in train func // you can use mutex in compaction and flushing to guarantee this // then when every long period end, try to retrain a new model or keep last model - void retrain_or_keep_model(std::vector* const features_nums, + void retrain_or_keep_model(std::vector* const features_nums_except_level_0, std::map* const level_recorder, std::map>* const segment_ranges_recorder, std::map* const unit_size_recorder); @@ -79,10 +79,10 @@ class FilterCacheClient { void make_adjustment(); // batch insert segments into filter cache manager - void batch_insert_segments(std::vector* const merged_segment_ids, std::vector* const new_segment_ids, - std::map>* const inherit_infos_recorder, - std::map* const level_recorder, const uint32_t& level_0_base_count, - std::map>* const segment_ranges_recorder); + void batch_insert_segments(std::vector merged_segment_ids, std::vector new_segment_ids, + std::map> inherit_infos_recorder, + std::map level_recorder, const uint32_t& level_0_base_count, + std::map> segment_ranges_recorder); }; } diff --git a/db/art/macros.h b/db/art/macros.h index 631e27d99..806064717 100644 --- a/db/art/macros.h +++ b/db/art/macros.h @@ -166,7 +166,7 @@ namespace ROCKSDB_NAMESPACE { // also we need to multiple key range rate by RATE_SIGNIFICANT_DIGITS_FACTOR #define HOTNESS_SIGNIFICANT_DIGITS_FACTOR 1e6 #define RATE_SIGNIFICANT_DIGITS_FACTOR 1e3 -// model feature num max limit : 3 * 30 + 1 +// model feature num max limit : 2 * 45 + 1 #define MAX_FEATURES_NUM 91 // config micro connecting to LightGBM server @@ -205,6 +205,8 @@ namespace ROCKSDB_NAMESPACE { #define READY_RATE 0.70 // default init L0 counts #define INIT_LEVEL_0_COUNT 0 +// default size of one filter unit (bits) +#define DEFAULT_UNIT_SIZE 0 // filter cache client background threads num #define FILTER_CACHE_THREADS_NUM 10 diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 94f42c6bc..c91e41c4b 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -250,8 +250,26 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, period_cnt_ = 0; last_train_period_ = 0; */ + segment_info_recorder_ = new std::unordered_map>; + level_recorder_ = new std::map; + level_0_base_count_ = 0; + + features_nums_except_level_0_ = new std::vector; + uint16_t features_num = MAX_FEATURES_NUM; + if (features_num > 0) { + features_nums_except_level_0_->emplace_back(features_num); + } + + segment_ranges_recorder_ = new std::map>; + + unit_size_recorder_ = new std::map; + + filter_cache_.retrain_or_keep_model(features_nums_except_level_0_, + level_recorder_, + segment_ranges_recorder_, + unit_size_recorder_); + filter_cache_.make_adjustment(); #endif - // !batch_per_trx_ implies seq_per_batch_ because it is only unset for // WriteUnprepared, which should use seq_per_batch_. assert(batch_per_txn_ || seq_per_batch_); @@ -1758,6 +1776,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, // WaLSM+: add hotness estimating #ifdef ART std::string art_key(key.data(), key.size()); + filter_cache_.get_updating_work(art_key); #ifdef ART_PLUS // ready to estimate hotness, update heat buckets /* diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 751ab7df2..c009915d9 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1909,6 +1909,40 @@ class DBImpl : public DB { // TODO: add necessary filter cache info structures FilterCacheClient filter_cache_; // already contain FilterCacheManager + // segment_info_recorder save every segments' min key and max key + // but we only need to pass empty segment_info_recorder now + // TODO: it should contain all levels segments' min key and max key, then pass to filter cache client + // this recorder will help decide the key ranges' num, but it dont work in current work + // you can try to modify macro DEFAULT_BUCKETS_NUM to decide the key ranges' num + std::unordered_map>* segment_info_recorder_; + + // record every alive segments' level + // TODO: need to be latest all the time + std::map* level_recorder_; + + // record features num of every segments + // we choose max features num to define model feature num + // if you want to use a default features num, set MAX_FEATURES_NUM to non-zero value + // then do not insert any entry into this vector later + // TODO: we dont use this vector, so we set MAX_FEATURES_NUM to non-zero value + std::vector* features_nums_except_level_0_; + + // should be based level 0 visit cnt in a total long period + // simply we set level_0_base_count to 0, and use macro INIT_LEVEL_0_COUNT + // we can set this macro to ( PERIOD_COUNT * TRAIN_PERIODS ) * ( level 0 sorted runs num ) / ( max level 0 segments num ) + // TODO: modify INIT_LEVEL_0_COUNT to proper value + uint32_t level_0_base_count_; + + // record interacting ranges and their rates of alive segments + // TODO: should be latest all the time + std::map>* segment_ranges_recorder_; + + // every segment's filter unit size is the same + // this recorder should hold all alive segment + // simply, you can also use default macro DEFAULT_UNIT_SIZE for all segments, just leave this recorder empty + // TODO: modify DEFAULT_UNIT_SIZE + std::map* unit_size_recorder_; + /* HeatBuckets heat_buckets_; diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index b7350ea63..7838c62b2 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -23,12 +23,9 @@ Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family, // WaLSM+: first sample put keys into pool, then generate key ranges for computing hotness #ifdef ART_PLUS // heat_buckets not ready, still sample into pool - /* - if (!heat_buckets_.is_ready()) { - std::string art_key(key.data(), key.size()); - heat_buckets_.sample(art_key, segments_info_); - } - */ + // if ready, prepare func auto return and do nothing + std::string art_key(key.data(), key.size()); + filter_cache_.prepare_heat_buckets(art_key, segment_info_recorder_); #endif return DB::Put(o, column_family, key, val); }