Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Annotation for Graph Service #3724

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions src/common/memory/MemoryUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@
#include <folly/String.h>
#include <gflags/gflags.h>

#include <algorithm>
#include <cstdio>
#include <fstream>
#include <regex>

#include "common/fs/FileUtils.h"

DEFINE_bool(containerized, false, "Whether run this process inside the docker container");
Expand Down Expand Up @@ -42,7 +37,7 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
uint64_t cacheSize = 0;
for (; iter.valid(); ++iter) {
auto& sm = iter.matched();
cacheSize += std::stoul(sm[2].str(), NULL);
cacheSize += std::stoul(sm[2].str(), nullptr);
}

std::string limitPath =
Expand All @@ -64,7 +59,7 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
std::vector<uint64_t> memorySize;
for (; iter.valid(); ++iter) {
auto& sm = iter.matched();
memorySize.emplace_back(std::stoul(sm[2].str(), NULL) << 10);
memorySize.emplace_back(std::stoul(sm[2].str(), nullptr) << 10);
}
std::sort(memorySize.begin(), memorySize.end());
if (memorySize.size() >= 2u) {
Expand Down
3 changes: 2 additions & 1 deletion src/graph/service/Authenticator.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
namespace nebula {
namespace graph {

// Interface for authentication
class Authenticator {
public:
virtual ~Authenticator() {}
virtual ~Authenticator() = default;

virtual Status NG_MUST_USE_RESULT auth(const std::string &user, const std::string &password) = 0;
};
Expand Down
8 changes: 2 additions & 6 deletions src/graph/service/CloudAuthenticator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,10 @@ Status CloudAuthenticator::auth(const std::string& user, const std::string& pass
std::string userAndPasswd = user + ":" + password;
std::string base64Str = proxygen::base64Encode(folly::StringPiece(userAndPasswd));

std::string header = "-H \"Content-Type: application/json\" -H \"Authorization:Nebula ";
std::string header = R"(-H "Content-Type: application/json" -H "Authorization:Nebula )";
header = header + base64Str + "\"";
auto result = http::HttpClient::post(FLAGS_cloud_http_url, header);

if (!result.ok()) {
LOG(ERROR) << result.status();
return result.status();
}
NG_LOG_AND_RETURN_IF_ERROR(result);

try {
auto json = folly::parseJson(result.value());
Expand Down
2 changes: 1 addition & 1 deletion src/graph/service/GraphFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ DECLARE_uint32(failed_login_attempts);
// The deault value is 0. A value of 0 disables the option.
DECLARE_uint32(password_lock_time_in_secs);

// optimizer
// Optimizer
DECLARE_bool(enable_optimizer);

DECLARE_int64(max_allowed_connections);
Expand Down
1 change: 1 addition & 0 deletions src/graph/service/GraphServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ bool GraphServer::start() {
return false;
}

// Init worker id for snowflake generating unique id
nebula::Snowflake::initWorkerId(interface->metaClient_.get());

graphThread_ = std::make_unique<std::thread>([&] {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/service/GraphServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class GraphServer {

void stop();

// used for signal handler to set an internal stop flag
// Used for signal handler to set an internal stop flag
void notifyStop();

void waitUntilStop();
Expand Down
5 changes: 3 additions & 2 deletions src/graph/service/GraphService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <proxygen/lib/utils/CryptUtil.h>

#include <boost/filesystem.hpp>
#include <memory>

#include "clients/storage/StorageClient.h"
#include "common/base/Base.h"
Expand Down Expand Up @@ -41,7 +42,7 @@ Status GraphService::init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExecuto

metaClient_ = std::make_unique<meta::MetaClient>(ioExecutor, std::move(addrs.value()), options);

// load data try 3 time
// Load data try 3 time
bool loadDataOk = metaClient_->waitForMetadReady(3);
if (!loadDataOk) {
// Resort to retrying in the background
Expand Down Expand Up @@ -69,7 +70,7 @@ folly::Future<AuthResponse> GraphService::future_authenticate(const std::string&

auto ctx = std::make_unique<RequestContext<AuthResponse>>();
auto future = ctx->future();
// check username and password failed
// Check username and password failed
auto authResult = auth(username, password);
if (!authResult.ok()) {
ctx->resp().errorCode = ErrorCode::E_BAD_USERNAME_PASSWORD;
Expand Down
2 changes: 1 addition & 1 deletion src/graph/service/GraphService.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace graph {
class GraphService final : public cpp2::GraphServiceSvIf {
public:
GraphService() = default;
~GraphService() = default;
~GraphService() override = default;

Status NG_MUST_USE_RESULT init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExecutor,
const HostAddr& hostAddr);
Expand Down
13 changes: 6 additions & 7 deletions src/graph/service/PermissionCheck.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ namespace graph {
* Special operation : kShow, kChangePassword
*/

// static
Status PermissionCheck::permissionCheck(ClientSession *session,
Sentence *sentence,
ValidateContext *vctx,
GraphSpaceID targetSpace) {
/* static */ Status PermissionCheck::permissionCheck(ClientSession *session,
Sentence *sentence,
ValidateContext *vctx,
GraphSpaceID targetSpace) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand Down Expand Up @@ -165,7 +164,7 @@ Status PermissionCheck::permissionCheck(ClientSession *session,
case Sentence::Kind::kShowMetaLeader:
case Sentence::Kind::kShowHosts: {
/**
* all roles can be show for above operations.
* All roles can be show for above operations.
*/
return Status::OK();
}
Expand Down Expand Up @@ -206,7 +205,7 @@ Status PermissionCheck::permissionCheck(ClientSession *session,
return Status::OK();
}
case Sentence::Kind::kExplain:
// everyone could explain
// Everyone could explain
return Status::OK();
case Sentence::Kind::kSequential: {
// No permission checking for sequential sentence.
Expand Down
24 changes: 10 additions & 14 deletions src/graph/service/PermissionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
namespace nebula {
namespace graph {

// static
Status PermissionManager::canReadSpace(ClientSession *session, GraphSpaceID spaceId) {
/* static */ Status PermissionManager::canReadSpace(ClientSession *session, GraphSpaceID spaceId) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand All @@ -35,8 +34,8 @@ Status PermissionManager::canReadSpace(ClientSession *session, GraphSpaceID spac
return Status::PermissionError("No permission to read space.");
}

// static
Status PermissionManager::canReadSchemaOrData(ClientSession *session, ValidateContext *vctx) {
/* static */ Status PermissionManager::canReadSchemaOrData(ClientSession *session,
ValidateContext *vctx) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand All @@ -60,8 +59,7 @@ Status PermissionManager::canReadSchemaOrData(ClientSession *session, ValidateCo
return Status::PermissionError("No permission to read schema/data.");
}

// static
Status PermissionManager::canWriteSpace(ClientSession *session) {
/* static */ Status PermissionManager::canWriteSpace(ClientSession *session) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand All @@ -71,8 +69,8 @@ Status PermissionManager::canWriteSpace(ClientSession *session) {
return Status::PermissionError("No permission to write space.");
}

// static
Status PermissionManager::canWriteSchema(ClientSession *session, ValidateContext *vctx) {
/* static */ Status PermissionManager::canWriteSchema(ClientSession *session,
ValidateContext *vctx) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand All @@ -97,8 +95,7 @@ Status PermissionManager::canWriteSchema(ClientSession *session, ValidateContext
return Status::PermissionError("No permission to write schema.");
}

// static
Status PermissionManager::canWriteUser(ClientSession *session) {
/* static */ Status PermissionManager::canWriteUser(ClientSession *session) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand All @@ -113,8 +110,8 @@ Status PermissionManager::canWriteUser(ClientSession *session) {
}
}

// static
Status PermissionManager::canReadUser(ClientSession *session, const std::string &targetUser) {
/* static */ Status PermissionManager::canReadUser(ClientSession *session,
const std::string &targetUser) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand Down Expand Up @@ -177,8 +174,7 @@ Status PermissionManager::canWriteRole(ClientSession *session,
targetUser.c_str());
}

// static
Status PermissionManager::canWriteData(ClientSession *session, ValidateContext *vctx) {
/* static */ Status PermissionManager::canWriteData(ClientSession *session, ValidateContext *vctx) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand Down
1 change: 1 addition & 0 deletions src/graph/service/PermissionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
namespace nebula {
namespace graph {

// This module is responsible for checking the permission of the user
class PermissionManager final {
public:
PermissionManager() = delete;
Expand Down
2 changes: 2 additions & 0 deletions src/graph/service/QueryEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ DEFINE_int32(check_memory_interval_in_secs, 1, "Memory check interval in seconds
namespace nebula {
namespace graph {

// Register planners, init optimizer and set memory monitor
Status QueryEngine::init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExecutor,
meta::MetaClient* metaClient) {
metaClient_ = metaClient;
Expand Down Expand Up @@ -54,6 +55,7 @@ void QueryEngine::execute(RequestContextPtr rctx) {
instance->execute();
}

// Setup memory monitor thread
Status QueryEngine::setupMemoryMonitorThread() {
memoryMonitorThread_ = std::make_unique<thread::GenericWorker>();
if (!memoryMonitorThread_ || !memoryMonitorThread_->start("graph-memory-monitor")) {
Expand Down
18 changes: 14 additions & 4 deletions src/graph/service/QueryInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@ void QueryInstance::execute() {
return;
}

if (!explainOrContinue()) {
// Sentence is explain query, finish
if (explainAndFinish()) {
onFinish();
return;
}

// The execution engine converts the physical execution plan generated by the Planner into a
// series of Executors through the Scheduler to drive the execution of the Executors.
scheduler_->schedule()
.thenValue([this](Status s) {
if (s.ok()) {
Expand All @@ -62,10 +65,12 @@ void QueryInstance::execute() {
[this](const std::exception &e) { onError(Status::Error("%s", e.what())); });
}

// Get query from GQLParser, validate and optimize the query
Status QueryInstance::validateAndOptimize() {
auto *rctx = qctx()->rctx();
auto &spaceName = rctx->session()->space().name;
VLOG(1) << "Parsing query: " << rctx->query();
// Result of parsing, get the parsing tree
auto result = GQLParser(qctx()).parse(rctx->query());
NG_RETURN_IF_ERROR(result);
sentence_ = std::move(result).value();
Expand All @@ -84,7 +89,9 @@ Status QueryInstance::validateAndOptimize() {
}
}

// Validate the query, if failed, return
NG_RETURN_IF_ERROR(Validator::validate(sentence_.get(), qctx()));
// Optimize the query, and get the execution plan
NG_RETURN_IF_ERROR(findBestPlan());
stats::StatsManager::addValue(kOptimizerLatencyUs, *(qctx_->plan()->optimizeTimeInUs()));
if (FLAGS_enable_space_level_metrics && spaceName != "") {
Expand All @@ -95,9 +102,10 @@ Status QueryInstance::validateAndOptimize() {
return Status::OK();
}

bool QueryInstance::explainOrContinue() {
// Sentence is explain query, return the description of plan , and then finish
bool QueryInstance::explainAndFinish() {
if (sentence_->kind() != Sentence::Kind::kExplain) {
return true;
return false;
}
auto &resp = qctx_->rctx()->resp();
resp.planDesc = std::make_unique<PlanDescription>();
Expand Down Expand Up @@ -209,6 +217,7 @@ void QueryInstance::addSlowQueryStats(uint64_t latency, const std::string &space
}
}

// Get result from query context and fill the response
void QueryInstance::fillRespData(ExecutionResponse *resp) {
auto ectx = DCHECK_NOTNULL(qctx_->ectx());
auto plan = DCHECK_NOTNULL(qctx_->plan());
Expand All @@ -218,7 +227,7 @@ void QueryInstance::fillRespData(ExecutionResponse *resp) {
auto &&value = ectx->moveValue(name);
if (!value.isDataSet()) return;

// fill dataset
// Fill dataset
auto result = value.moveDataSet();
if (!result.colNames.empty()) {
resp->data = std::make_unique<DataSet>(std::move(result));
Expand All @@ -229,6 +238,7 @@ void QueryInstance::fillRespData(ExecutionResponse *resp) {
}
}

// The entry point of the optimizer
Status QueryInstance::findBestPlan() {
auto plan = qctx_->plan();
SCOPED_TIMER(plan->optimizeTimeInUs());
Expand Down
5 changes: 3 additions & 2 deletions src/graph/service/QueryInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class QueryInstance final : public boost::noncopyable, public cpp::NonMovable {
explicit QueryInstance(std::unique_ptr<QueryContext> qctx, opt::Optimizer* optimizer);
~QueryInstance() = default;

// Entrance of the Validate, Optimize, Schedule, Execute process
void execute();

QueryContext* qctx() const {
Expand All @@ -51,8 +52,8 @@ class QueryInstance final : public boost::noncopyable, public cpp::NonMovable {
void onError(Status);

Status validateAndOptimize();
// return true if continue to execute
bool explainOrContinue();
// Return true if continue to execute
bool explainAndFinish();
void addSlowQueryStats(uint64_t latency, const std::string& spaceName) const;
void fillRespData(ExecutionResponse* resp);
Status findBestPlan();
Expand Down
4 changes: 2 additions & 2 deletions src/graph/service/RequestContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class RequestContext final : public boost::noncopyable, public cpp::NonMovable {
RequestContext() = default;
~RequestContext() {
if (session_ != nullptr) {
// keep the session active
// Keep the session active
session_->charge();
}
}
Expand All @@ -57,7 +57,7 @@ class RequestContext final : public boost::noncopyable, public cpp::NonMovable {
void setSession(std::shared_ptr<ClientSession> session) {
session_ = std::move(session);
if (session_ != nullptr) {
// keep the session active
// Keep the session active
session_->charge();
}
}
Expand Down