Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

support balance data reset plan #463

Merged
merged 1 commit into from
Dec 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ nebula_add_library(
admin/SubmitJobExecutor.cpp
admin/BalanceExecutor.cpp
admin/StopBalanceExecutor.cpp
admin/ResetBalanceExecutor.cpp
admin/BalanceLeadersExecutor.cpp
admin/ShowBalanceExecutor.cpp
admin/ShowHostsExecutor.cpp
Expand Down
4 changes: 4 additions & 0 deletions src/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "executor/admin/ListenerExecutor.h"
#include "executor/admin/SpaceExecutor.h"
#include "executor/admin/StopBalanceExecutor.h"
#include "executor/admin/ResetBalanceExecutor.h"
#include "executor/admin/SubmitJobExecutor.h"
#include "executor/admin/SwitchSpaceExecutor.h"
#include "executor/admin/UpdateUserExecutor.h"
Expand Down Expand Up @@ -362,6 +363,9 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kStopBalance: {
return pool->add(new StopBalanceExecutor(node, qctx));
}
case PlanNode::Kind::kResetBalance: {
return pool->add(new ResetBalanceExecutor(node, qctx));
}
case PlanNode::Kind::kShowBalance: {
return pool->add(new ShowBalanceExecutor(node, qctx));
}
Expand Down
2 changes: 1 addition & 1 deletion src/executor/admin/BalanceExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ folly::Future<Status> BalanceExecutor::execute() {

folly::Future<Status> BalanceExecutor::balance() {
auto *bNode = asNode<Balance>(node());
return qctx()->getMetaClient()->balance(bNode->deleteHosts(), false)
return qctx()->getMetaClient()->balance(bNode->deleteHosts(), false, false)
.via(runner())
.then([this](StatusOr<int64_t> resp) {
SCOPED_TIMER(&execTime_);
Expand Down
34 changes: 34 additions & 0 deletions src/executor/admin/ResetBalanceExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "executor/admin/ResetBalanceExecutor.h"
#include "planner/Admin.h"

namespace nebula {
namespace graph {

folly::Future<Status> ResetBalanceExecutor::execute() {
SCOPED_TIMER(&execTime_);
return resetBalance();
}

folly::Future<Status> ResetBalanceExecutor::resetBalance() {
return qctx()->getMetaClient()->balance({}, false, true)
.via(runner())
.then([this](StatusOr<int64_t> resp) {
SCOPED_TIMER(&execTime_);
if (!resp.ok()) {
LOG(ERROR) << resp.status();
return resp.status();
}
DataSet v({"ID"});
v.emplace_back(Row({resp.value()}));
return finish(std::move(v));
});
}

} // namespace graph
} // namespace nebula
30 changes: 30 additions & 0 deletions src/executor/admin/ResetBalanceExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef EXECUTOR_ADMIN_RESETBALANCEEXECUTOR_H_
#define EXECUTOR_ADMIN_RESETBALANCEEXECUTOR_H_

#include "executor/Executor.h"
#include "context/QueryContext.h"

namespace nebula {
namespace graph {

class ResetBalanceExecutor final : public Executor {
public:
ResetBalanceExecutor(const PlanNode *node, QueryContext *qctx)
: Executor("ResetBalanceExecutor", node, qctx) {}

folly::Future<Status> execute() override;

private:
folly::Future<Status> resetBalance();
};

} // namespace graph
} // namespace nebula

#endif // EXECUTOR_ADMIN_RESETBALANCEEXECUTOR_H_
2 changes: 1 addition & 1 deletion src/executor/admin/StopBalanceExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ folly::Future<Status> StopBalanceExecutor::execute() {
}

folly::Future<Status> StopBalanceExecutor::stopBalance() {
return qctx()->getMetaClient()->balance({}, true)
return qctx()->getMetaClient()->balance({}, true, false)
.via(runner())
.then([this](StatusOr<int64_t> resp) {
SCOPED_TIMER(&execTime_);
Expand Down
1 change: 1 addition & 0 deletions src/parser/AdminSentences.h
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ class BalanceSentence final : public Sentence {
kLeader,
kData,
kDataStop,
kDataReset,
kShowBalancePlan,
};

Expand Down
7 changes: 6 additions & 1 deletion src/parser/parser.yy
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ static constexpr size_t MAX_ABS_INTEGER = 9223372036854775808ULL;
%token KW_FETCH KW_PROP KW_UPDATE KW_UPSERT KW_WHEN
%token KW_ORDER KW_ASC KW_LIMIT KW_OFFSET
%token KW_DISTINCT KW_ALL KW_OF
%token KW_BALANCE KW_LEADER
%token KW_BALANCE KW_LEADER KW_RESET KW_PLAN
%token KW_SHORTEST KW_PATH KW_NOLOOP
%token KW_IS KW_NULL KW_DEFAULT
%token KW_SNAPSHOT KW_SNAPSHOTS KW_LOOKUP
Expand Down Expand Up @@ -477,6 +477,8 @@ unreserved_keyword
| KW_SIGN { $$ = new std::string("sign"); }
| KW_SERVICE { $$ = new std::string("service"); }
| KW_TEXT_SEARCH { $$ = new std::string("text_search"); }
| KW_RESET { $$ = new std::string("reset"); }
| KW_PLAN { $$ = new std::string("plan"); }
;

agg_function
Expand Down Expand Up @@ -2880,6 +2882,9 @@ balance_sentence
| KW_BALANCE KW_DATA KW_STOP {
$$ = new BalanceSentence(BalanceSentence::SubType::kDataStop);
}
| KW_BALANCE KW_DATA KW_RESET KW_PLAN {
$$ = new BalanceSentence(BalanceSentence::SubType::kDataReset);
}
| KW_BALANCE KW_DATA KW_REMOVE host_list {
$$ = new BalanceSentence(BalanceSentence::SubType::kData, $4);
}
Expand Down
2 changes: 2 additions & 0 deletions src/parser/scanner.lex
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ IP_OCTET ([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])
"SIGN" { return TokenType::KW_SIGN; }
"SERVICE" { return TokenType::KW_SERVICE; }
"TEXT_SEARCH" { return TokenType::KW_TEXT_SEARCH; }
"RESET" { return TokenType::KW_RESET; }
"PLAN" { return TokenType::KW_PLAN; }
"TRUE" { yylval->boolval = true; return TokenType::BOOL; }
"FALSE" { yylval->boolval = false; return TokenType::BOOL; }

Expand Down
30 changes: 30 additions & 0 deletions src/parser/test/ParserTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1970,6 +1970,12 @@ TEST(Parser, BalanceOperation) {
auto result = parser.parse(query);
ASSERT_TRUE(result.ok()) << result.status();
}
{
GQLParser parser;
std::string query = "BALANCE DATA RESET PLAN";
auto result = parser.parse(query);
ASSERT_TRUE(result.ok()) << result.status();
}
}

TEST(Parser, CrashByFuzzer) {
Expand Down Expand Up @@ -2921,6 +2927,30 @@ TEST(Parser, FullText) {
}

TEST(Parser, FullTextServiceTest) {
{
GQLParser parser;
std::string query = "ADD LISTENER ELASTICSEARCH 127.0.0.1:12000";
auto result = parser.parse(query);
ASSERT_TRUE(result.ok()) << result.status();
}
{
GQLParser parser;
std::string query = "ADD LISTENER ELASTICSEARCH 127.0.0.1:12000, 127.0.0.1:12001";
auto result = parser.parse(query);
ASSERT_TRUE(result.ok()) << result.status();
}
{
GQLParser parser;
std::string query = "REMOVE LISTENER ELASTICSEARCH";
auto result = parser.parse(query);
ASSERT_TRUE(result.ok()) << result.status();
}
{
GQLParser parser;
std::string query = "SHOW LISTENER";
auto result = parser.parse(query);
ASSERT_TRUE(result.ok()) << result.status();
}
{
GQLParser parser;
std::string query = "SIGN IN TEXT SERVICE (127.0.0.1:9200)";
Expand Down
6 changes: 6 additions & 0 deletions src/parser/test/ScannerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,12 @@ TEST(Scanner, Basic) {
CHECK_SEMANTIC_TYPE("LEADER", TokenType::KW_LEADER),
CHECK_SEMANTIC_TYPE("Leader", TokenType::KW_LEADER),
CHECK_SEMANTIC_TYPE("leader", TokenType::KW_LEADER),
CHECK_SEMANTIC_TYPE("RESET", TokenType::KW_RESET),
CHECK_SEMANTIC_TYPE("reset", TokenType::KW_RESET),
CHECK_SEMANTIC_TYPE("Reset", TokenType::KW_RESET),
CHECK_SEMANTIC_TYPE("PLAN", TokenType::KW_PLAN),
CHECK_SEMANTIC_TYPE("plan", TokenType::KW_PLAN),
CHECK_SEMANTIC_TYPE("Plan", TokenType::KW_PLAN),
CHECK_SEMANTIC_TYPE("FETCH", TokenType::KW_FETCH),
CHECK_SEMANTIC_TYPE("Fetch", TokenType::KW_FETCH),
CHECK_SEMANTIC_TYPE("fetch", TokenType::KW_FETCH),
Expand Down
11 changes: 11 additions & 0 deletions src/planner/Admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,17 @@ class StopBalance final : public SingleDependencyNode {
: SingleDependencyNode(qctx, Kind::kStopBalance, dep) {}
};

class ResetBalance final : public SingleDependencyNode {
public:
static ResetBalance* make(QueryContext* qctx, PlanNode* dep) {
return qctx->objPool()->add(new ResetBalance(qctx, dep));
}

private:
explicit ResetBalance(QueryContext* qctx, PlanNode* dep)
: SingleDependencyNode(qctx, Kind::kResetBalance, dep) {}
};

class ShowBalance final : public SingleDependencyNode {
public:
static ShowBalance* make(QueryContext* qctx, PlanNode* dep, int64_t jobId) {
Expand Down
2 changes: 2 additions & 0 deletions src/planner/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ const char* PlanNode::toString(PlanNode::Kind kind) {
return "Balance";
case Kind::kStopBalance:
return "StopBalance";
case Kind::kResetBalance:
return "ResetBalance";
case Kind::kShowBalance:
return "ShowBalance";
case Kind::kSubmitJob:
Expand Down
1 change: 1 addition & 0 deletions src/planner/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class PlanNode {
kBalanceLeaders,
kBalance,
kStopBalance,
kResetBalance,
kShowBalance,
kSubmitJob,
kShowHosts,
Expand Down
3 changes: 3 additions & 0 deletions src/validator/BalanceValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ Status BalanceValidator::toPlan() {
case BalanceSentence::SubType::kDataStop:
current = StopBalance::make(qctx_, nullptr);
break;
case BalanceSentence::SubType::kDataReset:
current = ResetBalance::make(qctx_, nullptr);
break;
case BalanceSentence::SubType::kShowBalancePlan:
current = ShowBalance::make(qctx_, nullptr, sentence->balanceId());
break;
Expand Down