Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support Go input to Cpp pipeline #1715

Merged
merged 4 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,15 +303,30 @@ bool PipelineConfig::Parse() {
const string pluginType = it->asString();
if (mHasGoInput) {
if (PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginType)) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"native processor plugins coexist with extended input plugins",
noModule,
mName,
mProject,
mLogstore,
mRegion);
if (!isCurrentPluginNative) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"native processor plugin comes after extended processor plugin",
pluginType,
mName,
mProject,
mLogstore,
mRegion);
} else if (pluginType == "processor_spl") {
if (i != 0 || itr->size() != 1) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"native processor plugins coexist with spl processor",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
}
mHasNativeProcessor = true;
} else if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
isCurrentPluginNative = false;
mHasGoProcessor = true;
} else {
PARAM_ERROR_RETURN(sLogger,
Expand Down
64 changes: 59 additions & 5 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
#include "monitor/LogtailAlarm.h"
#include "pipeline/PipelineManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "pipeline/queue/ProcessQueueManager.h"
#include "provider/Provider.h"
#include "protobuf/models/ProtocolConversion.h"

DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false);
DEFINE_FLAG_BOOL(enable_containerd_upper_dir_detect,
Expand Down Expand Up @@ -77,7 +79,7 @@ LogtailPlugin::~LogtailPlugin() {
DynamicLibLoader::CloseLib(mPluginAdapterPtr);
}

bool LogtailPlugin::LoadPipeline(const std::string& pipelineName,
LoadGoPipelineResp LogtailPlugin::LoadPipeline(const std::string& pipelineName,
const std::string& pipeline,
const std::string& project,
const std::string& logstore,
Expand All @@ -103,10 +105,10 @@ bool LogtailPlugin::LoadPipeline(const std::string& pipelineName,
goLogstore.p = logstore.c_str();
long long goLogStoreKey = static_cast<long long>(logstoreKey);

return mLoadPipelineFun(goProject, goLogstore, goConfigName, goLogStoreKey, goPluginConfig) == 0;
return *mLoadPipelineFun(goProject, goLogstore, goConfigName, goLogStoreKey, goPluginConfig);
}

return false;
return LoadGoPipelineResp{false, LoadGoPipelineResp::InputModeType::UNKNOWN};
}

bool LogtailPlugin::UnloadPipeline(const std::string& pipelineName) {
Expand Down Expand Up @@ -300,6 +302,46 @@ int LogtailPlugin::ExecPluginCmd(
return 0;
}

int LogtailPlugin::IsValidToProcess(const char* configName, int configNameSize) {
string configNameStr(configName, configNameSize);
auto pipeline = PipelineManager::GetInstance()->FindConfigByName(configNameStr);
if (!pipeline) {
LOG_ERROR(sLogger,
("pipeline not found during IsValidToProcess",
"return invalid")("config", configName));
return -1;
}
auto processQueueKey = pipeline->GetContext().GetProcessQueueKey();
return ProcessQueueManager::GetInstance()->IsValidToPush(processQueueKey) ? 0 : -1;
}

int LogtailPlugin::PushQueue(const char* configName, int configNameSize, const char* pbBuffer, int pbSize) {
logtail::models::PipelineEventGroup eventGroupSrc;
string configNameStr(configName, configNameSize);
auto pipeline = PipelineManager::GetInstance()->FindConfigByName(configNameStr);
if (!pipeline) {
LOG_ERROR(sLogger,
("pipeline not found during PushQueue, perhaps due to config deletion",
Assassin718 marked this conversation as resolved.
Show resolved Hide resolved
"return invalid")("config", configName));
return -1;
}

string pbStr(pbBuffer, pbSize);
if (!eventGroupSrc.ParseFromString(pbStr)) {
LOG_ERROR(sLogger, ("parse pb failed in PushQueue", "invalid pb"));
return -1;
}
string errMsg;
logtail::PipelineEventGroup eventGroupDst(std::make_shared<SourceBuffer>());
if (!TransferPBToPipelineEventGroup(eventGroupSrc, eventGroupDst, errMsg)) {
LOG_ERROR(sLogger, ("transfer pb to pipeline_event_group failed", errMsg));
return -1;
}

auto processQueueKey = pipeline->GetContext().GetProcessQueueKey();
return ProcessQueueManager::GetInstance()->PushQueue(processQueueKey, std::make_unique<ProcessQueueItem>(std::move(eventGroupDst), 0xFFFFFFFF));
}


bool LogtailPlugin::LoadPluginBase() {
if (mPluginValid) {
Expand Down Expand Up @@ -334,7 +376,9 @@ bool LogtailPlugin::LoadPluginBase() {
registerV2Fun(LogtailPlugin::IsValidToSend,
LogtailPlugin::SendPb,
LogtailPlugin::SendPbV2,
LogtailPlugin::ExecPluginCmd);
LogtailPlugin::ExecPluginCmd,
LogtailPlugin::IsValidToProcess,
LogtailPlugin::PushQueue);
} else {
LOG_WARNING(sLogger, ("load RegisterLogtailCallBackV2 failed", error)("try to load V1", ""));

Expand All @@ -343,7 +387,7 @@ bool LogtailPlugin::LoadPluginBase() {
LOG_WARNING(sLogger, ("load RegisterLogtailCallBack failed", error));
return mPluginValid;
}
registerFun(LogtailPlugin::IsValidToSend, LogtailPlugin::SendPb, LogtailPlugin::ExecPluginCmd);
registerFun(LogtailPlugin::IsValidToSend, LogtailPlugin::SendPb, LogtailPlugin::ExecPluginCmd, LogtailPlugin::IsValidToProcess, LogtailPlugin::PushQueue);
}

mPluginAdapterPtr = loader.Release();
Expand Down Expand Up @@ -504,6 +548,16 @@ void LogtailPlugin::ProcessLogGroup(const std::string& configName,
return;
}
std::string realConfigName = configName + "/2";
auto pipeline = PipelineManager::GetInstance()->FindConfigByName(configName);
auto iter = pipeline->GetPluginStatistics().find("go_inputs");
if (iter != pipeline->GetPluginStatistics().end()) {
for (auto& goInput : iter->second) {
if (goInput.second > 0) {
realConfigName[realConfigName.size() - 1] = '1';
break;
}
}
}
std::string packIdPrefix = ToHexString(HashString(packId));
GoString goConfigName;
GoSlice goLog;
Expand Down
31 changes: 27 additions & 4 deletions core/go_pipeline/LogtailPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,19 @@ struct K8sContainerMeta {
}
};

struct LoadGoPipelineResp {
int Code; // 0: success, 1: fail
enum InputModeType {
UNKNOWN,
PUSH,
PULL,
};
InputModeType InputMode;
};

// Methods export by plugin.
typedef GoInt (*LoadGlobalConfigFun)(GoString);
typedef GoInt (*LoadPipelineFun)(GoString p, GoString l, GoString c, GoInt64 k, GoString p2);
typedef struct LoadGoPipelineResp* (*LoadPipelineFun)(GoString p, GoString l, GoString c, GoInt64 k, GoString p2);
typedef GoInt (*UnloadPipelineFun)(GoString c);
typedef void (*StopAllPipelinesFun)(GoInt);
typedef void (*StopFun)(GoString, GoInt);
Expand Down Expand Up @@ -168,13 +178,21 @@ typedef int (*SendPbV2Fun)(const char* configName,
typedef int (*PluginCtlCmdFun)(
const char* configName, int configNameSize, int optId, const char* params, int paramsLen);

typedef void (*RegisterLogtailCallBack)(IsValidToSendFun checkFun, SendPbFun sendFun, PluginCtlCmdFun cmdFun);
typedef int (*IsValidToProcessFun)(const char *configName, int configNameSize);
typedef int (*PushQueueFun)(const char *configName, int configNameSize, const char *pbBuffer, int pbSize);

typedef void (*RegisterLogtailCallBack)(IsValidToSendFun checkFun, SendPbFun sendFun, PluginCtlCmdFun cmdFun, IsValidToProcessFun checkProcessFun, PushQueueFun pushFun);
typedef void (*RegisterLogtailCallBackV2)(IsValidToSendFun checkFun,
SendPbFun sendFun,
SendPbV2Fun sendV2Fun,
PluginCtlCmdFun cmdFun);
PluginCtlCmdFun cmdFun,
IsValidToProcessFun checkProcessFun,
PushQueueFun pushFun);

typedef int (*PluginAdapterVersion)();

typedef void (*RegisterLogtailProcessCallBack)();

}

// Create by david zhang. 2017/09/02 22:22:12
Expand Down Expand Up @@ -207,7 +225,8 @@ class LogtailPlugin {
}

bool LoadPluginBase();
bool LoadPipeline(const std::string& pipelineName,
// void LoadConfig();
LoadGoPipelineResp LoadPipeline(const std::string& pipelineName,
const std::string& pipeline,
const std::string& project = "",
const std::string& logstore = "",
Expand Down Expand Up @@ -257,6 +276,10 @@ class LogtailPlugin {

void GetGoMetrics(std::vector<std::map<std::string, std::string>>& metircsList, const std::string& metricType);

static int IsValidToProcess(const char* configName, int configNameSize);

static int PushQueue(const char* configName, int configNameSize, const char* pbBuffer, int pbSize);

private:
void* mPluginBasePtr;
void* mPluginAdapterPtr;
Expand Down
28 changes: 25 additions & 3 deletions core/go_pipeline/LogtailPluginAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,31 @@ IsValidToSendFun gAdapterIsValidToSendFun = NULL;
SendPbFun gAdapterSendPbFun = NULL;
SendPbV2Fun gAdapterSendPbV2Fun = NULL;
PluginCtlCmdFun gPluginCtlCmdFun = NULL;
IsValidToProcessFun gAdapterIsValidToProcessFun = NULL;
PushQueueFun gAdapterPushQueueFun = NULL;

void RegisterLogtailCallBack(IsValidToSendFun checkFun, SendPbFun sendFun, PluginCtlCmdFun cmdFun) {
fprintf(stderr, "[GoPluginAdapter] register fun %p %p %p\n", checkFun, sendFun, cmdFun);
void RegisterLogtailCallBack(IsValidToSendFun checkFun, SendPbFun sendFun, PluginCtlCmdFun cmdFun, IsValidToProcessFun checkProcessFun, PushQueueFun pushFun) {
fprintf(stderr, "[PluginAdapter] register fun %p %p %p %p %p\n", checkFun, sendFun, cmdFun, checkProcessFun, pushFun);
gAdapterIsValidToSendFun = checkFun;
gAdapterSendPbFun = sendFun;
gPluginCtlCmdFun = cmdFun;
gAdapterIsValidToProcessFun = checkProcessFun;
gAdapterPushQueueFun = pushFun;
}

void RegisterLogtailCallBackV2(IsValidToSendFun checkFun,
SendPbFun sendV1Fun,
SendPbV2Fun sendV2Fun,
PluginCtlCmdFun cmdFun) {
PluginCtlCmdFun cmdFun,
IsValidToProcessFun checkProcessFun,
PushQueueFun pushFun) {
fprintf(stderr, "register fun v2 %p %p %p %p\n", checkFun, sendV1Fun, sendV2Fun, cmdFun);
gAdapterIsValidToSendFun = checkFun;
gAdapterSendPbFun = sendV1Fun;
gAdapterSendPbV2Fun = sendV2Fun;
gPluginCtlCmdFun = cmdFun;
gAdapterIsValidToProcessFun = checkProcessFun;
gAdapterPushQueueFun = pushFun;
}

int LogtailIsValidToSend(long long logstoreKey) {
Expand Down Expand Up @@ -86,4 +94,18 @@ int LogtailCtlCmd(const char* configName, int configNameSize, int optId, const c
// - Update RegisterLogtailCallBack to register LogtailSendPBV2.
int PluginAdapterVersion() {
return 300;
}

int LogtailIsValidToProcess(const char* configName, int configNameSize) {
if (gAdapterIsValidToProcessFun == NULL) {
return -1;
}
return gAdapterIsValidToProcessFun(configName, configNameSize);
}

int LogtailPushQueue(const char* configName, int configNameSize, const char* pbBuffer, int pbSize) {
if (gAdapterPushQueueFun == NULL) {
return -1;
}
return gAdapterPushQueueFun(configName, configNameSize, pbBuffer, pbSize);
}
17 changes: 15 additions & 2 deletions core/go_pipeline/LogtailPluginAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,21 @@ typedef int (*SendPbV2Fun)(const char* configName,
typedef int (*PluginCtlCmdFun)(
const char* configName, int configNameSize, int optId, const char* params, int paramsLen);

PLUGIN_ADAPTER_API void RegisterLogtailCallBack(IsValidToSendFun checkFun, SendPbFun sendFun, PluginCtlCmdFun cmdFun);
typedef int (*IsValidToProcessFun)(const char *configName, int configNameSize);
typedef int (*PushQueueFun)(const char *configName, int configNameSize, const char *pbBuffer, int pbSize);

PLUGIN_ADAPTER_API void RegisterLogtailCallBack(IsValidToSendFun checkFun,
SendPbFun sendFun,
PluginCtlCmdFun cmdFun,
IsValidToProcessFun checkProcessFun,
PushQueueFun pushFun);

PLUGIN_ADAPTER_API void RegisterLogtailCallBackV2(IsValidToSendFun checkFun,
SendPbFun sendV1Fun,
SendPbV2Fun sendV2Fun,
PluginCtlCmdFun cmdFun);
PluginCtlCmdFun cmdFun,
IsValidToProcessFun checkProcessFun,
PushQueueFun pushFun);

PLUGIN_ADAPTER_API int LogtailIsValidToSend(long long logstoreKey);

Expand Down Expand Up @@ -82,6 +91,10 @@ LogtailCtlCmd(const char* configName, int configNameSize, int cmdId, const char*
// version for logtail plugin adapter, used for check plugin adapter version
PLUGIN_ADAPTER_API int PluginAdapterVersion();

PLUGIN_ADAPTER_API int LogtailIsValidToProcess(const char *configName, int configNameSize);

PLUGIN_ADAPTER_API int LogtailPushQueue(const char *configName, int configNameSize, const char *pbBuffer, int pbSize);

#ifdef __cplusplus
}
#endif
Loading
Loading