Skip to content

Commit

Permalink
[License Manager Adaption] Report graph/storage cpu cores to meta (ve…
Browse files Browse the repository at this point in the history
  • Loading branch information
Aiee authored Mar 30, 2023
1 parent 6a64f08 commit 9c57b75
Show file tree
Hide file tree
Showing 14 changed files with 324 additions and 37 deletions.
19 changes: 19 additions & 0 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ namespace meta {
// 28800 secs is 8 hours.
const uint32 kLicenseCheckPeriodInSecs = 28800;

// Update the hardware information every hour.
const uint32 kHwUploadPeriodInSecs = 3600;

Indexes buildIndexes(std::vector<cpp2::IndexItem> indexItemVec);

MetaClient::MetaClient() : metadata_(new MetaData()) {}
Expand All @@ -86,6 +89,7 @@ MetaClient::MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool
FLAGS_enable_ssl || FLAGS_enable_meta_ssl);
updateActive();
updateLeader();
cpuCores_ = std::thread::hardware_concurrency();
bgThread_ = std::make_unique<thread::GenericWorker>();
LOG(INFO) << "Create meta client to " << active_;
LOG(INFO) << folly::sformat(
Expand Down Expand Up @@ -142,6 +146,8 @@ bool MetaClient::waitForMetadReady(int count, int retryIntervalSecs) {
// Repeatedly request and check license from meta
size_t delayMS = FLAGS_heartbeat_interval_secs * 1000 + folly::Random::rand32(900);
bgThread_->addDelayTask(delayMS, &MetaClient::licenseCheckThreadFunc, this);
bgThread_->addDelayTask(delayMS, &MetaClient::updateHwThreadFunc, this);

return ready_;
}

Expand Down Expand Up @@ -3062,6 +3068,12 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
req.host_ref() = options_.localHost_;
req.role_ref() = options_.role_;
req.git_info_sha_ref() = options_.gitInfoSHA_;
req.cpu_cores_ref() = cpuCores_;

VLOG(2) << "Heartbeat to meta server, role: "
<< apache::thrift::util::enumNameSafe(req.get_role())
<< " cpu cores: " << *req.get_cpu_cores();

if (options_.role_ == cpp2::HostRole::STORAGE ||
options_.role_ == cpp2::HostRole::META_LISTENER ||
options_.role_ == cpp2::HostRole::STORAGE_LISTENER) {
Expand Down Expand Up @@ -4885,5 +4897,12 @@ void MetaClient::licenseCheckThreadFunc() {
LOG(INFO) << "[License] Scheduled license checking passed";
}

void MetaClient::updateHwThreadFunc() {
SCOPE_EXIT {
bgThread_->addDelayTask(kHwUploadPeriodInSecs * 1000, &MetaClient::updateHwThreadFunc, this);
};
cpuCores_ = std::thread::hardware_concurrency();
}

} // namespace meta
} // namespace nebula
7 changes: 5 additions & 2 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -866,19 +866,20 @@ class MetaClient : public BaseMetaClient {
return options_.localHost_.toString();
}

// TODO(Aiee) Deprecated when license manager is implemented
// Requests license content
StatusOr<cpp2::GetLicenseResp> getLicenseFromMeta();

// Repeatedly request and validate the license
void licenseCheckThreadFunc();
// Repeatedly update hardware information and store it in the cache
void updateHwThreadFunc();

protected:
// Return true if load succeeded.
bool loadData();
bool loadCfg();
void heartBeatThreadFunc();
// Periodically check if the meta service is enterprise version
void entMetaCheckThreadFunc();

bool registerCfg();
void updateGflagsValue(const cpp2::ConfigItem& item);
Expand Down Expand Up @@ -1070,6 +1071,8 @@ class MetaClient : public BaseMetaClient {
SessionMap sessionMap_;
folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>> killedPlans_;
std::atomic<MetaData*> metadata_;
// The cpu cores of the machine which will be validated by license manager
int32_t cpuCores_{0};
};

} // namespace meta
Expand Down
11 changes: 11 additions & 0 deletions src/daemons/MetaDaemonInit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ std::unique_ptr<nebula::kvstore::KVStore> initMetaKV(
LOG(ERROR) << "Update meta from V3 to V3_4 failed " << ret;
return nullptr;
}
ret = nebula::meta::MetaVersionMan::updateMetaV3_4ToV3_5(engine);
if (!ret.ok()) {
LOG(ERROR) << "Update meta from V3_4 to V3_5 failed " << ret;
return nullptr;
}
} else {
auto v = nebula::value(version);
LOG(INFO) << "Get meta version is " << static_cast<int32_t>(v);
Expand All @@ -224,6 +229,12 @@ std::unique_ptr<nebula::kvstore::KVStore> initMetaKV(
LOG(ERROR) << "Update meta from V2 to V3_4 failed " << ret;
return nullptr;
}
} else if (v == nebula::meta::MetaVersion::V3_4) {
auto ret = nebula::meta::MetaVersionMan::updateMetaV3_4ToV3_5(engine);
if (!ret.ok()) {
LOG(ERROR) << "Update meta from V3_4 to V3_5 failed " << ret;
return nullptr;
}
}
}
nebula::meta::MetaVersionMan::setMetaVersionToKV(engine, nebula::meta::MetaVersion::V3_4);
Expand Down
5 changes: 5 additions & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,8 @@ struct HBReq {
7: optional common.DirInfo dir,
// version of binary
8: optional binary version,
// used in the enterprise version
9: optional i32 cpu_cores = -1,
}

// service(agent/metad/storaged/graphd) info
Expand Down Expand Up @@ -1564,6 +1566,9 @@ service MetaService {
// Requests the enterprise license from meta
GetLicenseResp getLicense(1: GetLicenseReq req);

// Sends request to the license manager to check the resource usage
GetLicenseResp checkLicenseManager(1: GetLicenseReq req);

// Interfaces for backup and restore
CreateBackupResp createBackup(1: CreateBackupReq req);
RestoreMetaResp restoreMeta(1: RestoreMetaReq req);
Expand Down
26 changes: 26 additions & 0 deletions src/meta/ActiveHostsMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,5 +270,31 @@ void LastUpdateTimeMan::update(kvstore::BatchHolder* batchHolder, const int64_t
MetaKeyUtils::lastUpdateTimeVal(timeInMilliSec));
}

ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostInfo>> ActiveHostsMan::getHostInfoByRole(
kvstore::KVStore* kv, cpp2::HostRole role) {
// Get all alive hosts
auto activeHostsRet = getActiveHosts(kv, 0, role);
if (!nebula::ok(activeHostsRet)) {
return nebula::error(activeHostsRet);
}

std::vector<HostInfo> hostInfos;
// If no active hosts, return empty
auto activeHosts = nebula::value(activeHostsRet);
if (activeHosts.empty()) {
return hostInfos;
}
hostInfos.reserve(activeHosts.size());

// Get host info by host address
for (const auto& host : activeHosts) {
auto hostInfoRet = getHostInfo(kv, host);
if (!nebula::ok(hostInfoRet)) {
return nebula::error(hostInfoRet);
}
hostInfos.emplace_back(nebula::value(hostInfoRet));
}
return hostInfos;
}
} // namespace meta
} // namespace nebula
74 changes: 73 additions & 1 deletion src/meta/ActiveHostsMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,23 @@ struct HostInfo {
HostInfo() = default;
explicit HostInfo(int64_t lastHBTimeInMilliSec) : lastHBTimeInMilliSec_(lastHBTimeInMilliSec) {}

// For compatibility we keep the interface without cpuNum
HostInfo(int64_t lastHBTimeInMilliSec, cpp2::HostRole role, std::string gitInfoSha)
: lastHBTimeInMilliSec_(lastHBTimeInMilliSec),
role_(role),
gitInfoSha_(std::move(gitInfoSha)) {}
gitInfoSha_(std::move(gitInfoSha)) {
cpuNum_ = -1;
}

// With cpuNum
HostInfo(int64_t lastHBTimeInMilliSec,
cpp2::HostRole role,
std::string gitInfoSha,
int32_t cpuNum)
: lastHBTimeInMilliSec_(lastHBTimeInMilliSec),
role_(role),
gitInfoSha_(std::move(gitInfoSha)),
cpuNum_(cpuNum) {}

bool operator==(const HostInfo& that) const {
return this->lastHBTimeInMilliSec_ == that.lastHBTimeInMilliSec_;
Expand All @@ -34,6 +47,7 @@ struct HostInfo {
int64_t lastHBTimeInMilliSec_ = 0;
cpp2::HostRole role_{cpp2::HostRole::UNKNOWN};
std::string gitInfoSha_;
int32_t cpuNum_ = 0;

static HostInfo decode(const folly::StringPiece& data) {
if (data.size() == sizeof(int64_t)) {
Expand All @@ -56,6 +70,7 @@ struct HostInfo {
* sizeof(HostRole) hostRole
* size_t length of gitInfoSha
* string gitInfoSha
* int cpuNum
*
* @param info
* @return
Expand All @@ -74,6 +89,8 @@ struct HostInfo {
if (!info.gitInfoSha_.empty()) {
encode.append(info.gitInfoSha_.data(), len);
}

encode.append(reinterpret_cast<const char*>(&info.cpuNum_), sizeof(int32_t));
return encode;
}

Expand Down Expand Up @@ -107,6 +124,51 @@ struct HostInfo {
}

info.gitInfoSha_ = std::string(data.data() + offset, len);

offset += len;
if (offset + sizeof(int32_t) > data.size()) {
FLOG_FATAL("decode out of range, offset=%zu, actual=%zu", offset, data.size());
} else {
info.cpuNum_ = *reinterpret_cast<const int*>(data.data() + offset);
}

return info;
}

/**
* @brief Parse a serialized value to HostInfo.
* For compatibility, we keep the interface without cpuNum
* This method is only used when upgrading meta data from 3.4.0 to 3.5.0
*
* @param data
* @return
*/
static HostInfo decodeV3_4(const folly::StringPiece& data) {
HostInfo info;
size_t offset = sizeof(int8_t);

info.lastHBTimeInMilliSec_ = *reinterpret_cast<const int64_t*>(data.data() + offset);
offset += sizeof(int64_t);

if (data.size() - offset < sizeof(cpp2::HostRole)) {
FLOG_FATAL("decode out of range, offset=%zu, actual=%zu", offset, data.size());
}
info.role_ = *reinterpret_cast<const cpp2::HostRole*>(data.data() + offset);
offset += sizeof(cpp2::HostRole);

if (offset + sizeof(size_t) > data.size()) {
FLOG_FATAL("decode out of range, offset=%zu, actual=%zu", offset, data.size());
}
size_t len = *reinterpret_cast<const size_t*>(data.data() + offset);
offset += sizeof(size_t);

if (offset + len > data.size()) {
FLOG_FATAL("decode out of range, offset=%zu, actual=%zu", offset, data.size());
}

info.gitInfoSha_ = std::string(data.data() + offset, len);
// The cpuNum field is not exist in 3.4.0, set it to -1 and wait for update
info.cpuNum_ = -1;
return info;
}
};
Expand Down Expand Up @@ -206,6 +268,16 @@ class ActiveHostsMan final {
static ErrorOr<nebula::cpp2::ErrorCode, HostInfo> getHostInfo(kvstore::KVStore* kv,
const HostAddr& host);

/**
* @brief Get all alive host info by given host role
*
* @param kv From where to get
* @param hostRole
* @return
*/
static ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostInfo>> getHostInfoByRole(
kvstore::KVStore* kv, cpp2::HostRole role);

protected:
ActiveHostsMan() = default;
};
Expand Down
Loading

0 comments on commit 9c57b75

Please sign in to comment.