From 4401cce9c5447ea2a66f52528f3fe1744d11e481 Mon Sep 17 00:00:00 2001 From: Wine93 Date: Sun, 24 Apr 2022 16:55:20 +0800 Subject: [PATCH] curvefs/metaserver: speed up getting inode by padding inode's s3chunkinfo which small enought instead of invoke RefreshS3ChunkInfo(). --- .gitignore | 1 + curvefs/conf/metaserver.conf | 4 + curvefs/proto/metaserver.proto | 4 + curvefs/src/client/inode_cache_manager.cpp | 20 +- .../client/rpcclient/metaserver_client.cpp | 5 +- .../src/client/rpcclient/metaserver_client.h | 4 +- .../src/metaserver/copyset/meta_operator.cpp | 5 +- curvefs/src/metaserver/inode_manager.cpp | 5 +- curvefs/src/metaserver/inode_manager.h | 4 +- curvefs/src/metaserver/inode_storage.cpp | 29 ++- curvefs/src/metaserver/inode_storage.h | 31 ++- curvefs/src/metaserver/metaserver.cpp | 3 + curvefs/src/metaserver/metastore.cpp | 33 ++- curvefs/src/metaserver/partition.cpp | 12 + curvefs/src/metaserver/partition.h | 5 + curvefs/src/metaserver/s3compact_wq_impl.cpp | 2 +- curvefs/src/metaserver/storage/config.h | 3 + curvefs/src/metaserver/storage/converter.cpp | 17 +- curvefs/src/metaserver/storage/converter.h | 4 +- .../src/metaserver/storage/memory_storage.cpp | 4 + .../src/metaserver/storage/memory_storage.h | 2 + .../metaserver/storage/rocksdb_storage.cpp | 8 +- .../src/metaserver/storage/rocksdb_storage.h | 20 +- curvefs/src/metaserver/storage/storage.h | 2 + curvefs/test/client/mock_metaserver_client.h | 4 +- .../rpcclient/metaserver_client_test.cpp | 15 +- .../test/client/test_inode_cache_manager.cpp | 6 +- .../test/metaserver/inode_storage_test.cpp | 60 +++-- curvefs/test/metaserver/metastore_test.cpp | 228 +++++++++++++++++- 29 files changed, 460 insertions(+), 80 deletions(-) diff --git a/.gitignore b/.gitignore index 687ad0abb6..60b2fcfe92 100755 --- a/.gitignore +++ b/.gitignore @@ -123,3 +123,4 @@ docker/*/curvebs curvefs/docker/curvefs curvefs/docker/*/curvefs +storage_* diff --git a/curvefs/conf/metaserver.conf b/curvefs/conf/metaserver.conf index e1ab2845ed..22253ebf7d 100644 --- a/curvefs/conf/metaserver.conf +++ b/curvefs/conf/metaserver.conf @@ -231,3 +231,7 @@ storage.rocksdb.ordered_write_buffer_size=134217728 storage.rocksdb.ordered_max_write_buffer_number=15 # rocksdb block cache(LRU) capacity (default: 128MB) storage.rocksdb.block_cache_capacity=134217728 +# if the number of inode's s3chunkinfo exceed the limit_size, +# we will sending its with rpc streaming instead of +# padding its into inode (default: 25000, about 25000 * 41 (byte) = 1MB) +storage.s3_meta_inside_inode.limit_size=25000 diff --git a/curvefs/proto/metaserver.proto b/curvefs/proto/metaserver.proto index 103c14ca31..5bb9c60aff 100644 --- a/curvefs/proto/metaserver.proto +++ b/curvefs/proto/metaserver.proto @@ -46,6 +46,7 @@ enum MetaStatusCode { PARSE_FROM_STRING_FAILED = 23; STORAGE_INTERNAL_ERROR = 24; RPC_STREAM_ERROR = 25; + INODE_S3_META_TOO_LARGE = 26; } // dentry interface @@ -148,6 +149,7 @@ message GetInodeRequest { required uint32 fsId = 4; required uint64 inodeId = 5; optional uint64 appliedIndex = 6; + optional bool supportStreaming = 7; // for backward compatibility } enum FsFileType { @@ -212,6 +214,7 @@ message GetInodeResponse { required MetaStatusCode statusCode = 1; optional Inode inode = 2; optional uint64 appliedIndex = 3; + optional bool streaming = 4; } message CreateInodeRequest { @@ -335,6 +338,7 @@ message GetOrModifyS3ChunkInfoRequest { required bool returnS3ChunkInfoMap = 8; optional bool fromS3Compaction = 9; // todo: we only need a bit flag to indicate a lot of bool + optional bool supportStreaming = 10; // for backward compatibility } message GetOrModifyS3ChunkInfoResponse { diff --git a/curvefs/src/client/inode_cache_manager.cpp b/curvefs/src/client/inode_cache_manager.cpp index be194b90c9..8615708b41 100644 --- a/curvefs/src/client/inode_cache_manager.cpp +++ b/curvefs/src/client/inode_cache_manager.cpp @@ -61,8 +61,10 @@ CURVEFS_ERROR InodeCacheManagerImpl::GetInode(uint64_t inodeid, } Inode inode; + bool streaming; - MetaStatusCode ret2 = metaClient_->GetInode(fsId_, inodeid, &inode); + MetaStatusCode ret2 = metaClient_->GetInode( + fsId_, inodeid, &inode, &streaming); if (ret2 != MetaStatusCode::OK) { LOG_IF(ERROR, ret2 != MetaStatusCode::NOT_FOUND) << "metaClient_ GetInode failed, MetaStatusCode = " << ret2 @@ -74,13 +76,15 @@ CURVEFS_ERROR InodeCacheManagerImpl::GetInode(uint64_t inodeid, out = std::make_shared( std::move(inode), metaClient_); - // NOTE: now the s3chunkinfo in inode is empty for - // we had store it with alone, so we should invoke - // RefreshS3ChunkInfo() to padding inode's s3chunkinfo. - CURVEFS_ERROR rc = out->RefreshS3ChunkInfo(); - if (rc != CURVEFS_ERROR::OK) { - LOG(ERROR) << "RefreshS3ChunkInfo() failed, retCode = " << rc; - return rc; + // NOTE: if the s3chunkinfo inside inode is too large, + // we should invoke RefreshS3ChunkInfo() to receive s3chunkinfo + // by streaming and padding its into inode. + if (streaming) { + CURVEFS_ERROR rc = out->RefreshS3ChunkInfo(); + if (rc != CURVEFS_ERROR::OK) { + LOG(ERROR) << "RefreshS3ChunkInfo() failed, retCode = " << rc; + return rc; + } } std::shared_ptr eliminatedOne; diff --git a/curvefs/src/client/rpcclient/metaserver_client.cpp b/curvefs/src/client/rpcclient/metaserver_client.cpp index 21c6adc48e..37480b84fc 100644 --- a/curvefs/src/client/rpcclient/metaserver_client.cpp +++ b/curvefs/src/client/rpcclient/metaserver_client.cpp @@ -412,7 +412,7 @@ MetaServerClientImpl::PrepareRenameTx(const std::vector &dentrys) { } MetaStatusCode MetaServerClientImpl::GetInode(uint32_t fsId, uint64_t inodeid, - Inode *out) { + Inode *out, bool* streaming) { auto task = RPCTask { metaserverClientMetric_->getInode.qps.count << 1; LatencyUpdater updater(&metaserverClientMetric_->getInode.latency); @@ -425,6 +425,7 @@ MetaStatusCode MetaServerClientImpl::GetInode(uint32_t fsId, uint64_t inodeid, request.set_inodeid(inodeid); request.set_appliedindex( metaCache_->GetApplyIndex(CopysetGroupID(poolID, copysetID))); + request.set_supportstreaming(true); curvefs::metaserver::MetaServerService_Stub stub(channel); stub.GetInode(cntl, &request, &response, nullptr); @@ -454,6 +455,7 @@ MetaStatusCode MetaServerClientImpl::GetInode(uint32_t fsId, uint64_t inodeid, return -1; } + *streaming = response.has_streaming() ? response.streaming() : false; auto &s3chunkinfoMap = response.inode().s3chunkinfomap(); for (auto &item : s3chunkinfoMap) { VLOG(9) << "inodeInfo, inodeId:" << inodeid @@ -912,6 +914,7 @@ MetaStatusCode MetaServerClientImpl::GetOrModifyS3ChunkInfo( request.set_inodeid(inodeId); request.set_returns3chunkinfomap(returnS3ChunkInfoMap); *(request.mutable_s3chunkinfoadd()) = s3ChunkInfos; + request.set_supportstreaming(true); curvefs::metaserver::MetaServerService_Stub stub(channel); diff --git a/curvefs/src/client/rpcclient/metaserver_client.h b/curvefs/src/client/rpcclient/metaserver_client.h index 931819c732..e72d1c2921 100644 --- a/curvefs/src/client/rpcclient/metaserver_client.h +++ b/curvefs/src/client/rpcclient/metaserver_client.h @@ -89,7 +89,7 @@ class MetaServerClient { PrepareRenameTx(const std::vector &dentrys) = 0; virtual MetaStatusCode GetInode(uint32_t fsId, uint64_t inodeid, - Inode *out) = 0; + Inode *out, bool* streaming) = 0; virtual MetaStatusCode BatchGetInodeAttr(uint32_t fsId, std::set *inodeIds, @@ -162,7 +162,7 @@ class MetaServerClientImpl : public MetaServerClient { MetaStatusCode PrepareRenameTx(const std::vector &dentrys) override; MetaStatusCode GetInode(uint32_t fsId, uint64_t inodeid, - Inode *out) override; + Inode *out, bool* streaming) override; MetaStatusCode BatchGetInodeAttr(uint32_t fsId, std::set *inodeIds, diff --git a/curvefs/src/metaserver/copyset/meta_operator.cpp b/curvefs/src/metaserver/copyset/meta_operator.cpp index d203a77910..8c2ca5c1fc 100644 --- a/curvefs/src/metaserver/copyset/meta_operator.cpp +++ b/curvefs/src/metaserver/copyset/meta_operator.cpp @@ -182,7 +182,6 @@ void GetOrModifyS3ChunkInfoOperator::OnApply(int64_t index, MetaStatusCode rc; auto request = static_cast(request_); auto response = static_cast(response_); - bool streaming = request->returns3chunkinfomap(); auto metastore = node_->GetMetaStore(); std::shared_ptr connection; std::shared_ptr iterator; @@ -205,7 +204,9 @@ void GetOrModifyS3ChunkInfoOperator::OnApply(int64_t index, } brpc::Controller* cntl = static_cast(cntl_); - if (rc != MetaStatusCode::OK || !streaming) { + if (rc != MetaStatusCode::OK || + !request->returns3chunkinfomap() || + !request->supportstreaming()) { return; } diff --git a/curvefs/src/metaserver/inode_manager.cpp b/curvefs/src/metaserver/inode_manager.cpp index 14aaea369e..ef998f1ffd 100644 --- a/curvefs/src/metaserver/inode_manager.cpp +++ b/curvefs/src/metaserver/inode_manager.cpp @@ -343,10 +343,11 @@ MetaStatusCode InodeManager::GetOrModifyS3ChunkInfo( MetaStatusCode InodeManager::PaddingInodeS3ChunkInfo(int32_t fsId, uint64_t inodeId, - Inode* inode) { + S3ChunkInfoMap* m, + uint64_t limit) { VLOG(1) << "PaddingInodeS3ChunkInfo, fsId: " << fsId << ", inodeId: " << inodeId; - return inodeStorage_->PaddingInodeS3ChunkInfo(fsId, inodeId, inode); + return inodeStorage_->PaddingInodeS3ChunkInfo(fsId, inodeId, m, limit); } MetaStatusCode InodeManager::UpdateInodeWhenCreateOrRemoveSubNode( diff --git a/curvefs/src/metaserver/inode_manager.h b/curvefs/src/metaserver/inode_manager.h index daa1d2d30c..55b2e94ef2 100644 --- a/curvefs/src/metaserver/inode_manager.h +++ b/curvefs/src/metaserver/inode_manager.h @@ -34,7 +34,6 @@ using ::curve::common::NameLock; using ::curvefs::metaserver::S3ChunkInfoList; -using S3ChunkInfoMap = google::protobuf::Map; namespace curvefs { namespace metaserver { @@ -81,7 +80,8 @@ class InodeManager { MetaStatusCode PaddingInodeS3ChunkInfo(int32_t fsId, uint64_t inodeId, - Inode* inode); + S3ChunkInfoMap* m, + uint64_t limit = 0); MetaStatusCode UpdateInodeWhenCreateOrRemoveSubNode(uint32_t fsId, uint64_t inodeId, bool isCreate); diff --git a/curvefs/src/metaserver/inode_storage.cpp b/curvefs/src/metaserver/inode_storage.cpp index 617c989d87..55f218eb9f 100644 --- a/curvefs/src/metaserver/inode_storage.cpp +++ b/curvefs/src/metaserver/inode_storage.cpp @@ -41,6 +41,8 @@ using ::curvefs::metaserver::storage::Prefix4InodeS3ChunkInfoList; using ::curvefs::metaserver::storage::Prefix4AllInode; using Transaction = std::shared_ptr; +using S3ChunkInfoMap = google::protobuf::Map; + InodeStorage::InodeStorage(std::shared_ptr kvStorage, const std::string& tablename) : kvStorage_(kvStorage), @@ -184,7 +186,7 @@ MetaStatusCode InodeStorage::AddS3ChunkInfoList( uint64_t firstChunkId = list2add.s3chunks(0).chunkid(); uint64_t lastChunkId = list2add.s3chunks(size - 1).chunkid(); Key4S3ChunkInfoList key(fsId, inodeId, chunkIndex, - firstChunkId, lastChunkId); + firstChunkId, lastChunkId, size); std::string skey = conv_->SerializeToString(key); Status s = txn->SSet(table4s3chunkinfo_, skey, list2add); @@ -198,7 +200,8 @@ MetaStatusCode InodeStorage::RemoveS3ChunkInfoList(Transaction txn, uint32_t fsId, uint64_t inodeId, uint64_t chunkIndex, - uint64_t minChunkId) { + uint64_t minChunkId, + uint64_t* size4del) { Prefix4ChunkIndexS3ChunkInfoList prefix(fsId, inodeId, chunkIndex); std::string sprefix = conv_->SerializeToString(prefix); auto iterator = txn->SSeek(table4s3chunkinfo_, sprefix); @@ -206,6 +209,7 @@ MetaStatusCode InodeStorage::RemoveS3ChunkInfoList(Transaction txn, return MetaStatusCode::STORAGE_INTERNAL_ERROR; } + *size4del = 0; uint64_t lastChunkId; Key4S3ChunkInfoList key; std::vector key2del; @@ -221,6 +225,7 @@ MetaStatusCode InodeStorage::RemoveS3ChunkInfoList(Transaction txn, // firstChunkId < minChunkId key2del.push_back(skey); + *size4del += key.size; } for (const auto& skey : key2del) { @@ -249,10 +254,13 @@ MetaStatusCode InodeStorage::AppendS3ChunkInfoList( } MetaStatusCode rc; + uint64_t size4add = list2add.s3chunks_size(); + uint64_t size4del = 0; rc = AddS3ChunkInfoList(txn, fsId, inodeId, chunkIndex, list2add); if (rc == MetaStatusCode::OK && compaction) { uint64_t minChunkId = list2add.s3chunks(0).chunkid(); - rc = RemoveS3ChunkInfoList(txn, fsId, inodeId, chunkIndex, minChunkId); + rc = RemoveS3ChunkInfoList(txn, fsId, inodeId, chunkIndex, + minChunkId, &size4del); } if (rc != MetaStatusCode::OK) { @@ -260,13 +268,25 @@ MetaStatusCode InodeStorage::AppendS3ChunkInfoList( } else if (!txn->Commit().ok()) { rc = MetaStatusCode::STORAGE_INTERNAL_ERROR; } + + if (rc == MetaStatusCode::OK && + !UpdateInodeS3MetaSize(fsId, inodeId, size4add, size4del)) { + rc = MetaStatusCode::STORAGE_INTERNAL_ERROR; + LOG(ERROR) << "UpdateInodeS3MetaSize() failed, size4add=" << size4add + << ", size4del" << size4del; + } return rc; } MetaStatusCode InodeStorage::PaddingInodeS3ChunkInfo(int32_t fsId, uint64_t inodeId, - Inode* inode) { + S3ChunkInfoMap* m, + uint64_t limit) { ReadLockGuard readLockGuard(rwLock_); + if (limit != 0 && GetInodeS3MetaSize(fsId, inodeId) > limit) { + return MetaStatusCode::INODE_S3_META_TOO_LARGE; + } + auto iterator = GetInodeS3ChunkInfoList(fsId, inodeId); if (iterator->Status() != 0) { LOG(ERROR) << "Get inode s3chunkinfo failed"; @@ -282,7 +302,6 @@ MetaStatusCode InodeStorage::PaddingInodeS3ChunkInfo(int32_t fsId, Key4S3ChunkInfoList key; S3ChunkInfoList list; - auto m = inode->mutable_s3chunkinfomap(); for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { std::string skey = iterator->Key(); std::string svalue = iterator->Value(); diff --git a/curvefs/src/metaserver/inode_storage.h b/curvefs/src/metaserver/inode_storage.h index 639ddc61e3..433e42d5e4 100644 --- a/curvefs/src/metaserver/inode_storage.h +++ b/curvefs/src/metaserver/inode_storage.h @@ -51,6 +51,7 @@ namespace metaserver { using ::curvefs::metaserver::storage::Key4Inode; using ::curvefs::metaserver::storage::Converter; +using S3ChunkInfoMap = google::protobuf::Map; enum TABLE_TYPE : unsigned char { kTypeInode = 1, @@ -115,7 +116,8 @@ class InodeStorage { MetaStatusCode PaddingInodeS3ChunkInfo(int32_t fsId, uint64_t inodeId, - Inode* inode); + S3ChunkInfoMap* m, + uint64_t limit = 0); std::shared_ptr GetInodeS3ChunkInfoList(uint32_t fsId, uint64_t inodeId); @@ -143,7 +145,8 @@ class InodeStorage { uint32_t fsId, uint64_t inodeId, uint64_t chunkIndex, - uint64_t minChunkId); + uint64_t minChunkId, + uint64_t* size4del); std::string RealTablename(TABLE_TYPE type, std::string tablename) { std::ostringstream oss; @@ -151,6 +154,28 @@ class InodeStorage { return oss.str(); } + static std::string InodeS3MetaSizeKey(uint32_t fsId, uint64_t inodeId) { + std::ostringstream oss; + oss << fsId << ":" << inodeId; + return oss.str(); + } + + bool UpdateInodeS3MetaSize(uint32_t fsId, uint64_t inodeId, + uint64_t size4add, uint64_t size4del) { + std::string key = InodeS3MetaSizeKey(fsId, inodeId); + uint64_t size = inodeS3MetaSize_[key] + size4add; + if (size < size4del) { + return false; + } + inodeS3MetaSize_[key] = size - size4del; + return true; + } + + uint64_t GetInodeS3MetaSize(uint32_t fsId, uint64_t inodeId) { + std::string key = InodeS3MetaSizeKey(fsId, inodeId); + return inodeS3MetaSize_[key]; + } + bool FindKey(const std::string& key) { return keySet_.find(key) != keySet_.end(); } @@ -170,6 +195,8 @@ class InodeStorage { std::string table4s3chunkinfo_; std::shared_ptr conv_; std::unordered_set keySet_; + // key: Hash(inode), value: the number of inode's chunkinfo size + std::unordered_map inodeS3MetaSize_; }; } // namespace metaserver diff --git a/curvefs/src/metaserver/metaserver.cpp b/curvefs/src/metaserver/metaserver.cpp index 2822e34da9..7d6e64510b 100644 --- a/curvefs/src/metaserver/metaserver.cpp +++ b/curvefs/src/metaserver/metaserver.cpp @@ -500,6 +500,9 @@ void Metaserver::InitStorage() { LOG_IF(FATAL, !conf_->GetUInt64Value( "storage.rocksdb.block_cache_capacity", &storageOptions_.blockCacheCapacity)); + LOG_IF(FATAL, !conf_->GetUInt64Value( + "storage.s3_meta_inside_inode.limit_size", + &storageOptions_.s3MetaLimitSizeInsideInode)); bool succ = ::curvefs::metaserver::storage::InitStorage(storageOptions_); LOG_IF(FATAL, !succ) << "Init storage failed"; diff --git a/curvefs/src/metaserver/metastore.cpp b/curvefs/src/metaserver/metastore.cpp index b2afb3cc0e..beacf5312b 100644 --- a/curvefs/src/metaserver/metastore.cpp +++ b/curvefs/src/metaserver/metastore.cpp @@ -397,13 +397,29 @@ MetaStatusCode MetaStoreImpl::GetInode(const GetInodeRequest* request, return status; } - MetaStatusCode status = - partition->GetInode(fsId, inodeId, response->mutable_inode()); - if (status != MetaStatusCode::OK) { + Inode* inode = response->mutable_inode(); + MetaStatusCode rc = partition->GetInode(fsId, inodeId, inode); + // NOTE: the following two cases we should padding inode's s3chunkinfo: + // (1): for RPC requests which unsupport streaming + // (2): inode's s3chunkinfo is small enough + if (rc == MetaStatusCode::OK) { + uint64_t limit = 0; + if (request->supportstreaming()) { + limit = kvStorage_->GetStorageOptions().s3MetaLimitSizeInsideInode; + } + rc = partition->PaddingInodeS3ChunkInfo( + fsId, inodeId, inode->mutable_s3chunkinfomap(), limit); + if (rc == MetaStatusCode::INODE_S3_META_TOO_LARGE) { + response->set_streaming(true); + rc = MetaStatusCode::OK; + } + } + + if (rc != MetaStatusCode::OK) { response->clear_inode(); } - response->set_statuscode(status); - return status; + response->set_statuscode(rc); + return rc; } MetaStatusCode MetaStoreImpl::BatchGetInodeAttr( @@ -517,6 +533,13 @@ MetaStatusCode MetaStoreImpl::GetOrModifyS3ChunkInfo( request->returns3chunkinfomap(), request->froms3compaction()); } + + if (rc == MetaStatusCode::OK && !request->supportstreaming()) { + rc = partition->PaddingInodeS3ChunkInfo( + request->fsid(), request->inodeid(), + response->mutable_s3chunkinfomap(), 0); + } + response->set_statuscode(rc); return rc; } diff --git a/curvefs/src/metaserver/partition.cpp b/curvefs/src/metaserver/partition.cpp index 3d4b568320..08cfe5a6e1 100644 --- a/curvefs/src/metaserver/partition.cpp +++ b/curvefs/src/metaserver/partition.cpp @@ -278,6 +278,18 @@ MetaStatusCode Partition::GetOrModifyS3ChunkInfo( fsId, inodeId, list2add, iterator, returnS3ChunkInfoMap, compaction); } +MetaStatusCode Partition::PaddingInodeS3ChunkInfo(int32_t fsId, + uint64_t inodeId, + S3ChunkInfoMap* m, + uint64_t limit) { + if (!IsInodeBelongs(fsId, inodeId)) { + return MetaStatusCode::PARTITION_ID_MISSMATCH; + } else if (GetStatus() == PartitionStatus::DELETING) { + return MetaStatusCode::PARTITION_DELETING; + } + return inodeManager_->PaddingInodeS3ChunkInfo(fsId, inodeId, m, limit); +} + MetaStatusCode Partition::InsertInode(const Inode& inode) { if (!IsInodeBelongs(inode.fsid(), inode.inodeid())) { return MetaStatusCode::PARTITION_ID_MISSMATCH; diff --git a/curvefs/src/metaserver/partition.h b/curvefs/src/metaserver/partition.h index f360727c85..6a0ce60704 100644 --- a/curvefs/src/metaserver/partition.h +++ b/curvefs/src/metaserver/partition.h @@ -97,6 +97,11 @@ class Partition { bool returnS3ChunkInfoMap, bool compaction); + MetaStatusCode PaddingInodeS3ChunkInfo(int32_t fsId, + uint64_t inodeId, + S3ChunkInfoMap* m, + uint64_t limit = 0); + MetaStatusCode InsertInode(const Inode& inode); bool GetInodeIdList(std::list* InodeIdList); diff --git a/curvefs/src/metaserver/s3compact_wq_impl.cpp b/curvefs/src/metaserver/s3compact_wq_impl.cpp index 0910d38014..384fc5d7bc 100644 --- a/curvefs/src/metaserver/s3compact_wq_impl.cpp +++ b/curvefs/src/metaserver/s3compact_wq_impl.cpp @@ -443,7 +443,7 @@ bool S3CompactWorkQueueImpl::CompactPrecheck( auto inodeKey = task.inodeKey; auto inodeManager = task.inodeManager; MetaStatusCode rc = inodeManager->PaddingInodeS3ChunkInfo( - inodeKey.fsId, inodeKey.inodeId, inode); + inodeKey.fsId, inodeKey.inodeId, inode->mutable_s3chunkinfomap()); if (rc != MetaStatusCode::OK) { LOG(ERROR) << "Padding inode s3chunkinfo failed, " << "retCode = " << MetaStatusCode_Name(ret); diff --git a/curvefs/src/metaserver/storage/config.h b/curvefs/src/metaserver/storage/config.h index 2a191d5657..52b5d0dc45 100644 --- a/curvefs/src/metaserver/storage/config.h +++ b/curvefs/src/metaserver/storage/config.h @@ -53,6 +53,9 @@ struct StorageOptions { uint64_t orderedMaxWriteBufferNumber; uint64_t blockCacheCapacity; + + // misc config item + uint64_t s3MetaLimitSizeInsideInode; }; } // namespace storage diff --git a/curvefs/src/metaserver/storage/converter.cpp b/curvefs/src/metaserver/storage/converter.cpp index 0898fc98ec..fc715be110 100644 --- a/curvefs/src/metaserver/storage/converter.cpp +++ b/curvefs/src/metaserver/storage/converter.cpp @@ -90,18 +90,21 @@ Key4S3ChunkInfoList::Key4S3ChunkInfoList() inodeId(0), chunkIndex(0), firstChunkId(0), - lastChunkId(0) {} + lastChunkId(0), + size(0) {} Key4S3ChunkInfoList::Key4S3ChunkInfoList(uint32_t fsId, uint64_t inodeId, uint64_t chunkIndex, uint64_t firstChunkId, - uint64_t lastChunkId) + uint64_t lastChunkId, + uint64_t size) : fsId(fsId), inodeId(inodeId), chunkIndex(chunkIndex), firstChunkId(firstChunkId), - lastChunkId(lastChunkId) {} + lastChunkId(lastChunkId), + size(size) {} std::string Key4S3ChunkInfoList::SerializeToString() const { std::ostringstream oss; @@ -109,18 +112,20 @@ std::string Key4S3ChunkInfoList::SerializeToString() const { << inodeId << ":" << chunkIndex << ":" << std::setw(kMaxUint64Length_) << std::setfill('0') << firstChunkId << ":" - << std::setw(kMaxUint64Length_) << std::setfill('0') << lastChunkId; + << std::setw(kMaxUint64Length_) << std::setfill('0') << lastChunkId + << ":" << size; return oss.str(); } bool Key4S3ChunkInfoList::ParseFromString(const std::string& value) { std::vector items; SplitString(value, ":", &items); - return items.size() == 6 && CompareType(items[0], keyType_) && + return items.size() == 7 && CompareType(items[0], keyType_) && StringToUl(items[1], &fsId) && StringToUll(items[2], &inodeId) && StringToUll(items[3], &chunkIndex) && StringToUll(items[4], &firstChunkId) && - StringToUll(items[5], &lastChunkId); + StringToUll(items[5], &lastChunkId) && + StringToUll(items[6], &size); } Prefix4ChunkIndexS3ChunkInfoList::Prefix4ChunkIndexS3ChunkInfoList() diff --git a/curvefs/src/metaserver/storage/converter.h b/curvefs/src/metaserver/storage/converter.h index f3a2a65926..ce1d1c67b9 100644 --- a/curvefs/src/metaserver/storage/converter.h +++ b/curvefs/src/metaserver/storage/converter.h @@ -94,7 +94,8 @@ class Key4S3ChunkInfoList : public StorageKey { uint64_t inodeId, uint64_t chunkIndex, uint64_t firstChunkId, - uint64_t lastChunkId); + uint64_t lastChunkId, + uint64_t size); std::string SerializeToString() const override; @@ -109,6 +110,7 @@ class Key4S3ChunkInfoList : public StorageKey { uint64_t chunkIndex; uint64_t firstChunkId; uint64_t lastChunkId; + uint64_t size; }; class Prefix4ChunkIndexS3ChunkInfoList : public StorageKey { diff --git a/curvefs/src/metaserver/storage/memory_storage.cpp b/curvefs/src/metaserver/storage/memory_storage.cpp index f0f6def6ac..8e4b41906b 100644 --- a/curvefs/src/metaserver/storage/memory_storage.cpp +++ b/curvefs/src/metaserver/storage/memory_storage.cpp @@ -321,6 +321,10 @@ bool MemoryStorage::GetStatistics(StorageStatistics* statistics) { return true; } +StorageOptions MemoryStorage::GetStorageOptions() const { + return options_; +} + } // namespace storage } // namespace metaserver } // namespace curvefs diff --git a/curvefs/src/metaserver/storage/memory_storage.h b/curvefs/src/metaserver/storage/memory_storage.h index 8ffac3f0d1..1a5791b6b7 100644 --- a/curvefs/src/metaserver/storage/memory_storage.h +++ b/curvefs/src/metaserver/storage/memory_storage.h @@ -97,6 +97,8 @@ class MemoryStorage : public KVStorage, public StorageTransaction { bool GetStatistics(StorageStatistics* Statistics) override; + StorageOptions GetStorageOptions() const override; + Status HGet(const std::string& name, const std::string& key, ValueType* value) override; diff --git a/curvefs/src/metaserver/storage/rocksdb_storage.cpp b/curvefs/src/metaserver/storage/rocksdb_storage.cpp index 2bb4fb40d3..5b4107408e 100644 --- a/curvefs/src/metaserver/storage/rocksdb_storage.cpp +++ b/curvefs/src/metaserver/storage/rocksdb_storage.cpp @@ -281,7 +281,7 @@ Status RocksDBStorage::Get(const std::string& name, std::string iname = ToInternalName(name, ordered); std::string ikey = ToInternalKey(iname, key); - if (!FindKey(iname, ikey)) { + if (!InTransaction_ && !FindKey(iname, ikey)) { return Status::NotFound(); } @@ -332,7 +332,7 @@ Status RocksDBStorage::Del(const std::string& name, std::string iname = ToInternalName(name, ordered); std::string ikey = ToInternalKey(iname, key); - if (!counter_->Find(iname, ikey)) { + if (!InTransaction_ && !counter_->Find(iname, ikey)) { return Status::NotFound(); } @@ -442,6 +442,10 @@ bool RocksDBStorage::GetStatistics(StorageStatistics* statistics) { return true; } +StorageOptions RocksDBStorage::GetStorageOptions() const { + return options_; +} + } // namespace storage } // namespace metaserver } // namespace curvefs diff --git a/curvefs/src/metaserver/storage/rocksdb_storage.h b/curvefs/src/metaserver/storage/rocksdb_storage.h index fbb4d22b4e..b7434cb6b2 100644 --- a/curvefs/src/metaserver/storage/rocksdb_storage.h +++ b/curvefs/src/metaserver/storage/rocksdb_storage.h @@ -133,6 +133,8 @@ class RocksDBStorage : public KVStorage, public StorageTransaction { bool GetStatistics(StorageStatistics* Statistics) override; + StorageOptions GetStorageOptions() const override; + // unordered Status HGet(const std::string& name, const std::string& key, @@ -335,13 +337,21 @@ class RocksDBStorageIterator : public Iterator { ordered_(ordered) { if (status_ == 0) { readOptions_ = storage_->ReadOptions(); - readOptions_.snapshot = storage_->db_->GetSnapshot(); + if (storage_->InTransaction_) { + readOptions_.snapshot = storage_->txn_->GetSnapshot(); + } else { + readOptions_.snapshot = storage_->db_->GetSnapshot(); + } } } ~RocksDBStorageIterator() { if (status_ == 0) { - storage_->db_->ReleaseSnapshot(readOptions_.snapshot); + if (storage_->InTransaction_) { + storage_->txn_->ClearSnapshot(); + } else { + storage_->db_->ReleaseSnapshot(readOptions_.snapshot); + } } } @@ -362,7 +372,11 @@ class RocksDBStorageIterator : public Iterator { void SeekToFirst() { auto handler = storage_->GetColumnFamilyHandle(ordered_); - iter_ = storage_->db_->NewIterator(readOptions_, handler); + if (storage_->InTransaction_) { + iter_ = storage_->txn_->GetIterator(readOptions_, handler); + } else { + iter_ = storage_->db_->NewIterator(readOptions_, handler); + } iter_->Seek(prefix_); } diff --git a/curvefs/src/metaserver/storage/storage.h b/curvefs/src/metaserver/storage/storage.h index efc718f452..dc84d8189f 100644 --- a/curvefs/src/metaserver/storage/storage.h +++ b/curvefs/src/metaserver/storage/storage.h @@ -110,6 +110,8 @@ class KVStorage : public BaseStorage { virtual bool GetStatistics(StorageStatistics* Statistics) = 0; + virtual StorageOptions GetStorageOptions() const = 0; + virtual std::shared_ptr BeginTransaction() = 0; }; diff --git a/curvefs/test/client/mock_metaserver_client.h b/curvefs/test/client/mock_metaserver_client.h index 1cb7c48b51..92dc7131dc 100644 --- a/curvefs/test/client/mock_metaserver_client.h +++ b/curvefs/test/client/mock_metaserver_client.h @@ -72,8 +72,8 @@ class MockMetaServerClient : public MetaServerClient { MOCK_METHOD1(PrepareRenameTx, MetaStatusCode(const std::vector& dentrys)); - MOCK_METHOD3(GetInode, MetaStatusCode( - uint32_t fsId, uint64_t inodeid, Inode *out)); + MOCK_METHOD4(GetInode, MetaStatusCode( + uint32_t fsId, uint64_t inodeid, Inode *out, bool* streaming)); MOCK_METHOD3(BatchGetInodeAttr, MetaStatusCode( uint32_t fsId, std::set *inodeIds, diff --git a/curvefs/test/client/rpcclient/metaserver_client_test.cpp b/curvefs/test/client/rpcclient/metaserver_client_test.cpp index b0bd273991..bd636948fe 100644 --- a/curvefs/test/client/rpcclient/metaserver_client_test.cpp +++ b/curvefs/test/client/rpcclient/metaserver_client_test.cpp @@ -627,6 +627,8 @@ TEST_F(MetaServerClientImplTest, test_GetInode) { out.set_rdev(0); out.set_symlink("test9"); + bool streaming; + curvefs::metaserver::GetInodeResponse response; // test0: rpc error @@ -637,7 +639,8 @@ TEST_F(MetaServerClientImplTest, test_GetInode) { .WillRepeatedly(DoAll(SetArgPointee<2>(target_), SetArgPointee<3>(applyIndex), Return(true))); - MetaStatusCode status = metaserverCli_.GetInode(fsid, inodeid, &out); + MetaStatusCode status = metaserverCli_.GetInode( + fsid, inodeid, &out, &streaming); ASSERT_EQ(MetaStatusCode::RPC_ERROR, status); // test1: get inode ok @@ -652,7 +655,7 @@ TEST_F(MetaServerClientImplTest, test_GetInode) { Invoke(SetRpcService))); EXPECT_CALL(*mockMetacache_.get(), UpdateApplyIndex(_, _)); - status = metaserverCli_.GetInode(fsid, inodeid, &out); + status = metaserverCli_.GetInode(fsid, inodeid, &out, &streaming); ASSERT_EQ(MetaStatusCode::OK, status); // test2: get inode with not found error @@ -661,7 +664,7 @@ TEST_F(MetaServerClientImplTest, test_GetInode) { .WillOnce( DoAll(SetArgPointee<2>(response), Invoke(SetRpcService))); - status = metaserverCli_.GetInode(fsid, inodeid, &out); + status = metaserverCli_.GetInode(fsid, inodeid, &out, &streaming); ASSERT_EQ(MetaStatusCode::NOT_FOUND, status); // test3: test response do not have applyindex @@ -672,20 +675,20 @@ TEST_F(MetaServerClientImplTest, test_GetInode) { DoAll(SetArgPointee<2>(response), Invoke(SetRpcService))); - status = metaserverCli_.GetInode(fsid, inodeid, &out); + status = metaserverCli_.GetInode(fsid, inodeid, &out, &streaming); ASSERT_EQ(MetaStatusCode::RPC_ERROR, status); // test4: test response do not have inode response.set_appliedindex(10); response.clear_inode(); - status = metaserverCli_.GetInode(fsid, inodeid, &out); + status = metaserverCli_.GetInode(fsid, inodeid, &out, &streaming); ASSERT_EQ(MetaStatusCode::RPC_ERROR, status); // test5: do not have both dentrys and appliedindex response.clear_inode(); response.clear_appliedindex(); - status = metaserverCli_.GetInode(fsid, inodeid, &out); + status = metaserverCli_.GetInode(fsid, inodeid, &out, &streaming); ASSERT_EQ(MetaStatusCode::RPC_ERROR, status); } diff --git a/curvefs/test/client/test_inode_cache_manager.cpp b/curvefs/test/client/test_inode_cache_manager.cpp index aef119ed24..14ca574f7a 100644 --- a/curvefs/test/client/test_inode_cache_manager.cpp +++ b/curvefs/test/client/test_inode_cache_manager.cpp @@ -83,7 +83,7 @@ TEST_F(TestInodeCacheManager, GetInode) { inode.set_fsid(fsId_); inode.set_length(fileLength); - EXPECT_CALL(*metaClient_, GetInode(fsId_, inodeId, _)) + EXPECT_CALL(*metaClient_, GetInode(fsId_, inodeId, _, _)) .WillOnce(Return(MetaStatusCode::NOT_FOUND)) .WillOnce(DoAll(SetArgPointee<2>(inode), Return(MetaStatusCode::OK))); @@ -109,7 +109,7 @@ TEST_F(TestInodeCacheManager, GetInode) { curvefs::client::common::FLAGS_enableCto = true; inodeWrapper->SetOpenCount(0); - EXPECT_CALL(*metaClient_, GetInode(fsId_, inodeId, _)) + EXPECT_CALL(*metaClient_, GetInode(fsId_, inodeId, _, _)) .WillOnce(Return(MetaStatusCode::NOT_FOUND)); ret = iCacheManager_->GetInode(inodeId, inodeWrapper); ASSERT_EQ(CURVEFS_ERROR::NOTEXIST, ret); @@ -118,7 +118,7 @@ TEST_F(TestInodeCacheManager, GetInode) { ASSERT_EQ(fileLength, out.length()); inodeWrapper->SetOpenCount(1); - EXPECT_CALL(*metaClient_, GetInode(fsId_, inodeId, _)) + EXPECT_CALL(*metaClient_, GetInode(fsId_, inodeId, _, _)) .WillOnce(Return(MetaStatusCode::NOT_FOUND)); ret = iCacheManager_->GetInode(inodeId, inodeWrapper); ASSERT_EQ(CURVEFS_ERROR::NOTEXIST, ret); diff --git a/curvefs/test/metaserver/inode_storage_test.cpp b/curvefs/test/metaserver/inode_storage_test.cpp index 5737463aba..5d432509dc 100644 --- a/curvefs/test/metaserver/inode_storage_test.cpp +++ b/curvefs/test/metaserver/inode_storage_test.cpp @@ -155,9 +155,8 @@ class InodeStorageTest : public ::testing::Test { Key4S3ChunkInfoList key; S3ChunkInfoList list4get; for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { - LOG(INFO) << "checking chunkIndex(" << size << "), key=" - << iterator->Key(); ASSERT_TRUE(conv_->ParseFromString(iterator->Key(), &key)); + LOG(INFO) << "key" << size << "=" << iterator->Key(); ASSERT_TRUE(iterator->ParseFromValue(&list4get)); ASSERT_EQ(key.chunkIndex, chunkIndexs[size]); ASSERT_TRUE(EqualS3ChunkInfoList(list4get, lists[size])); @@ -435,6 +434,7 @@ TEST_F(InodeStorageTest, PaddingInodeS3ChunkInfo) { uint32_t fsId = 1; uint64_t inodeId = 1; InodeStorage storage(kvStorage_, tablename_); + S3ChunkInfoList list2del; // step1: insert inode Inode inode = GenInode(fsId, inodeId); @@ -442,7 +442,7 @@ TEST_F(InodeStorageTest, PaddingInodeS3ChunkInfo) { // step2: append s3chunkinfo std::vector chunkIndexs{ 1, 3, 2, 1, 2 }; - std::vector lists{ + std::vector lists2add{ GenS3ChunkInfoList(100, 109), GenS3ChunkInfoList(300, 310), GenS3ChunkInfoList(200, 209), @@ -450,22 +450,54 @@ TEST_F(InodeStorageTest, PaddingInodeS3ChunkInfo) { GenS3ChunkInfoList(210, 220), }; - for (size_t size = 0; size < chunkIndexs.size(); size++) { + for (size_t i = 0; i < chunkIndexs.size(); i++) { MetaStatusCode rc = storage.AppendS3ChunkInfoList( - fsId, inodeId, chunkIndexs[size], lists[size], false); + fsId, inodeId, chunkIndexs[i], lists2add[i], false); ASSERT_EQ(rc, MetaStatusCode::OK); } - - // step3: padding inode s3chunkinfo ASSERT_EQ(inode.mutable_s3chunkinfomap()->size(), 0); - MetaStatusCode rc = storage.PaddingInodeS3ChunkInfo(fsId, inodeId, &inode); - ASSERT_EQ(rc, MetaStatusCode::OK); - auto m = inode.s3chunkinfomap(); - ASSERT_EQ(m.size(), 3); - ASSERT_TRUE(EqualS3ChunkInfoList(m[1], GenS3ChunkInfoList(100, 120))); - ASSERT_TRUE(EqualS3ChunkInfoList(m[2], GenS3ChunkInfoList(200, 220))); - ASSERT_TRUE(EqualS3ChunkInfoList(m[3], GenS3ChunkInfoList(300, 310))); + // CASE 1: padding inode s3chunkinfo success + { + LOG(INFO) << "CASE 1: padding inode s3chunkinfo success"; + Inode out; + MetaStatusCode rc = storage.PaddingInodeS3ChunkInfo( + fsId, inodeId, out.mutable_s3chunkinfomap()); + ASSERT_EQ(rc, MetaStatusCode::OK); + + auto m = out.s3chunkinfomap(); + ASSERT_EQ(m.size(), 3); + ASSERT_TRUE(EqualS3ChunkInfoList(m[1], GenS3ChunkInfoList(100, 120))); + ASSERT_TRUE(EqualS3ChunkInfoList(m[2], GenS3ChunkInfoList(200, 220))); + ASSERT_TRUE(EqualS3ChunkInfoList(m[3], GenS3ChunkInfoList(300, 310))); + } + + // CASE 2: padding inode s3chunkinfo within limit + { + LOG(INFO) << "CASE 2: padding inode s3chunkinfo within limit"; + Inode out; + MetaStatusCode rc = storage.PaddingInodeS3ChunkInfo( + fsId, inodeId, out.mutable_s3chunkinfomap(), 53); + ASSERT_EQ(rc, MetaStatusCode::OK); + + auto m = out.s3chunkinfomap(); + ASSERT_EQ(m.size(), 3); + ASSERT_TRUE(EqualS3ChunkInfoList(m[1], GenS3ChunkInfoList(100, 120))); + ASSERT_TRUE(EqualS3ChunkInfoList(m[2], GenS3ChunkInfoList(200, 220))); + ASSERT_TRUE(EqualS3ChunkInfoList(m[3], GenS3ChunkInfoList(300, 310))); + } + + // CASE 3: padding inode s3chunkinfo exceed limit + { + LOG(INFO) << "CASE 3: padding inode s3chunkinfo exceed limit"; + Inode out; + MetaStatusCode rc = storage.PaddingInodeS3ChunkInfo( + fsId, inodeId, out.mutable_s3chunkinfomap(), 52); + ASSERT_EQ(rc, MetaStatusCode::INODE_S3_META_TOO_LARGE); + + auto m = out.s3chunkinfomap(); + ASSERT_EQ(m.size(), 0); + } } TEST_F(InodeStorageTest, GetAllS3ChunkInfoList) { diff --git a/curvefs/test/metaserver/metastore_test.cpp b/curvefs/test/metaserver/metastore_test.cpp index 7af9617030..ba0e6e7301 100644 --- a/curvefs/test/metaserver/metastore_test.cpp +++ b/curvefs/test/metaserver/metastore_test.cpp @@ -58,6 +58,7 @@ class MetastoreTest : public ::testing::Test { dataDir_ = RandomStoragePath();; StorageOptions options; options.dataDir = dataDir_; + options.s3MetaLimitSizeInsideInode = 100; kvStorage_ = std::make_shared(options); ASSERT_TRUE(kvStorage_->Open()); @@ -188,6 +189,24 @@ class MetastoreTest : public ::testing::Test { return list; } + void CHECK_ITERATOR_S3CHUNKINFOLIST( + std::shared_ptr iterator, + const std::vector chunkIndexs, + const std::vector lists) { + size_t size = 0; + Key4S3ChunkInfoList key; + S3ChunkInfoList list4get; + ASSERT_EQ(iterator->Status(), 0); + for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { + ASSERT_TRUE(conv_->ParseFromString(iterator->Key(), &key)); + ASSERT_TRUE(conv_->ParseFromString(iterator->Value(), &list4get)); + ASSERT_EQ(key.chunkIndex, chunkIndexs[size]); + ASSERT_TRUE(EqualS3ChunkInfoList(list4get, lists[size])); + size++; + } + ASSERT_EQ(size, chunkIndexs.size()); + } + class OnSnapshotSaveDoneImpl : public OnSnapshotSaveDoneClosure { public: void SetSuccess() { @@ -1397,6 +1416,7 @@ TEST_F(MetastoreTest, GetOrModifyS3ChunkInfo) { // CASE 1: partition not found -> failed { + LOG(INFO) << "CASE 1: partition not found -> failed"; GetOrModifyS3ChunkInfoRequest request; GetOrModifyS3ChunkInfoResponse response; request.set_partitionid(100); @@ -1409,18 +1429,127 @@ TEST_F(MetastoreTest, GetOrModifyS3ChunkInfo) { // CASE 2: GetOrModifyS3ChunkInfo success { + LOG(INFO) << "CASE 2: GetOrModifyS3ChunkInfo success"; + GetOrModifyS3ChunkInfoRequest request; + GetOrModifyS3ChunkInfoResponse response; std::vector chunkIndexs{ 1, 2 }; - std::vector list2add{ + std::vector lists2add{ GenS3ChunkInfoList(100, 200), GenS3ChunkInfoList(300, 400), }; + request.set_partitionid(partitionId); + request.set_fsid(fsId); + request.set_inodeid(inodeId); + request.set_supportstreaming(true); + request.set_returns3chunkinfomap(true); + for (size_t i = 0; i < chunkIndexs.size(); i++) { + request.mutable_s3chunkinfoadd()->insert( + { chunkIndexs[i], lists2add[i] }); + } + + std::shared_ptr iterator; + MetaStatusCode rc = metastore.GetOrModifyS3ChunkInfo( + &request, &response, &iterator); + ASSERT_EQ(rc, MetaStatusCode::OK); + ASSERT_EQ(response.statuscode(), rc); + ASSERT_EQ(response.mutable_s3chunkinfomap()->size(), 0); + + CHECK_ITERATOR_S3CHUNKINFOLIST(iterator, chunkIndexs, lists2add); + } + + // CASE 3: GetOrModifyS3ChunkInfo success with unsupport streaming + { + LOG(INFO) << "CASE 3: GetOrModifyS3ChunkInfo success" + << " with unsupport streaming"; GetOrModifyS3ChunkInfoRequest request; GetOrModifyS3ChunkInfoResponse response; + std::vector chunkIndexs{ 1, 2 }; + std::vector lists2add{ + GenS3ChunkInfoList(100, 200), + GenS3ChunkInfoList(300, 400), + }; + request.set_partitionid(partitionId); request.set_fsid(fsId); request.set_inodeid(inodeId); + request.set_supportstreaming(false); request.set_returns3chunkinfomap(true); + for (size_t i = 0; i < chunkIndexs.size(); i++) { + request.mutable_s3chunkinfoadd()->insert( + { chunkIndexs[i], lists2add[i] }); + } + + std::shared_ptr iterator; + MetaStatusCode rc = metastore.GetOrModifyS3ChunkInfo( + &request, &response, &iterator); + ASSERT_EQ(rc, MetaStatusCode::OK); + ASSERT_EQ(response.statuscode(), rc); + ASSERT_EQ(response.mutable_s3chunkinfomap()->size(), 2); + } +} + +TEST_F(MetastoreTest, GetInodeWithPaddingS3Meta) { + MetaStoreImpl metastore(nullptr, kvStorage_); + uint32_t poolId = 1; + uint32_t copysetId = 1; + uint32_t partitionId = 1; + uint32_t fsId = 1; + uint64_t inodeId = 1; + + // init: create partition + { + CreatePartitionRequest request; + CreatePartitionResponse response; + + PartitionInfo partitionInfo; + partitionInfo.set_poolid(poolId); + partitionInfo.set_copysetid(copysetId); + partitionInfo.set_partitionid(partitionId); + partitionInfo.set_fsid(fsId); + partitionInfo.set_start(1); + partitionInfo.set_end(100); + request.mutable_partition()->CopyFrom(partitionInfo); + MetaStatusCode rc = metastore.CreatePartition(&request, &response); + ASSERT_EQ(rc, MetaStatusCode::OK); + ASSERT_EQ(response.statuscode(), rc); + } + + // step1: create inode + { + CreateInodeRequest request; + CreateInodeResponse response; + + request.set_poolid(poolId); + request.set_copysetid(copysetId); + request.set_partitionid(partitionId); + request.set_fsid(fsId); + request.set_length(1); + request.set_uid(1); + request.set_gid(1); + request.set_mode(777); + request.set_type(FsFileType::TYPE_FILE); + + auto rc = metastore.CreateInode(&request, &response); + ASSERT_EQ(response.statuscode(), MetaStatusCode::OK); + inodeId = response.inode().inodeid(); + } + + // step2: append s3chunkinfo within limit + { + GetOrModifyS3ChunkInfoRequest request; + GetOrModifyS3ChunkInfoResponse response; + + std::vector chunkIndexs{ 1, 2 }; + std::vector list2add{ + GenS3ChunkInfoList(100, 149), + GenS3ChunkInfoList(200, 249), + }; + + request.set_partitionid(partitionId); + request.set_fsid(fsId); + request.set_inodeid(inodeId); + request.set_returns3chunkinfomap(false); for (size_t i = 0; i < chunkIndexs.size(); i++) { request.mutable_s3chunkinfoadd()->insert( { chunkIndexs[i], list2add[i] }); @@ -1432,20 +1561,93 @@ TEST_F(MetastoreTest, GetOrModifyS3ChunkInfo) { &request, &response, &iterator); ASSERT_EQ(rc, MetaStatusCode::OK); ASSERT_EQ(response.statuscode(), rc); + } - size_t size = 0; - Key4S3ChunkInfoList key; - S3ChunkInfoList list4get; - for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { - LOG(INFO) << "check chunkIndex(" << size << ")" - << ", key=" << iterator->Key(); - ASSERT_TRUE(conv_->ParseFromString(iterator->Key(), &key)); - ASSERT_TRUE(conv_->ParseFromString(iterator->Value(), &list4get)); - ASSERT_EQ(key.chunkIndex, chunkIndexs[size]); - ASSERT_TRUE(EqualS3ChunkInfoList(list2add[size], list4get)); - size++; + // step3: get inode with support streaming + { + GetInodeRequest request; + GetInodeResponse response; + + request.set_poolid(poolId); + request.set_copysetid(copysetId); + request.set_partitionid(partitionId); + request.set_fsid(fsId); + request.set_inodeid(inodeId); + request.set_supportstreaming(true); + + auto rc = metastore.GetInode(&request, &response); + ASSERT_EQ(response.statuscode(), MetaStatusCode::OK); + ASSERT_EQ(rc, MetaStatusCode::OK); + auto inode = response.mutable_inode(); + ASSERT_EQ(response.streaming(), false); + ASSERT_EQ(inode->mutable_s3chunkinfomap()->size(), 2); + } + + // step4: append s3chunkinfo exceed limit + { + GetOrModifyS3ChunkInfoRequest request; + GetOrModifyS3ChunkInfoResponse response; + + std::vector chunkIndexs{ 3 }; + std::vector list2add{ + GenS3ChunkInfoList(1, 1), + }; + + request.set_partitionid(partitionId); + request.set_fsid(fsId); + request.set_inodeid(inodeId); + request.set_returns3chunkinfomap(false); + for (size_t i = 0; i < chunkIndexs.size(); i++) { + request.mutable_s3chunkinfoadd()->insert( + { chunkIndexs[i], list2add[i] }); } - ASSERT_EQ(size, 2); + + // ModifyS3ChunkInfo() + std::shared_ptr iterator; + MetaStatusCode rc = metastore.GetOrModifyS3ChunkInfo( + &request, &response, &iterator); + ASSERT_EQ(rc, MetaStatusCode::OK); + ASSERT_EQ(response.statuscode(), rc); + } + + // step5: get inode with support straming + { + GetInodeRequest request; + GetInodeResponse response; + + request.set_poolid(poolId); + request.set_copysetid(copysetId); + request.set_partitionid(partitionId); + request.set_fsid(fsId); + request.set_inodeid(inodeId); + request.set_supportstreaming(true); + + auto rc = metastore.GetInode(&request, &response); + ASSERT_EQ(response.statuscode(), MetaStatusCode::OK); + ASSERT_EQ(rc, MetaStatusCode::OK); + auto inode = response.mutable_inode(); + ASSERT_EQ(response.streaming(), true); + ASSERT_EQ(inode->mutable_s3chunkinfomap()->size(), 0); + } + + // step6: get inode without unsupport streaming + { + GetInodeRequest request; + GetInodeResponse response; + + request.set_poolid(poolId); + request.set_copysetid(copysetId); + request.set_partitionid(partitionId); + request.set_fsid(fsId); + request.set_inodeid(inodeId); + request.set_supportstreaming(false); + + auto rc = metastore.GetInode(&request, &response); + ASSERT_EQ(response.statuscode(), MetaStatusCode::OK); + ASSERT_EQ(rc, MetaStatusCode::OK); + auto inode = response.mutable_inode(); + ASSERT_EQ(response.streaming(), false); + ASSERT_EQ(inode->mutable_s3chunkinfomap()->size(), 3); } }