diff --git a/plugins/out_influxdb/influxdb.c b/plugins/out_influxdb/influxdb.c index b84d59ca9f8..671dd5c16a6 100644 --- a/plugins/out_influxdb/influxdb.c +++ b/plugins/out_influxdb/influxdb.c @@ -58,15 +58,19 @@ static void influxdb_tsmod(struct flb_time *ts, struct flb_time *dupe, * Convert the internal Fluent Bit data representation to the required one * by InfluxDB. */ -static char *influxdb_format(const char *tag, int tag_len, - const void *data, size_t bytes, size_t *out_size, - struct flb_influxdb *ctx) +static int influxdb_format(struct flb_config *config, + struct flb_input_instance *ins, + void *plugin_context, + void *flush_ctx, + int event_type, + const char *tag, int tag_len, + const void *data, size_t bytes, + void **out_data, size_t *out_size) { int i; int ret; int n_size; uint64_t seq = 0; - char *buf; char *str = NULL; size_t str_size; char tmp[128]; @@ -77,6 +81,7 @@ static char *influxdb_format(const char *tag, int tag_len, struct influxdb_bulk *bulk_body = NULL; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; + struct flb_influxdb *ctx = plugin_context; ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); @@ -84,7 +89,7 @@ static char *influxdb_format(const char *tag, int tag_len, flb_plg_error(ctx->ins, "Log event decoder initialization error : %d", ret); - return NULL; + return -1; } /* Create the bulk composer */ @@ -171,11 +176,21 @@ static char *influxdb_format(const char *tag, int tag_len, } else if (v->type == MSGPACK_OBJECT_POSITIVE_INTEGER) { val = tmp; - val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64, v->via.u64); + if (ctx->use_influxdb_integer) { + val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64 "i", v->via.u64); + } + else { + val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRIu64, v->via.u64); + } } else if (v->type == MSGPACK_OBJECT_NEGATIVE_INTEGER) { val = tmp; - val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRId64, v->via.i64); + if (ctx->use_influxdb_integer) { + val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRId64 "i", v->via.i64); + } + else { + val_len = snprintf(tmp, sizeof(tmp) - 1, "%" PRId64, v->via.i64); + } } else if (v->type == MSGPACK_OBJECT_FLOAT || v->type == MSGPACK_OBJECT_FLOAT32) { val = tmp; @@ -268,8 +283,8 @@ static char *influxdb_format(const char *tag, int tag_len, flb_log_event_decoder_destroy(&log_decoder); + *out_data = bulk->ptr; *out_size = bulk->len; - buf = bulk->ptr; /* * Note: we don't destroy the bulk as we need to keep the allocated @@ -280,7 +295,7 @@ static char *influxdb_format(const char *tag, int tag_len, influxdb_bulk_destroy(bulk_head); influxdb_bulk_destroy(bulk_body); - return buf; + return 0; error: if (bulk != NULL) { @@ -295,7 +310,7 @@ static char *influxdb_format(const char *tag, int tag_len, flb_log_event_decoder_destroy(&log_decoder); - return NULL; + return -1; } static int cb_influxdb_init(struct flb_output_instance *ins, struct flb_config *config, @@ -453,6 +468,7 @@ static void cb_influxdb_flush(struct flb_event_chunk *event_chunk, int is_metric = FLB_FALSE; size_t b_sent; size_t bytes_out; + void *out_buf; char *pack; char tmp[128]; struct mk_list *head; @@ -477,12 +493,17 @@ static void cb_influxdb_flush(struct flb_event_chunk *event_chunk, } else { /* format logs */ - pack = influxdb_format(event_chunk->tag, flb_sds_len(event_chunk->tag), - event_chunk->data, event_chunk->size, - &bytes_out, ctx); - if (!pack) { + ret = influxdb_format(config, i_ins, + ctx, NULL, + event_chunk->type, + event_chunk->tag, flb_sds_len(event_chunk->tag), + event_chunk->data, event_chunk->size, + &out_buf, &bytes_out); + if (ret != 0) { FLB_OUTPUT_RETURN(FLB_ERROR); } + + pack = (char *) out_buf; } /* Get upstream connection */ @@ -569,6 +590,10 @@ static int cb_influxdb_exit(void *data, struct flb_config *config) flb_utils_split_free(ctx->tag_keys); } + if (ctx->seq_name) { + flb_free(ctx->seq_name); + } + flb_upstream_destroy(ctx->u); flb_free(ctx); @@ -665,6 +690,12 @@ static struct flb_config_map config_map[] = { "Space separated list of keys that needs to be tagged." }, + { + FLB_CONFIG_MAP_BOOL, "add_integer_suffix", "false", + 0, FLB_TRUE, offsetof(struct flb_influxdb, use_influxdb_integer), + "Use influxdb line protocol's integer type suffix." + }, + /* EOF */ {0} }; @@ -677,6 +708,7 @@ struct flb_output_plugin out_influxdb_plugin = { .cb_flush = cb_influxdb_flush, .cb_exit = cb_influxdb_exit, .config_map = config_map, + .test_formatter.callback = influxdb_format, .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS }; diff --git a/plugins/out_influxdb/influxdb.h b/plugins/out_influxdb/influxdb.h index c9ff8a383d3..c21145611c8 100644 --- a/plugins/out_influxdb/influxdb.h +++ b/plugins/out_influxdb/influxdb.h @@ -65,6 +65,9 @@ struct flb_influxdb { /* Arbitrary HTTP headers */ struct mk_list *headers; + /* Use line protocol's integer type */ + int use_influxdb_integer; + /* Upstream connection to the backend server */ struct flb_upstream *u; diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index 9e5cc4670e4..e902f7892ff 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -124,6 +124,7 @@ if(FLB_IN_LIB) endif() FLB_RT_TEST(FLB_OUT_S3 "out_s3.c") FLB_RT_TEST(FLB_OUT_TD "out_td.c") + FLB_RT_TEST(FLB_OUT_INFLUXDB "out_influxdb.c") endif() diff --git a/tests/runtime/out_influxdb.c b/tests/runtime/out_influxdb.c new file mode 100644 index 00000000000..c536d7ea4e0 --- /dev/null +++ b/tests/runtime/out_influxdb.c @@ -0,0 +1,482 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2024 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include "flb_tests_runtime.h" + +pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER; +int num_output = 0; +static int get_output_num() +{ + int ret; + pthread_mutex_lock(&result_mutex); + ret = num_output; + pthread_mutex_unlock(&result_mutex); + + return ret; +} + +static void set_output_num(int num) +{ + pthread_mutex_lock(&result_mutex); + num_output = num; + pthread_mutex_unlock(&result_mutex); +} + +static void clear_output_num() +{ + set_output_num(0); +} + +#define JSON_BASIC "[12345678, {\"key\":\"value\"}]" +static void cb_check_basic(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + flb_sds_t out = res_data; + char *index_line = "key=\"value\""; + + set_output_num(1); + + p = strstr(out, index_line); + if (!TEST_CHECK(p != NULL)) { + TEST_MSG("Given:%s", out); + } + + flb_free(out); +} + +#define JSON_FLOAT "[12345678, {\"float\":1.3}]" +static void cb_check_float_value(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + flb_sds_t out = res_data; + char *index_line = "float=1.3"; + + set_output_num(1); + + p = strstr(out, index_line); + if (!TEST_CHECK(p != NULL)) { + TEST_MSG("Given:%s", out); + } + + flb_free(out); +} + +#define JSON_INTEGER "[12345678, {\"int\":100}]" +static void cb_check_int_value(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + flb_sds_t out = res_data; + char *index_line = "int=100i"; + + set_output_num(1); + + p = strstr(out, index_line); + if (!TEST_CHECK(p != NULL)) { + TEST_MSG("Given:%s", out); + } + + flb_free(out); +} + + +#define JSON_NEGATIVE_INTEGER "[12345678, {\"int\":-200}]" +static void cb_check_negative_int_value(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + flb_sds_t out = res_data; + char *index_line = "int=-200i"; + + set_output_num(1); + + p = strstr(out, index_line); + if (!TEST_CHECK(p != NULL)) { + TEST_MSG("Given:%s", out); + } + + flb_free(out); +} + +static void cb_check_int_as_float_value(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + flb_sds_t out = res_data; + char *missing_index_line = "int=100i"; + char *index_line = "int=100"; + + set_output_num(1); + + p = strstr(out, missing_index_line); + if (!TEST_CHECK(p == NULL)) { + TEST_MSG("Given:%s", out); + } + p = strstr(out, index_line); + if (!TEST_CHECK(p != NULL)) { + TEST_MSG("Given:%s", out); + } + + flb_free(out); +} + +static void cb_check_negative_int_as_float_value( + void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + flb_sds_t out = res_data; + char *missing_index_line = "int=-200i"; + char *index_line = "int=-200"; + + set_output_num(1); + + p = strstr(out, missing_index_line); + if (!TEST_CHECK(p == NULL)) { + TEST_MSG("Given:%s", out); + } + p = strstr(out, index_line); + if (!TEST_CHECK(p != NULL)) { + TEST_MSG("Given:%s", out); + } + + flb_free(out); +} + +void flb_test_basic() +{ + int ret; + int size = sizeof(JSON_BASIC) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + clear_output_num(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", + "log_level", "error", + NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "influxdb", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_basic, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_push(ctx, in_ffd, (char *) JSON_BASIC, size); + TEST_CHECK(ret >= 0); + + sleep(2); + + ret = get_output_num(); + if (!TEST_CHECK(ret != 0)) { + TEST_MSG("no output"); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_float_value() +{ + int ret; + int size = sizeof(JSON_FLOAT) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + clear_output_num(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", + "log_level", "error", + NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "influxdb", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_float_value, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_push(ctx, in_ffd, (char *) JSON_FLOAT, size); + TEST_CHECK(ret >= 0); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +/* Using integer type */ +void flb_test_integer_value() +{ + int ret; + int size = sizeof(JSON_INTEGER) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + clear_output_num(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", + "log_level", "error", + NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "influxdb", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "add_integer_suffix", "true", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_int_value, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_push(ctx, in_ffd, (char *) JSON_INTEGER, size); + TEST_CHECK(ret >= 0); + + sleep(2); + + ret = get_output_num(); + if (!TEST_CHECK(ret != 0)) { + TEST_MSG("no output"); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_negative_integer_value() +{ + int ret; + int size = sizeof(JSON_NEGATIVE_INTEGER) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + clear_output_num(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", + "log_level", "error", + NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "influxdb", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "add_integer_suffix", "true", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_negative_int_value, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_push(ctx, in_ffd, (char *) JSON_NEGATIVE_INTEGER, size); + TEST_CHECK(ret >= 0); + + sleep(2); + + ret = get_output_num(); + if (!TEST_CHECK(ret != 0)) { + TEST_MSG("no output"); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +/* Not using integer type of line protocol */ +void flb_test_integer_as_float_value() +{ + int ret; + int size = sizeof(JSON_INTEGER) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + clear_output_num(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", + "log_level", "error", + NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "influxdb", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "add_integer_suffix", "false", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_int_as_float_value, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_push(ctx, in_ffd, (char *) JSON_INTEGER, size); + TEST_CHECK(ret >= 0); + + sleep(2); + + ret = get_output_num(); + if (!TEST_CHECK(ret != 0)) { + TEST_MSG("no output"); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_negative_integer_as_float_value() +{ + int ret; + int size = sizeof(JSON_NEGATIVE_INTEGER) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + clear_output_num(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", + "log_level", "error", + NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "influxdb", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + "add_integer_suffix", "false", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_negative_int_as_float_value, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_push(ctx, in_ffd, (char *) JSON_NEGATIVE_INTEGER, size); + TEST_CHECK(ret >= 0); + + sleep(2); + + ret = get_output_num(); + if (!TEST_CHECK(ret != 0)) { + TEST_MSG("no output"); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + +/* Test list */ +TEST_LIST = { + {"basic" , flb_test_basic }, + {"float" , flb_test_float_value }, + {"int_integer" , flb_test_integer_value }, + {"int_negative_integer" , flb_test_negative_integer_value }, + {"int_integer_as_float" , flb_test_integer_as_float_value }, + {"int_negative_integer_as_float" , flb_test_negative_integer_as_float_value }, + {NULL, NULL} +};