Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wqy dev #19

Open
wants to merge 5 commits into
base: gt-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion db/art/filter_cache.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <cstdint>
#include <iostream>
#include <fstream>
#include <mutex>
Expand All @@ -8,6 +9,7 @@
#include <map>
#include <set>
#include <unordered_map>
#include "db/version_edit.h"
#include "macros.h"
#include "greedy_algo.h"
#include "clf_model.h"
Expand Down Expand Up @@ -102,10 +104,11 @@ class FilterCacheManager {
static std::map<uint32_t, uint32_t> current_count_recorder_; // get cnt recorder of segments in current long period
static std::mutex count_mutex_; // guarentee last_count_recorder and current_count_recorder treated orderedly
static bool is_ready_; // check whether ready to use adaptive filter assignment
static std::map<uint32_t, FileMetaData*> segment_in_file; // map segment_id to SST file
public:
FilterCacheManager() { get_cnt_ = 0; last_long_period_ = 0; last_short_period_ = 0; train_signal_ = false; }

~FilterCacheManager();
~FilterCacheManager() {}

// one background thread monitor this func, if return true, call try_retrain_model at once, wait for training end, and call update_cache_and_heap
bool need_retrain() { return train_signal_; }
Expand Down Expand Up @@ -243,6 +246,10 @@ class FilterCacheManager {
// return false when we cannot make one adjustment
// one background should exec this func and never stop
bool adjust_cache_and_heap();

// only for test
ColumnFamilyData* cfd = nullptr;
// std::mutex cfd_mutex;
};

}
17 changes: 17 additions & 0 deletions db/art/filter_cache_client.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#include "filter_cache_client.h"
#include <iostream>
#include <mutex>
#include <ostream>

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -120,4 +123,18 @@ void FilterCacheClient::batch_insert_segments(std::vector<uint32_t> merged_segme
}
}

// Debug helper (test only): remember the ColumnFamilyData pointer seen by the
// first Get() and verify that every subsequent Get() observes the same one.
// Aborts via assert(0) if a different cfd ever appears.
//
// NOTE(review): filter_cache_manager_.cfd is read and written here without
// synchronization (cfd_mutex is commented out in FilterCacheManager), so two
// concurrent first calls can race on the initial assignment. Acceptable for a
// temporary test hook, but re-enable the mutex before trusting this check.
void FilterCacheClient::test_cfd(ColumnFamilyData* cfd) {
  // std::lock_guard<std::mutex> guard(filter_cache_manager_.cfd_mutex);
  if (filter_cache_manager_.cfd == nullptr) {
    std::cout << "first time cfd: " << cfd << std::endl;
    filter_cache_manager_.cfd = cfd;
  } else if (filter_cache_manager_.cfd == cfd) {
    // Expected steady state: same column family as before — nothing to do.
  } else {
    // Error diagnostics belong on stderr (unbuffered, survives stdout
    // redirection); the original sent them to stdout.
    std::cerr << "hold cfd: " << filter_cache_manager_.cfd << std::endl;
    std::cerr << "new cfd: " << cfd << std::endl;
    std::cerr << "error: cfd replaced" << std::endl;
    assert(0);
  }
}
}
3 changes: 3 additions & 0 deletions db/art/filter_cache_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ class FilterCacheClient {
std::map<uint32_t, std::unordered_map<uint32_t, double>> inherit_infos_recorder,
std::map<uint32_t, uint16_t> level_recorder, const uint32_t& level_0_base_count,
std::map<uint32_t, std::vector<RangeRatePair>> segment_ranges_recorder);

// for test only
void test_cfd(ColumnFamilyData* cfd);
};

}
29 changes: 29 additions & 0 deletions db/art/filter_cache_item.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,34 @@
#include "filter_cache_item.h"
#include <cstdint>
#include "db/table_cache.h"
#include "table/table_reader.h"

namespace ROCKSDB_NAMESPACE {

// Constructor. Intentionally empty for now: member initialization is still
// TODO — segment_id is accepted but not yet stored anywhere.
FilterCacheItem::FilterCacheItem(const uint32_t& segment_id) {

}

// Destructor. Empty because this class does not yet acquire any resources;
// if members are ever allocated with new, release them here to avoid leaks.
FilterCacheItem::~FilterCacheItem() {

}

// Returns the approximate memory footprint (in bytes) of this cache item.
// Placeholder: always returns 0 until the filter blocks are actually held
// here; the intended implementation (sum of ApproximateMemoryUsage() over
// filter_block_data_) is sketched in the commented-out code below.
uint32_t FilterCacheItem::approximate_size() {
    // uint32_t sum = 0;
    // for (const auto& filter_block : filter_block_data_) {
    //     sum += filter_block.ApproximateMemoryUsage();
    // }
    // return sum;
    return 0;
}

// Checks whether `key` may be present in the segment this item caches.
//
// Bug fix: the original body had no return statement in a non-void function,
// which is undefined behavior — callers would read a garbage bool. Until the
// real filter lookup is implemented, conservatively answer "may match":
// returning true can only cost an extra data-block read, whereas a spurious
// false could silently hide existing keys from reads. TODO confirm this
// matches the caller's contract once the filter lookup lands.
bool FilterCacheItem::check_key(const std::string& key) {
  (void)key;  // unused until the lookup below is implemented
  // TODO: probe the cached filter units, e.g. via
  // TableCache::InitFileTableReader() / TableReader as originally sketched:
  // TableReader* file_meta_ = TableCache::InitFileTableReader()
  return true;
}

// Enables `units_num` filter units for this cache item.
// Placeholder: currently a no-op until filter-unit bookkeeping is implemented.
void FilterCacheItem::enable_units(const uint32_t& units_num) {

}
}
4 changes: 4 additions & 0 deletions db/art/filter_cache_item.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
#include <cassert>
#include <vector>
#include <map>
#include "db/version_edit.h"
#include "macros.h"
#include "table/block_based/parsed_full_filter_block.h"

namespace ROCKSDB_NAMESPACE {

Expand All @@ -23,6 +25,8 @@ namespace ROCKSDB_NAMESPACE {
// Member functions must be defined in filter_cache_item.cc
class FilterCacheItem {
private:
FileMetaData* file_meta_;
// std::vector<ParsedFullFilterBlock> filter_block_data_;
// Declare the necessary member variables here; prefer keeping them private.
// They may hold information such as block handles, segment ids, etc.
// STL containers (vector, map, ...) are allowed.
Expand Down
1 change: 1 addition & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "db/job_context.h"
#include "db/range_del_aggregator.h"
#include "db/table_properties_collector.h"
#include "db/version_edit.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "file/sst_file_manager_impl.h"
Expand Down
2 changes: 2 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1691,6 +1691,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
get_impl_options.column_family);
auto cfd = cfh->cfd();

filter_cache_.test_cfd(cfd);

if (tracer_) {
// TODO: This mutex should be removed later, to improve performance when
// tracing is enabled.
Expand Down
8 changes: 7 additions & 1 deletion db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/table_cache.h"
#include <cassert>

#include "db/dbformat.h"
#include "db/range_tombstone_fragmenter.h"
Expand Down Expand Up @@ -514,6 +515,11 @@ Status TableCache::Get(FilterCacheClient& filter_cache,
t = GetTableReaderFromHandle(handle);
}
}

BlockBasedTable* block_based_table = nullptr;
block_based_table = static_cast<BlockBasedTable*>(t);
assert(block_based_table != nullptr);

SequenceNumber* max_covering_tombstone_seq =
get_context->max_covering_tombstone_seq();
if (s.ok() && max_covering_tombstone_seq != nullptr &&
Expand All @@ -529,7 +535,7 @@ Status TableCache::Get(FilterCacheClient& filter_cache,
if (s.ok()) {
get_context->SetReplayLog(row_cache_entry); // nullptr if no cache.
// only add filter_cache argument
s = t->Get(filter_cache, options, k, get_context, prefix_extractor, skip_filters);
s = block_based_table->Get(filter_cache, options, k, get_context, prefix_extractor, skip_filters);
get_context->SetReplayLog(nullptr);
} else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
// Couldn't find Table in cache but treat as kFound if no_io set
Expand Down
2 changes: 1 addition & 1 deletion table/block_based/block_based_table_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class BlockBasedTable : public TableReader {
Status Get(FilterCacheClient& filter_cache,
const ReadOptions& readOptions, const Slice& key,
GetContext* get_context, const SliceTransform* prefix_extractor,
bool skip_filters = false) override;
bool skip_filters = false);
#endif

// WaLSM+ Note: call FullFilterKeyMayMatch() method in this file
Expand Down
15 changes: 15 additions & 0 deletions table/block_based/filter_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

#include <stddef.h>
#include <stdint.h>
#include <cassert>
#include <memory>
#include <string>
#include <vector>
#include "db/art/filter_cache_client.h"
#include "db/dbformat.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
Expand Down Expand Up @@ -103,6 +105,19 @@ class FilterBlockReader {
GetContext* get_context,
BlockCacheLookupContext* lookup_context) = 0;

#ifdef ART_PLUS
// WaLSM+ (ART_PLUS) variant of KeyMayMatch that consults the adaptive filter
// cache. This base-class body is a stub that is not meant to run: readers
// participating in the filter cache (e.g. PartitionedFilterBlockReader)
// override it. Debug builds trip the assert.
virtual bool KeyMayMatch(FilterCacheClient& filter_cache,
                         const Slice& key,
                         const SliceTransform* prefix_extractor,
                         uint64_t block_offset, const bool no_io,
                         const Slice* const const_ikey_ptr,
                         GetContext* get_context,
                         BlockCacheLookupContext* lookup_context) {
  // NOTE(review): in release builds (assert compiled out) this silently
  // reports "no match" if a subclass forgets to override — confirm that is
  // the intended failure mode.
  assert(false);
  return false;
}
#endif

virtual void KeysMayMatch(MultiGetRange* range,
const SliceTransform* prefix_extractor,
uint64_t block_offset, const bool no_io,
Expand Down
43 changes: 38 additions & 5 deletions table/block_based/partitioned_filter_block.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
#include "table/block_based/partitioned_filter_block.h"

#include <atomic>
#include <cassert>
#include <cstring>
#include <iostream>
#include <memory>
#include <utility>

Expand All @@ -21,6 +23,7 @@
#include "rocksdb/status.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/format.h"
#include "util/coding.h"

namespace ROCKSDB_NAMESPACE {
Expand Down Expand Up @@ -114,6 +117,7 @@ void PartitionedFilterBlockBuilder::MaybeCutAFilterBlock(
std::string& index_key = p_index_builder_->GetPartitionKey();
filters.push_back({index_key, filter});
#endif
std::cerr << "keys_added_to_partition = " << keys_per_partition_ << "\n";
keys_added_to_partition_ = 0;
Reset();
}
Expand Down Expand Up @@ -350,6 +354,33 @@ BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle(
return fltr_blk_handle;
}

#ifdef ART_PLUS
// WaLSM+: variant of GetFilterPartitionHandle() that returns both the index
// key of the matched filter partition and its block handle, so the caller can
// inspect metadata carried in the partition key (e.g. the segment-id prefix
// of the ART_PLUS key scheme — see the caller in MayMatch()).
//
// NOTE(review): the returned Slice comes from a stack-local IndexBlockIter
// (iter.key()); if the iterator materializes keys into its own internal
// buffer, that Slice dangles once this function returns. Verify the key bytes
// are backed by filter_block's data before relying on them after the call.
std::pair<Slice, BlockHandle> PartitionedFilterBlockReader::GetFilterPartitionKeyAndHandle(
    const CachableEntry<Block>& filter_block, const Slice& entry) const {
  IndexBlockIter iter;
  // Comparator that ignores the segment-id prefix when comparing index keys.
  const Comparator* const segment_id_removing_comparator = table()->get_rep()->segment_id_removing_comparator.get();
  Statistics* kNullStats = nullptr;
  filter_block.GetValue()->NewIndexIterator(
      segment_id_removing_comparator,
      table()->get_rep()->get_global_seqno(BlockType::kFilter), &iter,
      kNullStats, true /* total_order_seek */, false /* have_first_key */,
      index_key_includes_seq(), index_value_is_full());
  iter.Seek(entry);
  if (UNLIKELY(!iter.Valid())) {
    // entry is larger than all the keys. However its prefix might still be
    // present in the last partition. If this is called by PrefixMayMatch this
    // is necessary for correct behavior. Otherwise it is unnecessary but safe.
    // Assuming this is an unlikely case for full key search, the performance
    // overhead should be negligible.
    iter.SeekToLast();
  }
  assert(iter.Valid());
  Slice fltr_block_key = iter.key();
  BlockHandle fltr_blk_handle = iter.value().handle;
  return {fltr_block_key, fltr_blk_handle};
}
#endif

// TODO: retrieve filter block from filter cache (WaLSM+)
Status PartitionedFilterBlockReader::GetFilterPartitionBlock(
FilePrefetchBuffer* prefetch_buffer, const BlockHandle& fltr_blk_handle,
Expand Down Expand Up @@ -464,21 +495,23 @@ bool PartitionedFilterBlockReader::MayMatch(
return true;
}

#ifdef ART_PLUS
// find key "0 original_internal key". filter_index=segment_id=0. (WaLSM+)
// segment_id itself is useless in comparison,
// but must be appended otherwise the extracted user key will be incorrect.
std::unique_ptr<const char[]> modified_key_buf;
Slice modified_key =
generate_modified_internal_key(modified_key_buf, *const_ikey_ptr, 0, 0);
auto filter_handle = GetFilterPartitionHandle(filter_block, modified_key);
#else
auto filter_handle = GetFilterPartitionHandle(filter_block, *const_ikey_ptr);
#endif
// auto filter_handle = GetFilterPartitionHandle(filter_block, modified_key);
auto key_and_handle = GetFilterPartitionKeyAndHandle(filter_block, modified_key);
Slice filter_key = key_and_handle.first;
auto filter_handle = key_and_handle.second;
if (UNLIKELY(filter_handle.size() == 0)) { // key is out of range
return false;
}

assert(filter_key.size() >= 8); //
// uint32_t segment_id = DecodeFixed32R(filter_key.data() + filter_key);

// TODO: get some filter blocks from the filter cache and check (WaLSM+)
CachableEntry<ParsedFullFilterBlock> filter_partition_block;
s = GetFilterPartitionBlock(nullptr /* prefetch_buffer */, filter_handle,
Expand Down
7 changes: 6 additions & 1 deletion table/block_based/partitioned_filter_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class PartitionedFilterBlockReader : public FilterBlockReaderCommon<Block> {
const Slice& key, const SliceTransform* prefix_extractor,
uint64_t block_offset, const bool no_io,
const Slice* const const_ikey_ptr, GetContext* get_context,
BlockCacheLookupContext* lookup_context);
BlockCacheLookupContext* lookup_context) override;
#endif
// TODO: not used in the WaLSM+ Benchmark; maybe used in the MultiGet interface?
void KeysMayMatch(MultiGetRange* range,
Expand All @@ -129,6 +129,11 @@ class PartitionedFilterBlockReader : public FilterBlockReaderCommon<Block> {
size_t ApproximateMemoryUsage() const override;

private:
#ifdef ART_PLUS
std::pair<Slice, BlockHandle> GetFilterPartitionKeyAndHandle(
const CachableEntry<Block>& filter_block, const Slice& entry) const;
#endif

BlockHandle GetFilterPartitionHandle(const CachableEntry<Block>& filter_block,
const Slice& entry) const;
Status GetFilterPartitionBlock(
Expand Down
6 changes: 4 additions & 2 deletions util/comparator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,13 @@ class SegmentIdRemovingComparatorImpl : public Comparator {

// Intentionally a no-op: key shortening is disabled (presumably so separator
// keys keep their exact byte layout under the segment-id-removing comparison
// — TODO confirm). Parameter names are commented out to avoid
// -Wunused-parameter warnings now that the delegating call is disabled.
void FindShortestSeparator(std::string* /*start*/,
                           const Slice& /*limit*/) const override {
  // real_comparator->FindShortestSeparator(start, limit);
  // do nothing to disable the key shortening feature
}

// Intentionally a no-op: like FindShortestSeparator, key shortening is
// disabled. The parameter name is commented out to silence -Wunused-parameter
// now that the delegating call is disabled.
void FindShortSuccessor(std::string* /*key*/) const override {
  // real_comparator->FindShortSuccessor(key);
  // do nothing to disable the key shortening feature
}

bool IsSameLengthImmediateSuccessor(const Slice& s,
Expand Down