diff --git a/plugins/filter_parser/filter_parser.c b/plugins/filter_parser/filter_parser.c index 11bf71f7e53..64a504ef905 100644 --- a/plugins/filter_parser/filter_parser.c +++ b/plugins/filter_parser/filter_parser.c @@ -142,10 +142,6 @@ static int cb_parser_init(struct flb_filter_instance *f_ins, struct flb_config *config, void *data) { - (void) f_ins; - (void) config; - (void) data; - struct filter_parser_ctx *ctx = NULL; /* Create context */ @@ -156,13 +152,12 @@ static int cb_parser_init(struct flb_filter_instance *f_ins, } ctx->ins = f_ins; - if ( configure(ctx, f_ins, config) < 0 ){ + if (configure(ctx, f_ins, config) < 0) { flb_free(ctx); return -1; } flb_filter_set_context(f_ins, ctx); - return 0; } @@ -174,11 +169,9 @@ static int cb_parser_filter(const void *data, size_t bytes, void *context, struct flb_config *config) { - int continue_parsing; struct filter_parser_ctx *ctx = context; struct flb_time tm; msgpack_object *obj; - msgpack_object_kv *kv; int i; int ret = FLB_FILTER_NOTOUCH; @@ -191,10 +184,8 @@ static int cb_parser_filter(const void *data, size_t bytes, char *out_buf; size_t out_size; struct flb_time parsed_time; - msgpack_object_kv **append_arr = NULL; size_t append_arr_len = 0; - int append_arr_i; struct mk_list *head; struct filter_parser *fp; struct flb_log_event_encoder log_encoder; @@ -209,9 +200,7 @@ static int cb_parser_filter(const void *data, size_t bytes, ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); if (ret != FLB_EVENT_DECODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event decoder initialization error : %d", ret); - + flb_plg_error(ctx->ins, "Log event decoder initialization error : %d", ret); return FLB_FILTER_NOTOUCH; } @@ -219,11 +208,8 @@ static int cb_parser_filter(const void *data, size_t bytes, FLB_LOG_EVENT_FORMAT_DEFAULT); if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event encoder initialization error : %d", ret); - + flb_plg_error(ctx->ins, "Log event encoder initialization error : %d", ret); flb_log_event_decoder_destroy(&log_decoder); - return FLB_FILTER_NOTOUCH; } @@ -231,77 +217,69 @@ static int cb_parser_filter(const void *data, size_t bytes, &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { out_buf = NULL; - append_arr_i = 0; flb_time_copy(&tm, &log_event.timestamp); obj = log_event.body; if (obj->type == MSGPACK_OBJECT_MAP) { map_num = obj->via.map.size; - if (ctx->reserve_data) { - append_arr_len = obj->via.map.size; - append_arr = flb_calloc(append_arr_len, sizeof(msgpack_object_kv *)); + /* Calculate initial array size based on configuration */ + append_arr_len = (ctx->reserve_data ? map_num : 0); + if (ctx->preserve_key && !ctx->reserve_data) { + append_arr_len = 1; /* Space for preserved key */ + } + if (append_arr_len > 0) { + append_arr = flb_calloc(append_arr_len, sizeof(msgpack_object_kv *)); if (append_arr == NULL) { flb_errno(); - flb_log_event_decoder_destroy(&log_decoder); flb_log_event_encoder_destroy(&log_encoder); - return FLB_FILTER_NOTOUCH; } - } - continue_parsing = FLB_TRUE; - for (i = 0; i < map_num && continue_parsing; i++) { - kv = &obj->via.map.ptr[i]; + /* Initialize array */ if (ctx->reserve_data) { - append_arr[append_arr_i] = kv; - append_arr_i++; + for (i = 0; i < map_num; i++) { + append_arr[i] = &obj->via.map.ptr[i]; + } } - if ( msgpackobj2char(&kv->key, &key_str, &key_len) < 0 ) { - /* key is not string */ + } + + /* Process the target key */ + for (i = 0; i < map_num; i++) { + kv = &obj->via.map.ptr[i]; + if (msgpackobj2char(&kv->key, &key_str, &key_len) < 0) { continue; } + if (key_len == ctx->key_name_len && !strncmp(key_str, ctx->key_name, key_len)) { - if ( msgpackobj2char(&kv->val, &val_str, &val_len) < 0 ) { - /* val is not string */ + if (msgpackobj2char(&kv->val, &val_str, &val_len) < 0) { continue; } /* Lookup parser */ mk_list_foreach(head, &ctx->parsers) { fp = mk_list_entry(head, struct filter_parser, _head); - - /* Reset time */ flb_time_zero(&parsed_time); parse_ret = flb_parser_do(fp->parser, val_str, val_len, (void **) &out_buf, &out_size, &parsed_time); if (parse_ret >= 0) { - /* - * If the parser succeeded we need to check the - * status of the parsed time. If the time was - * parsed successfully 'parsed_time' will be - * different than zero, if so, override the time - * holder with the new value, otherwise keep the - * original. - */ if (flb_time_to_nanosec(&parsed_time) != 0L) { flb_time_copy(&tm, &parsed_time); } - if (ctx->reserve_data) { + if (append_arr != NULL) { if (!ctx->preserve_key) { - append_arr_i--; - append_arr_len--; - append_arr[append_arr_i] = NULL; + append_arr[i] = NULL; + } + else if (!ctx->reserve_data) { + /* Store only the key being preserved */ + append_arr[0] = kv; } - } - else { - continue_parsing = FLB_FALSE; } break; } @@ -322,27 +300,58 @@ static int cb_parser_filter(const void *data, size_t bytes, &log_encoder, log_event.metadata); } - if (out_buf != NULL) { - if (ctx->reserve_data) { + if (out_buf != NULL && parse_ret >= 0) { + if (append_arr != NULL && append_arr_len > 0) { char *new_buf = NULL; - int new_size; - int ret; - ret = flb_msgpack_expand_map(out_buf, out_size, - append_arr, append_arr_len, - &new_buf, &new_size); - if (ret == -1) { - flb_plg_error(ctx->ins, "cannot expand map"); - - flb_log_event_decoder_destroy(&log_decoder); - flb_log_event_encoder_destroy(&log_encoder); - flb_free(append_arr); - - return FLB_FILTER_NOTOUCH; + int new_size; + size_t valid_kv_count = 0; + msgpack_object_kv **valid_kv = NULL; + + /* Count valid entries */ + for (i = 0; i < append_arr_len; i++) { + if (append_arr[i] != NULL) { + valid_kv_count++; + } } - flb_free(out_buf); - out_buf = new_buf; - out_size = new_size; + if (valid_kv_count > 0) { + valid_kv = flb_calloc(valid_kv_count, sizeof(msgpack_object_kv *)); + if (!valid_kv) { + flb_errno(); + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + flb_free(append_arr); + flb_free(out_buf); + return FLB_FILTER_NOTOUCH; + } + + /* Fill valid entries */ + valid_kv_count = 0; + for (i = 0; i < append_arr_len; i++) { + if (append_arr[i] != NULL) { + valid_kv[valid_kv_count++] = append_arr[i]; + } + } + + ret = flb_msgpack_expand_map(out_buf, out_size, + valid_kv, valid_kv_count, + &new_buf, &new_size); + + flb_free(valid_kv); + + if (ret == -1) { + flb_plg_error(ctx->ins, "cannot expand map"); + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + flb_free(append_arr); + flb_free(out_buf); + return FLB_FILTER_NOTOUCH; + } + + flb_free(out_buf); + out_buf = new_buf; + out_size = new_size; + } } if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { @@ -355,7 +364,6 @@ static int cb_parser_filter(const void *data, size_t bytes, ret = FLB_FILTER_MODIFIED; } else { - /* re-use original data*/ if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { encoder_result = \ flb_log_event_encoder_set_body_from_msgpack_object( @@ -371,26 +379,22 @@ static int cb_parser_filter(const void *data, size_t bytes, flb_plg_error(ctx->ins, "log event encoder error : %d", encoder_result); } - flb_free(append_arr); - append_arr = NULL; - } - else { - continue; + if (append_arr != NULL) { + flb_free(append_arr); + append_arr = NULL; + } } } if (log_encoder.output_length > 0) { - *ret_buf = log_encoder.output_buffer; + *ret_buf = log_encoder.output_buffer; *ret_bytes = log_encoder.output_length; ret = FLB_FILTER_MODIFIED; - flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); } else { - flb_plg_error(ctx->ins, - "Log event encoder error : %d", ret); - + flb_plg_error(ctx->ins, "Log event encoder error : %d", ret); ret = FLB_FILTER_NOTOUCH; } @@ -400,7 +404,6 @@ static int cb_parser_filter(const void *data, size_t bytes, return ret; } - static int cb_parser_exit(void *data, struct flb_config *config) { struct filter_parser_ctx *ctx = data; diff --git a/tests/runtime/filter_parser.c b/tests/runtime/filter_parser.c index 8f25fec0e6e..aa5fc7a2d1b 100644 --- a/tests/runtime/filter_parser.c +++ b/tests/runtime/filter_parser.c @@ -493,32 +493,61 @@ void flb_test_filter_parser_handle_time_key_with_time_zone() } void test_parser_timestamp_timezone(char *tz, - char *time_fmt, - char *timestamp, - char *expected_epoch, - int use_system_timezone) + char *time_fmt, + char *timestamp, + char *expected_epoch, + int use_system_timezone) { int ret; int bytes; char *output, *original_tz = NULL; + char *saved_tz = NULL; char p[256]; - char expected[12]; + char expected[256]; flb_ctx_t *ctx; int in_ffd; int out_ffd; int filter_ffd; struct flb_parser *parser; + struct flb_lib_out_cb *cb; - struct flb_lib_out_cb cb; - cb.cb = callback_test; - cb.data = NULL; + /* Allocate and initialize callback */ + cb = flb_malloc(sizeof(struct flb_lib_out_cb)); + if (!cb) { + flb_errno(); + return; + } + cb->cb = callback_test; + cb->data = NULL; clear_output(); + /* Save current TZ if exists */ + original_tz = getenv("TZ"); + if (original_tz) { + saved_tz = strdup(original_tz); + if (!saved_tz) { + flb_free(cb); + return; + } + } + + /* Set new timezone if provided */ + if (tz) { + ret = setenv("TZ", tz, 1); + TEST_CHECK(ret == 0); + tzset(); /* Make sure timezone changes take effect */ + } + ctx = flb_create(); + TEST_CHECK(ctx != NULL); /* Configure service */ - flb_service_set(ctx, "Flush", FLUSH_INTERVAL, "Grace", "1", "Log_Level", "debug", NULL); + flb_service_set(ctx, + "Flush", FLUSH_INTERVAL, + "Grace", "1", + "Log_Level", "debug", + NULL); /* Input */ in_ffd = flb_input(ctx, (char *) "lib", NULL); @@ -528,21 +557,20 @@ void test_parser_timestamp_timezone(char *tz, NULL); /* Parser */ - parser = flb_parser_create("timestamp", // name - "regex", // format - "^(?