From d85d4ac8c9830a233b3c04974b780c9acf080b69 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Tue, 8 Dec 2020 11:13:07 +0800 Subject: [PATCH] support balance data reset plan --- src/executor/CMakeLists.txt | 1 + src/executor/Executor.cpp | 4 +++ src/executor/admin/BalanceExecutor.cpp | 2 +- src/executor/admin/ResetBalanceExecutor.cpp | 34 +++++++++++++++++++++ src/executor/admin/ResetBalanceExecutor.h | 30 ++++++++++++++++++ src/executor/admin/StopBalanceExecutor.cpp | 2 +- src/parser/AdminSentences.h | 1 + src/parser/parser.yy | 7 ++++- src/parser/scanner.lex | 2 ++ src/parser/test/ParserTest.cpp | 30 ++++++++++++++++++ src/parser/test/ScannerTest.cpp | 6 ++++ src/planner/Admin.h | 11 +++++++ src/planner/PlanNode.cpp | 2 ++ src/planner/PlanNode.h | 1 + src/validator/BalanceValidator.cpp | 3 ++ 15 files changed, 133 insertions(+), 3 deletions(-) create mode 100644 src/executor/admin/ResetBalanceExecutor.cpp create mode 100644 src/executor/admin/ResetBalanceExecutor.h diff --git a/src/executor/CMakeLists.txt b/src/executor/CMakeLists.txt index f98f916b5..0758eb49c 100644 --- a/src/executor/CMakeLists.txt +++ b/src/executor/CMakeLists.txt @@ -46,6 +46,7 @@ nebula_add_library( admin/SubmitJobExecutor.cpp admin/BalanceExecutor.cpp admin/StopBalanceExecutor.cpp + admin/ResetBalanceExecutor.cpp admin/BalanceLeadersExecutor.cpp admin/ShowBalanceExecutor.cpp admin/ShowHostsExecutor.cpp diff --git a/src/executor/Executor.cpp b/src/executor/Executor.cpp index 897d7d596..78d9fb608 100644 --- a/src/executor/Executor.cpp +++ b/src/executor/Executor.cpp @@ -32,6 +32,7 @@ #include "executor/admin/ListenerExecutor.h" #include "executor/admin/SpaceExecutor.h" #include "executor/admin/StopBalanceExecutor.h" +#include "executor/admin/ResetBalanceExecutor.h" #include "executor/admin/SubmitJobExecutor.h" #include "executor/admin/SwitchSpaceExecutor.h" #include "executor/admin/UpdateUserExecutor.h" @@ -362,6 +363,9 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) { case PlanNode::Kind::kStopBalance: { return pool->add(new StopBalanceExecutor(node, qctx)); } + case PlanNode::Kind::kResetBalance: { + return pool->add(new ResetBalanceExecutor(node, qctx)); + } case PlanNode::Kind::kShowBalance: { return pool->add(new ShowBalanceExecutor(node, qctx)); } diff --git a/src/executor/admin/BalanceExecutor.cpp b/src/executor/admin/BalanceExecutor.cpp index 753261a96..4c5af86a4 100644 --- a/src/executor/admin/BalanceExecutor.cpp +++ b/src/executor/admin/BalanceExecutor.cpp @@ -17,7 +17,7 @@ folly::Future BalanceExecutor::execute() { folly::Future BalanceExecutor::balance() { auto *bNode = asNode(node()); - return qctx()->getMetaClient()->balance(bNode->deleteHosts(), false) + return qctx()->getMetaClient()->balance(bNode->deleteHosts(), false, false) .via(runner()) .then([this](StatusOr resp) { SCOPED_TIMER(&execTime_); diff --git a/src/executor/admin/ResetBalanceExecutor.cpp b/src/executor/admin/ResetBalanceExecutor.cpp new file mode 100644 index 000000000..a9551d7d4 --- /dev/null +++ b/src/executor/admin/ResetBalanceExecutor.cpp @@ -0,0 +1,34 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "executor/admin/ResetBalanceExecutor.h" +#include "planner/Admin.h" + +namespace nebula { +namespace graph { + +folly::Future ResetBalanceExecutor::execute() { + SCOPED_TIMER(&execTime_); + return resetBalance(); +} + +folly::Future ResetBalanceExecutor::resetBalance() { + return qctx()->getMetaClient()->balance({}, false, true) + .via(runner()) + .then([this](StatusOr resp) { + SCOPED_TIMER(&execTime_); + if (!resp.ok()) { + LOG(ERROR) << resp.status(); + return resp.status(); + } + DataSet v({"ID"}); + v.emplace_back(Row({resp.value()})); + return finish(std::move(v)); + }); +} + +} // namespace graph +} // namespace nebula diff --git a/src/executor/admin/ResetBalanceExecutor.h b/src/executor/admin/ResetBalanceExecutor.h new file mode 100644 index 000000000..ae920fe8d --- /dev/null +++ b/src/executor/admin/ResetBalanceExecutor.h @@ -0,0 +1,30 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef EXECUTOR_ADMIN_RESETBALANCEEXECUTOR_H_ +#define EXECUTOR_ADMIN_RESETBALANCEEXECUTOR_H_ + +#include "executor/Executor.h" +#include "context/QueryContext.h" + +namespace nebula { +namespace graph { + +class ResetBalanceExecutor final : public Executor { +public: + ResetBalanceExecutor(const PlanNode *node, QueryContext *qctx) + : Executor("ResetBalanceExecutor", node, qctx) {} + + folly::Future execute() override; + +private: + folly::Future resetBalance(); +}; + +} // namespace graph +} // namespace nebula + +#endif // EXECUTOR_ADMIN_RESETBALANCEEXECUTOR_H_ diff --git a/src/executor/admin/StopBalanceExecutor.cpp b/src/executor/admin/StopBalanceExecutor.cpp index e0ac8f278..fdc58faae 100644 --- a/src/executor/admin/StopBalanceExecutor.cpp +++ b/src/executor/admin/StopBalanceExecutor.cpp @@ -16,7 +16,7 @@ folly::Future StopBalanceExecutor::execute() { } folly::Future StopBalanceExecutor::stopBalance() { - return qctx()->getMetaClient()->balance({}, true) + return qctx()->getMetaClient()->balance({}, true, false) .via(runner()) .then([this](StatusOr resp) { SCOPED_TIMER(&execTime_); diff --git a/src/parser/AdminSentences.h b/src/parser/AdminSentences.h index 4686df0a6..2dee1c0f0 100644 --- a/src/parser/AdminSentences.h +++ b/src/parser/AdminSentences.h @@ -452,6 +452,7 @@ class BalanceSentence final : public Sentence { kLeader, kData, kDataStop, + kDataReset, kShowBalancePlan, }; diff --git a/src/parser/parser.yy b/src/parser/parser.yy index 7781aa770..a0edacb56 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -154,7 +154,7 @@ static constexpr size_t MAX_ABS_INTEGER = 9223372036854775808ULL; %token KW_FETCH KW_PROP KW_UPDATE KW_UPSERT KW_WHEN %token KW_ORDER KW_ASC KW_LIMIT KW_OFFSET %token KW_DISTINCT KW_ALL KW_OF -%token KW_BALANCE KW_LEADER +%token KW_BALANCE KW_LEADER KW_RESET KW_PLAN %token KW_SHORTEST KW_PATH KW_NOLOOP %token KW_IS KW_NULL KW_DEFAULT %token KW_SNAPSHOT KW_SNAPSHOTS KW_LOOKUP @@ -477,6 +477,8 @@ unreserved_keyword | KW_SIGN { $$ = new std::string("sign"); } | KW_SERVICE { $$ = new std::string("service"); } | KW_TEXT_SEARCH { $$ = new std::string("text_search"); } + | KW_RESET { $$ = new std::string("reset"); } + | KW_PLAN { $$ = new std::string("plan"); } ; agg_function @@ -2880,6 +2882,9 @@ balance_sentence | KW_BALANCE KW_DATA KW_STOP { $$ = new BalanceSentence(BalanceSentence::SubType::kDataStop); } + | KW_BALANCE KW_DATA KW_RESET KW_PLAN { + $$ = new BalanceSentence(BalanceSentence::SubType::kDataReset); + } | KW_BALANCE KW_DATA KW_REMOVE host_list { $$ = new BalanceSentence(BalanceSentence::SubType::kData, $4); } diff --git a/src/parser/scanner.lex b/src/parser/scanner.lex index 6eb8f7801..8af4ee10e 100644 --- a/src/parser/scanner.lex +++ b/src/parser/scanner.lex @@ -234,6 +234,8 @@ IP_OCTET ([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5]) "SIGN" { return TokenType::KW_SIGN; } "SERVICE" { return TokenType::KW_SERVICE; } "TEXT_SEARCH" { return TokenType::KW_TEXT_SEARCH; } +"RESET" { return TokenType::KW_RESET; } +"PLAN" { return TokenType::KW_PLAN; } "TRUE" { yylval->boolval = true; return TokenType::BOOL; } "FALSE" { yylval->boolval = false; return TokenType::BOOL; } diff --git a/src/parser/test/ParserTest.cpp b/src/parser/test/ParserTest.cpp index 5cc918c52..1fd29d00a 100644 --- a/src/parser/test/ParserTest.cpp +++ b/src/parser/test/ParserTest.cpp @@ -1970,6 +1970,12 @@ TEST(Parser, BalanceOperation) { auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } + { + GQLParser parser; + std::string query = "BALANCE DATA RESET PLAN"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } } TEST(Parser, CrashByFuzzer) { @@ -2921,6 +2927,30 @@ TEST(Parser, FullText) { } TEST(Parser, FullTextServiceTest) { + { + GQLParser parser; + std::string query = "ADD LISTENER ELASTICSEARCH 127.0.0.1:12000"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } + { + GQLParser parser; + std::string query = "ADD LISTENER ELASTICSEARCH 127.0.0.1:12000, 127.0.0.1:12001"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } + { + GQLParser parser; + std::string query = "REMOVE LISTENER ELASTICSEARCH"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } + { + GQLParser parser; + std::string query = "SHOW LISTENER"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } { GQLParser parser; std::string query = "SIGN IN TEXT SERVICE (127.0.0.1:9200)"; diff --git a/src/parser/test/ScannerTest.cpp b/src/parser/test/ScannerTest.cpp index 8764ddedc..15e73655a 100644 --- a/src/parser/test/ScannerTest.cpp +++ b/src/parser/test/ScannerTest.cpp @@ -381,6 +381,12 @@ TEST(Scanner, Basic) { CHECK_SEMANTIC_TYPE("LEADER", TokenType::KW_LEADER), CHECK_SEMANTIC_TYPE("Leader", TokenType::KW_LEADER), CHECK_SEMANTIC_TYPE("leader", TokenType::KW_LEADER), + CHECK_SEMANTIC_TYPE("RESET", TokenType::KW_RESET), + CHECK_SEMANTIC_TYPE("reset", TokenType::KW_RESET), + CHECK_SEMANTIC_TYPE("Reset", TokenType::KW_RESET), + CHECK_SEMANTIC_TYPE("PLAN", TokenType::KW_PLAN), + CHECK_SEMANTIC_TYPE("plan", TokenType::KW_PLAN), + CHECK_SEMANTIC_TYPE("Plan", TokenType::KW_PLAN), CHECK_SEMANTIC_TYPE("FETCH", TokenType::KW_FETCH), CHECK_SEMANTIC_TYPE("Fetch", TokenType::KW_FETCH), CHECK_SEMANTIC_TYPE("fetch", TokenType::KW_FETCH), diff --git a/src/planner/Admin.h b/src/planner/Admin.h index 36bdfcd94..78d912b24 100644 --- a/src/planner/Admin.h +++ b/src/planner/Admin.h @@ -812,6 +812,17 @@ class StopBalance final : public SingleDependencyNode { : SingleDependencyNode(qctx, Kind::kStopBalance, dep) {} }; +class ResetBalance final : public SingleDependencyNode { +public: + static ResetBalance* make(QueryContext* qctx, PlanNode* dep) { + return qctx->objPool()->add(new ResetBalance(qctx, dep)); + } + +private: + explicit ResetBalance(QueryContext* qctx, PlanNode* dep) + : SingleDependencyNode(qctx, Kind::kResetBalance, dep) {} +}; + class ShowBalance final : public SingleDependencyNode { public: static ShowBalance* make(QueryContext* qctx, PlanNode* dep, int64_t jobId) { diff --git a/src/planner/PlanNode.cpp b/src/planner/PlanNode.cpp index c8813e334..88c120ac2 100644 --- a/src/planner/PlanNode.cpp +++ b/src/planner/PlanNode.cpp @@ -163,6 +163,8 @@ const char* PlanNode::toString(PlanNode::Kind kind) { return "Balance"; case Kind::kStopBalance: return "StopBalance"; + case Kind::kResetBalance: + return "ResetBalance"; case Kind::kShowBalance: return "ShowBalance"; case Kind::kSubmitJob: diff --git a/src/planner/PlanNode.h b/src/planner/PlanNode.h index 1b9219efd..f95873356 100644 --- a/src/planner/PlanNode.h +++ b/src/planner/PlanNode.h @@ -79,6 +79,7 @@ class PlanNode { kBalanceLeaders, kBalance, kStopBalance, + kResetBalance, kShowBalance, kSubmitJob, kShowHosts, diff --git a/src/validator/BalanceValidator.cpp b/src/validator/BalanceValidator.cpp index e3081cb4f..d8f34540d 100644 --- a/src/validator/BalanceValidator.cpp +++ b/src/validator/BalanceValidator.cpp @@ -36,6 +36,9 @@ Status BalanceValidator::toPlan() { case BalanceSentence::SubType::kDataStop: current = StopBalance::make(qctx_, nullptr); break; + case BalanceSentence::SubType::kDataReset: + current = ResetBalance::make(qctx_, nullptr); + break; case BalanceSentence::SubType::kShowBalancePlan: current = ShowBalance::make(qctx_, nullptr, sentence->balanceId()); break;