-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
libbeat: add processors "decode_duration", "move_fields" (#31301)
Co-authored-by: hxms <[email protected]> Co-authored-by: DeDe Morton <[email protected]>
- Loading branch information
1 parent
0301a82
commit 2ffae42
Showing
8 changed files
with
561 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package decode_duration | ||
|
||
import ( | ||
"fmt" | ||
"time" | ||
|
||
"github.com/elastic/beats/v7/libbeat/beat" | ||
"github.com/elastic/beats/v7/libbeat/processors" | ||
"github.com/elastic/beats/v7/libbeat/processors/checks" | ||
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor" | ||
"github.com/elastic/elastic-agent-libs/config" | ||
) | ||
|
||
func init() { | ||
processors.RegisterPlugin("decode_duration", | ||
checks.ConfigChecked(NewDecodeDuration, | ||
checks.RequireFields("field", "format"))) | ||
jsprocessor.RegisterPlugin("DecodeDuration", NewDecodeDuration) | ||
} | ||
|
||
type decodeDurationConfig struct { | ||
Field string `config:"field"` | ||
Format string `config:"format"` | ||
} | ||
|
||
type decodeDuration struct { | ||
config decodeDurationConfig | ||
} | ||
|
||
func (u decodeDuration) Run(event *beat.Event) (*beat.Event, error) { | ||
fields := event.Fields | ||
fieldName := u.config.Field | ||
x, err := fields.GetValue(fieldName) | ||
if err != nil { | ||
return event, fmt.Errorf("field '%s' not present on event", fieldName) | ||
} | ||
durationString, ok := x.(string) | ||
if !ok { | ||
return event, fmt.Errorf("field '%s' is not a string, value: '%#v'", fieldName, x) | ||
} | ||
d, err := time.ParseDuration(durationString) | ||
if err != nil { | ||
return event, nil | ||
} | ||
switch u.config.Format { | ||
case "milliseconds": | ||
// keep the result is type float64 | ||
x = float64(d.Milliseconds()) | ||
case "seconds": | ||
x = d.Seconds() | ||
case "minutes": | ||
x = d.Minutes() | ||
case "hours": | ||
x = d.Hours() | ||
default: | ||
x = float64(d.Milliseconds()) | ||
} | ||
if _, err = fields.Put(fieldName, x); err != nil { | ||
return event, fmt.Errorf("put field '%s' back to event failed: %w", fieldName, err) | ||
} | ||
return event, nil | ||
} | ||
|
||
func (u decodeDuration) String() string { | ||
return "decode_duration" | ||
} | ||
|
||
func NewDecodeDuration(c *config.C) (processors.Processor, error) { | ||
fc := decodeDurationConfig{} | ||
err := c.Unpack(&fc) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to unpack decode duration config: %w", err) | ||
} | ||
|
||
return &decodeDuration{ | ||
config: fc, | ||
}, nil | ||
} |
73 changes: 73 additions & 0 deletions
73
libbeat/processors/decode_duration/decode_duration_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package decode_duration | ||
|
||
import ( | ||
"fmt" | ||
"math" | ||
"testing" | ||
"time" | ||
|
||
"github.com/elastic/beats/v7/libbeat/beat" | ||
"github.com/elastic/elastic-agent-libs/mapstr" | ||
) | ||
|
||
func TestDecodeDuration(t *testing.T) { | ||
cases := []struct { | ||
Duration time.Duration | ||
Format string | ||
Result float64 | ||
}{ | ||
{time.Second + time.Millisecond, "", 1001}, | ||
{time.Second + time.Millisecond, "milliseconds", 1001}, | ||
{time.Second + time.Millisecond, "seconds", 1.001}, | ||
{3 * time.Second, "minutes", 0.05}, | ||
{3 * time.Minute, "hours", 0.05}, | ||
} | ||
|
||
for _, testCase := range cases { | ||
t.Run(fmt.Sprintf("%s format as %s", testCase.Duration, testCase.Format), func(t *testing.T) { | ||
evt := &beat.Event{Fields: mapstr.M{}} | ||
c := &decodeDuration{ | ||
config: decodeDurationConfig{ | ||
Field: "duration", | ||
Format: testCase.Format, | ||
}, | ||
} | ||
if _, err := evt.PutValue("duration", testCase.Duration.String()); err != nil { | ||
t.Fatal(err) | ||
} | ||
evt, err := c.Run(evt) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
d, err := evt.GetValue("duration") | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
floatD, ok := d.(float64) | ||
if !ok { | ||
t.Fatal("result value is not duration") | ||
} | ||
floatD = math.Round(floatD*math.Pow10(6)) / math.Pow10(6) | ||
if floatD != testCase.Result { | ||
t.Fatalf("test case except: %f, actual: %f", testCase.Result, floatD) | ||
} | ||
}) | ||
} | ||
} |
26 changes: 26 additions & 0 deletions
26
libbeat/processors/decode_duration/docs/decode_duration.asciidoc
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
[[decode-duration]] | ||
=== Decode duration | ||
|
||
++++ | ||
<titleabbrev>decode_duration</titleabbrev> | ||
++++ | ||
|
||
The `decode_duration` processor decodes a Go-style duration string into a specific `format`. | ||
|
||
For more information about the Go `time.Duration` string style, refer to the https://pkg.go.dev/time#Duration[Go documentation]. | ||
|
||
.Decode-Duration options | ||
[options="header"] | ||
|====== | ||
| Name | Required | Default | Description | | ||
| `field` | yes | | Which field of event needs to be decoded as `time.Duration` | | ||
| `format` | yes | `milliseconds` | Supported formats: `milliseconds`/`seconds`/`minutes`/`hours` | | ||
|====== | ||
|
||
[source,yaml] | ||
---- | ||
processors: | ||
- decode_duration: | ||
field: "app.rpc.cost" | ||
format: "milliseconds" | ||
---- |
101 changes: 101 additions & 0 deletions
101
libbeat/processors/move_fields/docs/move_fields.asciidoc
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
[[move-fields]] | ||
=== Move fields | ||
|
||
++++ | ||
<titleabbrev>move_fields</titleabbrev> | ||
++++ | ||
|
||
The `move_fields` processor moves event fields from one object into another. It can also rearrange fields or add a prefix to fields. | ||
|
||
The processor extracts fields from `from`, then uses `fields` and `exclude` as filters to choose which fields to move into the `to` field. | ||
|
||
For example, given the following event: | ||
|
||
[source,json] | ||
---- | ||
{ | ||
"app": { | ||
"method": "a", | ||
"elapsed_time": 100, | ||
"user_id": 100, | ||
"message": "i'm a message" | ||
} | ||
} | ||
---- | ||
|
||
To move `method` and `elapsed_time` into another object, use this configuration: | ||
|
||
[source,yaml] | ||
---- | ||
processors: | ||
- move_fields: | ||
from: "app" | ||
fields: ["method", "elapsed_time"], | ||
to: "rpc." | ||
---- | ||
|
||
Your final event will be: | ||
|
||
[source,json] | ||
---- | ||
{ | ||
"app": { | ||
"user_id": 100, | ||
"message": "i'm a message", | ||
"rpc": { | ||
"method": "a", | ||
"elapsed_time": 100 | ||
} | ||
} | ||
} | ||
---- | ||
|
||
|
||
To add a prefix to the whole event: | ||
|
||
[source,json] | ||
---- | ||
{ | ||
"app": { "method": "a"}, | ||
"cost": 100 | ||
} | ||
---- | ||
|
||
Use this configuration: | ||
|
||
[source,yaml] | ||
---- | ||
processors: | ||
- move_fields: | ||
to: "my_prefix_" | ||
---- | ||
|
||
Your final event will be: | ||
|
||
[source,json] | ||
---- | ||
{ | ||
"my_prefix_app": { "method": "a"}, | ||
"my_prefix_cost": 100 | ||
} | ||
---- | ||
|
||
.Move-fields options | ||
[options="header"] | ||
|====== | ||
| Name | Required | Default | Description | | ||
| `from` | no | | Which field you want extract. This field and any nested fields will be moved into `to` unless they are filtered out. If empty, indicates event root. | | ||
| `fields` | no | | Which fields to extract from `from` and move to `to`. An empty list indicates all fields. | | ||
| `ignore_missing` | no | false | Ignore "not found" errors when extracting fields. | | ||
| `exclude` | no | | A list of fields to exclude and not move. | | ||
| `to` | yes | | These fields extract from `from` destination field prefix the `to` will base on fields root. | | ||
|====== | ||
|
||
[source,yaml] | ||
---- | ||
processors: | ||
- move_fields: | ||
from: "app" | ||
fields: [ "method", "elapsed_time" ] | ||
to: "rpc." | ||
---- |
Oops, something went wrong.