Skip to content

Commit

Permalink
Merge branch raft into master (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
innerr authored Apr 2, 2019
1 parent 15999e7 commit d34d7c9
Show file tree
Hide file tree
Showing 2,087 changed files with 64,850 additions and 3,771 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,9 @@ node_modules
public
website/docs
website/presentations

# TiCS built and test files
build_docker
docker/builder/tics
tests/docker/data
tests/docker/log
6 changes: 5 additions & 1 deletion cmake/find_kvproto.cmake
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
# Currently kvproto should always use bundled library.

if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/kvproto/cpp/kvproto/errorpb.pb.h")
message (FATAL_ERROR "kvproto submodule in contrib/kvproto is missing.")
if (EXISTS "${ClickHouse_SOURCE_DIR}/contrib/kvproto/proto/errorpb.proto")
message (FATAL_ERROR "kvproto cpp files in contrib/kvproto is missing. Try go to contrib/kvproto, and run ./generate_cpp.sh")
else()
message (FATAL_ERROR "kvproto submodule in contrib/kvproto is missing. Try run 'git submodule update --init --recursive', and go to contrib/kvproto, and run ./generate_cpp.sh")
endif()
endif ()

message(STATUS "Using kvproto: ${ClickHouse_SOURCE_DIR}/contrib/kvproto/cpp")
Expand Down
1 change: 1 addition & 0 deletions contrib/client-c/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ build
.idea
*.iml
*.swp
*.swo
tags
.clang-format

Expand Down
2 changes: 2 additions & 0 deletions contrib/client-c/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
cmake_minimum_required(VERSION 2.8)
project(kvClient)
set (CMAKE_CXX_STANDARD 11)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-narrowing")

if (NOT gRPC_FOUND)
include (cmake/find_grpc.cmake)
Expand Down
2 changes: 1 addition & 1 deletion contrib/client-c/cmake/find_kvproto.cmake
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Currently kvproto should always use bundled library.

if (NOT EXISTS "${kvClient_SOURCE_DIR}/third_party/kvproto/cpp/kvproto/errorpb.pb.h")
message (FATAL_ERROR "kvproto submodule in thrid_party/kvproto is missing.")
message (FATAL_ERROR "kvproto submodule in third_party/kvproto is missing.")
endif ()

message(STATUS "Using kvproto: ${kvClient_SOURCE_DIR}/third_party/kvproto/cpp")
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
#include <Poco/Exception.h>

namespace pingcap {
namespace kv {

const int MismatchClusterIDCode = 1;
const int GRPCErrorCode = 2;
const int InitClusterIDFailed = 3;
const int UpdatePDLeaderFailed = 4;
const int TimeoutError = 5;
const int RegionUnavailable = 6;

class Exception : public Poco::Exception
{
Expand All @@ -23,6 +27,4 @@ class Exception : public Poco::Exception

};


}
}
4 changes: 3 additions & 1 deletion contrib/client-c/include/tikv/Backoff.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include <map>
#include <memory>

#include <tikv/Exception.h>
#include <common/CltException.h>

namespace pingcap {
namespace kv {
Expand Down Expand Up @@ -71,6 +71,8 @@ struct Backoff {
}
};

constexpr int readIndexMaxBackoff = 20000;

using BackoffPtr = std::shared_ptr<Backoff>;

struct Backoffer {
Expand Down
20 changes: 19 additions & 1 deletion contrib/client-c/include/tikv/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <pd/MockPDClient.h>
#include <kvproto/metapb.pb.h>
#include <tikv/Backoff.h>
#include <common/Log.h>
#include <kvproto/errorpb.pb.h>

namespace pingcap {
namespace kv {
Expand All @@ -31,6 +33,8 @@ struct RegionVerID {
uint64_t confVer;
uint64_t ver;

RegionVerID(uint64_t id_, uint64_t conf_ver, uint64_t ver_): id(id_), confVer(conf_ver), ver(ver_){}

bool operator == (const RegionVerID & rhs) const {
return id == rhs.id && confVer == rhs.confVer && ver == rhs.ver;
}
Expand All @@ -56,6 +60,8 @@ struct Region {
metapb::Peer peer;
metapb::Peer learner;

Region(const metapb::Region & meta_, const metapb::Peer & peer_) : meta(meta_), peer(peer_), learner(metapb::Peer::default_instance()) {}

Region(const metapb::Region & meta_, const metapb::Peer & peer_, const metapb::Peer & learner_)
: meta(meta_), peer(peer_), learner(learner_) {}

Expand Down Expand Up @@ -118,12 +124,22 @@ using RPCContextPtr = std::shared_ptr<RPCContext>;

class RegionCache {
public:
RegionCache(pd::ClientPtr pdClient_) : pdClient(pdClient_) {
RegionCache(pd::ClientPtr pdClient_) : pdClient(pdClient_), log(&Logger::get("pingcap.tikv")) {
}

RPCContextPtr getRPCContext(Backoffer & bo, const RegionVerID & id, bool is_learner);

void updateLeader(Backoffer & bo, const RegionVerID & region_id, uint64_t leader_store_id);

//KeyLocation locateKey(Backoffer & bo, std::string key);
//
void dropRegion(const RegionVerID &);

void dropStore(uint64_t failed_store_id);

void dropStoreOnSendReqFail(RPCContextPtr & ctx, const Exception & exc);

void onRegionStale(RPCContextPtr ctx, const errorpb::StaleEpoch & stale_epoch);

private:
RegionPtr getCachedRegion(Backoffer & bo, const RegionVerID & id);
Expand Down Expand Up @@ -153,6 +169,8 @@ class RegionCache {
std::mutex region_mutex;

std::mutex store_mutex;

Logger * log;
};

using RegionCachePtr = std::shared_ptr<RegionCache>;
Expand Down
71 changes: 59 additions & 12 deletions contrib/client-c/include/tikv/RegionClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,75 @@ struct RegionClient {

int64_t getReadIndex() {
auto request = new kvrpcpb::ReadIndexRequest();
Backoffer bo(10000);
Backoffer bo(readIndexMaxBackoff);
auto rpc_call = std::make_shared<RpcCall<kvrpcpb::ReadIndexRequest>>(request);
auto ctx = cache -> getRPCContext(bo, region_id, true);
store_addr = ctx->addr;
sendReqToRegion(bo, rpc_call, ctx);
sendReqToRegion(bo, rpc_call, true);
return rpc_call -> getResp() -> read_index();
}

template<typename T>
void sendReqToRegion(Backoffer & bo, RpcCallPtr<T> rpc, RPCContextPtr rpc_ctx) {
try {
rpc -> setCtx(rpc_ctx);
client -> sendRequest(store_addr, rpc);
void sendReqToRegion(Backoffer & bo, RpcCallPtr<T> rpc, bool learner) {
for (;;) {
auto ctx = cache -> getRPCContext(bo, region_id, learner);
store_addr = ctx->addr;
rpc -> setCtx(ctx);
try {
client -> sendRequest(store_addr, rpc);
} catch(const Exception & e) {
onSendFail(bo, e, ctx);
continue;
}
auto resp = rpc -> getResp();
if (resp -> has_region_error()) {
onRegionError(bo, ctx, resp->region_error());
} else {
return;
}
}
catch(const Exception & e) {
onSendFail(bo, e);
}

void onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const errorpb::Error & err) {
if (err.has_not_leader()) {
auto not_leader = err.not_leader();
if (not_leader.has_leader()) {
cache -> updateLeader(bo, rpc_ctx->region, not_leader.leader().store_id());
bo.backoff(boUpdateLeader, Exception("not leader"));
} else {
cache -> dropRegion(rpc_ctx->region);
bo.backoff(boRegionMiss, Exception("not leader"));
}
return;
}

if (err.has_store_not_match()) {
cache -> dropStore(rpc_ctx->peer.store_id());
return;
}

if (err.has_stale_epoch()) {
cache -> onRegionStale(rpc_ctx, err.stale_epoch());
return;
}

if (err.has_server_is_busy()) {
bo.backoff(boServerBusy, Exception("server busy"));
return;
}

if (err.has_stale_command()) {
return;
}

if (err.has_raft_entry_too_large()) {
throw Exception("entry too large");
}

cache -> dropRegion(rpc_ctx -> region);
}

void onSendFail(Backoffer & ,const Exception & e) {
e.rethrow();
void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) {
cache->dropStoreOnSendReqFail(rpc_ctx, e);
bo.backoff(boTiKVRPC, e);
}
};

Expand Down
5 changes: 4 additions & 1 deletion contrib/client-c/include/tikv/Rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,12 @@ class RpcCall {
void call(std::unique_ptr<tikvpb::Tikv::Stub> stub) {
if constexpr(std::is_same<T, kvrpcpb::ReadIndexRequest>::value) {
grpc::ClientContext context;
context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(3));
auto status = stub->ReadIndex(&context, *req, resp);
if (!status.ok()) {
log -> error("read index failed: " + std::to_string(status.error_code()) + ": " + status.error_message());
std::string err_msg = ("read index failed: " + std::to_string(status.error_code()) + ": " + status.error_message());
log->error(err_msg);
throw Exception(err_msg, GRPCErrorCode);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions contrib/client-c/src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
add_subdirectory (pd)

set(kvClient_sources)

list(APPEND kvClient_sources pd/Client.cc)
Expand All @@ -13,3 +11,5 @@ set(kvClient_INCLUDE_DIR ${kvClient_SOURCE_DIR}/include)
add_library(kv_client ${kvClient_sources})
target_include_directories(kv_client PUBLIC ${KVPROTO_INCLUDE_DIR} ${kvClient_INCLUDE_DIR})
target_link_libraries(kv_client kvproto ${Poco_Foundation_LIBRARY})

add_subdirectory (test)
1 change: 0 additions & 1 deletion contrib/client-c/src/pd/CMakeLists.txt

This file was deleted.

29 changes: 20 additions & 9 deletions contrib/client-c/src/pd/Client.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <pd/Client.h>
#include <common/CltException.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/create_channel.h>
#include <Poco/URI.h>
Expand Down Expand Up @@ -76,7 +77,9 @@ pdpb::GetMembersResponse Client::getMembers(std::string url)

auto status = pdpb::PD::NewStub(cc)->GetMembers(&context, pdpb::GetMembersRequest{}, &resp);
if (!status.ok()) {
log->error("get member failed: " + std::to_string(status.error_code()) + ": " + status.error_message());
std::string err_msg = "get member failed: " + std::to_string(status.error_code()) + ": " + status.error_message();
log->error(err_msg);
throw Exception(err_msg, GRPCErrorCode);
}
return resp;
}
Expand All @@ -100,7 +103,7 @@ void Client::initClusterID() {
};
std::this_thread::sleep_for(std::chrono::seconds(1));
}
throw "failed to init cluster id";
throw Exception("failed to init cluster id", InitClusterIDFailed);
}

void Client::updateLeader() {
Expand All @@ -115,7 +118,7 @@ void Client::updateLeader() {
switchLeader(resp.leader().client_urls());
return;
}
throw "failed to update leader";
throw Exception("failed to update leader", UpdatePDLeaderFailed);
}

void Client::switchLeader(const ::google::protobuf::RepeatedPtrField<std::string>& leader_urls) {
Expand Down Expand Up @@ -162,8 +165,8 @@ void Client::leaderLoop() {
try {
check_leader = false;
updateLeader();
} catch (...) {
log->error("update leader failed.");
} catch (Exception & e) {
log->error(e.displayText());
}
}
}
Expand All @@ -186,7 +189,9 @@ uint64_t Client::getGCSafePoint() {

auto status = leaderStub()->GetGCSafePoint(&context, request, &response);
if (!status.ok()) {
log->error("get safe point failed: " + std::to_string(status.error_code()) + ": " + status.error_message());
std::string err_msg = "get safe point failed: " + std::to_string(status.error_code()) + ": " + status.error_message();
log->error(err_msg);
throw Exception(err_msg, GRPCErrorCode);
}
return response.safe_point();
}
Expand All @@ -204,7 +209,9 @@ std::tuple<metapb::Region, metapb::Peer, metapb::Peer> Client::getRegion(std::st

auto status = leaderStub()->GetRegion(&context, request, &response);
if (!status.ok()) {
log->error("get region failed: " + std::to_string(status.error_code()) + " : " + status.error_message());
std::string err_msg = ("get region failed: " + std::to_string(status.error_code()) + " : " + status.error_message());
log->error(err_msg);
throw Exception(err_msg, GRPCErrorCode);
}
if (response.slaves_size() == 0) {
return std::make_tuple(response.region(), response.leader(), metapb::Peer::default_instance());
Expand All @@ -225,7 +232,9 @@ std::tuple<metapb::Region, metapb::Peer, metapb::Peer> Client::getRegionByID(uin

auto status = leaderStub()->GetRegionByID(&context, request, &response);
if (!status.ok()) {
log-> error("get region by id failed: " + std::to_string (status.error_code()) + ": " + status.error_message());
std::string err_msg = ("get region by id failed: " + std::to_string (status.error_code()) + ": " + status.error_message());
log->error(err_msg);
throw Exception(err_msg, GRPCErrorCode);
}
if (response.slaves_size() == 0) {
return std::make_tuple(response.region(), response.leader(), metapb::Peer::default_instance());
Expand All @@ -246,7 +255,9 @@ metapb::Store Client::getStore(uint64_t store_id) {

auto status = leaderStub()->GetStore(&context, request, &response);
if (!status.ok()) {
log-> error("get store failed: " + std::to_string (status.error_code()) + ": " + status.error_message());
std::string err_msg = ("get store failed: " + std::to_string (status.error_code()) + ": " + status.error_message());
log->error(err_msg);
throw Exception(err_msg, GRPCErrorCode);
}
return response.store();
}
Expand Down
14 changes: 0 additions & 14 deletions contrib/client-c/src/pd/mock/CMakeLists.txt

This file was deleted.

Loading

0 comments on commit d34d7c9

Please sign in to comment.