Skip to content

Commit

Permalink
Add collectd parser (#2654)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored Apr 12, 2017
1 parent 0193cbe commit 2c98e5a
Show file tree
Hide file tree
Showing 12 changed files with 592 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ be deprecated eventually.
- [#1667](https://github.com/influxdata/telegraf/pull/1667): dmcache input plugin
- [#2637](https://github.com/influxdata/telegraf/issues/2637): Add support for precision in http_listener
- [#2636](https://github.com/influxdata/telegraf/pull/2636): Add `message_len_max` option to `kafka_consumer` input
- [#1100](https://github.com/influxdata/telegraf/issues/1100): Add collectd parser

### Bugfixes

Expand Down
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
collectd.org 2ce144541b8903101fb8f1483cc0497a68798122
github.com/Shopify/sarama 574d3147eee384229bf96a5d12c207fe7b5234f3
github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d
github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c
Expand Down
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,16 @@ Telegraf can also collect metrics via the following service plugins:
* [mandrill](./plugins/inputs/webhooks/mandrill)
* [rollbar](./plugins/inputs/webhooks/rollbar)

Telegraf is able to parse the following input data formats into metrics, these
formats may be used with input plugins supporting the `data_format` option:

* [InfluxDB Line Protocol](./docs/DATA_FORMATS_INPUT.md#influx)
* [JSON](./docs/DATA_FORMATS_INPUT.md#json)
* [Graphite](./docs/DATA_FORMATS_INPUT.md#graphite)
* [Value](./docs/DATA_FORMATS_INPUT.md#value)
* [Nagios](./docs/DATA_FORMATS_INPUT.md#nagios)
* [Collectd](./docs/DATA_FORMATS_INPUT.md#collectd)

## Processor Plugins

* [printer](./plugins/processors/printer)
Expand Down
41 changes: 41 additions & 0 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Telegraf is able to parse the following input data formats into metrics:
1. [Graphite](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite)
1. [Value](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#value), ie: 45 or "booyah"
1. [Nagios](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#nagios) (exec input only)
1. [Collectd](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#collectd)

Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
Expand Down Expand Up @@ -438,3 +439,43 @@ Note: Nagios Input Data Formats is only supported in `exec` input plugin.
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "nagios"
```

# Collectd:

The collectd format parses the collectd binary network protocol. Tags are
created for host, instance, type, and type instance. All collectd values are
added as float64 fields.

For more information about the binary network protocol see
[here](https://collectd.org/wiki/index.php/Binary_protocol).

You can control the cryptographic settings with parser options. Create an
authentication file and set `collectd_auth_file` to the path of the file, then
set the desired security level in `collectd_security_level`.

Additional information including client setup can be found
[here](https://collectd.org/wiki/index.php/Networking_introduction#Cryptographic_setup).

You can also change the path to the typesdb or add additional typesdb using
`collectd_typesdb`.

#### Collectd Configuration:

```toml
[[inputs.socket_listener]]
service_address = "udp://127.0.0.1:25826"
name_prefix = "collectd_"

## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "collectd"

## Authentication file for cryptographic security levels
collectd_auth_file = "/etc/collectd/auth_file"
## One of none (default), sign, or encrypt
collectd_security_level = "encrypt"
## Path of to TypesDB specifications
collectd_typesdb = ["/usr/share/collectd/types.db"]
```
2 changes: 1 addition & 1 deletion docs/LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# List
- collectd.org [MIT LICENSE](https://github.com/collectd/go-collectd/blob/master/LICENSE)
- github.com/Shopify/sarama [MIT LICENSE](https://github.com/Shopify/sarama/blob/master/MIT-LICENSE)
- github.com/Sirupsen/logrus [MIT LICENSE](https://github.com/Sirupsen/logrus/blob/master/LICENSE)
- github.com/armon/go-metrics [MIT LICENSE](https://github.com/armon/go-metrics/blob/master/LICENSE)
Expand Down Expand Up @@ -30,4 +31,3 @@
- gopkg.in/dancannon/gorethink.v1 [APACHE LICENSE](https://github.com/dancannon/gorethink/blob/v1.1.2/LICENSE)
- gopkg.in/mgo.v2 [BSD LICENSE](https://github.com/go-mgo/mgo/blob/v2/LICENSE)
- golang.org/x/crypto/ [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE)

31 changes: 31 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1230,13 +1230,44 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
}
}

if node, ok := tbl.Fields["collectd_auth_file"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CollectdAuthFile = str.Value
}
}
}

if node, ok := tbl.Fields["collectd_security_level"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CollectdSecurityLevel = str.Value
}
}
}

if node, ok := tbl.Fields["collectd_typesdb"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.CollectdTypesDB = append(c.CollectdTypesDB, str.Value)
}
}
}
}
}

c.MetricName = name

delete(tbl.Fields, "data_format")
delete(tbl.Fields, "separator")
delete(tbl.Fields, "templates")
delete(tbl.Fields, "tag_keys")
delete(tbl.Fields, "data_type")
delete(tbl.Fields, "collectd_auth_file")
delete(tbl.Fields, "collectd_security_level")
delete(tbl.Fields, "collectd_typesdb")

return parsers.NewParser(c)
}
Expand Down
11 changes: 10 additions & 1 deletion logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"io"
"log"
"os"
"regexp"
"time"

"github.com/influxdata/wlog"
)

var prefixRegex = regexp.MustCompile("^[DIWE]!")

// newTelegrafWriter returns a logging-wrapped writer.
func newTelegrafWriter(w io.Writer) io.Writer {
return &telegrafLog{
Expand All @@ -21,7 +24,13 @@ type telegrafLog struct {
}

func (t *telegrafLog) Write(b []byte) (n int, err error) {
return t.writer.Write(append([]byte(time.Now().UTC().Format(time.RFC3339)+" "), b...))
var line []byte
if !prefixRegex.Match(b) {
line = append([]byte(time.Now().UTC().Format(time.RFC3339)+" I! "), b...)
} else {
line = append([]byte(time.Now().UTC().Format(time.RFC3339)+" "), b...)
}
return t.writer.Write(line)
}

// SetupLogging configures the logging output.
Expand Down
13 changes: 13 additions & 0 deletions logger/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ func TestErrorWriteLogToFile(t *testing.T) {
assert.Equal(t, f[19:], []byte("Z E! TEST\n"))
}

func TestAddDefaultLogLevel(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
assert.NoError(t, err)
defer func() { os.Remove(tmpfile.Name()) }()

SetupLogging(true, false, tmpfile.Name())
log.Printf("TEST")

f, err := ioutil.ReadFile(tmpfile.Name())
assert.NoError(t, err)
assert.Equal(t, f[19:], []byte("Z I! TEST\n"))
}

func BenchmarkTelegrafLogWrite(b *testing.B) {
var msg = []byte("test")
var buf bytes.Buffer
Expand Down
4 changes: 2 additions & 2 deletions plugins/inputs/socket_listener/socket_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (ssl *streamSocketListener) read(c net.Conn) {
for scnr.Scan() {
metrics, err := ssl.Parse(scnr.Bytes())
if err != nil {
ssl.AddError(fmt.Errorf("unable to parse incoming line"))
ssl.AddError(fmt.Errorf("unable to parse incoming line: %s", err))
//TODO rate limit
continue
}
Expand Down Expand Up @@ -105,7 +105,7 @@ func (psl *packetSocketListener) listen() {

metrics, err := psl.Parse(buf[:n])
if err != nil {
psl.AddError(fmt.Errorf("unable to parse incoming packet"))
psl.AddError(fmt.Errorf("unable to parse incoming packet: %s", err))
//TODO rate limit
continue
}
Expand Down
165 changes: 165 additions & 0 deletions plugins/parsers/collectd/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package collectd

import (
"errors"
"fmt"
"log"
"os"

"collectd.org/api"
"collectd.org/network"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)

const (
DefaultAuthFile = "/etc/collectd/auth_file"
)

type CollectdParser struct {
// DefaultTags will be added to every parsed metric
DefaultTags map[string]string

popts network.ParseOpts
}

func (p *CollectdParser) SetParseOpts(popts *network.ParseOpts) {
p.popts = *popts
}

func NewCollectdParser(
authFile string,
securityLevel string,
typesDB []string,
) (*CollectdParser, error) {
popts := network.ParseOpts{}

switch securityLevel {
case "none":
popts.SecurityLevel = network.None
case "sign":
popts.SecurityLevel = network.Sign
case "encrypt":
popts.SecurityLevel = network.Encrypt
default:
popts.SecurityLevel = network.None
}

if authFile == "" {
authFile = DefaultAuthFile
}
popts.PasswordLookup = network.NewAuthFile(authFile)

for _, path := range typesDB {
db, err := LoadTypesDB(path)
if err != nil {
return nil, err
}

if popts.TypesDB != nil {
popts.TypesDB.Merge(db)
} else {
popts.TypesDB = db
}
}

parser := CollectdParser{popts: popts}
return &parser, nil
}

func (p *CollectdParser) Parse(buf []byte) ([]telegraf.Metric, error) {
valueLists, err := network.Parse(buf, p.popts)
if err != nil {
return nil, fmt.Errorf("Collectd parser error: %s", err)
}

metrics := []telegraf.Metric{}
for _, valueList := range valueLists {
metrics = append(metrics, UnmarshalValueList(valueList)...)
}

if len(p.DefaultTags) > 0 {
for _, m := range metrics {
for k, v := range p.DefaultTags {
// only set the default tag if it doesn't already exist:
if !m.HasTag(k) {
m.AddTag(k, v)
}
}
}
}

return metrics, nil
}

func (p *CollectdParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line))
if err != nil {
return nil, err
}

if len(metrics) != 1 {
return nil, errors.New("Line contains multiple metrics")
}

return metrics[0], nil
}

func (p *CollectdParser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}

// UnmarshalValueList translates a ValueList into a Telegraf metric.
func UnmarshalValueList(vl *api.ValueList) []telegraf.Metric {
timestamp := vl.Time.UTC()

var metrics []telegraf.Metric
for i := range vl.Values {
var name string
name = fmt.Sprintf("%s_%s", vl.Identifier.Plugin, vl.DSName(i))
tags := make(map[string]string)
fields := make(map[string]interface{})

// Convert interface back to actual type, then to float64
switch value := vl.Values[i].(type) {
case api.Gauge:
fields["value"] = float64(value)
case api.Derive:
fields["value"] = float64(value)
case api.Counter:
fields["value"] = float64(value)
}

if vl.Identifier.Host != "" {
tags["host"] = vl.Identifier.Host
}
if vl.Identifier.PluginInstance != "" {
tags["instance"] = vl.Identifier.PluginInstance
}
if vl.Identifier.Type != "" {
tags["type"] = vl.Identifier.Type
}
if vl.Identifier.TypeInstance != "" {
tags["type_instance"] = vl.Identifier.TypeInstance
}

// Drop invalid points
m, err := metric.New(name, tags, fields, timestamp)
if err != nil {
log.Printf("E! Dropping metric %v: %v", name, err)
continue
}

metrics = append(metrics, m)
}
return metrics
}

func LoadTypesDB(path string) (*api.TypesDB, error) {
reader, err := os.Open(path)
if err != nil {
return nil, err
}
return api.NewTypesDB(reader)
}
Loading

0 comments on commit 2c98e5a

Please sign in to comment.