diff --git a/src/flb_metrics_exporter.c b/src/flb_metrics_exporter.c index 2e1b98f21a4..31a6dcf902b 100644 --- a/src/flb_metrics_exporter.c +++ b/src/flb_metrics_exporter.c @@ -266,9 +266,10 @@ int flb_me_destroy(struct flb_me *me) struct cmt *flb_me_get_cmetrics(struct flb_config *ctx) { int ret; - struct mk_list *head; + struct mk_list *head, *processor_head; struct flb_input_instance *i; /* inputs */ - struct flb_filter_instance *f; /* filter */ + struct flb_processor_unit *pu; /* processors */ + struct flb_filter_instance *f, *pf; /* filter */ struct flb_output_instance *o; /* output */ struct cmt *cmt; @@ -308,6 +309,19 @@ struct cmt *flb_me_get_cmetrics(struct flb_config *ctx) cmt_destroy(cmt); return NULL; } + + mk_list_foreach(processor_head, &i->processor->logs) { + pu = mk_list_entry(processor_head, struct flb_processor_unit, _head); + if (pu->unit_type == FLB_PROCESSOR_UNIT_FILTER) { + pf = (struct flb_filter_instance *) pu->ctx; + ret = cmt_cat(cmt, pf->cmt); + if (ret == -1) { + flb_error("[metrics exporter] could not append metrics from %s", flb_filter_name(pf)); + cmt_destroy(cmt); + return NULL; + } + } + } } mk_list_foreach(head, &ctx->filters) { @@ -330,6 +344,19 @@ struct cmt *flb_me_get_cmetrics(struct flb_config *ctx) cmt_destroy(cmt); return NULL; } + + mk_list_foreach(processor_head, &o->processor->logs) { + pu = mk_list_entry(processor_head, struct flb_processor_unit, _head); + if (pu->unit_type == FLB_PROCESSOR_UNIT_FILTER) { + pf = (struct flb_filter_instance *) pu->ctx; + ret = cmt_cat(cmt, pf->cmt); + if (ret == -1) { + flb_error("[metrics exporter] could not append metrics from %s", flb_filter_name(pf)); + cmt_destroy(cmt); + return NULL; + } + } + } } return cmt; diff --git a/src/flb_processor.c b/src/flb_processor.c index a756940861a..eeb43606459 100644 --- a/src/flb_processor.c +++ b/src/flb_processor.c @@ -442,6 +442,13 @@ int flb_processor_run(struct flb_processor *proc, struct flb_filter_instance *f_ins; struct flb_processor_instance *p_ins; struct flb_mp_chunk_cobj *chunk_cobj = NULL; +#ifdef FLB_HAVE_METRICS + int in_records = 0; + int out_records = 0; + int diff = 0; + uint64_t ts; + char *name; +#endif if (type == FLB_PROCESSOR_LOGS) { list = &proc->logs; @@ -453,6 +460,11 @@ int flb_processor_run(struct flb_processor *proc, list = &proc->traces; } +#ifdef FLB_HAVE_METRICS + /* timestamp */ + ts = cfl_time_now(); +#endif + /* set current data buffer */ cur_buf = data; cur_size = data_size; @@ -493,7 +505,17 @@ int flb_processor_run(struct flb_processor *proc, proc->data, /* (input/output) instance context */ f_ins->context, /* filter context */ proc->config); - +#ifdef FLB_HAVE_METRICS + name = (char *) (flb_filter_name(f_ins)); + in_records = flb_mp_count(cur_buf, cur_size); + cmt_counter_add(f_ins->cmt_records, ts, in_records, + 1, (char *[]) {name}); + cmt_counter_add(f_ins->cmt_bytes, ts, tmp_size, + 1, (char *[]) {name}); + + flb_metrics_sum(FLB_METRIC_N_RECORDS, in_records, f_ins->metrics); + flb_metrics_sum(FLB_METRIC_N_BYTES, tmp_size, f_ins->metrics); +#endif /* * The cb_filter() function return status tells us if something changed * during it process. The possible values are: @@ -520,6 +542,15 @@ int flb_processor_run(struct flb_processor *proc, *out_buf = NULL; *out_size = 0; +#ifdef FLB_HAVE_METRICS + /* cmetrics */ + cmt_counter_add(f_ins->cmt_drop_records, ts, in_records, + 1, (char *[]) {name}); + + /* [OLD] Summarize all records removed */ + flb_metrics_sum(FLB_METRIC_N_DROPPED, + in_records, f_ins->metrics); +#endif release_lock(&pu->lock, FLB_PROCESSOR_LOCK_RETRY_LIMIT, FLB_PROCESSOR_LOCK_RETRY_DELAY); @@ -530,6 +561,32 @@ int flb_processor_run(struct flb_processor *proc, /* set new buffer */ cur_buf = tmp_buf; cur_size = tmp_size; + out_records = flb_mp_count(tmp_buf, tmp_size); +#ifdef FLB_HAVE_METRICS + if (out_records > in_records) { + diff = (out_records - in_records); + + /* cmetrics */ + cmt_counter_add(f_ins->cmt_add_records, ts, diff, + 1, (char *[]) {name}); + + /* [OLD] Summarize new records */ + flb_metrics_sum(FLB_METRIC_N_ADDED, + diff, f_ins->metrics); + } + else if (out_records < in_records) { + diff = (in_records - out_records); + + /* cmetrics */ + cmt_counter_add(f_ins->cmt_drop_records, ts, diff, + 1, (char *[]) {name}); + + /* [OLD] Summarize dropped records */ + flb_metrics_sum(FLB_METRIC_N_DROPPED, + diff, f_ins->metrics); + } +#endif + } else if (ret == FLB_FILTER_NOTOUCH) { /* keep original data, do nothing */