Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/cel: add evaluation state dump debugging feature #41335

Merged
merged 3 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
- Update CEL mito extensions to v1.16.0. {pull}41727[41727]
- Add evaluation state dump debugging option to CEL input. {pull}41335[41335]

*Auditbeat*

Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/docs/inputs/input-cel.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,11 @@ This specifies fields in the `state` to be redacted prior to debug logging. Fiel

This specifies whether fields should be replaced with a `*` or deleted entirely from messages sent to debug logs. If delete is `true`, fields will be deleted rather than replaced.

[float]
==== `failure_dump`

This specifies a directory path to write failure dumps to. If it is not empty and a CEL program evaluation fails, the complete set of states for the CEL program's evaluation will be written as a JSON file, along with the error that was reported. This option should only be used when debugging a failure as it imposes a significant performance impact on the input and may potentially use large quantities of memory to hold the full set of states. If a failure dump is configured, it is recommended that data input sizes be reduced to avoid excessive memory consumption, and making dumps that are intractable to analysis.

kcreddy marked this conversation as resolved.
Show resolved Hide resolved
[float]
=== Metrics

Expand Down
19 changes: 18 additions & 1 deletion x-pack/filebeat/input/cel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type config struct {
// Resource is the configuration for establishing an
// HTTP request or for locating a local resource.
Resource *ResourceConfig `config:"resource" validate:"required"`

// FailureDump configures failure dump behaviour.
FailureDump *dumpConfig `config:"failure_dump"`
}

type redact struct {
Expand All @@ -69,6 +72,19 @@ type redact struct {
Delete bool `config:"delete"`
}

// dumpConfig configures the CEL program to retain
// the full evaluation state using the cel.OptTrackState
// option. The state is written to a file in the path if
// the evaluation fails.
type dumpConfig struct {
Enabled *bool `config:"enabled"`
Filename string `config:"filename"`
}

func (t *dumpConfig) enabled() bool {
return t != nil && (t.Enabled == nil || *t.Enabled)
}

func (c config) Validate() error {
if c.Redact == nil {
logp.L().Named("input.cel").Warn("missing recommended 'redact' configuration: " +
Expand All @@ -89,7 +105,8 @@ func (c config) Validate() error {
if len(c.Regexps) != 0 {
patterns = map[string]*regexp.Regexp{".": nil}
}
_, _, err = newProgram(context.Background(), c.Program, root, nil, &http.Client{}, nil, nil, patterns, c.XSDs, logp.L().Named("input.cel"), nil)
wantDump := c.FailureDump.enabled() && c.FailureDump.Filename != ""
_, _, err = newProgram(context.Background(), c.Program, root, nil, &http.Client{}, nil, nil, patterns, c.XSDs, logp.L().Named("input.cel"), nil, wantDump)
if err != nil {
return fmt.Errorf("failed to check program: %w", err)
}
Expand Down
84 changes: 78 additions & 6 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import (
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -166,7 +167,8 @@
Password: cfg.Auth.Basic.Password,
}
}
prg, ast, err := newProgram(ctx, cfg.Program, root, getEnv(cfg.AllowedEnvironment), client, limiter, auth, patterns, cfg.XSDs, log, trace)
wantDump := cfg.FailureDump.enabled() && cfg.FailureDump.Filename != ""
prg, ast, err := newProgram(ctx, cfg.Program, root, getEnv(cfg.AllowedEnvironment), client, limiter, auth, patterns, cfg.XSDs, log, trace, wantDump)
if err != nil {
return err
}
Expand Down Expand Up @@ -251,12 +253,25 @@
log.Debugw("request state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact})
metrics.executions.Add(1)
start := i.now().In(time.UTC)
state, err = evalWith(ctx, prg, ast, state, start)
state, err = evalWith(ctx, prg, ast, state, start, wantDump)
log.Debugw("response state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact})
if err != nil {
var dump dumpError
switch {
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
return err
case errors.As(err, &dump):
path := strings.ReplaceAll(cfg.FailureDump.Filename, "*", sanitizeFileName(env.IDWithoutName))
dir := filepath.Dir(path)
base := filepath.Base(path)
ext := filepath.Ext(base)
prefix := strings.TrimSuffix(base, ext)
path = filepath.Join(dir, prefix+"-"+i.now().In(time.UTC).Format("2006-01-02T15-04-05.000")+ext)
log.Debugw("writing failure dump file", "path", path)
err := dump.writeToFile(path)
if err != nil {
log.Errorw("failed to write failure dump", "path", path, "error", err)
}
}
log.Errorw("failed evaluation", "error", err)
env.UpdateStatus(status.Degraded, "failed evaluation: "+err.Error())
Expand Down Expand Up @@ -785,6 +800,26 @@
}
}
}
if !cfg.FailureDump.enabled() && cfg.FailureDump != nil && cfg.FailureDump.Filename != "" {
// We have a fail-dump name, but we are not enabled,
// so remove all dumps we own.
err = os.Remove(cfg.FailureDump.Filename)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
log.Errorw("failed to remove request trace log", "path", cfg.FailureDump.Filename, "error", err)
}
ext := filepath.Ext(cfg.FailureDump.Filename)
base := strings.TrimSuffix(cfg.FailureDump.Filename, ext)
paths, err := filepath.Glob(base + "-" + lumberjackTimestamp + ext)
if err != nil {
log.Errorw("failed to collect request trace log path names", "error", err)
}
for _, p := range paths {
err = os.Remove(p)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
log.Errorw("failed to remove request trace log", "path", p, "error", err)
}
}
}

if reg != nil {
c.Transport = httpmon.NewMetricsRoundTripper(c.Transport, reg)
Expand Down Expand Up @@ -1004,7 +1039,7 @@
return env
}

func newProgram(ctx context.Context, src, root string, vars map[string]string, client *http.Client, limiter *rate.Limiter, auth *lib.BasicAuth, patterns map[string]*regexp.Regexp, xsd map[string]string, log *logp.Logger, trace *httplog.LoggingRoundTripper) (cel.Program, *cel.Ast, error) {
func newProgram(ctx context.Context, src, root string, vars map[string]string, client *http.Client, limiter *rate.Limiter, auth *lib.BasicAuth, patterns map[string]*regexp.Regexp, xsd map[string]string, log *logp.Logger, trace *httplog.LoggingRoundTripper, details bool) (cel.Program, *cel.Ast, error) {
xml, err := lib.XML(nil, xsd)
if err != nil {
return nil, nil, fmt.Errorf("failed to build xml type hints: %w", err)
Expand Down Expand Up @@ -1043,7 +1078,11 @@
return nil, nil, fmt.Errorf("failed compilation: %w", iss.Err())
}

prg, err := env.Program(ast)
var progOpts []cel.ProgramOption
if details {
progOpts = []cel.ProgramOption{cel.EvalOptions(cel.OptTrackState)}
}
prg, err := env.Program(ast, progOpts...)
if err != nil {
return nil, nil, fmt.Errorf("failed program instantiation: %w", err)
}
Expand All @@ -1065,8 +1104,8 @@
}
}

func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[string]interface{}, now time.Time) (map[string]interface{}, error) {
out, _, err := prg.ContextEval(ctx, map[string]interface{}{
func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[string]interface{}, now time.Time, details bool) (map[string]interface{}, error) {
out, det, err := prg.ContextEval(ctx, map[string]interface{}{
// Replace global program "now" with current time. This is necessary
// as the lib.Time now global is static at program instantiation time
// which will persist over multiple evaluations. The lib.Time behaviour
Expand All @@ -1081,6 +1120,9 @@
})
if err != nil {
err = lib.DecoratedError{AST: ast, Err: err}
if details {
err = dumpError{error: err, dump: lib.NewDump(ast, det)}
}
}
if e := ctx.Err(); e != nil {
err = e
Expand Down Expand Up @@ -1109,6 +1151,36 @@
}
}

// dumpError is an evaluation state dump associated with an error.
type dumpError struct {
error
dump *lib.Dump
}

func (e dumpError) writeToFile(path string) (err error) {
err = os.MkdirAll(filepath.Dir(path), 0o700)
if err != nil {
return err
}
f, err := os.Create(path)
if err != nil {
return err
}
defer func() {
err = errors.Join(err, f.Sync(), f.Close())
}()
enc := json.NewEncoder(f)
enc.SetEscapeHTML(false)
type dump struct {
Error string `json:"error"`
State []lib.NodeValue `json:"state"`
}
return enc.Encode(dump{
Error: e.Error(),
State: e.dump.NodeValues(),
})
}

// clearWantMore sets the state to not request additional work in a periodic evaluation.
// It leaves state intact if there is no "want_more" element, and sets the element to false
// if there is. This is necessary instead of just doing delete(state, "want_more") as
Expand Down Expand Up @@ -1260,7 +1332,7 @@
// walkMap walks to all ends of the provided path in m and applies fn to the
// final element of each walk. Nested arrays are not handled.
func walkMap(m mapstr.M, path string, fn func(parent mapstr.M, key string)) {
key, rest, more := strings.Cut(path, ".")

Check failure on line 1335 in x-pack/filebeat/input/cel/input.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

rest declared and not used (typecheck)
v, ok := m[key]
if !ok {
return
Expand Down
123 changes: 109 additions & 14 deletions x-pack/filebeat/input/cel/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var inputTests = []struct {
want []map[string]interface{}
wantCursor []map[string]interface{}
wantErr error
prepare func() error
wantFile string
wantNoFile string
}{
Expand Down Expand Up @@ -1685,6 +1686,88 @@ var inputTests = []struct {
},
}},
},
{
name: "dump_no_error",
config: map[string]interface{}{
"interval": 1,
"program": `{"events":[{"message":{"value": try(debug("divide by zero", 0/0))}}]}`,
"state": nil,
"resource": map[string]interface{}{
"url": "",
},
"failure_dump": map[string]interface{}{
"enabled": true,
"filename": "failure_dumps/dump.json",
},
},
time: func() time.Time { return time.Date(2010, 2, 8, 0, 0, 0, 0, time.UTC) },
wantNoFile: filepath.Join("failure_dumps", "dump-2010-02-08T00-00-00.000.json"),
want: []map[string]interface{}{{
"message": map[string]interface{}{
"value": "division by zero",
},
}},
},
{
name: "dump_error",
config: map[string]interface{}{
"interval": 1,
"program": `{"events":[{"message":{"value": debug("divide by zero", 0/0)}}]}`,
"state": nil,
"resource": map[string]interface{}{
"url": "",
},
"failure_dump": map[string]interface{}{
"enabled": true,
"filename": "failure_dumps/dump.json",
},
},
time: func() time.Time { return time.Date(2010, 2, 9, 0, 0, 0, 0, time.UTC) },
wantFile: filepath.Join("failure_dumps", "dump-2010-02-09T00-00-00.000.json"), // One day after the no dump case.
want: []map[string]interface{}{
{
"error": map[string]interface{}{
"message": `failed eval: ERROR: <input>:1:58: division by zero
| {"events":[{"message":{"value": debug("divide by zero", 0/0)}}]}
| .........................................................^`,
},
},
},
},
{
name: "dump_error_delete",
kcreddy marked this conversation as resolved.
Show resolved Hide resolved
config: map[string]interface{}{
"interval": 1,
"program": `{"events":[{"message":{"value": debug("divide by zero", 0/0)}}]}`,
"state": nil,
"resource": map[string]interface{}{
"url": "",
},
"failure_dump": map[string]interface{}{
"enabled": false, // We have a name but are disabled, so delete.
"filename": "failure_dumps/dump.json",
},
},
time: func() time.Time { return time.Date(2010, 2, 9, 0, 0, 0, 0, time.UTC) },
prepare: func() error {
// Make a file that the configuration should delete.
err := os.MkdirAll("failure_dumps", 0o700)
if err != nil {
return err
}
return os.WriteFile(filepath.Join("failure_dumps", "dump-2010-02-09T00-00-00.000.json"), nil, 0o600)
},
wantNoFile: filepath.Join("failure_dumps", "dump-2010-02-09T00-00-00.000.json"), // One day after the no dump case.
want: []map[string]interface{}{
{
"error": map[string]interface{}{
"message": `failed eval: ERROR: <input>:1:58: division by zero
| {"events":[{"message":{"value": debug("divide by zero", 0/0)}}]}
| .........................................................^`,
},
},
},
},

// not yet done from httpjson (some are redundant since they are compositional products).
//
Expand All @@ -1708,6 +1791,11 @@ func TestInput(t *testing.T) {
os.Setenv("CELTESTENVVAR", "TESTVALUE")
os.Setenv("DISALLOWEDCELTESTENVVAR", "DISALLOWEDTESTVALUE")

err := os.RemoveAll("failure_dumps")
if err != nil {
t.Fatalf("failed to remove failure_dumps directory: %v", err)
}

logp.TestingSetup()
for _, test := range inputTests {
t.Run(test.name, func(t *testing.T) {
Expand All @@ -1718,6 +1806,13 @@ func TestInput(t *testing.T) {
t.Skip("skipping remote endpoint test")
}

if test.prepare != nil {
err := test.prepare()
if err != nil {
t.Fatalf("unexpected from prepare(): %v", err)
}
}

if test.server != nil {
test.server(t, test.handler, test.config)
}
Expand Down Expand Up @@ -1770,6 +1865,20 @@ func TestInput(t *testing.T) {
if fmt.Sprint(err) != fmt.Sprint(test.wantErr) {
t.Errorf("unexpected error from running input: got:%v want:%v", err, test.wantErr)
}
if test.wantFile != "" {
if _, err := os.Stat(filepath.Join(tempDir, test.wantFile)); err != nil {
t.Errorf("expected log file not found: %v", err)
}
}
if test.wantNoFile != "" {
paths, err := filepath.Glob(filepath.Join(tempDir, test.wantNoFile))
if err != nil {
t.Fatalf("unexpected error calling filepath.Glob(%q): %v", test.wantNoFile, err)
}
if len(paths) != 0 {
t.Errorf("unexpected files found: %v", paths)
}
}
if test.wantErr != nil {
return
}
Expand Down Expand Up @@ -1802,20 +1911,6 @@ func TestInput(t *testing.T) {
t.Errorf("unexpected cursor for event %d: got:- want:+\n%s", i, cmp.Diff(got, test.wantCursor[i]))
}
}
if test.wantFile != "" {
if _, err := os.Stat(filepath.Join(tempDir, test.wantFile)); err != nil {
t.Errorf("expected log file not found: %v", err)
}
}
if test.wantNoFile != "" {
paths, err := filepath.Glob(filepath.Join(tempDir, test.wantNoFile))
if err != nil {
t.Fatalf("unexpected error calling filepath.Glob(%q): %v", test.wantNoFile, err)
}
if len(paths) != 0 {
t.Errorf("unexpected files found: %v", paths)
}
}
})
}
}
Expand Down
Loading