Skip to content

Commit

Permalink
Add replace function to strings processor (influxdata#4686)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bo Zhao authored and Jean-Louis Dupond committed Apr 22, 2019
1 parent 0de3b2b commit 2c805f9
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 19 deletions.
18 changes: 18 additions & 0 deletions plugins/processors/strings/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ Implemented functions are:
- trim_right
- trim_prefix
- trim_suffix
- replace

Please note that in this implementation these are processed in the order that they appear above.

Specify the `measurement`, `tag` or `field` that you want processed in each section and optionally a `dest` if you want the result stored in a new tag or field. You can specify lots of transformations on data with a single strings processor.

If you'd like to apply the change to every `tag`, `field`, or `measurement`, use the value "*" for each respective field. Note that the `dest` field will be ignored if "*" is used

### Configuration:

```toml
Expand Down Expand Up @@ -45,6 +48,11 @@ Specify the `measurement`, `tag` or `field` that you want processed in each sect
# [[processors.strings.trim_suffix]]
# field = "read_count"
# suffix = "_count"

# [[processors.strings.replace]]
# measurement = "*"
# old = ":"
# new = "_"
```

#### Trim, TrimLeft, TrimRight
Expand All @@ -56,6 +64,16 @@ The `trim`, `trim_left`, and `trim_right` functions take an optional parameter:
The `trim_prefix` and `trim_suffix` functions remote the given `prefix` or `suffix`
respectively from the string.

#### Replace

The `replace` function does a substring replacement across the entire
string to allow for different conventions between various input and output
plugins. Some example usages are eliminating disallowed characters in
field names or replacing separators between different separators.
Can also be used to eliminate unneeded chars that were in metrics.
If the entire name would be deleted, it will refuse to perform
the operation and keep the old name.

### Example
**Config**
```toml
Expand Down
74 changes: 55 additions & 19 deletions plugins/processors/strings/strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Strings struct {
TrimRight []converter `toml:"trim_right"`
TrimPrefix []converter `toml:"trim_prefix"`
TrimSuffix []converter `toml:"trim_suffix"`
Replace []converter `toml:"replace"`

converters []converter
init bool
Expand All @@ -31,6 +32,8 @@ type converter struct {
Cutset string
Suffix string
Prefix string
Old string
New string

fn ConvertFunc
}
Expand Down Expand Up @@ -68,6 +71,12 @@ const sampleConfig = `
# [[processors.strings.trim_suffix]]
# field = "read_count"
# suffix = "_count"
## Replace substrings within field names
# [[processors.strings.trim_suffix]]
# measurement = "*"
# old = ":"
# new = "_"
`

func (s *Strings) SampleConfig() string {
Expand All @@ -79,37 +88,53 @@ func (s *Strings) Description() string {
}

func (c *converter) convertTag(metric telegraf.Metric) {
tv, ok := metric.GetTag(c.Tag)
if !ok {
return
var tags map[string]string
if c.Tag == "*" {
tags = metric.Tags()
} else {
tags = make(map[string]string)
tv, ok := metric.GetTag(c.Tag)
if !ok {
return
}
tags[c.Tag] = tv
}

dest := c.Tag
if c.Dest != "" {
dest = c.Dest
for tag, value := range tags {
dest := tag
if c.Tag != "*" && c.Dest != "" {
dest = c.Dest
}
metric.AddTag(dest, c.fn(value))
}

metric.AddTag(dest, c.fn(tv))
}

func (c *converter) convertField(metric telegraf.Metric) {
fv, ok := metric.GetField(c.Field)
if !ok {
return
}

dest := c.Field
if c.Dest != "" {
dest = c.Dest
var fields map[string]interface{}
if c.Field == "*" {
fields = metric.Fields()
} else {
fields = make(map[string]interface{})
fv, ok := metric.GetField(c.Field)
if !ok {
return
}
fields[c.Field] = fv
}

if fv, ok := fv.(string); ok {
metric.AddField(dest, c.fn(fv))
for tag, value := range fields {
dest := tag
if c.Tag != "*" && c.Dest != "" {
dest = c.Dest
}
if fv, ok := value.(string); ok {
metric.AddField(dest, c.fn(fv))
}
}
}

func (c *converter) convertMeasurement(metric telegraf.Metric) {
if metric.Name() != c.Measurement {
if metric.Name() != c.Measurement && c.Measurement != "*" {
return
}

Expand Down Expand Up @@ -176,6 +201,17 @@ func (s *Strings) initOnce() {
c.fn = func(s string) string { return strings.TrimSuffix(s, c.Suffix) }
s.converters = append(s.converters, c)
}
for _, c := range s.Replace {
c.fn = func(s string) string {
newString := strings.Replace(s, c.Old, c.New, -1)
if newString == "" {
return s
} else {
return newString
}
}
s.converters = append(s.converters, c)
}

s.init = true
}
Expand Down
49 changes: 49 additions & 0 deletions plugins/processors/strings/strings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,3 +481,52 @@ func TestReadmeExample(t *testing.T) {
assert.Equal(t, expectedFields, processed[0].Fields())
assert.Equal(t, expectedTags, processed[0].Tags())
}

func newMetric(name string) telegraf.Metric {
tags := map[string]string{}
fields := map[string]interface{}{}
m, _ := metric.New(name, tags, fields, time.Now())
return m
}

func TestMeasurementReplace(t *testing.T) {
plugin := &Strings{
Replace: []converter{
converter{
Old: "_",
New: "-",
Measurement: "*",
},
},
}
metrics := []telegraf.Metric{
newMetric("foo:some_value:bar"),
newMetric("average:cpu:usage"),
newMetric("average_cpu_usage"),
}
results := plugin.Apply(metrics...)
assert.Equal(t, "foo:some-value:bar", results[0].Name(), "`_` was not changed to `-`")
assert.Equal(t, "average:cpu:usage", results[1].Name(), "Input name should have been unchanged")
assert.Equal(t, "average-cpu-usage", results[2].Name(), "All instances of `_` should have been changed to `-`")
}

func TestMeasurementCharDeletion(t *testing.T) {
plugin := &Strings{
Replace: []converter{
converter{
Old: "foo",
New: "",
Measurement: "*",
},
},
}
metrics := []telegraf.Metric{
newMetric("foo:bar:baz"),
newMetric("foofoofoo"),
newMetric("barbarbar"),
}
results := plugin.Apply(metrics...)
assert.Equal(t, ":bar:baz", results[0].Name(), "Should have deleted the initial `foo`")
assert.Equal(t, "foofoofoo", results[1].Name(), "Should have refused to delete the whole string")
assert.Equal(t, "barbarbar", results[2].Name(), "Should not have changed the input")
}

0 comments on commit 2c805f9

Please sign in to comment.