Skip to content

Commit

Permalink
add warmup list functionality
Browse files Browse the repository at this point in the history
Signed-off-by: Ken Han <[email protected]>
  • Loading branch information
ken90242 committed Aug 14, 2023
1 parent 34a9d19 commit c12fbac
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 17 deletions.
5 changes: 4 additions & 1 deletion curvefs/src/client/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ std::ostream &operator<<(std::ostream &os, MetaServerOpType optype);
const uint32_t MAX_XATTR_NAME_LENGTH = 255;
const uint32_t MAX_XATTR_VALUE_LENGTH = 64 * 1024;

const char kCurveFsWarmupXAttr[] = "curvefs.warmup.op";
const char kCurveFsWarmupListXAttr[] = "curvefs.warmup.list";
const char kCurveFsWarmupQueryXAttr[] = "curvefs.warmup.query";
const char kCurveFsWarmupAddXAttr[] = "curvefs.warmup.add";
const char kCurveFsWarmupCancelXAttr[] = "curvefs.warmup.cancel";


constexpr int kWarmupOpNum = 6;
Expand Down
69 changes: 60 additions & 9 deletions curvefs/src/client/curve_fuse_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
* Author: xuchaojie
*/

#include <string>
#include <memory>
#include <cstring>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -52,6 +53,10 @@ using ::curvefs::client::FuseClient;
using ::curvefs::client::FuseS3Client;
using ::curvefs::client::FuseVolumeClient;
using ::curvefs::client::common::FuseClientOption;
using ::curvefs::client::common::kCurveFsWarmupQueryXAttr;
using ::curvefs::client::common::kCurveFsWarmupAddXAttr;
using ::curvefs::client::common::kCurveFsWarmupListXAttr;
using ::curvefs::client::warmup::WarmupProgress;
using ::curvefs::client::rpcclient::MdsClientImpl;
using ::curvefs::client::rpcclient::MDSBaseClient;
using ::curvefs::client::metric::ClientOpMetric;
Expand Down Expand Up @@ -228,7 +233,7 @@ int AddWarmupTask(curvefs::client::common::WarmupType type, fuse_ino_t key,
bool result = true;
switch (type) {
case curvefs::client::common::WarmupType::kWarmupTypeList:
result = g_ClientInstance->PutWarmFilelistTask(key, storageType,
result = g_ClientInstance->PutWarmFilelistTask(key, storageType, path,
mount_point, root);
break;
case curvefs::client::common::WarmupType::kWarmupTypeSingle:
Expand All @@ -246,7 +251,7 @@ int AddWarmupTask(curvefs::client::common::WarmupType type, fuse_ino_t key,
}

void QueryWarmupTask(fuse_ino_t key, std::string *data) {
curvefs::client::warmup::WarmupProgress progress;
WarmupProgress progress;
bool ret = g_ClientInstance->GetWarmupProgress(key, &progress);
if (!ret) {
*data = "finished";
Expand All @@ -257,6 +262,29 @@ void QueryWarmupTask(fuse_ino_t key, std::string *data) {
VLOG(9) << "Warmup [" << key << "]" << *data;
}

void ListWarmupTasks(std::string *data) {
WarmupProgress progress;
std::unordered_map<std::string, WarmupProgress> filepath2progress;

bool ret = g_ClientInstance->GetAllWarmupProgress(&filepath2progress);

std::ostringstream filepath2warmupProgress;

for (auto it = filepath2progress.begin();
it != filepath2progress.end();
++it) {
std::string progressStr = std::to_string(it->second.GetFinished());
progressStr += "/" + std::to_string(it->second.GetTotal());
filepath2warmupProgress << it->first << ":" << progressStr << ";";
VLOG(9) << "Warmup [" << it->first << "]" << progressStr;
}
if (!ret) {
*data = "finished";
} else {
*data = filepath2warmupProgress.str();
}
}

int Warmup(fuse_ino_t key, const std::string& name, const std::string& value) {
/*
* value[0]: WarmupOpType: add, single
Expand Down Expand Up @@ -323,10 +351,20 @@ FuseClient* Client() {
return g_ClientInstance;
}

const char* warmupXAttr = ::curvefs::client::common::kCurveFsWarmupXAttr;
const char* warmupListXAttr = kCurveFsWarmupListXAttr;
const char* warmupAddXAttr = kCurveFsWarmupAddXAttr;
const char* warmupQueryXAttr = kCurveFsWarmupQueryXAttr;

bool IsWamupReq(const char* name) {
return strcmp(name, warmupXAttr) == 0;
bool IsWamupQueryReq(const char* name) {
return strcmp(name, warmupQueryXAttr) == 0;
}

bool IsWamupAddReq(const char* name) {
return strcmp(name, warmupAddXAttr) == 0;
}

bool IsWarmupListReq(const char* name) {
return strcmp(name, warmupListXAttr) == 0;
}

void TriggerWarmup(fuse_req_t req,
Expand All @@ -352,6 +390,17 @@ void QueryWarmup(fuse_req_t req, fuse_ino_t ino, size_t size) {
return fs->ReplyBuffer(req, data.data(), data.length());
}

void ListWarmup(fuse_req_t req, size_t size) {
auto fs = Client()->GetFileSystem();

std::string data;
ListWarmupTasks(&data);
if (size == 0) {
return fs->ReplyXattr(req, data.length());
}
return fs->ReplyBuffer(req, data.data(), data.length());
}

void ReadThrottleAdd(size_t size) { Client()->Add(true, size); }
void WriteThrottleAdd(size_t size) { Client()->Add(false, size); }

Expand Down Expand Up @@ -814,7 +863,7 @@ void FuseOpSetXattr(fuse_req_t req,
ino, name, size, flags, StrErr(rc));
});

if (IsWamupReq(name)) {
if (IsWamupAddReq(name)) {
return TriggerWarmup(req, ino, name, value, size);
}
rc = client->FuseOpSetXattr(req, ino, name, value, size, flags);
Expand All @@ -835,8 +884,10 @@ void FuseOpGetXattr(fuse_req_t req,
ino, name, size, StrErr(rc), value.size());
});

if (IsWamupReq(name)) {
if (IsWamupQueryReq(name)) {
return QueryWarmup(req, ino, size);
} else if (IsWarmupListReq(name)) {
return ListWarmup(req, size);
}

rc = Client()->FuseOpGetXattr(req, ino, name, &value, size);
Expand Down
16 changes: 14 additions & 2 deletions curvefs/src/client/fuse_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <memory>
#include <string>
#include <list>
#include <unordered_map>
#include <utility>
#include <vector>
#include <atomic>
Expand Down Expand Up @@ -90,6 +91,9 @@ using ::curvefs::client::filesystem::FileOut;

using curvefs::common::is_aligned;

using Filepath2WarmupProgressMap = std::unordered_map<std::string,
warmup::WarmupProgress>;

const uint32_t kMaxHostNameLength = 255u;

using mds::Mountpoint;
Expand Down Expand Up @@ -307,11 +311,12 @@ class FuseClient {

bool PutWarmFilelistTask(fuse_ino_t key,
common::WarmupStorageType type,
const std::string &path,
const std::string &mount_point,
const std::string &root) {
if (fsInfo_->fstype() == FSType::TYPE_S3) {
return warmupManager_->AddWarmupFilelist(key, type, mount_point,
root);
return warmupManager_->AddWarmupFilelist(
key, type, path, mount_point, root);
} // only support s3
return true;
}
Expand All @@ -331,6 +336,13 @@ class FuseClient {
return false;
}

bool GetAllWarmupProgress(Filepath2WarmupProgressMap *filepath2progress) {
if (fsInfo_->fstype() == FSType::TYPE_S3) {
return warmupManager_->ListWarmupProgress(filepath2progress);
}
return false;
}

CURVEFS_ERROR SetMountStatus(const struct MountOption *mountOption);

void Add(bool isRead, size_t size) { throttle_.Add(isRead, size); }
Expand Down
6 changes: 4 additions & 2 deletions curvefs/src/client/warmup/warmup_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ using curve::common::WriteLockGuard;

bool WarmupManagerS3Impl::AddWarmupFilelist(fuse_ino_t key,
WarmupStorageType type,
const std::string &path,
const std::string& mount_point,
const std::string& root) {
if (!mounted_.load(std::memory_order_acquire)) {
LOG(ERROR) << "not mounted";
return false;
}
// add warmup Progress
if (AddWarmupProcess(key, type)) {
if (AddWarmupProcess(key, path, type)) {
VLOG(9) << "add warmup list task:" << key;
WriteLockGuard lock(warmupFilelistDequeMutex_);
auto iter = FindWarmupFilelistByKeyLocked(key);
Expand All @@ -84,7 +85,7 @@ bool WarmupManagerS3Impl::AddWarmupFile(fuse_ino_t key, const std::string &path,
return false;
}
// add warmup Progress
if (AddWarmupProcess(key, type)) {
if (AddWarmupProcess(key, path, type)) {
VLOG(9) << "add warmup single task:" << key;
FetchDentryEnqueue(key, path);
}
Expand Down Expand Up @@ -603,6 +604,7 @@ void WarmupManagerS3Impl::ScanCleanWarmupProgress() {
for (auto iter = inode2Progress_.begin(); iter != inode2Progress_.end();) {
if (ProgressDone(iter->first)) {
VLOG(9) << "warmup key: " << iter->first << " done!";
inode2Filepath_.erase(iter->first);
iter = inode2Progress_.erase(iter);
} else {
++iter;
Expand Down
27 changes: 26 additions & 1 deletion curvefs/src/client/warmup/warmup_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ class WarmupManager {
virtual void UnInit() { ClearWarmupProcess(); }

virtual bool AddWarmupFilelist(fuse_ino_t key, WarmupStorageType type,
const std::string &path,
const std::string& mount_point,
const std::string& root) = 0;
virtual bool AddWarmupFile(fuse_ino_t key, const std::string &path,
Expand All @@ -223,6 +224,8 @@ class WarmupManager {
kvClientManager_ = std::move(kvClientManager);
}

using Filepath2WarmupProgressMap = std::unordered_map<std::string,
WarmupProgress>;
/**
* @brief
*
Expand All @@ -243,6 +246,21 @@ class WarmupManager {
return ret;
}

bool ListWarmupProgress(Filepath2WarmupProgressMap *filepath2progress) {
ReadLockGuard lock(inode2ProgressMutex_);

for (auto filepathIt = inode2Filepath_.begin();
filepathIt != inode2Filepath_.end();
++filepathIt) {
auto progressIt = FindWarmupProgressByKeyLocked(filepathIt->first);

filepath2progress->emplace(filepathIt->second,
WarmupProgress(progressIt->second));
}

return !inode2Progress_.empty();
}

void CollectMetrics(InterfaceMetric *interface, int count, uint64_t start);

protected:
Expand All @@ -252,9 +270,12 @@ class WarmupManager {
* @return true
* @return false warmupProcess has been added
*/
virtual bool AddWarmupProcess(fuse_ino_t key, WarmupStorageType type) {
virtual bool AddWarmupProcess(fuse_ino_t key,
const std::string &path,
WarmupStorageType type) {
WriteLockGuard lock(inode2ProgressMutex_);
auto ret = inode2Progress_.emplace(key, WarmupProgress(type));
inode2Filepath_.emplace(key, path);
return ret.second;
}

Expand All @@ -272,6 +293,7 @@ class WarmupManager {
virtual void ClearWarmupProcess() {
WriteLockGuard lock(inode2ProgressMutex_);
inode2Progress_.clear();
inode2Filepath_.clear();
}

protected:
Expand All @@ -294,6 +316,8 @@ class WarmupManager {

// warmup progress
std::unordered_map<fuse_ino_t, WarmupProgress> inode2Progress_;
std::unordered_map<fuse_ino_t, std::string> inode2Filepath_;

BthreadRWLock inode2ProgressMutex_;

std::shared_ptr<KVClientManager> kvClientManager_ = nullptr;
Expand All @@ -316,6 +340,7 @@ class WarmupManagerS3Impl : public WarmupManager {
s3Adaptor_(std::move(s3Adaptor)) {}

bool AddWarmupFilelist(fuse_ino_t key, WarmupStorageType type,
const std::string &path,
const std::string& mount_point,
const std::string& root) override;
bool AddWarmupFile(fuse_ino_t key, const std::string &path,
Expand Down
11 changes: 11 additions & 0 deletions curvefs/test/client/test_fuse_s3_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ TEST_F(TestFuseS3Client, warmUp_inodeBadFd) {
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk,
"",
"",
"");
warmup::WarmupProgress progress;
bool ret = client_->GetWarmupProgress(inodeid, &progress);
Expand Down Expand Up @@ -327,6 +328,7 @@ TEST_F(TestFuseS3Client, warmUp_Warmfile_error_GetDentry01) {
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk,
"",
"",
"");

warmup::WarmupProgress progress;
Expand Down Expand Up @@ -387,6 +389,7 @@ TEST_F(TestFuseS3Client, warmUp_Warmfile_error_GetDentry02) {
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk,
"",
"",
"");

warmup::WarmupProgress progress;
Expand Down Expand Up @@ -447,6 +450,7 @@ TEST_F(TestFuseS3Client, warmUp_fetchDataEnqueue__error_getinode) {
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk,
"",
"",
"");

warmup::WarmupProgress progress;
Expand Down Expand Up @@ -507,6 +511,7 @@ TEST_F(TestFuseS3Client, warmUp_fetchDataEnqueue_chunkempty) {
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk,
"",
"",
"");

warmup::WarmupProgress progress;
Expand Down Expand Up @@ -572,6 +577,7 @@ TEST_F(TestFuseS3Client, warmUp_FetchDentry_TYPE_SYM_LINK) {
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk,
"",
"",
"");

warmup::WarmupProgress progress;
Expand Down Expand Up @@ -639,6 +645,7 @@ TEST_F(TestFuseS3Client, warmUp_FetchDentry_error_TYPE_DIRECTORY) {
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk,
"",
"",
"");

warmup::WarmupProgress progress;
Expand Down Expand Up @@ -705,6 +712,7 @@ TEST_F(TestFuseS3Client, warmUp_lookpath_multilevel) {
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk,
"",
"",
"");

warmup::WarmupProgress progress;
Expand Down Expand Up @@ -758,6 +766,7 @@ TEST_F(TestFuseS3Client, warmUp_lookpath_unkown) {
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk,
"",
"",
"");

warmup::WarmupProgress progress;
Expand Down Expand Up @@ -817,6 +826,7 @@ TEST_F(TestFuseS3Client, warmUp_FetchChildDentry_error_ListDentry) {
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk,
"",
"",
"");

warmup::WarmupProgress progress;
Expand Down Expand Up @@ -907,6 +917,7 @@ TEST_F(TestFuseS3Client, warmUp_FetchChildDentry_suc_ListDentry) {
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk,
"",
"",
"");

warmup::WarmupProgress progress;
Expand Down
Loading

0 comments on commit c12fbac

Please sign in to comment.