Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable prefix bloom filter by default #2860

Merged
merged 5 commits into from
Sep 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions conf/nebula-storaged.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@
# * kAll, Collect all stats
--rocksdb_stats_level=kExceptHistogramOrTimers

# Whether or not to enable rocksdb's prefix bloom filter, disabled by default.
--enable_rocksdb_prefix_filtering=false
# Whether or not to enable rocksdb's prefix bloom filter, enabled by default.
--enable_rocksdb_prefix_filtering=true
# Whether or not to enable rocksdb's whole key bloom filter, disabled by default.
--enable_rocksdb_whole_key_filtering=false

############## rocksdb Options ##############
# rocksdb DBOptions in json, each name and value of option is a string, given as "option_name":"option_value" separated by comma
Expand Down
6 changes: 4 additions & 2 deletions conf/nebula-storaged.conf.production
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@
# * kAll, Collect all stats
--rocksdb_stats_level=kExceptHistogramOrTimers

# Whether or not to enable rocksdb's prefix bloom filter, disabled by default.
--enable_rocksdb_prefix_filtering=false
# Whether or not to enable rocksdb's prefix bloom filter, enabled by default.
--enable_rocksdb_prefix_filtering=true
# Whether or not to enable rocksdb's whole key bloom filter, disabled by default.
--enable_rocksdb_whole_key_filtering=false

############### misc ####################
--snapshot_part_rate_limit=8388608
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ class KVEngine {
virtual nebula::cpp2::ErrorCode setDBOption(const std::string& configKey,
const std::string& configValue) = 0;

// Get a DB property by name.
// Returns the property value as a string on success, or an error code when the engine
// cannot provide the requested property.
virtual ErrorOr<nebula::cpp2::ErrorCode, std::string> getProperty(
const std::string& property) = 0;

virtual nebula::cpp2::ErrorCode compact() = 0;

virtual nebula::cpp2::ErrorCode flush() = 0;
Expand Down
3 changes: 3 additions & 0 deletions src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ class KVStore {

virtual std::vector<std::string> getDataRoot() const = 0;

// Query the named DB property for the given graph space.
// Returns a string on success, or an error code on failure.
// NOTE(review): the exact string layout is defined by the implementing store — confirm
// against the concrete implementation before parsing it.
virtual ErrorOr<nebula::cpp2::ErrorCode, std::string> getProperty(
GraphSpaceID spaceId, const std::string& property) = 0;

protected:
KVStore() = default;
};
Expand Down
13 changes: 7 additions & 6 deletions src/kvstore/NebulaSnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include "kvstore/RateLimiter.h"

DEFINE_uint32(snapshot_part_rate_limit,
1024 * 1024 * 2,
1024 * 1024 * 8,
"max bytes of pulling snapshot for each partition in one second");
DEFINE_uint32(snapshot_batch_size, 1024 * 512, "batch size for snapshot, in bytes");

Expand All @@ -22,16 +22,15 @@ const int32_t kReserveNum = 1024 * 4;

NebulaSnapshotManager::NebulaSnapshotManager(NebulaStore* kv) : store_(kv) {
// Snapshot rate is limited to FLAGS_snapshot_worker_threads * FLAGS_snapshot_part_rate_limit.
// So by default, the total send rate is limited to 4 * 2Mb = 8Mb.
// So by default, the total send rate is limited to 4 * 8MB = 32MB.
LOG(INFO) << "Send snapshot is rate limited to " << FLAGS_snapshot_part_rate_limit
<< " for each part";
<< " for each part by default";
}

void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
PartitionID partId,
raftex::SnapshotCallback cb) {
auto rateLimiter = std::make_unique<kvstore::RateLimiter>(FLAGS_snapshot_part_rate_limit,
FLAGS_snapshot_part_rate_limit);
auto rateLimiter = std::make_unique<kvstore::RateLimiter>();
CHECK_NOTNULL(store_);
auto tables = NebulaKeyUtils::snapshotPrefix(partId);
std::vector<std::string> data;
Expand Down Expand Up @@ -74,7 +73,9 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
size_t batchSize = 0;
while (iter && iter->valid()) {
if (batchSize >= FLAGS_snapshot_batch_size) {
rateLimiter->consume(batchSize);
rateLimiter->consume(static_cast<double>(batchSize), // toConsume
static_cast<double>(FLAGS_snapshot_part_rate_limit), // rate
static_cast<double>(FLAGS_snapshot_part_rate_limit)); // burstSize
if (cb(data, totalCount, totalSize, raftex::SnapshotStatus::IN_PROGRESS)) {
data.clear();
batchSize = 0;
Expand Down
21 changes: 21 additions & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1168,5 +1168,26 @@ nebula::cpp2::ErrorCode NebulaStore::multiPutWithoutReplicator(GraphSpaceID spac
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

ErrorOr<nebula::cpp2::ErrorCode, std::string> NebulaStore::getProperty(
GraphSpaceID spaceId, const std::string& property) {
auto spaceRet = space(spaceId);
if (!ok(spaceRet)) {
LOG(ERROR) << "Get Space " << spaceId << " Failed";
return error(spaceRet);
}
auto space = nebula::value(spaceRet);

folly::dynamic obj = folly::dynamic::object;
for (size_t i = 0; i < space->engines_.size(); i++) {
auto val = space->engines_[i]->getProperty(property);
if (!ok(val)) {
return error(val);
}
auto eng = folly::stringPrintf("Engine %zu", i);
obj[eng] = std::move(value(val));
Comment on lines +1186 to +1187
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

}
return folly::toJson(obj);
}

} // namespace kvstore
} // namespace nebula
3 changes: 3 additions & 0 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,9 @@ class NebulaStore : public KVStore, public Handler {
nebula::cpp2::ErrorCode multiPutWithoutReplicator(GraphSpaceID spaceId,
std::vector<KV> keyValues) override;

ErrorOr<nebula::cpp2::ErrorCode, std::string> getProperty(GraphSpaceID spaceId,
const std::string& property) override;

private:
void loadPartFromDataPath();

Expand Down
15 changes: 6 additions & 9 deletions src/kvstore/RateLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,29 @@ namespace kvstore {
// For now, there are two major cases: snapshot (both for balance or catch up) and rebuild index.
class RateLimiter {
public:
RateLimiter(int32_t rate, int32_t burstSize)
: rate_(static_cast<double>(rate)), burstSize_(static_cast<double>(burstSize)) {
RateLimiter() {
// token will be available after 1 second, to prevent speed spike at the beginning
auto now = time::WallClock::fastNowInSec();
int64_t waitInSec = FLAGS_skip_wait_in_rate_limiter ? 0 : 1;
bucket_.reset(new folly::TokenBucket(rate_, burstSize_, static_cast<double>(now + waitInSec)));
bucket_.reset(new folly::DynamicTokenBucket(static_cast<double>(now + waitInSec)));
}

// Caller must make sure **the partition has been added, and won't be removed during consume.**
// Snapshot and rebuild index follow this principle by design.
void consume(size_t toConsume) {
if (toConsume > burstSize_) {
void consume(double toConsume, double rate, double burstSize) {
if (toConsume > burstSize) {
// consumeWithBorrowAndWait does nothing when toConsume > burstSize, so we sleep 1s instead
std::this_thread::sleep_for(std::chrono::seconds(1));
} else {
// If there are enough tokens, consume and return immediately.
// If not, consume anyway, but sleep enough time before return.
auto now = time::WallClock::fastNowInSec();
bucket_->consumeWithBorrowAndWait(static_cast<double>(toConsume), static_cast<double>(now));
bucket_->consumeWithBorrowAndWait(toConsume, rate, burstSize, static_cast<double>(now));
}
}

private:
std::unique_ptr<folly::TokenBucket> bucket_;
double rate_{1 << 20};
double burstSize_{1 << 20};
std::unique_ptr<folly::DynamicTokenBucket> bucket_;
};

} // namespace kvstore
Expand Down
41 changes: 39 additions & 2 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "common/fs/FileUtils.h"
#include "common/utils/NebulaKeyUtils.h"
#include "kvstore/KVStore.h"
#include "kvstore/RocksEngineConfig.h"

DEFINE_bool(move_files, false, "Move the SST files instead of copy when ingest into dataset");

Expand Down Expand Up @@ -126,6 +125,7 @@ RocksEngine::RocksEngine(GraphSpaceID spaceId,
CHECK(status.ok()) << status.ToString();
db_.reset(db);
partsNum_ = allParts().size();
extractorLen_ = sizeof(PartitionID) + vIdLen;
LOG(INFO) << "open rocksdb on " << path;

backup();
Expand Down Expand Up @@ -202,7 +202,7 @@ nebula::cpp2::ErrorCode RocksEngine::range(const std::string& start,
const std::string& end,
std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
options.total_order_seek = true;
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering;
rocksdb::Iterator* iter = db_->NewIterator(options);
if (iter) {
iter->Seek(rocksdb::Slice(start));
Expand All @@ -213,7 +213,32 @@ nebula::cpp2::ErrorCode RocksEngine::range(const std::string& start,

// Creates an iterator over the keys that start with `prefix` and hands it back through
// `storageIter`.
//
// The extractor-based scan is chosen only when prefix filtering is enabled and the given prefix
// is at least extractorLen_ bytes long; every other case falls back to the scan that does not
// rely on the prefix extractor.
nebula::cpp2::ErrorCode RocksEngine::prefix(const std::string& prefix,
                                            std::unique_ptr<KVIterator>* storageIter) {
  // Strictly speaking, checking prefix.size() >= extractorLen_ is the caller's duty, because the
  // caller should make sure the prefix bloom filter exists. That is quite error-prone, so the
  // check is repeated here.
  const bool useExtractor =
      FLAGS_enable_rocksdb_prefix_filtering && prefix.size() >= extractorLen_;
  if (!useExtractor) {
    return prefixWithoutExtractor(prefix, storageIter);
  }
  return prefixWithExtractor(prefix, storageIter);
}

// Prefix scan that relies on the configured prefix extractor.
//
// The iterator is created with ReadOptions::prefix_same_as_start enabled, positioned at the
// first key >= `prefix`, and then wrapped in a RocksPrefixIter that is stored in `storageIter`.
// Always reports SUCCEEDED.
nebula::cpp2::ErrorCode RocksEngine::prefixWithExtractor(const std::string& prefix,
                                                         std::unique_ptr<KVIterator>* storageIter) {
  rocksdb::ReadOptions readOpts;
  readOpts.prefix_same_as_start = true;

  rocksdb::Iterator* rawIter = db_->NewIterator(readOpts);
  if (rawIter != nullptr) {
    const rocksdb::Slice seekKey(prefix);
    rawIter->Seek(seekKey);
  }

  // The raw iterator is handed to the RocksPrefixIter wrapper.
  storageIter->reset(new RocksPrefixIter(rawIter, prefix));
  return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode RocksEngine::prefixWithoutExtractor(
const std::string& prefix, std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
// prefix_same_as_start is false by default
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

rocksdb::Iterator* iter = db_->NewIterator(options);
if (iter) {
iter->Seek(rocksdb::Slice(prefix));
Expand All @@ -226,6 +251,8 @@ nebula::cpp2::ErrorCode RocksEngine::rangeWithPrefix(const std::string& start,
const std::string& prefix,
std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
// prefix_same_as_start is false by default
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering;
rocksdb::Iterator* iter = db_->NewIterator(options);
if (iter) {
iter->Seek(rocksdb::Slice(start));
Expand Down Expand Up @@ -397,6 +424,16 @@ nebula::cpp2::ErrorCode RocksEngine::setDBOption(const std::string& configKey,
}
}

// Looks up a property of the underlying DB by name.
//
// Returns the property text when db_->GetProperty() succeeds, and E_INVALID_PARM when it
// reports failure.
ErrorOr<nebula::cpp2::ErrorCode, std::string> RocksEngine::getProperty(
    const std::string& property) {
  std::string propValue;
  const bool found = db_->GetProperty(property, &propValue);
  if (found) {
    return propValue;
  }
  return nebula::cpp2::ErrorCode::E_INVALID_PARM;
}

nebula::cpp2::ErrorCode RocksEngine::compact() {
rocksdb::CompactRangeOptions options;
options.change_level = FLAGS_rocksdb_compact_change_level;
Expand Down
10 changes: 10 additions & 0 deletions src/kvstore/RocksEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "common/base/Base.h"
#include "kvstore/KVEngine.h"
#include "kvstore/KVIterator.h"
#include "kvstore/RocksEngineConfig.h"

namespace nebula {
namespace kvstore {
Expand Down Expand Up @@ -128,6 +129,12 @@ class RocksEngine : public KVEngine {
const std::string& prefix,
std::unique_ptr<KVIterator>* iter) override;

// Prefix scan variant selected by prefix() when prefix filtering is enabled and the prefix is
// long enough for the prefix extractor. The resulting iterator is returned via storageIter.
nebula::cpp2::ErrorCode prefixWithExtractor(const std::string& prefix,
std::unique_ptr<KVIterator>* storageIter);

// Prefix scan variant selected by prefix() when the extractor-based scan cannot be used
// (prefix filtering disabled, or the prefix is shorter than the extractor length).
// The resulting iterator is returned via storageIter.
nebula::cpp2::ErrorCode prefixWithoutExtractor(const std::string& prefix,
std::unique_ptr<KVIterator>* storageIter);

/*********************
* Data modification
********************/
Expand Down Expand Up @@ -161,6 +168,8 @@ class RocksEngine : public KVEngine {
nebula::cpp2::ErrorCode setDBOption(const std::string& configKey,
const std::string& configValue) override;

ErrorOr<nebula::cpp2::ErrorCode, std::string> getProperty(const std::string& property) override;

nebula::cpp2::ErrorCode compact() override;

nebula::cpp2::ErrorCode flush() override;
Expand Down Expand Up @@ -190,6 +199,7 @@ class RocksEngine : public KVEngine {
std::string backupPath_;
std::unique_ptr<rocksdb::BackupEngine> backupDb_{nullptr};
int32_t partsNum_ = -1;
size_t extractorLen_;
};

} // namespace kvstore
Expand Down
59 changes: 15 additions & 44 deletions src/kvstore/RocksEngineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ DEFINE_int64(rocksdb_block_cache,
1024,
"The default block cache size used in BlockBasedTable. The unit is MB");

DEFINE_int32(row_cache_num, 16 * 1000 * 1000, "Total keys inside the cache");
DEFINE_int32(rocksdb_row_cache_num, 16 * 1000 * 1000, "Total keys inside the cache");

DEFINE_int32(cache_bucket_exp, 8, "Total buckets number is 1 << cache_bucket_exp");

Expand All @@ -78,15 +78,13 @@ DEFINE_int32(rocksdb_rate_limit,
0,
"write limit in bytes per sec. The unit is MB. 0 means unlimited.");

DEFINE_bool(enable_rocksdb_prefix_filtering,
DEFINE_bool(enable_rocksdb_whole_key_filtering,
false,
"Whether or not to enable rocksdb's whole key bloom filter");

DEFINE_bool(enable_rocksdb_prefix_filtering,
true,
"Whether or not to enable rocksdb's prefix bloom filter.");
DEFINE_bool(rocksdb_prefix_bloom_filter_length_flag,
false,
"If true, prefix bloom filter will be sizeof(PartitionID) + vidLen + "
"sizeof(EdgeType). "
"If false, prefix bloom filter will be sizeof(PartitionID) + vidLen. ");
DEFINE_int32(rocksdb_plain_table_prefix_length, 4, "PlainTable prefix size");

DEFINE_bool(rocksdb_compact_change_level,
true,
Expand Down Expand Up @@ -118,34 +116,6 @@ DEFINE_int32(rocksdb_backup_interval_secs,
namespace nebula {
namespace kvstore {

class GraphPrefixTransform : public rocksdb::SliceTransform {
private:
size_t prefixLen_;
std::string name_;

public:
explicit GraphPrefixTransform(size_t prefixLen)
: prefixLen_(prefixLen), name_("nebula.GraphPrefix." + std::to_string(prefixLen_)) {}

const char* Name() const override { return name_.c_str(); }

rocksdb::Slice Transform(const rocksdb::Slice& src) const override {
return rocksdb::Slice(src.data(), prefixLen_);
}

bool InDomain(const rocksdb::Slice& key) const override {
if (key.size() < prefixLen_) {
return false;
}
// And we should not use NebulaKeyUtils::isVertex or isEdge here, because it
// will regard the prefix itself not in domain since its length does not
// satisfy
constexpr int32_t len = static_cast<int32_t>(sizeof(NebulaKeyType));
auto type = static_cast<NebulaKeyType>(readInt<uint32_t>(key.data(), len) & kTypeMask);
return type == NebulaKeyType::kEdge || type == NebulaKeyType::kVertex;
}
};

static rocksdb::Status initRocksdbCompression(rocksdb::Options& baseOpts) {
static std::unordered_map<std::string, rocksdb::CompressionType> m = {
{"no", rocksdb::kNoCompression},
Expand Down Expand Up @@ -256,10 +226,8 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
baseOpts.rate_limiter = rate_limiter;
}

size_t prefixLength = sizeof(PartitionID) + vidLen;
if (FLAGS_rocksdb_table_format == "BlockBasedTable") {
size_t prefixLength = FLAGS_rocksdb_prefix_bloom_filter_length_flag
? sizeof(PartitionID) + vidLen + sizeof(EdgeType)
: sizeof(PartitionID) + vidLen;
// BlockBasedTableOptions
std::unordered_map<std::string, std::string> bbtOptsMap;
if (!loadOptionsMap(bbtOptsMap, FLAGS_rocksdb_block_based_table_options)) {
Expand All @@ -279,9 +247,9 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
bbtOpts.block_cache = blockCache;
}

if (FLAGS_row_cache_num) {
if (FLAGS_rocksdb_row_cache_num) {
static std::shared_ptr<rocksdb::Cache> rowCache =
rocksdb::NewLRUCache(FLAGS_row_cache_num, FLAGS_cache_bucket_exp);
rocksdb::NewLRUCache(FLAGS_rocksdb_row_cache_num, FLAGS_cache_bucket_exp);
baseOpts.row_cache = rowCache;
}

Expand All @@ -296,8 +264,9 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
baseOpts.compaction_style == rocksdb::CompactionStyle::kCompactionStyleLevel;
}
if (FLAGS_enable_rocksdb_prefix_filtering) {
baseOpts.prefix_extractor.reset(new GraphPrefixTransform(prefixLength));
baseOpts.prefix_extractor.reset(rocksdb::NewCappedPrefixTransform(prefixLength));
}
bbtOpts.whole_key_filtering = FLAGS_enable_rocksdb_whole_key_filtering;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have a question:
When enable_rocksdb_prefix_filtering and enable_rocksdb_whole_key_filtering are both true,
which one is used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both of them can be used at the same time.

baseOpts.table_factory.reset(NewBlockBasedTableFactory(bbtOpts));
baseOpts.create_if_missing = true;
} else if (FLAGS_rocksdb_table_format == "PlainTable") {
Expand All @@ -308,8 +277,10 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
// by default. WAL_ttl_seconds and rocksdb_backup_interval_secs need to be
// modify together if necessary
FLAGS_rocksdb_disable_wal = false;
baseOpts.prefix_extractor.reset(
rocksdb::NewCappedPrefixTransform(FLAGS_rocksdb_plain_table_prefix_length));
if (!FLAGS_enable_rocksdb_prefix_filtering) {
return rocksdb::Status::InvalidArgument("PlainTable should use prefix bloom filter");
}
baseOpts.prefix_extractor.reset(rocksdb::NewCappedPrefixTransform(prefixLength));
baseOpts.table_factory.reset(rocksdb::NewPlainTableFactory());
baseOpts.create_if_missing = true;
} else {
Expand Down
Loading