diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index adb13785f09b..8fab71a09b33 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -335,6 +335,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005] - Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004] - Update CEL mito extensions to v1.16.0. {pull}41727[41727] +- Add evaluation state dump debugging option to CEL input. {pull}41335[41335] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-cel.asciidoc b/x-pack/filebeat/docs/inputs/input-cel.asciidoc index a96e8df5f3dd..8e062025b248 100644 --- a/x-pack/filebeat/docs/inputs/input-cel.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-cel.asciidoc @@ -795,6 +795,26 @@ This specifies fields in the `state` to be redacted prior to debug logging. Fiel This specifies whether fields should be replaced with a `*` or deleted entirely from messages sent to debug logs. If delete is `true`, fields will be deleted rather than replaced. +[float] +==== `failure_dump.enabled` + +It is possible to log CEL program evaluation failures to a local file-system for debugging configurations. +This option is enabled by setting `failure_dump.enabled` to true and setting the `failure_dump.filename` value. +To delete existing failure dumps, set `failure_dump.enabled` to false without unsetting the filename option. + +Enabling this option compromises security and should only be used for debugging. + +[float] +==== `failure_dump.filename` + +This specifies a directory path to write failure dumps to. If it is not empty and a CEL program evaluation fails, +the complete set of states for the CEL program's evaluation will be written as a JSON file, along with the error +that was reported. This option should only be used when debugging a failure as it imposes a significant performance +impact on the input and may potentially use large quantities of memory to hold the full set of states. If a failure +dump is configured, it is recommended that data input sizes be reduced to avoid excessive memory consumption, and +making dumps that are intractable to analysis. To delete existing failure dumps, set `failure_dump.enabled` to +false without unsetting the filename option. + [float] === Metrics diff --git a/x-pack/filebeat/input/cel/config.go b/x-pack/filebeat/input/cel/config.go index b04b78457198..aee095b199b9 100644 --- a/x-pack/filebeat/input/cel/config.go +++ b/x-pack/filebeat/input/cel/config.go @@ -58,6 +58,9 @@ type config struct { // Resource is the configuration for establishing an // HTTP request or for locating a local resource. Resource *ResourceConfig `config:"resource" validate:"required"` + + // FailureDump configures failure dump behaviour. + FailureDump *dumpConfig `config:"failure_dump"` } type redact struct { @@ -69,6 +72,19 @@ type redact struct { Delete bool `config:"delete"` } +// dumpConfig configures the CEL program to retain +// the full evaluation state using the cel.OptTrackState +// option. The state is written to a file in the path if +// the evaluation fails. +type dumpConfig struct { + Enabled *bool `config:"enabled"` + Filename string `config:"filename"` +} + +func (t *dumpConfig) enabled() bool { + return t != nil && (t.Enabled == nil || *t.Enabled) +} + func (c config) Validate() error { if c.Redact == nil { logp.L().Named("input.cel").Warn("missing recommended 'redact' configuration: " + @@ -89,7 +105,8 @@ func (c config) Validate() error { if len(c.Regexps) != 0 { patterns = map[string]*regexp.Regexp{".": nil} } - _, _, err = newProgram(context.Background(), c.Program, root, nil, &http.Client{}, nil, nil, patterns, c.XSDs, logp.L().Named("input.cel"), nil) + wantDump := c.FailureDump.enabled() && c.FailureDump.Filename != "" + _, _, err = newProgram(context.Background(), c.Program, root, nil, &http.Client{}, nil, nil, patterns, c.XSDs, logp.L().Named("input.cel"), nil, wantDump) if err != nil { return fmt.Errorf("failed to check program: %w", err) } diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index ff4f1dccf51f..97ab8c9bee05 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -10,6 +10,7 @@ package cel import ( "compress/gzip" "context" + "encoding/json" "errors" "fmt" "io" @@ -166,7 +167,8 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p Password: cfg.Auth.Basic.Password, } } - prg, ast, err := newProgram(ctx, cfg.Program, root, getEnv(cfg.AllowedEnvironment), client, limiter, auth, patterns, cfg.XSDs, log, trace) + wantDump := cfg.FailureDump.enabled() && cfg.FailureDump.Filename != "" + prg, ast, err := newProgram(ctx, cfg.Program, root, getEnv(cfg.AllowedEnvironment), client, limiter, auth, patterns, cfg.XSDs, log, trace, wantDump) if err != nil { return err } @@ -251,12 +253,25 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p log.Debugw("request state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact}) metrics.executions.Add(1) start := i.now().In(time.UTC) - state, err = evalWith(ctx, prg, ast, state, start) + state, err = evalWith(ctx, prg, ast, state, start, wantDump) log.Debugw("response state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact}) if err != nil { + var dump dumpError switch { case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded): return err + case errors.As(err, &dump): + path := strings.ReplaceAll(cfg.FailureDump.Filename, "*", sanitizeFileName(env.IDWithoutName)) + dir := filepath.Dir(path) + base := filepath.Base(path) + ext := filepath.Ext(base) + prefix := strings.TrimSuffix(base, ext) + path = filepath.Join(dir, prefix+"-"+i.now().In(time.UTC).Format("2006-01-02T15-04-05.000")+ext) + log.Debugw("writing failure dump file", "path", path) + err := dump.writeToFile(path) + if err != nil { + log.Errorw("failed to write failure dump", "path", path, "error", err) + } } log.Errorw("failed evaluation", "error", err) env.UpdateStatus(status.Degraded, "failed evaluation: "+err.Error()) @@ -785,6 +800,26 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitorin } } } + if !cfg.FailureDump.enabled() && cfg.FailureDump != nil && cfg.FailureDump.Filename != "" { + // We have a fail-dump name, but we are not enabled, + // so remove all dumps we own. + err = os.Remove(cfg.FailureDump.Filename) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + log.Errorw("failed to remove request trace log", "path", cfg.FailureDump.Filename, "error", err) + } + ext := filepath.Ext(cfg.FailureDump.Filename) + base := strings.TrimSuffix(cfg.FailureDump.Filename, ext) + paths, err := filepath.Glob(base + "-" + lumberjackTimestamp + ext) + if err != nil { + log.Errorw("failed to collect request trace log path names", "error", err) + } + for _, p := range paths { + err = os.Remove(p) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + log.Errorw("failed to remove request trace log", "path", p, "error", err) + } + } + } if reg != nil { c.Transport = httpmon.NewMetricsRoundTripper(c.Transport, reg) @@ -1004,7 +1039,7 @@ func getEnv(allowed []string) map[string]string { return env } -func newProgram(ctx context.Context, src, root string, vars map[string]string, client *http.Client, limiter *rate.Limiter, auth *lib.BasicAuth, patterns map[string]*regexp.Regexp, xsd map[string]string, log *logp.Logger, trace *httplog.LoggingRoundTripper) (cel.Program, *cel.Ast, error) { +func newProgram(ctx context.Context, src, root string, vars map[string]string, client *http.Client, limiter *rate.Limiter, auth *lib.BasicAuth, patterns map[string]*regexp.Regexp, xsd map[string]string, log *logp.Logger, trace *httplog.LoggingRoundTripper, details bool) (cel.Program, *cel.Ast, error) { xml, err := lib.XML(nil, xsd) if err != nil { return nil, nil, fmt.Errorf("failed to build xml type hints: %w", err) @@ -1043,7 +1078,11 @@ func newProgram(ctx context.Context, src, root string, vars map[string]string, c return nil, nil, fmt.Errorf("failed compilation: %w", iss.Err()) } - prg, err := env.Program(ast) + var progOpts []cel.ProgramOption + if details { + progOpts = []cel.ProgramOption{cel.EvalOptions(cel.OptTrackState)} + } + prg, err := env.Program(ast, progOpts...) if err != nil { return nil, nil, fmt.Errorf("failed program instantiation: %w", err) } @@ -1065,8 +1104,8 @@ func debug(log *logp.Logger, trace *httplog.LoggingRoundTripper) func(string, an } } -func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[string]interface{}, now time.Time) (map[string]interface{}, error) { - out, _, err := prg.ContextEval(ctx, map[string]interface{}{ +func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[string]interface{}, now time.Time, details bool) (map[string]interface{}, error) { + out, det, err := prg.ContextEval(ctx, map[string]interface{}{ // Replace global program "now" with current time. This is necessary // as the lib.Time now global is static at program instantiation time // which will persist over multiple evaluations. The lib.Time behaviour @@ -1081,6 +1120,9 @@ func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[stri }) if err != nil { err = lib.DecoratedError{AST: ast, Err: err} + if details { + err = dumpError{error: err, dump: lib.NewDump(ast, det)} + } } if e := ctx.Err(); e != nil { err = e @@ -1109,6 +1151,36 @@ func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[stri } } +// dumpError is an evaluation state dump associated with an error. +type dumpError struct { + error + dump *lib.Dump +} + +func (e dumpError) writeToFile(path string) (err error) { + err = os.MkdirAll(filepath.Dir(path), 0o700) + if err != nil { + return err + } + f, err := os.Create(path) + if err != nil { + return err + } + defer func() { + err = errors.Join(err, f.Sync(), f.Close()) + }() + enc := json.NewEncoder(f) + enc.SetEscapeHTML(false) + type dump struct { + Error string `json:"error"` + State []lib.NodeValue `json:"state"` + } + return enc.Encode(dump{ + Error: e.Error(), + State: e.dump.NodeValues(), + }) +} + // clearWantMore sets the state to not request additional work in a periodic evaluation. // It leaves state intact if there is no "want_more" element, and sets the element to false // if there is. This is necessary instead of just doing delete(state, "want_more") as diff --git a/x-pack/filebeat/input/cel/input_test.go b/x-pack/filebeat/input/cel/input_test.go index 0d91710ca093..143402d98347 100644 --- a/x-pack/filebeat/input/cel/input_test.go +++ b/x-pack/filebeat/input/cel/input_test.go @@ -45,6 +45,7 @@ var inputTests = []struct { want []map[string]interface{} wantCursor []map[string]interface{} wantErr error + prepare func() error wantFile string wantNoFile string }{ @@ -1685,6 +1686,88 @@ var inputTests = []struct { }, }}, }, + { + name: "dump_no_error", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events":[{"message":{"value": try(debug("divide by zero", 0/0))}}]}`, + "state": nil, + "resource": map[string]interface{}{ + "url": "", + }, + "failure_dump": map[string]interface{}{ + "enabled": true, + "filename": "failure_dumps/dump.json", + }, + }, + time: func() time.Time { return time.Date(2010, 2, 8, 0, 0, 0, 0, time.UTC) }, + wantNoFile: filepath.Join("failure_dumps", "dump-2010-02-08T00-00-00.000.json"), + want: []map[string]interface{}{{ + "message": map[string]interface{}{ + "value": "division by zero", + }, + }}, + }, + { + name: "dump_error", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events":[{"message":{"value": debug("divide by zero", 0/0)}}]}`, + "state": nil, + "resource": map[string]interface{}{ + "url": "", + }, + "failure_dump": map[string]interface{}{ + "enabled": true, + "filename": "failure_dumps/dump.json", + }, + }, + time: func() time.Time { return time.Date(2010, 2, 9, 0, 0, 0, 0, time.UTC) }, + wantFile: filepath.Join("failure_dumps", "dump-2010-02-09T00-00-00.000.json"), // One day after the no dump case. + want: []map[string]interface{}{ + { + "error": map[string]interface{}{ + "message": `failed eval: ERROR: :1:58: division by zero + | {"events":[{"message":{"value": debug("divide by zero", 0/0)}}]} + | .........................................................^`, + }, + }, + }, + }, + { + name: "dump_error_delete", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events":[{"message":{"value": debug("divide by zero", 0/0)}}]}`, + "state": nil, + "resource": map[string]interface{}{ + "url": "", + }, + "failure_dump": map[string]interface{}{ + "enabled": false, // We have a name but are disabled, so delete. + "filename": "failure_dumps/dump.json", + }, + }, + time: func() time.Time { return time.Date(2010, 2, 9, 0, 0, 0, 0, time.UTC) }, + prepare: func() error { + // Make a file that the configuration should delete. + err := os.MkdirAll("failure_dumps", 0o700) + if err != nil { + return err + } + return os.WriteFile(filepath.Join("failure_dumps", "dump-2010-02-09T00-00-00.000.json"), nil, 0o600) + }, + wantNoFile: filepath.Join("failure_dumps", "dump-2010-02-09T00-00-00.000.json"), // One day after the no dump case. + want: []map[string]interface{}{ + { + "error": map[string]interface{}{ + "message": `failed eval: ERROR: :1:58: division by zero + | {"events":[{"message":{"value": debug("divide by zero", 0/0)}}]} + | .........................................................^`, + }, + }, + }, + }, // not yet done from httpjson (some are redundant since they are compositional products). // @@ -1708,6 +1791,11 @@ func TestInput(t *testing.T) { os.Setenv("CELTESTENVVAR", "TESTVALUE") os.Setenv("DISALLOWEDCELTESTENVVAR", "DISALLOWEDTESTVALUE") + err := os.RemoveAll("failure_dumps") + if err != nil { + t.Fatalf("failed to remove failure_dumps directory: %v", err) + } + logp.TestingSetup() for _, test := range inputTests { t.Run(test.name, func(t *testing.T) { @@ -1718,6 +1806,13 @@ func TestInput(t *testing.T) { t.Skip("skipping remote endpoint test") } + if test.prepare != nil { + err := test.prepare() + if err != nil { + t.Fatalf("unexpected from prepare(): %v", err) + } + } + if test.server != nil { test.server(t, test.handler, test.config) } @@ -1770,6 +1865,20 @@ func TestInput(t *testing.T) { if fmt.Sprint(err) != fmt.Sprint(test.wantErr) { t.Errorf("unexpected error from running input: got:%v want:%v", err, test.wantErr) } + if test.wantFile != "" { + if _, err := os.Stat(filepath.Join(tempDir, test.wantFile)); err != nil { + t.Errorf("expected log file not found: %v", err) + } + } + if test.wantNoFile != "" { + paths, err := filepath.Glob(filepath.Join(tempDir, test.wantNoFile)) + if err != nil { + t.Fatalf("unexpected error calling filepath.Glob(%q): %v", test.wantNoFile, err) + } + if len(paths) != 0 { + t.Errorf("unexpected files found: %v", paths) + } + } if test.wantErr != nil { return } @@ -1802,20 +1911,6 @@ func TestInput(t *testing.T) { t.Errorf("unexpected cursor for event %d: got:- want:+\n%s", i, cmp.Diff(got, test.wantCursor[i])) } } - if test.wantFile != "" { - if _, err := os.Stat(filepath.Join(tempDir, test.wantFile)); err != nil { - t.Errorf("expected log file not found: %v", err) - } - } - if test.wantNoFile != "" { - paths, err := filepath.Glob(filepath.Join(tempDir, test.wantNoFile)) - if err != nil { - t.Fatalf("unexpected error calling filepath.Glob(%q): %v", test.wantNoFile, err) - } - if len(paths) != 0 { - t.Errorf("unexpected files found: %v", paths) - } - } }) } }