Skip to content

Commit

Permalink
Make Logparser Plugin Check For New Files (influxdata#2141)
Browse files Browse the repository at this point in the history
* Make Logparser Plugin Check For New Files

Check in the Gather metric to see if any new files matching the glob
have appeared. If so, start tailing them from the beginning.

* changelog update for influxdata#2141
  • Loading branch information
njwhite authored and Eldad Zack committed Mar 27, 2017
1 parent 3b6ffb3 commit 3a9b62a
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 7 deletions.
38 changes: 38 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,42 @@
## v1.2 [unreleased]
## v1.3 [unreleased]

### Release Notes

- The [Riemann output plugin](./plugins/outputs/riemann) has been rewritten
and the previous riemann plugin is _incompatible_ with the new one. The reasons
for this are outlined in issue [#1878](https://github.com/influxdata/telegraf/issues/1878).
The previous riemann output will still be available using
`outputs.riemann_legacy` if needed, but that will eventually be deprecated.
It is highly recommended that all users migrate to the new riemann output plugin.

### Features

- [#2141](https://github.com/influxdata/telegraf/pull/2141): Logparser handles newly-created files.
- [#2137](https://github.com/influxdata/telegraf/pull/2137): Added userstats to mysql input plugin.
- [#2179](https://github.com/influxdata/telegraf/pull/2179): Added more InnoDB metric to MySQL plugin.
- [#2251](https://github.com/influxdata/telegraf/pull/2251): InfluxDB output: use own client for improved through-put and less allocations.
- [#1900](https://github.com/influxdata/telegraf/pull/1900): Riemann plugin rewrite.
- [#1453](https://github.com/influxdata/telegraf/pull/1453): diskio: add support for name templates and udev tags.
- [#2277](https://github.com/influxdata/telegraf/pull/2277): add integer metrics for Consul check health state.

### Bugfixes

- [#2077](https://github.com/influxdata/telegraf/issues/2077): SQL Server Input - Arithmetic overflow error converting numeric to data type int.
- [#2262](https://github.com/influxdata/telegraf/issues/2262): Flush jitter can inhibit metric collection.

## v1.2.1 [2017-02-01]

### Bugfixes

- [#2317](https://github.com/influxdata/telegraf/issues/2317): Fix segfault on nil metrics with influxdb output.
- [#2324](https://github.com/influxdata/telegraf/issues/2324): Fix negative number handling.

### Features

- [#2348](https://github.com/influxdata/telegraf/pull/2348): Go version 1.7.4 -> 1.7.5

## v1.2 [2017-01-00]

### Release Notes

Expand Down
36 changes: 29 additions & 7 deletions plugins/inputs/logparser/logparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type LogParserPlugin struct {
Files []string
FromBeginning bool

tailers []*tail.Tail
tailers map[string]*tail.Tail
lines chan string
done chan struct{}
wg sync.WaitGroup
Expand All @@ -46,7 +46,9 @@ const sampleConfig = `
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/apache/access.log"]
## Read file from beginning.
## Read files that currently exist from the beginning. Files that are created
## while telegraf is running (and that match the "files" globs) will always
## be read from the beginning.
from_beginning = false
## Parse logstash-style "grok" patterns:
Expand Down Expand Up @@ -77,7 +79,11 @@ func (l *LogParserPlugin) Description() string {
}

func (l *LogParserPlugin) Gather(acc telegraf.Accumulator) error {
return nil
l.Lock()
defer l.Unlock()

// always start from the beginning of files that appear while we're running
return l.tailNewfiles(true)
}

func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
Expand All @@ -87,6 +93,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
l.acc = acc
l.lines = make(chan string, 1000)
l.done = make(chan struct{})
l.tailers = make(map[string]*tail.Tail)

// Looks for fields which implement LogParser interface
l.parsers = []LogParser{}
Expand Down Expand Up @@ -121,14 +128,22 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
return err
}

l.wg.Add(1)
go l.parser()

return l.tailNewfiles(l.FromBeginning)
}

// check the globs against files on disk, and start tailing any new files.
// Assumes l's lock is held!
func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
var seek tail.SeekInfo
if !l.FromBeginning {
if !fromBeginning {
seek.Whence = 2
seek.Offset = 0
}

l.wg.Add(1)
go l.parser()
errChan := errchan.New(len(l.Files))

// Create a "tailer" for each file
for _, filepath := range l.Files {
Expand All @@ -139,7 +154,13 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
}
files := g.Match()
errChan = errchan.New(len(files))

for file, _ := range files {
if _, ok := l.tailers[file]; ok {
// we're already tailing this file
continue
}

tailer, err := tail.TailFile(file,
tail.Config{
ReOpen: true,
Expand All @@ -152,7 +173,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
// create a goroutine for each "tailer"
l.wg.Add(1)
go l.receiver(tailer)
l.tailers = append(l.tailers, tailer)
l.tailers[file] = tailer
}
}

Expand All @@ -166,6 +187,7 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) {

var line *tail.Line
for line = range tailer.Lines {

if line.Err != nil {
log.Printf("E! Error tailing file %s, Error: %s\n",
tailer.Filename, line.Err)
Expand Down
43 changes: 43 additions & 0 deletions plugins/inputs/logparser/logparser_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package logparser

import (
"io/ioutil"
"os"
"runtime"
"strings"
"testing"
Expand Down Expand Up @@ -80,6 +82,47 @@ func TestGrokParseLogFiles(t *testing.T) {
map[string]string{})
}

func TestGrokParseLogFilesAppearLater(t *testing.T) {
emptydir, err := ioutil.TempDir("", "TestGrokParseLogFilesAppearLater")
defer os.RemoveAll(emptydir)
assert.NoError(t, err)

thisdir := getCurrentDir()
p := &grok.Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
}

logparser := &LogParserPlugin{
FromBeginning: true,
Files: []string{emptydir + "/*.log"},
GrokParser: p,
}

acc := testutil.Accumulator{}
assert.NoError(t, logparser.Start(&acc))

time.Sleep(time.Millisecond * 500)
assert.Equal(t, acc.NFields(), 0)

os.Symlink(
thisdir+"grok/testdata/test_a.log",
emptydir+"/test_a.log")
assert.NoError(t, logparser.Gather(&acc))
time.Sleep(time.Millisecond * 500)

logparser.Stop()

acc.AssertContainsTaggedFields(t, "logparser_grok",
map[string]interface{}{
"clientip": "192.168.1.1",
"myfloat": float64(1.25),
"response_time": int64(5432),
"myint": int64(101),
},
map[string]string{"response_code": "200"})
}

// Test that test_a.log line gets parsed even though we don't have the correct
// pattern available for test_b.log
func TestGrokParseLogFilesOneBad(t *testing.T) {
Expand Down

0 comments on commit 3a9b62a

Please sign in to comment.