-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable prefix bloom filter by default #2860
Changes from all commits
71774d8
aa6dfc1
06c015d
5a48e98
64bb912
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,7 +13,6 @@ | |
#include "common/fs/FileUtils.h" | ||
#include "common/utils/NebulaKeyUtils.h" | ||
#include "kvstore/KVStore.h" | ||
#include "kvstore/RocksEngineConfig.h" | ||
|
||
DEFINE_bool(move_files, false, "Move the SST files instead of copy when ingest into dataset"); | ||
|
||
|
@@ -126,6 +125,7 @@ RocksEngine::RocksEngine(GraphSpaceID spaceId, | |
CHECK(status.ok()) << status.ToString(); | ||
db_.reset(db); | ||
partsNum_ = allParts().size(); | ||
extractorLen_ = sizeof(PartitionID) + vIdLen; | ||
LOG(INFO) << "open rocksdb on " << path; | ||
|
||
backup(); | ||
|
@@ -202,7 +202,7 @@ nebula::cpp2::ErrorCode RocksEngine::range(const std::string& start, | |
const std::string& end, | ||
std::unique_ptr<KVIterator>* storageIter) { | ||
rocksdb::ReadOptions options; | ||
options.total_order_seek = true; | ||
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering; | ||
rocksdb::Iterator* iter = db_->NewIterator(options); | ||
if (iter) { | ||
iter->Seek(rocksdb::Slice(start)); | ||
|
@@ -213,7 +213,32 @@ nebula::cpp2::ErrorCode RocksEngine::range(const std::string& start, | |
|
||
nebula::cpp2::ErrorCode RocksEngine::prefix(const std::string& prefix, | ||
std::unique_ptr<KVIterator>* storageIter) { | ||
// In fact, we don't need to check prefix.size() >= extractorLen_, which is caller's duty to make | ||
// sure the prefix bloom filter exists. But this is quite error-proning, so we do a check here. | ||
if (FLAGS_enable_rocksdb_prefix_filtering && prefix.size() >= extractorLen_) { | ||
return prefixWithExtractor(prefix, storageIter); | ||
} else { | ||
return prefixWithoutExtractor(prefix, storageIter); | ||
} | ||
} | ||
|
||
nebula::cpp2::ErrorCode RocksEngine::prefixWithExtractor(const std::string& prefix, | ||
std::unique_ptr<KVIterator>* storageIter) { | ||
rocksdb::ReadOptions options; | ||
options.prefix_same_as_start = true; | ||
rocksdb::Iterator* iter = db_->NewIterator(options); | ||
if (iter) { | ||
iter->Seek(rocksdb::Slice(prefix)); | ||
} | ||
storageIter->reset(new RocksPrefixIter(iter, prefix)); | ||
return nebula::cpp2::ErrorCode::SUCCEEDED; | ||
} | ||
|
||
nebula::cpp2::ErrorCode RocksEngine::prefixWithoutExtractor( | ||
const std::string& prefix, std::unique_ptr<KVIterator>* storageIter) { | ||
rocksdb::ReadOptions options; | ||
// prefix_same_as_start is false by default | ||
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
rocksdb::Iterator* iter = db_->NewIterator(options); | ||
if (iter) { | ||
iter->Seek(rocksdb::Slice(prefix)); | ||
|
@@ -226,6 +251,8 @@ nebula::cpp2::ErrorCode RocksEngine::rangeWithPrefix(const std::string& start, | |
const std::string& prefix, | ||
std::unique_ptr<KVIterator>* storageIter) { | ||
rocksdb::ReadOptions options; | ||
// prefix_same_as_start is false by default | ||
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering; | ||
rocksdb::Iterator* iter = db_->NewIterator(options); | ||
if (iter) { | ||
iter->Seek(rocksdb::Slice(start)); | ||
|
@@ -397,6 +424,16 @@ nebula::cpp2::ErrorCode RocksEngine::setDBOption(const std::string& configKey, | |
} | ||
} | ||
|
||
ErrorOr<nebula::cpp2::ErrorCode, std::string> RocksEngine::getProperty( | ||
const std::string& property) { | ||
std::string value; | ||
if (!db_->GetProperty(property, &value)) { | ||
return nebula::cpp2::ErrorCode::E_INVALID_PARM; | ||
} else { | ||
return value; | ||
} | ||
} | ||
|
||
nebula::cpp2::ErrorCode RocksEngine::compact() { | ||
rocksdb::CompactRangeOptions options; | ||
options.change_level = FLAGS_rocksdb_compact_change_level; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,7 +51,7 @@ DEFINE_int64(rocksdb_block_cache, | |
1024, | ||
"The default block cache size used in BlockBasedTable. The unit is MB"); | ||
|
||
DEFINE_int32(row_cache_num, 16 * 1000 * 1000, "Total keys inside the cache"); | ||
DEFINE_int32(rocksdb_row_cache_num, 16 * 1000 * 1000, "Total keys inside the cache"); | ||
|
||
DEFINE_int32(cache_bucket_exp, 8, "Total buckets number is 1 << cache_bucket_exp"); | ||
|
||
|
@@ -78,15 +78,13 @@ DEFINE_int32(rocksdb_rate_limit, | |
0, | ||
"write limit in bytes per sec. The unit is MB. 0 means unlimited."); | ||
|
||
DEFINE_bool(enable_rocksdb_prefix_filtering, | ||
DEFINE_bool(enable_rocksdb_whole_key_filtering, | ||
false, | ||
"Whether or not to enable rocksdb's whole key bloom filter"); | ||
|
||
DEFINE_bool(enable_rocksdb_prefix_filtering, | ||
true, | ||
"Whether or not to enable rocksdb's prefix bloom filter."); | ||
DEFINE_bool(rocksdb_prefix_bloom_filter_length_flag, | ||
false, | ||
"If true, prefix bloom filter will be sizeof(PartitionID) + vidLen + " | ||
"sizeof(EdgeType). " | ||
"If false, prefix bloom filter will be sizeof(PartitionID) + vidLen. "); | ||
DEFINE_int32(rocksdb_plain_table_prefix_length, 4, "PlainTable prefix size"); | ||
|
||
DEFINE_bool(rocksdb_compact_change_level, | ||
true, | ||
|
@@ -118,34 +116,6 @@ DEFINE_int32(rocksdb_backup_interval_secs, | |
namespace nebula { | ||
namespace kvstore { | ||
|
||
class GraphPrefixTransform : public rocksdb::SliceTransform { | ||
private: | ||
size_t prefixLen_; | ||
std::string name_; | ||
|
||
public: | ||
explicit GraphPrefixTransform(size_t prefixLen) | ||
: prefixLen_(prefixLen), name_("nebula.GraphPrefix." + std::to_string(prefixLen_)) {} | ||
|
||
const char* Name() const override { return name_.c_str(); } | ||
|
||
rocksdb::Slice Transform(const rocksdb::Slice& src) const override { | ||
return rocksdb::Slice(src.data(), prefixLen_); | ||
} | ||
|
||
bool InDomain(const rocksdb::Slice& key) const override { | ||
if (key.size() < prefixLen_) { | ||
return false; | ||
} | ||
// And we should not use NebulaKeyUtils::isVertex or isEdge here, because it | ||
// will regard the prefix itself not in domain since its length does not | ||
// satisfy | ||
constexpr int32_t len = static_cast<int32_t>(sizeof(NebulaKeyType)); | ||
auto type = static_cast<NebulaKeyType>(readInt<uint32_t>(key.data(), len) & kTypeMask); | ||
return type == NebulaKeyType::kEdge || type == NebulaKeyType::kVertex; | ||
} | ||
}; | ||
|
||
static rocksdb::Status initRocksdbCompression(rocksdb::Options& baseOpts) { | ||
static std::unordered_map<std::string, rocksdb::CompressionType> m = { | ||
{"no", rocksdb::kNoCompression}, | ||
|
@@ -256,10 +226,8 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts, | |
baseOpts.rate_limiter = rate_limiter; | ||
} | ||
|
||
size_t prefixLength = sizeof(PartitionID) + vidLen; | ||
if (FLAGS_rocksdb_table_format == "BlockBasedTable") { | ||
size_t prefixLength = FLAGS_rocksdb_prefix_bloom_filter_length_flag | ||
? sizeof(PartitionID) + vidLen + sizeof(EdgeType) | ||
: sizeof(PartitionID) + vidLen; | ||
// BlockBasedTableOptions | ||
std::unordered_map<std::string, std::string> bbtOptsMap; | ||
if (!loadOptionsMap(bbtOptsMap, FLAGS_rocksdb_block_based_table_options)) { | ||
|
@@ -279,9 +247,9 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts, | |
bbtOpts.block_cache = blockCache; | ||
} | ||
|
||
if (FLAGS_row_cache_num) { | ||
if (FLAGS_rocksdb_row_cache_num) { | ||
static std::shared_ptr<rocksdb::Cache> rowCache = | ||
rocksdb::NewLRUCache(FLAGS_row_cache_num, FLAGS_cache_bucket_exp); | ||
rocksdb::NewLRUCache(FLAGS_rocksdb_row_cache_num, FLAGS_cache_bucket_exp); | ||
baseOpts.row_cache = rowCache; | ||
} | ||
|
||
|
@@ -296,8 +264,9 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts, | |
baseOpts.compaction_style == rocksdb::CompactionStyle::kCompactionStyleLevel; | ||
} | ||
if (FLAGS_enable_rocksdb_prefix_filtering) { | ||
baseOpts.prefix_extractor.reset(new GraphPrefixTransform(prefixLength)); | ||
baseOpts.prefix_extractor.reset(rocksdb::NewCappedPrefixTransform(prefixLength)); | ||
} | ||
bbtOpts.whole_key_filtering = FLAGS_enable_rocksdb_whole_key_filtering; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have a question: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Both of them could used at the same time. |
||
baseOpts.table_factory.reset(NewBlockBasedTableFactory(bbtOpts)); | ||
baseOpts.create_if_missing = true; | ||
} else if (FLAGS_rocksdb_table_format == "PlainTable") { | ||
|
@@ -308,8 +277,10 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts, | |
// by default. WAL_ttl_seconds and rocksdb_backup_interval_secs need to be | ||
// modify together if necessary | ||
FLAGS_rocksdb_disable_wal = false; | ||
baseOpts.prefix_extractor.reset( | ||
rocksdb::NewCappedPrefixTransform(FLAGS_rocksdb_plain_table_prefix_length)); | ||
if (!FLAGS_enable_rocksdb_prefix_filtering) { | ||
return rocksdb::Status::InvalidArgument("PlainTable should use prefix bloom filter"); | ||
} | ||
baseOpts.prefix_extractor.reset(rocksdb::NewCappedPrefixTransform(prefixLength)); | ||
baseOpts.table_factory.reset(rocksdb::NewPlainTableFactory()); | ||
baseOpts.create_if_missing = true; | ||
} else { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice