From 68a3e6d2790d085888a1b0283ef8e3faaab32f8c Mon Sep 17 00:00:00 2001 From: Wine93 Date: Wed, 4 May 2022 21:19:32 +0800 Subject: [PATCH] curvefs/metaserver: fixed rocksdb storage memory leak caused by unreleasing iterator and transaction. (#1381) --- curvefs/conf/metaserver.conf | 6 +- curvefs/src/metaserver/metaserver.cpp | 3 + curvefs/src/metaserver/storage/config.h | 2 + .../metaserver/storage/rocksdb_storage.cpp | 67 +++++++++++---- .../src/metaserver/storage/rocksdb_storage.h | 27 ++++-- curvefs/src/metaserver/storage/utils.cpp | 2 + .../storage/rocksdb_storage_test.cpp | 12 ++- .../test/metaserver/storage/storage_test.cpp | 83 ++++++++++++++++++- 8 files changed, 167 insertions(+), 35 deletions(-) diff --git a/curvefs/conf/metaserver.conf b/curvefs/conf/metaserver.conf index 22253ebf7d..4ef00426fe 100644 --- a/curvefs/conf/metaserver.conf +++ b/curvefs/conf/metaserver.conf @@ -229,8 +229,10 @@ storage.rocksdb.ordered_write_buffer_size=134217728 # rocksdb column family's max_write_buffer_number # for store dentry and inode's s3chunkinfo list (default: 15) storage.rocksdb.ordered_max_write_buffer_number=15 -# rocksdb block cache(LRU) capacity (default: 128MB) -storage.rocksdb.block_cache_capacity=134217728 +# rocksdb block cache(LRU) capacity (default: 512MB) +storage.rocksdb.block_cache_capacity=536870912 +# rocksdb memtable prefix bloom size ratio (size=write_buffer_size*memtable_prefix_bloom_size_ratio) +storage.rocksdb.memtable_prefix_bloom_size_ratio=0.1 # if the number of inode's s3chunkinfo exceed the limit_size, # we will sending its with rpc streaming instead of # padding its into inode (default: 25000, about 25000 * 41 (byte) = 1MB) diff --git a/curvefs/src/metaserver/metaserver.cpp b/curvefs/src/metaserver/metaserver.cpp index 7d6e64510b..f2524358bc 100644 --- a/curvefs/src/metaserver/metaserver.cpp +++ b/curvefs/src/metaserver/metaserver.cpp @@ -500,6 +500,9 @@ void Metaserver::InitStorage() { LOG_IF(FATAL, !conf_->GetUInt64Value( 
"storage.rocksdb.block_cache_capacity", &storageOptions_.blockCacheCapacity)); + LOG_IF(FATAL, !conf_->GetDoubleValue( + "storage.rocksdb.memtable_prefix_bloom_size_ratio", + &storageOptions_.memtablePrefixBloomSizeRatio)); LOG_IF(FATAL, !conf_->GetUInt64Value( "storage.s3_meta_inside_inode.limit_size", &storageOptions_.s3MetaLimitSizeInsideInode)); diff --git a/curvefs/src/metaserver/storage/config.h b/curvefs/src/metaserver/storage/config.h index 52b5d0dc45..4e923926a9 100644 --- a/curvefs/src/metaserver/storage/config.h +++ b/curvefs/src/metaserver/storage/config.h @@ -54,6 +54,8 @@ struct StorageOptions { uint64_t blockCacheCapacity; + double memtablePrefixBloomSizeRatio; + // misc config item uint64_t s3MetaLimitSizeInsideInode; }; diff --git a/curvefs/src/metaserver/storage/rocksdb_storage.cpp b/curvefs/src/metaserver/storage/rocksdb_storage.cpp index 128cdf4e51..1a778cdf58 100644 --- a/curvefs/src/metaserver/storage/rocksdb_storage.cpp +++ b/curvefs/src/metaserver/storage/rocksdb_storage.cpp @@ -42,39 +42,66 @@ const std::string RocksDBOptions::kOrderedColumnFamilyName_ = // NOLINT RocksDBOptions::RocksDBOptions(StorageOptions options) { // db options - RocksDBStorageComparator cmp; - dbOptions_.comparator = &cmp; + // the database will be created if it is missing dbOptions_.create_if_missing = true; + // missing column families will be automatically created dbOptions_.create_missing_column_families = true; - dbOptions_.enable_blob_files = true; + // maximum number of concurrent background memtable flush jobs, + // submitted by default to the HIGH priority thread pool dbOptions_.max_background_flushes = 2; + // maximum number of concurrent background compaction jobs, + // submitted to the default LOW priority thread pool. dbOptions_.max_background_compactions = 4; + // allows OS to incrementally sync files to disk while they are being + // written, asynchronously, in the background. 
dbOptions_.bytes_per_sync = 1048576; - dbOptions_.compaction_pri = ROCKSDB_NAMESPACE::kMinOverlappingRatio; - dbOptions_.prefix_extractor.reset(NewCappedPrefixTransform(3)); // table options std::shared_ptr cache = NewLRUCache(options.blockCacheCapacity); BlockBasedTableOptions tableOptions; + tableOptions.block_size = 16 * 1024; // 16KB + // default: an 8MB internal cache tableOptions.block_cache = cache; - tableOptions.block_size = 16 * 1024; // 16MB + // whether to put index/filter blocks in the block cache tableOptions.cache_index_and_filter_blocks = true; + // only evicted from cache when the table reader is freed tableOptions.pin_l0_filter_and_index_blocks_in_cache = true; - tableOptions.filter_policy.reset(NewBloomFilterPolicy(10, true)); - dbOptions_.table_factory.reset(NewBlockBasedTableFactory(tableOptions)); + // reset bloom filter + tableOptions.filter_policy.reset(NewBloomFilterPolicy(10, false)); // column failmy options - auto unorderedCFOptions = ColumnFamilyOptions(); - auto orderedCFOptions = ColumnFamilyOptions(); + comparator_ = std::make_shared(); + ColumnFamilyOptions cfOptions = ColumnFamilyOptions(); + // user-defined key comparator + cfOptions.comparator = comparator_.get(); + // large values (blobs) are written to separate blob files, and + // only pointers to them are stored in SST files + cfOptions.enable_blob_files = true; + // RocksDB will pick target size of each level dynamically + cfOptions.level_compaction_dynamic_level_bytes = true; + cfOptions.compaction_pri = ROCKSDB_NAMESPACE::kMinOverlappingRatio; + // use the specified function to determine the prefixes for keys + cfOptions.prefix_extractor.reset(NewFixedPrefixTransform(sizeof(size_t))); + // The size in bytes of the filter for memtable is + // write_buffer_size * memtable_prefix_bloom_size_ratio + cfOptions.memtable_prefix_bloom_size_ratio = + options.memtablePrefixBloomSizeRatio; + // reset table options for column failmy + 
cfOptions.table_factory.reset(NewBlockBasedTableFactory(tableOptions)); + + ColumnFamilyOptions unorderedCFOptions = cfOptions; + // amount of data to build up in memory (backed by an unsorted log + // on disk) before converting to a sorted on-disk file. unorderedCFOptions.write_buffer_size = options.unorderedWriteBufferSize; + // the maximum number of write buffers that are built up in memory. unorderedCFOptions.max_write_buffer_number = options.unorderedMaxWriteBufferNumber; - unorderedCFOptions.level_compaction_dynamic_level_bytes = true; + + ColumnFamilyOptions orderedCFOptions = cfOptions; orderedCFOptions.write_buffer_size = options.orderedWriteBufferSize; orderedCFOptions.max_write_buffer_number = options.orderedMaxWriteBufferNumber; - orderedCFOptions.level_compaction_dynamic_level_bytes = true; columnFamilies_.push_back(ColumnFamilyDescriptor( ROCKSDB_NAMESPACE::kDefaultColumnFamilyName, unorderedCFOptions)); @@ -210,6 +237,7 @@ Status RocksDBStorage::ToStorageStatus(const ROCKSDB_NAMESPACE::Status& s) { return Status::InternalError(); } +// "ordered:name" std::string RocksDBStorage::ToInternalName(const std::string& name, bool ordered) { std::ostringstream oss; @@ -225,7 +253,7 @@ std::string RocksDBStorage::FormatInternalKey(size_t num4name, } // NOTE: we will convert name to number for compare prefix -// eg: iname:key +// eg: Hash(iname):key std::string RocksDBStorage::ToInternalKey(const std::string& iname, const std::string& key) { size_t num4name = Hash(iname); @@ -382,8 +410,8 @@ Status RocksDBStorage::Clear(const std::string& name, bool ordered) { } auto handle = GetColumnFamilyHandle(ordered); - std::string iname = ToInternalName(name, ordered); - std::string beginKey = ToInternalKey(iname, ""); // "name:" + std::string iname = ToInternalName(name, ordered); // "1:name" + std::string beginKey = ToInternalKey(iname, ""); // "Hash(iname):" size_t beginNum = BinrayString2Number(beginKey); std::string endKey = FormatInternalKey(beginNum + 1, ""); 
ROCKSDB_NAMESPACE::Status s = db_->DeleteRange(
@@ -404,7 +432,7 @@ std::shared_ptr RocksDBStorage::BeginTransaction() {
 }
 
 Status RocksDBStorage::Commit() {
-    if (!InTransaction_) {
+    if (!InTransaction_ || nullptr == txn_) {
         return Status::NotSupported();
     }
 
@@ -412,16 +440,19 @@ Status RocksDBStorage::Commit() {
     if (s.ok()) {
         CommitKeys();
     }
+    delete txn_;
     return s;
 }
 
 Status RocksDBStorage::Rollback() {
-    if (!InTransaction_) {
+    if (!InTransaction_ || nullptr == txn_) {
         return Status::NotSupported();
     }
     pending4set_.clear();
     pending4del_.clear();
-    return ToStorageStatus(txn_->Rollback());
+    Status s = ToStorageStatus(txn_->Rollback());  // abort, never commit
+    delete txn_;
+    return s;
 }
 
 bool RocksDBStorage::GetStatistics(StorageStatistics* statistics) {
diff --git a/curvefs/src/metaserver/storage/rocksdb_storage.h b/curvefs/src/metaserver/storage/rocksdb_storage.h
index bb59776fe9..007fffdf60 100644
--- a/curvefs/src/metaserver/storage/rocksdb_storage.h
+++ b/curvefs/src/metaserver/storage/rocksdb_storage.h
@@ -58,7 +58,7 @@ using ROCKSDB_NAMESPACE::Transaction;
 using ROCKSDB_NAMESPACE::TransactionDB;
 using ROCKSDB_NAMESPACE::NewLRUCache;
 using ROCKSDB_NAMESPACE::NewBloomFilterPolicy;
-using ROCKSDB_NAMESPACE::NewCappedPrefixTransform;
+using ROCKSDB_NAMESPACE::NewFixedPrefixTransform;
 using ROCKSDB_NAMESPACE::NewBlockBasedTableFactory;
 
 using STORAGE_TYPE = KVStorage::STORAGE_TYPE;
@@ -86,12 +86,17 @@ class RocksDBOptions {
     std::vector columnFamilies_;
 
     static const std::string kOrderedColumnFamilyName_;
+
+    std::shared_ptr comparator_;
 };
 
 class RocksDBStorageComparator : public rocksdb::Comparator {
  public:
+    // if slice1 < slice2, return -1
+    // if slice1 > slice2, return 1
+    // if slice1 == slice2, return 0
     int Compare(const rocksdb::Slice& slice1,
-                const rocksdb::Slice& slice2) const {
+                const rocksdb::Slice& slice2) const override {
         std::string key1 = std::string(slice1.data(), slice1.size());
         std::string key2 = std::string(slice2.data(), slice2.size());
         size_t num1 =
BinrayString2Number(key1); @@ -111,9 +116,12 @@ class RocksDBStorageComparator : public rocksdb::Comparator { } // Ignore the following methods for now - const char* Name() const { return "RocksDBStorageComparator"; } - void FindShortestSeparator(std::string*, const rocksdb::Slice&) const {} - void FindShortSuccessor(std::string*) const {} + const char* Name() const override { return "RocksDBStorageComparator"; } + + void FindShortestSeparator(std::string*, + const rocksdb::Slice&) const override {} + + void FindShortSuccessor(std::string*) const override {} }; class RocksDBStorage : public KVStorage, public StorageTransaction { @@ -336,7 +344,8 @@ class RocksDBStorageIterator : public Iterator { size_(size), status_(status), prefixChecking_(true), - ordered_(ordered) { + ordered_(ordered), + iter_(nullptr) { if (status_ == 0) { readOptions_ = storage_->ReadOptions(); if (storage_->InTransaction_) { @@ -375,9 +384,9 @@ class RocksDBStorageIterator : public Iterator { void SeekToFirst() { auto handler = storage_->GetColumnFamilyHandle(ordered_); if (storage_->InTransaction_) { - iter_ = storage_->txn_->GetIterator(readOptions_, handler); + iter_.reset(storage_->txn_->GetIterator(readOptions_, handler)); } else { - iter_ = storage_->db_->NewIterator(readOptions_, handler); + iter_.reset(storage_->db_->NewIterator(readOptions_, handler)); } iter_->Seek(prefix_); } @@ -420,7 +429,7 @@ class RocksDBStorageIterator : public Iterator { int status_; bool ordered_; bool prefixChecking_; - rocksdb::Iterator* iter_; + std::unique_ptr iter_; RocksDBStorage* storage_; rocksdb::ReadOptions readOptions_; }; diff --git a/curvefs/src/metaserver/storage/utils.cpp b/curvefs/src/metaserver/storage/utils.cpp index b94618394e..8f1609a0c0 100644 --- a/curvefs/src/metaserver/storage/utils.cpp +++ b/curvefs/src/metaserver/storage/utils.cpp @@ -56,6 +56,8 @@ std::string Number2BinaryString(size_t num) { size_t BinrayString2Number(const std::string& str) { if (str.size() < sizeof(size_t)) { 
+ LOG(ERROR) << "The length of binray string must equal or greater than " + << sizeof(size_t) << ", but now is " << str.size(); return 0; } return *reinterpret_cast(str.c_str()); diff --git a/curvefs/test/metaserver/storage/rocksdb_storage_test.cpp b/curvefs/test/metaserver/storage/rocksdb_storage_test.cpp index 9c86fb4960..35f3d4e82e 100644 --- a/curvefs/test/metaserver/storage/rocksdb_storage_test.cpp +++ b/curvefs/test/metaserver/storage/rocksdb_storage_test.cpp @@ -54,9 +54,17 @@ class RocksDBStorageTest : public testing::Test { std::string ret; ASSERT_TRUE(ExecShell("mkdir -p " + dirname_, &ret)); + options_.maxMemoryQuotaBytes = 32212254720; + options_.maxDiskQuotaBytes = 2199023255552; options_.dataDir = dbpath_; - options_.maxMemoryQuotaBytes = 1024; - options_.maxDiskQuotaBytes = 1024; + options_.compression = false; + options_.unorderedWriteBufferSize = 134217728; + options_.unorderedMaxWriteBufferNumber = 5; + options_.orderedWriteBufferSize = 134217728; + options_.orderedMaxWriteBufferNumber = 15; + options_.blockCacheCapacity = 134217728; + options_.memtablePrefixBloomSizeRatio = 0.1; + kvStorage_ = std::make_shared(options_); ASSERT_TRUE(kvStorage_->Open()); } diff --git a/curvefs/test/metaserver/storage/storage_test.cpp b/curvefs/test/metaserver/storage/storage_test.cpp index a083a0106e..56d4b064c6 100644 --- a/curvefs/test/metaserver/storage/storage_test.cpp +++ b/curvefs/test/metaserver/storage/storage_test.cpp @@ -290,7 +290,7 @@ void TestHClear(std::shared_ptr kvStorage) { s = kvStorage->HGet("partition:1", "key1", &value); ASSERT_TRUE(s.IsNotFound()); - // CASE 2: iterator after clear + // CASE 2: get all after clear table iterator = kvStorage->HGetAll("partition:1"); ASSERT_EQ(iterator->Status(), 0); size = 0; @@ -300,7 +300,7 @@ void TestHClear(std::shared_ptr kvStorage) { ASSERT_EQ(size, 0); ASSERT_EQ(iterator->Size(), 0); - // CASE 3: invoke set after clear + // CASE 3: set key-value after clear table s = kvStorage->HSet("partition:1", 
"key1", Value("value1")); ASSERT_TRUE(s.ok()); s = kvStorage->HGet("partition:1", "key1", &value); @@ -315,6 +315,45 @@ void TestHClear(std::shared_ptr kvStorage) { } ASSERT_EQ(size, 1); ASSERT_EQ(iterator->Size(), 1); + + s = kvStorage->HClear("partition:1"); + ASSERT_TRUE(s.ok()); + + // CASE 4: clear different table + std::string tablename1 = "partition:1"; + std::string tablename2 = "partition:2"; + std::string tablename3 = "partition:3"; + + s = kvStorage->HSet(tablename1, "key1", Value("value1")); + ASSERT_TRUE(s.ok()); + s = kvStorage->HSet(tablename2, "key1", Value("value1")); + ASSERT_TRUE(s.ok()); + s = kvStorage->HSet(tablename3, "key1", Value("value1")); + ASSERT_TRUE(s.ok()); + + // clear table1 + s = kvStorage->HClear(tablename1); + ASSERT_TRUE(s.ok()); + + ASSERT_EQ(kvStorage->HSize(tablename1), 0); + ASSERT_EQ(kvStorage->HSize(tablename2), 1); + ASSERT_EQ(kvStorage->HSize(tablename3), 1); + + // clear table2 + s = kvStorage->HClear(tablename2); + ASSERT_TRUE(s.ok()); + + ASSERT_EQ(kvStorage->HSize(tablename1), 0); + ASSERT_EQ(kvStorage->HSize(tablename2), 0); + ASSERT_EQ(kvStorage->HSize(tablename3), 1); + + // clear table3 + s = kvStorage->HClear(tablename3); + ASSERT_TRUE(s.ok()); + + ASSERT_EQ(kvStorage->HSize(tablename1), 0); + ASSERT_EQ(kvStorage->HSize(tablename2), 0); + ASSERT_EQ(kvStorage->HSize(tablename3), 0); } void TestSGet(std::shared_ptr kvStorage) { @@ -645,7 +684,7 @@ void TestSClear(std::shared_ptr kvStorage) { s = kvStorage->SGet("partition:1", "key1", &value); ASSERT_TRUE(s.IsNotFound()); - // CASE 2: iterator after clear + // CASE 2: get all after clear table iterator = kvStorage->SGetAll("partition:1"); ASSERT_EQ(iterator->Status(), 0); size = 0; @@ -655,7 +694,7 @@ void TestSClear(std::shared_ptr kvStorage) { ASSERT_EQ(size, 0); ASSERT_EQ(iterator->Size(), 0); - // CASE 3: invoke set after clear + // CASE 3: set key-value after clear table s = kvStorage->SSet("partition:1", "key1", Value("value1")); ASSERT_TRUE(s.ok()); s = 
kvStorage->SGet("partition:1", "key1", &value); @@ -670,6 +709,42 @@ void TestSClear(std::shared_ptr kvStorage) { } ASSERT_EQ(size, 1); ASSERT_EQ(iterator->Size(), 1); + + // CASE 4: clear different table + std::string tablename1 = "partition:1"; + std::string tablename2 = "partition:2"; + std::string tablename3 = "partition:3"; + + s = kvStorage->SSet(tablename1, "key1", Value("value1")); + ASSERT_TRUE(s.ok()); + s = kvStorage->SSet(tablename2, "key1", Value("value1")); + ASSERT_TRUE(s.ok()); + s = kvStorage->SSet(tablename3, "key1", Value("value1")); + ASSERT_TRUE(s.ok()); + + // clear table1 + s = kvStorage->SClear(tablename1); + ASSERT_TRUE(s.ok()); + + ASSERT_EQ(kvStorage->SSize(tablename1), 0); + ASSERT_EQ(kvStorage->SSize(tablename2), 1); + ASSERT_EQ(kvStorage->SSize(tablename3), 1); + + // clear table2 + s = kvStorage->SClear(tablename2); + ASSERT_TRUE(s.ok()); + + ASSERT_EQ(kvStorage->SSize(tablename1), 0); + ASSERT_EQ(kvStorage->SSize(tablename2), 0); + ASSERT_EQ(kvStorage->SSize(tablename3), 1); + + // clear table3 + s = kvStorage->SClear(tablename3); + ASSERT_TRUE(s.ok()); + + ASSERT_EQ(kvStorage->SSize(tablename1), 0); + ASSERT_EQ(kvStorage->SSize(tablename2), 0); + ASSERT_EQ(kvStorage->SSize(tablename3), 0); } void TestMixOperator(std::shared_ptr kvStorage) {