Skip to content

Commit

Permalink
Refactor and code cleanup of filtering
Browse files Browse the repository at this point in the history
started working on this with the idea of fixing influxdata#1623, although I
realized that this was actually just a documentation issue around
a toml eccentricity.

closes influxdata#1623
  • Loading branch information
sparrc committed Sep 5, 2016
1 parent b63dedb commit 50ef328
Show file tree
Hide file tree
Showing 12 changed files with 216 additions and 173 deletions.
43 changes: 14 additions & 29 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,8 @@ func (ac *accumulator) makeMetric(
if len(fields) == 0 || len(measurement) == 0 {
return nil
}

if !ac.inputConfig.Filter.ShouldNamePass(measurement) {
return nil
}

if !ac.inputConfig.Filter.ShouldTagsPass(tags) {
return nil
if tags == nil {
tags = make(map[string]string)
}

// Override measurement name if set
Expand All @@ -104,9 +99,6 @@ func (ac *accumulator) makeMetric(
measurement = measurement + ac.inputConfig.MeasurementSuffix
}

if tags == nil {
tags = make(map[string]string)
}
// Apply plugin-wide tags if set
for k, v := range ac.inputConfig.Tags {
if _, ok := tags[k]; !ok {
Expand All @@ -119,25 +111,21 @@ func (ac *accumulator) makeMetric(
tags[k] = v
}
}
ac.inputConfig.Filter.FilterTags(tags)

result := make(map[string]interface{})
for k, v := range fields {
// Filter out any filtered fields
if ac.inputConfig != nil {
if !ac.inputConfig.Filter.ShouldFieldsPass(k) {
continue
}
}
// Apply the metric filter(s)
if ok := ac.inputConfig.Filter.Apply(measurement, fields, tags); !ok {
return nil
}

for k, v := range fields {
// Validate uint64 and float64 fields
switch val := v.(type) {
case uint64:
// InfluxDB does not support writing uint64
if val < uint64(9223372036854775808) {
result[k] = int64(val)
fields[k] = int64(val)
} else {
result[k] = int64(9223372036854775807)
fields[k] = int64(9223372036854775807)
}
continue
case float64:
Expand All @@ -148,15 +136,12 @@ func (ac *accumulator) makeMetric(
"field, skipping",
measurement, k)
}
delete(fields, k)
continue
}
}

result[k] = v
}
fields = nil
if len(result) == 0 {
return nil
fields[k] = v
}

var timestamp time.Time
Expand All @@ -171,11 +156,11 @@ func (ac *accumulator) makeMetric(
var err error
switch mType {
case telegraf.Counter:
m, err = telegraf.NewCounterMetric(measurement, tags, result, timestamp)
m, err = telegraf.NewCounterMetric(measurement, tags, fields, timestamp)
case telegraf.Gauge:
m, err = telegraf.NewGaugeMetric(measurement, tags, result, timestamp)
m, err = telegraf.NewGaugeMetric(measurement, tags, fields, timestamp)
default:
m, err = telegraf.NewMetric(measurement, tags, result, timestamp)
m, err = telegraf.NewMetric(measurement, tags, fields, timestamp)
}
if err != nil {
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
Expand Down
2 changes: 1 addition & 1 deletion agent/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func TestAccFilterTags(t *testing.T) {
filter := models.Filter{
TagExclude: []string{"acc"},
}
assert.NoError(t, filter.CompileFilter())
assert.NoError(t, filter.Compile())
a.inputConfig = &models.InputConfig{}
a.inputConfig.Filter = filter

Expand Down
8 changes: 8 additions & 0 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ as it is more efficient to filter out tags at the ingestion point.
* **taginclude**: taginclude is the inverse of tagexclude. It will only include
the tag keys in the final measurement.

**NOTE** `tagpass` and `tagdrop` parameters must be defined at the _end_ of
the plugin definition, otherwise subsequent plugin config options will be
interpreted as part of the tagpass/tagdrop map.

## Input Configuration

Some configuration options are configurable per input:
Expand Down Expand Up @@ -129,6 +133,10 @@ fields which begin with `time_`.

#### Input Config: tagpass and tagdrop

**NOTE** `tagpass` and `tagdrop` parameters must be defined at the _end_ of
the plugin definition, otherwise subsequent plugin config options will be
interpreted as part of the tagpass/tagdrop map.

```toml
[[inputs.cpu]]
percpu = true
Expand Down
6 changes: 3 additions & 3 deletions filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ type Filter interface {
Match(string) bool
}

// CompileFilter takes a list of string filters and returns a Filter interface
// Compile takes a list of string filters and returns a Filter interface
// for matching a given string against the filter list. The filter list
// supports glob matching too, ie:
//
// f, _ := CompileFilter([]string{"cpu", "mem", "net*"})
// f, _ := Compile([]string{"cpu", "mem", "net*"})
// f.Match("cpu") // true
// f.Match("network") // true
// f.Match("memory") // false
//
func CompileFilter(filters []string) (Filter, error) {
func Compile(filters []string) (Filter, error) {
// return if there is nothing to compile
if len(filters) == 0 {
return nil, nil
Expand Down
24 changes: 12 additions & 12 deletions filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,30 @@ import (
"github.com/stretchr/testify/assert"
)

func TestCompileFilter(t *testing.T) {
f, err := CompileFilter([]string{})
func TestCompile(t *testing.T) {
f, err := Compile([]string{})
assert.NoError(t, err)
assert.Nil(t, f)

f, err = CompileFilter([]string{"cpu"})
f, err = Compile([]string{"cpu"})
assert.NoError(t, err)
assert.True(t, f.Match("cpu"))
assert.False(t, f.Match("cpu0"))
assert.False(t, f.Match("mem"))

f, err = CompileFilter([]string{"cpu*"})
f, err = Compile([]string{"cpu*"})
assert.NoError(t, err)
assert.True(t, f.Match("cpu"))
assert.True(t, f.Match("cpu0"))
assert.False(t, f.Match("mem"))

f, err = CompileFilter([]string{"cpu", "mem"})
f, err = Compile([]string{"cpu", "mem"})
assert.NoError(t, err)
assert.True(t, f.Match("cpu"))
assert.False(t, f.Match("cpu0"))
assert.True(t, f.Match("mem"))

f, err = CompileFilter([]string{"cpu", "mem", "net*"})
f, err = Compile([]string{"cpu", "mem", "net*"})
assert.NoError(t, err)
assert.True(t, f.Match("cpu"))
assert.False(t, f.Match("cpu0"))
Expand All @@ -40,7 +40,7 @@ func TestCompileFilter(t *testing.T) {
var benchbool bool

func BenchmarkFilterSingleNoGlobFalse(b *testing.B) {
f, _ := CompileFilter([]string{"cpu"})
f, _ := Compile([]string{"cpu"})
var tmp bool
for n := 0; n < b.N; n++ {
tmp = f.Match("network")
Expand All @@ -49,7 +49,7 @@ func BenchmarkFilterSingleNoGlobFalse(b *testing.B) {
}

func BenchmarkFilterSingleNoGlobTrue(b *testing.B) {
f, _ := CompileFilter([]string{"cpu"})
f, _ := Compile([]string{"cpu"})
var tmp bool
for n := 0; n < b.N; n++ {
tmp = f.Match("cpu")
Expand All @@ -58,7 +58,7 @@ func BenchmarkFilterSingleNoGlobTrue(b *testing.B) {
}

func BenchmarkFilter(b *testing.B) {
f, _ := CompileFilter([]string{"cpu", "mem", "net*"})
f, _ := Compile([]string{"cpu", "mem", "net*"})
var tmp bool
for n := 0; n < b.N; n++ {
tmp = f.Match("network")
Expand All @@ -67,7 +67,7 @@ func BenchmarkFilter(b *testing.B) {
}

func BenchmarkFilterNoGlob(b *testing.B) {
f, _ := CompileFilter([]string{"cpu", "mem", "net"})
f, _ := Compile([]string{"cpu", "mem", "net"})
var tmp bool
for n := 0; n < b.N; n++ {
tmp = f.Match("net")
Expand All @@ -76,7 +76,7 @@ func BenchmarkFilterNoGlob(b *testing.B) {
}

func BenchmarkFilter2(b *testing.B) {
f, _ := CompileFilter([]string{"aa", "bb", "c", "ad", "ar", "at", "aq",
f, _ := Compile([]string{"aa", "bb", "c", "ad", "ar", "at", "aq",
"aw", "az", "axxx", "ab", "cpu", "mem", "net*"})
var tmp bool
for n := 0; n < b.N; n++ {
Expand All @@ -86,7 +86,7 @@ func BenchmarkFilter2(b *testing.B) {
}

func BenchmarkFilter2NoGlob(b *testing.B) {
f, _ := CompileFilter([]string{"aa", "bb", "c", "ad", "ar", "at", "aq",
f, _ := Compile([]string{"aa", "bb", "c", "ad", "ar", "at", "aq",
"aw", "az", "axxx", "ab", "cpu", "mem", "net"})
var tmp bool
for n := 0; n < b.N; n++ {
Expand Down
8 changes: 1 addition & 7 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,6 @@ func buildFilter(tbl *ast.Table) (models.Filter, error) {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
f.NamePass = append(f.NamePass, str.Value)
f.IsActive = true
}
}
}
Expand All @@ -678,7 +677,6 @@ func buildFilter(tbl *ast.Table) (models.Filter, error) {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
f.NameDrop = append(f.NameDrop, str.Value)
f.IsActive = true
}
}
}
Expand All @@ -693,7 +691,6 @@ func buildFilter(tbl *ast.Table) (models.Filter, error) {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
f.FieldPass = append(f.FieldPass, str.Value)
f.IsActive = true
}
}
}
Expand All @@ -709,7 +706,6 @@ func buildFilter(tbl *ast.Table) (models.Filter, error) {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
f.FieldDrop = append(f.FieldDrop, str.Value)
f.IsActive = true
}
}
}
Expand All @@ -730,7 +726,6 @@ func buildFilter(tbl *ast.Table) (models.Filter, error) {
}
}
f.TagPass = append(f.TagPass, *tagfilter)
f.IsActive = true
}
}
}
Expand All @@ -749,7 +744,6 @@ func buildFilter(tbl *ast.Table) (models.Filter, error) {
}
}
f.TagDrop = append(f.TagDrop, *tagfilter)
f.IsActive = true
}
}
}
Expand Down Expand Up @@ -778,7 +772,7 @@ func buildFilter(tbl *ast.Table) (models.Filter, error) {
}
}
}
if err := f.CompileFilter(); err != nil {
if err := f.Compile(); err != nil {
return f, err
}

Expand Down
9 changes: 3 additions & 6 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ func TestConfig_LoadSingleInputWithEnvVars(t *testing.T) {
Filter: []string{"mytag"},
},
},
IsActive: true,
}
assert.NoError(t, filter.CompileFilter())
assert.NoError(t, filter.Compile())
mConfig := &models.InputConfig{
Name: "memcached",
Filter: filter,
Expand Down Expand Up @@ -83,9 +82,8 @@ func TestConfig_LoadSingleInput(t *testing.T) {
Filter: []string{"mytag"},
},
},
IsActive: true,
}
assert.NoError(t, filter.CompileFilter())
assert.NoError(t, filter.Compile())
mConfig := &models.InputConfig{
Name: "memcached",
Filter: filter,
Expand Down Expand Up @@ -130,9 +128,8 @@ func TestConfig_LoadDirectory(t *testing.T) {
Filter: []string{"mytag"},
},
},
IsActive: true,
}
assert.NoError(t, filter.CompileFilter())
assert.NoError(t, filter.Compile())
mConfig := &models.InputConfig{
Name: "memcached",
Filter: filter,
Expand Down
Loading

0 comments on commit 50ef328

Please sign in to comment.