From 2ffae42a5fcf6f976ce5f9d1881a1fa0155a32e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=B0=E5=A4=A9=E9=9B=AA=E5=9C=B0?= Date: Thu, 17 Nov 2022 03:10:23 +0800 Subject: [PATCH] libbeat: add processors "decode_duration", "move_fields" (#31301) Co-authored-by: hxms Co-authored-by: DeDe Morton --- libbeat/cmd/instance/imports_common.go | 2 + libbeat/docs/processors-list.asciidoc | 12 ++ .../decode_duration/decode_duration.go | 95 +++++++++++++ .../decode_duration/decode_duration_test.go | 73 ++++++++++ .../docs/decode_duration.asciidoc | 26 ++++ .../move_fields/docs/move_fields.asciidoc | 101 +++++++++++++ libbeat/processors/move_fields/move_fields.go | 118 +++++++++++++++ .../move_fields/move_fields_test.go | 134 ++++++++++++++++++ 8 files changed, 561 insertions(+) create mode 100644 libbeat/processors/decode_duration/decode_duration.go create mode 100644 libbeat/processors/decode_duration/decode_duration_test.go create mode 100644 libbeat/processors/decode_duration/docs/decode_duration.asciidoc create mode 100644 libbeat/processors/move_fields/docs/move_fields.asciidoc create mode 100644 libbeat/processors/move_fields/move_fields.go create mode 100644 libbeat/processors/move_fields/move_fields_test.go diff --git a/libbeat/cmd/instance/imports_common.go b/libbeat/cmd/instance/imports_common.go index 736bebc2e967..540bef793ab5 100644 --- a/libbeat/cmd/instance/imports_common.go +++ b/libbeat/cmd/instance/imports_common.go @@ -30,12 +30,14 @@ import ( _ "github.com/elastic/beats/v7/libbeat/processors/add_process_metadata" _ "github.com/elastic/beats/v7/libbeat/processors/communityid" _ "github.com/elastic/beats/v7/libbeat/processors/convert" + _ "github.com/elastic/beats/v7/libbeat/processors/decode_duration" _ "github.com/elastic/beats/v7/libbeat/processors/decode_xml" _ "github.com/elastic/beats/v7/libbeat/processors/decode_xml_wineventlog" _ "github.com/elastic/beats/v7/libbeat/processors/dissect" _ "github.com/elastic/beats/v7/libbeat/processors/dns" _ 
"github.com/elastic/beats/v7/libbeat/processors/extract_array"
 	_ "github.com/elastic/beats/v7/libbeat/processors/fingerprint"
+	_ "github.com/elastic/beats/v7/libbeat/processors/move_fields"
 	_ "github.com/elastic/beats/v7/libbeat/processors/ratelimit"
 	_ "github.com/elastic/beats/v7/libbeat/processors/registered_domain"
 	_ "github.com/elastic/beats/v7/libbeat/processors/script"
diff --git a/libbeat/docs/processors-list.asciidoc b/libbeat/docs/processors-list.asciidoc
index 19d352816506..44a85dc4d263 100644
--- a/libbeat/docs/processors-list.asciidoc
+++ b/libbeat/docs/processors-list.asciidoc
@@ -59,6 +59,9 @@ endif::[]
 ifndef::no_decode_csv_fields_processor[]
 * <<decode-csv-fields,`decode_csv_fields`>>
 endif::[]
+ifndef::no_decode_duration_processor[]
+* <<decode-duration,`decode_duration`>>
+endif::[]
 ifndef::no_decode_json_fields_processor[]
 * <<decode-json-fields,`decode_json_fields`>>
 endif::[]
@@ -95,6 +98,9 @@ endif::[]
 ifndef::no_include_fields_processor[]
 * <<include-fields,`include_fields`>>
 endif::[]
+ifndef::no_move_fields_processor[]
+* <<move-fields,`move_fields`>>
+endif::[]
 ifndef::no_include_rate_limit_processor[]
 * <<rate-limit,`rate_limit`>>
 endif::[]
@@ -189,6 +195,9 @@ endif::[]
 ifndef::no_decode_csv_fields_processor[]
 include::{libbeat-processors-dir}/decode_csv_fields/docs/decode_csv_fields.asciidoc[]
 endif::[]
+ifndef::no_decode_duration_processor[]
+include::{libbeat-processors-dir}/decode_duration/docs/decode_duration.asciidoc[]
+endif::[]
 ifndef::no_decode_json_fields_processor[]
 include::{libbeat-processors-dir}/actions/docs/decode_json_fields.asciidoc[]
 endif::[]
@@ -225,6 +234,9 @@ endif::[]
 ifndef::no_include_fields_processor[]
 include::{libbeat-processors-dir}/actions/docs/include_fields.asciidoc[]
 endif::[]
+ifndef::no_move_fields_processor[]
+include::{libbeat-processors-dir}/move_fields/docs/move_fields.asciidoc[]
+endif::[]
 ifndef::no_include_rate_limit_processor[]
 include::{libbeat-processors-dir}/ratelimit/docs/rate_limit.asciidoc[]
 endif::[]
diff --git a/libbeat/processors/decode_duration/decode_duration.go b/libbeat/processors/decode_duration/decode_duration.go
new file mode 100644
index
000000000000..4cf6d17ab54c
--- /dev/null
+++ b/libbeat/processors/decode_duration/decode_duration.go
@@ -0,0 +1,114 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package decode_duration
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/beats/v7/libbeat/processors"
+	"github.com/elastic/beats/v7/libbeat/processors/checks"
+	jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
+	"github.com/elastic/elastic-agent-libs/config"
+)
+
+// init registers the processor as "decode_duration" for configuration files,
+// requiring the "field" and "format" options, and as "DecodeDuration" for the
+// JavaScript processor module.
+func init() {
+	processors.RegisterPlugin("decode_duration",
+		checks.ConfigChecked(NewDecodeDuration,
+			checks.RequireFields("field", "format")))
+	jsprocessor.RegisterPlugin("DecodeDuration", NewDecodeDuration)
+}
+
+// decodeDurationConfig holds the processor settings: Field is the key of the
+// event field to decode and Format is the unit of the result ("milliseconds",
+// "seconds", "minutes" or "hours"; any other value is handled as milliseconds).
+type decodeDurationConfig struct {
+	Field  string `config:"field"`
+	Format string `config:"format"`
+}
+
+// decodeDuration converts a Go-style duration string held in an event field
+// into a number expressed in the configured unit.
+type decodeDuration struct {
+	config decodeDurationConfig
+}
+
+// Run parses the string stored in the configured field with
+// time.ParseDuration and overwrites the field with the duration expressed as
+// a float64 in the configured unit.
+//
+// When the field is missing, is not a string, or does not hold a valid
+// duration, the event is returned unmodified together with an error
+// describing the problem.
+func (u decodeDuration) Run(event *beat.Event) (*beat.Event, error) {
+	fields := event.Fields
+	fieldName := u.config.Field
+	x, err := fields.GetValue(fieldName)
+	if err != nil {
+		return event, fmt.Errorf("field '%s' not present on event", fieldName)
+	}
+	durationString, ok := x.(string)
+	if !ok {
+		return event, fmt.Errorf("field '%s' is not a string, value: '%#v'", fieldName, x)
+	}
+	d, err := time.ParseDuration(durationString)
+	if err != nil {
+		// Report the failure instead of silently leaving the raw string in
+		// the field; the event itself is still handed back to the caller.
+		return event, fmt.Errorf("field '%s' is not a valid duration: %w", fieldName, err)
+	}
+
+	// Every unit is stored as float64 so the type of the field does not
+	// depend on the configured format.
+	var value float64
+	switch u.config.Format {
+	case "seconds":
+		value = d.Seconds()
+	case "minutes":
+		value = d.Minutes()
+	case "hours":
+		value = d.Hours()
+	default:
+		// "milliseconds" and any unrecognised format.
+		value = float64(d.Milliseconds())
+	}
+	if _, err = fields.Put(fieldName, value); err != nil {
+		return event, fmt.Errorf("put field '%s' back to event failed: %w", fieldName, err)
+	}
+	return event, nil
+}
+
+// String returns the processor name.
+func (u decodeDuration) String() string {
+	return "decode_duration"
+}
+
+// NewDecodeDuration builds a decode_duration processor from c. It returns an
+// error when c cannot be unpacked into the processor configuration.
+func NewDecodeDuration(c *config.C) (processors.Processor, error) {
+	fc := decodeDurationConfig{}
+	if err := c.Unpack(&fc); err != nil {
+		return nil, fmt.Errorf("failed to unpack decode duration config: %w", err)
+	}
+
+	return &decodeDuration{config: fc}, nil
+}
diff --git a/libbeat/processors/decode_duration/decode_duration_test.go b/libbeat/processors/decode_duration/decode_duration_test.go
new file mode 100644
index 000000000000..e4945439863b
--- /dev/null
+++ b/libbeat/processors/decode_duration/decode_duration_test.go
@@ -0,0 +1,73 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package decode_duration
+
+import (
+	"fmt"
+	"math"
+	"testing"
+	"time"
+
+	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/elastic-agent-libs/mapstr"
+)
+
+// TestDecodeDuration checks that a duration string is decoded into the expected
+// float64 value for every supported format, including the empty format.
+func TestDecodeDuration(t *testing.T) {
+	cases := []struct {
+		Duration time.Duration
+		Format   string
+		Result   float64
+	}{
+		{time.Second + time.Millisecond, "", 1001},
+		{time.Second + time.Millisecond, "milliseconds", 1001},
+		{time.Second + time.Millisecond, "seconds", 1.001},
+		{3 * time.Second, "minutes", 0.05},
+		{3 * time.Minute, "hours", 0.05},
+	}
+
+	for _, testCase := range cases {
+		t.Run(fmt.Sprintf("%s format as %s", testCase.Duration, testCase.Format), func(t *testing.T) {
+			evt := &beat.Event{Fields: mapstr.M{}}
+			c := &decodeDuration{
+				config: decodeDurationConfig{Field: "duration", Format: testCase.Format},
+			}
+			if _, err := evt.PutValue("duration", testCase.Duration.String()); err != nil {
+				t.Fatal(err)
+			}
+			evt, err := c.Run(evt)
+			if err != nil {
+				t.Fatal(err)
+			}
+			d, err := evt.GetValue("duration")
+			if err != nil {
+				t.Fatal(err)
+			}
+			floatD, ok := d.(float64)
+			if !ok {
+				t.Fatalf("result value is not a float64, got %T", d)
+			}
+			// Round to six decimals so float noise does not fail the comparison.
+			floatD = math.Round(floatD*1e6) / 1e6
+			if floatD != testCase.Result {
+				t.Fatalf("test case expected: %f, actual: %f", testCase.Result, floatD)
+			}
+		})
+	}
+}
diff --git a/libbeat/processors/decode_duration/docs/decode_duration.asciidoc b/libbeat/processors/decode_duration/docs/decode_duration.asciidoc
new file mode 100644
index 000000000000..b333f18bb618
--- /dev/null
+++ b/libbeat/processors/decode_duration/docs/decode_duration.asciidoc
@@ -0,0 +1,26 @@
+[[decode-duration]]
+=== Decode duration
+
+++++
+decode_duration
+++++
+
+The `decode_duration` processor decodes a Go-style duration string into a specific `format`.
+
+For more information about the Go `time.Duration` string style, refer to the https://pkg.go.dev/time#Duration[Go documentation].
+
+.Decode-Duration options
+[options="header"]
+|======
+| Name | Required | Default | Description |
+| `field` | yes | | The event field that holds the duration string to decode. The decoded number is written back to the same field. |
+| `format` | yes | `milliseconds` | The unit of the decoded number. Supported formats: `milliseconds`/`seconds`/`minutes`/`hours`. Any other value is treated as `milliseconds`. |
+|======
+
+[source,yaml]
+----
+processors:
+  - decode_duration:
+      field: "app.rpc.cost"
+      format: "milliseconds"
+----
diff --git a/libbeat/processors/move_fields/docs/move_fields.asciidoc b/libbeat/processors/move_fields/docs/move_fields.asciidoc
new file mode 100644
index 000000000000..181bc33613b4
--- /dev/null
+++ b/libbeat/processors/move_fields/docs/move_fields.asciidoc
@@ -0,0 +1,101 @@
+[[move-fields]]
+=== Move fields
+
+++++
+<titleabbrev>move_fields</titleabbrev>
+++++
+
+The `move_fields` processor moves event fields from one object into another. It can also rearrange fields or add a prefix to fields.
+
+The processor extracts fields from `from`, then uses `fields` and `exclude` as filters to choose which fields to move into the `to` field.
+
+For example, given the following event:
+
+[source,json]
+----
+{
+  "app": {
+    "method": "a",
+    "elapsed_time": 100,
+    "user_id": 100,
+    "message": "i'm a message"
+  }
+}
+----
+
+To move `method` and `elapsed_time` into another object, use this configuration:
+
+[source,yaml]
+----
+processors:
+  - move_fields:
+      from: "app"
+      fields: ["method", "elapsed_time"]
+      to: "rpc."
+----
+
+Your final event will be:
+
+[source,json]
+----
+{
+  "app": {
+    "user_id": 100,
+    "message": "i'm a message"
+  },
+  "rpc": {
+    "method": "a",
+    "elapsed_time": 100
+  }
+}
+----
+
+
+To add a prefix to the whole event:
+
+[source,json]
+----
+{
+  "app": { "method": "a"},
+  "cost": 100
+}
+----
+
+Use this configuration:
+
+[source,yaml]
+----
+processors:
+  - move_fields:
+      to: "my_prefix_"
+----
+
+Your final event will be:
+
+[source,json]
+----
+{
+  "my_prefix_app": { "method": "a"},
+  "my_prefix_cost": 100
+}
+----
+
+.Move-fields options
+[options="header"]
+|======
+| Name | Required | Default | Description |
+| `from` | no | | The object to extract fields from. Its fields, including any nested fields, are moved into `to` unless they are filtered out. If empty, the event root is used. |
+| `fields` | no | | Which fields to extract from `from` and move to `to`. An empty list indicates all fields. |
+| `ignore_missing` | no | false | Ignore "not found" errors when extracting fields. |
+| `exclude` | no | | A list of fields to exclude and not move. |
+| `to` | yes | | The prefix prepended to the name of each moved field to form its destination. The destination is resolved from the event root, not from `from`. |
+|======
+
+[source,yaml]
+----
+processors:
+  - move_fields:
+      from: "app"
+      fields: [ "method", "elapsed_time" ]
+      to: "rpc."
+----
diff --git a/libbeat/processors/move_fields/move_fields.go b/libbeat/processors/move_fields/move_fields.go
new file mode 100644
index 000000000000..2603d8e07caa
--- /dev/null
+++ b/libbeat/processors/move_fields/move_fields.go
@@ -0,0 +1,118 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package move_fields
+
+import (
+	"errors"
+	"fmt"
+
+	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/beats/v7/libbeat/processors"
+	"github.com/elastic/beats/v7/libbeat/processors/checks"
+	jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
+	"github.com/elastic/elastic-agent-libs/config"
+	"github.com/elastic/elastic-agent-libs/mapstr"
+)
+
+// init registers the processor as "move_fields" and as "MoveFields" for JavaScript.
+func init() {
+	processors.RegisterPlugin("move_fields", checks.ConfigChecked(NewMoveFields, checks.RequireFields("to")))
+	jsprocessor.RegisterPlugin("MoveFields", NewMoveFields)
+}
+
+// moveFieldsConfig holds the user-supplied settings of the processor.
+type moveFieldsConfig struct {
+	Exclude       []string `config:"exclude"`        // keys that are never moved
+	Fields        []string `config:"fields"`         // keys to move; empty selects every key of From
+	From          string   `config:"from"`           // source object; empty selects the event root
+	To            string   `config:"to"`             // prefix prepended to every moved key
+	IgnoreMissing bool     `config:"ignore_missing"` // skip selected keys that do not exist
+}
+
+// moveFields moves keys of one event object to new keys built from a prefix.
+type moveFields struct {
+	config     moveFieldsConfig
+	excludeMap map[string]struct{} // set of keys Run skips
+}
+
+// Run moves the selected keys of the source object to the keys formed by
+// prefixing them with config.To, resolved from the event root. It edits a clone
+// of event.Fields, so on failure the unmodified event is returned with the error.
+func (u moveFields) Run(event *beat.Event) (*beat.Event, error) {
+	root := event.Fields.Clone()
+	parent := root
+	if p := u.config.From; p != "" {
+		parentValue, err := root.GetValue(p)
+		if err != nil {
+			return event, fmt.Errorf("cannot get value from key '%s': %w", p, err)
+		}
+		var ok bool
+		parent, ok = parentValue.(mapstr.M)
+		if !ok {
+			return event, fmt.Errorf("'%s' does not contain an object, it contains '%#v'", p, parentValue)
+		}
+	}
+
+	keys := u.config.Fields
+	if len(keys) == 0 {
+		keys = make([]string, 0, len(parent))
+		for k := range parent {
+			keys = append(keys, k)
+		}
+	}
+
+	for _, k := range keys {
+		if _, ok := u.excludeMap[k]; ok {
+			continue
+		}
+		v, err := parent.GetValue(k)
+		if u.config.IgnoreMissing && errors.Is(err, mapstr.ErrKeyNotFound) {
+			continue
+		}
+		if err != nil {
+			return event, fmt.Errorf("move field read field from parent, sub key: %s, failed: %w", k, err)
+		}
+		if err = parent.Delete(k); err != nil {
+			return event, fmt.Errorf("move field delete field from parent sub key: %s, failed: %w", k, err)
+		}
+		newKey := u.config.To + k
+		if _, err = root.Put(newKey, v); err != nil {
+			return event, fmt.Errorf("move field write field to sub key: %s, new key: %s, failed: %w", k, newKey, err)
+		}
+	}
+	event.Fields = root
+	return event, nil
+}
+
+// String returns the processor name.
+func (u moveFields) String() string {
+	return "move_fields"
+}
+
+// NewMoveFields builds a move_fields processor from c.
+func NewMoveFields(c *config.C) (processors.Processor, error) {
+	fc := moveFieldsConfig{}
+	if err := c.Unpack(&fc); err != nil {
+		return nil, fmt.Errorf("failed to unpack move fields config: %w", err)
+	}
+	p := &moveFields{config: fc, excludeMap: make(map[string]struct{}, len(fc.Exclude))}
+	for _, k := range fc.Exclude {
+		p.excludeMap[k] = struct{}{}
+	}
+	return p, nil
+}
diff --git a/libbeat/processors/move_fields/move_fields_test.go b/libbeat/processors/move_fields/move_fields_test.go
new file mode 100644
index 000000000000..58f48ad385c8
--- /dev/null
+++ b/libbeat/processors/move_fields/move_fields_test.go
@@ -0,0 +1,134 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package move_fields
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/elastic-agent-libs/mapstr"
+)
+
+// TestMoveFields runs the processor over table-driven events and compares the resulting fields.
+func TestMoveFields(t *testing.T) {
+	cases := []struct {
+		name         string
+		in, expected mapstr.M
+		p            *moveFields
+	}{
+		{
+			"move fields from app to rpc, excluding 'method'",
+			mapstr.M{"app": mapstr.M{"version": 1, "method": "2"}, "other": 3},
+			mapstr.M{"app": mapstr.M{"method": "2"}, "rpc": mapstr.M{"version": 1}, "other": 3},
+			&moveFields{
+				config: moveFieldsConfig{
+					From:    "app",
+					Fields:  nil,
+					To:      "rpc.",
+					Exclude: []string{"method"},
+				},
+				excludeMap: map[string]struct{}{"method": {}},
+			},
+		},
+		{
+			"move 'version' from app to rpc",
+			mapstr.M{"app": mapstr.M{"version": 1, "method": "2", "other": 3}},
+			mapstr.M{"app": mapstr.M{"method": "2", "other": 3}, "rpc": mapstr.M{"version": 1}},
+			&moveFields{
+				config: moveFieldsConfig{
+					From:    "app",
+					Fields:  []string{"version"},
+					To:      "rpc.",
+					Exclude: nil,
+				},
+				excludeMap: nil,
+			},
+		},
+		{
+			"move fields from app object to rpc object",
+			mapstr.M{"app": mapstr.M{"version": 1, "method": "2"}, "other": 3},
+			mapstr.M{"app": mapstr.M{}, "rpc": mapstr.M{"method": "2", "version": 1}, "other": 3},
+			&moveFields{
+				config: moveFieldsConfig{
+					From:    "app",
+					Fields:  nil,
+					To:      "rpc.",
+					Exclude: nil,
+				},
+				excludeMap: nil,
+			},
+		},
+		{
+			"add prefix to fields in app object",
+			mapstr.M{"app": mapstr.M{"version": 1, "method": "2"}, "other": 3},
+			mapstr.M{"app": mapstr.M{}, "rpc_method": "2", "rpc_version": 1, "other": 3},
+			&moveFields{
+				config: moveFieldsConfig{
+					From:    "app",
+					Fields:  nil,
+					To:      "rpc_",
+					Exclude: nil,
+				},
+				excludeMap: nil,
+			},
+		},
+		{
+			"add prefix to fields in event root",
+			mapstr.M{"app": mapstr.M{"version": 1, "method": "2"}, "other": 3},
+			mapstr.M{"my_prefix_app": mapstr.M{"version": 1, "method": "2"}, "my_prefix_other": 3},
+			&moveFields{
+				config: moveFieldsConfig{
+					From:    "",
+					Fields:  nil,
+					To:      "my_prefix_",
+					Exclude: nil,
+				},
+				excludeMap: nil,
+			},
+		},
+		{
+			`move field "other" into app.b object`,
+			mapstr.M{"app": mapstr.M{"version": 1, "method": "2"}, "other": 3},
+			mapstr.M{"app": mapstr.M{"version": 1, "method": "2", "b": mapstr.M{"my_prefix_other": 3}}},
+			&moveFields{
+				config: moveFieldsConfig{
+					From:    "",
+					Fields:  []string{"other"},
+					To:      "app.b.my_prefix_",
+					Exclude: nil,
+				},
+				excludeMap: nil,
+			},
+		},
+	}
+
+	for idx, c := range cases {
+		t.Run(c.name, func(t *testing.T) {
+			evt := &beat.Event{Fields: c.in.Clone()}
+			out, err := c.p.Run(evt)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if !reflect.DeepEqual(c.expected, out.Fields) {
+				t.Fatalf("move field test case failed, out: %s, expected: %s, index: %d",
+					out.Fields.String(), c.expected.String(), idx)
+			}
+		})
+	}
+}