diff --git a/db/art/filter_cache.h b/db/art/filter_cache.h index 13683c07c..dd6fa012e 100644 --- a/db/art/filter_cache.h +++ b/db/art/filter_cache.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -8,6 +9,7 @@ #include #include #include +#include "db/version_edit.h" #include "macros.h" #include "greedy_algo.h" #include "clf_model.h" @@ -102,10 +104,11 @@ class FilterCacheManager { static std::map current_count_recorder_; // get cnt recorder of segments in current long period static std::mutex count_mutex_; // guarentee last_count_recorder and current_count_recorder treated orderedly static bool is_ready_; // check whether ready to use adaptive filter assignment + static std::map segment_in_file; // map segment_id to SST file public: FilterCacheManager() { get_cnt_ = 0; last_long_period_ = 0; last_short_period_ = 0; train_signal_ = false; } - ~FilterCacheManager(); + ~FilterCacheManager() {} // one background thread monitor this func, if return true, call try_retrain_model at once, wait for training end, and call update_cache_and_heap bool need_retrain() { return train_signal_; } @@ -243,6 +246,10 @@ class FilterCacheManager { // return false when we cannot make one adjustment // one background should exec this func and never stop bool adjust_cache_and_heap(); + + // only for test + ColumnFamilyData* cfd = nullptr; + // std::mutex cfd_mutex; }; } \ No newline at end of file diff --git a/db/art/filter_cache_client.cc b/db/art/filter_cache_client.cc index 2d0be7ca2..3fe7f368c 100644 --- a/db/art/filter_cache_client.cc +++ b/db/art/filter_cache_client.cc @@ -1,4 +1,7 @@ #include "filter_cache_client.h" +#include +#include +#include namespace ROCKSDB_NAMESPACE { @@ -120,4 +123,18 @@ void FilterCacheClient::batch_insert_segments(std::vector merged_segme } } +void FilterCacheClient::test_cfd(ColumnFamilyData* cfd) { + // std::lock_guard guard(filter_cache_manager_.cfd_mutex); + if (filter_cache_manager_.cfd == nullptr) { + std::cout << "first time cfd: " << cfd << std::endl; + filter_cache_manager_.cfd = cfd; + } else if (filter_cache_manager_.cfd == cfd) { + // do nothing + } else { + std::cout << "hold cfd: " << filter_cache_manager_.cfd << std::endl; + std::cout << "new cfd: " << cfd << std::endl; + std::cout << "error: cfd replaced" << std::endl; + assert(0); + } +} } \ No newline at end of file diff --git a/db/art/filter_cache_client.h b/db/art/filter_cache_client.h index aa86016c3..a0b488b3b 100644 --- a/db/art/filter_cache_client.h +++ b/db/art/filter_cache_client.h @@ -83,6 +83,9 @@ class FilterCacheClient { std::map> inherit_infos_recorder, std::map level_recorder, const uint32_t& level_0_base_count, std::map> segment_ranges_recorder); + + // for test only + void test_cfd(ColumnFamilyData* cfd); }; } diff --git a/db/art/filter_cache_item.cc b/db/art/filter_cache_item.cc index 6f5cb1163..9d40a8008 100644 --- a/db/art/filter_cache_item.cc +++ b/db/art/filter_cache_item.cc @@ -1,5 +1,34 @@ #include "filter_cache_item.h" +#include +#include "db/table_cache.h" +#include "table/table_reader.h" namespace ROCKSDB_NAMESPACE { + // 构造函数,可以初始化成员变量 + FilterCacheItem::FilterCacheItem(const uint32_t& segment_id) { + + } + + // 清理成员变量,避免内存泄漏,如果new了空间,就可能需要在这里清理 + FilterCacheItem::~FilterCacheItem() { + + } + + uint32_t FilterCacheItem::approximate_size() { + // uint32_t sum = 0; + // for (const auto& filter_block : filter_block_data_) { + // sum += filter_block.ApproximateMemoryUsage(); + // } + // return sum; + return 0; + } + + bool FilterCacheItem::check_key(const std::string& key) { + // TableReader* file_meta_ = TableCache::InitFileTableReader() + } + + void FilterCacheItem::enable_units(const uint32_t& units_num) { + + } } \ No newline at end of file diff --git a/db/art/filter_cache_item.h b/db/art/filter_cache_item.h index 8a591b88c..b87bef116 100644 --- a/db/art/filter_cache_item.h +++ b/db/art/filter_cache_item.h @@ -6,7 +6,9 @@ #include #include #include +#include "db/version_edit.h" #include "macros.h" +#include "table/block_based/parsed_full_filter_block.h" namespace ROCKSDB_NAMESPACE { @@ -23,6 +25,8 @@ namespace ROCKSDB_NAMESPACE { // 成员函数需要在filter_cache_item.cc里定义 class FilterCacheItem { private: + FileMetaData* file_meta_; + // std::vector filter_block_data_; // 这里定义一些必要的成员变量,尽量设置为private // 可以存handle、segment id等信息 // 允许使用STL类,如vector、map等 diff --git a/db/column_family.cc b/db/column_family.cc index d9344f4bb..241b7fc87 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -26,6 +26,7 @@ #include "db/job_context.h" #include "db/range_del_aggregator.h" #include "db/table_properties_collector.h" +#include "db/version_edit.h" #include "db/version_set.h" #include "db/write_controller.h" #include "file/sst_file_manager_impl.h" diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index c91e41c4b..77381e2fa 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1691,6 +1691,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, get_impl_options.column_family); auto cfd = cfh->cfd(); + filter_cache_.test_cfd(cfd); + if (tracer_) { // TODO: This mutex should be removed later, to improve performance when // tracing is enabled. diff --git a/db/table_cache.cc b/db/table_cache.cc index 663ce8a94..6c1865a90 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/table_cache.h" +#include #include "db/dbformat.h" #include "db/range_tombstone_fragmenter.h" @@ -514,6 +515,11 @@ Status TableCache::Get(FilterCacheClient& filter_cache, t = GetTableReaderFromHandle(handle); } } + + BlockBasedTable* block_based_table = nullptr; + block_based_table = static_cast(t); + assert(block_based_table != nullptr); + SequenceNumber* max_covering_tombstone_seq = get_context->max_covering_tombstone_seq(); if (s.ok() && max_covering_tombstone_seq != nullptr && @@ -529,7 +535,7 @@ Status TableCache::Get(FilterCacheClient& filter_cache, if (s.ok()) { get_context->SetReplayLog(row_cache_entry); // nullptr if no cache. // only add filter_cache argument - s = t->Get(filter_cache, options, k, get_context, prefix_extractor, skip_filters); + s = block_based_table->Get(filter_cache, options, k, get_context, prefix_extractor, skip_filters); get_context->SetReplayLog(nullptr); } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) { // Couldn't find Table in cache but treat as kFound if no_io set diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index ce7c4ed8a..ffcf08b5a 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -142,7 +142,7 @@ class BlockBasedTable : public TableReader { Status Get(FilterCacheClient& filter_cache, const ReadOptions& readOptions, const Slice& key, GetContext* get_context, const SliceTransform* prefix_extractor, - bool skip_filters = false) override; + bool skip_filters = false); #endif // WaLSM+ Note: call FullFilterKeyMayMatch() method in this file diff --git a/table/block_based/filter_block.h b/table/block_based/filter_block.h index d94c7e606..eb8d4ed88 100644 --- a/table/block_based/filter_block.h +++ b/table/block_based/filter_block.h @@ -20,9 +20,11 @@ #include #include +#include #include #include #include +#include "db/art/filter_cache_client.h" #include "db/dbformat.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" @@ -103,6 +105,19 @@ class FilterBlockReader { GetContext* get_context, BlockCacheLookupContext* lookup_context) = 0; +#ifdef ART_PLUS + virtual bool KeyMayMatch(FilterCacheClient& filter_cache, + const Slice& key, + const SliceTransform* prefix_extractor, + uint64_t block_offset, const bool no_io, + const Slice* const const_ikey_ptr, + GetContext* get_context, + BlockCacheLookupContext* lookup_context) { + assert(false); + return false; + } +#endif + virtual void KeysMayMatch(MultiGetRange* range, const SliceTransform* prefix_extractor, uint64_t block_offset, const bool no_io, diff --git a/table/block_based/partitioned_filter_block.cc b/table/block_based/partitioned_filter_block.cc index e077603f4..461478f62 100644 --- a/table/block_based/partitioned_filter_block.cc +++ b/table/block_based/partitioned_filter_block.cc @@ -6,7 +6,9 @@ #include "table/block_based/partitioned_filter_block.h" #include +#include #include +#include #include #include @@ -21,6 +23,7 @@ #include "rocksdb/status.h" #include "table/block_based/block.h" #include "table/block_based/block_based_table_reader.h" +#include "table/format.h" #include "util/coding.h" namespace ROCKSDB_NAMESPACE { @@ -114,6 +117,7 @@ void PartitionedFilterBlockBuilder::MaybeCutAFilterBlock( std::string& index_key = p_index_builder_->GetPartitionKey(); filters.push_back({index_key, filter}); #endif + std::cerr << "keys_added_to_partition = " << keys_per_partition_ << "\n"; keys_added_to_partition_ = 0; Reset(); } @@ -350,6 +354,33 @@ BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle( return fltr_blk_handle; } +#ifdef ART_PLUS +std::pair PartitionedFilterBlockReader::GetFilterPartitionKeyAndHandle( + const CachableEntry& filter_block, const Slice& entry) const { + IndexBlockIter iter; + const Comparator* const segment_id_removing_comparator = table()->get_rep()->segment_id_removing_comparator.get(); + Statistics* kNullStats = nullptr; + filter_block.GetValue()->NewIndexIterator( + segment_id_removing_comparator, + table()->get_rep()->get_global_seqno(BlockType::kFilter), &iter, + kNullStats, true /* total_order_seek */, false /* have_first_key */, + index_key_includes_seq(), index_value_is_full()); + iter.Seek(entry); + if (UNLIKELY(!iter.Valid())) { + // entry is larger than all the keys. However its prefix might still be + // present in the last partition. If this is called by PrefixMayMatch this + // is necessary for correct behavior. Otherwise it is unnecessary but safe. + // Assuming this is an unlikely case for full key search, the performance + // overhead should be negligible. + iter.SeekToLast(); + } + assert(iter.Valid()); + Slice fltr_block_key = iter.key(); + BlockHandle fltr_blk_handle = iter.value().handle; + return {fltr_block_key, fltr_blk_handle}; +} +#endif + // TODO: retrieve filter block from filter cache (WaLSM+) Status PartitionedFilterBlockReader::GetFilterPartitionBlock( FilePrefetchBuffer* prefetch_buffer, const BlockHandle& fltr_blk_handle, @@ -464,21 +495,23 @@ bool PartitionedFilterBlockReader::MayMatch( return true; } - #ifdef ART_PLUS // find key "0 original_internal key". filter_index=segment_id=0. (WaLSM+) // segment_id itself is useless in comparison, // but must be appended otherwise the extracted user key will be incorrect. std::unique_ptr modified_key_buf; Slice modified_key = generate_modified_internal_key(modified_key_buf, *const_ikey_ptr, 0, 0); - auto filter_handle = GetFilterPartitionHandle(filter_block, modified_key); - #else - auto filter_handle = GetFilterPartitionHandle(filter_block, *const_ikey_ptr); - #endif + // auto filter_handle = GetFilterPartitionHandle(filter_block, modified_key); + auto key_and_handle = GetFilterPartitionKeyAndHandle(filter_block, modified_key); + Slice filter_key = key_and_handle.first; + auto filter_handle = key_and_handle.second; if (UNLIKELY(filter_handle.size() == 0)) { // key is out of range return false; } + assert(filter_key.size() >= 8); // + // uint32_t segment_id = DecodeFixed32R(filter_key.data() + filter_key); + // TODO: get some filter blocks from the filter cache and check (WaLSM+) CachableEntry filter_partition_block; s = GetFilterPartitionBlock(nullptr /* prefetch_buffer */, filter_handle, diff --git a/table/block_based/partitioned_filter_block.h b/table/block_based/partitioned_filter_block.h index 0d970a7a6..138164072 100644 --- a/table/block_based/partitioned_filter_block.h +++ b/table/block_based/partitioned_filter_block.h @@ -106,7 +106,7 @@ class PartitionedFilterBlockReader : public FilterBlockReaderCommon { const Slice& key, const SliceTransform* prefix_extractor, uint64_t block_offset, const bool no_io, const Slice* const const_ikey_ptr, GetContext* get_context, - BlockCacheLookupContext* lookup_context); + BlockCacheLookupContext* lookup_context) override; #endif // TODO: not used in WaLSM+ Benchmark, meybe used in MultiGet interface ? void KeysMayMatch(MultiGetRange* range, @@ -129,6 +129,11 @@ class PartitionedFilterBlockReader : public FilterBlockReaderCommon { size_t ApproximateMemoryUsage() const override; private: + #ifdef ART_PLUS + std::pair GetFilterPartitionKeyAndHandle( + const CachableEntry& filter_block, const Slice& entry) const; + #endif + BlockHandle GetFilterPartitionHandle(const CachableEntry& filter_block, const Slice& entry) const; Status GetFilterPartitionBlock( diff --git a/util/comparator.cc b/util/comparator.cc index f82a6dd14..b7dfa40f0 100644 --- a/util/comparator.cc +++ b/util/comparator.cc @@ -232,11 +232,13 @@ class SegmentIdRemovingComparatorImpl : public Comparator { void FindShortestSeparator(std::string* start, const Slice& limit) const override { - real_comparator->FindShortestSeparator(start, limit); + // real_comparator->FindShortestSeparator(start, limit); + // do nothing to disable the key shortening feature } void FindShortSuccessor(std::string* key) const override { - real_comparator->FindShortSuccessor(key); + // real_comparator->FindShortSuccessor(key); + // do nothing to disable the key shortening feature } bool IsSameLengthImmediateSuccessor(const Slice& s,