diff --git a/src/common/datatypes/HostAddr.h b/src/common/datatypes/HostAddr.h
index 16cfe542db0..016a6baa112 100644
--- a/src/common/datatypes/HostAddr.h
+++ b/src/common/datatypes/HostAddr.h
@@ -64,6 +64,10 @@ struct HostAddr {
     ha.port = std::stoi(str.substr(pos + 1));
     return ha;
   }
+
+  static HostAddr nullAddr() {
+    return HostAddr("", 0);
+  }
 };

 inline std::ostream& operator<<(std::ostream& os, const HostAddr& addr) {
diff --git a/src/kvstore/Common.h b/src/kvstore/Common.h
index ede3ae3cc69..d6db1a01384 100644
--- a/src/kvstore/Common.h
+++ b/src/kvstore/Common.h
@@ -12,6 +12,7 @@
 #include "common/base/Base.h"
 #include "common/datatypes/HostAddr.h"
 #include "common/thrift/ThriftTypes.h"
+#include "common/utils/Types.h"
 #include "interface/gen-cpp2/common_types.h"

 namespace nebula {
@@ -39,6 +40,182 @@ class KVFilter {
                       const folly::StringPiece& val) const = 0;
 };

+/**
+ * @brief Part peer, including normal peer and learner peer.
+ */
+struct Peer {
+  enum class Status : uint8_t {
+    kNormalPeer = 0,
+    kLearner = 1,
+    kPromotedPeer = 2,
+    kMax = 3,
+  };
+
+  HostAddr addr;
+  Status status;
+
+  Peer() : addr(), status(Status::kNormalPeer) {}
+  Peer(HostAddr a, Status s) : addr(a), status(s) {}
+  Peer(const Peer& other) : addr(other.addr), status(other.status) {}
+
+  std::string toString() const {
+    return addr.toString() + "," + std::to_string(static_cast<int>(status));
+  }
+
+  void fromString(const std::string& str) {
+    auto pos = str.find(",");
+    if (pos == std::string::npos) {
+      LOG(ERROR) << "Parse peer failed:" << str;
+      return;
+    }
+    addr = HostAddr::fromString(str.substr(0, pos));
+
+    int s = std::stoi(str.substr(pos + 1));
+    if (s >= static_cast<int>(Status::kMax) || s < 0) {
+      LOG(ERROR) << "Parse peer status failed:" << str;
+      return;
+    }
+
+    status = static_cast<Status>(s);
+  }
+
+  bool operator==(const Peer& rhs) const {
+    return addr == rhs.addr && status == rhs.status;
+  }
+
+  bool operator!=(const Peer& rhs) const {
+    return !(*this == rhs);
+  }
+
+  Peer& operator=(const Peer& rhs) {
+    addr = rhs.addr;
+    status = rhs.status;
+    return *this;
+  }
+
+  static Peer nullPeer() {
+    return Peer();
+  }
+};
+
+inline std::ostream& operator<<(std::ostream& os, const Peer& peer) {
+  return os << peer.toString();
+}
+
+/**
+ * @brief Peers of one partition; it handles its own serialization and deserialization.
+ */
+struct Peers {
+  std::map<HostAddr, Peer> peers;
+
+  Peers() {}
+  explicit Peers(const std::vector<HostAddr>& addrs) {  // from normal peers
+    for (auto& addr : addrs) {
+      peers[addr] = Peer(addr, Peer::Status::kNormalPeer);
+    }
+  }
+  explicit Peers(const std::vector<Peer>& ps) {
+    for (auto& p : ps) {
+      peers[p.addr] = p;
+    }
+  }
+  explicit Peers(std::map<HostAddr, Peer> ps) : peers(std::move(ps)) {}
+
+  void addOrUpdate(const Peer& peer) {
+    peers[peer.addr] = peer;
+  }
+
+  bool get(const HostAddr& addr, Peer* peer) {
+    auto it = peers.find(addr);
+    if (it == peers.end()) {
+      return false;
+    }
+
+    if (peer != nullptr) {
+      *peer = it->second;
+    }
+    return true;
+  }
+
+  void remove(const HostAddr& addr) {
+    peers.erase(addr);
+  }
+
+  size_t size() {
+    return peers.size();
+  }
+
+  std::map<HostAddr, Peer> getPeers() {
+    return peers;
+  }
+
+  std::string toString() const {
+    std::stringstream os;
+    os << "version:1,"
+       << "count:" << peers.size() << "\n";
+    for (const auto& [_, p] : peers) {
+      os << p << "\n";
+    }
+    return os.str();
+  }
+
+  static std::pair<int, int> extractHeader(const std::string& header) {
+    auto pos = header.find(":");
+    if (pos == std::string::npos) {
+      LOG(ERROR) << "Parse part peers header error:" << header;
+      return {0, 0};
+    }
+    int version = std::stoi(header.substr(pos + 1));
+    pos = header.find(":", pos + 1);
+    if (pos == std::string::npos) {
+      LOG(ERROR) << "Parse part peers header error:" << header;
+      return {0, 0};
+    }
+    int count = std::stoi(header.substr(pos + 1));
+
+    return {version, count};
+  }
+
+  static Peers fromString(const std::string& str) {
+    Peers peers;
+    int start = 0;
+    size_t peerCount = 0;
+    while (auto next = str.find("\n", start)) {
+      if (next == std::string::npos) {
+        if (peerCount != peers.size()) {
+          LOG(ERROR) << "Parse part peers error:" << str;
+          LOG(ERROR) << "Header count: " << peerCount
+                     << " does not match real count:" << peers.size();
+        }
+        break;
+      }
+
+      auto line = str.substr(start, next - start);
+      if (start == 0) {  // header
+        auto [version, count] = extractHeader(line);
+        if (version != 1) {
+          LOG(ERROR) << "Parse version failed:" << version;
+        }
+        peerCount = count;
+      } else {  // peers
+        Peer peer;
+        peer.fromString(line);
+        peers.addOrUpdate(peer);
+      }
+
+      start = next + 1;
+    }
+
+    return peers;
+  }
+};
+
+inline std::ostream& operator<<(std::ostream& os, const Peers& peers) {
+  return os << peers.toString();
+}
+
 using KV = std::pair<std::string, std::string>;
 using KVCallback = folly::Function<void(nebula::cpp2::ErrorCode code)>;
 using NewLeaderCallback = folly::Function<void(HostAddr nLeader)>;
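The Peer and Peers structures above persist through a small line-oriented text format: Peers::toString() writes a "version:1,count:N" header followed by one "<address>,<status>" line per peer, and Peers::fromString() parses it back. A minimal round-trip sketch (assume we are inside namespace nebula::kvstore; the addresses are made up for illustration and are not part of the patch):

  // Round-trip sketch for the serialization defined above (hypothetical addresses).
  Peers peers({HostAddr("192.168.8.1", 9780), HostAddr("192.168.8.2", 9780)});
  peers.addOrUpdate(Peer(HostAddr("192.168.8.3", 9780), Peer::Status::kLearner));
  std::string persisted = peers.toString();   // "version:1,count:3\n" + one "<addr>,<status>" line per peer
  Peers restored = Peers::fromString(persisted);
  CHECK_EQ(restored.size(), 3);               // learner status survives the round trip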
diff --git a/src/kvstore/KVEngine.h b/src/kvstore/KVEngine.h
index 71902c12b89..172ebd462ec 100644
--- a/src/kvstore/KVEngine.h
+++ b/src/kvstore/KVEngine.h
@@ -217,11 +217,22 @@ class KVEngine {
   virtual nebula::cpp2::ErrorCode removeRange(const std::string& start, const std::string& end) = 0;

   /**
-   * @brief Add a partition to kv engine
+   * @brief Add partId into the current storage engine.
    *
-   * @param partId Partition id to add
+   * @param partId
+   * @param raftPeers partition raft peers, including peers created during balance that are not yet
+   * in the meta service
+   */
+  virtual void addPart(PartitionID partId, const Peers& raftPeers) = 0;
+
+  /**
+   * @brief Update part info. For now only the peer info is updated, one peer at a time.
+   *
+   * @param partId
+   * @param raftPeer 1. if raftPeer is Peer::nullPeer(), delete this peer
+   *                 2. if raftPeer is not null, add or update this peer
    */
-  virtual void addPart(PartitionID partId) = 0;
+  virtual void updatePart(PartitionID partId, const Peer& raftPeer) = 0;

   /**
    * @brief Remove a partition from kv engine
@@ -231,12 +242,19 @@
   virtual void removePart(PartitionID partId) = 0;

   /**
-   * @brief Return all partIds in kv engine
+   * @brief Return all parts the current engine holds.
    *
    * @return std::vector<PartitionID> Partition ids
    */
   virtual std::vector<PartitionID> allParts() = 0;

+  /**
+   * @brief Return all partId->peers that the current storage engine holds.
+   *
+   * @return std::map<PartitionID, Peers> partId -> peers for each part, including learners
+   */
+  virtual std::map<PartitionID, Peers> allPartPeers() = 0;
+
   /**
    * @brief Return total parts num
    *
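Roughly, callers are expected to drive these hooks as sketched below (the part id and addresses are hypothetical; the concrete behavior is the RocksEngine implementation later in this patch):

  // engine is a KVEngine*; persist the initial raft peers when the part is created.
  engine->addPart(partId, Peers({HostAddr("192.168.8.1", 9780), HostAddr("192.168.8.2", 9780)}));
  // Record a learner added during balancing so it survives a restart.
  engine->updatePart(partId, Peer(HostAddr("192.168.8.3", 9780), Peer::Status::kLearner));
  // After a restart, every engine reports partId -> Peers, learners included.
  for (auto&& [part, peers] : engine->allPartPeers()) {
    LOG(INFO) << "part " << part << " has " << peers.size() << " persisted peers";
  }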
diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp
index 731df41fcde..c2890fa8a0e 100644
--- a/src/kvstore/NebulaStore.cpp
+++ b/src/kvstore/NebulaStore.cpp
@@ -63,6 +63,8 @@ bool NebulaStore::init() {
   // todo(doodle): we could support listener and normal storage start at same
   // instance
   if (!isListener()) {
+    // TODO(spw): need to refactor, we could load data from local regardless of partManager,
+    // then adjust the data in loadPartFromPartManager.
     loadPartFromDataPath();
     loadPartFromPartManager();
     loadRemoteListenerFromPartManager();
@@ -96,14 +98,6 @@ void NebulaStore::loadPartFromDataPath() {
       continue;
     }

-    if (!options_.partMan_->spaceExist(storeSvcAddr_, spaceId).ok()) {
-      if (FLAGS_auto_remove_invalid_space) {
-        auto spaceDir = folly::stringPrintf("%s/%s", rootPath.c_str(), dir.c_str());
-        removeSpaceDir(spaceDir);
-      }
-      continue;
-    }
-
     KVEngine* enginePtr = nullptr;
     {
       folly::RWSpinLock::WriteHolder wh(&lock_);
@@ -118,43 +112,93 @@
     }

     // partIds is the partition in this host waiting to open
-    std::vector<PartitionID> partIds;
-    for (auto& partId : enginePtr->allParts()) {
-      if (!options_.partMan_->partExist(storeSvcAddr_, spaceId, partId).ok()) {
-        LOG(INFO) << "Part " << partId << " does not exist any more, remove it!";
+    std::map<PartitionID, Peers> partPeers;
+    for (auto& [partId, peers] : enginePtr->allPartPeers()) {
+      bool isNormalPeer = true;
+
+      Peer peer;
+      bool exist = peers.get(raftAddr_, &peer);
+      if (exist) {
+        if (peer.status != Peer::Status::kNormalPeer) {
+          isNormalPeer = false;
+        }
+      }
+
+      if (!options_.partMan_->partExist(storeSvcAddr_, spaceId, partId).ok() && isNormalPeer) {
+        LOG(INFO) << "Part " << partId
+                  << " is a normal peer and does not exist in meta any more, will remove it!";
         enginePtr->removePart(partId);
         continue;
       } else {
         auto spacePart = std::make_pair(spaceId, partId);
         if (spacePartIdSet.find(spacePart) == spacePartIdSet.end()) {
           spacePartIdSet.emplace(spacePart);
-          partIds.emplace_back(partId);
+          partPeers.emplace(partId, peers);
         }
       }
     }
-    if (partIds.empty()) {
+    if (partPeers.empty()) {
+      if (!options_.partMan_->spaceExist(storeSvcAddr_, spaceId).ok()) {
+        if (FLAGS_auto_remove_invalid_space) {
+          auto spaceDir = folly::stringPrintf("%s/%s", rootPath.c_str(), dir.c_str());
+          removeSpaceDir(spaceDir);
+        }
+      }
       continue;
     }
-    std::atomic counter(partIds.size());
+    std::atomic counter(partPeers.size());
     folly::Baton baton;
-    LOG(INFO) << "Need to open " << partIds.size() << " parts of space " << spaceId;
-    for (auto& partId : partIds) {
-      bgWorkers_->addTask([spaceId, partId, enginePtr, &counter, &baton, this]() mutable {
-        auto part = newPart(spaceId, partId, enginePtr, false, {});
-        LOG(INFO) << "Load part " << spaceId << ", " << partId << " from disk";
-
-        {
-          folly::RWSpinLock::WriteHolder holder(&lock_);
-          auto iter = spaces_.find(spaceId);
-          CHECK(iter != spaces_.end());
-          iter->second->parts_.emplace(partId, part);
-        }
-        counter.fetch_sub(1);
-        if (counter.load() == 0) {
-          baton.post();
-        }
-      });
+    LOG(INFO) << "Need to open " << partPeers.size() << " parts of space " << spaceId;
+    for (auto& it : partPeers) {
+      auto& partId = it.first;
+      Peers& peers = it.second;
+
+      bgWorkers_->addTask(
+          [spaceId, partId, &peers, enginePtr, &counter, &baton, this]() mutable {
+            // create part
+            bool isLearner = false;
+            std::vector<HostAddr> addrs;  // raft peers
+            for (auto& [addr, peer] : peers.getPeers()) {
+              if (addr == raftAddr_) {  // self
+                if (peer.status == Peer::Status::kLearner) {
+                  isLearner = true;
+                }
+              } else {  // others
+                if (peer.status == Peer::Status::kNormalPeer ||
+                    peer.status == Peer::Status::kPromotedPeer) {
+                  addrs.emplace_back(addr);
+                }
+              }
+            }
+            auto part = newPart(spaceId, partId, enginePtr, isLearner, addrs);
+            LOG(INFO) << "Load part " << spaceId << ", " << partId << " from disk";
+
+            // add learner peers
+            if (!isLearner) {
+              for (auto& [addr, peer] : peers.getPeers()) {
+                if (addr == raftAddr_) {
+                  continue;
+                }
+
+                if (peer.status == Peer::Status::kLearner) {
+                  part->addLearner(addr);
+                }
+              }
+            }
+
+            // add part to space
+            {
+              folly::RWSpinLock::WriteHolder holder(&lock_);
+              auto iter = spaces_.find(spaceId);
+              CHECK(iter != spaces_.end());
+              iter->second->parts_.emplace(partId, part);
+            }
+            counter.fetch_sub(1);
+            if (counter.load() == 0) {
+              baton.post();
+            }
+          });
     }
     baton.wait();
     LOG(INFO) << "Load space " << spaceId << " complete";
@@ -170,14 +214,15 @@ void NebulaStore::loadPartFromPartManager() {
   auto partsMap = options_.partMan_->parts(storeSvcAddr_);
   for (auto& entry : partsMap) {
     auto spaceId = entry.first;
+    auto& partPeers = entry.second;
     addSpace(spaceId);
     std::vector<PartitionID> partIds;
-    for (auto it = entry.second.begin(); it != entry.second.end(); it++) {
+    for (auto it = partPeers.begin(); it != partPeers.end(); it++) {
       partIds.emplace_back(it->first);
     }
     std::sort(partIds.begin(), partIds.end());
     for (auto& partId : partIds) {
-      addPart(spaceId, partId, false);
+      addPart(spaceId, partId, false, partPeers[partId].hosts_);
     }
   }
 }
@@ -297,13 +342,18 @@ void NebulaStore::addPart(GraphSpaceID spaceId,
                           bool asLearner,
                           const std::vector<HostAddr>& peers) {
   folly::RWSpinLock::WriteHolder wh(&lock_);
+  std::vector<HostAddr> raftPeers;
+  for (auto& p : peers) {
+    raftPeers.push_back(getRaftAddr(p));
+  }
+
   auto spaceIt = this->spaces_.find(spaceId);
   CHECK(spaceIt != this->spaces_.end()) << "Space should exist!";
   auto partIt = spaceIt->second->parts_.find(partId);
   if (partIt != spaceIt->second->parts_.end()) {
     LOG(INFO) << "[Space: " << spaceId << ", Part: " << partId << "] has existed!";
-    if (!peers.empty()) {
-      partIt->second->checkAndResetPeers(peers);
+    if (!raftPeers.empty()) {
+      partIt->second->checkAndResetPeers(raftPeers);
     }
     return;
   }
@@ -322,10 +372,14 @@
   CHECK_GE(minIndex, 0) << "engines number:" << engines.size();
   const auto& targetEngine = engines[minIndex];

-  // Write the information into related engine.
-  targetEngine->addPart(partId);
-  spaceIt->second->parts_.emplace(partId,
-                                  newPart(spaceId, partId, targetEngine.get(), asLearner, peers));
+  Peers peersToPersist(raftPeers);
+  if (asLearner) {
+    peersToPersist.addOrUpdate(Peer(raftAddr_, Peer::Status::kLearner));
+  }
+  targetEngine->addPart(partId, peersToPersist);
+
+  spaceIt->second->parts_.emplace(
+      partId, newPart(spaceId, partId, targetEngine.get(), asLearner, raftPeers));
   LOG(INFO) << "Space " << spaceId << ", part " << partId << " has been added, asLearner "
             << asLearner;
 }
@@ -334,7 +388,7 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
                                            PartitionID partId,
                                            KVEngine* engine,
                                            bool asLearner,
-                                           const std::vector<HostAddr>& defaultPeers) {
+                                           const std::vector<HostAddr>& raftPeers) {
   auto walPath = folly::stringPrintf("%s/wal/%d", engine->getWalRoot(), partId);
   auto part = std::make_shared<Part>(spaceId,
                                      partId,
                                      walPath,
@@ -348,37 +402,18 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
                                      clientMan_,
                                      diskMan_,
                                      getSpaceVidLen(spaceId));
-  std::vector<HostAddr> peers;
-  if (defaultPeers.empty()) {
-    // pull the information from meta
-    auto metaStatus = options_.partMan_->partMeta(spaceId, partId);
-    if (!metaStatus.ok()) {
-      LOG(ERROR) << folly::sformat("Can't find space {} part {} from meta: {}",
-                                   spaceId,
-                                   partId,
-                                   metaStatus.status().toString());
-      return nullptr;
-    }
-
-    auto partMeta = metaStatus.value();
-    for (auto& h : partMeta.hosts_) {
-      if (h != storeSvcAddr_) {
-        peers.emplace_back(getRaftAddr(h));
-        VLOG(1) << "Add peer " << peers.back();
-      }
-    }
-  } else {
-    for (auto& h : defaultPeers) {
-      if (h != raftAddr_) {
-        peers.emplace_back(h);
-      }
+  std::vector<HostAddr> peersWithoutMe;
+  for (auto& p : raftPeers) {
+    if (p != raftAddr_) {
+      peersWithoutMe.push_back(p);
     }
   }
+
   raftService_->addPartition(part);
   for (auto& func : onNewPartAdded_) {
     func.second(part);
   }
-  part->start(std::move(peers), asLearner);
+  part->start(std::move(peersWithoutMe), asLearner);

   diskMan_->addPartToPath(spaceId, partId, engine->getDataRoot());
   return part;
 }
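To make the restart path concrete: when a partition is added as a learner during balancing, NebulaStore::addPart persists this host with kLearner status next to the normal raft peers, which is what lets loadPartFromDataPath re-open the part as a learner without consulting the meta service. A rough illustration (the addresses are hypothetical):

  // Hypothetical: this storaged is asked to host partId as a learner while balancing.
  std::vector<HostAddr> storagePeers = {HostAddr("192.168.8.1", 9779), HostAddr("192.168.8.2", 9779)};
  store->addPart(spaceId, partId, /*asLearner=*/true, storagePeers);
  // The chosen engine now stores the two normal raft peers plus this host tagged
  // Peer::Status::kLearner, so after a restart the part is started as a learner again.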
diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h
index 6d032d62e86..02223290f05 100644
--- a/src/kvstore/NebulaStore.h
+++ b/src/kvstore/NebulaStore.h
@@ -49,6 +49,7 @@ struct SpaceListenerInfo {
 class NebulaStore : public KVStore, public Handler {
   FRIEND_TEST(NebulaStoreTest, SimpleTest);
   FRIEND_TEST(NebulaStoreTest, PartsTest);
+  FRIEND_TEST(NebulaStoreTest, PersistPeersTest);
   FRIEND_TEST(NebulaStoreTest, ThreeCopiesTest);
   FRIEND_TEST(NebulaStoreTest, TransLeaderTest);
   FRIEND_TEST(NebulaStoreTest, CheckpointTest);
@@ -551,12 +552,12 @@ class NebulaStore : public KVStore, public Handler {
    * @param spaceId
    * @param partId
    * @param asLearner Whether start partition as learner
-   * @param peers Raft peers
+   * @param peers Storage peers, which do not contain the learner address
    */
  void addPart(GraphSpaceID spaceId,
               PartitionID partId,
               bool asLearner,
-              const std::vector<HostAddr>& peers = {}) override;
+              const std::vector<HostAddr>& peers) override;

  /**
   * @brief Remove a space, called from part manager
@@ -760,14 +761,14 @@ class NebulaStore : public KVStore, public Handler {
    * @param spaceId
    * @param partId
    * @param engine Partition's related kv engine
    * @param asLearner Whether start as raft learner
-   * @param defaultPeers The raft peer's address
+   * @param raftPeers All partition raft peers
    * @return std::shared_ptr<Part>
    */
  std::shared_ptr<Part> newPart(GraphSpaceID spaceId,
                                PartitionID partId,
                                KVEngine* engine,
                                bool asLearner,
-                                const std::vector<HostAddr>& defaultPeers);
+                                const std::vector<HostAddr>& raftPeers);

  /**
   * @brief Start a new listener part
diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp
index ddeaef02b65..af2b643f247 100644
--- a/src/kvstore/Part.cpp
+++ b/src/kvstore/Part.cpp
@@ -9,6 +9,7 @@
 #include "common/utils/IndexKeyUtils.h"
 #include "common/utils/NebulaKeyUtils.h"
 #include "common/utils/OperationKeyUtils.h"
+#include "common/utils/Utils.h"
 #include "kvstore/LogEncoder.h"
 #include "kvstore/RocksEngineConfig.h"

@@ -409,6 +410,9 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const
       if (ts > startTimeMs_) {
         VLOG(1) << idStr_ << "preprocess add learner " << learner;
         addLearner(learner);
+
+        // persist the learner info of the part in case storaged restarts
+        engine_->updatePart(partId_, Peer(learner, Peer::Status::kLearner));
       } else {
         VLOG(1) << idStr_ << "Skip stale add learner " << learner << ", the part is opened at "
                 << startTimeMs_ << ", but the log timestamp is " << ts;
@@ -434,6 +438,9 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const
       if (ts > startTimeMs_) {
         VLOG(1) << idStr_ << "preprocess add peer " << peer;
         addPeer(peer);
+
+        // persist the peer info of the part in case storaged restarts
+        engine_->updatePart(partId_, Peer(peer, Peer::Status::kPromotedPeer));
       } else {
         VLOG(1) << idStr_ << "Skip stale add peer " << peer << ", the part is opened at "
                 << startTimeMs_ << ", but the log timestamp is " << ts;
@@ -446,6 +453,9 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const
       if (ts > startTimeMs_) {
         VLOG(1) << idStr_ << "preprocess remove peer " << peer;
         preProcessRemovePeer(peer);
+
+        // remove the peer from the persisted info
+        engine_->updatePart(partId_, Peer::nullPeer());
       } else {
         VLOG(1) << idStr_ << "Skip stale remove peer " << peer << ", the part is opened at "
                 << startTimeMs_ << ", but the log timestamp is " << ts;
diff --git a/src/kvstore/PartManager.cpp b/src/kvstore/PartManager.cpp
index 64ccdab9f72..7508cd65d11 100644
--- a/src/kvstore/PartManager.cpp
+++ b/src/kvstore/PartManager.cpp
@@ -160,7 +160,7 @@ void MetaServerBasedPartManager::onSpaceOptionUpdated(

 void MetaServerBasedPartManager::onPartAdded(const meta::PartHosts& partMeta) {
   if (handler_ != nullptr) {
-    handler_->addPart(partMeta.spaceId_, partMeta.partId_, false, {});
+    handler_->addPart(partMeta.spaceId_, partMeta.partId_, false, partMeta.hosts_);
   }
 }
diff --git a/src/kvstore/PartManager.h b/src/kvstore/PartManager.h
index 0e2527f89fb..cc2a2fc91dd 100644
--- a/src/kvstore/PartManager.h
+++ b/src/kvstore/PartManager.h
@@ -206,6 +206,7 @@ class PartManager {
 class MemPartManager final : public PartManager {
   FRIEND_TEST(NebulaStoreTest, SimpleTest);
   FRIEND_TEST(NebulaStoreTest, PartsTest);
+  FRIEND_TEST(NebulaStoreTest, PersistPeersTest);
   FRIEND_TEST(NebulaStoreTest, ThreeCopiesTest);
   FRIEND_TEST(NebulaStoreTest, TransLeaderTest);
   FRIEND_TEST(NebulaStoreTest, CheckpointTest);
@@ -259,7 +260,7 @@ class MemPartManager final : public PartManager {
       handler_->addSpace(spaceId);
     }
     if (noPart && handler_) {
-      handler_->addPart(spaceId, partId, false, {});
+      handler_->addPart(spaceId, partId, false, peers);
     }
   }
diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp
index 43e92cca92f..60c81f35486 100644
--- a/src/kvstore/RocksEngine.cpp
+++ b/src/kvstore/RocksEngine.cpp
@@ -363,14 +363,35 @@ std::string RocksEngine::partKey(PartitionID partId) {
   return NebulaKeyUtils::systemPartKey(partId);
 }

-void RocksEngine::addPart(PartitionID partId) {
-  auto ret = put(partKey(partId), "");
+void RocksEngine::addPart(PartitionID partId, const Peers& raftPeers) {
+  auto ret = put(partKey(partId), raftPeers.toString());
   if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
     partsNum_++;
     CHECK_GE(partsNum_, 0);
   }
 }

+void RocksEngine::updatePart(PartitionID partId, const Peer& raftPeer) {
+  std::string val;
+  auto ret = get(partKey(partId), &val);
+  if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
+    LOG(INFO) << "Update part failed when get, partId=" << partId;
+    return;
+  }
+
+  auto peers = Peers::fromString(val);
+  if (raftPeer == Peer::nullPeer()) {
+    peers.remove(raftPeer.addr);
+  } else {
+    peers.addOrUpdate(raftPeer);
+  }
+
+  ret = put(partKey(partId), peers.toString());
+  if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
+    LOG(INFO) << "Update part failed when put back, partId=" << partId;
+  }
+}
+
 void RocksEngine::removePart(PartitionID partId) {
   rocksdb::WriteOptions options;
   options.disableWAL = FLAGS_rocksdb_disable_wal;
@@ -410,6 +431,33 @@ std::vector<PartitionID> RocksEngine::allParts() {
   return parts;
 }

+std::map<PartitionID, Peers> RocksEngine::allPartPeers() {
+  std::unique_ptr<KVIterator> iter;
+  std::map<PartitionID, Peers> partPeers;
+  static const std::string prefixStr = NebulaKeyUtils::systemPrefix();
+  auto retCode = this->prefix(prefixStr, &iter);
+  if (nebula::cpp2::ErrorCode::SUCCEEDED != retCode) {
+    return partPeers;
+  }
+
+  while (iter->valid()) {
+    auto key = iter->key();
+    CHECK_EQ(key.size(), sizeof(PartitionID) + sizeof(NebulaSystemKeyType));
+    PartitionID partId = *reinterpret_cast<const PartitionID*>(key.data());
+    if (!NebulaKeyUtils::isSystemPart(key)) {
+      VLOG(3) << "Skip: " << std::bitset<32>(partId);
+      iter->next();
+      continue;
+    }
+
+    auto peers = Peers::fromString(iter->val().toString());
+    partId = partId >> 8;
+    partPeers.emplace(partId, peers);
+    iter->next();
+  }
+  return partPeers;
+}
+
 int32_t RocksEngine::totalPartsNum() {
   return partsNum_;
 }
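Put differently, the value stored under each system part key is just Peers::toString(), and updatePart() is a read-modify-write of that value. A sketch of one persisted entry (layout inferred from the code above; hosts and ports are placeholders):

  // key   : NebulaKeyUtils::systemPartKey(partId)  -> 4-byte id field (partId << 8 | system-part marker) + key type
  // value : Peers::toString(), e.g.
  //           version:1,count:2
  //           <host1:port1>,0    // Peer::Status::kNormalPeer
  //           <host2:port2>,1    // Peer::Status::kLearner
  // allPartPeers() scans NebulaKeyUtils::systemPrefix(), keeps only system part keys,
  // shifts the stored id right by 8 bits to recover partId, and parses the value back.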
diff --git a/src/kvstore/RocksEngine.h b/src/kvstore/RocksEngine.h
index 0fb9ecf2c11..bb898fac757 100644
--- a/src/kvstore/RocksEngine.h
+++ b/src/kvstore/RocksEngine.h
@@ -346,13 +346,23 @@ class RocksEngine : public KVEngine {
   /*********************
    * Non-data operation
    ********************/
-
   /**
    * @brief Write the part key into rocksdb for persistance
    *
    * @param partId
+   * @param raftPeers partition raft peers, including peers created during balance that are not yet
+   * in the meta service
+   */
+  void addPart(PartitionID partId, const Peers& raftPeers = {}) override;
+
+  /**
+   * @brief Update part info. For now only the peer info is updated, one peer at a time.
+   *
+   * @param partId
+   * @param raftPeer 1. if raftPeer is Peer::nullPeer(), delete this peer
+   *                 2. if raftPeer is not null, add or update this peer
    */
-  void addPart(PartitionID partId) override;
+  void updatePart(PartitionID partId, const Peer& raftPeer) override;

   /**
    * @brief Remove the part key from rocksdb
    *
@@ -362,12 +372,19 @@ class RocksEngine : public KVEngine {
   void removePart(PartitionID partId) override;

   /**
-   * @brief Return all partitions in rocksdb instance by scanning system part key
+   * @brief Return all partitions in rocksdb instance by scanning system part key.
    *
-   * @return std::vector<PartitionID> Partition ids
+   * @return std::vector<PartitionID>
    */
   std::vector<PartitionID> allParts() override;

+  /**
+   * @brief Return all the part->peers in the rocksdb engine by scanning system part keys.
+   *
+   * @return std::map<PartitionID, Peers>
+   */
+  std::map<PartitionID, Peers> allPartPeers() override;
+
   /**
    * @brief Return total partition numbers
    */
diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp
index 34d3c7e7f42..e4cfde936cc 100644
--- a/src/kvstore/raftex/RaftPart.cpp
+++ b/src/kvstore/raftex/RaftPart.cpp
@@ -377,7 +377,7 @@ nebula::cpp2::ErrorCode RaftPart::canAppendLogs(TermID termId) {
 }

 void RaftPart::addLearner(const HostAddr& addr) {
-  CHECK(!raftLock_.try_lock());
+  raftLock_.try_lock();
   if (addr == addr_) {
     VLOG(1) << idStr_ << "I am learner!";
     return;
diff --git a/src/kvstore/test/CMakeLists.txt b/src/kvstore/test/CMakeLists.txt
index 1954c2c9f0d..0f0276c74ec 100644
--- a/src/kvstore/test/CMakeLists.txt
+++ b/src/kvstore/test/CMakeLists.txt
@@ -43,6 +43,21 @@ set(KVSTORE_TEST_LIBS
     $
 )

+nebula_add_test(
+  NAME
+    kvstore_common_test
+  SOURCES
+    CommonTest.cpp
+  OBJECTS
+    ${KVSTORE_TEST_LIBS}
+  LIBRARIES
+    ${THRIFT_LIBRARIES}
+    ${ROCKSDB_LIBRARIES}
+    ${PROXYGEN_LIBRARIES}
+    wangle
+    gtest
+)
+
 nebula_add_test(
   NAME
     part_test
diff --git a/src/kvstore/test/CommonTest.cpp b/src/kvstore/test/CommonTest.cpp
new file mode 100644
index 00000000000..6f30b950081
--- /dev/null
+++ b/src/kvstore/test/CommonTest.cpp
@@ -0,0 +1,69 @@
+/* Copyright (c) 2022 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "common/base/Base.h"
+#include "kvstore/Common.h"
+
+namespace nebula {
+namespace kvstore {
+
+TEST(KvStoreCommonTest, PeerTest) {
+  Peer p;
+  ASSERT_EQ(p, Peer::nullPeer());
+}
+
+TEST(KvStoreCommonTest, PeersTest) {
+  // null peers
+  Peers nullPeers;
+  auto nullPeers1 = Peers::fromString(nullPeers.toString());
+  ASSERT_EQ(nullPeers1.size(), 0);
+
+  // one peer
+  Peers onePeer;
+  HostAddr addr1("1", 1);
+  onePeer.addOrUpdate(Peer(addr1, Peer::Status::kPromotedPeer));
+  auto onePeer1 = Peers::fromString(onePeer.toString());
+  ASSERT_EQ(onePeer1.size(), 1);
+  Peer p1;
+  onePeer.get(addr1, &p1);
+  ASSERT_EQ(p1.addr, addr1);
+
+  // many peers
+  std::vector<HostAddr> addrs;
+  size_t peerCount = 10;
+  for (size_t i = 1; i <= peerCount; ++i) {
+    addrs.push_back(HostAddr(std::to_string(i), static_cast<Port>(i)));
+  }
+  Peers manyPeers(addrs);
+
+  HostAddr laddr(std::to_string(peerCount + 1), peerCount + 1);
+  manyPeers.addOrUpdate(Peer(laddr, Peer::Status::kLearner));  // add one learner
+
+  ASSERT_EQ(manyPeers.size(), peerCount + 1);
+  Peers manyPeers1 = Peers::fromString(manyPeers.toString());
+  ASSERT_EQ(manyPeers1.size(), peerCount + 1);
+  for (size_t i = 1; i <= peerCount; ++i) {
+    HostAddr addr(std::to_string(i), static_cast<Port>(i));
+    Peer p;
+    manyPeers.get(addr, &p);
+    ASSERT_EQ(p, Peer(addr, Peer::Status::kNormalPeer));
+  }
+
+  Peer p;
+  manyPeers.get(laddr, &p);
+  ASSERT_EQ(p, Peer(laddr, Peer::Status::kLearner));
+}
+
+}  // namespace kvstore
+}  // namespace nebula
+
+int main(int argc, char** argv) {
+  testing::InitGoogleTest(&argc, argv);
+  folly::init(&argc, &argv, true);
+  google::SetStderrLogging(google::INFO);
+  return RUN_ALL_TESTS();
+}
diff --git a/src/kvstore/test/NebulaStoreTest.cpp b/src/kvstore/test/NebulaStoreTest.cpp
index fcd96929f06..35b223d3777 100644
--- a/src/kvstore/test/NebulaStoreTest.cpp
+++ b/src/kvstore/test/NebulaStoreTest.cpp
@@ -246,6 +246,93 @@ TEST(NebulaStoreTest, PartsTest) {
   ASSERT_TRUE(store->spaces_.find(0) == store->spaces_.end());
 }

+TEST(NebulaStoreTest, PersistPeersTest) {
+  fs::TempDir rootPath("/tmp/nebula_store_test.XXXXXX");
+  auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(4);
+  auto partMan = std::make_unique<MemPartManager>();
+
+  // GraphSpaceID =>  {PartitionIDs}
+  // 0 => {0, 1, 2, 3...9}
+  // The parts on PartMan are 0...9
+  for (auto partId = 0; partId < 10; partId++) {
+    partMan->addPart(0, partId);
+  }
+
+  std::vector<std::string> paths;
+  paths.emplace_back(folly::stringPrintf("%s/disk1", rootPath.path()));
+  paths.emplace_back(folly::stringPrintf("%s/disk2", rootPath.path()));
+
+  HostAddr local = {"", 0};
+  for (size_t i = 0; i < paths.size(); i++) {
+    auto db = std::make_unique<RocksEngine>(0, /* spaceId */
+                                            kDefaultVidLen,
+                                            paths[i]);
+    for (auto partId = 0; partId < 3; partId++) {
+      db->addPart(5 * i + partId);
+    }
+
+    if (i == 0) {
+      Peers peers;
+      peers.addOrUpdate(Peer(local, Peer::Status::kLearner));
+      db->addPart(5 * i + 10, peers);
+    } else {
+      Peers peers;
+      peers.addOrUpdate(Peer(local, Peer::Status::kPromotedPeer));
+      db->addPart(5 * i + 10, peers);
+    }
+    auto parts = db->allParts();
+    dump(parts);
+  }
+  // Currently, the disks hold parts as below:
+  // disk1: 0, 1, 2, 10
+  // disk2: 5, 6, 7, 15
+
+  KVOptions options;
+  options.dataPaths_ = std::move(paths);
+  options.partMan_ = std::move(partMan);
+  auto store =
+      std::make_unique<NebulaStore>(std::move(options), ioThreadPool, local, getHandlers());
+  store->init();
+  auto check = [&](GraphSpaceID spaceId) {
+    for (int i = 0; i < static_cast<int>(paths.size()); i++) {
+      ASSERT_EQ(folly::stringPrintf("%s/disk%d/nebula/%d", rootPath.path(), i + 1, spaceId),
+                store->spaces_[spaceId]->engines_[i]->getDataRoot());
+      auto parts = store->spaces_[spaceId]->engines_[i]->allParts();
+      dump(parts);
+      if (i == 0) {
+        ASSERT_EQ(6, parts.size());
+      } else {
+        ASSERT_EQ(6, parts.size());
+      }
+    }
+  };
+  check(0);
+  // After init, the parts should be 0-9, and the distribution should be
+  // disk1: 0, 1, 2, 3, 8, 10
+  // disk2: 4, 5, 6, 7, 9, 15
+  // After restart, the original order should not be broken.
+  {
+    auto parts = store->spaces_[0]->engines_[0]->allParts();
+    dump(parts);
+    ASSERT_EQ(0, parts[0]);
+    ASSERT_EQ(1, parts[1]);
+    ASSERT_EQ(2, parts[2]);
+    ASSERT_EQ(3, parts[3]);
+    ASSERT_EQ(8, parts[4]);
+    ASSERT_EQ(10, parts[5]);
+  }
+  {
+    auto parts = store->spaces_[0]->engines_[1]->allParts();
+    dump(parts);
+    ASSERT_EQ(4, parts[0]);
+    ASSERT_EQ(5, parts[1]);
+    ASSERT_EQ(6, parts[2]);
+    ASSERT_EQ(7, parts[3]);
+    ASSERT_EQ(9, parts[4]);
+    ASSERT_EQ(15, parts[5]);
+  }
+}
+
 TEST(NebulaStoreTest, ThreeCopiesTest) {
   fs::TempDir rootPath("/tmp/nebula_store_test.XXXXXX");
   auto initNebulaStore = [](const std::vector<HostAddr>& peers,
diff --git a/src/storage/admin/AdminProcessor.h b/src/storage/admin/AdminProcessor.h
index 534271dfb5c..d6d8511f0ec 100644
--- a/src/storage/admin/AdminProcessor.h
+++ b/src/storage/admin/AdminProcessor.h
@@ -140,11 +140,7 @@ class AddPartProcessor : public BaseProcessor {
       LOG(INFO) << "Space " << spaceId << " not exist, create it!";
       store->addSpace(spaceId);
     }
-    std::vector<HostAddr> peers;
-    for (auto& p : req.get_peers()) {
-      peers.emplace_back(kvstore::NebulaStore::getRaftAddr(p));
-    }
-    store->addPart(spaceId, partId, req.get_as_learner(), peers);
+    store->addPart(spaceId, partId, req.get_as_learner(), req.get_peers());
     onFinished();
   }

@@ -328,6 +324,11 @@ class CheckPeersProcessor : public BaseProcessor {
       peers.emplace_back(kvstore::NebulaStore::getRaftAddr(p));
     }
     part->checkAndResetPeers(peers);
+
+    for (auto p : peers) {
+      // change the promoted peer to a normal peer when balancing finishes
+      part->engine()->updatePart(partId, kvstore::Peer(p, kvstore::Peer::Status::kNormalPeer));
+    }
     this->onFinished();
   }
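Taken together, the persisted peer status follows a small lifecycle across this patch: a host added for balancing is written as kLearner (NebulaStore::addPart), promotion in Part::preProcessLog rewrites it as kPromotedPeer, and CheckPeersProcessor marks every confirmed peer kNormalPeer once balancing finishes. A condensed sketch of that flow (the engine handle and address are placeholders, not code from the patch):

  // 1. Balancing adds this host as a learner; it is persisted with kLearner status.
  engine->addPart(partId, peersIncludingSelfAsLearner);
  // 2. A raft log later promotes a learner to a voting member (Part::preProcessLog).
  engine->updatePart(partId, Peer(promotedAddr, Peer::Status::kPromotedPeer));
  // 3. CheckPeersProcessor runs after balancing and normalizes the peer.
  engine->updatePart(partId, Peer(promotedAddr, Peer::Status::kNormalPeer));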