From 5bb47ac8ab67a93ae1995aa666ee0e7d51602453 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Sun, 27 Oct 2024 15:33:31 -0600 Subject: [PATCH] WIP: processor_content_modifier: add support for metrics scope Signed-off-by: Eduardo Silva --- .../processor_content_modifier/cm_config.c | 3 + plugins/processor_content_modifier/cm_logs.c | 111 +--------- .../processor_content_modifier/cm_metrics.c | 54 ++--- .../cm_opentelemetry.c | 189 ++++++++++++++++-- .../cm_opentelemetry.h | 3 +- tests/runtime/processor_content_modifier.c | 3 +- 6 files changed, 197 insertions(+), 166 deletions(-) diff --git a/plugins/processor_content_modifier/cm_config.c b/plugins/processor_content_modifier/cm_config.c index 95880bbb34f..70ba9915715 100644 --- a/plugins/processor_content_modifier/cm_config.c +++ b/plugins/processor_content_modifier/cm_config.c @@ -226,6 +226,9 @@ static int set_context(struct content_modifier_ctx *ctx) else if (strcasecmp(ctx->context_str, "otel_resource_attributes") == 0) { context = CM_CONTEXT_OTEL_RESOURCE_ATTR; } + else if (strcasecmp(ctx->context_str, "otel_scope_attributes") == 0) { + context = CM_CONTEXT_OTEL_SCOPE_ATTR; + } else if (strcasecmp(ctx->context_str, "otel_scope_name") == 0) { /* * scope name is restricted to specific actions, make sure the user diff --git a/plugins/processor_content_modifier/cm_logs.c b/plugins/processor_content_modifier/cm_logs.c index f54e56d0398..49e6e2fe08d 100644 --- a/plugins/processor_content_modifier/cm_logs.c +++ b/plugins/processor_content_modifier/cm_logs.c @@ -28,6 +28,7 @@ #include "cm.h" #include "cm_utils.h" +#include "cm_opentelemetry.h" #include @@ -268,133 +269,27 @@ static int run_action_convert(struct content_modifier_ctx *ctx, return 0; } -static struct cfl_variant *otel_get_or_create_attributes(struct cfl_kvlist *kvlist) -{ - int ret; - struct cfl_list *head; - struct cfl_list *tmp; - struct cfl_kvpair *kvpair; - struct cfl_variant *val; - struct cfl_kvlist *kvlist_tmp; - - /* iterate resource to find the attributes field */ - cfl_list_foreach_safe(head, tmp, &kvlist->list) { - kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); - if (cfl_sds_len(kvpair->key) != 10) { - continue; - } - - if (strncmp(kvpair->key, "attributes", 10) == 0) { - val = kvpair->val; - if (val->type != CFL_VARIANT_KVLIST) { - return NULL; - } - - return val; - } - } - - /* create an empty kvlist as the value of attributes */ - kvlist_tmp = cfl_kvlist_create(); - if (!kvlist_tmp) { - return NULL; - } - - /* create the attributes kvpair */ - ret = cfl_kvlist_insert_kvlist_s(kvlist, "attributes", 10, kvlist_tmp); - if (ret != 0) { - cfl_kvlist_destroy(kvlist_tmp); - return NULL; - } - - /* get the last kvpair from the list */ - kvpair = cfl_list_entry_last(&kvlist->list, struct cfl_kvpair, _head); - if (!kvpair) { - return NULL; - } - - return kvpair->val; -} - static struct cfl_variant *otel_get_attributes(int context, struct flb_mp_chunk_record *record) { - int key_len; - const char *key_buf; - struct cfl_list *head; struct cfl_object *obj = NULL; - struct cfl_variant *val; struct cfl_kvlist *kvlist; - struct cfl_kvpair *kvpair; - struct cfl_variant *var_attr; - - if (context == CM_CONTEXT_OTEL_RESOURCE_ATTR) { - key_buf = "resource"; - key_len = 8; - } - else if (context == CM_CONTEXT_OTEL_SCOPE_ATTR) { - key_buf = "scope"; - key_len = 5; - } - else { - return NULL; - } obj = record->cobj_record; kvlist = obj->variant->data.as_kvlist; - cfl_list_foreach(head, &kvlist->list) { - kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); - - if (cfl_sds_len(kvpair->key) != key_len) { - continue; - } - - if (strncmp(kvpair->key, key_buf, key_len) == 0) { - val = kvpair->val; - if (val->type != CFL_VARIANT_KVLIST) { - return NULL; - } - - var_attr = otel_get_or_create_attributes(val->data.as_kvlist); - if (!var_attr) { - return NULL; - } - - return var_attr; - } - } - return NULL; + return cm_otel_get_attributes(CM_TELEMETRY_LOGS, context, kvlist); } static struct cfl_variant *otel_get_scope(struct flb_mp_chunk_record *record) { - struct cfl_list *head; struct cfl_object *obj; - struct cfl_variant *val; struct cfl_kvlist *kvlist; - struct cfl_kvpair *kvpair; obj = record->cobj_record; kvlist = obj->variant->data.as_kvlist; - cfl_list_foreach(head, &kvlist->list) { - kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); - - if (cfl_sds_len(kvpair->key) != 5) { - continue; - } - - if (strncmp(kvpair->key, "scope", 5) == 0) { - val = kvpair->val; - if (val->type != CFL_VARIANT_KVLIST) { - return NULL; - } - return val; - } - } - - return NULL; + return cm_otel_get_scope_metadata(CM_TELEMETRY_LOGS, kvlist); } int cm_logs_process(struct flb_processor_instance *ins, diff --git a/plugins/processor_content_modifier/cm_metrics.c b/plugins/processor_content_modifier/cm_metrics.c index c80db5cdd5c..d4e97aec533 100644 --- a/plugins/processor_content_modifier/cm_metrics.c +++ b/plugins/processor_content_modifier/cm_metrics.c @@ -260,12 +260,7 @@ int cm_metrics_process(struct flb_processor_instance *ins, const char *tag, int tag_len) { int ret = -1; - struct cfl_list *head; struct cfl_variant *var = NULL; - struct cfl_variant *val; - struct cfl_kvlist *kvlist; - struct cfl_kvpair *kvpair; - struct cfl_variant *var_attr; printf("\n\n==== BEFORE =====\n"); cfl_kvlist_print(stdout, in_cmt->internal_metadata); @@ -297,74 +292,55 @@ int cm_metrics_process(struct flb_processor_instance *ins, } var = NULL; - cfl_list_foreach(head, &in_cmt->external_metadata->list) { - kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); - - if (cfl_sds_len(kvpair->key) != 8 /* resource */) { - continue; - } - - if (strncmp(kvpair->key, "resource", 8) == 0) { - val = kvpair->val; - if (val->type != CFL_VARIANT_KVLIST) { - return FLB_PROCESSOR_FAILURE; - } - - var_attr = cm_otel_get_or_create_attributes(val->data.as_kvlist); - if (!var_attr) { - return FLB_PROCESSOR_FAILURE; - } - - if (var_attr->type != CFL_VARIANT_KVLIST) { - return FLB_PROCESSOR_FAILURE; - } - - var = var_attr; - break; //FiXME return ; - } + + var = cm_otel_get_attributes(CM_TELEMETRY_METRICS, ctx->context_type, in_cmt->external_metadata); + if (!var) { + return FLB_PROCESSOR_FAILURE; } } + else if (ctx->context_type == CM_CONTEXT_OTEL_SCOPE_ATTR) { + var = cm_otel_get_attributes(CM_TELEMETRY_METRICS, ctx->context_type, in_cmt->external_metadata); + } else if ((ctx->context_type == CM_CONTEXT_OTEL_SCOPE_NAME || ctx->context_type == CM_CONTEXT_OTEL_SCOPE_VERSION)) { - var = cm_otel_get_scope(in_cmt->external); + var = cm_otel_get_scope_metadata(CM_TELEMETRY_METRICS, in_cmt->external_metadata); } - if (!var) { return FLB_PROCESSOR_FAILURE; } if (ctx->action_type == CM_ACTION_INSERT) { ret = run_action_insert(ctx, var->data.as_kvlist, tag, tag_len, ctx->key, ctx->value); - printf("\n run action insert: %i\n", ret); } else if (ctx->action_type == CM_ACTION_UPSERT) { ret = run_action_upsert(ctx, var->data.as_kvlist, tag, tag_len, ctx->key, ctx->value); - printf("\n run action upsert: %i\n", ret); } else if (ctx->action_type == CM_ACTION_DELETE) { ret = run_action_delete(ctx, var->data.as_kvlist, tag, tag_len, ctx->key); - printf("\n run action delete: %i\n", ret); } else if (ctx->action_type == CM_ACTION_RENAME) { ret = run_action_rename(ctx, var->data.as_kvlist, tag, tag_len, ctx->key, ctx->value); - printf("\n run action rename: %i\n", ret); } else if (ctx->action_type == CM_ACTION_HASH) { ret = run_action_hash(ctx, var->data.as_kvlist, tag, tag_len, ctx->key); - printf("\n run action hash: %i\n", ret); } else if (ctx->action_type == CM_ACTION_EXTRACT) { ret = run_action_extract(ctx, var->data.as_kvlist, tag, tag_len, ctx->key, ctx->regex); - printf("\n run action extract: %i\n", ret); } else if (ctx->action_type == CM_ACTION_CONVERT) { ret = run_action_convert(ctx, var->data.as_kvlist, tag, tag_len, ctx->key, ctx->converted_type); - printf("\n run action convert: %i\n", ret); } if (ret != 0) { return FLB_PROCESSOR_FAILURE; } + printf("\n\n==== AFTER =====\n"); + cfl_kvlist_print(stdout, in_cmt->internal_metadata); + printf("\n"); + printf("-----external----\n"); + cfl_kvlist_print(stdout, in_cmt->external_metadata); + fflush(stdout); + return FLB_PROCESSOR_SUCCESS; } diff --git a/plugins/processor_content_modifier/cm_opentelemetry.c b/plugins/processor_content_modifier/cm_opentelemetry.c index 22b2394d1e1..0a37a89a2b5 100644 --- a/plugins/processor_content_modifier/cm_opentelemetry.c +++ b/plugins/processor_content_modifier/cm_opentelemetry.c @@ -80,36 +80,193 @@ struct cfl_variant *cm_otel_get_or_create_attributes(struct cfl_kvlist *kvlist) return kvpair->val; } -struct cfl_variant *cm_otel_get_scope(struct kvlist *kvlist) +static struct cfl_variant *otel_get_or_create_attributes(struct cfl_kvlist *kvlist) { + int ret; struct cfl_list *head; - struct cfl_object *obj; - struct cfl_variant *val; - struct cfl_kvlist *kvlist; + struct cfl_list *tmp; struct cfl_kvpair *kvpair; + struct cfl_variant *val = NULL; + struct cfl_kvlist *kvlist_tmp; - if (!kvlist) { - return NULL; - } - - //obj = record->cobj_record; - //kvlist = obj->variant->data.as_kvlist; - cfl_list_foreach(head, &kvlist->list) { + /* iterate resource to find the attributes field */ + cfl_list_foreach_safe(head, tmp, &kvlist->list) { kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); - - if (cfl_sds_len(kvpair->key) != 5) { + if (cfl_sds_len(kvpair->key) != 10) { continue; } - if (strncmp(kvpair->key, "scope", 5) == 0) { + if (strncmp(kvpair->key, "attributes", 10) == 0) { val = kvpair->val; if (val->type != CFL_VARIANT_KVLIST) { return NULL; } - return val; } } - return NULL; + /* create an empty kvlist as the value of attributes */ + kvlist_tmp = cfl_kvlist_create(); + if (!kvlist_tmp) { + return NULL; + } + + /* create the attributes kvpair */ + ret = cfl_kvlist_insert_kvlist_s(kvlist, "attributes", 10, kvlist_tmp); + if (ret != 0) { + cfl_kvlist_destroy(kvlist_tmp); + return NULL; + } + + /* get the last kvpair from the list */ + kvpair = cfl_list_entry_last(&kvlist->list, struct cfl_kvpair, _head); + if (!kvpair) { + return NULL; + } + + return kvpair->val; +} + +/* + * get attributes for resources and scope, context must be one of: + * + * - CM_CONTEXT_OTEL_RESOURCE_ATTR + * - CM_CONTEXT_OTEL_SCOPE_ATTR + */ +struct cfl_variant *cm_otel_get_attributes(int telemetry_type, int context, struct cfl_kvlist *kvlist) +{ + int key_len; + const char *key_buf; + struct cfl_variant *var; + struct cfl_variant *var_attr = NULL; + + if (context == CM_CONTEXT_OTEL_RESOURCE_ATTR) { + key_buf = "resource"; + key_len = 8; + } + else if (context == CM_CONTEXT_OTEL_SCOPE_ATTR) { + key_buf = "scope"; + key_len = 5; + } + else { + return NULL; + } + + var = cfl_kvlist_fetch_s(kvlist, (char *) key_buf, key_len); + if (!var) { + return NULL; + } + + if (var->type != CFL_VARIANT_KVLIST) { + return NULL; + } + + var_attr = otel_get_or_create_attributes(var->data.as_kvlist); + if (!var_attr) { + return NULL; + } + + return var_attr; +} + +static struct cfl_variant *otel_get_or_create_scope_metadata(int telemetry_type, struct cfl_kvlist *kvlist) +{ + int ret; + struct cfl_variant *var; + struct cfl_kvpair *kvpair; + struct cfl_kvlist *kvlist_tmp; + + /* kvlist is the value of 'scope', lookup for scope->metadata */ + + var = cfl_kvlist_fetch(kvlist, "metadata"); + if (var) { + if (var->type != CFL_VARIANT_KVLIST) { + return NULL; + } + + return var; + } + + /* metadata don't exists, create it */ + kvlist_tmp = cfl_kvlist_create(); + if (!kvlist_tmp) { + return NULL; + } + + ret = cfl_kvlist_insert_kvlist_s(kvlist, "metadata", 8, kvlist_tmp); + if (ret != 0) { + cfl_kvlist_destroy(kvlist_tmp); + return NULL; + } + + kvpair = cfl_list_entry_last(&kvlist->list, struct cfl_kvpair, _head); + if (!kvpair) { + return NULL; + } + + return kvpair->val; +} + +/* + * Retrieve the kvlist that contains the scope metadata such as name and version, + * based on the telemetry type, the kvlist is expected to be in the following format: + * + * - Logs: scope -> {name, version} + * - Metrics: scope -> metadata -> {name, version} + * + * If the paths are not found, those are "created". + */ +struct cfl_variant *cm_otel_get_scope_metadata(int telemetry_type, struct cfl_kvlist *kvlist) +{ + int ret; + struct cfl_variant *var = NULL; + struct cfl_kvpair *kvpair = NULL; + struct cfl_kvlist *kvlist_tmp; + + if (!kvlist) { + return NULL; + } + + /* retrieve the scope if exists */ + var = cfl_kvlist_fetch(kvlist, "scope"); + if (var) { + if (var->type != CFL_VARIANT_KVLIST) { + /* if exists and is not valid just fail */ + return NULL; + } + } + else { + /* create "scope" inside kvlist */ + kvlist_tmp = cfl_kvlist_create(); + if (!kvlist_tmp) { + return NULL; + } + + ret = cfl_kvlist_insert_kvlist_s(kvlist, "scope", 5, kvlist_tmp); + if (ret != 0) { + cfl_kvlist_destroy(kvlist_tmp); + return NULL; + } + + kvpair = cfl_list_entry_last(&kvlist->list, struct cfl_kvpair, _head); + if (!kvpair) { + return NULL; + } + + var = kvpair->val; + } + + /* + * 'var' points to the value of 'scope', for logs telemetry data, just return + * the current variant, for metrics lookup for 'metadata' kvpair (or create it) + */ + + if (telemetry_type == CM_TELEMETRY_LOGS) { + return var; + } + else if (telemetry_type == CM_TELEMETRY_METRICS) { + var = otel_get_or_create_scope_metadata(telemetry_type, var->data.as_kvlist); + } + + return var; } diff --git a/plugins/processor_content_modifier/cm_opentelemetry.h b/plugins/processor_content_modifier/cm_opentelemetry.h index 04cfe2e130a..ddd8823f61b 100644 --- a/plugins/processor_content_modifier/cm_opentelemetry.h +++ b/plugins/processor_content_modifier/cm_opentelemetry.h @@ -22,6 +22,7 @@ #include -struct cfl_variant *cm_otel_get_or_create_attributes(struct cfl_kvlist *kvlist); +struct cfl_variant *cm_otel_get_attributes(int telemetry_type, int context, struct cfl_kvlist *kvlist); +struct cfl_variant *cm_otel_get_scope_metadata(int telemetry_type, struct cfl_kvlist *kvlist); #endif \ No newline at end of file diff --git a/tests/runtime/processor_content_modifier.c b/tests/runtime/processor_content_modifier.c index ac9cc5eac31..739c640978a 100644 --- a/tests/runtime/processor_content_modifier.c +++ b/tests/runtime/processor_content_modifier.c @@ -2,8 +2,7 @@ /* Fluent Bit * ========== - * Copyright (C) 2019-2024 The Fluent Bit Authors - * Copyright (C) 2015-2018 Treasure Data Inc. + * Copyright (C) 2015-2024 The Fluent Bit Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.