From e81fcb07c644f0484244941051847aa736d1fe89 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 16 Dec 2021 08:33:36 -0600 Subject: [PATCH] in_forward: add new option 'tag_prefix' (#4455) (#4466) This patch adds a new option called 'tag_prefix' that allows to prefix the incoming Tag set by the remote client. Usage: --- config file --- [SERVICE] flush 1 log_level info [INPUT] name forward tag_prefix new. [OUTPUT] name stdout match * --- end --- > terminal 1: $ echo "{\"key\": \"test\"}" | fluent-cat test > terminal 2: $ bin/fluent-bit -i forward -p tag_prefix=new. -o stdout -f 1 ... [0] new.test: [1639626354.501658295, {"key"=>"test"}] Signed-off-by: Eduardo Silva --- plugins/in_forward/fw.c | 5 +++++ plugins/in_forward/fw.h | 2 ++ plugins/in_forward/fw_prot.c | 36 +++++++++++++++++++++++++++++++----- 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/plugins/in_forward/fw.c b/plugins/in_forward/fw.c index dc8b9d1b279..0961a4c2127 100644 --- a/plugins/in_forward/fw.c +++ b/plugins/in_forward/fw.c @@ -207,6 +207,11 @@ static int in_fw_exit(void *data, struct flb_config *config) /* Configuration properties map */ static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "tag_prefix", NULL, + 0, FLB_TRUE, offsetof(struct flb_in_fw_config, tag_prefix), + "Prefix incoming tag with the defined value." + }, { FLB_CONFIG_MAP_STR, "unix_path", NULL, 0, FLB_TRUE, offsetof(struct flb_in_fw_config, unix_path), diff --git a/plugins/in_forward/fw.h b/plugins/in_forward/fw.h index 6be7e11fd84..6ce9549d0eb 100644 --- a/plugins/in_forward/fw.h +++ b/plugins/in_forward/fw.h @@ -33,6 +33,8 @@ struct flb_in_fw_config { char *listen; /* Listen interface */ char *tcp_port; /* TCP Port */ + flb_sds_t tag_prefix; /* tag prefix */ + /* Unix Socket (TCP only) */ char *unix_path; /* Unix path for socket */ diff --git a/plugins/in_forward/fw_prot.c b/plugins/in_forward/fw_prot.c index b0367e2902d..54323f4d68c 100644 --- a/plugins/in_forward/fw_prot.c +++ b/plugins/in_forward/fw_prot.c @@ -249,6 +249,7 @@ int fw_prot_process(struct fw_conn *conn) int c = 0; size_t chunk_id = -1; const char *stag; + flb_sds_t out_tag = NULL; size_t bytes; size_t buf_off = 0; size_t recv_len; @@ -271,6 +272,11 @@ int fw_prot_process(struct fw_conn *conn) * [tag, [[time,record], [time,record], ...]] */ + out_tag = flb_sds_create_size(1024); + if (!out_tag) { + return -1; + } + unp = msgpack_unpacker_new(1024); msgpack_unpacked_init(&result); conn->rest = conn->buf_len; @@ -288,6 +294,7 @@ int fw_prot_process(struct fw_conn *conn) conn->buf_len - all_used); conn->buf_len -= all_used; } + flb_sds_destroy(out_tag); return 0; } @@ -312,6 +319,7 @@ int fw_prot_process(struct fw_conn *conn) /* Cleanup buffers */ msgpack_unpacked_destroy(&result); msgpack_unpacker_free(unp); + flb_sds_destroy(out_tag); return -1; } @@ -346,6 +354,7 @@ int fw_prot_process(struct fw_conn *conn) root.type); msgpack_unpacked_destroy(&result); msgpack_unpacker_free(unp); + flb_sds_destroy(out_tag); return -1; } @@ -354,6 +363,7 @@ int fw_prot_process(struct fw_conn *conn) "parser: array of invalid size, skip."); msgpack_unpacked_destroy(&result); msgpack_unpacker_free(unp); + flb_sds_destroy(out_tag); return -1; } @@ -364,12 +374,19 @@ int fw_prot_process(struct fw_conn *conn) "parser: invalid tag format, skip."); msgpack_unpacked_destroy(&result); msgpack_unpacker_free(unp); + flb_sds_destroy(out_tag); return -1; } - stag = tag.via.str.ptr; stag_len = tag.via.str.size; + /* Copy the tag to the new buffer, prefix it if required */ + if (ctx->tag_prefix) { + flb_sds_cat_safe(&out_tag, + ctx->tag_prefix, flb_sds_len(ctx->tag_prefix)); + } + flb_sds_cat_safe(&out_tag, stag, stag_len); + entry = root.via.array.ptr[1]; if (entry.type == MSGPACK_OBJECT_ARRAY) { @@ -384,12 +401,14 @@ int fw_prot_process(struct fw_conn *conn) flb_plg_debug(ctx->ins, "invalid options field"); msgpack_unpacked_destroy(&result); msgpack_unpacker_free(unp); + flb_sds_destroy(out_tag); return -1; } /* Process array */ fw_process_array(conn->in, conn, - stag, stag_len, &root, &entry, chunk_id); + out_tag, flb_sds_len(out_tag), + &root, &entry, chunk_id); } else if (entry.type == MSGPACK_OBJECT_POSITIVE_INTEGER || entry.type == MSGPACK_OBJECT_EXT) { @@ -402,6 +421,7 @@ int fw_prot_process(struct fw_conn *conn) flb_plg_warn(ctx->ins, "invalid data format, map expected"); msgpack_unpacked_destroy(&result); msgpack_unpacker_free(unp); + flb_sds_destroy(out_tag); return -1; } @@ -412,6 +432,7 @@ int fw_prot_process(struct fw_conn *conn) flb_plg_debug(ctx->ins, "invalid options field"); msgpack_unpacked_destroy(&result); msgpack_unpacker_free(unp); + flb_sds_destroy(out_tag); return -1; } @@ -424,7 +445,8 @@ int fw_prot_process(struct fw_conn *conn) msgpack_pack_object(&mp_pck, map); /* Register data object */ - flb_input_chunk_append_raw(conn->in, stag, stag_len, + flb_input_chunk_append_raw(conn->in, + out_tag, flb_sds_len(out_tag), mp_sbuf.data, mp_sbuf.size); msgpack_sbuffer_destroy(&mp_sbuf); c++; @@ -448,6 +470,7 @@ int fw_prot_process(struct fw_conn *conn) flb_plg_debug(ctx->ins, "invalid options field"); msgpack_unpacked_destroy(&result); msgpack_unpacker_free(unp); + flb_sds_destroy(out_tag); return -1; } @@ -466,6 +489,7 @@ int fw_prot_process(struct fw_conn *conn) flb_plg_error(ctx->ins, "invalid 'compressed' option"); msgpack_unpacked_destroy(&result); msgpack_unpacker_free(unp); + flb_sds_destroy(out_tag); return -1; } @@ -476,18 +500,19 @@ int fw_prot_process(struct fw_conn *conn) flb_plg_error(ctx->ins, "gzip uncompress failure"); msgpack_unpacked_destroy(&result); msgpack_unpacker_free(unp); + flb_sds_destroy(out_tag); return -1; } /* Append uncompressed data */ flb_input_chunk_append_raw(conn->in, - stag, stag_len, + out_tag, flb_sds_len(out_tag), gz_data, gz_size); flb_free(gz_data); } else { flb_input_chunk_append_raw(conn->in, - stag, stag_len, + out_tag, flb_sds_len(out_tag), data, len); } @@ -512,6 +537,7 @@ int fw_prot_process(struct fw_conn *conn) msgpack_unpacked_destroy(&result); msgpack_unpacker_free(unp); + flb_sds_destroy(out_tag); switch (ret) { case MSGPACK_UNPACK_EXTRA_BYTES: