From 669af8d88a3dbdf71c6bd61cb4a041169d1341a3 Mon Sep 17 00:00:00 2001 From: Patrick Stephens Date: Thu, 28 Nov 2024 12:03:55 +0000 Subject: [PATCH 1/2] out_calyptia: retry agent registration on flush callback [backport 3.1] Signed-off-by: Patrick Stephens --- plugins/custom_calyptia/calyptia.c | 12 ++ plugins/out_calyptia/calyptia.c | 195 +++++++++++------- plugins/out_calyptia/calyptia.h | 1 + tests/runtime/CMakeLists.txt | 29 ++- tests/runtime/custom_calyptia_input_test.c | 172 +++++++++++++++ .../custom_calyptia_registration_retry_test.c | 117 +++++++++++ 6 files changed, 450 insertions(+), 76 deletions(-) create mode 100644 tests/runtime/custom_calyptia_input_test.c create mode 100644 tests/runtime/custom_calyptia_registration_retry_test.c diff --git a/plugins/custom_calyptia/calyptia.c b/plugins/custom_calyptia/calyptia.c index 4aba53ca79c..3ddc38d5413 100644 --- a/plugins/custom_calyptia/calyptia.c +++ b/plugins/custom_calyptia/calyptia.c @@ -62,6 +62,7 @@ struct calyptia { flb_sds_t fleet_config_dir; /* fleet configuration directory */ int fleet_interval_sec; int fleet_interval_nsec; + bool register_retry_on_flush; /* retry registration on flush if failed */ }; /* @@ -278,6 +279,12 @@ static struct flb_output_instance *setup_cloud_output(struct flb_config *config, flb_output_set_property(cloud, "match", "_calyptia_cloud"); flb_output_set_property(cloud, "api_key", ctx->api_key); + if (ctx->register_retry_on_flush) { + flb_output_set_property(cloud, "register_retry_on_flush", "true"); + } else { + flb_output_set_property(cloud, "register_retry_on_flush", "false"); + } + if (ctx->store_path) { flb_output_set_property(cloud, "store_path", ctx->store_path); } @@ -606,6 +613,11 @@ static struct flb_config_map config_map[] = { }, #endif /* FLB_HAVE_CHUNK_TRACE */ + { + FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true", + 0, FLB_TRUE, offsetof(struct calyptia, register_retry_on_flush), + "Retry agent registration on flush if failed on init." + }, /* EOF */ {0} }; diff --git a/plugins/out_calyptia/calyptia.c b/plugins/out_calyptia/calyptia.c index aa760e2fa5e..34c86a528c9 100644 --- a/plugins/out_calyptia/calyptia.c +++ b/plugins/out_calyptia/calyptia.c @@ -721,6 +721,21 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins, return NULL; } + ctx->metrics_endpoint = flb_sds_create_size(256); + if (!ctx->metrics_endpoint) { + flb_free(ctx); + return NULL; + } + +#ifdef FLB_HAVE_CHUNK_TRACE + ctx->trace_endpoint = flb_sds_create_size(256); + if (!ctx->trace_endpoint) { + flb_sds_destroy(ctx->metrics_endpoint); + flb_free(ctx); + return NULL; + } +#endif + /* api_key */ if (!ctx->api_key) { flb_plg_error(ctx->ins, "configuration 'api_key' is missing"); @@ -771,12 +786,41 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins, return ctx; } -static int cb_calyptia_init(struct flb_output_instance *ins, - struct flb_config *config, void *data) +static int register_agent(struct flb_calyptia *ctx, struct flb_config *config) { int ret; + + /* Try registration */ + ret = api_agent_create(config, ctx); + + if (ret != FLB_OK) { + flb_plg_warn(ctx->ins, "agent registration failed"); + return FLB_ERROR; + } + + /* Update endpoints */ + flb_sds_len_set(ctx->metrics_endpoint, 0); + flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS, + ctx->agent_id); + +#ifdef FLB_HAVE_CHUNK_TRACE + if (ctx->pipeline_id) { + flb_sds_len_set(ctx->trace_endpoint, 0); + flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE, + ctx->pipeline_id); + } +#endif + + flb_plg_info(ctx->ins, "agent registration successful"); + return FLB_OK; +} + +static int cb_calyptia_init(struct flb_output_instance *ins, + struct flb_config *config, void *data) +{ struct flb_calyptia *ctx; (void) data; + int ret; /* create config context */ ctx = config_init(ins, config); @@ -792,22 +836,12 @@ static int cb_calyptia_init(struct flb_output_instance *ins, flb_output_set_http_debug_callbacks(ins); /* register/update agent */ - ret = api_agent_create(config, ctx); - if (ret != FLB_OK) { - flb_plg_error(ctx->ins, "agent registration failed"); + ret = register_agent(ctx, config); + if (ret != FLB_OK && !ctx->register_retry_on_flush) { + flb_plg_error(ins, "agent registration failed and register_retry_on_flush=false"); return -1; } - /* metrics endpoint */ - ctx->metrics_endpoint = flb_sds_create_size(256); - flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS, - ctx->agent_id); - -#ifdef FLB_HAVE_CHUNK_TRACE - ctx->trace_endpoint = flb_sds_create_size(256); - flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE, - ctx->pipeline_id); -#endif /* FLB_HAVE_CHUNK_TRACE */ return 0; } @@ -830,6 +864,50 @@ static void debug_payload(struct flb_calyptia *ctx, void *data, size_t bytes) cmt_destroy(cmt); } +static int cb_calyptia_exit(void *data, struct flb_config *config) +{ + struct flb_calyptia *ctx = data; + + if (!ctx) { + return 0; + } + + if (ctx->u) { + flb_upstream_destroy(ctx->u); + } + + if (ctx->agent_id) { + flb_sds_destroy(ctx->agent_id); + } + + if (ctx->agent_token) { + flb_sds_destroy(ctx->agent_token); + } + + if (ctx->env) { + flb_env_destroy(ctx->env); + } + + if (ctx->metrics_endpoint) { + flb_sds_destroy(ctx->metrics_endpoint); + } + +#ifdef FLB_HAVE_CHUNK_TRACE + if (ctx->trace_endpoint) { + flb_sds_destroy(ctx->trace_endpoint); + } +#endif /* FLB_HAVE_CHUNK_TRACE */ + + if (ctx->fs) { + flb_fstore_destroy(ctx->fs); + } + + flb_kv_release(&ctx->kv_labels); + flb_free(ctx); + + return 0; +} + static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, struct flb_output_flush *out_flush, struct flb_input_instance *i_ins, @@ -853,6 +931,17 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, (void) i_ins; (void) config; + if (!ctx->agent_id && ctx->register_retry_on_flush) { + flb_plg_info(ctx->ins, "agent_id not found and register_retry_on_flush=true, attempting registration"); + if (register_agent(ctx, config) != FLB_OK) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + } + else if (!ctx->agent_id) { + flb_plg_error(ctx->ins, "no agent_id available and register_retry_on_flush=false"); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->u); if (!u_conn) { @@ -899,12 +988,12 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(FLB_RETRY); } - /* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */ + /* perform request */ ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_METRICS); if (ret == FLB_OK) { flb_plg_debug(ctx->ins, "metrics delivered OK"); } - else if (ret == FLB_ERROR) { + else { flb_plg_error(ctx->ins, "could not deliver metrics"); debug_payload(ctx, out_buf, out_size); } @@ -915,7 +1004,8 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, } #ifdef FLB_HAVE_CHUNK_TRACE - if (event_chunk->type == (FLB_EVENT_TYPE_LOGS | FLB_EVENT_TYPE_HAS_TRACE)) { + if (event_chunk->type & FLB_EVENT_TYPE_LOGS && + event_chunk->type & FLB_EVENT_TYPE_HAS_TRACE) { json = flb_pack_msgpack_to_json_format(event_chunk->data, event_chunk->size, FLB_PACK_JSON_FORMAT_STREAM, @@ -925,32 +1015,25 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, flb_upstream_conn_release(u_conn); FLB_OUTPUT_RETURN(FLB_RETRY); } - out_buf = (char *)json; - out_size = flb_sds_len(json); - if (flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS, - ctx->agent_id) == NULL) { - flb_upstream_conn_release(u_conn); - flb_sds_destroy(json); - FLB_OUTPUT_RETURN(FLB_RETRY); - } c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->trace_endpoint, - out_buf, out_size, NULL, 0, NULL, 0); + (char *) json, flb_sds_len(json), + NULL, 0, NULL, 0); + if (!c) { flb_upstream_conn_release(u_conn); flb_sds_destroy(json); - flb_sds_destroy(ctx->metrics_endpoint); FLB_OUTPUT_RETURN(FLB_RETRY); } - /* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */ + /* perform request */ ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_TRACE); if (ret == FLB_OK) { flb_plg_debug(ctx->ins, "trace delivered OK"); } - else if (ret == FLB_ERROR) { + else { flb_plg_error(ctx->ins, "could not deliver trace"); - debug_payload(ctx, out_buf, out_size); + debug_payload(ctx, (char *) json, flb_sds_len(json)); } flb_sds_destroy(json); } @@ -964,50 +1047,6 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(ret); } -static int cb_calyptia_exit(void *data, struct flb_config *config) -{ - struct flb_calyptia *ctx = data; - - if (!ctx) { - return 0; - } - - if (ctx->u) { - flb_upstream_destroy(ctx->u); - } - - if (ctx->agent_id) { - flb_sds_destroy(ctx->agent_id); - } - - if (ctx->agent_token) { - flb_sds_destroy(ctx->agent_token); - } - - if (ctx->env) { - flb_env_destroy(ctx->env); - } - - if (ctx->metrics_endpoint) { - flb_sds_destroy(ctx->metrics_endpoint); - } - -#ifdef FLB_HAVE_CHUNK_TRACE - if (ctx->trace_endpoint) { - flb_sds_destroy(ctx->trace_endpoint); - } -#endif /* FLB_HAVE_CHUNK_TRACE */ - - if (ctx->fs) { - flb_fstore_destroy(ctx->fs); - } - - flb_kv_release(&ctx->kv_labels); - flb_free(ctx); - - return 0; -} - /* Configuration properties map */ static struct flb_config_map config_map[] = { { @@ -1058,6 +1097,12 @@ static struct flb_config_map config_map[] = { }, #endif + { + FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true", + 0, FLB_TRUE, offsetof(struct flb_calyptia, register_retry_on_flush), + "Retry agent registration on flush if failed on init." + }, + /* EOF */ {0} }; diff --git a/plugins/out_calyptia/calyptia.h b/plugins/out_calyptia/calyptia.h index ee37d8778dc..0532e4a2b01 100644 --- a/plugins/out_calyptia/calyptia.h +++ b/plugins/out_calyptia/calyptia.h @@ -80,6 +80,7 @@ struct flb_calyptia { flb_sds_t trace_endpoint; flb_sds_t pipeline_id; #endif /* FLB_HAVE_CHUNK_TRACE */ + bool register_retry_on_flush; /* retry registration on flush if failed */ }; #endif diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index e902f7892ff..77534c80b15 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -129,7 +129,34 @@ if(FLB_IN_LIB) endif() if (FLB_CUSTOM_CALYPTIA) - FLB_RT_TEST(FLB_CUSTOM_CALYPTIA "custom_calyptia_test.c") + set(CALYPTIA_TEST_LINK_LIBS + fluent-bit-static + ${CMAKE_THREAD_LIBS_INIT} + ) + set(CALYPTIA_TESTS + "custom_calyptia_test.c" + "custom_calyptia_registration_retry_test.c" + "custom_calyptia_input_test.c" + ) + + foreach(TEST_SOURCE ${CALYPTIA_TESTS}) + get_filename_component(TEST_NAME ${TEST_SOURCE} NAME_WE) + set(TEST_TARGET "flb-rt-${TEST_NAME}") + add_executable(${TEST_TARGET} + ${TEST_SOURCE} + "../../plugins/custom_calyptia/calyptia.c" + ) + + target_link_libraries(${TEST_TARGET} + ${CALYPTIA_TEST_LINK_LIBS} + ) + add_test(NAME ${TEST_TARGET} + COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${TEST_TARGET} + WORKING_DIRECTORY ${CMAKE_HOME_DIRECTORY}/build) + + set_tests_properties(${TEST_TARGET} PROPERTIES LABELS "runtime") + add_dependencies(${TEST_TARGET} fluent-bit-static) + endforeach() endif() if (FLB_PROCESSOR_METRICS_SELECTOR) diff --git a/tests/runtime/custom_calyptia_input_test.c b/tests/runtime/custom_calyptia_input_test.c new file mode 100644 index 00000000000..ffc774c83d9 --- /dev/null +++ b/tests/runtime/custom_calyptia_input_test.c @@ -0,0 +1,172 @@ +#include +#include +#include +#include +#include +#include +#include +#include "flb_tests_runtime.h" +#include "../../plugins/custom_calyptia/calyptia.h" + +/* Test context structure */ +struct test_context { + struct calyptia *ctx; + struct flb_input_instance *fleet; + struct flb_config *config; +}; + +/* Initialize test context */ +static struct test_context *init_test_context() +{ + struct test_context *t_ctx = flb_calloc(1, sizeof(struct test_context)); + if (!t_ctx) { + return NULL; + } + + t_ctx->config = flb_config_init(); + if (!t_ctx->config) { + flb_free(t_ctx); + return NULL; + } + + t_ctx->ctx = flb_calloc(1, sizeof(struct calyptia)); + if (!t_ctx->ctx) { + flb_config_exit(t_ctx->config); + flb_free(t_ctx); + return NULL; + } + + /* Initialize plugin instance for logging */ + t_ctx->ctx->ins = flb_calloc(1, sizeof(struct flb_custom_instance)); + if (!t_ctx->ctx->ins) { + flb_free(t_ctx->ctx); + flb_config_exit(t_ctx->config); + flb_free(t_ctx); + return NULL; + } + + /* Initialize test values in ctx */ + t_ctx->ctx->api_key = flb_strdup("test_api_key"); + t_ctx->ctx->fleet_config_dir = flb_strdup("/test/config/dir"); + t_ctx->ctx->fleet_id = flb_strdup("test_fleet_id"); + t_ctx->ctx->fleet_name = flb_strdup("test_fleet"); + t_ctx->ctx->machine_id = flb_strdup("test_machine_id"); + t_ctx->ctx->fleet_max_http_buffer_size = flb_strdup("1024"); + t_ctx->ctx->fleet_interval_sec = flb_strdup("60"); + t_ctx->ctx->fleet_interval_nsec = flb_strdup("500000000"); + + t_ctx->fleet = flb_input_new(t_ctx->config, "calyptia_fleet", NULL, FLB_FALSE); + if (!t_ctx->fleet) { + if (t_ctx->ctx->ins) flb_free(t_ctx->ctx->ins); + flb_free(t_ctx->ctx); + flb_config_exit(t_ctx->config); + flb_free(t_ctx); + return NULL; + } + + return t_ctx; +} + +static void cleanup_test_context(struct test_context *t_ctx) +{ + if (!t_ctx) { + return; + } + + if (t_ctx->fleet) { + /* Input instance cleanup */ + flb_input_instance_destroy(t_ctx->fleet); + } + + if (t_ctx->ctx) { + if (t_ctx->ctx->api_key) flb_free(t_ctx->ctx->api_key); + if (t_ctx->ctx->fleet_config_dir) flb_free(t_ctx->ctx->fleet_config_dir); + if (t_ctx->ctx->fleet_id) flb_free(t_ctx->ctx->fleet_id); + if (t_ctx->ctx->fleet_name) flb_free(t_ctx->ctx->fleet_name); + if (t_ctx->ctx->machine_id) flb_free(t_ctx->ctx->machine_id); + if (t_ctx->ctx->fleet_max_http_buffer_size) flb_free(t_ctx->ctx->fleet_max_http_buffer_size); + if (t_ctx->ctx->fleet_interval_sec) flb_free(t_ctx->ctx->fleet_interval_sec); + if (t_ctx->ctx->fleet_interval_nsec) flb_free(t_ctx->ctx->fleet_interval_nsec); + if (t_ctx->ctx->ins) flb_free(t_ctx->ctx->ins); + flb_free(t_ctx->ctx); + } + + if (t_ctx->config) { + /* Destroy the config which will cleanup any remaining instances */ + flb_config_exit(t_ctx->config); + } + + flb_free(t_ctx); +} + +void test_set_fleet_input_properties() +{ + struct test_context *t_ctx = init_test_context(); + TEST_CHECK(t_ctx != NULL); + + /* Test setting properties */ + int ret = set_fleet_input_properties(t_ctx->ctx, t_ctx->fleet); + TEST_CHECK(ret == 0); + + /* Verify properties were set correctly */ + const char *value; + + /* Check api_key */ + value = flb_input_get_property("api_key", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("api_key expected=%s got=%s", t_ctx->ctx->api_key, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->api_key) == 0); + + /* Check config_dir */ + value = flb_input_get_property("config_dir", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("config_dir expected=%s got=%s", t_ctx->ctx->fleet_config_dir, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_config_dir) == 0); + + /* Check fleet_id */ + value = flb_input_get_property("fleet_id", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("fleet_id expected=%s got=%s", t_ctx->ctx->fleet_id, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_id) == 0); + + /* Check fleet_name */ + value = flb_input_get_property("fleet_name", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("fleet_name expected=%s got=%s", t_ctx->ctx->fleet_name, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_name) == 0); + + /* Check machine_id */ + value = flb_input_get_property("machine_id", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("machine_id expected=%s got=%s", t_ctx->ctx->machine_id, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->machine_id) == 0); + + /* Check max_http_buffer_size */ + value = flb_input_get_property("max_http_buffer_size", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("max_http_buffer_size expected=%s got=%s", t_ctx->ctx->fleet_max_http_buffer_size, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_max_http_buffer_size) == 0); + + // /* Check interval_sec */ + value = flb_input_get_property("interval_sec", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("interval_sec expected=%s got=%s", t_ctx->ctx->fleet_interval_sec, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_interval_sec) == 0); + + // /* Check interval_nsec */ + value = flb_input_get_property("interval_nsec", t_ctx->fleet); + TEST_CHECK(value != NULL); + TEST_MSG("interval_nsec expected=%s got=%s", t_ctx->ctx->fleet_interval_nsec, value); + TEST_CHECK(value && strcmp(value, t_ctx->ctx->fleet_interval_nsec) == 0); + + ret = set_fleet_input_properties(t_ctx->ctx, NULL); + TEST_CHECK(ret == -1); + + cleanup_test_context(t_ctx); +} + +/* Define test list */ +TEST_LIST = { + {"set_fleet_input_properties", test_set_fleet_input_properties}, + {NULL, NULL} +}; \ No newline at end of file diff --git a/tests/runtime/custom_calyptia_registration_retry_test.c b/tests/runtime/custom_calyptia_registration_retry_test.c new file mode 100644 index 00000000000..8ef3b941859 --- /dev/null +++ b/tests/runtime/custom_calyptia_registration_retry_test.c @@ -0,0 +1,117 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include +#include +#include +#include +#include +#include + +#include "flb_tests_runtime.h" + +#define MOCK_SERVER_HOST "127.0.0.1" +#define MOCK_SERVER_PORT 9876 + +static int registration_count = 0; + +static void mock_server_cb(mk_request_t *request, void *data) +{ + registration_count++; + mk_http_status(request, 500); + mk_http_header(request, "Content-Type", sizeof("Content-Type") - 1, + "text/plain", sizeof("text/plain") - 1); + mk_http_send(request, "Internal Server Error", sizeof("Internal Server Error") - 1, NULL); + mk_http_done(request); +} + +/* Test function */ +void test_calyptia_register_retry() +{ + flb_ctx_t *ctx; + int ret; + int in_ffd; + mk_ctx_t *mock_ctx; + int vid; + char tmp[256]; + struct flb_custom_instance *calyptia; + + /* Reset registration count */ + registration_count = 0; + + /* Init mock server */ + mock_ctx = mk_create(); + TEST_CHECK(mock_ctx != NULL); + + /* Compose listen address */ + snprintf(tmp, sizeof(tmp) - 1, "%s:%d", MOCK_SERVER_HOST, MOCK_SERVER_PORT); + ret = mk_config_set(mock_ctx, "Listen", tmp, NULL); + TEST_CHECK(ret == 0); + + vid = mk_vhost_create(mock_ctx, NULL); + TEST_CHECK(vid >= 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents", mock_server_cb, NULL); + TEST_CHECK(ret == 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents/test-id", mock_server_cb, NULL); + TEST_CHECK(ret == 0); + + ret = mk_start(mock_ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(500); // Allow the mock server to initialize + + /* Init Fluent Bit context */ + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + ret = flb_service_set(ctx, + "Log_Level", "debug", + NULL); + TEST_CHECK(ret == 0); + + /* Create dummy input */ + in_ffd = flb_input(ctx, (char *)"dummy", NULL); + TEST_CHECK(in_ffd >= 0); + + /* Create custom Calyptia plugin */ + calyptia = flb_custom_new(ctx->config, (char *)"calyptia", NULL); + TEST_CHECK(calyptia != NULL); + + /* Set custom plugin properties */ + flb_custom_set_property(calyptia, "api_key", "test-key"); + flb_custom_set_property(calyptia, "log_level", "debug"); + flb_custom_set_property(calyptia, "add_label", "pipeline_id test-pipeline-id"); + flb_custom_set_property(calyptia, "calyptia_host", MOCK_SERVER_HOST); + flb_custom_set_property(calyptia, "calyptia_port", "9876"); + flb_custom_set_property(calyptia, "register_retry_on_flush", "true"); + flb_custom_set_property(calyptia, "calyptia_tls", "off"); + flb_custom_set_property(calyptia, "calyptia_tls.verify", "off"); + + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* First registration attempt should have failed */ + TEST_CHECK(registration_count == 1); + + flb_time_msleep(1000); + flb_lib_push(ctx, in_ffd, "{\"key\":\"val\"}", 13); + + /* Wait for processing */ + flb_time_msleep(10000); + TEST_CHECK(registration_count > 1); + + /* Cleanup */ + flb_stop(ctx); + flb_destroy(ctx); + mk_stop(mock_ctx); + mk_destroy(mock_ctx); +} + +/* Test list */ +TEST_LIST = { + {"register_retry", test_calyptia_register_retry}, + {NULL, NULL} +}; From 3d437584396d0cce45514eb8170e0b683ab8c835 Mon Sep 17 00:00:00 2001 From: Patrick Stephens Date: Thu, 28 Nov 2024 12:11:57 +0000 Subject: [PATCH 2/2] out_calyptia: remove unnecessary include for tests Signed-off-by: Patrick Stephens --- tests/runtime/custom_calyptia_input_test.c | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/runtime/custom_calyptia_input_test.c b/tests/runtime/custom_calyptia_input_test.c index ffc774c83d9..54e921ed3da 100644 --- a/tests/runtime/custom_calyptia_input_test.c +++ b/tests/runtime/custom_calyptia_input_test.c @@ -6,7 +6,6 @@ #include #include #include "flb_tests_runtime.h" -#include "../../plugins/custom_calyptia/calyptia.h" /* Test context structure */ struct test_context {