Skip to content

Commit

Permalink
persist peer info when balancing
Browse files Browse the repository at this point in the history
  • Loading branch information
pengweisong committed Mar 1, 2022
1 parent 43f2131 commit 98a2dae
Show file tree
Hide file tree
Showing 15 changed files with 571 additions and 88 deletions.
4 changes: 4 additions & 0 deletions src/common/datatypes/HostAddr.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ struct HostAddr {
ha.port = std::stoi(str.substr(pos + 1));
return ha;
}

static HostAddr nullAddr() {
return HostAddr("", 0);
}
};

inline std::ostream& operator<<(std::ostream& os, const HostAddr& addr) {
Expand Down
177 changes: 177 additions & 0 deletions src/kvstore/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "common/base/Base.h"
#include "common/datatypes/HostAddr.h"
#include "common/thrift/ThriftTypes.h"
#include "common/utils/Types.h"
#include "interface/gen-cpp2/common_types.h"

namespace nebula {
Expand Down Expand Up @@ -39,6 +40,182 @@ class KVFilter {
const folly::StringPiece& val) const = 0;
};

/**
* @brief Part peer, including normal peer and learner peer.
*
*/
struct Peer {
enum class Status : uint8_t {
kNormalPeer = 0,
kLearner = 1,
kPromotedPeer = 2,
kMax = 3,
};

HostAddr addr;
Status status;

Peer() : addr(), status(Status::kNormalPeer) {}
Peer(HostAddr a, Status s) : addr(a), status(s) {}
Peer(const Peer& other) : addr(other.addr), status(other.status) {}

std::string toString() const {
return addr.toString() + "," + std::to_string(static_cast<int>(status));
}

void fromString(const std::string& str) {
auto pos = str.find(",");
if (pos == std::string::npos) {
LOG(ERROR) << "Parse peer failed:" << str;
return;
}
addr = HostAddr::fromString(str.substr(0, pos));

int s = std::stoi(str.substr(pos + 1));
if (s >= static_cast<int>(Status::kMax) || s < 0) {
LOG(ERROR) << "Parse peer status failed:" << str;
return;
}

status = static_cast<Status>(s);
}

bool operator==(const Peer& rhs) const {
return addr == rhs.addr && status == rhs.status;
}

bool operator!=(const Peer& rhs) const {
return !(*this == rhs);
}

Peer& operator=(const Peer& rhs) {
addr = rhs.addr;
status = rhs.status;
return *this;
}

static Peer nullPeer() {
return Peer();
}
};

inline std::ostream& operator<<(std::ostream& os, const Peer& peer) {
return os << peer.toString();
}

/**
* @brief Peers for one partition, it should handle serializing and deserializing.
*
*/
struct Peers {
std::map<HostAddr, Peer> peers;

Peers() {}
explicit Peers(const std::vector<HostAddr>& addrs) { // from normal peers
for (auto& addr : addrs) {
peers[addr] = Peer(addr, Peer::Status::kNormalPeer);
}
}
explicit Peers(const std::vector<Peer>& ps) {
for (auto& p : ps) {
peers[p.addr] = p;
}
}
explicit Peers(std::map<HostAddr, Peer> ps) : peers(std::move(ps)) {}

void addOrUpdate(const Peer& peer) {
peers[peer.addr] = peer;
}

bool get(const HostAddr& addr, Peer* peer) {
auto it = peers.find(addr);
if (it == peers.end()) {
return false;
}

if (peer != nullptr) {
*peer = it->second;
}
return true;
}

void remove(const HostAddr& addr) {
peers.erase(addr);
}

size_t size() {
return peers.size();
}

std::map<HostAddr, Peer> getPeers() {
return peers;
}

std::string toString() const {
std::stringstream os;
os << "version:1,"
<< "count:" << peers.size() << "\n";
for (const auto& [_, p] : peers) {
os << p << "\n";
}
return os.str();
}

static std::pair<int, int> extractHeader(const std::string& header) {
auto pos = header.find(":");
if (pos == std::string::npos) {
LOG(ERROR) << "Parse part peers header error:" << header;
return {0, 0};
}
int version = std::stoi(header.substr(pos + 1));
pos = header.find(":", pos + 1);
if (pos == std::string::npos) {
LOG(ERROR) << "Parse part peers header error:" << header;
return {0, 0};
}
int count = std::stoi(header.substr(pos + 1));

return {version, count};
}

static Peers fromString(const std::string& str) {
Peers peers;
int start = 0;
size_t peerCount = 0;
while (auto next = str.find("\n", start)) {
if (next == std::string::npos) {
if (peerCount != peers.size()) {
LOG(ERROR) << "Parse part peers error:" << str;
LOG(ERROR) << "Header count: " << peerCount
<< " does not match real count:" << peers.size();
}
break;
}

auto line = str.substr(start, next - start);
if (start == 0) { // header
auto [version, count] = extractHeader(line);
if (version != 1) {
LOG(ERROR) << "Parse version failed:" << version;
}
peerCount = count;
} else { // peers
Peer peer;
peer.fromString(line);
peers.addOrUpdate(peer);
}

start = next + 1;
}

return peers;
}
};

inline std::ostream& operator<<(std::ostream& os, const Peers& peers) {
return os << peers.toString();
}

using KV = std::pair<std::string, std::string>;
using KVCallback = folly::Function<void(nebula::cpp2::ErrorCode code)>;
using NewLeaderCallback = folly::Function<void(HostAddr nLeader)>;
Expand Down
26 changes: 22 additions & 4 deletions src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,22 @@ class KVEngine {
virtual nebula::cpp2::ErrorCode removeRange(const std::string& start, const std::string& end) = 0;

/**
* @brief Add a partition to kv engine
* @brief Add partId into current storage engine.
*
* @param partId Partition id to add
* @param partId
* @param raftPeers partition raft peers, including peers created during balance which are not in
* meta
*/
virtual void addPart(PartitionID partId, const Peers& raftPeers) = 0;

/**
* @brief Update part info. Only update peers info now, and could only update one peer each time.
*
* @param partId
* @param raftPeer 1. if raftPeer is Peer::nullPeer(), delete this peer
* 2. if raftPeer is not null, add or update this peer
*/
virtual void addPart(PartitionID partId) = 0;
virtual void updatePart(PartitionID partId, const Peer& raftPeer) = 0;

/**
* @brief Remove a partition from kv engine
Expand All @@ -231,12 +242,19 @@ class KVEngine {
virtual void removePart(PartitionID partId) = 0;

/**
* @brief Return all partIds in kv engine
* @brief Return all parts current engine holds.
*
* @return std::vector<PartitionID> Partition ids
*/
virtual std::vector<PartitionID> allParts() = 0;

/**
* @brief Return all partId->peers that current storage engine holds.
*
* @return std::map<PartitionID, Peers> partId-> peers for each part, including learners
*/
virtual std::map<PartitionID, Peers> allPartPeers() = 0;

/**
* @brief Return total parts num
*
Expand Down
Loading

0 comments on commit 98a2dae

Please sign in to comment.