Skip to content

Commit

Permalink
issue_3170: drop space after stop all AdminTaskJobs (#3406)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nivras authored Dec 15, 2021
1 parent df3781c commit 9fbe164
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 14 deletions.
4 changes: 4 additions & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,

void NebulaStore::removeSpace(GraphSpaceID spaceId, bool isListener) {
folly::RWSpinLock::WriteHolder wh(&lock_);
if (beforeRemoveSpace_) {
beforeRemoveSpace_(spaceId);
}

if (!isListener) {
auto spaceIt = this->spaces_.find(spaceId);
if (spaceIt != this->spaces_.end()) {
Expand Down
7 changes: 7 additions & 0 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,12 @@ class NebulaStore : public KVStore, public Handler {

void unregisterOnNewPartAdded(const std::string& funcName) { onNewPartAdded_.erase(funcName); }

void registerBeforeRemoveSpace(std::function<void(GraphSpaceID)> func) {
beforeRemoveSpace_ = func;
}

void unregisterBeforeRemoveSpace() { beforeRemoveSpace_ = nullptr; }

private:
void loadPartFromDataPath();

Expand Down Expand Up @@ -343,6 +349,7 @@ class NebulaStore : public KVStore, public Handler {
std::shared_ptr<DiskManager> diskMan_;
folly::ConcurrentHashMap<std::string, std::function<void(std::shared_ptr<Part>&)>>
onNewPartAdded_;
std::function<void(GraphSpaceID)> beforeRemoveSpace_{nullptr};
};

} // namespace kvstore
Expand Down
10 changes: 10 additions & 0 deletions src/storage/admin/AdminTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class AdminTask {
ctx_.jobId_,
ctx_.taskId_,
apache::thrift::util::enumNameSafe(rc).c_str());
running_ = false;
nebula::meta::cpp2::StatsItem statsItem;
ctx_.onFinish_(rc, statsItem);
}
Expand All @@ -85,6 +86,8 @@ class AdminTask {

virtual int getTaskId() { return ctx_.taskId_; }

virtual GraphSpaceID getSpaceId() { return ctx_.parameters_.get_space_id(); }

virtual void setConcurrentReq(int concurrentReq) {
if (concurrentReq > 0) {
ctx_.concurrentReq_ = concurrentReq;
Expand All @@ -102,20 +105,27 @@ class AdminTask {

virtual void cancel() {
FLOG_INFO("task(%d, %d) cancelled", ctx_.jobId_, ctx_.taskId_);
canceled_ = true;
auto suc = nebula::cpp2::ErrorCode::SUCCEEDED;
rc_.compare_exchange_strong(suc, nebula::cpp2::ErrorCode::E_USER_CANCEL);
}

virtual bool isRunning() { return running_; }

virtual bool isCanceled() { return canceled_; }

meta::cpp2::AdminCmd cmdType() { return ctx_.cmd_; }

public:
std::atomic<size_t> unFinishedSubTask_;
SubTaskQueue subtasks_;
std::atomic<bool> running_{false};

protected:
StorageEnv* env_;
TaskContext ctx_;
std::atomic<nebula::cpp2::ErrorCode> rc_{nebula::cpp2::ErrorCode::SUCCEEDED};
std::atomic<bool> canceled_{false};
};

class AdminTaskFactory {
Expand Down
52 changes: 51 additions & 1 deletion src/storage/admin/AdminTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ bool AdminTaskManager::init() {
auto threadFactory = std::make_shared<folly::NamedThreadFactory>("TaskManager");
pool_ = std::make_unique<ThreadPool>(FLAGS_max_concurrent_subtasks, threadFactory);
bgThread_ = std::make_unique<thread::GenericWorker>();
if (env_ != nullptr) {
static_cast<::nebula::kvstore::NebulaStore*>(env_->kvstore_)
->registerBeforeRemoveSpace(
[this](GraphSpaceID spaceId) { this->waitCancelTasks(spaceId); });
}
if (!bgThread_->start()) {
LOG(ERROR) << "background thread start failed";
return false;
Expand Down Expand Up @@ -112,7 +117,12 @@ void AdminTaskManager::handleUnreportedTasks() {
jobId,
taskId,
fut.value().status().toString());
ifAnyUnreported_ = true;
if (fut.value().status() == Status::Error("Space not existed!")) {
// space has been droped, remove the task status.
keys.emplace_back(key.data(), key.size());
} else {
ifAnyUnreported_ = true;
}
continue;
}
rc = fut.value().value();
Expand Down Expand Up @@ -249,6 +259,14 @@ void AdminTaskManager::schedule() {
}

auto task = it->second;
if (task->isCanceled()) {
LOG(INFO) << folly::sformat("job {} has been calceled", task->getJobId());
task->finish(nebula::cpp2::ErrorCode::E_USER_CANCEL);
tasks_.erase(handle);
continue;
}

task->running_ = true;
auto errOrSubTasks = task->genSubTasks();
if (!nebula::ok(errOrSubTasks)) {
LOG(ERROR) << folly::sformat(
Expand Down Expand Up @@ -353,5 +371,37 @@ bool AdminTaskManager::isFinished(JobID jobID, TaskID taskID) {
return iter->second->unFinishedSubTask_ == 0;
}

void AdminTaskManager::cancelTasks(GraphSpaceID spaceId) {
auto it = tasks_.begin();
while (it != tasks_.end()) {
if (it->second->getSpaceId() == spaceId) {
it->second->cancel();
removeTaskStatus(it->second->getJobId(), it->second->getTaskId());
FLOG_INFO("cancel task(%d, %d), spaceId: %d", it->first.first, it->first.second, spaceId);
}
++it;
}
}

int32_t AdminTaskManager::runningTaskCnt(GraphSpaceID spaceId) {
int32_t jobCnt = 0;
for (const auto& task : tasks_) {
auto taskSpaceId = task.second->getSpaceId();
if (taskSpaceId == spaceId) {
if (task.second->isRunning()) {
jobCnt++;
}
}
}
return jobCnt;
}

void AdminTaskManager::waitCancelTasks(GraphSpaceID spaceId) {
cancelTasks(spaceId);
while (runningTaskCnt(spaceId) != 0) {
usleep(1000 * 100);
}
}

} // namespace storage
} // namespace nebula
14 changes: 14 additions & 0 deletions src/storage/admin/AdminTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <folly/executors/task_queue/UnboundedBlockingQueue.h>
#include <gtest/gtest_prod.h>

#include "clients/meta/MetaClient.h"
#include "common/base/Base.h"
#include "interface/gen-cpp2/storage_types.h"
#include "kvstore/NebulaStore.h"
Expand All @@ -37,6 +38,12 @@ class AdminTaskManager {
return &sAdminTaskManager;
}

~AdminTaskManager() {
if (metaClient_ != nullptr) {
metaClient_ = nullptr;
}
}

// Caller must make sure JobId + TaskId is unique
void addAsyncTask(std::shared_ptr<AdminTask> task);

Expand All @@ -45,6 +52,10 @@ class AdminTaskManager {
nebula::cpp2::ErrorCode cancelJob(JobID jobId);
nebula::cpp2::ErrorCode cancelTask(JobID jobId, TaskID taskId = -1);

void cancelTasks(GraphSpaceID spaceId);
int32_t runningTaskCnt(GraphSpaceID spaceId);
void waitCancelTasks(GraphSpaceID spaceId);

bool init();

void shutdown();
Expand All @@ -67,6 +78,9 @@ class AdminTaskManager {
nebula::cpp2::ErrorCode rc,
const nebula::meta::cpp2::StatsItem& result);

protected:
meta::MetaClient* metaClient_{nullptr};

private:
void schedule();
void runSubTask(TaskHandle handle);
Expand Down
8 changes: 4 additions & 4 deletions src/storage/admin/RebuildEdgeIndexTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac
PartitionID part,
const IndexItems& items,
kvstore::RateLimiter* rateLimiter) {
if (canceled_) {
if (UNLIKELY(canceled_)) {
LOG(ERROR) << "Rebuild Edge Index is Canceled";
return nebula::cpp2::ErrorCode::SUCCEEDED;
return nebula::cpp2::ErrorCode::E_USER_CANCEL;
}

auto vidSizeRet = env_->schemaMan_->getSpaceVidLen(space);
Expand Down Expand Up @@ -64,9 +64,9 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac
RowReaderWrapper reader;
size_t batchSize = 0;
while (iter && iter->valid()) {
if (canceled_) {
if (UNLIKELY(canceled_)) {
LOG(ERROR) << "Rebuild Edge Index is Canceled";
return nebula::cpp2::ErrorCode::SUCCEEDED;
return nebula::cpp2::ErrorCode::E_USER_CANCEL;
}

if (batchSize >= FLAGS_rebuild_index_batch_size) {
Expand Down
7 changes: 6 additions & 1 deletion src/storage/admin/RebuildIndexTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,15 @@ nebula::cpp2::ErrorCode RebuildIndexTask::buildIndexOnOperations(
GraphSpaceID space, PartitionID part, kvstore::RateLimiter* rateLimiter) {
if (canceled_) {
LOG(INFO) << folly::sformat("Rebuild index canceled, space={}, part={}", space, part);
return nebula::cpp2::ErrorCode::SUCCEEDED;
return nebula::cpp2::ErrorCode::E_USER_CANCEL;
}

while (true) {
if (UNLIKELY(canceled_)) {
LOG(INFO) << folly::sformat("Rebuild index canceled, space={}, part={}", space, part);
return nebula::cpp2::ErrorCode::E_USER_CANCEL;
}

std::unique_ptr<kvstore::KVIterator> operationIter;
auto operationPrefix = OperationKeyUtils::operationPrefix(part);
auto operationRet = env_->kvstore_->prefix(space, part, operationPrefix, &operationIter);
Expand Down
7 changes: 5 additions & 2 deletions src/storage/admin/RebuildIndexTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ class RebuildIndexTask : public AdminTask {
const IndexItems& items,
kvstore::RateLimiter* rateLimiter) = 0;

void cancel() override { canceled_ = true; }
void cancel() override {
canceled_ = true;
auto suc = nebula::cpp2::ErrorCode::SUCCEEDED;
rc_.compare_exchange_strong(suc, nebula::cpp2::ErrorCode::E_USER_CANCEL);
}

nebula::cpp2::ErrorCode buildIndexOnOperations(GraphSpaceID space,
PartitionID part,
Expand All @@ -59,7 +63,6 @@ class RebuildIndexTask : public AdminTask {
nebula::cpp2::ErrorCode invoke(GraphSpaceID space, PartitionID part, const IndexItems& items);

protected:
std::atomic<bool> canceled_{false};
GraphSpaceID space_;
};

Expand Down
8 changes: 4 additions & 4 deletions src/storage/admin/RebuildTagIndexTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space
PartitionID part,
const IndexItems& items,
kvstore::RateLimiter* rateLimiter) {
if (canceled_) {
if (UNLIKELY(canceled_)) {
LOG(ERROR) << "Rebuild Tag Index is Canceled";
return nebula::cpp2::ErrorCode::SUCCEEDED;
return nebula::cpp2::ErrorCode::E_USER_CANCEL;
}

auto vidSizeRet = env_->schemaMan_->getSpaceVidLen(space);
Expand Down Expand Up @@ -64,9 +64,9 @@ nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space
RowReaderWrapper reader;
size_t batchSize = 0;
while (iter && iter->valid()) {
if (canceled_) {
if (UNLIKELY(canceled_)) {
LOG(ERROR) << "Rebuild Tag Index is Canceled";
return nebula::cpp2::ErrorCode::SUCCEEDED;
return nebula::cpp2::ErrorCode::E_USER_CANCEL;
}

if (batchSize >= FLAGS_rebuild_index_batch_size) {
Expand Down
15 changes: 15 additions & 0 deletions src/storage/admin/StatsTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ nebula::cpp2::ErrorCode StatsTask::genSubTask(GraphSpaceID spaceId,
PartitionID part,
std::unordered_map<TagID, std::string> tags,
std::unordered_map<EdgeType, std::string> edges) {
if (UNLIKELY(canceled_)) {
LOG(ERROR) << "Stats task is canceled";
return nebula::cpp2::ErrorCode::E_USER_CANCEL;
}

auto vIdLenRet = env_->schemaMan_->getSpaceVidLen(spaceId);
if (!vIdLenRet.ok()) {
LOG(ERROR) << "Get space vid length failed";
Expand Down Expand Up @@ -140,6 +145,11 @@ nebula::cpp2::ErrorCode StatsTask::genSubTask(GraphSpaceID spaceId,
// 2 5
// 3 1
while (vertexIter && vertexIter->valid()) {
if (UNLIKELY(canceled_)) {
LOG(ERROR) << "Stats task is canceled";
return nebula::cpp2::ErrorCode::E_USER_CANCEL;
}

auto key = vertexIter->key();
auto vId = NebulaKeyUtils::getVertexId(vIdLen, key).str();
auto tagId = NebulaKeyUtils::getTagId(vIdLen, key);
Expand Down Expand Up @@ -168,6 +178,11 @@ nebula::cpp2::ErrorCode StatsTask::genSubTask(GraphSpaceID spaceId,
// 2 2 1 3 (invalid data, for example, edge data without edge
// schema) 2 3 1 4 2 3 1 5
while (edgeIter && edgeIter->valid()) {
if (UNLIKELY(canceled_)) {
LOG(ERROR) << "Stats task is canceled";
return nebula::cpp2::ErrorCode::E_USER_CANCEL;
}

auto key = edgeIter->key();

auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen, key);
Expand Down
7 changes: 5 additions & 2 deletions src/storage/admin/StatsTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ class StatsTask : public AdminTask {
void finish(nebula::cpp2::ErrorCode rc) override;

protected:
void cancel() override { canceled_ = true; }
void cancel() override {
canceled_ = true;
auto suc = nebula::cpp2::ErrorCode::SUCCEEDED;
rc_.compare_exchange_strong(suc, nebula::cpp2::ErrorCode::E_USER_CANCEL);
}

nebula::cpp2::ErrorCode genSubTask(GraphSpaceID space,
PartitionID part,
Expand All @@ -38,7 +42,6 @@ class StatsTask : public AdminTask {
nebula::cpp2::ErrorCode getSchemas(GraphSpaceID spaceId);

protected:
std::atomic<bool> canceled_{false};
GraphSpaceID spaceId_;

// All tagIds and tagName of the spaceId
Expand Down

0 comments on commit 9fbe164

Please sign in to comment.