Skip to content

Commit

Permalink
[feat] curvefs/client: warmup list and stop
Browse files Browse the repository at this point in the history
Signed-off-by: ken90242 <[email protected]>
  • Loading branch information
ken90242 committed Aug 7, 2023
1 parent 76bfade commit 2201c4c
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 43 deletions.
2 changes: 1 addition & 1 deletion curvefs/src/client/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ const uint32_t MAX_XATTR_VALUE_LENGTH = 64 * 1024;
const char kCurveFsWarmupXAttr[] = "curvefs.warmup.op";


constexpr int kWarmupOpNum = 4;
constexpr int kWarmupOpNum = 6;

enum class WarmupOpType {
kWarmupOpUnknown = 0,
Expand Down
23 changes: 17 additions & 6 deletions curvefs/src/client/curve_fuse_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,15 @@ void UnInitFuseClient() {
}

int AddWarmupTask(curvefs::client::common::WarmupType type, fuse_ino_t key,
const std::string &path,
curvefs::client::common::WarmupStorageType storageType) {
const std::string& path,
curvefs::client::common::WarmupStorageType storageType,
const std::string& mount_point, const std::string& root) {
int ret = 0;
bool result = true;
switch (type) {
case curvefs::client::common::WarmupType::kWarmupTypeList:
result = g_ClientInstance->PutWarmFilelistTask(key, storageType);
result = g_ClientInstance->PutWarmFilelistTask(key, storageType,
mount_point, root);
break;
case curvefs::client::common::WarmupType::kWarmupTypeSingle:
result = g_ClientInstance->PutWarmFileTask(key, path, storageType);
Expand Down Expand Up @@ -256,6 +258,15 @@ void QueryWarmupTask(fuse_ino_t key, std::string *data) {
}

int Warmup(fuse_ino_t key, const std::string& name, const std::string& value) {
/*
* value[0]: WarmupOpType: add, single
* value[1]: WarmupType: single, list
* value[2]: CurvefsPath
* value[3]: StorageType
* value[4]: MountPoint
* value[5]: Root
*/

// warmup
if (g_ClientInstance->GetFsInfo()->fstype() != FSType::TYPE_S3) {
LOG(ERROR) << "warmup only support s3";
Expand All @@ -278,9 +289,9 @@ int Warmup(fuse_ino_t key, const std::string& name, const std::string& value) {
int ret = 0;
switch (curvefs::client::common::GetWarmupOpType(opTypePath[0])) {
case curvefs::client::common::WarmupOpType::kWarmupOpAdd:
ret =
AddWarmupTask(curvefs::client::common::GetWarmupType(opTypePath[1]),
key, opTypePath[2], storageType);
ret = AddWarmupTask(
curvefs::client::common::GetWarmupType(opTypePath[1]), key,
opTypePath[2], storageType, opTypePath[4], opTypePath[5]);
if (ret != 0) {
LOG(ERROR) << name << " has invalid xattr value " << value;
}
Expand Down
8 changes: 6 additions & 2 deletions curvefs/src/client/fuse_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,13 @@ class FuseClient {
enableSumInDir_ = enable;
}

bool PutWarmFilelistTask(fuse_ino_t key, common::WarmupStorageType type) {
bool PutWarmFilelistTask(fuse_ino_t key,
common::WarmupStorageType type,
const std::string &mount_point,
const std::string &root) {
if (fsInfo_->fstype() == FSType::TYPE_S3) {
return warmupManager_->AddWarmupFilelist(key, type);
return warmupManager_->AddWarmupFilelist(key, type, mount_point,
root);
} // only support s3
return true;
}
Expand Down
20 changes: 17 additions & 3 deletions curvefs/src/client/warmup/warmup_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ using curve::common::WriteLockGuard;
#define WARMUP_CHECKINTERVAL_US (1000 * 1000)

bool WarmupManagerS3Impl::AddWarmupFilelist(fuse_ino_t key,
WarmupStorageType type) {
WarmupStorageType type,
const std::string& mount_point,
const std::string& root) {
if (!mounted_.load(std::memory_order_acquire)) {
LOG(ERROR) << "not mounted";
return false;
Expand All @@ -69,7 +71,7 @@ bool WarmupManagerS3Impl::AddWarmupFilelist(fuse_ino_t key,
return false;
}
uint64_t len = inodeWrapper->GetLength();
warmupFilelistDeque_.emplace_back(key, len);
warmupFilelistDeque_.emplace_back(key, len, mount_point, root);
}
} // Skip already added
return true;
Expand Down Expand Up @@ -628,11 +630,23 @@ void WarmupManagerS3Impl::ScanWarmupFilelist() {
if (!warmupFilelistDeque_.empty()) {
WarmupFilelist warmupFilelist = warmupFilelistDeque_.front();
VLOG(9) << "warmup ino: " << warmupFilelist.GetKey()
<< " len is: " << warmupFilelist.GetFileLen();
<< " len is: " << warmupFilelist.GetFileLen()
<< " mount point is: " << warmupFilelist.GetMountPoint()
<< " fs root is: " << warmupFilelist.GetRoot();

std::vector<std::string> warmuplist;
GetWarmupList(warmupFilelist, &warmuplist);

for (auto filePath : warmuplist) {
size_t found = filePath.find(warmupFilelist.GetMountPoint());

if (found != std::string::npos) {
filePath.replace(
found,
warmupFilelist.GetMountPoint().length(),
warmupFilelist.GetRoot());
}

FetchDentryEnqueue(warmupFilelist.GetKey(), filePath);
}
warmupFilelistDeque_.pop_front();
Expand Down
17 changes: 13 additions & 4 deletions curvefs/src/client/warmup/warmup_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,23 @@ using curvefs::client::common::WarmupStorageType;

class WarmupFile {
public:
explicit WarmupFile(fuse_ino_t key = 0, uint64_t fileLen = 0)
: key_(key), fileLen_(fileLen) {}
explicit WarmupFile(fuse_ino_t key = 0, uint64_t fileLen = 0,
const std::string& mountPoint = "", const std::string& root = "")
: key_(key), fileLen_(fileLen), mountPoint_(mountPoint), root_(root) {}

fuse_ino_t GetKey() const { return key_; }
uint64_t GetFileLen() const { return fileLen_; }
std::string GetMountPoint() const { return mountPoint_; }
std::string GetRoot() const { return root_; }
bool operator==(const WarmupFile &other) const {
return key_ == other.key_;
}

private:
fuse_ino_t key_;
uint64_t fileLen_;
std::string mountPoint_;
std::string root_;
};

using WarmupFilelist = WarmupFile;
Expand Down Expand Up @@ -188,7 +193,9 @@ class WarmupManager {
}
virtual void UnInit() { ClearWarmupProcess(); }

virtual bool AddWarmupFilelist(fuse_ino_t key, WarmupStorageType type) = 0;
virtual bool AddWarmupFilelist(fuse_ino_t key, WarmupStorageType type,
const std::string& mount_point,
const std::string& root) = 0;
virtual bool AddWarmupFile(fuse_ino_t key, const std::string &path,
WarmupStorageType type) = 0;

Expand Down Expand Up @@ -298,7 +305,9 @@ class WarmupManagerS3Impl : public WarmupManager {
std::move(readFunc), std::move(kvClientManager)),
s3Adaptor_(std::move(s3Adaptor)) {}

bool AddWarmupFilelist(fuse_ino_t key, WarmupStorageType type) override;
bool AddWarmupFilelist(fuse_ino_t key, WarmupStorageType type,
const std::string& mount_point,
const std::string& root) override;
bool AddWarmupFile(fuse_ino_t key, const std::string &path,
WarmupStorageType type) override;

Expand Down
22 changes: 11 additions & 11 deletions curvefs/test/client/test_fuse_s3_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ TEST_F(TestFuseS3Client, warmUp_inodeBadFd) {
client_->GetFsInfo()->set_fstype(FSType::TYPE_S3);
client_->PutWarmFilelistTask(
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk);
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk, "", "");
warmup::WarmupProgress progress;
bool ret = client_->GetWarmupProgress(inodeid, &progress);
LOG(INFO) << "ret:" << ret << " Warmup progress: " << progress.ToString();
Expand Down Expand Up @@ -323,7 +323,7 @@ TEST_F(TestFuseS3Client, warmUp_Warmfile_error_GetDentry01) {
client_->GetFsInfo()->set_fstype(FSType::TYPE_S3);
client_->PutWarmFilelistTask(
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk);
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk, "", "");

warmup::WarmupProgress progress;
bool ret = client_->GetWarmupProgress(inodeid, &progress);
Expand Down Expand Up @@ -381,7 +381,7 @@ TEST_F(TestFuseS3Client, warmUp_Warmfile_error_GetDentry02) {
client_->GetFsInfo()->set_fstype(FSType::TYPE_S3);
client_->PutWarmFilelistTask(
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk);
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk, "", "");

warmup::WarmupProgress progress;
bool ret = client_->GetWarmupProgress(inodeid, &progress);
Expand Down Expand Up @@ -439,7 +439,7 @@ TEST_F(TestFuseS3Client, warmUp_fetchDataEnqueue__error_getinode) {
client_->GetFsInfo()->set_fstype(FSType::TYPE_S3);
client_->PutWarmFilelistTask(
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk);
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk, "", "");

warmup::WarmupProgress progress;
bool ret = client_->GetWarmupProgress(inodeid, &progress);
Expand Down Expand Up @@ -497,7 +497,7 @@ TEST_F(TestFuseS3Client, warmUp_fetchDataEnqueue_chunkempty) {
client_->GetFsInfo()->set_fstype(FSType::TYPE_S3);
client_->PutWarmFilelistTask(
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk);
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk, "", "");

warmup::WarmupProgress progress;
bool ret = client_->GetWarmupProgress(inodeid, &progress);
Expand Down Expand Up @@ -560,7 +560,7 @@ TEST_F(TestFuseS3Client, warmUp_FetchDentry_TYPE_SYM_LINK) {
client_->GetFsInfo()->set_fstype(FSType::TYPE_S3);
client_->PutWarmFilelistTask(
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk);
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk, "", "");

warmup::WarmupProgress progress;
bool ret = client_->GetWarmupProgress(inodeid, &progress);
Expand Down Expand Up @@ -625,7 +625,7 @@ TEST_F(TestFuseS3Client, warmUp_FetchDentry_error_TYPE_DIRECTORY) {
client_->GetFsInfo()->set_fstype(FSType::TYPE_S3);
client_->PutWarmFilelistTask(
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk);
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk, "", "");

warmup::WarmupProgress progress;
bool ret = client_->GetWarmupProgress(inodeid, &progress);
Expand Down Expand Up @@ -689,7 +689,7 @@ TEST_F(TestFuseS3Client, warmUp_lookpath_multilevel) {
client_->GetFsInfo()->set_fstype(FSType::TYPE_S3);
client_->PutWarmFilelistTask(
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk);
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk, "", "");

warmup::WarmupProgress progress;
bool ret = client_->GetWarmupProgress(inodeid, &progress);
Expand Down Expand Up @@ -740,7 +740,7 @@ TEST_F(TestFuseS3Client, warmUp_lookpath_unkown) {
client_->GetFsInfo()->set_fstype(FSType::TYPE_S3);
client_->PutWarmFilelistTask(
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk);
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk, "", "");

warmup::WarmupProgress progress;
bool ret = client_->GetWarmupProgress(inodeid, &progress);
Expand Down Expand Up @@ -797,7 +797,7 @@ TEST_F(TestFuseS3Client, warmUp_FetchChildDentry_error_ListDentry) {
client_->GetFsInfo()->set_fstype(FSType::TYPE_S3);
client_->PutWarmFilelistTask(
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk);
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk, "", "");

warmup::WarmupProgress progress;
bool ret = client_->GetWarmupProgress(inodeid, &progress);
Expand Down Expand Up @@ -885,7 +885,7 @@ TEST_F(TestFuseS3Client, warmUp_FetchChildDentry_suc_ListDentry) {
client_->GetFsInfo()->set_fstype(FSType::TYPE_S3);
client_->PutWarmFilelistTask(
inodeid,
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk);
curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk, "", "");

warmup::WarmupProgress progress;
bool ret = client_->GetWarmupProgress(inodeid, &progress);
Expand Down
27 changes: 11 additions & 16 deletions tools-v2/pkg/cli/command/curvefs/warmup/add/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (aCmd *AddCommand) Print(cmd *cobra.Command, args []string) error {
return output.FinalCmdOutput(&aCmd.FinalCurveCmd, aCmd)
}

func (aCmd *AddCommand) convertFilelist() *cmderror.CmdError {
func (aCmd *AddCommand) verifyFilelist() *cmderror.CmdError {
data, err := ioutil.ReadFile(aCmd.Path)
if err != nil {
readErr := cmderror.ErrReadFile()
Expand All @@ -170,36 +170,31 @@ func (aCmd *AddCommand) convertFilelist() *cmderror.CmdError {
}

lines := strings.Split(string(data), "\n")
validPath := ""

for _, line := range lines {
rel, err := filepath.Rel(aCmd.Mountpoint.MountPoint, line)
if err == nil && !strings.HasPrefix(rel, "..") {
// convert to curvefs path
curvefsAbspath := cobrautil.Path2CurvefsPath(line, aCmd.Mountpoint)
validPath += (curvefsAbspath + "\n")
} else {
convertFail := fmt.Sprintf("[%s] is not saved in curvefs", line)
aCmd.ConvertFails = append(aCmd.ConvertFails, convertFail)
if err != nil || strings.HasPrefix(rel, "..") {
convertFailMsg := fmt.Sprintf("[%s:%s] is not saved in curvefs", aCmd.Path, line)
convertErr := cmderror.ErrConverResult()
convertErr.Format(convertFailMsg, err.Error())
break
}
}
if err = ioutil.WriteFile(aCmd.Path, []byte(validPath), 0644); err != nil {
writeErr := cmderror.ErrWriteFile()
writeErr.Format(aCmd.Path, err.Error())
}

return cmderror.ErrSuccess()
}

func (aCmd *AddCommand) RunCommand(cmd *cobra.Command, args []string) error {
xattr := CURVEFS_WARMUP_OP_ADD_SINGLE
if !aCmd.Single {
convertErr := aCmd.convertFilelist()
convertErr := aCmd.verifyFilelist()
if convertErr.TypeCode() != cmderror.CODE_SUCCESS {
return convertErr.ToError()
}
xattr = CURVEFS_WARMUP_OP_ADD_LIST
}
value := fmt.Sprintf(xattr, aCmd.CurvefsPath, aCmd.StorageType)
err := unix.Setxattr(aCmd.Path, CURVEFS_WARMUP_OP_XATTR, []byte(value), 0)
values := fmt.Sprintf(xattr, aCmd.CurvefsPath, aCmd.StorageType, aCmd.Mountpoint.MountPoint, aCmd.Mountpoint.Root)
err := unix.Setxattr(aCmd.Path, CURVEFS_WARMUP_OP_XATTR, []byte(values), 0)
if err == unix.ENOTSUP || err == unix.EOPNOTSUPP {
return fmt.Errorf("filesystem does not support extended attributes")
} else if err != nil {
Expand Down

0 comments on commit 2201c4c

Please sign in to comment.