Commit
fix balance plan that makes redundant parts
liwenhui-soul committed Jan 11, 2022
1 parent 2e366a8 commit 8d9109e
Showing 9 changed files with 207 additions and 35 deletions.
30 changes: 30 additions & 0 deletions src/meta/processors/job/BalanceJobExecutor.cpp
@@ -88,6 +88,36 @@ nebula::cpp2::ErrorCode BalanceJobExecutor::save(const std::string& k, const std
return rc;
}

void BalanceJobExecutor::insertOneTask(
const BalanceTask& task, std::map<PartitionID, std::vector<BalanceTask>>* existTasks) {
std::vector<BalanceTask>& taskVec = existTasks->operator[](task.getPartId());
if (taskVec.empty()) {
taskVec.emplace_back(task);
} else {
for (auto it = taskVec.begin(); it != taskVec.end(); it++) {
if (task.getDstHost() == it->getSrcHost() && task.getSrcHost() == it->getDstHost()) {
taskVec.erase(it);
return;
} else if (task.getDstHost() == it->getSrcHost()) {
BalanceTask newTask(task);
newTask.setDstHost(it->getDstHost());
taskVec.erase(it);
insertOneTask(newTask, existTasks);
return;
} else if (task.getSrcHost() == it->getDstHost()) {
BalanceTask newTask(task);
newTask.setSrcHost(it->getSrcHost());
taskVec.erase(it);
insertOneTask(newTask, existTasks);
return;
} else {
continue;
}
}
taskVec.emplace_back(task);
}
}

nebula::cpp2::ErrorCode SpaceInfo::loadInfo(GraphSpaceID spaceId, kvstore::KVStore* kvstore) {
spaceId_ = spaceId;
std::string spaceKey = MetaKeyUtils::spaceKey(spaceId);
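The new insertOneTask above is the core of the fix: each candidate move is merged against the moves already queued for the same partition before it is recorded. A move that exactly reverses an earlier one cancels both, and a move that shares an endpoint with an earlier one is collapsed into a single move and re-inserted, so the final plan never moves a partition through a redundant intermediate hop. Below is a self-contained sketch of the same merge rule on a simplified task type; the Move struct, insertOneMove, and the host names are illustrative stand-ins, not code from this commit.

#include <cassert>
#include <map>
#include <string>
#include <vector>

// Simplified stand-in for BalanceTask: move one partition from src to dst.
struct Move {
  int partId;
  std::string src, dst;
};

// Mirrors insertOneTask: per partition, an exact reverse cancels an existing
// move, and a move sharing an endpoint combines into one move and re-inserts.
void insertOneMove(const Move& m, std::map<int, std::vector<Move>>* exist) {
  auto& vec = (*exist)[m.partId];
  for (auto it = vec.begin(); it != vec.end(); ++it) {
    if (m.dst == it->src && m.src == it->dst) {  // exact reverse: cancel both
      vec.erase(it);
      return;
    } else if (m.dst == it->src) {  // endpoints touch: combine into m.src -> it->dst
      Move merged{m.partId, m.src, it->dst};
      vec.erase(it);
      insertOneMove(merged, exist);
      return;
    } else if (m.src == it->dst) {  // endpoints touch: combine into it->src -> m.dst
      Move merged{m.partId, it->src, m.dst};
      vec.erase(it);
      insertOneMove(merged, exist);
      return;
    }
  }
  vec.push_back(m);  // unrelated to every queued move: keep it
}

int main() {
  std::map<int, std::vector<Move>> exist;
  insertOneMove({1, "h1", "h2"}, &exist);
  insertOneMove({1, "h2", "h3"}, &exist);  // combines into a single h1 -> h3 move
  assert(exist[1].size() == 1 && exist[1][0].src == "h1" && exist[1][0].dst == "h3");
  insertOneMove({1, "h3", "h1"}, &exist);  // exact reverse of h1 -> h3: cancels
  assert(exist[1].empty());
  return 0;
}

This is why the executors below accumulate tasks in a std::map<PartitionID, std::vector<BalanceTask>> keyed by partition instead of pushing straight into the plan: the lost-host pass and the rebalance pass may both touch the same partition, and merging at insert time eliminates the redundant moves the commit title refers to.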
3 changes: 3 additions & 0 deletions src/meta/processors/job/BalanceJobExecutor.h
@@ -73,6 +73,9 @@ class BalanceJobExecutor : public MetaJobExecutor {
return Status::OK();
}

void insertOneTask(const BalanceTask& task,
std::map<PartitionID, std::vector<BalanceTask>>* existTasks);

protected:
std::unique_ptr<BalancePlan> plan_;
std::unique_ptr<folly::Executor> executor_;
5 changes: 5 additions & 0 deletions src/meta/processors/job/BalancePlan.h
@@ -95,6 +95,11 @@ class BalancePlan {

void setFinishCallBack(std::function<void(meta::cpp2::JobStatus)> func);

template <typename InputIterator>
void insertTask(InputIterator first, InputIterator last) {
tasks_.insert(tasks_.end(), first, last);
}

private:
JobDescription jobDescription_;
kvstore::KVStore* kv_ = nullptr;
21 changes: 21 additions & 0 deletions src/meta/processors/job/BalanceTask.h
@@ -30,6 +30,7 @@ class BalanceTask {
FRIEND_TEST(BalanceTest, TryToRecoveryTest);
FRIEND_TEST(BalanceTest, RecoveryTest);
FRIEND_TEST(BalanceTest, StopPlanTest);
FRIEND_TEST(BalanceTest, BalanceZonePlanComplexTest);

public:
BalanceTask() = default;
@@ -68,6 +69,26 @@ class BalanceTask {
return ret_;
}

const HostAddr& getSrcHost() const {
return src_;
}

const HostAddr& getDstHost() const {
return dst_;
}

void setSrcHost(const HostAddr& host) {
src_ = host;
}

void setDstHost(const HostAddr& host) {
dst_ = host;
}

PartitionID getPartId() const {
return partId_;
}

private:
std::string buildTaskId() {
return folly::stringPrintf("%d, %d:%d", jobId_, spaceId_, partId_);
55 changes: 31 additions & 24 deletions src/meta/processors/job/DataBalanceJobExecutor.cpp
@@ -63,7 +63,7 @@ Status DataBalanceJobExecutor::buildBalancePlan() {
return l->parts_.size() < r->parts_.size();
});
}
-  plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_));
+  std::map<PartitionID, std::vector<BalanceTask>> existTasks;
// move parts of lost hosts to active hosts in the same zone
for (auto& zoneHostEntry : lostZoneHost) {
const std::string& zoneName = zoneHostEntry.first;
@@ -76,13 +76,13 @@ Status DataBalanceJobExecutor::buildBalancePlan() {
for (PartitionID partId : host->parts_) {
Host* dstHost = activeVec.front();
dstHost->parts_.insert(partId);
-        plan_->addTask(BalanceTask(jobId_,
-                                   spaceInfo_.spaceId_,
-                                   partId,
-                                   host->host_,
-                                   dstHost->host_,
-                                   kvstore_,
-                                   adminClient_));
+        existTasks[partId].emplace_back(jobId_,
+                                        spaceInfo_.spaceId_,
+                                        partId,
+                                        host->host_,
+                                        dstHost->host_,
+                                        kvstore_,
+                                        adminClient_);
for (size_t i = 0; i < activeVec.size() - 1; i++) {
if (activeVec[i]->parts_.size() > activeVec[i + 1]->parts_.size()) {
std::swap(activeVec[i], activeVec[i + 1]);
@@ -96,23 +96,22 @@
}
lostZoneHost.clear();
// rebalance for hosts in a zone
-  auto balanceHostVec = [this](std::vector<Host*>& hostVec) -> std::vector<BalanceTask> {
+  auto balanceHostVec = [this, &existTasks](std::vector<Host*>& hostVec) {
size_t totalPartNum = 0;
size_t avgPartNum = 0;
for (Host* h : hostVec) {
totalPartNum += h->parts_.size();
}
if (hostVec.empty()) {
LOG(ERROR) << "rebalance error: zone has no host";
-      return {};
+      return;
}
avgPartNum = totalPartNum / hostVec.size();
size_t remainder = totalPartNum - avgPartNum * hostVec.size();
size_t leftBegin = 0;
size_t leftEnd = 0;
size_t rightBegin = 0;
size_t rightEnd = hostVec.size();
-    std::vector<BalanceTask> tasks;
for (size_t i = 0; i < hostVec.size(); i++) {
if (avgPartNum <= hostVec[i]->parts_.size()) {
leftEnd = i;
@@ -139,13 +138,14 @@ Status DataBalanceJobExecutor::buildBalancePlan() {
PartitionID partId = *(srcHost->parts_.begin());
hostVec[leftBegin]->parts_.insert(partId);
srcHost->parts_.erase(partId);
-        tasks.emplace_back(jobId_,
-                           spaceInfo_.spaceId_,
-                           partId,
-                           srcHost->host_,
-                           hostVec[leftBegin]->host_,
-                           kvstore_,
-                           adminClient_);
+        insertOneTask(BalanceTask(jobId_,
+                                  spaceInfo_.spaceId_,
+                                  partId,
+                                  srcHost->host_,
+                                  hostVec[leftBegin]->host_,
+                                  kvstore_,
+                                  adminClient_),
+                      &existTasks);
size_t leftIndex = leftBegin;
for (; leftIndex < leftEnd - 1; leftIndex++) {
if (hostVec[leftIndex]->parts_.size() > hostVec[leftIndex + 1]->parts_.size()) {
@@ -161,18 +161,25 @@
leftEnd = rightBegin;
}
}
-    return tasks;
};
for (auto& pair : activeSortedHost) {
std::vector<Host*>& hvec = pair.second;
-    std::vector<BalanceTask> tasks = balanceHostVec(hvec);
-    for (BalanceTask& task : tasks) {
-      plan_->addTask(std::move(task));
-    }
+    balanceHostVec(hvec);
}
-  if (plan_->tasks().empty()) {
+  bool empty = std::find_if(existTasks.begin(),
+                            existTasks.end(),
+                            [](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
+                              return !p.second.empty();
+                            }) == existTasks.end();
+  if (empty) {
return Status::Balanced();
}
+  plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_));
+  std::for_each(existTasks.begin(),
+                existTasks.end(),
+                [this](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
+                  plan_->insertTask(p.second.begin(), p.second.end());
+                });
nebula::cpp2::ErrorCode rc = plan_->saveInStore();
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Error("save balance zone plan failed");
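One readability note on the emptiness probe introduced above (and repeated in ZoneBalanceJobExecutor.cpp below): the negated std::find_if can be expressed more directly with std::all_of. A drop-in equivalent sketch — an editorial alternative, not what the commit ships:

#include <algorithm>  // std::all_of

// True exactly when no partition has a pending task, i.e. nothing to balance.
bool empty = std::all_of(existTasks.begin(), existTasks.end(),
                         [](const auto& p) { return p.second.empty(); });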
29 changes: 19 additions & 10 deletions src/meta/processors/job/ZoneBalanceJobExecutor.cpp
@@ -113,7 +113,7 @@ HostAddr ZoneBalanceJobExecutor::insertPartIntoZone(
nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::rebalanceActiveZones(
std::vector<Zone*>* sortedActiveZones,
std::map<std::string, std::vector<Host*>>* sortedZoneHosts,
-    std::vector<BalanceTask>* tasks) {
+    std::map<PartitionID, std::vector<BalanceTask>>* existTasks) {
std::vector<Zone*>& sortedActiveZonesRef = *sortedActiveZones;
std::map<std::string, std::vector<Host*>>& sortedZoneHostsRef = *sortedZoneHosts;
int32_t totalPartNum = 0;
@@ -147,8 +147,9 @@ nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::rebalanceActiveZones(
for (int32_t leftIndex = leftBegin; leftIndex < leftEnd; leftIndex++) {
if (!sortedActiveZonesRef[leftIndex]->partExist(partId)) {
HostAddr dst = insertPartIntoZone(sortedZoneHosts, sortedActiveZonesRef[leftIndex], partId);
-        tasks->emplace_back(
-            jobId_, spaceInfo_.spaceId_, partId, srcHost, dst, kvstore_, adminClient_);
+        insertOneTask(
+            BalanceTask(jobId_, spaceInfo_.spaceId_, partId, srcHost, dst, kvstore_, adminClient_),
+            existTasks);
int32_t newLeftIndex = leftIndex;
for (; newLeftIndex < leftEnd - 1; newLeftIndex++) {
if (sortedActiveZonesRef[newLeftIndex]->partNum_ >
@@ -242,7 +243,6 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() {
if (activeSize < spaceInfo_.replica_) {
return Status::Error("Not enough alive zones to hold replica");
}
-  std::vector<BalanceTask> tasks;

std::vector<Zone*> sortedActiveZones;
sortedActiveZones.reserve(activeZones.size());
@@ -285,6 +285,7 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() {
return ha;
};

+  std::map<PartitionID, std::vector<BalanceTask>> existTasks;
// move parts of lost zones to active zones
for (auto& zoneMapEntry : lostZones) {
Zone* zone = zoneMapEntry.second;
@@ -293,7 +294,7 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() {
Host& host = hostMapEntry.second;
for (PartitionID partId : host.parts_) {
HostAddr dst = chooseZoneToInsert(partId);
-        tasks.emplace_back(
+        existTasks[partId].emplace_back(
jobId_, spaceInfo_.spaceId_, partId, hostAddr, dst, kvstore_, adminClient_);
}
host.parts_.clear();
@@ -302,15 +303,23 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() {
}

// all parts of lost zones have moved to active zones, then rebalance the active zones
-  nebula::cpp2::ErrorCode rc = rebalanceActiveZones(&sortedActiveZones, &sortedZoneHosts, &tasks);
+  nebula::cpp2::ErrorCode rc =
+      rebalanceActiveZones(&sortedActiveZones, &sortedZoneHosts, &existTasks);

-  if (tasks.empty() || rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
+  bool empty = std::find_if(existTasks.begin(),
+                            existTasks.end(),
+                            [](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
+                              return !p.second.empty();
+                            }) == existTasks.end();
+  if (empty || rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Balanced();
}
plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_));
-  for (BalanceTask& task : tasks) {
-    plan_->addTask(std::move(task));
-  }
+  std::for_each(existTasks.begin(),
+                existTasks.end(),
+                [this](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
+                  plan_->insertTask(p.second.begin(), p.second.end());
+                });
rc = plan_->saveInStore();
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Error("save balance zone plan failed");
5 changes: 4 additions & 1 deletion src/meta/processors/job/ZoneBalanceJobExecutor.h
@@ -17,6 +17,8 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor {
FRIEND_TEST(BalanceTest, BalanceZoneRemainderPlanTest);
FRIEND_TEST(BalanceTest, NormalZoneTest);
FRIEND_TEST(BalanceTest, StopPlanTest);
FRIEND_TEST(BalanceTest, BalanceZonePlanComplexTest);
FRIEND_TEST(BalanceTest, NormalZoneComplexTest);

public:
ZoneBalanceJobExecutor(JobDescription jobDescription,
@@ -25,6 +27,7 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor {
const std::vector<std::string>& params)
: BalanceJobExecutor(jobDescription.getJobId(), kvstore, adminClient, params),
jobDescription_(jobDescription) {}

nebula::cpp2::ErrorCode prepare() override;
nebula::cpp2::ErrorCode stop() override;

@@ -38,7 +41,7 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor {
nebula::cpp2::ErrorCode rebalanceActiveZones(
std::vector<Zone*>* sortedActiveZones,
std::map<std::string, std::vector<Host*>>* sortedZoneHosts,
-      std::vector<BalanceTask>* tasks);
+      std::map<PartitionID, std::vector<BalanceTask>>* existTasks);

private:
std::vector<std::string> lostZones_;
67 changes: 67 additions & 0 deletions src/meta/test/BalancerTest.cpp
@@ -246,6 +246,45 @@ TEST(BalanceTest, BalanceZonePlanTest) {
EXPECT_EQ(balancer.spaceInfo_.zones_["zone5"].partNum_, 9);
}

TEST(BalanceTest, BalanceZonePlanComplexTest) {
fs::TempDir rootPath("/tmp/BalanceZoneTest.XXXXXX");
std::unique_ptr<kvstore::NebulaStore> store = MockCluster::initMetaKV(rootPath.path());
std::pair<std::string, std::vector<std::pair<HostAddr, std::vector<PartitionID>>>> pair1(
"z1", {{{"127.0.0.1", 11}, {}}});
std::pair<std::string, std::vector<std::pair<HostAddr, std::vector<PartitionID>>>> pair2(
"z2", {{{"127.0.0.1", 12}, {}}});
std::pair<std::string, std::vector<std::pair<HostAddr, std::vector<PartitionID>>>> pair3(
"z3", {{{"127.0.0.1", 13}, {}}});
std::pair<std::string, std::vector<std::pair<HostAddr, std::vector<PartitionID>>>> pair4(
"z4", {{{"127.0.0.1", 14}, {}}});
std::pair<std::string, std::vector<std::pair<HostAddr, std::vector<PartitionID>>>> pair5(
"z5", {{{"127.0.0.1", 15}, {}}});
for (int32_t i = 1; i <= 512; i++) {
pair1.second.front().second.push_back(i);
pair2.second.front().second.push_back(i);
pair3.second.front().second.push_back(i);
}
SpaceInfo spaceInfo = createSpaceInfo("space1", 1, 3, {pair1, pair2, pair3, pair4, pair5});
ZoneBalanceJobExecutor balancer(JobDescription(), store.get(), nullptr, {});
balancer.spaceInfo_ = spaceInfo;
Status status = balancer.buildBalancePlan();
EXPECT_EQ(status, Status::OK());
EXPECT_EQ(balancer.spaceInfo_.zones_["z1"].partNum_, 308);
EXPECT_EQ(balancer.spaceInfo_.zones_["z2"].partNum_, 307);
EXPECT_EQ(balancer.spaceInfo_.zones_["z3"].partNum_, 307);
EXPECT_EQ(balancer.spaceInfo_.zones_["z4"].partNum_, 307);
EXPECT_EQ(balancer.spaceInfo_.zones_["z5"].partNum_, 307);
balancer.lostZones_ = {"z1"};
status = balancer.buildBalancePlan();
EXPECT_EQ(status, Status::OK());
EXPECT_EQ(balancer.plan_->tasks().size(), 389);
auto tasks = balancer.plan_->tasks();
EXPECT_EQ(balancer.spaceInfo_.zones_["z2"].partNum_, 384);
EXPECT_EQ(balancer.spaceInfo_.zones_["z3"].partNum_, 384);
EXPECT_EQ(balancer.spaceInfo_.zones_["z4"].partNum_, 384);
EXPECT_EQ(balancer.spaceInfo_.zones_["z5"].partNum_, 384);
}
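// Sanity arithmetic for the expectations above (editorial note, not from the
// commit): 512 parts x 3 replicas = 1536 part replicas. Spread over 5 zones,
// 1536 / 5 = 307 remainder 1, so one zone holds 308 and the other four 307.
// After z1 is drained, 1536 / 4 = 384 replicas land on each remaining zone.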

TEST(BalanceTest, BalanceZoneRemainderPlanTest) {
fs::TempDir rootPath("/tmp/BalanceZoneTest.XXXXXX");
std::unique_ptr<kvstore::NebulaStore> store = MockCluster::initMetaKV(rootPath.path());
@@ -668,6 +707,34 @@ void verifyMetaZone(kvstore::KVStore* kv,
EXPECT_EQ(zoneSet, expectZones);
}

void verifyZonePartNum(kvstore::KVStore* kv,
GraphSpaceID spaceId,
const std::map<std::string, int32_t>& zones) {
std::map<HostAddr, std::string> hostZone;
std::map<std::string, int32_t> zoneNum;
std::unique_ptr<kvstore::KVIterator> iter;
for (const auto& [zone, num] : zones) {
std::string zoneKey = MetaKeyUtils::zoneKey(zone);
std::string value;
kv->get(kDefaultSpaceId, kDefaultPartId, zoneKey, &value);
auto hosts = MetaKeyUtils::parseZoneHosts(value);
for (auto& host : hosts) {
hostZone[host] = zone;
}
zoneNum[zone] = 0;
}
const auto& partPrefix = MetaKeyUtils::partPrefix(spaceId);
kv->prefix(kDefaultSpaceId, kDefaultPartId, partPrefix, &iter);
while (iter->valid()) {
auto hosts = MetaKeyUtils::parsePartVal(iter->val());
for (auto& host : hosts) {
zoneNum[hostZone[host]]++;
}
iter->next();
}
EXPECT_EQ(zoneNum, zones);
}
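// A hypothetical call (illustrative, not part of this commit), matching the
// complex test above once z1 has been drained:
//   verifyZonePartNum(kv, 1, {{"z2", 384}, {"z3", 384}, {"z4", 384}, {"z5", 384}});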

JobDescription makeJobDescription(kvstore::KVStore* kv, cpp2::AdminCmd cmd) {
JobDescription jd(testJobId.fetch_add(1, std::memory_order_relaxed), cmd, {});
std::vector<nebula::kvstore::KV> data;