Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_forward: add new option 'tag_prefix' (#4455) #4466

Merged
merged 1 commit into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugins/in_forward/fw.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ static int in_fw_exit(void *data, struct flb_config *config)

/* Configuration properties map */
static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "tag_prefix", NULL,
0, FLB_TRUE, offsetof(struct flb_in_fw_config, tag_prefix),
"Prefix incoming tag with the defined value."
},
{
FLB_CONFIG_MAP_STR, "unix_path", NULL,
0, FLB_TRUE, offsetof(struct flb_in_fw_config, unix_path),
Expand Down
2 changes: 2 additions & 0 deletions plugins/in_forward/fw.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ struct flb_in_fw_config {
char *listen; /* Listen interface */
char *tcp_port; /* TCP Port */

flb_sds_t tag_prefix; /* tag prefix */

/* Unix Socket (TCP only) */
char *unix_path; /* Unix path for socket */

Expand Down
36 changes: 31 additions & 5 deletions plugins/in_forward/fw_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ int fw_prot_process(struct fw_conn *conn)
int c = 0;
size_t chunk_id = -1;
const char *stag;
flb_sds_t out_tag = NULL;
size_t bytes;
size_t buf_off = 0;
size_t recv_len;
Expand All @@ -271,6 +272,11 @@ int fw_prot_process(struct fw_conn *conn)
* [tag, [[time,record], [time,record], ...]]
*/

out_tag = flb_sds_create_size(1024);
if (!out_tag) {
return -1;
}

unp = msgpack_unpacker_new(1024);
msgpack_unpacked_init(&result);
conn->rest = conn->buf_len;
Expand All @@ -288,6 +294,7 @@ int fw_prot_process(struct fw_conn *conn)
conn->buf_len - all_used);
conn->buf_len -= all_used;
}
flb_sds_destroy(out_tag);
return 0;
}

Expand All @@ -312,6 +319,7 @@ int fw_prot_process(struct fw_conn *conn)
/* Cleanup buffers */
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);

return -1;
}
Expand Down Expand Up @@ -346,6 +354,7 @@ int fw_prot_process(struct fw_conn *conn)
root.type);
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

Expand All @@ -354,6 +363,7 @@ int fw_prot_process(struct fw_conn *conn)
"parser: array of invalid size, skip.");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

Expand All @@ -364,12 +374,19 @@ int fw_prot_process(struct fw_conn *conn)
"parser: invalid tag format, skip.");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

stag = tag.via.str.ptr;
stag_len = tag.via.str.size;

/* Copy the tag to the new buffer, prefix it if required */
if (ctx->tag_prefix) {
flb_sds_cat_safe(&out_tag,
ctx->tag_prefix, flb_sds_len(ctx->tag_prefix));
}
flb_sds_cat_safe(&out_tag, stag, stag_len);

entry = root.via.array.ptr[1];

if (entry.type == MSGPACK_OBJECT_ARRAY) {
Expand All @@ -384,12 +401,14 @@ int fw_prot_process(struct fw_conn *conn)
flb_plg_debug(ctx->ins, "invalid options field");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

/* Process array */
fw_process_array(conn->in, conn,
stag, stag_len, &root, &entry, chunk_id);
out_tag, flb_sds_len(out_tag),
&root, &entry, chunk_id);
}
else if (entry.type == MSGPACK_OBJECT_POSITIVE_INTEGER ||
entry.type == MSGPACK_OBJECT_EXT) {
Expand All @@ -402,6 +421,7 @@ int fw_prot_process(struct fw_conn *conn)
flb_plg_warn(ctx->ins, "invalid data format, map expected");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

Expand All @@ -412,6 +432,7 @@ int fw_prot_process(struct fw_conn *conn)
flb_plg_debug(ctx->ins, "invalid options field");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

Expand All @@ -424,7 +445,8 @@ int fw_prot_process(struct fw_conn *conn)
msgpack_pack_object(&mp_pck, map);

/* Register data object */
flb_input_chunk_append_raw(conn->in, stag, stag_len,
flb_input_chunk_append_raw(conn->in,
out_tag, flb_sds_len(out_tag),
mp_sbuf.data, mp_sbuf.size);
msgpack_sbuffer_destroy(&mp_sbuf);
c++;
Expand All @@ -448,6 +470,7 @@ int fw_prot_process(struct fw_conn *conn)
flb_plg_debug(ctx->ins, "invalid options field");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

Expand All @@ -466,6 +489,7 @@ int fw_prot_process(struct fw_conn *conn)
flb_plg_error(ctx->ins, "invalid 'compressed' option");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

Expand All @@ -476,18 +500,19 @@ int fw_prot_process(struct fw_conn *conn)
flb_plg_error(ctx->ins, "gzip uncompress failure");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}

/* Append uncompressed data */
flb_input_chunk_append_raw(conn->in,
stag, stag_len,
out_tag, flb_sds_len(out_tag),
gz_data, gz_size);
flb_free(gz_data);
}
else {
flb_input_chunk_append_raw(conn->in,
stag, stag_len,
out_tag, flb_sds_len(out_tag),
data, len);
}

Expand All @@ -512,6 +537,7 @@ int fw_prot_process(struct fw_conn *conn)

msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);

switch (ret) {
case MSGPACK_UNPACK_EXTRA_BYTES:
Expand Down