Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-9058][SDK] Limit the number of inlong-groupid and inlong-streamid #9059

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class SdkConfig {
inlong_group_ids_; // Initialize the inlong groupid collection
uint32_t recv_buf_size_; // Receive buf size, tid granularity
uint32_t send_buf_size_; // Send buf size, bid granularity
uint32_t max_group_id_num_; // Send buf size, bid granularity
uint32_t max_stream_id_num_; // Send buf size, bid granularity

// thread parameters
uint32_t per_groupid_thread_nums_; // Sending thread per groupid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ void SdkConfig::defaultInit() {
send_buf_size_ = constants::kSendBufSize;
recv_buf_size_ = constants::kRecvBufSize;
max_msg_size_ = constants::kExtPackSize;
max_group_id_num_ = constants::kMaxGroupIdNum;
max_stream_id_num_=constants::kMaxStreamIdNum;

// Packaging parameters
enable_pack_ = constants::kEnablePack;
Expand Down Expand Up @@ -159,6 +161,22 @@ void SdkConfig::InitCacheParam(const rapidjson::Value &doc) {
} else {
send_buf_size_ = constants::kSendBufSize;
}

if (doc.HasMember("max_group_id_num") && doc["max_group_id_num"].IsInt() &&
doc["max_group_id_num"].GetInt() > 0) {
const rapidjson::Value &obj = doc["max_group_id_num"];
max_group_id_num_ = obj.GetInt();
} else {
max_group_id_num_ = constants::kMaxGroupIdNum;
}

if (doc.HasMember("max_stream_id_num") && doc["max_stream_id_num"].IsInt() &&
doc["max_stream_id_num"].GetInt() > 0) {
const rapidjson::Value &obj = doc["max_stream_id_num"];
max_stream_id_num_ = obj.GetInt();
} else {
max_stream_id_num_ = constants::kMaxGroupIdNum;
}
}

void SdkConfig::InitZipParam(const rapidjson::Value &doc) {
Expand Down Expand Up @@ -440,6 +458,8 @@ void SdkConfig::ShowClientConfig() {
LOG_INFO("need_auth: " << need_auth_ ? "true" : "false");
LOG_INFO("auth_id: " << auth_id_.c_str());
LOG_INFO("auth_key: " << auth_key_.c_str());
LOG_INFO("max_group_id_num: " << max_group_id_num_);
LOG_INFO("max_stream_id_num: " << max_stream_id_num_);
}

} // namespace inlong
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

#include "api_imp.h"
#include "../manager/proxy_manager.h"
#include "../utils/capi_constant.h"
#include "../utils/logger.h"
#include "../utils/utils.h"

#include "api_code.h"
#include <iostream>
#include <signal.h>
Expand All @@ -36,7 +38,7 @@ int32_t ApiImp::InitApi(const char *config_file_path) {
return SdkCode::kErrorInit;
}
max_msg_length_ = std::min(SdkConfig::getInstance()->max_msg_size_,
SdkConfig::getInstance()->pack_size_);
SdkConfig::getInstance()->pack_size_) - constants::ATTR_LENGTH;
local_ip_ = SdkConfig::getInstance()->local_ip_;

return DoInit();
Expand Down Expand Up @@ -75,8 +77,8 @@ int32_t ApiImp::SendBase(const std::string inlong_group_id,
auto recv_group =
recv_manager_->GetRecvGroup(inlong_group_id, inlong_stream_id);
if (recv_group == nullptr) {
LOG_ERROR("fail to get pack queue, inlong_group_id:%s, inlong_stream_id:%s"
<< inlong_group_id.c_str() << inlong_stream_id.c_str());
LOG_ERROR("fail to get recv group, inlong_group_id:"
<< inlong_group_id << " inlong_stream_id:" << inlong_stream_id);
return SdkCode::kFailGetRevGroup;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
#include "recv_group.h"

#include "../protocol/msg_protocol.h"
#include "../utils/capi_constant.h"
#include "../utils/utils.h"
#include "api_code.h"
#include <cstdlib>
#include <functional>

namespace inlong {
const uint32_t ATTR_LENGTH = 10;
const uint32_t DEFAULT_PACK_ATTR = 400;
RecvGroup::RecvGroup(const std::string &inlong_group_id,
const std::string &inlong_stream_id,
Expand Down Expand Up @@ -96,13 +96,13 @@ int32_t RecvGroup::DoDispatchMsg() {
std::vector<SdkMsgPtr> msgs_to_dispatch;
while (!msgs_.empty()) {
SdkMsgPtr msg = msgs_.front();
if (msg->msg_.size() + total_length + ATTR_LENGTH >
if (msg->msg_.size() + total_length + constants::ATTR_LENGTH >
SdkConfig::getInstance()->pack_size_) {
break;
}
msgs_to_dispatch.push_back(msg);
msgs_.pop();
total_length = msg->msg_.size() + total_length + ATTR_LENGTH;
total_length = msg->msg_.size() + total_length + constants::ATTR_LENGTH;
}

cur_len_ = cur_len_ - total_length;
Expand Down Expand Up @@ -144,7 +144,7 @@ void RecvGroup::AddMsg(const std::string &msg, std::string client_ip,
data_pack_format_attr, user_client_ip,
user_report_time));

cur_len_ += msg.size() + ATTR_LENGTH;
cur_len_ += msg.size() + constants::ATTR_LENGTH;
}

bool RecvGroup::ShouldPack(int32_t msg_len) {
Expand Down Expand Up @@ -322,7 +322,7 @@ bool RecvGroup::IsZipAndOperate(std::string &res, uint32_t real_cur_len) {
}

void RecvGroup::DispatchMsg(bool exit) {
if (cur_len_ <= ATTR_LENGTH || msgs_.empty())
if (cur_len_ <= constants::ATTR_LENGTH || msgs_.empty())
return;
bool len_enough = cur_len_ > SdkConfig::getInstance()->pack_size_;
bool time_enough = (Utils::getCurrentMsTime() - last_pack_time_) >
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ RecvManager::RecvManager(std::shared_ptr<SendManager> send_manager)
exit_flag_(false) {
dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_zip_;

max_groupid_streamid_num_ = SdkConfig::getInstance()->max_group_id_num_ *
SdkConfig::getInstance()->max_stream_id_num_;
LOG_INFO("max_groupid_streamid_num " <<max_groupid_streamid_num_);

check_timer_ = std::make_shared<asio::steady_timer>(io_context_);
check_timer_->expires_after(std::chrono::milliseconds(10));
check_timer_->async_wait(
Expand Down Expand Up @@ -54,9 +58,13 @@ RecvGroupPtr RecvManager::GetRecvGroup(const std::string &groupId,
const std::string &streamId) {
std::lock_guard<std::mutex> lck(mutex_);
auto it = recv_group_map_.find(groupId + streamId);
if (it != recv_group_map_.end())
if (it != recv_group_map_.end()) {
return it->second;
else {
} else {
if (recv_group_map_.size() > max_groupid_streamid_num_) {
return nullptr;
}

RecvGroupPtr recv_group =
std::make_shared<RecvGroup>(groupId, streamId, send_manager_);
recv_group_map_.emplace(groupId + streamId, recv_group);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class RecvManager : noncopyable {

uint32_t dispatch_interval_;

uint64_t max_groupid_streamid_num_;

void Run();

public:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ static const uint8_t kBinSnappyFlag = 1 << 5;
static const int32_t kPerGroupidThreadNums = 1;
static const int32_t kSendBufSize = 10240000;
static const int32_t kRecvBufSize = 10240000;
static const uint32_t kMaxGroupIdNum = 50;
static const uint32_t kMaxStreamIdNum = 100;

static const int32_t kDispatchIntervalZip = 8;
static const int32_t kDispatchIntervalSend = 10;
Expand Down Expand Up @@ -86,6 +88,7 @@ static const char kProtocolType[] = "TCP";
static const bool kNeedAuth = false;

static const uint32_t kMaxAttrLen = 2048;
const uint32_t ATTR_LENGTH = 10;

} // namespace constants
} // namespace inlong
Expand Down