Skip to content

Commit

Permalink
add warmup list functionality
Browse files Browse the repository at this point in the history
Signed-off-by: Ken Han <[email protected]>
  • Loading branch information
ken90242 committed Aug 10, 2023
1 parent 34a9d19 commit d4dd8de
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 27 deletions.
5 changes: 4 additions & 1 deletion curvefs/src/client/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ std::ostream &operator<<(std::ostream &os, MetaServerOpType optype);
const uint32_t MAX_XATTR_NAME_LENGTH = 255;
const uint32_t MAX_XATTR_VALUE_LENGTH = 64 * 1024;

const char kCurveFsWarmupXAttr[] = "curvefs.warmup.op";
const char kCurveFsWarmupListXAttr[] = "curvefs.warmup.list";
const char kCurveFsWarmupQueryXAttr[] = "curvefs.warmup.query";
const char kCurveFsWarmupAddXAttr[] = "curvefs.warmup.add";
const char kCurveFsWarmupCancelXAttr[] = "curvefs.warmup.cancel";


constexpr int kWarmupOpNum = 6;
Expand Down
62 changes: 54 additions & 8 deletions curvefs/src/client/curve_fuse_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
* Author: xuchaojie
*/

#include <string>
#include <memory>
#include <cstring>
#include <json/json.h>
#include <memory>
#include <string>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -228,7 +229,7 @@ int AddWarmupTask(curvefs::client::common::WarmupType type, fuse_ino_t key,
bool result = true;
switch (type) {
case curvefs::client::common::WarmupType::kWarmupTypeList:
result = g_ClientInstance->PutWarmFilelistTask(key, storageType,
result = g_ClientInstance->PutWarmFilelistTask(key, storageType, path,
mount_point, root);
break;
case curvefs::client::common::WarmupType::kWarmupTypeSingle:
Expand Down Expand Up @@ -257,6 +258,27 @@ void QueryWarmupTask(fuse_ino_t key, std::string *data) {
VLOG(9) << "Warmup [" << key << "]" << *data;
}

void ListWarmupTasks(std::string *data) {
curvefs::client::warmup::WarmupProgress progress;
std::unordered_map<std::string, curvefs::client::warmup::WarmupProgress> filepath2progress;

bool ret = g_ClientInstance->GetAllWarmupProgress(&filepath2progress);

Json::Value filepath2warmupProgress;

for (auto it = filepath2progress.begin(); it != filepath2progress.end(); ++it) {
std::string progressStr = std::to_string(it.second().GetFinished()) + "/" + std::to_string(it.second.GetTotal());
filepath2warmupProgress[it->first()] = progressStr
VLOG(9) << "Warmup [" << it->first() << "]" << progressStr;
}
if (!ret) {
*data = "finished";
} else {
Json::StreamWriterBuilder writer;
*data = Json::writeString(writer, filepath2warmupProgress)
}
}

int Warmup(fuse_ino_t key, const std::string& name, const std::string& value) {
/*
* value[0]: WarmupOpType: add, single
Expand Down Expand Up @@ -323,10 +345,20 @@ FuseClient* Client() {
return g_ClientInstance;
}

const char* warmupXAttr = ::curvefs::client::common::kCurveFsWarmupXAttr;
const char* warmupListXAttr = ::curvefs::client::common::kCurveFsWarmupListXAttr;
const char* warmupAddXAttr = ::curvefs::client::common::kCurveFsWarmupAddXAttr;
const char* warmupQueryXAttr = ::curvefs::client::common::kCurveFsWarmupQueryXAttr;

bool IsWamupReq(const char* name) {
return strcmp(name, warmupXAttr) == 0;
bool IsWamupQueryReq(const char* name) {
return strcmp(name, warmupQueryXAttr) == 0;
}

bool IsWamupAddReq(const char* name) {
return strcmp(name, warmupAddXAttr) == 0;
}

bool IsWarmupListReq(const char* name) {
return strcmp(name, warmupListXAttr) == 0;
}

void TriggerWarmup(fuse_req_t req,
Expand All @@ -352,6 +384,17 @@ void QueryWarmup(fuse_req_t req, fuse_ino_t ino, size_t size) {
return fs->ReplyBuffer(req, data.data(), data.length());
}

void ListWarmup(fuse_req_t req, size_t size) {
auto fs = Client()->GetFileSystem();

std::string data;
ListWarmupTasks(&data);
if (size == 0) {
return fs->ReplyXattr(req, data.length());
}
return fs->ReplyBuffer(req, data.data(), data.length());
}

void ReadThrottleAdd(size_t size) { Client()->Add(true, size); }
void WriteThrottleAdd(size_t size) { Client()->Add(false, size); }

Expand Down Expand Up @@ -814,7 +857,7 @@ void FuseOpSetXattr(fuse_req_t req,
ino, name, size, flags, StrErr(rc));
});

if (IsWamupReq(name)) {
if (IsWamupAddReq(name)) {
return TriggerWarmup(req, ino, name, value, size);
}
rc = client->FuseOpSetXattr(req, ino, name, value, size, flags);
Expand All @@ -835,9 +878,12 @@ void FuseOpGetXattr(fuse_req_t req,
ino, name, size, StrErr(rc), value.size());
});

if (IsWamupReq(name)) {
if (IsWamupQueryReq(name)) {
return QueryWarmup(req, ino, size);
}
else if (IsWarmupListReq(name)) {
return ListWarmup(req, size);
}

rc = Client()->FuseOpGetXattr(req, ino, name, &value, size);
if (rc != CURVEFS_ERROR::OK) {
Expand Down
12 changes: 10 additions & 2 deletions curvefs/src/client/fuse_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,12 @@ class FuseClient {

bool PutWarmFilelistTask(fuse_ino_t key,
common::WarmupStorageType type,
const std::string &path,
const std::string &mount_point,
const std::string &root) {
if (fsInfo_->fstype() == FSType::TYPE_S3) {
return warmupManager_->AddWarmupFilelist(key, type, mount_point,
root);
return warmupManager_->AddWarmupFilelist(
key, type, path, mount_point, root);
} // only support s3
return true;
}
Expand All @@ -331,6 +332,13 @@ class FuseClient {
return false;
}

bool GetAllWarmupProgress(std::unordered_map<std::string, warmup::WarmupProgress> *filepath2progress) {
if (fsInfo_->fstype() == FSType::TYPE_S3) {
return warmupManager_->ListWarmupProgress(filepath2progress);
}
return false;
}

CURVEFS_ERROR SetMountStatus(const struct MountOption *mountOption);

void Add(bool isRead, size_t size) { throttle_.Add(isRead, size); }
Expand Down
15 changes: 8 additions & 7 deletions curvefs/src/client/warmup/warmup_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ using curve::common::WriteLockGuard;

bool WarmupManagerS3Impl::AddWarmupFilelist(fuse_ino_t key,
WarmupStorageType type,
const std::string &path,
const std::string& mount_point,
const std::string& root) {
if (!mounted_.load(std::memory_order_acquire)) {
LOG(ERROR) << "not mounted";
return false;
}
// add warmup Progress
if (AddWarmupProcess(key, type)) {
if (AddWarmupProcess(key, path, type)) {
VLOG(9) << "add warmup list task:" << key;
WriteLockGuard lock(warmupFilelistDequeMutex_);
auto iter = FindWarmupFilelistByKeyLocked(key);
Expand All @@ -84,7 +85,7 @@ bool WarmupManagerS3Impl::AddWarmupFile(fuse_ino_t key, const std::string &path,
return false;
}
// add warmup Progress
if (AddWarmupProcess(key, type)) {
if (AddWarmupProcess(key, path, type)) {
VLOG(9) << "add warmup single task:" << key;
FetchDentryEnqueue(key, path);
}
Expand Down Expand Up @@ -347,8 +348,8 @@ void WarmupManagerS3Impl::TravelChunks(
{
ReadLockGuard lock(inode2ProgressMutex_);
auto iter = FindWarmupProgressByKeyLocked(key);
if (iter != inode2Progress_.end()) {
iter->second.AddTotal(prefetchObjs.size());
if (iter != inode2FilepathAndProgress_.end()) {
(iter->second).second.AddTotal(prefetchObjs.size());
} else {
LOG(ERROR) << "no such warmup progress: " << key;
}
Expand Down Expand Up @@ -600,10 +601,10 @@ void WarmupManagerS3Impl::ScanCleanFetchS3ObjectsPool() {
void WarmupManagerS3Impl::ScanCleanWarmupProgress() {
// clean done warmupProgress
ReadLockGuard lock(inode2ProgressMutex_);
for (auto iter = inode2Progress_.begin(); iter != inode2Progress_.end();) {
for (auto iter = inode2FilepathAndProgress_.begin(); iter != inode2FilepathAndProgress_.end();) {
if (ProgressDone(iter->first)) {
VLOG(9) << "warmup key: " << iter->first << " done!";
iter = inode2Progress_.erase(iter);
iter = inode2FilepathAndProgress_.erase(iter);
} else {
++iter;
}
Expand Down Expand Up @@ -695,7 +696,7 @@ void WarmupManagerS3Impl::PutObjectToCache(
fuse_ino_t key, const std::shared_ptr<GetObjectAsyncContext> &context) {
ReadLockGuard lock(inode2ProgressMutex_);
auto iter = FindWarmupProgressByKeyLocked(key);
if (iter == inode2Progress_.end()) {
if (iter == inode2FilepathAndProgress_.end()) {
VLOG(9) << "no this warmup task progress: " << key;
return;
}
Expand Down
26 changes: 19 additions & 7 deletions curvefs/src/client/warmup/warmup_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ class WarmupManager {
virtual void UnInit() { ClearWarmupProcess(); }

virtual bool AddWarmupFilelist(fuse_ino_t key, WarmupStorageType type,
const std::string &path,
const std::string& mount_point,
const std::string& root) = 0;
virtual bool AddWarmupFile(fuse_ino_t key, const std::string &path,
Expand Down Expand Up @@ -235,14 +236,24 @@ class WarmupManager {
bool ret = true;
ReadLockGuard lock(inode2ProgressMutex_);
auto iter = FindWarmupProgressByKeyLocked(key);
if (iter != inode2Progress_.end()) {
*progress = iter->second;
if (iter != inode2FilepathAndProgress_.end()) {
*progress = (iter->second).second;
} else {
ret = false;
}
return ret;
}

bool ListWarmupProgress(std::unordered_map<std::string, WarmupProgress> *filepath2progress) {
ReadLockGuard lock(inode2ProgressMutex_);

for (auto iter = inode2FilepathAndProgress_.begin(); iter != inode2FilepathAndProgress_.end(); ++iter) {
filepath2progress.emplace((iter->second).first, WarmupProgress((iter->second).second));
}

return !inode2FilepathAndProgress_.empty();
}

void CollectMetrics(InterfaceMetric *interface, int count, uint64_t start);

protected:
Expand All @@ -252,9 +263,9 @@ class WarmupManager {
* @return true
* @return false warmupProcess has been added
*/
virtual bool AddWarmupProcess(fuse_ino_t key, WarmupStorageType type) {
virtual bool AddWarmupProcess(fuse_ino_t key, const std::string &path, WarmupStorageType type) {
WriteLockGuard lock(inode2ProgressMutex_);
auto ret = inode2Progress_.emplace(key, WarmupProgress(type));
auto ret = inode2FilepathAndProgress_.emplace(key, std::make_pair(path, WarmupProgress(type)));
return ret.second;
}

Expand All @@ -266,12 +277,12 @@ class WarmupManager {
*/
std::unordered_map<fuse_ino_t, WarmupProgress>::iterator
FindWarmupProgressByKeyLocked(fuse_ino_t key) {
return inode2Progress_.find(key);
return inode2FilepathAndProgress_.find(key);
}

virtual void ClearWarmupProcess() {
WriteLockGuard lock(inode2ProgressMutex_);
inode2Progress_.clear();
inode2FilepathAndProgress_.clear();
}

protected:
Expand All @@ -293,7 +304,7 @@ class WarmupManager {
FuseOpReadFunctionType fuseOpRead_;

// warmup progress
std::unordered_map<fuse_ino_t, WarmupProgress> inode2Progress_;
std::unordered_map<fuse_ino_t, std:pair<std::string, WarmupProgress>> inode2FilepathAndProgress_;
BthreadRWLock inode2ProgressMutex_;

std::shared_ptr<KVClientManager> kvClientManager_ = nullptr;
Expand All @@ -316,6 +327,7 @@ class WarmupManagerS3Impl : public WarmupManager {
s3Adaptor_(std::move(s3Adaptor)) {}

bool AddWarmupFilelist(fuse_ino_t key, WarmupStorageType type,
const std::string &path,
const std::string& mount_point,
const std::string& root) override;
bool AddWarmupFile(fuse_ino_t key, const std::string &path,
Expand Down
3 changes: 3 additions & 0 deletions tools-v2/internal/error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,9 @@ var (
ErrVerifyError = func() *CmdError {
return NewInternalCmdError(73, "verify fail, err: %s")
}
ErrListWarmup = func() *CmdError {
return NewInternalCmdError(43, "list warmup progress fail, err: %s")
}

// http error
ErrHttpUnreadableResult = func() *CmdError {
Expand Down
2 changes: 1 addition & 1 deletion tools-v2/pkg/cli/command/curvefs/warmup/add/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ $ curve fs warmup add /mnt/warmup # warmup all files in /mnt/warmup`
)

const (
CURVEFS_WARMUP_OP_XATTR = "curvefs.warmup.op"
CURVEFS_WARMUP_OP_XATTR = "curvefs.warmup.add"
CURVEFS_WARMUP_OP_ADD_SINGLE = "add\nsingle\n%s\n%s"
CURVEFS_WARMUP_OP_ADD_LIST = "add\nlist\n%s\n%s"
)
Expand Down
2 changes: 1 addition & 1 deletion tools-v2/pkg/cli/command/curvefs/warmup/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
const (
queryExample = `$ curve fs warmup query /mnt/warmup `

CURVEFS_WARMUP_OP_XATTR = "curvefs.warmup.op"
CURVEFS_WARMUP_OP_XATTR = "curvefs.warmup.query"
)

type QueryCommand struct {
Expand Down

0 comments on commit d4dd8de

Please sign in to comment.