Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_calyptia: retry agent registration on flush callback [backport 3.1] #9662

Open
wants to merge 2 commits into
base: 3.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions plugins/custom_calyptia/calyptia.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ struct calyptia {
flb_sds_t fleet_config_dir; /* fleet configuration directory */
int fleet_interval_sec;
int fleet_interval_nsec;
bool register_retry_on_flush; /* retry registration on flush if failed */
};

/*
Expand Down Expand Up @@ -278,6 +279,12 @@ static struct flb_output_instance *setup_cloud_output(struct flb_config *config,
flb_output_set_property(cloud, "match", "_calyptia_cloud");
flb_output_set_property(cloud, "api_key", ctx->api_key);

if (ctx->register_retry_on_flush) {
flb_output_set_property(cloud, "register_retry_on_flush", "true");
} else {
flb_output_set_property(cloud, "register_retry_on_flush", "false");
}

if (ctx->store_path) {
flb_output_set_property(cloud, "store_path", ctx->store_path);
}
Expand Down Expand Up @@ -606,6 +613,11 @@ static struct flb_config_map config_map[] = {
},
#endif /* FLB_HAVE_CHUNK_TRACE */

{
FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true",
0, FLB_TRUE, offsetof(struct calyptia, register_retry_on_flush),
"Retry agent registration on flush if failed on init."
},
/* EOF */
{0}
};
Expand Down
195 changes: 120 additions & 75 deletions plugins/out_calyptia/calyptia.c
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,21 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins,
return NULL;
}

ctx->metrics_endpoint = flb_sds_create_size(256);
if (!ctx->metrics_endpoint) {
flb_free(ctx);
return NULL;
}

#ifdef FLB_HAVE_CHUNK_TRACE
ctx->trace_endpoint = flb_sds_create_size(256);
if (!ctx->trace_endpoint) {
flb_sds_destroy(ctx->metrics_endpoint);
flb_free(ctx);
return NULL;
}
#endif

/* api_key */
if (!ctx->api_key) {
flb_plg_error(ctx->ins, "configuration 'api_key' is missing");
Expand Down Expand Up @@ -771,12 +786,41 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins,
return ctx;
}

static int cb_calyptia_init(struct flb_output_instance *ins,
struct flb_config *config, void *data)
static int register_agent(struct flb_calyptia *ctx, struct flb_config *config)
{
int ret;

/* Try registration */
ret = api_agent_create(config, ctx);

if (ret != FLB_OK) {
flb_plg_warn(ctx->ins, "agent registration failed");
return FLB_ERROR;
}

/* Update endpoints */
flb_sds_len_set(ctx->metrics_endpoint, 0);
flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS,
ctx->agent_id);

#ifdef FLB_HAVE_CHUNK_TRACE
if (ctx->pipeline_id) {
flb_sds_len_set(ctx->trace_endpoint, 0);
flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE,
ctx->pipeline_id);
}
#endif

flb_plg_info(ctx->ins, "agent registration successful");
return FLB_OK;
}

static int cb_calyptia_init(struct flb_output_instance *ins,
struct flb_config *config, void *data)
{
struct flb_calyptia *ctx;
(void) data;
int ret;

/* create config context */
ctx = config_init(ins, config);
Expand All @@ -792,22 +836,12 @@ static int cb_calyptia_init(struct flb_output_instance *ins,
flb_output_set_http_debug_callbacks(ins);

/* register/update agent */
ret = api_agent_create(config, ctx);
if (ret != FLB_OK) {
flb_plg_error(ctx->ins, "agent registration failed");
ret = register_agent(ctx, config);
if (ret != FLB_OK && !ctx->register_retry_on_flush) {
flb_plg_error(ins, "agent registration failed and register_retry_on_flush=false");
return -1;
}

/* metrics endpoint */
ctx->metrics_endpoint = flb_sds_create_size(256);
flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS,
ctx->agent_id);

#ifdef FLB_HAVE_CHUNK_TRACE
ctx->trace_endpoint = flb_sds_create_size(256);
flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE,
ctx->pipeline_id);
#endif /* FLB_HAVE_CHUNK_TRACE */
return 0;
}

Expand All @@ -830,6 +864,50 @@ static void debug_payload(struct flb_calyptia *ctx, void *data, size_t bytes)
cmt_destroy(cmt);
}

static int cb_calyptia_exit(void *data, struct flb_config *config)
{
struct flb_calyptia *ctx = data;

if (!ctx) {
return 0;
}

if (ctx->u) {
flb_upstream_destroy(ctx->u);
}

if (ctx->agent_id) {
flb_sds_destroy(ctx->agent_id);
}

if (ctx->agent_token) {
flb_sds_destroy(ctx->agent_token);
}

if (ctx->env) {
flb_env_destroy(ctx->env);
}

if (ctx->metrics_endpoint) {
flb_sds_destroy(ctx->metrics_endpoint);
}

#ifdef FLB_HAVE_CHUNK_TRACE
if (ctx->trace_endpoint) {
flb_sds_destroy(ctx->trace_endpoint);
}
#endif /* FLB_HAVE_CHUNK_TRACE */

if (ctx->fs) {
flb_fstore_destroy(ctx->fs);
}

flb_kv_release(&ctx->kv_labels);
flb_free(ctx);

return 0;
}

static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
struct flb_output_flush *out_flush,
struct flb_input_instance *i_ins,
Expand All @@ -853,6 +931,17 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
(void) i_ins;
(void) config;

if (!ctx->agent_id && ctx->register_retry_on_flush) {
flb_plg_info(ctx->ins, "agent_id not found and register_retry_on_flush=true, attempting registration");
if (register_agent(ctx, config) != FLB_OK) {
FLB_OUTPUT_RETURN(FLB_RETRY);
}
}
else if (!ctx->agent_id) {
flb_plg_error(ctx->ins, "no agent_id available and register_retry_on_flush=false");
FLB_OUTPUT_RETURN(FLB_ERROR);
}

/* Get upstream connection */
u_conn = flb_upstream_conn_get(ctx->u);
if (!u_conn) {
Expand Down Expand Up @@ -899,12 +988,12 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(FLB_RETRY);
}

/* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */
/* perform request */
ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_METRICS);
if (ret == FLB_OK) {
flb_plg_debug(ctx->ins, "metrics delivered OK");
}
else if (ret == FLB_ERROR) {
else {
flb_plg_error(ctx->ins, "could not deliver metrics");
debug_payload(ctx, out_buf, out_size);
}
Expand All @@ -915,7 +1004,8 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
}

#ifdef FLB_HAVE_CHUNK_TRACE
if (event_chunk->type == (FLB_EVENT_TYPE_LOGS | FLB_EVENT_TYPE_HAS_TRACE)) {
if (event_chunk->type & FLB_EVENT_TYPE_LOGS &&
event_chunk->type & FLB_EVENT_TYPE_HAS_TRACE) {
json = flb_pack_msgpack_to_json_format(event_chunk->data,
event_chunk->size,
FLB_PACK_JSON_FORMAT_STREAM,
Expand All @@ -925,32 +1015,25 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
flb_upstream_conn_release(u_conn);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
out_buf = (char *)json;
out_size = flb_sds_len(json);

if (flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS,
ctx->agent_id) == NULL) {
flb_upstream_conn_release(u_conn);
flb_sds_destroy(json);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->trace_endpoint,
out_buf, out_size, NULL, 0, NULL, 0);
(char *) json, flb_sds_len(json),
NULL, 0, NULL, 0);

if (!c) {
flb_upstream_conn_release(u_conn);
flb_sds_destroy(json);
flb_sds_destroy(ctx->metrics_endpoint);
FLB_OUTPUT_RETURN(FLB_RETRY);
}

/* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */
/* perform request */
ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_TRACE);
if (ret == FLB_OK) {
flb_plg_debug(ctx->ins, "trace delivered OK");
}
else if (ret == FLB_ERROR) {
else {
flb_plg_error(ctx->ins, "could not deliver trace");
debug_payload(ctx, out_buf, out_size);
debug_payload(ctx, (char *) json, flb_sds_len(json));
}
flb_sds_destroy(json);
}
Expand All @@ -964,50 +1047,6 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(ret);
}

static int cb_calyptia_exit(void *data, struct flb_config *config)
{
struct flb_calyptia *ctx = data;

if (!ctx) {
return 0;
}

if (ctx->u) {
flb_upstream_destroy(ctx->u);
}

if (ctx->agent_id) {
flb_sds_destroy(ctx->agent_id);
}

if (ctx->agent_token) {
flb_sds_destroy(ctx->agent_token);
}

if (ctx->env) {
flb_env_destroy(ctx->env);
}

if (ctx->metrics_endpoint) {
flb_sds_destroy(ctx->metrics_endpoint);
}

#ifdef FLB_HAVE_CHUNK_TRACE
if (ctx->trace_endpoint) {
flb_sds_destroy(ctx->trace_endpoint);
}
#endif /* FLB_HAVE_CHUNK_TRACE */

if (ctx->fs) {
flb_fstore_destroy(ctx->fs);
}

flb_kv_release(&ctx->kv_labels);
flb_free(ctx);

return 0;
}

/* Configuration properties map */
static struct flb_config_map config_map[] = {
{
Expand Down Expand Up @@ -1058,6 +1097,12 @@ static struct flb_config_map config_map[] = {
},
#endif

{
FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true",
0, FLB_TRUE, offsetof(struct flb_calyptia, register_retry_on_flush),
"Retry agent registration on flush if failed on init."
},

/* EOF */
{0}
};
Expand Down
1 change: 1 addition & 0 deletions plugins/out_calyptia/calyptia.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ struct flb_calyptia {
flb_sds_t trace_endpoint;
flb_sds_t pipeline_id;
#endif /* FLB_HAVE_CHUNK_TRACE */
bool register_retry_on_flush; /* retry registration on flush if failed */
};

#endif
29 changes: 28 additions & 1 deletion tests/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,34 @@ if(FLB_IN_LIB)
endif()

if (FLB_CUSTOM_CALYPTIA)
FLB_RT_TEST(FLB_CUSTOM_CALYPTIA "custom_calyptia_test.c")
set(CALYPTIA_TEST_LINK_LIBS
fluent-bit-static
${CMAKE_THREAD_LIBS_INIT}
)
set(CALYPTIA_TESTS
"custom_calyptia_test.c"
"custom_calyptia_registration_retry_test.c"
"custom_calyptia_input_test.c"
)

foreach(TEST_SOURCE ${CALYPTIA_TESTS})
get_filename_component(TEST_NAME ${TEST_SOURCE} NAME_WE)
set(TEST_TARGET "flb-rt-${TEST_NAME}")
add_executable(${TEST_TARGET}
${TEST_SOURCE}
"../../plugins/custom_calyptia/calyptia.c"
)

target_link_libraries(${TEST_TARGET}
${CALYPTIA_TEST_LINK_LIBS}
)
add_test(NAME ${TEST_TARGET}
COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${TEST_TARGET}
WORKING_DIRECTORY ${CMAKE_HOME_DIRECTORY}/build)

set_tests_properties(${TEST_TARGET} PROPERTIES LABELS "runtime")
add_dependencies(${TEST_TARGET} fluent-bit-static)
endforeach()
endif()

if (FLB_PROCESSOR_METRICS_SELECTOR)
Expand Down
Loading
Loading