Skip to content

Commit

Permalink
curvefs/metaserver: fixed rocksdb storage memory leak caused by unrel…
Browse files Browse the repository at this point in the history
…easing iterator and transaction. (#1381)
  • Loading branch information
Wine93 committed May 6, 2022
1 parent 7bffef5 commit f78f563
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 35 deletions.
6 changes: 4 additions & 2 deletions curvefs/conf/metaserver.conf
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,10 @@ storage.rocksdb.ordered_write_buffer_size=134217728
# rocksdb column family's max_write_buffer_number
# for store dentry and inode's s3chunkinfo list (default: 15)
storage.rocksdb.ordered_max_write_buffer_number=15
# rocksdb block cache(LRU) capacity (default: 128MB)
storage.rocksdb.block_cache_capacity=134217728
# rocksdb block cache(LRU) capacity (default: 512MB)
storage.rocksdb.block_cache_capacity=536870912
# rocksdb memtable prefix bloom size ratio (size=write_buffer_size*memtable_prefix_bloom_size_ratio)
storage.rocksdb.memtable_prefix_bloom_size_ratio=0.1
# if the number of inode's s3chunkinfo exceed the limit_size,
# we will sending its with rpc streaming instead of
# padding its into inode (default: 25000, about 25000 * 41 (byte) = 1MB)
Expand Down
3 changes: 3 additions & 0 deletions curvefs/src/metaserver/metaserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,9 @@ void Metaserver::InitStorage() {
LOG_IF(FATAL, !conf_->GetUInt64Value(
"storage.rocksdb.block_cache_capacity",
&storageOptions_.blockCacheCapacity));
LOG_IF(FATAL, !conf_->GetDoubleValue(
"storage.rocksdb.memtable_prefix_bloom_size_ratio",
&storageOptions_.memtablePrefixBloomSizeRatio));
LOG_IF(FATAL, !conf_->GetUInt64Value(
"storage.s3_meta_inside_inode.limit_size",
&storageOptions_.s3MetaLimitSizeInsideInode));
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/metaserver/storage/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ struct StorageOptions {

uint64_t blockCacheCapacity;

double memtablePrefixBloomSizeRatio;

// misc config item
uint64_t s3MetaLimitSizeInsideInode;
};
Expand Down
67 changes: 49 additions & 18 deletions curvefs/src/metaserver/storage/rocksdb_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,39 +42,66 @@ const std::string RocksDBOptions::kOrderedColumnFamilyName_ = // NOLINT

RocksDBOptions::RocksDBOptions(StorageOptions options) {
// db options
RocksDBStorageComparator cmp;
dbOptions_.comparator = &cmp;
// the database will be created if it is missing
dbOptions_.create_if_missing = true;
// missing column families will be automatically created
dbOptions_.create_missing_column_families = true;
dbOptions_.enable_blob_files = true;
// maximum number of concurrent background memtable flush jobs,
// submitted by default to the HIGH priority thread pool
dbOptions_.max_background_flushes = 2;
// maximum number of concurrent background compaction jobs,
// submitted to the default LOW priority thread pool.
dbOptions_.max_background_compactions = 4;
// allows OS to incrementally sync files to disk while they are being
// written, asynchronously, in the background.
dbOptions_.bytes_per_sync = 1048576;
dbOptions_.compaction_pri = ROCKSDB_NAMESPACE::kMinOverlappingRatio;
dbOptions_.prefix_extractor.reset(NewCappedPrefixTransform(3));

// table options
std::shared_ptr<ROCKSDB_NAMESPACE::Cache> cache =
NewLRUCache(options.blockCacheCapacity);
BlockBasedTableOptions tableOptions;
tableOptions.block_size = 16 * 1024; // 16KB
// default: an 8MB internal cache
tableOptions.block_cache = cache;
tableOptions.block_size = 16 * 1024;  // 16KB
// whether to put index/filter blocks in the block cache
tableOptions.cache_index_and_filter_blocks = true;
// only evicted from cache when the table reader is freed
tableOptions.pin_l0_filter_and_index_blocks_in_cache = true;
tableOptions.filter_policy.reset(NewBloomFilterPolicy(10, true));
dbOptions_.table_factory.reset(NewBlockBasedTableFactory(tableOptions));
// reset bloom filter
tableOptions.filter_policy.reset(NewBloomFilterPolicy(10, false));

// column family options
auto unorderedCFOptions = ColumnFamilyOptions();
auto orderedCFOptions = ColumnFamilyOptions();
comparator_ = std::make_shared<RocksDBStorageComparator>();
ColumnFamilyOptions cfOptions = ColumnFamilyOptions();
// user-defined key comparator
cfOptions.comparator = comparator_.get();
// large values (blobs) are written to separate blob files, and
// only pointers to them are stored in SST files
cfOptions.enable_blob_files = true;
// RocksDB will pick target size of each level dynamically
cfOptions.level_compaction_dynamic_level_bytes = true;
cfOptions.compaction_pri = ROCKSDB_NAMESPACE::kMinOverlappingRatio;
// use the specified function to determine the prefixes for keys
cfOptions.prefix_extractor.reset(NewFixedPrefixTransform(sizeof(size_t)));
// The size in bytes of the filter for memtable is
// write_buffer_size * memtable_prefix_bloom_size_ratio
cfOptions.memtable_prefix_bloom_size_ratio =
options.memtablePrefixBloomSizeRatio;
// reset table options for column family
cfOptions.table_factory.reset(NewBlockBasedTableFactory(tableOptions));

ColumnFamilyOptions unorderedCFOptions = cfOptions;
// amount of data to build up in memory (backed by an unsorted log
// on disk) before converting to a sorted on-disk file.
unorderedCFOptions.write_buffer_size = options.unorderedWriteBufferSize;
// the maximum number of write buffers that are built up in memory.
unorderedCFOptions.max_write_buffer_number =
options.unorderedMaxWriteBufferNumber;
unorderedCFOptions.level_compaction_dynamic_level_bytes = true;

ColumnFamilyOptions orderedCFOptions = cfOptions;
orderedCFOptions.write_buffer_size = options.orderedWriteBufferSize;
orderedCFOptions.max_write_buffer_number =
options.orderedMaxWriteBufferNumber;
orderedCFOptions.level_compaction_dynamic_level_bytes = true;

columnFamilies_.push_back(ColumnFamilyDescriptor(
ROCKSDB_NAMESPACE::kDefaultColumnFamilyName, unorderedCFOptions));
Expand Down Expand Up @@ -210,6 +237,7 @@ Status RocksDBStorage::ToStorageStatus(const ROCKSDB_NAMESPACE::Status& s) {
return Status::InternalError();
}

// "ordered:name"
std::string RocksDBStorage::ToInternalName(const std::string& name,
bool ordered) {
std::ostringstream oss;
Expand All @@ -225,7 +253,7 @@ std::string RocksDBStorage::FormatInternalKey(size_t num4name,
}

// NOTE: we will convert name to number for compare prefix
// eg: iname:key
// eg: Hash(iname):key
std::string RocksDBStorage::ToInternalKey(const std::string& iname,
const std::string& key) {
size_t num4name = Hash(iname);
Expand Down Expand Up @@ -382,8 +410,8 @@ Status RocksDBStorage::Clear(const std::string& name, bool ordered) {
}

auto handle = GetColumnFamilyHandle(ordered);
std::string iname = ToInternalName(name, ordered);
std::string beginKey = ToInternalKey(iname, ""); // "name:"
std::string iname = ToInternalName(name, ordered); // "1:name"
std::string beginKey = ToInternalKey(iname, ""); // "Hash(iname):"
size_t beginNum = BinrayString2Number(beginKey);
std::string endKey = FormatInternalKey(beginNum + 1, "");
ROCKSDB_NAMESPACE::Status s = db_->DeleteRange(
Expand All @@ -404,24 +432,27 @@ std::shared_ptr<StorageTransaction> RocksDBStorage::BeginTransaction() {
}

Status RocksDBStorage::Commit() {
    // Committing is only meaningful while a transaction is open; the
    // nullptr check additionally guards against a transaction object
    // that was never created (or already released).
    if (!InTransaction_ || nullptr == txn_) {
        return Status::NotSupported();
    }

    Status s = ToStorageStatus(txn_->Commit());
    if (s.ok()) {
        // Flush the bookkeeping for keys touched inside the transaction.
        CommitKeys();
    }
    // Release the rocksdb transaction allocated by BeginTransaction()
    // to avoid the memory leak, and null it out so the guard above can
    // detect a stale/finished transaction instead of touching a
    // dangling pointer.
    delete txn_;
    txn_ = nullptr;
    return s;
}

Status RocksDBStorage::Rollback() {
    // Rolling back is only meaningful while a transaction is open; the
    // nullptr check additionally guards against a transaction object
    // that was never created (or already released).
    if (!InTransaction_ || nullptr == txn_) {
        return Status::NotSupported();
    }
    // Discard the bookkeeping for keys touched inside the transaction.
    pending4set_.clear();
    pending4del_.clear();
    // BUG FIX: the previous code invoked txn_->Commit() here, which
    // would PERSIST the transaction instead of discarding it. A
    // rollback must call Rollback() on the rocksdb transaction.
    Status s = ToStorageStatus(txn_->Rollback());
    // Release the rocksdb transaction allocated by BeginTransaction()
    // to avoid the memory leak, and null it out so the guard above can
    // detect a stale/finished transaction.
    delete txn_;
    txn_ = nullptr;
    return s;
}

bool RocksDBStorage::GetStatistics(StorageStatistics* statistics) {
Expand Down
27 changes: 18 additions & 9 deletions curvefs/src/metaserver/storage/rocksdb_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ using ROCKSDB_NAMESPACE::Transaction;
using ROCKSDB_NAMESPACE::TransactionDB;
using ROCKSDB_NAMESPACE::NewLRUCache;
using ROCKSDB_NAMESPACE::NewBloomFilterPolicy;
using ROCKSDB_NAMESPACE::NewCappedPrefixTransform;
using ROCKSDB_NAMESPACE::NewFixedPrefixTransform;
using ROCKSDB_NAMESPACE::NewBlockBasedTableFactory;
using STORAGE_TYPE = KVStorage::STORAGE_TYPE;

Expand Down Expand Up @@ -86,12 +86,17 @@ class RocksDBOptions {
std::vector<ColumnFamilyDescriptor> columnFamilies_;

static const std::string kOrderedColumnFamilyName_;

std::shared_ptr<rocksdb::Comparator> comparator_;
};

class RocksDBStorageComparator : public rocksdb::Comparator {
public:
// if slice1 < slice2, return -1
// if slice1 > slice2, return 1
// if slice1 == slice2, return 0
int Compare(const rocksdb::Slice& slice1,
const rocksdb::Slice& slice2) const {
const rocksdb::Slice& slice2) const override {
std::string key1 = std::string(slice1.data(), slice1.size());
std::string key2 = std::string(slice2.data(), slice2.size());
size_t num1 = BinrayString2Number(key1);
Expand All @@ -111,9 +116,12 @@ class RocksDBStorageComparator : public rocksdb::Comparator {
}

// Ignore the following methods for now
const char* Name() const { return "RocksDBStorageComparator"; }
void FindShortestSeparator(std::string*, const rocksdb::Slice&) const {}
void FindShortSuccessor(std::string*) const {}
const char* Name() const override { return "RocksDBStorageComparator"; }

void FindShortestSeparator(std::string*,
const rocksdb::Slice&) const override {}

void FindShortSuccessor(std::string*) const override {}
};

class RocksDBStorage : public KVStorage, public StorageTransaction {
Expand Down Expand Up @@ -336,7 +344,8 @@ class RocksDBStorageIterator : public Iterator {
size_(size),
status_(status),
prefixChecking_(true),
ordered_(ordered) {
ordered_(ordered),
iter_(nullptr) {
if (status_ == 0) {
readOptions_ = storage_->ReadOptions();
if (storage_->InTransaction_) {
Expand Down Expand Up @@ -375,9 +384,9 @@ class RocksDBStorageIterator : public Iterator {
void SeekToFirst() {
auto handler = storage_->GetColumnFamilyHandle(ordered_);
if (storage_->InTransaction_) {
iter_ = storage_->txn_->GetIterator(readOptions_, handler);
iter_.reset(storage_->txn_->GetIterator(readOptions_, handler));
} else {
iter_ = storage_->db_->NewIterator(readOptions_, handler);
iter_.reset(storage_->db_->NewIterator(readOptions_, handler));
}
iter_->Seek(prefix_);
}
Expand Down Expand Up @@ -420,7 +429,7 @@ class RocksDBStorageIterator : public Iterator {
int status_;
bool ordered_;
bool prefixChecking_;
rocksdb::Iterator* iter_;
std::unique_ptr<rocksdb::Iterator> iter_;
RocksDBStorage* storage_;
rocksdb::ReadOptions readOptions_;
};
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/metaserver/storage/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ std::string Number2BinaryString(size_t num) {

size_t BinrayString2Number(const std::string& str) {
if (str.size() < sizeof(size_t)) {
LOG(ERROR) << "The length of binray string must equal or greater than "
<< sizeof(size_t) << ", but now is " << str.size();
return 0;
}
return *reinterpret_cast<const size_t*>(str.c_str());
Expand Down
12 changes: 10 additions & 2 deletions curvefs/test/metaserver/storage/rocksdb_storage_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,17 @@ class RocksDBStorageTest : public testing::Test {
std::string ret;
ASSERT_TRUE(ExecShell("mkdir -p " + dirname_, &ret));

options_.maxMemoryQuotaBytes = 32212254720;
options_.maxDiskQuotaBytes = 2199023255552;
options_.dataDir = dbpath_;
options_.maxMemoryQuotaBytes = 1024;
options_.maxDiskQuotaBytes = 1024;
options_.compression = false;
options_.unorderedWriteBufferSize = 134217728;
options_.unorderedMaxWriteBufferNumber = 5;
options_.orderedWriteBufferSize = 134217728;
options_.orderedMaxWriteBufferNumber = 15;
options_.blockCacheCapacity = 134217728;
options_.memtablePrefixBloomSizeRatio = 0.1;

kvStorage_ = std::make_shared<RocksDBStorage>(options_);
ASSERT_TRUE(kvStorage_->Open());
}
Expand Down
83 changes: 79 additions & 4 deletions curvefs/test/metaserver/storage/storage_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ void TestHClear(std::shared_ptr<KVStorage> kvStorage) {
s = kvStorage->HGet("partition:1", "key1", &value);
ASSERT_TRUE(s.IsNotFound());

// CASE 2: iterator after clear
// CASE 2: get all after clear table
iterator = kvStorage->HGetAll("partition:1");
ASSERT_EQ(iterator->Status(), 0);
size = 0;
Expand All @@ -300,7 +300,7 @@ void TestHClear(std::shared_ptr<KVStorage> kvStorage) {
ASSERT_EQ(size, 0);
ASSERT_EQ(iterator->Size(), 0);

// CASE 3: invoke set after clear
// CASE 3: set key-value after clear table
s = kvStorage->HSet("partition:1", "key1", Value("value1"));
ASSERT_TRUE(s.ok());
s = kvStorage->HGet("partition:1", "key1", &value);
Expand All @@ -315,6 +315,45 @@ void TestHClear(std::shared_ptr<KVStorage> kvStorage) {
}
ASSERT_EQ(size, 1);
ASSERT_EQ(iterator->Size(), 1);

s = kvStorage->HClear("partition:1");
ASSERT_TRUE(s.ok());

// CASE 4: clear different table
std::string tablename1 = "partition:1";
std::string tablename2 = "partition:2";
std::string tablename3 = "partition:3";

s = kvStorage->HSet(tablename1, "key1", Value("value1"));
ASSERT_TRUE(s.ok());
s = kvStorage->HSet(tablename2, "key1", Value("value1"));
ASSERT_TRUE(s.ok());
s = kvStorage->HSet(tablename3, "key1", Value("value1"));
ASSERT_TRUE(s.ok());

// clear table1
s = kvStorage->HClear(tablename1);
ASSERT_TRUE(s.ok());

ASSERT_EQ(kvStorage->HSize(tablename1), 0);
ASSERT_EQ(kvStorage->HSize(tablename2), 1);
ASSERT_EQ(kvStorage->HSize(tablename3), 1);

// clear table2
s = kvStorage->HClear(tablename2);
ASSERT_TRUE(s.ok());

ASSERT_EQ(kvStorage->HSize(tablename1), 0);
ASSERT_EQ(kvStorage->HSize(tablename2), 0);
ASSERT_EQ(kvStorage->HSize(tablename3), 1);

// clear table3
s = kvStorage->HClear(tablename3);
ASSERT_TRUE(s.ok());

ASSERT_EQ(kvStorage->HSize(tablename1), 0);
ASSERT_EQ(kvStorage->HSize(tablename2), 0);
ASSERT_EQ(kvStorage->HSize(tablename3), 0);
}

void TestSGet(std::shared_ptr<KVStorage> kvStorage) {
Expand Down Expand Up @@ -645,7 +684,7 @@ void TestSClear(std::shared_ptr<KVStorage> kvStorage) {
s = kvStorage->SGet("partition:1", "key1", &value);
ASSERT_TRUE(s.IsNotFound());

// CASE 2: iterator after clear
// CASE 2: get all after clear table
iterator = kvStorage->SGetAll("partition:1");
ASSERT_EQ(iterator->Status(), 0);
size = 0;
Expand All @@ -655,7 +694,7 @@ void TestSClear(std::shared_ptr<KVStorage> kvStorage) {
ASSERT_EQ(size, 0);
ASSERT_EQ(iterator->Size(), 0);

// CASE 3: invoke set after clear
// CASE 3: set key-value after clear table
s = kvStorage->SSet("partition:1", "key1", Value("value1"));
ASSERT_TRUE(s.ok());
s = kvStorage->SGet("partition:1", "key1", &value);
Expand All @@ -670,6 +709,42 @@ void TestSClear(std::shared_ptr<KVStorage> kvStorage) {
}
ASSERT_EQ(size, 1);
ASSERT_EQ(iterator->Size(), 1);

// CASE 4: clear different table
std::string tablename1 = "partition:1";
std::string tablename2 = "partition:2";
std::string tablename3 = "partition:3";

s = kvStorage->SSet(tablename1, "key1", Value("value1"));
ASSERT_TRUE(s.ok());
s = kvStorage->SSet(tablename2, "key1", Value("value1"));
ASSERT_TRUE(s.ok());
s = kvStorage->SSet(tablename3, "key1", Value("value1"));
ASSERT_TRUE(s.ok());

// clear table1
s = kvStorage->SClear(tablename1);
ASSERT_TRUE(s.ok());

ASSERT_EQ(kvStorage->SSize(tablename1), 0);
ASSERT_EQ(kvStorage->SSize(tablename2), 1);
ASSERT_EQ(kvStorage->SSize(tablename3), 1);

// clear table2
s = kvStorage->SClear(tablename2);
ASSERT_TRUE(s.ok());

ASSERT_EQ(kvStorage->SSize(tablename1), 0);
ASSERT_EQ(kvStorage->SSize(tablename2), 0);
ASSERT_EQ(kvStorage->SSize(tablename3), 1);

// clear table3
s = kvStorage->SClear(tablename3);
ASSERT_TRUE(s.ok());

ASSERT_EQ(kvStorage->SSize(tablename1), 0);
ASSERT_EQ(kvStorage->SSize(tablename2), 0);
ASSERT_EQ(kvStorage->SSize(tablename3), 0);
}

void TestMixOperator(std::shared_ptr<KVStorage> kvStorage) {
Expand Down

0 comments on commit f78f563

Please sign in to comment.