Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_emitter: added automatic threading detection workaround #7450

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 35 additions & 9 deletions plugins/in_emitter/emitter.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_scheduler.h>
#include <fluent-bit/flb_ring_buffer.h>


#include <sys/types.h>
#include <sys/stat.h>

#define DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY 2000

struct em_chunk {
flb_sds_t tag;
struct msgpack_sbuffer mp_sbuf; /* msgpack sbuffer */
Expand Down Expand Up @@ -108,24 +110,34 @@ int in_emitter_add_record(const char *tag, int tag_len,
const char *buf_data, size_t buf_size,
struct flb_input_instance *in)
{
struct em_chunk temporary_chunk;
struct mk_list *head;
struct em_chunk *ec = NULL;
struct em_chunk *ec;
struct flb_emitter *ctx;
int ret;

ctx = (struct flb_emitter *) in->context;
ec = NULL;

/* Use the ring buffer first if it exists */
if (ctx->msgs) {
ec = flb_calloc(1, sizeof(struct em_chunk));
if (ec == NULL) {
memset(&temporary_chunk, 0, sizeof(struct em_chunk));

temporary_chunk.tag = flb_sds_create_len(tag, tag_len);

if (temporary_chunk.tag == NULL) {
flb_plg_error(ctx->ins,
"cannot allocate memory for tag: %s",
tag);
return -1;
}
ec->tag = flb_sds_create_len(tag, tag_len);
msgpack_sbuffer_write(&ec->mp_sbuf, buf_data, buf_size);
ret = flb_ring_buffer_write(ctx->msgs, (void *)ec, sizeof(struct em_chunk));
flb_free(ec);
return ret;

msgpack_sbuffer_init(&temporary_chunk.mp_sbuf);
msgpack_sbuffer_write(&temporary_chunk.mp_sbuf, buf_data, buf_size);

return flb_ring_buffer_write(ctx->msgs,
(void *) &temporary_chunk,
sizeof(struct em_chunk));
}

/* Check if any target chunk already exists */
Expand Down Expand Up @@ -207,9 +219,11 @@ static int in_emitter_start_ring_buffer(struct flb_input_instance *in, struct fl
static int cb_emitter_init(struct flb_input_instance *in,
struct flb_config *config, void *data)
{
struct flb_sched *scheduler;
struct flb_emitter *ctx;
int ret;

scheduler = flb_sched_ctx_get();

ctx = flb_calloc(1, sizeof(struct flb_emitter));
if (!ctx) {
Expand All @@ -225,6 +239,18 @@ static int cb_emitter_init(struct flb_input_instance *in,
return -1;
}

if (scheduler != config->sched &&
scheduler != NULL &&
ctx->ring_buffer_size == 0) {

ctx->ring_buffer_size = DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY;

flb_plg_debug(in,
"threaded emitter instances require ring_buffer_size"
" being set, using default value of %u",
ctx->ring_buffer_size);
}

if (ctx->ring_buffer_size > 0) {
ret = in_emitter_start_ring_buffer(in, ctx);
if (ret == -1) {
Expand Down