Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mds: fix clone volume missing throttle params and may start two or more tasks to clean same segment #421

Merged
merged 1 commit into from
Jul 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/client/io_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,8 @@ class CURVE_CACHELINE_ALIGNMENT IOTracker {
// 大IO被拆分成多个request,这些request放在reqlist中国保存
std::vector<RequestContext*> reqlist_;

std::vector<SegmentIndex> discardSegments_;
// store segment indices that can be discarded
std::unordered_set<SegmentIndex> discardSegments_;

// scheduler用来将用户线程与client自己的线程切分
// 大IO被切分之后,将切分的reqlist传给scheduler向下发送
Expand Down
2 changes: 1 addition & 1 deletion src/client/splitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ bool Splitor::MarkDiscardBitmap(IOTracker* iotracker, FileSegment* fileSegment,
fileSegment->SetBitmap(offset, len);

if (fileSegment->IsAllBitSet()) {
iotracker->discardSegments_.push_back(segmentIndex);
iotracker->discardSegments_.emplace(segmentIndex);
}

return true;
Expand Down
21 changes: 16 additions & 5 deletions src/mds/nameserver2/clean_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <memory>
#include <vector>
#include "src/mds/nameserver2/clean_manager.h"
#include "src/common/concurrent/count_down_event.h"

namespace curve {
namespace mds {
Expand Down Expand Up @@ -63,7 +64,8 @@ bool CleanManager::SubmitDeleteCommonFileJob(const FileInfo &fileInfo) {

bool CleanManager::SubmitCleanDiscardSegmentJob(
const std::string& cleanSegmentKey,
const DiscardSegmentInfo& discardSegmentInfo) {
const DiscardSegmentInfo& discardSegmentInfo,
curve::common::CountDownEvent* counter) {
// get dlock
dlockOpts_->pfx = std::to_string(discardSegmentInfo.fileinfo().id());
dlock_ = std::make_shared<DLock>(*dlockOpts_);
Expand All @@ -77,7 +79,7 @@ bool CleanManager::SubmitCleanDiscardSegmentJob(
}

auto task = std::make_shared<SegmentCleanTask>(
cleanCore_, cleanSegmentKey, discardSegmentInfo, dlock_);
cleanCore_, cleanSegmentKey, discardSegmentInfo, counter, dlock_);
task->SetTaskID(reinterpret_cast<TaskIDType>(task.get()));
return taskMgr_->PushTask(task);
}
Expand Down Expand Up @@ -155,13 +157,22 @@ void CleanDiscardSegmentTask::ScanAndExecTask() {
continue;
}

auto count = discardSegments.size();
if (count == 0) {
continue;
}

curve::common::CountDownEvent counter(count);
for (const auto& kv : discardSegments) {
if (!cleanManager_->SubmitCleanDiscardSegmentJob(kv.first,
kv.second)) {
if (!cleanManager_->SubmitCleanDiscardSegmentJob(
kv.first, kv.second, &counter)) {
LOG(ERROR) << "SubmitCleanDiscardSegmentJob failed";
continue;
counter.Signal();
}
}

// wait all submitted jobs finish
counter.Wait();
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/mds/nameserver2/clean_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class CleanManagerInterface {

virtual bool SubmitCleanDiscardSegmentJob(
const std::string& cleanSegmentKey,
const DiscardSegmentInfo& discardSegmentInfo) = 0;
const DiscardSegmentInfo& discardSegmentInfo,
curve::common::CountDownEvent* counter) = 0;
};
/**
* CleanManager 用于异步清理 删除快照对应的数据
Expand All @@ -76,7 +77,8 @@ class CleanManager : public CleanManagerInterface {

bool SubmitCleanDiscardSegmentJob(
const std::string& cleanSegmentKey,
const DiscardSegmentInfo& discardSegmentInfo) override;
const DiscardSegmentInfo& discardSegmentInfo,
curve::common::CountDownEvent* counter) override;

bool RecoverCleanTasks(void);

Expand Down
12 changes: 12 additions & 0 deletions src/mds/nameserver2/clean_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "src/mds/nameserver2/clean_core.h"
#include "src/mds/nameserver2/async_delete_snapshot_entity.h"
#include "src/common/concurrent/dlock.h"
#include "src/common/concurrent/count_down_event.h"

using curve::common::DLock;

Expand Down Expand Up @@ -169,14 +170,24 @@ class SegmentCleanTask : public Task {
SegmentCleanTask(std::shared_ptr<CleanCore> cleanCore,
const std::string& cleanSegmentKey,
const DiscardSegmentInfo& discardSegmentInfo,
curve::common::CountDownEvent* counter,
std::shared_ptr<DLock> dlock)
: Task(),
cleanCore_(cleanCore),
cleanSegmentKey_(cleanSegmentKey),
discardSegmentInfo_(discardSegmentInfo),
counter_(counter),
dlock_(dlock) {}

void Run() override {
auto finish = [](curve::common::CountDownEvent* counter) {
counter->Signal();
};

// regardless of success or failure, mark this job finished in the end
std::unique_ptr<curve::common::CountDownEvent, decltype(finish)> guard(
counter_, finish);

if (EtcdErrCode::EtcdOK != dlock_->Lock()) {
LOG(ERROR) << "Get dlock failed in SegmentCleanTask, "
<< "dlock key is " << dlock_->GetPrefix();
Expand All @@ -194,6 +205,7 @@ class SegmentCleanTask : public Task {
std::shared_ptr<CleanCore> cleanCore_;
std::string cleanSegmentKey_;
DiscardSegmentInfo discardSegmentInfo_;
curve::common::CountDownEvent* counter_;
std::shared_ptr<DLock> dlock_;
};

Expand Down
43 changes: 41 additions & 2 deletions src/mds/nameserver2/curvefs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "src/mds/nameserver2/curvefs.h"
#include <glog/logging.h>
#include <google/protobuf/util/message_differencer.h>
#include <memory>
#include <chrono> //NOLINT
#include <set>
Expand Down Expand Up @@ -311,7 +312,7 @@ StatusCode CurveFS::CreateFile(const std::string & fileName,

if (filetype == FileType::INODE_PAGEFILE) {
fileInfo.set_allocated_throttleparams(
new FileThrottleParams(GenerateThrottleParams(length)));
new FileThrottleParams(GenerateDefaultThrottleParams(length)));
}

ret = PutFile(fileInfo);
Expand Down Expand Up @@ -1057,7 +1058,16 @@ StatusCode CurveFS::ExtendFile(const std::string &filename,
<< fileInfo.segmentsize();
return StatusCode::kExtentUnitError;
}

const uint64_t oldLength = fileInfo.length();
fileInfo.set_length(newLength);

if (fileInfo.has_throttleparams() &&
IsDefaultThrottleParams(fileInfo.throttleparams(), oldLength)) {
fileInfo.set_allocated_throttleparams(new FileThrottleParams(
GenerateDefaultThrottleParams(newLength)));
}

return PutFile(fileInfo);
}
}
Expand Down Expand Up @@ -1754,6 +1764,9 @@ StatusCode CurveFS::CreateCloneFile(const std::string &fileName,
fileInfo.set_stripeunit(stripeUnit);
fileInfo.set_stripecount(stripeCount);

fileInfo.set_allocated_throttleparams(
new FileThrottleParams(GenerateDefaultThrottleParams(length)));

ret = PutFile(fileInfo);
if (ret == StatusCode::kOK && retFileInfo != nullptr) {
*retFileInfo = fileInfo;
Expand Down Expand Up @@ -2420,7 +2433,8 @@ StatusCode CurveFS::CheckStripeParam(uint64_t stripeUnit,
return StatusCode::kOK;
}

FileThrottleParams CurveFS::GenerateThrottleParams(uint64_t length) const {
FileThrottleParams CurveFS::GenerateDefaultThrottleParams(
uint64_t length) const {
FileThrottleParams params;

ThrottleParams iopsTotal;
Expand All @@ -2445,6 +2459,31 @@ FileThrottleParams CurveFS::GenerateThrottleParams(uint64_t length) const {
return params;
}

bool CurveFS::IsDefaultThrottleParams(const FileThrottleParams &params,
uint64_t length) const {
auto defaultParams = GenerateDefaultThrottleParams(length);
const auto defaultSize = defaultParams.throttleparams_size();
const auto paramsSize = params.throttleparams_size();

if (defaultSize != paramsSize) {
return false;
}

for (int i = 0; i < defaultSize; ++i) {
for (int j = 0; j < paramsSize; ++j) {
const auto& d = defaultParams.throttleparams()[i];
const auto& p = params.throttleparams()[j];

if (d.type() == p.type() &&
!::google::protobuf::util::MessageDifferencer::Equals(d, p)) {
return false;
}
}
}

return true;
}

CurveFS &kCurveFS = CurveFS::GetInstance();

uint64_t GetOpenFileNum(void *varg) {
Expand Down
5 changes: 4 additions & 1 deletion src/mds/nameserver2/curvefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,10 @@ class CurveFS {
StatusCode CheckStripeParam(uint64_t stripeUnit,
uint64_t stripeCount);

FileThrottleParams GenerateThrottleParams(uint64_t length) const;
FileThrottleParams GenerateDefaultThrottleParams(uint64_t length) const;

bool IsDefaultThrottleParams(const FileThrottleParams &params,
uint64_t length) const;

private:
FileInfo rootFileInfo_;
Expand Down
4 changes: 2 additions & 2 deletions src/mds/nameserver2/metric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ void NameserverCacheMetrics::UpdateRemoveFromCacheBytes(uint64_t size) {
cacheBytes << (0 - size);
}

void SegmentDiscardMetric::OnReceiveDiscardRequest(uint64_t size) {
void SegmentDiscardMetric::OnReceiveDiscardRequest(int64_t size) {
pendingSegments_ << 1;
pendingSize_ << size;
}

void SegmentDiscardMetric::OnDiscardFinish(uint64_t size) {
void SegmentDiscardMetric::OnDiscardFinish(int64_t size) {
pendingSegments_ << -1;
pendingSize_ << -size;

Expand Down
12 changes: 6 additions & 6 deletions src/mds/nameserver2/metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ class SegmentDiscardMetric {
public:
SegmentDiscardMetric()
: prefix_("mds_nameserver_discard"),
totalCleanedSegments_(prefix_ + "total_cleaned_segment_count"),
pendingSegments_(prefix_ + "pending_segment_count"),
totalCleanedSize_(prefix_ + "total_cleaned_size"),
pendingSize_(prefix_ + "pending_size") {}
totalCleanedSegments_(prefix_ + "_total_cleaned_segment_count"),
pendingSegments_(prefix_ + "_pending_segment_count"),
totalCleanedSize_(prefix_ + "_total_cleaned_size"),
pendingSize_(prefix_ + "_pending_size") {}

~SegmentDiscardMetric() = default;

void OnReceiveDiscardRequest(uint64_t size);
void OnDiscardFinish(uint64_t size);
void OnReceiveDiscardRequest(int64_t size);
void OnDiscardFinish(int64_t size);

public:
const std::string prefix_;
Expand Down
17 changes: 0 additions & 17 deletions src/mds/topology/topology_chunk_allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,20 +315,3 @@ bool AllocateChunkPolicy::ChooseSingleLogicalPoolRandom(
} // namespace topology
} // namespace mds
} // namespace curve

















14 changes: 10 additions & 4 deletions test/mds/nameserver2/clean_discard_segment_task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ using ::testing::_;
using ::testing::DoAll;
using ::testing::Return;
using ::testing::SetArgPointee;
using ::testing::Invoke;

class CleanDiscardSegmentTaskTest : public ::testing::Test {
public:
Expand Down Expand Up @@ -82,8 +83,13 @@ TEST_F(CleanDiscardSegmentTaskTest, CommonTest) {
DoAll(SetArgPointee<0>(discardSegments),
Return(StoreStatus::OK)));

EXPECT_CALL(*cleanManager_, SubmitCleanDiscardSegmentJob(_, _))
.WillRepeatedly(Return(true));
EXPECT_CALL(*cleanManager_, SubmitCleanDiscardSegmentJob(_, _, _))
.WillRepeatedly(
Invoke([](const std::string& key, const DiscardSegmentInfo& info,
::curve::common::CountDownEvent* counter) {
counter->Signal();
return true;
}));

ASSERT_TRUE(task.Start());
ASSERT_FALSE(task.Start());
Expand All @@ -103,7 +109,7 @@ TEST_F(CleanDiscardSegmentTaskTest, TestListDiscardSegmentFailed) {
.WillRepeatedly(
Return(StoreStatus::InternalError));

EXPECT_CALL(*cleanManager_, SubmitCleanDiscardSegmentJob(_, _))
EXPECT_CALL(*cleanManager_, SubmitCleanDiscardSegmentJob(_, _, _))
.Times(0);

task.Start();
Expand All @@ -123,7 +129,7 @@ TEST_F(CleanDiscardSegmentTaskTest, TestSubmitJobFailed) {
DoAll(SetArgPointee<0>(discardSegments),
Return(StoreStatus::OK)));

EXPECT_CALL(*cleanManager_, SubmitCleanDiscardSegmentJob(_, _))
EXPECT_CALL(*cleanManager_, SubmitCleanDiscardSegmentJob(_, _, _))
.WillRepeatedly(Return(false));

task.Start();
Expand Down
Loading