Skip to content

Commit

Permalink
Merge branch 'master' into add_api_for_master
Browse files Browse the repository at this point in the history
  • Loading branch information
jievince authored Jan 25, 2024
2 parents 0ae2cf1 + daa9101 commit 1689851
Show file tree
Hide file tree
Showing 26 changed files with 592 additions and 168 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ git clone https://github.com/vesoft-inc/nebula-cpp.git
### build

```bash
bash> cd nebula-clients/cpp && mkdir build && cd build
bash> cd nebula-cpp && mkdir build && cd build
bash> cmake ..
bash> make && sudo make install
```
Expand Down
3 changes: 3 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,17 @@ add_executable(storage_client_example
target_link_libraries(session_example
PRIVATE
nebula_graph_client
-pthread
)

target_link_libraries(session_pool_example
PRIVATE
nebula_graph_client
-pthread
)

target_link_libraries(storage_client_example
PRIVATE
nebula_storage_client
-pthread
)
4 changes: 3 additions & 1 deletion examples/SessionPoolExample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <atomic>
#include <chrono>
#include <thread>

#include "nebula/client/Config.h"

int main(int argc, char* argv[]) {
Expand All @@ -33,12 +34,13 @@ int main(int argc, char* argv[]) {
config.spaceName_ = "session_pool_test";
config.maxSize_ = 10;
nebula::SessionPool pool(config);
pool.init();
assert(pool.init());

std::vector<std::thread> threads;
for (std::size_t i = 0; i < config.maxSize_; ++i) {
threads.emplace_back([&pool]() {
auto resp = pool.execute("YIELD 1");
std::cout << "resp's error code: " << resp.errorCode << std::endl;
assert(resp.errorCode == nebula::ErrorCode::SUCCEEDED);
std::cout << "Result: " << *resp.data << std::endl;
});
Expand Down
24 changes: 22 additions & 2 deletions examples/StorageClientExample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <nebula/sclient/ScanEdgeIter.h>
#include <common/Init.h>
#include <nebula/sclient/StorageClient.h>
#include "common/graph/Response.h"

int main(int argc, char* argv[]) {
nebula::init(&argc, &argv);
Expand All @@ -30,8 +31,27 @@ int main(int argc, char* argv[]) {
std::cout << "scan edge..." << std::endl;
while (scanEdgeIter.hasNext()) {
std::cout << "-------------------------" << std::endl;
nebula::DataSet ds = scanEdgeIter.next();
std::cout << ds << std::endl;
std::pair<nebula::ErrorCode, nebula::DataSet> res = scanEdgeIter.next();
std::cout << res.first << std::endl;
std::cout << res.second << std::endl;
std::cout << "+++++++++++++++++++++++++" << std::endl;
}

nebula::ScanVertexIter scanVertexIter = c.scanVertexWithPart("nba",
1,
{{"player", std::vector<std::string>{"name"}}},
10,
0,
std::numeric_limits<int64_t>::max(),
"",
true,
true);
std::cout << "scan vertex..." << std::endl;
while (scanVertexIter.hasNext()) {
std::cout << "-------------------------" << std::endl;
std::pair<nebula::ErrorCode, nebula::DataSet> res = scanVertexIter.next();
std::cout << res.first << std::endl;
std::cout << res.second << std::endl;
std::cout << "+++++++++++++++++++++++++" << std::endl;
}

Expand Down
1 change: 1 addition & 0 deletions exported-symbols.map
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ global:
nebula::ExecutionResponse::*;
nebula::StorageClient::*;
nebula::ScanEdgeIter::*;
nebula::ScanVertexIter::*;
*::_S_empty_rep_storage;
};
local:
Expand Down
30 changes: 28 additions & 2 deletions include/common/graph/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,22 @@ enum class ErrorCode { ErrorCodeEnums };

#undef X

const char *getErrorCode(ErrorCode code);
#define X(EnumName, EnumNumber) \
case ErrorCode::EnumName: \
return #EnumName;

static inline const char* getErrorCode(ErrorCode code) {
switch (code) { ErrorCodeEnums }
return "Unknown error";
}

static inline std::ostream &operator<<(std::ostream &os, ErrorCode code) {
os << getErrorCode(code);
return os;
}

#undef X

template <typename T>
bool inline checkPointer(const T *lhs, const T *rhs) {
if (lhs == rhs) {
Expand Down Expand Up @@ -419,7 +428,24 @@ struct PlanNodeDescription {
__clear();
}

bool operator==(const PlanNodeDescription &rhs) const;
bool operator==(const PlanNodeDescription &rhs) const {
if (name != rhs.name) {
return false;
}
if (id != rhs.id) {
return false;
}
if (!checkPointer(description.get(), rhs.description.get())) {
return false;
}
if (!checkPointer(profiles.get(), rhs.profiles.get())) {
return false;
}
if (!checkPointer(branchInfo.get(), rhs.branchInfo.get())) {
return false;
}
return checkPointer(dependencies.get(), rhs.dependencies.get());
}

std::string name;
int64_t id{-1};
Expand Down
18 changes: 15 additions & 3 deletions include/nebula/client/Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include <common/datatypes/DataSet.h>

#include <atomic>

#include "nebula/client/Connection.h"

namespace nebula {
Expand All @@ -25,14 +27,16 @@ class Session {
const std::string &username,
const std::string &password,
const std::string &timezoneName,
int32_t offsetSecs)
int32_t offsetSecs,
bool retryConnect)
: sessionId_(sessionId),
conn_(std::move(conn)),
pool_(pool),
username_(username),
password_(password),
timezoneName_(timezoneName),
offsetSecs_(offsetSecs) {}
offsetSecs_(offsetSecs),
retryConnect_(retryConnect) {}
Session(const Session &) = delete; // no copy
Session(Session &&session)
: sessionId_(session.sessionId_),
Expand All @@ -41,7 +45,8 @@ class Session {
username_(std::move(session.username_)),
password_(std::move(session.password_)),
timezoneName_(std::move(session.timezoneName_)),
offsetSecs_(session.offsetSecs_) {
offsetSecs_(session.offsetSecs_),
retryConnect_(session.retryConnect_) {
session.sessionId_ = -1;
session.pool_ = nullptr;
session.offsetSecs_ = 0;
Expand Down Expand Up @@ -105,6 +110,11 @@ class Session {
return toLocal(data, offsetSecs_);
}

static bool isSessionError(const ExecutionResponse &resp) {
return resp.errorCode == ErrorCode::E_SESSION_INVALID ||
resp.errorCode == ErrorCode::E_SESSION_NOT_FOUND;
}

// convert the time to specific time zone
static void toLocal(DataSet &data, int32_t offsetSecs);

Expand All @@ -117,6 +127,8 @@ class Session {
// empty means not a named timezone
std::string timezoneName_;
int32_t offsetSecs_;
bool retryConnect_{true};
std::atomic<bool> connectionIsBroken_{false};
};

} // namespace nebula
4 changes: 2 additions & 2 deletions include/nebula/client/SessionPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ struct SessionPoolConfig {
std::string password_;
std::vector<std::string> addrs_; // the list of graph addresses
std::string spaceName_;
// Socket timeout and Socket connection timeout, unit: seconds
// Socket timeout and Socket connection timeout, unit: milliseconds
std::uint32_t timeout_{0};
// The idleTime of the connection, unit: seconds
// The idleTime of the connection, unit: milliseconds
// If connection's idle time is longer than idleTime, it will be delete
// 0 value means the connection will not expire
std::uint32_t idleTime_{0};
Expand Down
9 changes: 9 additions & 0 deletions include/nebula/mclient/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class ListSpacesReq;
class ListSpacesResp;
class IdName;
class EdgeItem;
class TagItem;
class ListEdgesReq;
class ListEdgesResp;

Expand All @@ -61,6 +62,9 @@ using SpaceNameIdMap = std::unordered_map<std::string, GraphSpaceID>;
using SpaceEdgeNameTypeMap =
std::unordered_map<std::pair<GraphSpaceID, std::string>, EdgeType, pair_hash>;

using SpaceTagNameTypeMap =
std::unordered_map<std::pair<GraphSpaceID, std::string>, TagID, pair_hash>;

class MetaClient {
public:
explicit MetaClient(const std::vector<std::string> &metaAddrs,
Expand All @@ -73,6 +77,8 @@ class MetaClient {
std::pair<bool, EdgeType> getEdgeTypeByNameFromCache(GraphSpaceID spaceId,
const std::string &name);

std::pair<bool, TagID> getTagIdByNameFromCache(GraphSpaceID spaceId, const std::string &name);

std::pair<bool, std::vector<PartitionID>> getPartsFromCache(GraphSpaceID spaceId);

std::pair<bool, HostAddr> getPartLeaderFromCache(GraphSpaceID spaceId, PartitionID partId);
Expand All @@ -86,6 +92,8 @@ class MetaClient {

std::pair<bool, std::vector<meta::cpp2::EdgeItem>> listEdgeSchemas(GraphSpaceID spaceId);

std::pair<bool, std::vector<meta::cpp2::TagItem>> listTagSchemas(GraphSpaceID spaceId);

void loadLeader(const std::vector<nebula::meta::cpp2::HostItem> &hostItems,
const SpaceNameIdMap &spaceIndexByName);

Expand All @@ -107,6 +115,7 @@ class MetaClient {
MConfig mConfig_;
SpaceNameIdMap spaceIndexByName_;
SpaceEdgeNameTypeMap spaceEdgeIndexByName_;
SpaceTagNameTypeMap spaceTagIndexByName_;
std::unordered_map<std::pair<GraphSpaceID, PartitionID>, HostAddr, pair_hash> spacePartLeaderMap_;
std::unordered_map<GraphSpaceID, std::vector<PartitionID>> spacePartsMap_;
std::shared_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
Expand Down
3 changes: 2 additions & 1 deletion include/nebula/sclient/ScanEdgeIter.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <vector>

#include "common/datatypes/DataSet.h"
#include "common/graph/Response.h"

namespace nebula {
class StorageClient;
Expand All @@ -26,7 +27,7 @@ struct ScanEdgeIter {

bool hasNext();

DataSet next();
std::pair<::nebula::ErrorCode, DataSet> next();

StorageClient* client_;
storage::cpp2::ScanEdgeRequest* req_;
Expand Down
38 changes: 38 additions & 0 deletions include/nebula/sclient/ScanVertexIter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/* Copyright (c) 2023 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#pragma once

#include <string>
#include <vector>

#include "common/datatypes/DataSet.h"
#include "common/graph/Response.h"

namespace nebula {
class StorageClient;

namespace storage {
namespace cpp2 {
class ScanVertexRequest;
} // namespace cpp2
} // namespace storage

struct ScanVertexIter {
ScanVertexIter(StorageClient* client, storage::cpp2::ScanVertexRequest* req, bool hasNext = true);

~ScanVertexIter();

bool hasNext();

std::pair<::nebula::ErrorCode, DataSet> next();

StorageClient* client_;
storage::cpp2::ScanVertexRequest* req_;
bool hasNext_;
std::string nextCursor_;
};

} // namespace nebula
24 changes: 21 additions & 3 deletions include/nebula/sclient/StorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "nebula/mclient/MetaClient.h"
#include "nebula/sclient/SConfig.h"
#include "nebula/sclient/ScanEdgeIter.h"
#include "nebula/sclient/ScanVertexIter.h"
#include "common/graph/Response.h"

namespace folly {
class IOThreadPoolExecutor;
Expand Down Expand Up @@ -58,6 +60,7 @@ class ScanResponse;

class StorageClient {
friend struct ScanEdgeIter;
friend struct ScanVertexIter;

public:
explicit StorageClient(const std::vector<std::string>& metaAddrs,
Expand All @@ -79,18 +82,33 @@ class StorageClient {
bool onlyLatestVersion = false,
bool enableReadFromFollower = true); // plato needed

ScanVertexIter scanVertexWithPart(
std::string spaceName,
int32_t partID,
// tag name -> prop names
std::unordered_map<std::string, std::vector<std::string>> tagProps,
int64_t limit = DEFAULT_LIMIT,
int64_t startTime = DEFAULT_START_TIME,
int64_t endTime = DEFAULT_END_TIME,
std::string filter = "",
bool onlyLatestVersion = false,
bool enableReadFromFollower = true); // plato needed

MetaClient* getMetaClient() {
return mClient_.get();
}

private:
std::pair<bool, storage::cpp2::ScanResponse> doScanEdge(
std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse> doScanEdge(
const storage::cpp2::ScanEdgeRequest& req);

std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse> doScanVertex(
const storage::cpp2::ScanVertexRequest& req);

template <typename Request, typename RemoteFunc, typename Response>
void getResponse(std::pair<HostAddr, Request>&& request,
void getResponse(std::pair<HostAddr, Request>&& request,
RemoteFunc&& remoteFunc,
folly::Promise<std::pair<bool, Response>> pro);
folly::Promise<std::pair<::nebula::ErrorCode, Response>> pro);

std::unique_ptr<MetaClient> mClient_;
SConfig sConfig_;
Expand Down
2 changes: 1 addition & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ set(NEBULA_COMMON_SOURCES
datatypes/Value.cpp
datatypes/Vertex.cpp
datatypes/Duration.cpp
graph/Response.cpp
time/TimeConversion.cpp
geo/io/wkt/WKTWriter.cpp
geo/io/wkb/WKBWriter.cpp
Expand All @@ -54,6 +53,7 @@ set(NEBULA_SCLIENT_SOURCES
${NEBULA_MCLIENT_SOURCES}
sclient/StorageClient.cpp
sclient/ScanEdgeIter.cpp
sclient/ScanVertexIter.cpp
)

set(NEBULA_THIRD_PARTY_LIBRARIES
Expand Down
Loading

0 comments on commit 1689851

Please sign in to comment.