Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter_multiline: implement Docker partial_message support #5037

Closed
wants to merge 3 commits into from

Conversation

PettitWesley
Copy link
Contributor

Signed-off-by: Wesley Pettit [email protected]


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@PettitWesley
Copy link
Contributor Author

@edsiper there won't be another 1.8 version right? So I just need to submit a PR to master for this instead..

@patrick-stephens
Copy link
Contributor

@edsiper there won't be another 1.8 version right? So I just need to submit a PR to master for this instead..

I think the plan is still some maintenance releases @PettitWesley, I've submitted a few PRs for 1.8 for example.

@edsiper
Copy link
Member

edsiper commented Mar 11, 2022

multiline core feature already supports partial docker messages, what is different on this PR ?

@PettitWesley
Copy link
Contributor Author

@edsiper The logic is very different between your existing code for parsing split docker logs in json log files: https://github.com/fluent/fluent-bit/blob/master/src/multiline/flb_ml_parser_docker.c

And my code which is meant to be used with the fluentd docker log driver. The code I have added here is similar to the fluentd-plugin-concat and the underlying logic is much different. See this comment on my design and the sections of my design: #4309 (comment)

https://github.com/fluent-plugins-nursery/fluent-plugin-concat

Copy link
Contributor

@matthewfala matthewfala left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good! Added some comments for clairification


sched = flb_sched_ctx_get();

ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have this be the (flush_ms / x) where (flush_ms / x) is less than grace? And the cb timer altered to only run if in shutdown phase or every x runs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flush timer can't help on shutdown. I need another solution for that (which will come in a separate PR and I will do a separate write up in an issue on that for Eduardo, ping me if you want a verbal explanation before I get to it).

Dividing by X is potentially a good idea since then we are more likely to flush closer to when a log has exceeded flush_ms. I'll change this, I'm gonna make X be 2.

Comment on lines 59 to 77
msgpack_object_kv *get_key(msgpack_object *map, char *check_for_key);
int is_partial(msgpack_object *map);
int is_partial_last(msgpack_object *map);
int get_partial_id(msgpack_object *map,
char **partial_id_str,
size_t *partial_id_size);
struct split_message_packer *get_packer(struct mk_list *packers, const char *tag,
char *input_name,
char *partial_id_str, size_t partial_id_size);
struct split_message_packer *create_packer(const char *tag, char *input_name,
char *partial_id_str, size_t partial_id_size,
msgpack_object *map, char *multiline_key_content,
struct flb_time *tm);
int split_message_packer_write(struct split_message_packer *packer,
msgpack_object *map, char *multiline_key_content);
void split_message_packer_complete(struct split_message_packer *packer);
void split_message_packer_destroy(struct split_message_packer *packer);
void append_complete_record(char *data, size_t bytes, msgpack_packer *tmp_pck);
unsigned long long current_timestamp();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you may want to prefix these function names with flb_ml_concat_ or similar to avoid polluting the global namespace.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. I keep forgetting about this. I wish C functions weren't in a global namespace...

Comment on lines +255 to +285
for(i=0; i < map->via.map.size; i++) {
if ((kv+i) == split_kv) {
continue;
}

key = (kv+i)->key;
if (key.type == MSGPACK_OBJECT_BIN) {
key_str = (char *) key.via.bin.ptr;
key_str_size = key.via.bin.size;
check_key = FLB_TRUE;
}
if (key.type == MSGPACK_OBJECT_STR) {
key_str = (char *) key.via.str.ptr;
key_str_size = key.via.str.size;
check_key = FLB_TRUE;
}

len = FLB_MULTILINE_PARTIAL_PREFIX_LEN;
if (key_str_size < len) {
len = key_str_size;
}

if (check_key == FLB_TRUE) {
if (strncmp(FLB_MULTILINE_PARTIAL_PREFIX, key_str, len) == 0) {
/* don't pack the partial keys */
continue;
}
}

map_size++;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Repeated code 3 times. Would be nice to somehow extract for maintenance purposes. Maybe difficult.

Comment on lines 350 to 360
val = kv->val;
if (val.type == MSGPACK_OBJECT_BIN) {
val_str = (char *) val.via.bin.ptr;
val_str_size = val.via.bin.size;
}
if (val.type == MSGPACK_OBJECT_STR) {
val_str = (char *) val.via.str.ptr;
val_str_size = val.via.str.size;
}

flb_sds_cat_safe(&packer->buf, val_str, val_str_size);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe recommend discarding if not of type binary or string. This should, I guess, never happen because we trust the contents of the multiline value to be a string/binary, but what if it unexpectedly happens to be something else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, that's safer.

Comment on lines +409 to +413
if (packer->mp_sbuf.data) {
msgpack_sbuffer_destroy(&packer->mp_sbuf);
}

flb_free(packer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does packer->mp_pck need to be freed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT it does not. The buffer is the actual memory, the packer seems to be like a helper. The existing code for multiline has a packer and a buffer and it only frees the buffer. Also I tested this with valgrind and it didn't show a memory leak.

Comment on lines +639 to +642
/* record passed from filter as-is */
msgpack_pack_array(&tmp_pck, 2);
flb_time_append_to_msgpack(&tm, &tmp_pck, 0);
msgpack_pack_object(&tmp_pck, *obj);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if for consistency, these non_partial logs should be re-emitted. That way if the multiline filter is not the first filter, all the logs will have prior filters applied twice, rather than only the multiline logs.

Also to consider is efficiency, which advocates for the current solution.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind. See that emit only occurs after timeout, so it shouldn't regularly happen. Current solution seems fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this too myself... but returning directly from the filter is more efficient... also I think most of hte filters can be safely passed through twice... this is also something I can fix if it turns out it does have weird side effects and someone complains.

There is an issue right now open that under high throughput the emitter fills up quickly... and remember the use case here is explicitly for huge logs, so I htink I want to stick with only using the emitter when needed.

}
mk_list_add(&packer->_head, &ctx->split_message_packers);
}
ret = split_message_packer_write(packer, obj, "log");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May prefer some #define for "log" string.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and also on line 609

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh boy. This is actually a bug. Thanks. The key is not guaranteed to be "log"... it is user configured.

Comment on lines +655 to +656
*out_buf = tmp_sbuf.data;
*out_bytes = tmp_sbuf.size;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens to the old out_buf, and out_bytes? This gets cleaned up properly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may also need to call free(tmp_sbuf) The data is still used, so msgpack_sbuffer_free might not be able to be used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the way this works is that you return the memory to the caller, and then the caller frees. If you look in the code in I think flb_filter.c it frees the returned buf. All of the filters work this way.

split_message_packer_complete(packer);
/* re-emit record with original tag */
flb_plg_trace(ctx->ins, "emitting from %s to %s", packer->input_name, packer->tag);
ret = in_emitter_add_record(packer->tag, flb_sds_len(packer->tag),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not clear how cycles are avoided.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check the beginning of cb_ml_filter:

    if (i_ins == ctx->ins_emitter) {
        flb_plg_trace(ctx->ins, "not processing records from the emitter");
        return FLB_FILTER_NOTOUCH;
    }

Comment on lines +220 to +224
} else {
flb_plg_error(ins, "'Mode' must be '%s' or '%s'",
FLB_MULTILINE_MODE_PARTIAL_MESSAGE,
FLB_MULTILINE_MODE_PARSER);
return -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is flb_free(ctx) needed here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its stylistically better yes. Strictly speaking, if you fail plugin init, then Fluent Bit shuts down, and so freeing memory isn't needed. Lots of the plugins don't free on init failure.

@lubingfeng
Copy link

@edsiper @koleini @fujimotos can we get this reviewed so that we can merge this? We get customers waiting for this fix.

@matthewfala
Copy link
Contributor

@lubingfeng Thanks for checking. Let me take a second look.

Copy link
Contributor

@matthewfala matthewfala left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The minor changes look good, and comments towards most of the other issues make sense!

I personally feel that the multiline function prefix should be something like flb_ml_concat_..., rather than just ml to follow the prefix convention set forth in flb_ml_parser.h/c, flb_ml_rule.h/c and other fluent bit files, and also to avoid naming collision with other libraries via the flb_ prefix. This is up to you and Eduardo though.

Not sure if you care about this style. But if you do, line 278 has an invisible extra indent in ml.c: https://github.com/fluent/fluent-bit/pull/5037/files#diff-d882420079c3f1dedc230e33a45fb81bf06a4deae9e2df11bd8ecaf509c73b0dR278

Lines 359 to 363 in ml_concat.c have some extra indents:
https://github.com/fluent/fluent-bit/pull/5037/files#diff-f5839e19406e810f65cbda8799c752423d8fb3c5ff50de0e220864d56d7b7c87R359-R363

Approving.

@matthewfala
Copy link
Contributor

Approved with some optional change requests @lubingfeng

@PettitWesley
Copy link
Contributor Author

@edsiper we actually should not merge this PR into 1.8 since 1.9 series has started and this is not a bug fix it is a new feature. However I can not put up a PR against master for this change until this PR is merged: #4671

@PettitWesley
Copy link
Contributor Author

We will release this in 1.9 instead, and I will open a PR there once #4671 is merged

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants