Skip to content

Commit

Permalink
in_forward: add new option 'tag_prefix' (fluent#4455) (fluent#4466)
Browse files Browse the repository at this point in the history
This patch adds a new option called 'tag_prefix' that allows to prefix the
incoming Tag set by the remote client.

Usage:

--- config file ---

 [SERVICE]
     flush        1
     log_level    info

 [INPUT]
     name         forward
     tag_prefix   new.

 [OUTPUT]
     name         stdout
     match        *

--- end ---

> terminal 1:

  $ echo "{\"key\": \"test\"}" | fluent-cat test

> terminal 2:

  $ bin/fluent-bit -i forward -p tag_prefix=new. -o stdout -f 1
  ...
  [0] new.test: [1639626354.501658295, {"key"=>"test"}]

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper authored and 0Delta committed Jan 20, 2022
1 parent 0dde062 commit e81fcb0
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 5 deletions.
5 changes: 5 additions & 0 deletions plugins/in_forward/fw.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ static int in_fw_exit(void *data, struct flb_config *config)

/* Configuration properties map */
static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "tag_prefix", NULL,
0, FLB_TRUE, offsetof(struct flb_in_fw_config, tag_prefix),
"Prefix incoming tag with the defined value."
},
{
FLB_CONFIG_MAP_STR, "unix_path", NULL,
0, FLB_TRUE, offsetof(struct flb_in_fw_config, unix_path),
Expand Down
2 changes: 2 additions & 0 deletions plugins/in_forward/fw.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ struct flb_in_fw_config {
char *listen; /* Listen interface */
char *tcp_port; /* TCP Port */

flb_sds_t tag_prefix; /* tag prefix */

/* Unix Socket (TCP only) */
char *unix_path; /* Unix path for socket */

Expand Down
36 changes: 31 additions & 5 deletions plugins/in_forward/fw_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ int fw_prot_process(struct fw_conn *conn)
int c = 0;
size_t chunk_id = -1;
const char *stag;
flb_sds_t out_tag = NULL;
size_t bytes;
size_t buf_off = 0;
size_t recv_len;
Expand All @@ -271,6 +272,11 @@ int fw_prot_process(struct fw_conn *conn)
* [tag, [[time,record], [time,record], ...]]
*/

out_tag = flb_sds_create_size(1024);
if (!out_tag) {
return -1;
}

unp = msgpack_unpacker_new(1024);
msgpack_unpacked_init(&result);
conn->rest = conn->buf_len;
Expand All @@ -288,6 +294,7 @@ int fw_prot_process(struct fw_conn *conn)
conn->buf_len - all_used);
conn->buf_len -= all_used;
}
flb_sds_destroy(out_tag);
return 0;
}

Expand All @@ -312,6 +319,7 @@ int fw_prot_process(struct fw_conn *conn)
/* Cleanup buffers */
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);

return -1;
}
Expand Down Expand Up @@ -346,6 +354,7 @@ int fw_prot_process(struct fw_conn *conn)
root.type);
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

Expand All @@ -354,6 +363,7 @@ int fw_prot_process(struct fw_conn *conn)
"parser: array of invalid size, skip.");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

Expand All @@ -364,12 +374,19 @@ int fw_prot_process(struct fw_conn *conn)
"parser: invalid tag format, skip.");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

stag = tag.via.str.ptr;
stag_len = tag.via.str.size;

/* Copy the tag to the new buffer, prefix it if required */
if (ctx->tag_prefix) {
flb_sds_cat_safe(&out_tag,
ctx->tag_prefix, flb_sds_len(ctx->tag_prefix));
}
flb_sds_cat_safe(&out_tag, stag, stag_len);

entry = root.via.array.ptr[1];

if (entry.type == MSGPACK_OBJECT_ARRAY) {
Expand All @@ -384,12 +401,14 @@ int fw_prot_process(struct fw_conn *conn)
flb_plg_debug(ctx->ins, "invalid options field");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

/* Process array */
fw_process_array(conn->in, conn,
stag, stag_len, &root, &entry, chunk_id);
out_tag, flb_sds_len(out_tag),
&root, &entry, chunk_id);
}
else if (entry.type == MSGPACK_OBJECT_POSITIVE_INTEGER ||
entry.type == MSGPACK_OBJECT_EXT) {
Expand All @@ -402,6 +421,7 @@ int fw_prot_process(struct fw_conn *conn)
flb_plg_warn(ctx->ins, "invalid data format, map expected");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

Expand All @@ -412,6 +432,7 @@ int fw_prot_process(struct fw_conn *conn)
flb_plg_debug(ctx->ins, "invalid options field");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

Expand All @@ -424,7 +445,8 @@ int fw_prot_process(struct fw_conn *conn)
msgpack_pack_object(&mp_pck, map);

/* Register data object */
flb_input_chunk_append_raw(conn->in, stag, stag_len,
flb_input_chunk_append_raw(conn->in,
out_tag, flb_sds_len(out_tag),
mp_sbuf.data, mp_sbuf.size);
msgpack_sbuffer_destroy(&mp_sbuf);
c++;
Expand All @@ -448,6 +470,7 @@ int fw_prot_process(struct fw_conn *conn)
flb_plg_debug(ctx->ins, "invalid options field");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

Expand All @@ -466,6 +489,7 @@ int fw_prot_process(struct fw_conn *conn)
flb_plg_error(ctx->ins, "invalid 'compressed' option");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

Expand All @@ -476,18 +500,19 @@ int fw_prot_process(struct fw_conn *conn)
flb_plg_error(ctx->ins, "gzip uncompress failure");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

/* Append uncompressed data */
flb_input_chunk_append_raw(conn->in,
stag, stag_len,
out_tag, flb_sds_len(out_tag),
gz_data, gz_size);
flb_free(gz_data);
}
else {
flb_input_chunk_append_raw(conn->in,
stag, stag_len,
out_tag, flb_sds_len(out_tag),
data, len);
}

Expand All @@ -512,6 +537,7 @@ int fw_prot_process(struct fw_conn *conn)

msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);

switch (ret) {
case MSGPACK_UNPACK_EXTRA_BYTES:
Expand Down

0 comments on commit e81fcb0

Please sign in to comment.