Skip to content

Commit

Permalink
Add nagios parser for exec input plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
titilambert committed Mar 3, 2016
1 parent ed9937b commit afd2b03
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 8 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ Currently implemented sources:
* docker
* dovecot
* elasticsearch
* exec (generic executable plugin, support JSON, influx and graphite)
* exec (generic executable plugin, support JSON, influx, graphite and nagios)
* haproxy
* httpjson (generic JSON-emitting http service plugin)
* influxdb
Expand Down
42 changes: 37 additions & 5 deletions plugins/inputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"fmt"
"os/exec"
"sync"
"syscall"

"github.com/gonuts/go-shellquote"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
)

const sampleConfig = `
Expand All @@ -20,7 +22,7 @@ const sampleConfig = `
## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
## Data format to consume. This can be "json", "influx" or "graphite"
## Data format to consume. This can be "json", "influx", "graphite" or "nagios
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
Expand All @@ -46,12 +48,32 @@ func NewExec() *Exec {
}

type Runner interface {
Run(*Exec, string) ([]byte, error)
Run(*Exec, string, telegraf.Accumulator) ([]byte, error)
}

type CommandRunner struct{}

func (c CommandRunner) Run(e *Exec, command string) ([]byte, error) {
func AddNagiosState(exitCode error, acc telegraf.Accumulator) error {
nagiosState := 0
if exitCode != nil {
exiterr, ok := exitCode.(*exec.ExitError)
if ok {
status, ok := exiterr.Sys().(syscall.WaitStatus)
if ok {
nagiosState = status.ExitStatus()
} else {
return fmt.Errorf("exec: unable to get nagios plugin exit code")
}
} else {
return fmt.Errorf("exec: unable to get nagios plugin exit code")
}
}
fields := map[string]interface{}{"state": nagiosState}
acc.AddFields("nagios_state", fields, nil)
return nil
}

func (c CommandRunner) Run(e *Exec, command string, acc telegraf.Accumulator) ([]byte, error) {
split_cmd, err := shellquote.Split(command)
if err != nil || len(split_cmd) == 0 {
return nil, fmt.Errorf("exec: unable to parse command, %s", err)
Expand All @@ -63,7 +85,17 @@ func (c CommandRunner) Run(e *Exec, command string) ([]byte, error) {
cmd.Stdout = &out

if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("exec: %s for command '%s'", err, command)
switch e.parser.(type) {
case *nagios.NagiosParser:
AddNagiosState(err, acc)
default:
return nil, fmt.Errorf("exec: %s for command '%s'", err, command)
}
} else {
switch e.parser.(type) {
case *nagios.NagiosParser:
AddNagiosState(nil, acc)
}
}

return out.Bytes(), nil
Expand All @@ -72,7 +104,7 @@ func (c CommandRunner) Run(e *Exec, command string) ([]byte, error) {
func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) {
defer e.wg.Done()

out, err := e.runner.Run(e, command)
out, err := e.runner.Run(e, command, acc)
if err != nil {
e.errChan <- err
return
Expand Down
3 changes: 2 additions & 1 deletion plugins/inputs/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"testing"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"

"github.com/influxdata/telegraf/testutil"
Expand Down Expand Up @@ -57,7 +58,7 @@ func newRunnerMock(out []byte, err error) Runner {
}
}

func (r runnerMock) Run(e *Exec, command string) ([]byte, error) {
func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]byte, error) {
if r.err != nil {
return nil, r.err
}
Expand Down
102 changes: 102 additions & 0 deletions plugins/parsers/nagios/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package nagios

import (
"regexp"
"strings"
"time"

"github.com/influxdata/telegraf"
)

type NagiosParser struct {
MetricName string
DefaultTags map[string]string
}

// Got from Alignak
// https://github.com/Alignak-monitoring/alignak/blob/develop/alignak/misc/perfdata.py
var perfSplitRegExp, _ = regexp.Compile(`([^=]+=\S+)`)
var nagiosRegExp, _ = regexp.Compile(`^([^=]+)=([\d\.\-\+eE]+)([\w\/%]*);?([\d\.\-\+eE:~@]+)?;?([\d\.\-\+eE:~@]+)?;?([\d\.\-\+eE]+)?;?([\d\.\-\+eE]+)?;?\s*`)

func (p *NagiosParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line))
return metrics[0], err
}

func (p *NagiosParser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}

//> rta,host=absol,unit=ms critical=6000,min=0,value=0.332,warning=4000 1456374625003628099
//> pl,host=absol,unit=% critical=90,min=0,value=0,warning=80 1456374625003693967

func (p *NagiosParser) Parse(buf []byte) ([]telegraf.Metric, error) {
metrics := make([]telegraf.Metric, 0)
// Convert to string
out := string(buf)
// Prepare output for splitting
// Delete escaped pipes
out = strings.Replace(out, `\|`, "___PROTECT_PIPE___", -1)
// Split lines and get the first one
lines := strings.Split(out, "\n")
// Split output and perfdatas
data_splitted := strings.Split(lines[0], "|")
if len(data_splitted) <= 1 {
// No pipe == no perf data
return nil, nil
}
// Get perfdatas
perfdatas := data_splitted[1]
// Add escaped pipes
perfdatas = strings.Replace(perfdatas, "___PROTECT_PIPE___", `\|`, -1)
// Split perfs
unParsedPerfs := perfSplitRegExp.FindAllSubmatch([]byte(perfdatas), -1)
// Iterate on all perfs
for _, unParsedPerfs := range unParsedPerfs {
// Get metrics
// Trim perf
trimedPerf := strings.Trim(string(unParsedPerfs[0]), " ")
// Parse perf
perf := nagiosRegExp.FindAllSubmatch([]byte(trimedPerf), -1)
// Bad string
if len(perf) == 0 {
continue
}
if len(perf[0]) <= 2 {
continue
}
if perf[0][1] == nil || perf[0][2] == nil {
continue
}
fieldName := string(perf[0][1])
tags := make(map[string]string)
if perf[0][3] != nil {
tags["unit"] = string(perf[0][3])
}
fields := make(map[string]interface{})
fields["value"] = perf[0][2]
// TODO should we set empty field
// if metric if there is no data ?
if perf[0][4] != nil {
fields["warning"] = perf[0][4]
}
if perf[0][5] != nil {
fields["critical"] = perf[0][5]
}
if perf[0][6] != nil {
fields["min"] = perf[0][6]
}
if perf[0][7] != nil {
fields["max"] = perf[0][7]
}
// Create metric
metric, err := telegraf.NewMetric(fieldName, tags, fields, time.Now().UTC())
if err != nil {
return nil, err
}
// Add Metric
metrics = append(metrics, metric)
}

return metrics, nil
}
89 changes: 89 additions & 0 deletions plugins/parsers/nagios/parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package nagios

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const validOutput1 = `PING OK - Packet loss = 0%, RTA = 0.30 ms|rta=0.298000ms;4000.000000;6000.000000;0.000000 pl=0%;80;90;0;100
This is a long output
with three lines
`
const validOutput2 = "TCP OK - 0.008 second response time on port 80|time=0.008457s;;;0.000000;10.000000"
const validOutput3 = "TCP OK - 0.008 second response time on port 80|time=0.008457"
const invalidOutput3 = "PING OK - Packet loss = 0%, RTA = 0.30 ms"
const invalidOutput4 = "PING OK - Packet loss = 0%, RTA = 0.30 ms| =3;;;; dgasdg =;;;; sff=;;;;"

func TestParseValidOutput(t *testing.T) {
parser := NagiosParser{
MetricName: "nagios_test",
}

// Output1
metrics, err := parser.Parse([]byte(validOutput1))
require.NoError(t, err)
assert.Len(t, metrics, 2)
// rta
assert.Equal(t, "rta", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": float64(0.298),
"warning": float64(4000),
"critical": float64(6000),
"min": float64(0),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{"unit": "ms"}, metrics[0].Tags())
// pl
assert.Equal(t, "pl", metrics[1].Name())
assert.Equal(t, map[string]interface{}{
"value": float64(0),
"warning": float64(80),
"critical": float64(90),
"min": float64(0),
"max": float64(100),
}, metrics[1].Fields())
assert.Equal(t, map[string]string{"unit": "%"}, metrics[1].Tags())

// Output2
metrics, err = parser.Parse([]byte(validOutput2))
require.NoError(t, err)
assert.Len(t, metrics, 1)
// time
assert.Equal(t, "time", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": float64(0.008457),
"min": float64(0),
"max": float64(10),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{"unit": "s"}, metrics[0].Tags())

// Output3
metrics, err = parser.Parse([]byte(validOutput3))
require.NoError(t, err)
assert.Len(t, metrics, 1)
// time
assert.Equal(t, "time", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": float64(0.008457),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{}, metrics[0].Tags())

}

func TestParseInvalidOutput(t *testing.T) {
parser := NagiosParser{
MetricName: "nagios_test",
}

// invalidOutput3
metrics, err := parser.Parse([]byte(invalidOutput3))
require.NoError(t, err)
assert.Len(t, metrics, 0)

// invalidOutput4
metrics, err = parser.Parse([]byte(invalidOutput4))
require.NoError(t, err)
assert.Len(t, metrics, 0)

}
9 changes: 8 additions & 1 deletion plugins/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
)

// ParserInput is an interface for input plugins that are able to parse
Expand Down Expand Up @@ -38,7 +39,7 @@ type Parser interface {
// Config is a struct that covers the data types needed for all parser types,
// and can be used to instantiate _any_ of the parsers.
type Config struct {
// Dataformat can be one of: json, influx, graphite
// Dataformat can be one of: json, influx, graphite, nagios
DataFormat string

// Separator only applied to Graphite data.
Expand All @@ -65,6 +66,8 @@ func NewParser(config *Config) (Parser, error) {
config.TagKeys, config.DefaultTags)
case "influx":
parser, err = NewInfluxParser()
case "nagios":
parser, err = NewNagiosParser()
case "graphite":
parser, err = NewGraphiteParser(config.Separator,
config.Templates, config.DefaultTags)
Expand All @@ -87,6 +90,10 @@ func NewJSONParser(
return parser, nil
}

func NewNagiosParser() (Parser, error) {
return &nagios.NagiosParser{}, nil
}

func NewInfluxParser() (Parser, error) {
return &influx.InfluxParser{}, nil
}
Expand Down

0 comments on commit afd2b03

Please sign in to comment.