Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

curvefs/metaserver: fixed rocksdb storage memory leak caused by unrel… #1388

Merged
merged 1 commit into from
May 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions curvefs/conf/metaserver.conf
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,10 @@ storage.rocksdb.ordered_write_buffer_size=134217728
# rocksdb column family's max_write_buffer_number
# for store dentry and inode's s3chunkinfo list (default: 15)
storage.rocksdb.ordered_max_write_buffer_number=15
# rocksdb block cache(LRU) capacity (default: 128MB)
storage.rocksdb.block_cache_capacity=134217728
# rocksdb block cache(LRU) capacity (default: 512MB)
storage.rocksdb.block_cache_capacity=536870912
# rocksdb memtable prefix bloom size ratio (size=write_buffer_size*memtable_prefix_bloom_size_ratio)
storage.rocksdb.memtable_prefix_bloom_size_ratio=0.1
# if the number of inode's s3chunkinfo exceed the limit_size,
# we will sending its with rpc streaming instead of
# padding its into inode (default: 25000, about 25000 * 41 (byte) = 1MB)
Expand Down
3 changes: 3 additions & 0 deletions curvefs/src/metaserver/metaserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,9 @@ void Metaserver::InitStorage() {
LOG_IF(FATAL, !conf_->GetUInt64Value(
"storage.rocksdb.block_cache_capacity",
&storageOptions_.blockCacheCapacity));
LOG_IF(FATAL, !conf_->GetDoubleValue(
"storage.rocksdb.memtable_prefix_bloom_size_ratio",
&storageOptions_.memtablePrefixBloomSizeRatio));
LOG_IF(FATAL, !conf_->GetUInt64Value(
"storage.s3_meta_inside_inode.limit_size",
&storageOptions_.s3MetaLimitSizeInsideInode));
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/metaserver/storage/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ struct StorageOptions {

uint64_t blockCacheCapacity;

double memtablePrefixBloomSizeRatio;

// misc config item
uint64_t s3MetaLimitSizeInsideInode;
};
Expand Down
67 changes: 49 additions & 18 deletions curvefs/src/metaserver/storage/rocksdb_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,39 +42,66 @@ const std::string RocksDBOptions::kOrderedColumnFamilyName_ = // NOLINT

RocksDBOptions::RocksDBOptions(StorageOptions options) {
// db options
RocksDBStorageComparator cmp;
dbOptions_.comparator = &cmp;
// the database will be created if it is missing
dbOptions_.create_if_missing = true;
// missing column families will be automatically created
dbOptions_.create_missing_column_families = true;
dbOptions_.enable_blob_files = true;
// maximum number of concurrent background memtable flush jobs,
// submitted by default to the HIGH priority thread pool
dbOptions_.max_background_flushes = 2;
// maximum number of concurrent background compaction jobs,
// submitted to the default LOW priority thread pool.
dbOptions_.max_background_compactions = 4;
// allows OS to incrementally sync files to disk while they are being
// written, asynchronously, in the background.
dbOptions_.bytes_per_sync = 1048576;
dbOptions_.compaction_pri = ROCKSDB_NAMESPACE::kMinOverlappingRatio;
dbOptions_.prefix_extractor.reset(NewCappedPrefixTransform(3));

// table options
std::shared_ptr<ROCKSDB_NAMESPACE::Cache> cache =
NewLRUCache(options.blockCacheCapacity);
BlockBasedTableOptions tableOptions;
tableOptions.block_size = 16 * 1024; // 16KB
// default: an 8MB internal cache
tableOptions.block_cache = cache;
tableOptions.block_size = 16 * 1024; // 16MB
// whether to put index/filter blocks in the block cache
tableOptions.cache_index_and_filter_blocks = true;
// only evicted from cache when the table reader is freed
tableOptions.pin_l0_filter_and_index_blocks_in_cache = true;
tableOptions.filter_policy.reset(NewBloomFilterPolicy(10, true));
dbOptions_.table_factory.reset(NewBlockBasedTableFactory(tableOptions));
// reset bloom filter
tableOptions.filter_policy.reset(NewBloomFilterPolicy(10, false));

// column failmy options
auto unorderedCFOptions = ColumnFamilyOptions();
auto orderedCFOptions = ColumnFamilyOptions();
comparator_ = std::make_shared<RocksDBStorageComparator>();
ColumnFamilyOptions cfOptions = ColumnFamilyOptions();
// user-defined key comparator
cfOptions.comparator = comparator_.get();
// large values (blobs) are written to separate blob files, and
// only pointers to them are stored in SST files
cfOptions.enable_blob_files = true;
// RocksDB will pick target size of each level dynamically
cfOptions.level_compaction_dynamic_level_bytes = true;
cfOptions.compaction_pri = ROCKSDB_NAMESPACE::kMinOverlappingRatio;
// use the specified function to determine the prefixes for keys
cfOptions.prefix_extractor.reset(NewFixedPrefixTransform(sizeof(size_t)));
// The size in bytes of the filter for memtable is
// write_buffer_size * memtable_prefix_bloom_size_ratio
cfOptions.memtable_prefix_bloom_size_ratio =
options.memtablePrefixBloomSizeRatio;
// reset table options for column failmy
cfOptions.table_factory.reset(NewBlockBasedTableFactory(tableOptions));

ColumnFamilyOptions unorderedCFOptions = cfOptions;
// amount of data to build up in memory (backed by an unsorted log
// on disk) before converting to a sorted on-disk file.
unorderedCFOptions.write_buffer_size = options.unorderedWriteBufferSize;
// the maximum number of write buffers that are built up in memory.
unorderedCFOptions.max_write_buffer_number =
options.unorderedMaxWriteBufferNumber;
unorderedCFOptions.level_compaction_dynamic_level_bytes = true;

ColumnFamilyOptions orderedCFOptions = cfOptions;
orderedCFOptions.write_buffer_size = options.orderedWriteBufferSize;
orderedCFOptions.max_write_buffer_number =
options.orderedMaxWriteBufferNumber;
orderedCFOptions.level_compaction_dynamic_level_bytes = true;

columnFamilies_.push_back(ColumnFamilyDescriptor(
ROCKSDB_NAMESPACE::kDefaultColumnFamilyName, unorderedCFOptions));
Expand Down Expand Up @@ -210,6 +237,7 @@ Status RocksDBStorage::ToStorageStatus(const ROCKSDB_NAMESPACE::Status& s) {
return Status::InternalError();
}

// "ordered:name"
std::string RocksDBStorage::ToInternalName(const std::string& name,
bool ordered) {
std::ostringstream oss;
Expand All @@ -225,7 +253,7 @@ std::string RocksDBStorage::FormatInternalKey(size_t num4name,
}

// NOTE: we will convert name to number for compare prefix
// eg: iname:key
// eg: Hash(iname):key
std::string RocksDBStorage::ToInternalKey(const std::string& iname,
const std::string& key) {
size_t num4name = Hash(iname);
Expand Down Expand Up @@ -382,8 +410,8 @@ Status RocksDBStorage::Clear(const std::string& name, bool ordered) {
}

auto handle = GetColumnFamilyHandle(ordered);
std::string iname = ToInternalName(name, ordered);
std::string beginKey = ToInternalKey(iname, ""); // "name:"
std::string iname = ToInternalName(name, ordered); // "1:name"
std::string beginKey = ToInternalKey(iname, ""); // "Hash(iname):"
size_t beginNum = BinrayString2Number(beginKey);
std::string endKey = FormatInternalKey(beginNum + 1, "");
ROCKSDB_NAMESPACE::Status s = db_->DeleteRange(
Expand All @@ -404,24 +432,27 @@ std::shared_ptr<StorageTransaction> RocksDBStorage::BeginTransaction() {
}

Status RocksDBStorage::Commit() {
if (!InTransaction_) {
if (!InTransaction_ || nullptr == txn_) {
return Status::NotSupported();
}

Status s = ToStorageStatus(txn_->Commit());
if (s.ok()) {
CommitKeys();
}
delete txn_;
return s;
}

Status RocksDBStorage::Rollback() {
if (!InTransaction_) {
if (!InTransaction_ || nullptr == txn_) {
return Status::NotSupported();
}
pending4set_.clear();
pending4del_.clear();
return ToStorageStatus(txn_->Rollback());
Status s = ToStorageStatus(txn_->Commit());
delete txn_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, use std::unique_ptr if possiable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree

Copy link
Contributor Author

@Wine93 Wine93 May 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree

the txn_ is the member of RocksDBStorage, we should free it manually when transaction finished, the std::unique_ptr is not work.

return s;
}

bool RocksDBStorage::GetStatistics(StorageStatistics* statistics) {
Expand Down
27 changes: 18 additions & 9 deletions curvefs/src/metaserver/storage/rocksdb_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ using ROCKSDB_NAMESPACE::Transaction;
using ROCKSDB_NAMESPACE::TransactionDB;
using ROCKSDB_NAMESPACE::NewLRUCache;
using ROCKSDB_NAMESPACE::NewBloomFilterPolicy;
using ROCKSDB_NAMESPACE::NewCappedPrefixTransform;
using ROCKSDB_NAMESPACE::NewFixedPrefixTransform;
using ROCKSDB_NAMESPACE::NewBlockBasedTableFactory;
using STORAGE_TYPE = KVStorage::STORAGE_TYPE;

Expand Down Expand Up @@ -86,12 +86,17 @@ class RocksDBOptions {
std::vector<ColumnFamilyDescriptor> columnFamilies_;

static const std::string kOrderedColumnFamilyName_;

std::shared_ptr<rocksdb::Comparator> comparator_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no need to wrap it with a std::shared_ptr, just storing the raw value is enough.

Copy link
Contributor Author

@Wine93 Wine93 May 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we storing raw value, like rocksdb::Comparator*, the memory of comparator object will never be freed:

comparator_ = std::make_shared<RocksDBStorageComparator>();
...
cfOptions.comparator = comparator_.get();

};

class RocksDBStorageComparator : public rocksdb::Comparator {
public:
// if slice1 < slice2, return -1
// if slice1 > slice2, return 1
// if slice1 == slice2, return 0
int Compare(const rocksdb::Slice& slice1,
const rocksdb::Slice& slice2) const {
const rocksdb::Slice& slice2) const override {
std::string key1 = std::string(slice1.data(), slice1.size());
std::string key2 = std::string(slice2.data(), slice2.size());
size_t num1 = BinrayString2Number(key1);
Expand All @@ -111,9 +116,12 @@ class RocksDBStorageComparator : public rocksdb::Comparator {
}

// Ignore the following methods for now
const char* Name() const { return "RocksDBStorageComparator"; }
void FindShortestSeparator(std::string*, const rocksdb::Slice&) const {}
void FindShortSuccessor(std::string*) const {}
const char* Name() const override { return "RocksDBStorageComparator"; }

void FindShortestSeparator(std::string*,
const rocksdb::Slice&) const override {}

void FindShortSuccessor(std::string*) const override {}
};

class RocksDBStorage : public KVStorage, public StorageTransaction {
Expand Down Expand Up @@ -336,7 +344,8 @@ class RocksDBStorageIterator : public Iterator {
size_(size),
status_(status),
prefixChecking_(true),
ordered_(ordered) {
ordered_(ordered),
iter_(nullptr) {
if (status_ == 0) {
readOptions_ = storage_->ReadOptions();
if (storage_->InTransaction_) {
Expand Down Expand Up @@ -375,9 +384,9 @@ class RocksDBStorageIterator : public Iterator {
void SeekToFirst() {
auto handler = storage_->GetColumnFamilyHandle(ordered_);
if (storage_->InTransaction_) {
iter_ = storage_->txn_->GetIterator(readOptions_, handler);
iter_.reset(storage_->txn_->GetIterator(readOptions_, handler));
} else {
iter_ = storage_->db_->NewIterator(readOptions_, handler);
iter_.reset(storage_->db_->NewIterator(readOptions_, handler));
}
iter_->Seek(prefix_);
}
Expand Down Expand Up @@ -420,7 +429,7 @@ class RocksDBStorageIterator : public Iterator {
int status_;
bool ordered_;
bool prefixChecking_;
rocksdb::Iterator* iter_;
std::unique_ptr<rocksdb::Iterator> iter_;
RocksDBStorage* storage_;
rocksdb::ReadOptions readOptions_;
};
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/metaserver/storage/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ std::string Number2BinaryString(size_t num) {

size_t BinrayString2Number(const std::string& str) {
if (str.size() < sizeof(size_t)) {
LOG(ERROR) << "The length of binray string must equal or greater than "
<< sizeof(size_t) << ", but now is " << str.size();
return 0;
}
return *reinterpret_cast<const size_t*>(str.c_str());
Expand Down
12 changes: 10 additions & 2 deletions curvefs/test/metaserver/storage/rocksdb_storage_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,17 @@ class RocksDBStorageTest : public testing::Test {
std::string ret;
ASSERT_TRUE(ExecShell("mkdir -p " + dirname_, &ret));

options_.maxMemoryQuotaBytes = 32212254720;
options_.maxDiskQuotaBytes = 2199023255552;
options_.dataDir = dbpath_;
options_.maxMemoryQuotaBytes = 1024;
options_.maxDiskQuotaBytes = 1024;
options_.compression = false;
options_.unorderedWriteBufferSize = 134217728;
options_.unorderedMaxWriteBufferNumber = 5;
options_.orderedWriteBufferSize = 134217728;
options_.orderedMaxWriteBufferNumber = 15;
options_.blockCacheCapacity = 134217728;
options_.memtablePrefixBloomSizeRatio = 0.1;

kvStorage_ = std::make_shared<RocksDBStorage>(options_);
ASSERT_TRUE(kvStorage_->Open());
}
Expand Down
83 changes: 79 additions & 4 deletions curvefs/test/metaserver/storage/storage_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ void TestHClear(std::shared_ptr<KVStorage> kvStorage) {
s = kvStorage->HGet("partition:1", "key1", &value);
ASSERT_TRUE(s.IsNotFound());

// CASE 2: iterator after clear
// CASE 2: get all after clear table
iterator = kvStorage->HGetAll("partition:1");
ASSERT_EQ(iterator->Status(), 0);
size = 0;
Expand All @@ -300,7 +300,7 @@ void TestHClear(std::shared_ptr<KVStorage> kvStorage) {
ASSERT_EQ(size, 0);
ASSERT_EQ(iterator->Size(), 0);

// CASE 3: invoke set after clear
// CASE 3: set key-value after clear table
s = kvStorage->HSet("partition:1", "key1", Value("value1"));
ASSERT_TRUE(s.ok());
s = kvStorage->HGet("partition:1", "key1", &value);
Expand All @@ -315,6 +315,45 @@ void TestHClear(std::shared_ptr<KVStorage> kvStorage) {
}
ASSERT_EQ(size, 1);
ASSERT_EQ(iterator->Size(), 1);

s = kvStorage->HClear("partition:1");
ASSERT_TRUE(s.ok());

// CASE 4: clear different table
std::string tablename1 = "partition:1";
std::string tablename2 = "partition:2";
std::string tablename3 = "partition:3";

s = kvStorage->HSet(tablename1, "key1", Value("value1"));
ASSERT_TRUE(s.ok());
s = kvStorage->HSet(tablename2, "key1", Value("value1"));
ASSERT_TRUE(s.ok());
s = kvStorage->HSet(tablename3, "key1", Value("value1"));
ASSERT_TRUE(s.ok());

// clear table1
s = kvStorage->HClear(tablename1);
ASSERT_TRUE(s.ok());

ASSERT_EQ(kvStorage->HSize(tablename1), 0);
ASSERT_EQ(kvStorage->HSize(tablename2), 1);
ASSERT_EQ(kvStorage->HSize(tablename3), 1);

// clear table2
s = kvStorage->HClear(tablename2);
ASSERT_TRUE(s.ok());

ASSERT_EQ(kvStorage->HSize(tablename1), 0);
ASSERT_EQ(kvStorage->HSize(tablename2), 0);
ASSERT_EQ(kvStorage->HSize(tablename3), 1);

// clear table3
s = kvStorage->HClear(tablename3);
ASSERT_TRUE(s.ok());

ASSERT_EQ(kvStorage->HSize(tablename1), 0);
ASSERT_EQ(kvStorage->HSize(tablename2), 0);
ASSERT_EQ(kvStorage->HSize(tablename3), 0);
}

void TestSGet(std::shared_ptr<KVStorage> kvStorage) {
Expand Down Expand Up @@ -645,7 +684,7 @@ void TestSClear(std::shared_ptr<KVStorage> kvStorage) {
s = kvStorage->SGet("partition:1", "key1", &value);
ASSERT_TRUE(s.IsNotFound());

// CASE 2: iterator after clear
// CASE 2: get all after clear table
iterator = kvStorage->SGetAll("partition:1");
ASSERT_EQ(iterator->Status(), 0);
size = 0;
Expand All @@ -655,7 +694,7 @@ void TestSClear(std::shared_ptr<KVStorage> kvStorage) {
ASSERT_EQ(size, 0);
ASSERT_EQ(iterator->Size(), 0);

// CASE 3: invoke set after clear
// CASE 3: set key-value after clear table
s = kvStorage->SSet("partition:1", "key1", Value("value1"));
ASSERT_TRUE(s.ok());
s = kvStorage->SGet("partition:1", "key1", &value);
Expand All @@ -670,6 +709,42 @@ void TestSClear(std::shared_ptr<KVStorage> kvStorage) {
}
ASSERT_EQ(size, 1);
ASSERT_EQ(iterator->Size(), 1);

// CASE 4: clear different table
std::string tablename1 = "partition:1";
std::string tablename2 = "partition:2";
std::string tablename3 = "partition:3";

s = kvStorage->SSet(tablename1, "key1", Value("value1"));
ASSERT_TRUE(s.ok());
s = kvStorage->SSet(tablename2, "key1", Value("value1"));
ASSERT_TRUE(s.ok());
s = kvStorage->SSet(tablename3, "key1", Value("value1"));
ASSERT_TRUE(s.ok());

// clear table1
s = kvStorage->SClear(tablename1);
ASSERT_TRUE(s.ok());

ASSERT_EQ(kvStorage->SSize(tablename1), 0);
ASSERT_EQ(kvStorage->SSize(tablename2), 1);
ASSERT_EQ(kvStorage->SSize(tablename3), 1);

// clear table2
s = kvStorage->SClear(tablename2);
ASSERT_TRUE(s.ok());

ASSERT_EQ(kvStorage->SSize(tablename1), 0);
ASSERT_EQ(kvStorage->SSize(tablename2), 0);
ASSERT_EQ(kvStorage->SSize(tablename3), 1);

// clear table3
s = kvStorage->SClear(tablename3);
ASSERT_TRUE(s.ok());

ASSERT_EQ(kvStorage->SSize(tablename1), 0);
ASSERT_EQ(kvStorage->SSize(tablename2), 0);
ASSERT_EQ(kvStorage->SSize(tablename3), 0);
}

void TestMixOperator(std::shared_ptr<KVStorage> kvStorage) {
Expand Down