Skip to content

Commit

Permalink
Adds support for removing/keeping tags from metrics
Browse files Browse the repository at this point in the history
closes #706
  • Loading branch information
sparrc committed Apr 16, 2016
1 parent f76739c commit 4637a6b
Show file tree
Hide file tree
Showing 11 changed files with 434 additions and 229 deletions.
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3
github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367
github.com/eclipse/paho.mqtt.golang 0f7a459f04f13a41b7ed752d47944528d4bf9a86
github.com/go-sql-driver/mysql 1fca743146605a172a266e1654e01e5cd5669bee
github.com/gobwas/glob d877f6352135181470c40c73ebb81aefa22115fa
github.com/golang/protobuf 552c7b9542c194800fd493123b3798ef0a832032
github.com/golang/snappy 427fb6fc07997f43afa32f35e850833760e489a7
github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2
Expand Down
1 change: 1 addition & 0 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (ac *accumulator) AddFields(
tags[k] = v
}
}
ac.inputConfig.Filter.FilterTags(tags)

result := make(map[string]interface{})
for k, v := range fields {
Expand Down
32 changes: 32 additions & 0 deletions agent/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,35 @@ func TestAddBools(t *testing.T) {
fmt.Sprintf("acctest,acc=test,default=tag value=false %d", now.UnixNano()),
actual)
}

// Test that tag filters get applied to metrics.
func TestAccFilterTags(t *testing.T) {
a := accumulator{}
now := time.Now()
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
filter := internal_models.Filter{
TagExclude: []string{"acc"},
}
assert.NoError(t, filter.CompileFilter())
a.inputConfig = &internal_models.InputConfig{}
a.inputConfig.Filter = filter

a.Add("acctest", float64(101), map[string]string{})
a.Add("acctest", float64(101), map[string]string{"acc": "test"})
a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now)

testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest value=101")

testm = <-a.metrics
actual = testm.String()
assert.Contains(t, actual, "acctest value=101")

testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest value=101 %d", now.UnixNano()),
actual)
}
77 changes: 53 additions & 24 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,20 @@
## Generating a Configuration File

A default Telegraf config file can be generated using the -sample-config flag:
`telegraf -sample-config > telegraf.conf`

```
telegraf -sample-config > telegraf.conf
```

To generate a file with specific inputs and outputs, you can use the
-input-filter and -output-filter flags:
`telegraf -sample-config -input-filter cpu:mem:net:swap -output-filter influxdb:kafka`

```
telegraf -sample-config -input-filter cpu:mem:net:swap -output-filter influxdb:kafka
```

You can see the latest config file with all available plugins
[here](https://github.com/influxdata/telegraf/blob/master/etc/telegraf.conf)

## Environment Variables

Expand All @@ -17,8 +26,8 @@ for numbers and booleans they should be plain (ie, $INT_VAR, $BOOL_VAR)

## `[global_tags]` Configuration

Global tags can be specific in the `[global_tags]` section of the config file in
key="value" format. All metrics being gathered on this host will be tagged
Global tags can be specified in the `[global_tags]` section of the config file
in key="value" format. All metrics being gathered on this host will be tagged
with the tags specified here.

## `[agent]` Configuration
Expand Down Expand Up @@ -47,36 +56,42 @@ ie, a jitter of 5s and flush_interval 10s means flushes will happen every 10-15s
* **quiet**: Run telegraf in quiet mode.
* **hostname**: Override default hostname, if empty use os.Hostname().

## `[inputs.xxx]` Configuration

There are some configuration options that are configurable per input:

* **name_override**: Override the base name of the measurement.
(Default is the name of the input).
* **name_prefix**: Specifies a prefix to attach to the measurement name.
* **name_suffix**: Specifies a suffix to attach to the measurement name.
* **tags**: A map of tags to apply to a specific input's measurements.
* **interval**: How often to gather this metric. Normal plugins use a single
global interval, but if one particular input should be run less or more often,
you can configure that here.

#### Input Filters
#### Measurement Filtering

There are also filters that can be configured per input:
Filters can be configured per input or output, see below for examples.

* **namepass**: An array of strings that is used to filter metrics generated by the
current input. Each string in the array is tested as a glob match against
measurement names and if it matches, the field is emitted.
* **namedrop**: The inverse of pass, if a measurement name matches, it is not emitted.
* **fieldpass**: An array of strings that is used to filter metrics generated by the
current input. Each string in the array is tested as a glob match against field names
and if it matches, the field is emitted.
and if it matches, the field is emitted. fieldpass is not available for outputs.
* **fielddrop**: The inverse of pass, if a field name matches, it is not emitted.
fielddrop is not available for outputs.
* **tagpass**: tag names and arrays of strings that are used to filter
measurements by the current input. Each string in the array is tested as a glob
match against the tag name, and if it matches the measurement is emitted.
* **tagdrop**: The inverse of tagpass. If a tag matches, the measurement is not
emitted. This is tested on measurements that have passed the tagpass test.
* **tagexclude**: tagexclude can be used to exclude a tag from measurement(s).
As opposed to tagdrop, which will drop an entire measurement based on it's
tags, tagexclude simply strips the given tag keys from the measurement.
* **taginclude**: taginclude is the inverse of tagexclude. It will only include
the tag keys in the final measurement.

## Input Configuration

Some configuration options are configurable per input:

* **name_override**: Override the base name of the measurement.
(Default is the name of the input).
* **name_prefix**: Specifies a prefix to attach to the measurement name.
* **name_suffix**: Specifies a suffix to attach to the measurement name.
* **tags**: A map of tags to apply to a specific input's measurements.
* **interval**: How often to gather this metric. Normal plugins use a single
global interval, but if one particular input should be run less or more often,
you can configure that here.

#### Input Configuration Examples

Expand Down Expand Up @@ -155,6 +170,20 @@ fields which begin with `time_`.
namepass = ["rest_client_*"]
```

#### Input Config: taginclude and tagexclude

```toml
# Only include the "cpu" tag in the measurements for the cpu plugin.
[[inputs.cpu]]
percpu = true
totalcpu = true
taginclude = ["cpu"]

# Exclude the "fstype" tag from the measurements for the disk plugin.
[[inputs.disk]]
tagexclude = ["fstype"]
```

#### Input config: prefix, suffix, and override

This plugin will emit measurements with the name `cpu_total`
Expand All @@ -180,6 +209,9 @@ This will emit measurements with the name `foobar`
This plugin will emit measurements with two additional tags: `tag1=foo` and
`tag2=bar`

NOTE: Order matters, the `[inputs.cpu.tags]` table must be at the _end_ of the
plugin definition.

```toml
[[inputs.cpu]]
percpu = false
Expand Down Expand Up @@ -208,15 +240,12 @@ to avoid measurement collisions:
fielddrop = ["cpu_time*"]
```

## `[outputs.xxx]` Configuration
## Output Configuration

Telegraf also supports specifying multiple output sinks to send data to,
configuring each output sink is different, but examples can be
found by running `telegraf -sample-config`.

Outputs also support the same configurable options as inputs
(namepass, namedrop, tagpass, tagdrop)

```toml
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
Expand Down
3 changes: 1 addition & 2 deletions docs/LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,5 @@
- github.com/wvanbergen/kazoo-go [MIT LICENSE](https://github.com/wvanbergen/kazoo-go/blob/master/MIT-LICENSE)
- gopkg.in/dancannon/gorethink.v1 [APACHE LICENSE](https://github.com/dancannon/gorethink/blob/v1.1.2/LICENSE)
- gopkg.in/mgo.v2 [BSD LICENSE](https://github.com/go-mgo/mgo/blob/v2/LICENSE)
- golang.org/x/crypto/* [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE)
- internal Glob function [MIT LICENSE](https://github.com/ryanuber/go-glob/blob/master/LICENSE)
- golang.org/x/crypto/ [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE)

52 changes: 45 additions & 7 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,9 +580,9 @@ func (c *Config) addInput(name string, table *ast.Table) error {

// buildFilter builds a Filter
// (tagpass/tagdrop/namepass/namedrop/fieldpass/fielddrop) to
// be inserted into the internal_models.OutputConfig/internal_models.InputConfig to be used for prefix
// filtering on tags and measurements
func buildFilter(tbl *ast.Table) internal_models.Filter {
// be inserted into the internal_models.OutputConfig/internal_models.InputConfig
// to be used for glob filtering on tags and measurements
func buildFilter(tbl *ast.Table) (internal_models.Filter, error) {
f := internal_models.Filter{}

if node, ok := tbl.Fields["namepass"]; ok {
Expand Down Expand Up @@ -681,6 +681,33 @@ func buildFilter(tbl *ast.Table) internal_models.Filter {
}
}

if node, ok := tbl.Fields["tagexclude"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
f.TagExclude = append(f.TagExclude, str.Value)
}
}
}
}
}

if node, ok := tbl.Fields["taginclude"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
f.TagInclude = append(f.TagInclude, str.Value)
}
}
}
}
}
if err := f.CompileFilter(); err != nil {
return f, err
}

delete(tbl.Fields, "namedrop")
delete(tbl.Fields, "namepass")
delete(tbl.Fields, "fielddrop")
Expand All @@ -689,7 +716,9 @@ func buildFilter(tbl *ast.Table) internal_models.Filter {
delete(tbl.Fields, "pass")
delete(tbl.Fields, "tagdrop")
delete(tbl.Fields, "tagpass")
return f
delete(tbl.Fields, "tagexclude")
delete(tbl.Fields, "taginclude")
return f, nil
}

// buildInput parses input specific items from the ast.Table,
Expand Down Expand Up @@ -748,7 +777,11 @@ func buildInput(name string, tbl *ast.Table) (*internal_models.InputConfig, erro
delete(tbl.Fields, "name_override")
delete(tbl.Fields, "interval")
delete(tbl.Fields, "tags")
cp.Filter = buildFilter(tbl)
var err error
cp.Filter, err = buildFilter(tbl)
if err != nil {
return cp, err
}
return cp, nil
}

Expand Down Expand Up @@ -864,13 +897,18 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
return serializers.NewSerializer(c)
}

// buildOutput parses output specific items from the ast.Table, builds the filter and returns an
// buildOutput parses output specific items from the ast.Table,
// builds the filter and returns an
// internal_models.OutputConfig to be inserted into internal_models.RunningInput
// Note: error exists in the return for future calls that might require error
func buildOutput(name string, tbl *ast.Table) (*internal_models.OutputConfig, error) {
filter, err := buildFilter(tbl)
if err != nil {
return nil, err
}
oc := &internal_models.OutputConfig{
Name: name,
Filter: buildFilter(tbl),
Filter: filter,
}
// Outputs don't support FieldDrop/FieldPass, so set to NameDrop/NamePass
if len(oc.Filter.FieldDrop) > 0 {
Expand Down
Loading

0 comments on commit 4637a6b

Please sign in to comment.