Skip to content

Commit

Permalink
Internally name all patterns for log parsing flexibility
Browse files Browse the repository at this point in the history
closes #1436

This also fixes the bad behavior of waiting until runtime to return log
parsing pattern compile errors when a pattern was simply unfound.

closes #1418

Also protect against user error when the telegraf user does not have
permission to open the provided file. We will now error and exit in this
case, rather than silently waiting to get permission to open it.
  • Loading branch information
sparrc committed Jul 18, 2016
1 parent 281a4d5 commit 6a6358d
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 19 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ should now look like:
- [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues.
- [#1427](https://github.com/influxdata/telegraf/issues/1427): Cassandra input: version 2.x "column family" fix.
- [#1463](https://github.com/influxdata/telegraf/issues/1463): Shared WaitGroup in Exec plugin
- [#1436](https://github.com/influxdata/telegraf/issues/1436): logparser: honor modifiers in "pattern" config.
- [#1418](https://github.com/influxdata/telegraf/issues/1418): logparser: error and exit on file permissions/missing errors.

## v1.0 beta 2 [2016-06-21]

Expand Down
12 changes: 11 additions & 1 deletion plugins/inputs/logparser/grok/grok.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,23 @@ func (p *Parser) Compile() error {
return err
}

p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns
// Give Patterns fake names so that they can be treated as named
// "custom patterns"
for i, pattern := range p.Patterns {
name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i)
p.CustomPatterns += "\n" + name + " " + pattern + "\n"
p.Patterns[i] = "%{" + name + "}"
}

// Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse
// them together as the same type of pattern.
p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns
if len(p.CustomPatterns) != 0 {
scanner := bufio.NewScanner(strings.NewReader(p.CustomPatterns))
p.addCustomPatterns(scanner)
}

// Parse any custom pattern files supplied.
for _, filename := range p.CustomPatternFiles {
file, err := os.Open(filename)
if err != nil {
Expand Down
39 changes: 37 additions & 2 deletions plugins/inputs/logparser/grok/grok_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func TestBuiltinCombinedLogFormat(t *testing.T) {

func TestCompileStringAndParse(t *testing.T) {
p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
Patterns: []string{"%{TEST_LOG_A}"},
CustomPatterns: `
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
Expand All @@ -230,6 +230,41 @@ func TestCompileStringAndParse(t *testing.T) {
assert.Equal(t, map[string]string{"response_code": "200"}, metricA.Tags())
}

func TestCompileErrorsOnInvalidPattern(t *testing.T) {
p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatterns: `
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
RESPONSE_TIME %{DURATION:response_time:duration}
TEST_LOG_A %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME}
`,
}
assert.Error(t, p.Compile())

metricA, _ := p.ParseLine(`1.25 200 192.168.1.1 5.432µs`)
require.Nil(t, metricA)
}

func TestParsePatternsWithoutCustom(t *testing.T) {
p := &Parser{
Patterns: []string{"%{POSINT:ts:ts-epochnano} response_time=%{POSINT:response_time:int} mymetric=%{NUMBER:metric:float}"},
}
assert.NoError(t, p.Compile())

metricA, err := p.ParseLine(`1466004605359052000 response_time=20821 mymetric=10890.645`)
require.NotNil(t, metricA)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"response_time": int64(20821),
"metric": float64(10890.645),
},
metricA.Fields())
assert.Equal(t, map[string]string{}, metricA.Tags())
assert.Equal(t, time.Unix(0, 1466004605359052000), metricA.Time())
}

func TestParseEpochNano(t *testing.T) {
p := &Parser{
Patterns: []string{"%{MYAPP}"},
Expand Down Expand Up @@ -413,7 +448,7 @@ func TestParseErrors(t *testing.T) {
TEST_LOG_A %{HTTPDATE:ts:ts-httpd} %{WORD:myword:int} %{}
`,
}
assert.NoError(t, p.Compile())
assert.Error(t, p.Compile())
_, err := p.ParseLine(`[04/Jun/2016:12:41:45 +0100] notnumber 200 192.168.1.1 5.432µs 101`)
assert.Error(t, err)

Expand Down
28 changes: 15 additions & 13 deletions plugins/inputs/logparser/logparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/hpcloud/tail"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs"

Expand Down Expand Up @@ -110,11 +111,15 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
}

// compile log parser patterns:
errChan := errchan.New(len(l.parsers))
for _, parser := range l.parsers {
if err := parser.Compile(); err != nil {
return err
errChan.C <- err
}
}
if err := errChan.Error(); err != nil {
return err
}

var seek tail.SeekInfo
if !l.FromBeginning {
Expand All @@ -125,35 +130,32 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
l.wg.Add(1)
go l.parser()

var errS string
errChan = errchan.New(len(l.Files))
// Create a "tailer" for each file
for _, filepath := range l.Files {
g, err := globpath.Compile(filepath)
if err != nil {
log.Printf("ERROR Glob %s failed to compile, %s", filepath, err)
continue
}
for file, _ := range g.Match() {
tailer, err := tail.TailFile(file,
tail.Config{
ReOpen: true,
Follow: true,
Location: &seek,
ReOpen: true,
Follow: true,
Location: &seek,
MustExist: true,
})
if err != nil {
errS += err.Error() + " "
continue
}
errChan.C <- err

// create a goroutine for each "tailer"
l.wg.Add(1)
go l.receiver(tailer)
l.tailers = append(l.tailers, tailer)
}
}

if errS != "" {
return fmt.Errorf(errS)
}
return nil
return errChan.Error()
}

// receiver is launched as a goroutine to continuously watch a tailed logfile
Expand Down
7 changes: 4 additions & 3 deletions plugins/inputs/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
for file, _ := range g.Match() {
tailer, err := tail.TailFile(file,
tail.Config{
ReOpen: true,
Follow: true,
Location: &seek,
ReOpen: true,
Follow: true,
Location: &seek,
MustExist: true,
})
if err != nil {
errS += err.Error() + " "
Expand Down

0 comments on commit 6a6358d

Please sign in to comment.