From d8c43756576d09f91eee8d33fd6abe6b52d1f311 Mon Sep 17 00:00:00 2001 From: Nicolas Filotto Date: Fri, 13 Nov 2020 20:26:07 +0100 Subject: [PATCH] Allow to catch errors that occur in the apply function (#8401) --- plugins/processors/starlark/README.md | 21 ++++++++++ plugins/processors/starlark/builtins.go | 13 +++++++ plugins/processors/starlark/starlark.go | 1 + plugins/processors/starlark/starlark_test.go | 40 ++++++++++++++++++++ 4 files changed, 75 insertions(+) diff --git a/plugins/processors/starlark/README.md b/plugins/processors/starlark/README.md index 1b541c33857ed..96da69e499fe9 100644 --- a/plugins/processors/starlark/README.md +++ b/plugins/processors/starlark/README.md @@ -154,6 +154,27 @@ def apply(metric): Telegraf freezes the global scope, which prevents it from being modified. Attempting to modify the global scope will fail with an error. +**How to manage errors that occur in the apply function?** + +In case you need to call some code that may return an error, you can delegate the call +to the built-in function `catch` which takes as argument a `Callable` and returns the error +that occured if any, `None` otherwise. + +So for example: + +```python +load("json.star", "json") + +def apply(metric): + error = catch(lambda: failing(metric)) + if error != None: + # Some code to execute in case of an error + metric.fields["error"] = error + return metric + +def failing(metric): + json.decode("non-json-content") +``` ### Examples diff --git a/plugins/processors/starlark/builtins.go b/plugins/processors/starlark/builtins.go index 4eda39b7d8d12..c2a30a0dc9379 100644 --- a/plugins/processors/starlark/builtins.go +++ b/plugins/processors/starlark/builtins.go @@ -34,6 +34,19 @@ func deepcopy(thread *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, return &Metric{metric: dup}, nil } +// catch(f) evaluates f() and returns its evaluation error message +// if it failed or None if it succeeded. +func catch(thread *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + var fn starlark.Callable + if err := starlark.UnpackArgs("catch", args, kwargs, "fn", &fn); err != nil { + return nil, err + } + if _, err := starlark.Call(thread, fn, nil, nil); err != nil { + return starlark.String(err.Error()), nil + } + return starlark.None, nil +} + type builtinMethod func(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) func builtinAttr(recv starlark.Value, name string, methods map[string]builtinMethod) (starlark.Value, error) { diff --git a/plugins/processors/starlark/starlark.go b/plugins/processors/starlark/starlark.go index cf791b3f155e3..a39b341f235c9 100644 --- a/plugins/processors/starlark/starlark.go +++ b/plugins/processors/starlark/starlark.go @@ -58,6 +58,7 @@ func (s *Starlark) Init() error { builtins := starlark.StringDict{} builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric) builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy) + builtins["catch"] = starlark.NewBuiltin("catch", catch) program, err := s.sourceProgram(builtins) if err != nil { diff --git a/plugins/processors/starlark/starlark_test.go b/plugins/processors/starlark/starlark_test.go index ce0b1803c959c..f83767210c3c1 100644 --- a/plugins/processors/starlark/starlark_test.go +++ b/plugins/processors/starlark/starlark_test.go @@ -2378,6 +2378,46 @@ def apply(metric): ), }, }, + { + name: "support errors", + source: ` +load("json.star", "json") + +def apply(metric): + msg = catch(lambda: process(metric)) + if msg != None: + metric.fields["error"] = msg + metric.fields["value"] = "default" + return metric + +def process(metric): + metric.fields["field1"] = "value1" + metric.tags["tags1"] = "value2" + # Throw an error + json.decode(metric.fields.get('value')) + # Should never be called + metric.fields["msg"] = "value4" +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": "non-json-content", "msg": "value3"}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{"tags1": "value2"}, + map[string]interface{}{ + "value": "default", + "field1": "value1", + "msg": "value3", + "error": "json.decode: at offset 0, unexpected character 'n'", + }, + time.Unix(0, 0), + ), + }, + }, } for _, tt := range tests {