Skip to content

Commit

Permalink
core: minimal synchronous scheduler
Browse files Browse the repository at this point in the history
Signed-off-by: Matthew Fala <[email protected]>
  • Loading branch information
matthewfala authored and edsiper committed Nov 25, 2022
1 parent cae9a8f commit 9f569b0
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 8 deletions.
13 changes: 12 additions & 1 deletion include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@
#define FLB_OUTPUT_NET 32 /* output address may set host and port */
#define FLB_OUTPUT_PLUGIN_CORE 0
#define FLB_OUTPUT_PLUGIN_PROXY 1
#define FLB_OUTPUT_NO_MULTIPLEX 512
#define FLB_OUTPUT_NO_MULTIPLEX 512 /* run one task at a time, one task per flush */
#define FLB_OUTPUT_PRIVATE 1024
#define FLB_OUTPUT_SYNCHRONOUS 2048 /* run one task at a time, no flush cycle limit */


/* Event type handlers */
Expand Down Expand Up @@ -357,6 +358,7 @@ struct flb_output_instance {
* loaded (in backlog)
*/
size_t fs_backlog_chunks_size;

/*
* Buffer limit: optional limit set by configuration so this output instance
* cannot buffer more than total_limit_size (bytes unit).
Expand All @@ -367,6 +369,9 @@ struct flb_output_instance {
*/
size_t total_limit_size;

/* Queue for singleplexed tasks */
struct flb_task_queue *singleplex_queue;

/* Thread Pool: this is optional for the caller */
int tp_workers;
struct flb_tp *tp;
Expand Down Expand Up @@ -721,6 +726,12 @@ struct flb_output_instance *flb_output_get_instance(struct flb_config *config,
int out_id);
int flb_output_flush_finished(struct flb_config *config, int out_id);

int flb_output_task_singleplex_enqueue(struct flb_task_queue *queue,
struct flb_task_retry *retry,
struct flb_task *task,
struct flb_output_instance *out_ins,
struct flb_config *config);
int flb_output_task_singleplex_flush_next(struct flb_task_queue *queue);
struct flb_output_instance *flb_output_new(struct flb_config *config,
const char *output, void *data,
int public_only);
Expand Down
30 changes: 30 additions & 0 deletions include/fluent-bit/flb_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,34 @@ struct flb_task {
struct flb_config *config; /* parent flb config */
};

/*
* A queue of flb_task_enqueued tasks
*
* This structure is currently used to track pending flushes when FLB_OUTPUT_SYNCHRONOUS
* is used.
*/
struct flb_task_queue {
struct mk_list pending;
struct mk_list in_progress;
};

/*
* An enqueued task is a task that is not yet dispatched to a thread
* or started on the engine.
*
* There may be multiple enqueued instances of the same task on different out instances.
*
* This structure is currently used to track pending flushes when FLB_OUTPUT_SYNCHRONOUS
* is used.
*/
struct flb_task_enqueued {
struct flb_task *task;
struct flb_task_retry *retry;
struct flb_output_instance *out_instance;
struct flb_config *config;
struct mk_list _head;
};

int flb_task_running_count(struct flb_config *config);
int flb_task_running_print(struct flb_config *config);

Expand All @@ -104,6 +132,8 @@ void flb_task_add_coro(struct flb_task *task, struct flb_coro *coro);

void flb_task_destroy(struct flb_task *task, int del);

struct flb_task_queue* flb_task_queue_create();
void flb_task_queue_destroy(struct flb_task_queue *queue);
struct flb_task_retry *flb_task_retry_create(struct flb_task *task,
struct flb_output_instance *ins);

Expand Down
7 changes: 7 additions & 0 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,13 @@ static inline int handle_output_event(flb_pipefd_t fd, uint64_t ts,
}
name = (char *) flb_output_name(ins);

/* If we are in synchronous mode, flush the next waiting task */
if (ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
if (ret == FLB_OK || ret == FLB_RETRY || ret == FLB_ERROR) {
flb_output_task_singleplex_flush_next(ins->singleplex_queue);
}
}

/* A task has finished, delete it */
if (ret == FLB_OK) {
/* cmetrics */
Expand Down
38 changes: 31 additions & 7 deletions src/flb_engine_dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,25 @@ int flb_engine_dispatch_retry(struct flb_task_retry *retry,
flb_event_chunk_update(task->event_chunk, buf_data, buf_size);

/* flush the task */
ret = flb_output_task_flush(task, retry->o_ins, config);
if (ret == -1) {
flb_task_retry_destroy(retry);
return -1;
if (retry->o_ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
/*
* If the plugin doesn't allow for multiplexing.
* singleplex_enqueue deletes retry context on flush or delayed flush failure
*/
ret = flb_output_task_singleplex_enqueue(retry->o_ins->singleplex_queue, retry,
task, retry->o_ins, config);
if (ret == -1) {
return -1;
}
}
else {
ret = flb_output_task_flush(task, retry->o_ins, config);
if (ret == -1) {
flb_task_retry_destroy(retry);
return -1;
}
}

return 0;
}

Expand Down Expand Up @@ -183,10 +197,20 @@ static int tasks_start(struct flb_input_instance *in,
hits++;

/*
* We have the Task and the Route, created a thread context for the
* data handling.
* If the plugin is in synchronous mode, enqueue the task and flush
* when appropriate.
*/
flb_output_task_flush(task, route->out, config);
if (out->flags & FLB_OUTPUT_SYNCHRONOUS) {
flb_output_task_singleplex_enqueue(route->out->singleplex_queue, NULL,
task, route->out, config);
}
else {
/*
* We have the Task and the Route, created a thread context for the
* data handling.
*/
flb_output_task_flush(task, route->out, config);
}

/*
th = flb_output_thread(task,
Expand Down
157 changes: 157 additions & 0 deletions src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,125 @@ void flb_output_coro_add(struct flb_output_instance *ins, struct flb_coro *coro)
mk_list_add(&out_flush->_head, &ins->flush_list);
}

/*
* Queue a task to be flushed at a later time
* Deletes retry context if enqueue fails
*/
static int flb_output_task_queue_enqueue(struct flb_task_queue *queue,
struct flb_task_retry *retry,
struct flb_task *task,
struct flb_output_instance *out_ins,
struct flb_config *config)
{
struct flb_task_enqueued *queued_task;

queued_task = flb_malloc(sizeof(struct flb_task_enqueued));
if (!queued_task) {
flb_errno();
if (retry) {
flb_task_retry_destroy(retry);
}
return -1;
}
queued_task->retry = retry;
queued_task->out_instance = out_ins;
queued_task->task = task;
queued_task->config = config;

mk_list_add(&queued_task->_head, &queue->pending);
return 0;
}

/*
* Pop task from pending queue and flush it
* Will delete retry context if flush fails
*/
static int flb_output_task_queue_flush_one(struct flb_task_queue *queue)
{
struct flb_task_enqueued *queued_task;
int ret;
int is_empty;

is_empty = mk_list_is_empty(&queue->pending) == 0;
if (is_empty) {
flb_error("Attempting to flush task from an empty in_progress queue");
return -1;
}

queued_task = mk_list_entry_first(&queue->pending, struct flb_task_enqueued, _head);
mk_list_del(&queued_task->_head);
mk_list_add(&queued_task->_head, &queue->in_progress);
ret = flb_output_task_flush(queued_task->task,
queued_task->out_instance,
queued_task->config);

/* Destroy retry context if needed */
if (ret == -1) {
if (queued_task->retry) {
flb_task_retry_destroy(queued_task->retry);
}
/* Flush the next task */
flb_output_task_singleplex_flush_next(queue);
return -1;
}

return ret;
}

/*
* Will either run or queue running a single task
* Deletes retry context if enqueue fails
*/
int flb_output_task_singleplex_enqueue(struct flb_task_queue *queue,
struct flb_task_retry *retry,
struct flb_task *task,
struct flb_output_instance *out_ins,
struct flb_config *config)
{
int ret;
int is_empty;

/* Enqueue task */
ret = flb_output_task_queue_enqueue(queue, retry, task, out_ins, config);
if (ret == -1) {
return -1;
}

/* Launch task if nothing is running */
is_empty = mk_list_is_empty(&out_ins->singleplex_queue->in_progress) == 0;
if (is_empty) {
return flb_output_task_queue_flush_one(out_ins->singleplex_queue);
}

return 0;
}

/*
* Clear in progress task and flush a single queued task if exists
* Deletes retry context on next flush if flush fails
*/
int flb_output_task_singleplex_flush_next(struct flb_task_queue *queue)
{
int is_empty;
struct flb_task_enqueued *ended_task;

/* Remove in progress task */
is_empty = mk_list_is_empty(&queue->in_progress) == 0;
if (!is_empty) {
ended_task = mk_list_entry_first(&queue->in_progress,
struct flb_task_enqueued, _head);
mk_list_del(&ended_task->_head);
flb_free(ended_task);
}

/* Flush if there is a pending task queued */
is_empty = mk_list_is_empty(&queue->pending) == 0;
if (!is_empty) {
return flb_output_task_queue_flush_one(queue);
}
return 0;
}

/*
* Flush a task through the output plugin, either using a worker thread + coroutine
* or a simple co-routine in the current thread.
Expand All @@ -191,6 +310,11 @@ int flb_output_task_flush(struct flb_task *task,
ret = flb_output_thread_pool_flush(task, out_ins, config);
if (ret == -1) {
flb_task_users_dec(task, FLB_FALSE);

/* If we are in synchronous mode, flush one waiting task */
if (out_ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
flb_output_task_singleplex_flush_next(out_ins->singleplex_queue);
}
}
}
else {
Expand All @@ -208,6 +332,14 @@ int flb_output_task_flush(struct flb_task *task,
sizeof(struct flb_output_flush*));
if (ret == -1) {
flb_errno();
flb_output_flush_destroy(out_flush);
flb_task_users_dec(task, FLB_FALSE);

/* If we are in synchronous mode, flush one waiting task */
if (out_ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
flb_output_task_singleplex_flush_next(out_ins->singleplex_queue);
}

return -1;
}
}
Expand Down Expand Up @@ -285,6 +417,11 @@ int flb_output_instance_destroy(struct flb_output_instance *ins)
/* release properties */
flb_output_free_properties(ins);

/* free singleplex queue */
if (ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
flb_task_queue_destroy(ins->singleplex_queue);
}

mk_list_del(&ins->_head);
flb_free(ins);

Expand Down Expand Up @@ -461,6 +598,9 @@ struct flb_output_instance *flb_output_new(struct flb_config *config,
instance->p = plugin;
instance->callback = flb_callback_create(instance->name);
if (!instance->callback) {
if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
flb_task_queue_destroy(instance->singleplex_queue);
}
flb_free(instance);
return NULL;
}
Expand All @@ -474,6 +614,9 @@ struct flb_output_instance *flb_output_new(struct flb_config *config,
ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context));
if (!ctx) {
flb_errno();
if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
flb_task_queue_destroy(instance->singleplex_queue);
}
flb_free(instance);
return NULL;
}
Expand Down Expand Up @@ -527,11 +670,25 @@ struct flb_output_instance *flb_output_new(struct flb_config *config,
if (plugin->flags & FLB_OUTPUT_NET) {
ret = flb_net_host_set(plugin->name, &instance->host, output);
if (ret != 0) {
if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
flb_task_queue_destroy(instance->singleplex_queue);
}
flb_free(instance);
return NULL;
}
}

/* Create singleplex queue if SYNCHRONOUS mode is used */
instance->singleplex_queue = NULL;
if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
instance->singleplex_queue = flb_task_queue_create();
if (!instance->singleplex_queue) {
flb_free(instance);
flb_errno();
return NULL;
}
}

flb_kv_init(&instance->properties);
flb_kv_init(&instance->net_properties);
mk_list_init(&instance->upstreams);
Expand Down
Loading

0 comments on commit 9f569b0

Please sign in to comment.