diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index adb13785f09b..8fab71a09b33 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -335,6 +335,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
- Update CEL mito extensions to v1.16.0. {pull}41727[41727]
+- Add evaluation state dump debugging option to CEL input. {pull}41335[41335]
*Auditbeat*
diff --git a/x-pack/filebeat/docs/inputs/input-cel.asciidoc b/x-pack/filebeat/docs/inputs/input-cel.asciidoc
index a96e8df5f3dd..8e062025b248 100644
--- a/x-pack/filebeat/docs/inputs/input-cel.asciidoc
+++ b/x-pack/filebeat/docs/inputs/input-cel.asciidoc
@@ -795,6 +795,26 @@ This specifies fields in the `state` to be redacted prior to debug logging. Fiel
This specifies whether fields should be replaced with a `*` or deleted entirely from messages sent to debug logs. If delete is `true`, fields will be deleted rather than replaced.
+[float]
+==== `failure_dump.enabled`
+
+It is possible to log CEL program evaluation failures to a local file-system for debugging configurations.
+This option is enabled by setting `failure_dump.enabled` to true and setting the `failure_dump.filename` value.
+To delete existing failure dumps, set `failure_dump.enabled` to false without unsetting the filename option.
+
+Enabling this option compromises security and should only be used for debugging.
+
+[float]
+==== `failure_dump.filename`
+
+This specifies a directory path to write failure dumps to. If it is not empty and a CEL program evaluation fails,
+the complete set of states for the CEL program's evaluation will be written as a JSON file, along with the error
+that was reported. This option should only be used when debugging a failure as it imposes a significant performance
+impact on the input and may potentially use large quantities of memory to hold the full set of states. If a failure
+dump is configured, it is recommended that data input sizes be reduced to avoid excessive memory consumption, and
+making dumps that are intractable to analysis. To delete existing failure dumps, set `failure_dump.enabled` to
+false without unsetting the filename option.
+
[float]
=== Metrics
diff --git a/x-pack/filebeat/input/cel/config.go b/x-pack/filebeat/input/cel/config.go
index b04b78457198..aee095b199b9 100644
--- a/x-pack/filebeat/input/cel/config.go
+++ b/x-pack/filebeat/input/cel/config.go
@@ -58,6 +58,9 @@ type config struct {
// Resource is the configuration for establishing an
// HTTP request or for locating a local resource.
Resource *ResourceConfig `config:"resource" validate:"required"`
+
+ // FailureDump configures failure dump behaviour.
+ FailureDump *dumpConfig `config:"failure_dump"`
}
type redact struct {
@@ -69,6 +72,19 @@ type redact struct {
Delete bool `config:"delete"`
}
+// dumpConfig configures the CEL program to retain
+// the full evaluation state using the cel.OptTrackState
+// option. The state is written to a file in the path if
+// the evaluation fails.
+type dumpConfig struct {
+ Enabled *bool `config:"enabled"`
+ Filename string `config:"filename"`
+}
+
+func (t *dumpConfig) enabled() bool {
+ return t != nil && (t.Enabled == nil || *t.Enabled)
+}
+
func (c config) Validate() error {
if c.Redact == nil {
logp.L().Named("input.cel").Warn("missing recommended 'redact' configuration: " +
@@ -89,7 +105,8 @@ func (c config) Validate() error {
if len(c.Regexps) != 0 {
patterns = map[string]*regexp.Regexp{".": nil}
}
- _, _, err = newProgram(context.Background(), c.Program, root, nil, &http.Client{}, nil, nil, patterns, c.XSDs, logp.L().Named("input.cel"), nil)
+ wantDump := c.FailureDump.enabled() && c.FailureDump.Filename != ""
+ _, _, err = newProgram(context.Background(), c.Program, root, nil, &http.Client{}, nil, nil, patterns, c.XSDs, logp.L().Named("input.cel"), nil, wantDump)
if err != nil {
return fmt.Errorf("failed to check program: %w", err)
}
diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go
index ff4f1dccf51f..97ab8c9bee05 100644
--- a/x-pack/filebeat/input/cel/input.go
+++ b/x-pack/filebeat/input/cel/input.go
@@ -10,6 +10,7 @@ package cel
import (
"compress/gzip"
"context"
+ "encoding/json"
"errors"
"fmt"
"io"
@@ -166,7 +167,8 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
Password: cfg.Auth.Basic.Password,
}
}
- prg, ast, err := newProgram(ctx, cfg.Program, root, getEnv(cfg.AllowedEnvironment), client, limiter, auth, patterns, cfg.XSDs, log, trace)
+ wantDump := cfg.FailureDump.enabled() && cfg.FailureDump.Filename != ""
+ prg, ast, err := newProgram(ctx, cfg.Program, root, getEnv(cfg.AllowedEnvironment), client, limiter, auth, patterns, cfg.XSDs, log, trace, wantDump)
if err != nil {
return err
}
@@ -251,12 +253,25 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
log.Debugw("request state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact})
metrics.executions.Add(1)
start := i.now().In(time.UTC)
- state, err = evalWith(ctx, prg, ast, state, start)
+ state, err = evalWith(ctx, prg, ast, state, start, wantDump)
log.Debugw("response state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact})
if err != nil {
+ var dump dumpError
switch {
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
return err
+ case errors.As(err, &dump):
+ path := strings.ReplaceAll(cfg.FailureDump.Filename, "*", sanitizeFileName(env.IDWithoutName))
+ dir := filepath.Dir(path)
+ base := filepath.Base(path)
+ ext := filepath.Ext(base)
+ prefix := strings.TrimSuffix(base, ext)
+ path = filepath.Join(dir, prefix+"-"+i.now().In(time.UTC).Format("2006-01-02T15-04-05.000")+ext)
+ log.Debugw("writing failure dump file", "path", path)
+ err := dump.writeToFile(path)
+ if err != nil {
+ log.Errorw("failed to write failure dump", "path", path, "error", err)
+ }
}
log.Errorw("failed evaluation", "error", err)
env.UpdateStatus(status.Degraded, "failed evaluation: "+err.Error())
@@ -785,6 +800,26 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitorin
}
}
}
+ if !cfg.FailureDump.enabled() && cfg.FailureDump != nil && cfg.FailureDump.Filename != "" {
+ // We have a fail-dump name, but we are not enabled,
+ // so remove all dumps we own.
+ err = os.Remove(cfg.FailureDump.Filename)
+ if err != nil && !errors.Is(err, fs.ErrNotExist) {
+ log.Errorw("failed to remove request trace log", "path", cfg.FailureDump.Filename, "error", err)
+ }
+ ext := filepath.Ext(cfg.FailureDump.Filename)
+ base := strings.TrimSuffix(cfg.FailureDump.Filename, ext)
+ paths, err := filepath.Glob(base + "-" + lumberjackTimestamp + ext)
+ if err != nil {
+ log.Errorw("failed to collect request trace log path names", "error", err)
+ }
+ for _, p := range paths {
+ err = os.Remove(p)
+ if err != nil && !errors.Is(err, fs.ErrNotExist) {
+ log.Errorw("failed to remove request trace log", "path", p, "error", err)
+ }
+ }
+ }
if reg != nil {
c.Transport = httpmon.NewMetricsRoundTripper(c.Transport, reg)
@@ -1004,7 +1039,7 @@ func getEnv(allowed []string) map[string]string {
return env
}
-func newProgram(ctx context.Context, src, root string, vars map[string]string, client *http.Client, limiter *rate.Limiter, auth *lib.BasicAuth, patterns map[string]*regexp.Regexp, xsd map[string]string, log *logp.Logger, trace *httplog.LoggingRoundTripper) (cel.Program, *cel.Ast, error) {
+func newProgram(ctx context.Context, src, root string, vars map[string]string, client *http.Client, limiter *rate.Limiter, auth *lib.BasicAuth, patterns map[string]*regexp.Regexp, xsd map[string]string, log *logp.Logger, trace *httplog.LoggingRoundTripper, details bool) (cel.Program, *cel.Ast, error) {
xml, err := lib.XML(nil, xsd)
if err != nil {
return nil, nil, fmt.Errorf("failed to build xml type hints: %w", err)
@@ -1043,7 +1078,11 @@ func newProgram(ctx context.Context, src, root string, vars map[string]string, c
return nil, nil, fmt.Errorf("failed compilation: %w", iss.Err())
}
- prg, err := env.Program(ast)
+ var progOpts []cel.ProgramOption
+ if details {
+ progOpts = []cel.ProgramOption{cel.EvalOptions(cel.OptTrackState)}
+ }
+ prg, err := env.Program(ast, progOpts...)
if err != nil {
return nil, nil, fmt.Errorf("failed program instantiation: %w", err)
}
@@ -1065,8 +1104,8 @@ func debug(log *logp.Logger, trace *httplog.LoggingRoundTripper) func(string, an
}
}
-func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[string]interface{}, now time.Time) (map[string]interface{}, error) {
- out, _, err := prg.ContextEval(ctx, map[string]interface{}{
+func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[string]interface{}, now time.Time, details bool) (map[string]interface{}, error) {
+ out, det, err := prg.ContextEval(ctx, map[string]interface{}{
// Replace global program "now" with current time. This is necessary
// as the lib.Time now global is static at program instantiation time
// which will persist over multiple evaluations. The lib.Time behaviour
@@ -1081,6 +1120,9 @@ func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[stri
})
if err != nil {
err = lib.DecoratedError{AST: ast, Err: err}
+ if details {
+ err = dumpError{error: err, dump: lib.NewDump(ast, det)}
+ }
}
if e := ctx.Err(); e != nil {
err = e
@@ -1109,6 +1151,36 @@ func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[stri
}
}
+// dumpError is an evaluation state dump associated with an error.
+type dumpError struct {
+ error
+ dump *lib.Dump
+}
+
+func (e dumpError) writeToFile(path string) (err error) {
+ err = os.MkdirAll(filepath.Dir(path), 0o700)
+ if err != nil {
+ return err
+ }
+ f, err := os.Create(path)
+ if err != nil {
+ return err
+ }
+ defer func() {
+ err = errors.Join(err, f.Sync(), f.Close())
+ }()
+ enc := json.NewEncoder(f)
+ enc.SetEscapeHTML(false)
+ type dump struct {
+ Error string `json:"error"`
+ State []lib.NodeValue `json:"state"`
+ }
+ return enc.Encode(dump{
+ Error: e.Error(),
+ State: e.dump.NodeValues(),
+ })
+}
+
// clearWantMore sets the state to not request additional work in a periodic evaluation.
// It leaves state intact if there is no "want_more" element, and sets the element to false
// if there is. This is necessary instead of just doing delete(state, "want_more") as
diff --git a/x-pack/filebeat/input/cel/input_test.go b/x-pack/filebeat/input/cel/input_test.go
index 0d91710ca093..143402d98347 100644
--- a/x-pack/filebeat/input/cel/input_test.go
+++ b/x-pack/filebeat/input/cel/input_test.go
@@ -45,6 +45,7 @@ var inputTests = []struct {
want []map[string]interface{}
wantCursor []map[string]interface{}
wantErr error
+ prepare func() error
wantFile string
wantNoFile string
}{
@@ -1685,6 +1686,88 @@ var inputTests = []struct {
},
}},
},
+ {
+ name: "dump_no_error",
+ config: map[string]interface{}{
+ "interval": 1,
+ "program": `{"events":[{"message":{"value": try(debug("divide by zero", 0/0))}}]}`,
+ "state": nil,
+ "resource": map[string]interface{}{
+ "url": "",
+ },
+ "failure_dump": map[string]interface{}{
+ "enabled": true,
+ "filename": "failure_dumps/dump.json",
+ },
+ },
+ time: func() time.Time { return time.Date(2010, 2, 8, 0, 0, 0, 0, time.UTC) },
+ wantNoFile: filepath.Join("failure_dumps", "dump-2010-02-08T00-00-00.000.json"),
+ want: []map[string]interface{}{{
+ "message": map[string]interface{}{
+ "value": "division by zero",
+ },
+ }},
+ },
+ {
+ name: "dump_error",
+ config: map[string]interface{}{
+ "interval": 1,
+ "program": `{"events":[{"message":{"value": debug("divide by zero", 0/0)}}]}`,
+ "state": nil,
+ "resource": map[string]interface{}{
+ "url": "",
+ },
+ "failure_dump": map[string]interface{}{
+ "enabled": true,
+ "filename": "failure_dumps/dump.json",
+ },
+ },
+ time: func() time.Time { return time.Date(2010, 2, 9, 0, 0, 0, 0, time.UTC) },
+ wantFile: filepath.Join("failure_dumps", "dump-2010-02-09T00-00-00.000.json"), // One day after the no dump case.
+ want: []map[string]interface{}{
+ {
+ "error": map[string]interface{}{
+ "message": `failed eval: ERROR: :1:58: division by zero
+ | {"events":[{"message":{"value": debug("divide by zero", 0/0)}}]}
+ | .........................................................^`,
+ },
+ },
+ },
+ },
+ {
+ name: "dump_error_delete",
+ config: map[string]interface{}{
+ "interval": 1,
+ "program": `{"events":[{"message":{"value": debug("divide by zero", 0/0)}}]}`,
+ "state": nil,
+ "resource": map[string]interface{}{
+ "url": "",
+ },
+ "failure_dump": map[string]interface{}{
+ "enabled": false, // We have a name but are disabled, so delete.
+ "filename": "failure_dumps/dump.json",
+ },
+ },
+ time: func() time.Time { return time.Date(2010, 2, 9, 0, 0, 0, 0, time.UTC) },
+ prepare: func() error {
+ // Make a file that the configuration should delete.
+ err := os.MkdirAll("failure_dumps", 0o700)
+ if err != nil {
+ return err
+ }
+ return os.WriteFile(filepath.Join("failure_dumps", "dump-2010-02-09T00-00-00.000.json"), nil, 0o600)
+ },
+ wantNoFile: filepath.Join("failure_dumps", "dump-2010-02-09T00-00-00.000.json"), // One day after the no dump case.
+ want: []map[string]interface{}{
+ {
+ "error": map[string]interface{}{
+ "message": `failed eval: ERROR: :1:58: division by zero
+ | {"events":[{"message":{"value": debug("divide by zero", 0/0)}}]}
+ | .........................................................^`,
+ },
+ },
+ },
+ },
// not yet done from httpjson (some are redundant since they are compositional products).
//
@@ -1708,6 +1791,11 @@ func TestInput(t *testing.T) {
os.Setenv("CELTESTENVVAR", "TESTVALUE")
os.Setenv("DISALLOWEDCELTESTENVVAR", "DISALLOWEDTESTVALUE")
+ err := os.RemoveAll("failure_dumps")
+ if err != nil {
+ t.Fatalf("failed to remove failure_dumps directory: %v", err)
+ }
+
logp.TestingSetup()
for _, test := range inputTests {
t.Run(test.name, func(t *testing.T) {
@@ -1718,6 +1806,13 @@ func TestInput(t *testing.T) {
t.Skip("skipping remote endpoint test")
}
+ if test.prepare != nil {
+ err := test.prepare()
+ if err != nil {
+ t.Fatalf("unexpected from prepare(): %v", err)
+ }
+ }
+
if test.server != nil {
test.server(t, test.handler, test.config)
}
@@ -1770,6 +1865,20 @@ func TestInput(t *testing.T) {
if fmt.Sprint(err) != fmt.Sprint(test.wantErr) {
t.Errorf("unexpected error from running input: got:%v want:%v", err, test.wantErr)
}
+ if test.wantFile != "" {
+ if _, err := os.Stat(filepath.Join(tempDir, test.wantFile)); err != nil {
+ t.Errorf("expected log file not found: %v", err)
+ }
+ }
+ if test.wantNoFile != "" {
+ paths, err := filepath.Glob(filepath.Join(tempDir, test.wantNoFile))
+ if err != nil {
+ t.Fatalf("unexpected error calling filepath.Glob(%q): %v", test.wantNoFile, err)
+ }
+ if len(paths) != 0 {
+ t.Errorf("unexpected files found: %v", paths)
+ }
+ }
if test.wantErr != nil {
return
}
@@ -1802,20 +1911,6 @@ func TestInput(t *testing.T) {
t.Errorf("unexpected cursor for event %d: got:- want:+\n%s", i, cmp.Diff(got, test.wantCursor[i]))
}
}
- if test.wantFile != "" {
- if _, err := os.Stat(filepath.Join(tempDir, test.wantFile)); err != nil {
- t.Errorf("expected log file not found: %v", err)
- }
- }
- if test.wantNoFile != "" {
- paths, err := filepath.Glob(filepath.Join(tempDir, test.wantNoFile))
- if err != nil {
- t.Fatalf("unexpected error calling filepath.Glob(%q): %v", test.wantNoFile, err)
- }
- if len(paths) != 0 {
- t.Errorf("unexpected files found: %v", paths)
- }
- }
})
}
}