Skip to content

Commit

Permalink
Make Logparser Plugin Check For New Files
Browse files Browse the repository at this point in the history
Check in the Gather metric to see if any new files matching the glob
have appeared. If so, start tailing them from the beginning.
  • Loading branch information
Nick White committed Jan 30, 2017
1 parent 4f6087a commit 7db7591
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 7 deletions.
36 changes: 29 additions & 7 deletions plugins/inputs/logparser/logparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type LogParserPlugin struct {
Files []string
FromBeginning bool

tailers []*tail.Tail
tailers map[string]*tail.Tail
lines chan string
done chan struct{}
wg sync.WaitGroup
Expand All @@ -46,7 +46,9 @@ const sampleConfig = `
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/apache/access.log"]
## Read file from beginning.
## Read files that currently exist from the beginning. Files that are created
## while telegraf is running (and that match the "files" globs) will always
## be read from the beginning.
from_beginning = false
## Parse logstash-style "grok" patterns:
Expand Down Expand Up @@ -77,7 +79,11 @@ func (l *LogParserPlugin) Description() string {
}

func (l *LogParserPlugin) Gather(acc telegraf.Accumulator) error {
return nil
l.Lock()
defer l.Unlock()

// always start from the beginning of files that appear while we're running
return l.tailNewfiles(true)
}

func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
Expand All @@ -87,6 +93,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
l.acc = acc
l.lines = make(chan string, 1000)
l.done = make(chan struct{})
l.tailers = make(map[string]*tail.Tail)

// Looks for fields which implement LogParser interface
l.parsers = []LogParser{}
Expand Down Expand Up @@ -121,14 +128,22 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
return err
}

l.wg.Add(1)
go l.parser()

return l.tailNewfiles(l.FromBeginning)
}

// check the globs against files on disk, and start tailing any new files.
// Assumes l's lock is held!
func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
var seek tail.SeekInfo
if !l.FromBeginning {
if !fromBeginning {
seek.Whence = 2
seek.Offset = 0
}

l.wg.Add(1)
go l.parser()
errChan := errchan.New(len(l.Files))

// Create a "tailer" for each file
for _, filepath := range l.Files {
Expand All @@ -139,7 +154,13 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
}
files := g.Match()
errChan = errchan.New(len(files))

for file, _ := range files {
if _, ok := l.tailers[file]; ok {
// we're already tailing this file
continue
}

tailer, err := tail.TailFile(file,
tail.Config{
ReOpen: true,
Expand All @@ -152,7 +173,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
// create a goroutine for each "tailer"
l.wg.Add(1)
go l.receiver(tailer)
l.tailers = append(l.tailers, tailer)
l.tailers[file] = tailer
}
}

Expand All @@ -166,6 +187,7 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) {

var line *tail.Line
for line = range tailer.Lines {

if line.Err != nil {
log.Printf("E! Error tailing file %s, Error: %s\n",
tailer.Filename, line.Err)
Expand Down
43 changes: 43 additions & 0 deletions plugins/inputs/logparser/logparser_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package logparser

import (
"io/ioutil"
"os"
"runtime"
"strings"
"testing"
Expand Down Expand Up @@ -80,6 +82,47 @@ func TestGrokParseLogFiles(t *testing.T) {
map[string]string{})
}

func TestGrokParseLogFilesAppearLater(t *testing.T) {
emptydir, err := ioutil.TempDir("", "TestGrokParseLogFilesAppearLater")
defer os.RemoveAll(emptydir)
assert.NoError(t, err)

thisdir := getCurrentDir()
p := &grok.Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
}

logparser := &LogParserPlugin{
FromBeginning: true,
Files: []string{emptydir + "/*.log"},
GrokParser: p,
}

acc := testutil.Accumulator{}
assert.NoError(t, logparser.Start(&acc))

time.Sleep(time.Millisecond * 500)
assert.Equal(t, acc.NFields(), 0)

os.Symlink(
thisdir+"grok/testdata/test_a.log",
emptydir+"/test_a.log")
assert.NoError(t, logparser.Gather(&acc))
time.Sleep(time.Millisecond * 500)

logparser.Stop()

acc.AssertContainsTaggedFields(t, "logparser_grok",
map[string]interface{}{
"clientip": "192.168.1.1",
"myfloat": float64(1.25),
"response_time": int64(5432),
"myint": int64(101),
},
map[string]string{"response_code": "200"})
}

// Test that test_a.log line gets parsed even though we don't have the correct
// pattern available for test_b.log
func TestGrokParseLogFilesOneBad(t *testing.T) {
Expand Down

0 comments on commit 7db7591

Please sign in to comment.