From 48b0798713bd34356705ce2a656e9955b8375a6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20St=C3=A4ber?= Date: Sat, 19 Jan 2019 22:53:54 +0100 Subject: [PATCH] #5 prepare linux file tailer for multiple log file support --- go.mod | 2 + grok_exporter.go | 7 +- tailer/fileTailer.go | 17 +- ...=> fileTailerLegacyImplWrapper_windows.go} | 6 +- ..._darwin.go => fileTailerNewImplWrapper.go} | 17 +- tailer/fileTailer_linux.go | 272 ----------- tailer/fileTailer_test.go | 107 +++-- tailer/fileTailer_windows.go | 5 +- tailer/fswatcher/errors.go | 1 + tailer/fswatcher/fswatcher_darwin.go | 237 +++++----- tailer/fswatcher/fswatcher_linux.go | 443 ++++++++++++++++++ tailer/fswatcher/inotifyloop_linux.go | 155 ++++++ tailer/fswatcher/keventloop_darwin.go | 34 +- tailer/glob/glob.go | 85 ++-- tailer/glob/validator.go | 79 ++++ .../glob/{glob_test.go => validator_test.go} | 0 tailer/pollingFileTailer.go | 3 +- tailer/watcher.go | 3 +- 18 files changed, 959 insertions(+), 514 deletions(-) rename tailer/{fileTailerLegacyImplWrapper.go => fileTailerLegacyImplWrapper_windows.go} (91%) rename tailer/{fileTailerNewImplWrapper_darwin.go => fileTailerNewImplWrapper.go} (78%) delete mode 100644 tailer/fileTailer_linux.go create mode 100644 tailer/fswatcher/fswatcher_linux.go create mode 100644 tailer/fswatcher/inotifyloop_linux.go create mode 100644 tailer/glob/validator.go rename tailer/glob/{glob_test.go => validator_test.go} (100%) diff --git a/go.mod b/go.mod index 858ade5e..ddec1a1a 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/fstab/grok_exporter require ( github.com/prometheus/client_golang v0.9.2 + github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f github.com/sirupsen/logrus v1.3.0 + golang.org/x/exp v0.0.0-20190121172915-509febef88a4 gopkg.in/yaml.v2 v2.2.2 ) diff --git a/grok_exporter.go b/grok_exporter.go index cec14e18..a7368489 100644 --- a/grok_exporter.go +++ b/grok_exporter.go @@ -23,6 +23,7 @@ import ( "github.com/fstab/grok_exporter/oniguruma" "github.com/fstab/grok_exporter/tailer" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" "net/http" "os" "time" @@ -269,13 +270,15 @@ func startServer(cfg v2.ServerConfig, handler http.Handler) chan error { } func startTailer(cfg *v2.Config) (tailer.Tailer, error) { + logger := logrus.New() + logger.Level = logrus.WarnLevel var tail tailer.Tailer switch { case cfg.Input.Type == "file": if cfg.Input.PollInterval == 0 { - tail = tailer.RunFseventFileTailer(cfg.Input.Path, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, nil) + tail = tailer.RunFseventFileTailer(cfg.Input.Path, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, logger) } else { - tail = tailer.RunPollingFileTailer(cfg.Input.Path, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, cfg.Input.PollInterval, nil) + tail = tailer.RunPollingFileTailer(cfg.Input.Path, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, cfg.Input.PollInterval, logger) } case cfg.Input.Type == "stdin": tail = tailer.RunStdinTailer() diff --git a/tailer/fileTailer.go b/tailer/fileTailer.go index 23771da1..479f72be 100644 --- a/tailer/fileTailer.go +++ b/tailer/fileTailer.go @@ -16,6 +16,7 @@ package tailer import ( "fmt" + "github.com/sirupsen/logrus" "io" "os" "path/filepath" @@ -44,16 +45,18 @@ func (f *fileTailer) Errors() chan Error { return f.errors } -func RunPollingFileTailer(path string, readall bool, failOnMissingFile bool, pollIntervall time.Duration, logger simpleLogger) Tailer { +func RunPollingFileTailer(path string, readall bool, failOnMissingFile bool, pollIntervall time.Duration, logger logrus.FieldLogger) Tailer { makeWatcher := func(abspath string, _ *File) (Watcher, error) { return NewPollingWatcher(abspath, pollIntervall) } return runFileTailer(path, readall, failOnMissingFile, logger, makeWatcher) } -func runFileTailer(path string, readall bool, failOnMissingFile bool, logger simpleLogger, makeWatcher func(string, *File) (Watcher, error)) Tailer { +func runFileTailer(path string, readall bool, failOnMissingFile bool, logger logrus.FieldLogger, makeWatcher func(string, *File) (Watcher, error)) Tailer { if logger == nil { - logger = &nilLogger{} + log := logrus.New() + log.Level = logrus.WarnLevel + logger = log } lines := make(chan string) @@ -199,11 +202,3 @@ func writeError(errors chan Error, done chan struct{}, cause error, format strin case <-done: } } - -type simpleLogger interface { - Debug(format string, a ...interface{}) -} - -type nilLogger struct{} - -func (_ *nilLogger) Debug(_ string, _ ...interface{}) {} diff --git a/tailer/fileTailerLegacyImplWrapper.go b/tailer/fileTailerLegacyImplWrapper_windows.go similarity index 91% rename from tailer/fileTailerLegacyImplWrapper.go rename to tailer/fileTailerLegacyImplWrapper_windows.go index bcb36473..5d9a1c62 100644 --- a/tailer/fileTailerLegacyImplWrapper.go +++ b/tailer/fileTailerLegacyImplWrapper_windows.go @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -// +build !darwin - package tailer +import "github.com/sirupsen/logrus" + // old implementation, darwin is already switched to the new implementation, the other OSes will follow -func RunFseventFileTailer(path string, readall bool, failOnMissingFile bool, logger simpleLogger) Tailer { +func RunFseventFileTailer(path string, readall bool, failOnMissingFile bool, logger logrus.FieldLogger) Tailer { return runFileTailer(path, readall, failOnMissingFile, logger, NewFseventWatcher) } diff --git a/tailer/fileTailerNewImplWrapper_darwin.go b/tailer/fileTailerNewImplWrapper.go similarity index 78% rename from tailer/fileTailerNewImplWrapper_darwin.go rename to tailer/fileTailerNewImplWrapper.go index dec4c9ce..5bd61bf8 100644 --- a/tailer/fileTailerNewImplWrapper_darwin.go +++ b/tailer/fileTailerNewImplWrapper.go @@ -12,12 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +// +build !windows + package tailer import ( "github.com/fstab/grok_exporter/tailer/fswatcher" + "github.com/fstab/grok_exporter/tailer/glob" + "github.com/sirupsen/logrus" ) +// TODO: This wrapper will be removed when all OSs are migrated to the new fswatcher, supporting multiple log files. + type tailerWrapper struct { lines chan string errors chan Error @@ -38,14 +44,21 @@ func (t *tailerWrapper) Errors() chan Error { // Switch to the new file tailer implementation which supports watching multiple files. // Once we switched for all supported operating systems, we can remove the old implementation and the wrapper. -func RunFseventFileTailer(path string, readall bool, failOnMissingFile bool, _ interface{}) Tailer { +func RunFseventFileTailer(path string, readall bool, failOnMissingFile bool, logger logrus.FieldLogger) Tailer { result := &tailerWrapper{ lines: make(chan string), errors: make(chan Error), done: make(chan struct{}), } - newTailer, err := fswatcher.Run([]string{path}, readall, failOnMissingFile) + pathAsGlob, err := glob.Parse(path) + if err != nil { + go func() { + result.errors <- newError("failed to initialize file system watcher", err) + }() + return result + } + newTailer, err := fswatcher.Run([]glob.Glob{pathAsGlob}, readall, failOnMissingFile, logger) if err != nil { go func() { result.errors <- newError("failed to initialize file system watcher", err) diff --git a/tailer/fileTailer_linux.go b/tailer/fileTailer_linux.go deleted file mode 100644 index 3641a923..00000000 --- a/tailer/fileTailer_linux.go +++ /dev/null @@ -1,272 +0,0 @@ -// Copyright 2016-2018 The grok_exporter Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tailer - -import ( - "fmt" - "io" - "path/filepath" - "strings" - "syscall" - "unsafe" -) - -type watcher struct { - fd int // file descriptor for reading inotify events - wd int // watch descriptor for the log directory -} - -// File system event watcher, using Linux's inotify. -func NewFseventWatcher(abspath string, _ *File) (Watcher, error) { - fd, err := syscall.InotifyInit1(syscall.IN_CLOEXEC) - if err != nil { - return nil, err - } - wd, err := syscall.InotifyAddWatch(fd, filepath.Dir(abspath), syscall.IN_MODIFY|syscall.IN_MOVED_FROM|syscall.IN_MOVED_TO|syscall.IN_DELETE|syscall.IN_CREATE) - if err != nil { - return nil, err - } - return &watcher{ - fd: fd, - wd: wd, - }, nil -} - -func (w *watcher) Close() error { - var err error - if w.fd != 0 { - err = syscall.Close(w.fd) - } - return err -} - -type eventLoop struct { - w *watcher - events chan Events - errors chan error - done chan struct{} -} - -type eventWithName struct { - syscall.InotifyEvent - Name string -} - -type eventList []eventWithName - -func (w *watcher) StartEventLoop() EventLoop { - events := make(chan Events) - errors := make(chan error) - done := make(chan struct{}) - - go func() { - defer func() { - close(events) - close(errors) - }() - - buf := make([]byte, (syscall.SizeofInotifyEvent+syscall.NAME_MAX+1)*10) - - for { - n, err := syscall.Read(w.fd, buf) - if err != nil { - select { - case errors <- err: - case <-done: - } - return - } else { - var eventsWithName eventList - for offset := 0; offset < n; { - if n-offset < syscall.SizeofInotifyEvent { - select { - case errors <- fmt.Errorf("inotify: read %v bytes, but sizeof(struct inotify_event) is %v bytes.", n, syscall.SizeofInotifyEvent): - case <-done: - } - return - } - event := eventWithName{*(*syscall.InotifyEvent)(unsafe.Pointer(&buf[offset])), ""} - if event.Len > 0 { - bytes := (*[syscall.NAME_MAX]byte)(unsafe.Pointer(&buf[offset+syscall.SizeofInotifyEvent])) - event.Name = strings.TrimRight(string(bytes[0:event.Len]), "\000") - } - if event.Mask&syscall.IN_IGNORED == syscall.IN_IGNORED { - // eventLoop.Close() was called or log dir was deleted - return - } - eventsWithName = append(eventsWithName, event) - offset += syscall.SizeofInotifyEvent + int(event.Len) - } - if len(eventsWithName) > 0 { - select { - case events <- eventsWithName: - case <-done: - return - } - } - } - } - }() - return &eventLoop{ - w: w, - events: events, - errors: errors, - done: done, - } -} - -func (l *eventLoop) Close() error { - _, err := syscall.InotifyRmWatch(l.w.fd, uint32(l.w.wd)) // generates an IN_IGNORED event, which interrupts the syscall.Read() - close(l.done) - return err -} - -func (l *eventLoop) Errors() chan error { - return l.errors -} - -func (l *eventLoop) Events() chan Events { - return l.events -} - -func (events eventList) Process(fileBefore *File, reader *lineReader, abspath string, logger simpleLogger) (file *File, lines []string, err error) { - file = fileBefore - lines = []string{} - var ( - filename = filepath.Base(abspath) - truncated bool - line string - eof bool - ) - for _, event := range events { - logger.Debug("File system watcher received %v.\n", event2string(event)) - } - - // WRITE or TRUNCATE - for _, event := range events { - if file != nil && event.Name == filename && event.Mask&syscall.IN_MODIFY == syscall.IN_MODIFY { - truncated, err = file.CheckTruncated() - if err != nil { - return - } - if truncated { - _, err = file.Seek(0, io.SeekStart) - if err != nil { - return - } - reader.Clear() - } - for { - line, eof, err = reader.ReadLine(file) - if err != nil { - return - } - if eof { - break - } - lines = append(lines, line) - } - } - } - - // MOVED_FROM or DELETE - for _, event := range events { - if file != nil && event.Name == filename && (event.Mask&syscall.IN_MOVED_FROM == syscall.IN_MOVED_FROM || event.Mask&syscall.IN_DELETE == syscall.IN_DELETE) { - file.Close() - file = nil - reader.Clear() - } - } - - // CREATE or MOVED_TO - for _, event := range events { - if file == nil && event.Name == filename && (event.Mask&syscall.IN_CREATE == syscall.IN_CREATE || event.Mask&syscall.IN_MOVED_TO == syscall.IN_MOVED_TO) { - file, err = open(abspath) - if err != nil { - return - } - reader.Clear() - for { - line, eof, err = reader.ReadLine(file) - if err != nil { - return - } - if eof { - break - } - lines = append(lines, line) - } - } - } - return -} - -func event2string(event eventWithName) string { - result := "event" - if len(event.Name) > 0 { - result = fmt.Sprintf("%v with path %v and mask", result, event.Name) - } else { - result = fmt.Sprintf("%v with unknown path and mask", result) - } - if event.Mask&syscall.IN_ACCESS == syscall.IN_ACCESS { - result = fmt.Sprintf("%v IN_ACCESS", result) - } - if event.Mask&syscall.IN_ATTRIB == syscall.IN_ATTRIB { - result = fmt.Sprintf("%v IN_ATTRIB", result) - } - if event.Mask&syscall.IN_CLOSE_WRITE == syscall.IN_CLOSE_WRITE { - result = fmt.Sprintf("%v IN_CLOSE_WRITE", result) - } - if event.Mask&syscall.IN_CLOSE_NOWRITE == syscall.IN_CLOSE_NOWRITE { - result = fmt.Sprintf("%v IN_CLOSE_NOWRITE", result) - } - if event.Mask&syscall.IN_CREATE == syscall.IN_CREATE { - result = fmt.Sprintf("%v IN_CREATE", result) - } - if event.Mask&syscall.IN_DELETE == syscall.IN_DELETE { - result = fmt.Sprintf("%v IN_DELETE", result) - } - if event.Mask&syscall.IN_DELETE_SELF == syscall.IN_DELETE_SELF { - result = fmt.Sprintf("%v IN_DELETE_SELF", result) - } - if event.Mask&syscall.IN_MODIFY == syscall.IN_MODIFY { - result = fmt.Sprintf("%v IN_MODIFY", result) - } - if event.Mask&syscall.IN_MOVE_SELF == syscall.IN_MOVE_SELF { - result = fmt.Sprintf("%v IN_MOVE_SELF", result) - } - if event.Mask&syscall.IN_MOVED_FROM == syscall.IN_MOVED_FROM { - result = fmt.Sprintf("%v IN_MOVED_FROM", result) - } - if event.Mask&syscall.IN_MOVED_TO == syscall.IN_MOVED_TO { - result = fmt.Sprintf("%v IN_MOVED_TO", result) - } - if event.Mask&syscall.IN_OPEN == syscall.IN_OPEN { - result = fmt.Sprintf("%v IN_OPEN", result) - } - if event.Mask&syscall.IN_IGNORED == syscall.IN_IGNORED { - result = fmt.Sprintf("%v IN_IGNORED", result) - } - if event.Mask&syscall.IN_ISDIR == syscall.IN_ISDIR { - result = fmt.Sprintf("%v IN_ISDIR", result) - } - if event.Mask&syscall.IN_Q_OVERFLOW == syscall.IN_Q_OVERFLOW { - result = fmt.Sprintf("%v IN_Q_OVERFLOW", result) - } - if event.Mask&syscall.IN_UNMOUNT == syscall.IN_UNMOUNT { - result = fmt.Sprintf("%v IN_UNMOUNT", result) - } - return result -} diff --git a/tailer/fileTailer_test.go b/tailer/fileTailer_test.go index de100298..ff49f591 100644 --- a/tailer/fileTailer_test.go +++ b/tailer/fileTailer_test.go @@ -16,6 +16,7 @@ package tailer import ( "fmt" + "github.com/sirupsen/logrus" "io/ioutil" "os" "os/user" @@ -108,13 +109,15 @@ func (opt watcherType) String() string { } func TestFileTailerCloseLogfileAfterEachLine(t *testing.T) { + logger := logrus.New() + logger.Level = logrus.DebugLevel testRunNumber := 0 // Helps to figure out which debug message belongs to which test run. for _, watcherOpt := range []watcherType{fsevent, polling} { for _, logrotateOpt := range []logrotateOption{_create, _nocreate, _create_from_temp} { for _, mvOpt := range []logrotateMoveOption{mv, cp, rm} { testRunNumber++ t.Run(fmt.Sprintf("[%v]", testRunNumber), func(t *testing.T) { - testLogrotate(t, NewTestRunLogger(testRunNumber), watcherOpt, logrotateOpt, mvOpt, closeFileAfterEachLine) + testLogrotate(t, logger.WithField("test-nr", testRunNumber), watcherOpt, logrotateOpt, mvOpt, closeFileAfterEachLine) }) } } @@ -122,19 +125,21 @@ func TestFileTailerCloseLogfileAfterEachLine(t *testing.T) { // For logrotate options 'copy' and 'copytruncate', only the mvOpt 'cp' makes sense. testRunNumber++ t.Run(fmt.Sprintf("[%v]", testRunNumber), func(t *testing.T) { - testLogrotate(t, NewTestRunLogger(testRunNumber), watcherOpt, logrotateOpt, cp, closeFileAfterEachLine) + testLogrotate(t, logger.WithField("test-nr", testRunNumber), watcherOpt, logrotateOpt, cp, closeFileAfterEachLine) }) } } } func TestFileTailerKeepLogfileOpen(t *testing.T) { + logger := logrus.New() + logger.Level = logrus.DebugLevel testRunNumber := 100 // When the logger keeps the file open, only the logrotate options 'copy' and 'copytruncate' make sense. for _, watcherOpt := range []watcherType{fsevent, polling} { - testLogrotate(t, NewTestRunLogger(testRunNumber), watcherOpt, _copy, cp, keepOpen) // 100, 102 + testLogrotate(t, logger.WithField("test-nr", testRunNumber), watcherOpt, _copy, cp, keepOpen) // 100, 102 testRunNumber++ - testLogrotate(t, NewTestRunLogger(testRunNumber), watcherOpt, _copytruncate, cp, keepOpen) // 101, 103 + testLogrotate(t, logger.WithField("test-nr", testRunNumber), watcherOpt, _copytruncate, cp, keepOpen) // 101, 103 testRunNumber++ } } @@ -146,7 +151,8 @@ func TestFileTailerKeepLogfileOpen(t *testing.T) { // * directories with the xattr com.apple.FinderInfo (like everything in /tmp) are hidden // In order to test this, we must create a log file somewhere outside of /tmp, so we use $HOME. func TestVisibleInOSXFinder(t *testing.T) { - log := NewTestRunLogger(200) + log := logrus.New().WithField("test-nr", 200) + log.Level = logrus.DebugLevel usr, err := user.Current() if err != nil { t.Fatalf("failed to get current user: %v", err) @@ -161,7 +167,16 @@ func TestVisibleInOSXFinder(t *testing.T) { defer logFileWriter.close(t) logFileWriter.writeLine(t, log, "test line 1") tail := RunFseventFileTailer(logfile, false, true, log) - defer tail.Close() + defer func() { + tail.Close() + // wait until closed + select { + case <-tail.Lines(): + case <-time.After(5 * time.Second): + fmt.Fprintf(os.Stderr, "failed to shut down the tailer. timeout after 5 seconds") + t.Errorf("failed to shut down the tailer. timeout after 5 seconds") + } + }() go func() { for err := range tail.Errors() { t.Fatalf("Tailer failed: %v", err.Error()) @@ -183,12 +198,23 @@ func TestVisibleInOSXFinder(t *testing.T) { // test the "fail_on_missing_logfile: false" configuration func TestFileMissingOnStartup(t *testing.T) { const logfileName = "grok_exporter_test_logfile.log" - log := NewTestRunLogger(300) + log := logrus.New() + log.Level = logrus.DebugLevel + logger := log.WithField("test-nr", 300) tmpDir := mkTmpDirOrFail(t) defer cleanUp(t, tmpDir) var logfile = fmt.Sprintf("%s%c%s", tmpDir, os.PathSeparator, logfileName) tail := RunFseventFileTailer(logfile, true, false, log) - defer tail.Close() + defer func() { + tail.Close() + // wait until closed + select { + case <-tail.Lines(): + case <-time.After(5 * time.Second): + fmt.Fprintf(os.Stderr, "failed to shut down the tailer. timeout after 5 seconds") + t.Errorf("failed to shut down the tailer. timeout after 5 seconds") + } + }() // We don't expect errors. However, start a go-routine listening on // the tailer's errorChannel in case something goes wrong. @@ -206,10 +232,10 @@ func TestFileMissingOnStartup(t *testing.T) { } logFileWriter := newLogFileWriter(t, logfile, closeFileAfterEachLine) - logFileWriter.writeLine(t, log, "test line 1") - logFileWriter.writeLine(t, log, "test line 2") - expect(t, log, tail.Lines(), "test line 1", 1*time.Second) - expect(t, log, tail.Lines(), "test line 2", 1*time.Second) + logFileWriter.writeLine(t, logger, "test line 1") + logFileWriter.writeLine(t, logger, "test line 2") + expect(t, logger, tail.Lines(), "test line 1", 1*time.Second) + expect(t, logger, tail.Lines(), "test line 2", 1*time.Second) } //func TestShutdownDuringSyscall(t *testing.T) { @@ -227,14 +253,14 @@ func TestFileMissingOnStartup(t *testing.T) { // } //} -func testLogrotate(t *testing.T, log simpleLogger, watcherOpt watcherType, logrotateOpt logrotateOption, logrotateMoveOpt logrotateMoveOption, loggerOpt loggerOption) { +func testLogrotate(t *testing.T, log logrus.FieldLogger, watcherOpt watcherType, logrotateOpt logrotateOption, logrotateMoveOpt logrotateMoveOption, loggerOpt loggerOption) { tmpDir := mkTmpDirOrFail(t) defer cleanUp(t, tmpDir) logfile := mkTmpFileOrFail(t, tmpDir) logFileWriter := newLogFileWriter(t, logfile, loggerOpt) defer logFileWriter.close(t) - log.Debug("Running test using logfile %v with watcher option '%v', logrotate option '%v', move option '%v', and logger option '%v'.\n", path.Base(logfile), watcherOpt, logrotateOpt, logrotateMoveOpt, loggerOpt) + log.Debugf("Running test using logfile %v with watcher option '%v', logrotate option '%v', move option '%v', and logger option '%v'.\n", path.Base(logfile), watcherOpt, logrotateOpt, logrotateMoveOpt, loggerOpt) logFileWriter.writeLine(t, log, "test line 1") logFileWriter.writeLine(t, log, "test line 2") @@ -247,13 +273,28 @@ func testLogrotate(t *testing.T, log simpleLogger, watcherOpt watcherType, logro tail = RunPollingFileTailer(logfile, true, true, 10*time.Millisecond, log) } tail = BufferedTailer(tail) - defer tail.Close() + defer func() { + tail.Close() + // wait until closed + select { + case <-tail.Lines(): + case <-time.After(5 * time.Second): + fmt.Fprintf(os.Stderr, "failed to shut down the tailer. timeout after 5 seconds") + t.Errorf("failed to shut down the tailer. timeout after 5 seconds") + } + }() // We don't expect errors. However, start a go-routine listening on // the tailer's errorChannel in case something goes wrong. go func() { for err := range tail.Errors() { - t.Errorf("Tailer failed: %v", err.Error()) // Cannot call t.Fatalf() in other goroutine. + if err == nil { + return // tailer closed + } else { + fmt.Fprintf(os.Stderr, "TAILER FAILED: %v\n", err) + t.Errorf("Tailer failed: %v", err.Error()) // Cannot call t.Fatalf() in other goroutine. + return + } } }() @@ -288,7 +329,7 @@ func newLogFileWriter(t *testing.T, logfile string, opt loggerOption) logFileWri } type logFileWriter interface { - writeLine(t *testing.T, log simpleLogger, line string) + writeLine(t *testing.T, log logrus.FieldLogger, line string) close(t *testing.T) } @@ -302,7 +343,7 @@ func newCloseFileAfterEachLineLogFileWriter(t *testing.T, logfile string) logFil } } -func (l *closeFileAfterEachLineLogFileWriter) writeLine(t *testing.T, log simpleLogger, line string) { +func (l *closeFileAfterEachLineLogFileWriter) writeLine(t *testing.T, log logrus.FieldLogger, line string) { f, err := os.OpenFile(l.path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644) if err != nil { t.Fatalf("%v: Failed to open file for writing: %v", l.path, err.Error()) @@ -323,7 +364,7 @@ func (l *closeFileAfterEachLineLogFileWriter) writeLine(t *testing.T, log simple if err != nil { t.Fatalf("%v: Failed to close file: %v", l.path, err.Error()) } - log.Debug("Wrote log line '%v' with closeFileAfterEachLineLogger.\n", line) + log.Debugf("Wrote log line '%v' with closeFileAfterEachLineLogger.\n", line) } func (l *closeFileAfterEachLineLogFileWriter) close(t *testing.T) { @@ -344,7 +385,7 @@ func newKeepOpenLogFileWriter(t *testing.T, logfile string) logFileWriter { } } -func (l *keepOpenLogFileWriter) writeLine(t *testing.T, log simpleLogger, line string) { +func (l *keepOpenLogFileWriter) writeLine(t *testing.T, log logrus.FieldLogger, line string) { _, err := l.file.WriteString(fmt.Sprintf("%v\n", line)) if err != nil { t.Fatalf("%v: Failed to write to file: %v", l.file.Name(), err.Error()) @@ -353,7 +394,7 @@ func (l *keepOpenLogFileWriter) writeLine(t *testing.T, log simpleLogger, line s if err != nil { t.Fatalf("%v: Failed to flush the file: %v", l.file.Name(), err.Error()) } - log.Debug("Wrote log line '%v' with keepOpenLogger.\n", line) + log.Debugf("Wrote log line '%v' with keepOpenLogger.\n", line) } func (l *keepOpenLogFileWriter) close(t *testing.T) { @@ -419,7 +460,7 @@ func ls(t *testing.T, path string) []os.FileInfo { return result } -func rotate(t *testing.T, log simpleLogger, logfile string, opt logrotateOption, mvOpt logrotateMoveOption) { +func rotate(t *testing.T, log logrus.FieldLogger, logfile string, opt logrotateOption, mvOpt logrotateMoveOption) { dir := filepath.Dir(logfile) filename := filepath.Base(logfile) filesBefore := ls(t, dir) @@ -449,7 +490,7 @@ func rotate(t *testing.T, log simpleLogger, logfile string, opt logrotateOption, default: t.Fatalf("Unknown logrotate option.") } - log.Debug("Simulated logrotate with option %v and mvOption %v\n", opt, mvOpt) + log.Debugf("Simulated logrotate with option %v and mvOption %v\n", opt, mvOpt) } func moveOrFail(t *testing.T, mvOpt logrotateMoveOption, logfile string) { @@ -549,7 +590,7 @@ func truncateOrFail(t *testing.T, logfile string) { } } -func expect(t *testing.T, log simpleLogger, c chan string, line string, timeout time.Duration) { +func expect(t *testing.T, log logrus.FieldLogger, c chan string, line string, timeout time.Duration) { timeoutChan := make(chan bool, 1) go func() { time.Sleep(timeout) @@ -560,28 +601,14 @@ func expect(t *testing.T, log simpleLogger, c chan string, line string, timeout if result != line { t.Fatalf("Expected '%v', but got '%v'.", line, result) } else { - log.Debug("Read expected line '%v'\n", line) + log.Debugf("Read expected line '%v'\n", line) } case <-timeoutChan: - log.Debug("Timeout after %v while waiting for line '%v'\n", timeout, line) + log.Debugf("Timeout after %v while waiting for line '%v'\n", timeout, line) t.Fatalf("Timeout after %v while waiting for line '%v'", timeout, line) } } -type testRunLogger struct { - testRunNumber int -} - -func NewTestRunLogger(testRunNumber int) *testRunLogger { - return &testRunLogger{ - testRunNumber: testRunNumber, - } -} - -func (l *testRunLogger) Debug(format string, a ...interface{}) { - fmt.Printf("%v [%v] %v", time.Now().Format("2006-01-02 15:04:05.0000"), l.testRunNumber, fmt.Sprintf(format, a...)) -} - // Commented out until we switched to the new implementation, because this test uses internal API of the old implementation. //func runTestShutdown(t *testing.T, mode string) { // diff --git a/tailer/fileTailer_windows.go b/tailer/fileTailer_windows.go index bcaf5343..2d05d0ef 100644 --- a/tailer/fileTailer_windows.go +++ b/tailer/fileTailer_windows.go @@ -15,6 +15,7 @@ package tailer import ( + "github.com/sirupsen/logrus" "golang.org/x/exp/winfsnotify" "io" "path/filepath" @@ -105,14 +106,14 @@ func (l *eventLoop) Events() chan Events { return l.events } -func (event *event) Process(fileBefore *File, reader *lineReader, abspath string, logger simpleLogger) (file *File, lines []string, err error) { +func (event *event) Process(fileBefore *File, reader *lineReader, abspath string, logger logrus.FieldLogger) (file *File, lines []string, err error) { file = fileBefore lines = []string{} var ( truncated, eof bool line string ) - logger.Debug("File system watcher received %v.\n", event.String()) + logger.Debugf("File system watcher received %v.\n", event.String()) // MOVED_FROM or DELETE if file != nil && norm(event.Name) == norm(abspath) && (event.Mask&winfsnotify.FS_MOVED_FROM == winfsnotify.FS_MOVED_FROM || event.Mask&winfsnotify.FS_DELETE == winfsnotify.FS_DELETE) { diff --git a/tailer/fswatcher/errors.go b/tailer/fswatcher/errors.go index 9d3e39d6..4267f8c8 100644 --- a/tailer/fswatcher/errors.go +++ b/tailer/fswatcher/errors.go @@ -20,6 +20,7 @@ type ErrorType int const ( NotSpecified = iota + DirectoryNotFound FileNotFound ) diff --git a/tailer/fswatcher/fswatcher_darwin.go b/tailer/fswatcher/fswatcher_darwin.go index 6a724c75..beda738b 100644 --- a/tailer/fswatcher/fswatcher_darwin.go +++ b/tailer/fswatcher/fswatcher_darwin.go @@ -15,17 +15,16 @@ package fswatcher import ( - "errors" "fmt" + "github.com/fstab/grok_exporter/tailer/glob" "github.com/sirupsen/logrus" "io" "os" "path/filepath" - "strings" "syscall" ) -// how will this eventually be configured in the config file: +// ideas how this might look like in the config file: // // * input section may specify multiple inputs and use globs // @@ -35,16 +34,8 @@ import ( // Heads up: filters use globs while matches use regular expressions. // Moreover, we should provide vars {{.filename}} and {{.filepath}} for labels. -var logger2 *logrus.Logger - -func init() { - logger2 = logrus.New() - logger2.Level = logrus.ErrorLevel - //logger2.Level = logrus.DebugLevel // TODO: Use debug in tests -} - type watcher struct { - globs []string + globs []glob.Glob watchedDirs []*os.File watchedFiles []*fileWithReader kq int @@ -66,30 +57,23 @@ func (w *watcher) Errors() chan Error { return w.errors } -func Run(globs []string, readall bool, failOnMissingFile bool) (FSWatcher, error) { +func (w *watcher) Close() { + // Closing the done channel will stop the consumer loop. + // Deferred functions within the consumer loop will close the producer loop. + close(w.done) +} + +func Run(globs []glob.Glob, readall bool, failOnMissingFile bool, log logrus.FieldLogger) (FSWatcher, error) { var ( w *watcher err error - absglob string - absglobs = make([]string, 0, len(globs)) + Err Error ) - // make globs absolute paths, because they will be matched against absolute file names - // TODO (1): Write tests to make sure this works reliably - // TODO (2): This should not be darwin specific, but the same for all OSes. - for _, glob := range globs { - absglob, err = filepath.Abs(glob) - if err != nil { - return nil, fmt.Errorf("failed to get absolut path for pattern %q: %v", glob, err) - } - absglobs = append(absglobs, absglob) - } - - // Initializing directory watches happens in the main thread, so we fail immediately if the directories cannot be watched. - w, err = initDirs(absglobs) - if err != nil { - return nil, err + w, Err = initWatcher(globs) + if Err != nil { + return nil, Err } go func() { @@ -99,19 +83,48 @@ func Run(globs []string, readall bool, failOnMissingFile bool) (FSWatcher, error close(w.lines) close(w.errors) + warnf := func(format string, args ...interface{}) { + log.Warnf("error while shutting down the file system watcher: %v", fmt.Sprint(format, args)) + } + + // After calling keventProducerLoop.Close(), we need to close the kq descriptor + // in order to interrupt the kevent() system call. See keventProducerLoop.Close(). + err = syscall.Close(w.kq) + if err != nil { + warnf("closing the kq descriptor failed: %v", err) + } + for _, file := range w.watchedFiles { - file.file.Close() + err = file.file.Close() + if err != nil { + warnf("close(%q) failed: %v", file.file.Name(), err) + } } for _, dir := range w.watchedDirs { - dir.Close() + err = dir.Close() + if err != nil { + warnf("close(%q) failed: %v", dir.Name(), err) + } } }() + Err = w.watchDirs(log) + if Err != nil { + select { + case <-w.done: + case w.errors <- Err: + } + return + } + + keventProducerLoop := runKeventLoop(w.kq) + defer keventProducerLoop.Close() + // Initializing watches for the files within the directories happens in the goroutine, because with readall=true // this will immediately write lines to the lines channel, so this blocks until the caller starts reading from the lines channel. for _, dir := range w.watchedDirs { - dirLogger := logger2.WithField("directory", dir.Name()) + dirLogger := log.WithField("directory", dir.Name()) dirLogger.Debugf("initializing directory") syncFilesErr := w.syncFilesInDir(dir, readall, dirLogger) if syncFilesErr != nil { @@ -135,15 +148,12 @@ func Run(globs []string, readall bool, failOnMissingFile bool) (FSWatcher, error } } - keventProducerLoop := runKeventLoop(w.kq) - defer keventProducerLoop.Close() - for { // kevent consumer loop select { case <-w.done: return case event := <-keventProducerLoop.events: - processEventError := w.processEvent(event, logger2) + processEventError := w.processEvent(event, log) if processEventError != nil { select { case <-w.done: @@ -163,13 +173,7 @@ func Run(globs []string, readall bool, failOnMissingFile bool) (FSWatcher, error return w, nil } -func (w *watcher) Close() { - // Closing the done channel will stop the kevent consumer loop. - close(w.done) - // The producer loop, lines and errors channels, files and directories will be closed once the consumer loop is terminated (via deferred function calls). -} - -func initDirs(globs []string) (*watcher, error) { +func initWatcher(globs []glob.Glob) (*watcher, Error) { var ( w = &watcher{ globs: globs, @@ -177,35 +181,41 @@ func initDirs(globs []string) (*watcher, error) { errors: make(chan Error), done: make(chan struct{}), } + err error + ) + w.kq, err = syscall.Kqueue() + if err != nil { + return nil, NewError(NotSpecified, err, "kqueue() failed") + } + return w, nil +} + +func (w *watcher) watchDirs(log logrus.FieldLogger) Error { + var ( err error + Err Error dir *os.File dirPaths []string dirPath string zeroTimeout = syscall.NsecToTimespec(0) // timeout zero means non-blocking kevent() call ) - w.kq, err = syscall.Kqueue() - if err != nil { - return nil, fmt.Errorf("failed to initialize file system watcher: %v", err) - } - dirPaths, err = uniqueBaseDirs(globs) - if err != nil { - w.Close() - return nil, err + dirPaths, Err = uniqueDirs(w.globs) + if Err != nil { + return Err } for _, dirPath = range dirPaths { + log.Debugf("watching directory %v", dirPath) dir, err = os.Open(dirPath) if err != nil { - w.Close() - return nil, err + return NewErrorf(NotSpecified, err, "%v: open() failed", dirPath) } w.watchedDirs = append(w.watchedDirs, dir) _, err = syscall.Kevent(w.kq, []syscall.Kevent_t{makeEvent(dir)}, nil, &zeroTimeout) if err != nil { - w.Close() - return nil, err + return NewErrorf(NotSpecified, err, "%v: kevent() failed", dirPath) } } - return w, nil + return nil } // check if files have been added/removed and update kevent file watches accordingly @@ -218,7 +228,7 @@ func (w *watcher) syncFilesInDir(dir *os.File, readall bool, log logrus.FieldLog fileInfos []os.FileInfo fileInfo os.FileInfo err error - tailerErr Error + Err Error fileLogger logrus.FieldLogger zeroTimeout = syscall.NsecToTimespec(0) // timeout zero means non-blocking kevent() call ) @@ -228,9 +238,9 @@ func (w *watcher) syncFilesInDir(dir *os.File, readall bool, log logrus.FieldLog } watchedFilesAfter = make([]*fileWithReader, 0, len(w.watchedFiles)) for _, fileInfo = range fileInfos { - fullPath := filepath.Join(dir.Name(), fileInfo.Name()) - fileLogger = log.WithField("file", fullPath) - if !anyGlobMatches(w.globs, fullPath) { + filePath := filepath.Join(dir.Name(), fileInfo.Name()) + fileLogger = log.WithField("file", filePath) + if !anyGlobMatches(w.globs, filePath) { fileLogger.Debug("skipping file, because no glob matches") continue } @@ -238,28 +248,28 @@ func (w *watcher) syncFilesInDir(dir *os.File, readall bool, log logrus.FieldLog fileLogger.Debug("skipping, because it is a directory") continue } - existingFile, tailerErr = findSameFile(w.watchedFiles, fileInfo) - if tailerErr != nil { - return tailerErr + existingFile, Err = w.findSameFile(fileInfo) + if Err != nil { + return Err } if existingFile != nil { - if existingFile.file.Name() != fullPath { + if existingFile.file.Name() != filePath { fileLogger.WithField("fd", existingFile.file.Fd()).Infof("file was moved from %v", existingFile.file.Name()) - existingFile.file = os.NewFile(existingFile.file.Fd(), fullPath) + existingFile.file = os.NewFile(existingFile.file.Fd(), filePath) } else { fileLogger.Debug("skipping, because file is already watched") } watchedFilesAfter = append(watchedFilesAfter, existingFile) continue } - newFile, err = os.Open(fullPath) + newFile, err = os.Open(filePath) if err != nil { - return NewErrorf(NotSpecified, err, "%v: failed to open file", fullPath) + return NewErrorf(NotSpecified, err, "%v: failed to open file", filePath) } if !readall { _, err = newFile.Seek(0, io.SeekEnd) if err != nil { - return NewErrorf(NotSpecified, err, "%v: failed to seek to end of file", fullPath) + return NewErrorf(NotSpecified, err, "%v: failed to seek to end of file", filePath) } } fileLogger = fileLogger.WithField("fd", newFile.Fd()) @@ -270,9 +280,9 @@ func (w *watcher) syncFilesInDir(dir *os.File, readall bool, log logrus.FieldLog return NewErrorf(NotSpecified, err, "%v: failed to watch file", newFile.Name()) } newFileWithReader = &fileWithReader{file: newFile, reader: NewLineReader()} - tailerErr = w.readNewLines(newFileWithReader, fileLogger) - if tailerErr != nil { - return tailerErr + Err = w.readNewLines(newFileWithReader, fileLogger) + if Err != nil { + return Err } watchedFilesAfter = append(watchedFilesAfter, newFileWithReader) } @@ -280,6 +290,7 @@ func (w *watcher) syncFilesInDir(dir *os.File, readall bool, log logrus.FieldLog if !contains(watchedFilesAfter, f) { fileLogger = log.WithField("file", f.file.Name()).WithField("fd", f.file.Fd()) fileLogger.Info("file was removed, closing and un-watching") + // TODO: explicit un-watch needed, or are kevents for deleted files removed automatically? f.file.Close() } } @@ -296,20 +307,20 @@ func (w *watcher) processEvent(kevent syscall.Kevent_t, log logrus.FieldLogger) for _, dir = range w.watchedDirs { if kevent.Ident == fdToInt(dir.Fd()) { dirLogger = log.WithField("directory", dir.Name()) - dirLogger.Debugf("dir event with fflags %v", fflags2string(kevent)) + dirLogger.Debugf("dir event: %v", kevent) return w.processDirEvent(kevent, dir, dirLogger) } } for _, file = range w.watchedFiles { if kevent.Ident == fdToInt(file.file.Fd()) { fileLogger = log.WithField("file", file.file.Name()).WithField("fd", file.file.Fd()) - fileLogger.Debugf("file event with fflags %v", fflags2string(kevent)) + fileLogger.Debugf("file event: %v", kevent) return w.processFileEvent(kevent, file, fileLogger) } } // Events for unknown file descriptors are ignored. This might happen if syncFilesInDir() already // closed a file while a pending event is still coming in. - log.Debugf("event for unknown file descriptor %v with fflags %v", kevent.Ident, fflags2string(kevent)) + log.Debugf("event for unknown file descriptor %v: %v", kevent.Ident, kevent) return nil } @@ -399,51 +410,43 @@ func (w *watcher) readNewLines(file *fileWithReader, log logrus.FieldLogger) Err func (w *watcher) checkMissingFile() Error { OUTER: - for _, glob := range w.globs { + for _, g := range w.globs { for _, watchedFile := range w.watchedFiles { - if match, _ := filepath.Match(glob, watchedFile.file.Name()); match { + if g.Match(watchedFile.file.Name()) { continue OUTER } } - return NewErrorf(FileNotFound, nil, "%v: no such file", glob) + // Error message must be phrased so that it makes sense for globs, + // but also if g is a plain path without wildcards. + return NewErrorf(FileNotFound, nil, "%v: no such file", g) } return nil } -// gets the base directories from the glob expressions, -// makes sure the paths exist and point to directories. -func uniqueBaseDirs(globs []string) ([]string, error) { +// Gets the directory paths from the glob expressions, +// and makes sure these directories exist. +func uniqueDirs(globs []glob.Glob) ([]string, Error) { var ( result = make([]string, 0, len(globs)) - dirPath string + g glob.Glob + dirInfo os.FileInfo err error - errMsg string - g string ) for _, g = range globs { - dirPath, err = filepath.Abs(filepath.Dir(g)) - if err != nil { - return nil, fmt.Errorf("%q: failed to determine absolute path: %v", filepath.Dir(g), err) - } - if containsString(result, dirPath) { + if containsString(result, g.Dir()) { continue } - dirInfo, err := os.Stat(dirPath) + dirInfo, err = os.Stat(g.Dir()) if err != nil { if os.IsNotExist(err) { - errMsg = fmt.Sprintf("%v: no such directory", dirPath) - if strings.Contains(dirPath, "*") || strings.Contains(dirPath, "?") || strings.Contains(dirPath, "[") { - return nil, fmt.Errorf("%v: note that wildcards are only supported for files but not for directories", errMsg) - } else { - return nil, errors.New(errMsg) - } + return nil, NewErrorf(DirectoryNotFound, nil, "%q: no such directory", g.Dir()) } - return nil, err + return nil, NewErrorf(NotSpecified, err, "%q: stat() failed", g.Dir()) } if !dirInfo.IsDir() { - return nil, fmt.Errorf("%v is not a directory", dirPath) + return nil, NewErrorf(NotSpecified, nil, "%q is not a directory", g.Dir()) } - result = append(result, dirPath) + result = append(result, g.Dir()) } return result, nil } @@ -460,26 +463,26 @@ func isTruncated(file *os.File) (bool, error) { return currentPos > fileInfo.Size(), nil } -func anyGlobMatches(globs []string, path string) bool { +func anyGlobMatches(globs []glob.Glob, path string) bool { for _, pattern := range globs { - if match, _ := filepath.Match(pattern, path); match { + if pattern.Match(path) { return true } } return false } -func findSameFile(watchedFiles []*fileWithReader, other os.FileInfo) (*fileWithReader, Error) { +func (w *watcher) findSameFile(file os.FileInfo) (*fileWithReader, Error) { var ( fileInfo os.FileInfo err error ) - for _, watchedFile := range watchedFiles { + for _, watchedFile := range w.watchedFiles { fileInfo, err = watchedFile.file.Stat() if err != nil { return nil, NewErrorf(NotSpecified, err, "%v: stat failed", watchedFile.file.Name()) } - if os.SameFile(fileInfo, other) { + if os.SameFile(fileInfo, file) { return watchedFile, nil } } @@ -536,29 +539,3 @@ func makeEvent(file *os.File) syscall.Kevent_t { Udata: nil, } } - -func fflags2string(event syscall.Kevent_t) string { - result := make([]string, 0, 1) - if event.Fflags&syscall.NOTE_DELETE == syscall.NOTE_DELETE { - result = append(result, "NOTE_DELETE") - } - if event.Fflags&syscall.NOTE_WRITE == syscall.NOTE_WRITE { - result = append(result, "NOTE_WRITE") - } - if event.Fflags&syscall.NOTE_EXTEND == syscall.NOTE_EXTEND { - result = append(result, "NOTE_EXTEND") - } - if event.Fflags&syscall.NOTE_ATTRIB == syscall.NOTE_ATTRIB { - result = append(result, "NOTE_ATTRIB") - } - if event.Fflags&syscall.NOTE_LINK == syscall.NOTE_LINK { - result = append(result, "NOTE_LINK") - } - if event.Fflags&syscall.NOTE_RENAME == syscall.NOTE_RENAME { - result = append(result, "NOTE_RENAME") - } - if event.Fflags&syscall.NOTE_REVOKE == syscall.NOTE_REVOKE { - result = append(result, "NOTE_REVOKE") - } - return strings.Join(result, ", ") -} diff --git a/tailer/fswatcher/fswatcher_linux.go b/tailer/fswatcher/fswatcher_linux.go new file mode 100644 index 00000000..eb77e226 --- /dev/null +++ b/tailer/fswatcher/fswatcher_linux.go @@ -0,0 +1,443 @@ +// Copyright 2019 The grok_exporter Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fswatcher + +import ( + "fmt" + "github.com/fstab/grok_exporter/tailer/glob" + "github.com/sirupsen/logrus" + "io" + "os" + "path/filepath" + "syscall" +) + +type watcher struct { + globs []glob.Glob + watchedDirs map[int]string // watch descriptor -> path + watchedFiles map[string]*fileWithReader // path -> fileWithReader + fd int + lines chan Line + errors chan Error + done chan struct{} +} + +type fileWithReader struct { + file *os.File + reader *lineReader +} + +func (w *watcher) Lines() chan Line { + return w.lines +} + +func (w *watcher) Errors() chan Error { + return w.errors +} + +func (w *watcher) Close() { + // Closing the done channel will stop the consumer loop. + // Deferred functions within the consumer loop will close the producer loop. + close(w.done) +} + +func Run(globs []glob.Glob, readall bool, failOnMissingFile bool, log logrus.FieldLogger) (FSWatcher, Error) { + + var ( + w *watcher + err error + Err Error + ) + + w, Err = initWatcher(globs) + if Err != nil { + return nil, Err + } + + go func() { + + defer func() { + + close(w.lines) + close(w.errors) + + warnf := func(format string, args ...interface{}) { + log.Warnf("error while shutting down the file system watcher: %v", fmt.Sprint(format, args)) + } + + for wd, dirPath := range w.watchedDirs { + // After calling eventProducerLoop.Close(), we need to call inotify_rm_watch() + // in order to terminate the inotify loop. See eventProducerLoop.Close(). + success, err := syscall.InotifyRmWatch(w.fd, uint32(wd)) + if success != 0 || err != nil { + warnf("inotify_rm_watch(%q) failed: status=%v, err=%v", dirPath, success, err) + } + } + + err = syscall.Close(w.fd) + if err != nil { + warnf("failed to close the inotify file descriptor: %v", err) + } + + for _, file := range w.watchedFiles { + err = file.file.Close() + if err != nil { + warnf("close(%q) failed: %v", file.file.Name(), err) + } + } + }() + + Err = w.watchDirs(log) + if Err != nil { + select { + case <-w.done: + case w.errors <- Err: + } + return + } + + eventProducerLoop := runInotifyLoop(w.fd) + defer eventProducerLoop.Close() + + for _, dirPath := range w.watchedDirs { + dirLogger := log.WithField("directory", dirPath) + dirLogger.Debugf("initializing directory") + Err = w.syncFilesInDir(dirPath, readall, dirLogger) + if Err != nil { + select { + case <-w.done: + case w.errors <- Err: + return + } + } + } + + // make sure at least one logfile was found for each glob + if failOnMissingFile { + missingFileError := w.checkMissingFile() + if missingFileError != nil { + select { + case <-w.done: + case w.errors <- missingFileError: + } + return + } + } + + for { // inotify event consumer loop + select { + case <-w.done: + return + case event := <-eventProducerLoop.events: + processEventError := w.processEvent(event, log) + if processEventError != nil { + select { + case <-w.done: + case w.errors <- processEventError: + } + return + } + case err := <-eventProducerLoop.errors: + select { + case <-w.done: + case w.errors <- err: + } + return + } + } + }() + return w, nil +} + +func initWatcher(globs []glob.Glob) (*watcher, Error) { + var ( + w = &watcher{ + globs: globs, + watchedDirs: make(map[int]string), + watchedFiles: make(map[string]*fileWithReader), + lines: make(chan Line), + errors: make(chan Error), + done: make(chan struct{}), + } + err error + ) + w.fd, err = syscall.InotifyInit1(syscall.IN_CLOEXEC) + if err != nil { + return nil, NewError(NotSpecified, err, "inotify_init1() failed") + } + return w, nil +} + +func (w *watcher) watchDirs(log logrus.FieldLogger) Error { + var ( + wd int + dirPaths []string + dirPath string + Err Error + err error + ) + dirPaths, Err = uniqueDirs(w.globs) + if Err != nil { + return Err + } + for _, dirPath = range dirPaths { + log.Debugf("watching directory %v", dirPath) + wd, err = syscall.InotifyAddWatch(w.fd, dirPath, syscall.IN_MODIFY|syscall.IN_MOVED_FROM|syscall.IN_MOVED_TO|syscall.IN_DELETE|syscall.IN_CREATE) + if err != nil { + w.Close() + return NewErrorf(NotSpecified, err, "%q: inotify_add_watch() failed", dirPath) + } + w.watchedDirs[wd] = dirPath + } + return nil +} + +func (w *watcher) syncFilesInDir(dirPath string, readall bool, log logrus.FieldLogger) Error { + var ( + watchedFilesAfter = make(map[string]*fileWithReader) + ) + dir, err := os.Open(dirPath) + if err != nil { + return NewErrorf(NotSpecified, err, "%q: failed to open directory", dirPath) + } + defer dir.Close() + fileInfos, err := dir.Readdir(-1) + if err != nil { + return NewErrorf(NotSpecified, err, "%q: failed to read directory", dirPath) + } + for _, fileInfo := range fileInfos { + filePath := filepath.Join(dirPath, fileInfo.Name()) + fileLogger := log.WithField("file", filePath) + if !anyGlobMatches(w.globs, filePath) { + fileLogger.Debug("skipping file, because no glob matches") + continue + } + if fileInfo.IsDir() { + fileLogger.Debug("skipping, because it is a directory") + continue + } + existingFilePath, Err := w.findSameFile(fileInfo) + if Err != nil { + return Err + } + if len(existingFilePath) > 0 { + existingFile := w.watchedFiles[existingFilePath] + if existingFilePath != filePath { + fileLogger.WithField("fd", existingFile.file.Fd()).Infof("file was moved from %v", existingFilePath) + existingFile.file = os.NewFile(existingFile.file.Fd(), filePath) + } else { + fileLogger.Debug("skipping, because file is already watched") + } + watchedFilesAfter[filePath] = existingFile + continue + } + newFile, err := os.Open(filePath) + if err != nil { + return NewErrorf(NotSpecified, err, "%v: failed to open file", newFile) + } + if !readall { + _, err = newFile.Seek(0, io.SeekEnd) + if err != nil { + return NewErrorf(NotSpecified, err, "%v: failed to seek to end of file", filePath) + } + } + fileLogger = fileLogger.WithField("fd", newFile.Fd()) + fileLogger.Info("watching new file") + newFileWithReader := &fileWithReader{file: newFile, reader: NewLineReader()} + Err = w.readNewLines(newFileWithReader, fileLogger) + if Err != nil { + return Err + } + watchedFilesAfter[filePath] = newFileWithReader + } + for _, f := range w.watchedFiles { + if !contains(watchedFilesAfter, f) { + fileLogger := log.WithField("file", f.file.Name()).WithField("fd", f.file.Fd()) + fileLogger.Info("file was removed, closing and un-watching") + f.file.Close() + } + } + w.watchedFiles = watchedFilesAfter + return nil +} + +func (w *watcher) processEvent(event inotifyEvent, log logrus.FieldLogger) Error { + dirPath, ok := w.watchedDirs[int(event.Wd)] + if !ok { + return NewError(NotSpecified, nil, "watch list inconsistent: received a file system event for an unknown directory") + } + log.WithField("directory", dirPath).Debugf("received event: %v", event) + if event.Mask&syscall.IN_IGNORED == syscall.IN_IGNORED { + delete(w.watchedDirs, int(event.Wd)) + return NewErrorf(NotSpecified, nil, "%s: directory was removed while being watched", dirPath) + } + if event.Mask&syscall.IN_MODIFY == syscall.IN_MODIFY { + file, ok := w.watchedFiles[filepath.Join(dirPath, event.Name)] + if !ok { + return nil // unrelated file was modified + } + truncated, err := isTruncated(file.file) + if err != nil { + return NewErrorf(NotSpecified, err, "%v: seek() or stat() failed", file.file.Name()) + } + if truncated { + _, err = file.file.Seek(0, io.SeekStart) + if err != nil { + return NewErrorf(NotSpecified, err, "%v: seek() failed", file.file.Name()) + } + file.reader.Clear() + } + readErr := w.readNewLines(file, log) + if readErr != nil { + return readErr + } + } + if event.Mask&syscall.IN_MOVED_FROM == syscall.IN_MOVED_FROM || event.Mask&syscall.IN_DELETE == syscall.IN_DELETE || event.Mask&syscall.IN_CREATE == syscall.IN_CREATE || event.Mask&syscall.IN_MOVED_TO == syscall.IN_MOVED_TO { + // There are a lot of corner cases here: + // * a file is renamed, but still matches the pattern so we continue watching it (MOVED_FROM followed by MOVED_TO) + // * a file is created overwriting an existing file + // * a file is moved to the watched directory overwriting an existing file + // Trying to figure out what happened from the events would be error prone. + // Therefore, we don't care which of the above events we received, we just update our watched files with the current + // state of the watched directory. + err := w.syncFilesInDir(dirPath, true, log) + if err != nil { + return err + } + } + return nil +} + +func (w *watcher) readNewLines(file *fileWithReader, log logrus.FieldLogger) Error { + var ( + line string + eof bool + err error + ) + for { + line, eof, err = file.reader.ReadLine(file.file) + if err != nil { + return NewErrorf(NotSpecified, err, "%v: read() failed", file.file.Name()) + } + if eof { + return nil + } + log.Debugf("read line %q", line) + select { + case <-w.done: + return nil + case w.lines <- Line{Line: line, File: file.file.Name()}: + } + } +} + +func (w *watcher) checkMissingFile() Error { +OUTER: + for _, g := range w.globs { + for _, watchedFile := range w.watchedFiles { + if g.Match(watchedFile.file.Name()) { + continue OUTER + } + } + // Error message must be phrased so that it makes sense for globs, + // but also if g is a plain path without wildcards. + return NewErrorf(FileNotFound, nil, "%v: no such file", g) + } + return nil +} + +// Gets the directory paths from the glob expressions, +// and makes sure these directories exist. +func uniqueDirs(globs []glob.Glob) ([]string, Error) { + var ( + result = make([]string, 0, len(globs)) + g glob.Glob + dirInfo os.FileInfo + err error + ) + for _, g = range globs { + if containsString(result, g.Dir()) { + continue + } + dirInfo, err = os.Stat(g.Dir()) + if err != nil { + if os.IsNotExist(err) { + return nil, NewErrorf(DirectoryNotFound, nil, "%q: no such directory", g.Dir()) + } + return nil, NewErrorf(NotSpecified, err, "%q: stat() failed", g.Dir()) + } + if !dirInfo.IsDir() { + return nil, NewErrorf(NotSpecified, nil, "%q is not a directory", g.Dir()) + } + result = append(result, g.Dir()) + } + return result, nil +} + +func isTruncated(file *os.File) (bool, error) { + currentPos, err := file.Seek(0, io.SeekCurrent) + if err != nil { + return false, err + } + fileInfo, err := file.Stat() + if err != nil { + return false, err + } + return currentPos > fileInfo.Size(), nil +} + +func anyGlobMatches(globs []glob.Glob, path string) bool { + for _, pattern := range globs { + if pattern.Match(path) { + return true + } + } + return false +} + +func (w *watcher) findSameFile(file os.FileInfo) (string, Error) { + var ( + fileInfo os.FileInfo + err error + ) + for watchedFilePath, watchedFile := range w.watchedFiles { + fileInfo, err = watchedFile.file.Stat() + if err != nil { + return "", NewErrorf(NotSpecified, err, "%v: stat failed", watchedFile.file.Name()) + } + if os.SameFile(fileInfo, file) { + return watchedFilePath, nil + } + } + return "", nil +} + +func containsString(list []string, s string) bool { + for _, existing := range list { + if existing == s { + return true + } + } + return false +} + +func contains(list map[string]*fileWithReader, f *fileWithReader) bool { + for _, existing := range list { + if existing == f { + return true + } + } + return false +} diff --git a/tailer/fswatcher/inotifyloop_linux.go b/tailer/fswatcher/inotifyloop_linux.go new file mode 100644 index 00000000..eb2897ec --- /dev/null +++ b/tailer/fswatcher/inotifyloop_linux.go @@ -0,0 +1,155 @@ +// Copyright 2019 The grok_exporter Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fswatcher + +import ( + "fmt" + "strings" + "syscall" + "unsafe" +) + +type inotifyloop struct { + fd int + events chan inotifyEvent + errors chan Error + done chan struct{} +} + +type inotifyEvent struct { + syscall.InotifyEvent + Name string +} + +// Terminate the inotify loop. +// If the loop hangs in syscall.Read(), it will keep hanging there until the next event is read. +// Therefore, after the consumer called Close(), it should generate an artificial IN_IGNORE event to +// interrupt syscall.Read(). This can be done by calling inotify_rm_watch() on one of the watched directories. +func (l *inotifyloop) Close() { + close(l.done) +} + +func runInotifyLoop(fd int) *inotifyloop { + var result = &inotifyloop{ + fd: fd, + events: make(chan inotifyEvent), + errors: make(chan Error), + done: make(chan struct{}), + } + go func(l *inotifyloop) { + var ( + n, offset int + event inotifyEvent + bytes *[syscall.NAME_MAX]byte + buf = make([]byte, (syscall.SizeofInotifyEvent+syscall.NAME_MAX+1)*10) + err error + ) + defer func() { + close(result.errors) + close(result.events) + }() + for { + n, err = syscall.Read(l.fd, buf) + if err != nil { + select { + case l.errors <- NewError(NotSpecified, err, "failed to read inotify events"): + case <-l.done: + } + return + } + for offset = 0; offset < n; { + if n-offset < syscall.SizeofInotifyEvent { + select { + case l.errors <- NewError(NotSpecified, nil, fmt.Sprintf("inotify: read %v bytes, but sizeof(struct inotify_event) is %v bytes.", n, syscall.SizeofInotifyEvent)): + case <-l.done: + } + return + } + event = inotifyEvent{*(*syscall.InotifyEvent)(unsafe.Pointer(&buf[offset])), ""} + if event.Len > 0 { + bytes = (*[syscall.NAME_MAX]byte)(unsafe.Pointer(&buf[offset+syscall.SizeofInotifyEvent])) + event.Name = strings.TrimRight(string(bytes[0:event.Len]), "\000") + } + select { + case l.events <- event: + case <-l.done: + return + } + if event.Mask&syscall.IN_IGNORED == syscall.IN_IGNORED { + // IN_IGNORED event can have two reasons: + // 1) The consumer loop is shutting down and called inotify_rm_watch() to interrupt syscall.Read() + // 2) The watched directory was deleted. fswatcher will report an error and terminate if that happens. + // In both cases, we should terminate here and not call syscall.Read() again, as the next + // call might block forever as we don't receive events anymore. + return + } + offset += syscall.SizeofInotifyEvent + int(event.Len) + } + } + }(result) + return result +} + +func (e inotifyEvent) String() string { + var name = "" + if len(e.Name) > 0 { + name = e.Name + } + return fmt.Sprintf("%v: %v", name, flags2string(e.InotifyEvent)) +} + +func flags2string(event syscall.InotifyEvent) string { + result := make([]string, 0, 1) + if event.Mask&syscall.IN_ACCESS == syscall.IN_ACCESS { + result = append(result, "IN_ACCESS") + } + if event.Mask&syscall.IN_ATTRIB == syscall.IN_ATTRIB { + result = append(result, "IN_ATTRIB") + } + if event.Mask&syscall.IN_CLOSE_WRITE == syscall.IN_CLOSE_WRITE { + result = append(result, "IN_CLOSE_WRITE") + } + if event.Mask&syscall.IN_CLOSE_NOWRITE == syscall.IN_CLOSE_NOWRITE { + result = append(result, "IN_CLOSE_NOWRITE") + } + if event.Mask&syscall.IN_CREATE == syscall.IN_CREATE { + result = append(result, "IN_CREATE") + } + if event.Mask&syscall.IN_DELETE == syscall.IN_DELETE { + result = append(result, "IN_DELETE") + } + if event.Mask&syscall.IN_DELETE_SELF == syscall.IN_DELETE_SELF { + result = append(result, "IN_DELETE_SELF") + } + if event.Mask&syscall.IN_IGNORED == syscall.IN_IGNORED { + result = append(result, "IN_IGNORED") + } + if event.Mask&syscall.IN_MODIFY == syscall.IN_MODIFY { + result = append(result, "IN_MODIFY") + } + if event.Mask&syscall.IN_MOVE_SELF == syscall.IN_MOVE_SELF { + result = append(result, "IN_MOVE_SELF") + } + if event.Mask&syscall.IN_MOVED_FROM == syscall.IN_MOVED_FROM { + result = append(result, "IN_MOVED_FROM") + } + if event.Mask&syscall.IN_MOVED_TO == syscall.IN_MOVED_TO { + result = append(result, "IN_MOVED_TO") + } + if event.Mask&syscall.IN_OPEN == syscall.IN_OPEN { + result = append(result, "IN_OPEN") + } + return strings.Join(result, ", ") +} diff --git a/tailer/fswatcher/keventloop_darwin.go b/tailer/fswatcher/keventloop_darwin.go index 1c85d5ff..96ccc4fe 100644 --- a/tailer/fswatcher/keventloop_darwin.go +++ b/tailer/fswatcher/keventloop_darwin.go @@ -15,6 +15,7 @@ package fswatcher import ( + "strings" "syscall" ) @@ -25,10 +26,13 @@ type keventloop struct { done chan struct{} } +type Kevent syscall.Kevent_t + +// Terminate the kevent loop. +// If the loop hangs in syscall.Kevent(), it will keep hanging there until the next event is read. +// Therefore, after the consumer called Close(), it should interrupt the kevent() call by closing the kq descriptor. func (p *keventloop) Close() { close(p.done) - // closing the kq file descriptor will interrupt syscall.Kevent() - syscall.Close(p.kq) } func runKeventLoop(kq int) *keventloop { @@ -73,3 +77,29 @@ func runKeventLoop(kq int) *keventloop { }(result) return result } + +func (event Kevent) String() string { + result := make([]string, 0, 1) + if event.Fflags&syscall.NOTE_DELETE == syscall.NOTE_DELETE { + result = append(result, "NOTE_DELETE") + } + if event.Fflags&syscall.NOTE_WRITE == syscall.NOTE_WRITE { + result = append(result, "NOTE_WRITE") + } + if event.Fflags&syscall.NOTE_EXTEND == syscall.NOTE_EXTEND { + result = append(result, "NOTE_EXTEND") + } + if event.Fflags&syscall.NOTE_ATTRIB == syscall.NOTE_ATTRIB { + result = append(result, "NOTE_ATTRIB") + } + if event.Fflags&syscall.NOTE_LINK == syscall.NOTE_LINK { + result = append(result, "NOTE_LINK") + } + if event.Fflags&syscall.NOTE_RENAME == syscall.NOTE_RENAME { + result = append(result, "NOTE_RENAME") + } + if event.Fflags&syscall.NOTE_REVOKE == syscall.NOTE_REVOKE { + result = append(result, "NOTE_REVOKE") + } + return strings.Join(result, ", ") +} diff --git a/tailer/glob/glob.go b/tailer/glob/glob.go index 69c84b81..1f4ec3f4 100644 --- a/tailer/glob/glob.go +++ b/tailer/glob/glob.go @@ -15,65 +15,54 @@ package glob import ( + "fmt" + "path/filepath" "runtime" ) -type charClassItem int // produced by the lexer lexing character classes (like [a-z]) in a pattern +type Glob string -const ( - charItem charClassItem = iota // regular character, including escaped special characters - minusItem // minus symbol in a character range, like in 'a-z' -) +func Parse(pattern string) (Glob, error) { + var ( + result Glob + absglob string + err error + ) + if !IsPatternValid(pattern) { + return "", fmt.Errorf("%q: invalid glob pattern", pattern) + } + absglob, err = filepath.Abs(pattern) + if err != nil { + return "", fmt.Errorf("%q: failed to finnd absolute path for glob pattern: %v", pattern, err) + } + result = Glob(absglob) + if containsWildcards(result.Dir()) { + return "", fmt.Errorf("%q: wildcards are only allowed in the file name, but not in the directory path", pattern) + } + return result, nil +} + +func (g Glob) Dir() string { + return filepath.Dir(string(g)) +} + +func (g Glob) Match(path string) bool { + matched, _ := filepath.Match(string(g), path) + return matched +} -// If IsPatternValid(pattern) is true, filepath.Match(pattern, name) will not return an error. -// See also https://go-review.googlesource.com/c/go/+/143477 -func IsPatternValid(pattern string) bool { +func containsWildcards(pattern string) bool { p := []rune(pattern) - charClassItems := make([]charClassItem, 0) // captures content of '[...]' - insideCharClass := false // we saw a '[' but no ']' yet - escaped := false // p[i] is escaped by '\\' + escaped := false // p[i] is escaped by '\\' for i := 0; i < len(p); i++ { - switch { - case p[i] == '\\' && !escaped && runtime.GOOS != "windows": + if p[i] == '\\' && !escaped && runtime.GOOS != "windows" { escaped = true continue - case !insideCharClass && p[i] == '[' && !escaped: - insideCharClass = true - if i+1 < len(p) && p[i+1] == '^' { - i++ // It doesn't matter if the char class starts with '[' or '[^'. - } - case insideCharClass && !escaped && p[i] == '-': - charClassItems = append(charClassItems, minusItem) - case insideCharClass && !escaped && p[i] == ']': - if !isCharClassValid(charClassItems) { - return false - } - charClassItems = charClassItems[:0] - insideCharClass = false - case insideCharClass: - charClassItems = append(charClassItems, charItem) } - escaped = false - } - return !escaped && !insideCharClass -} - -func isCharClassValid(charClassItems []charClassItem) bool { - if len(charClassItems) == 0 { - return false - } - for i := 0; i < len(charClassItems); i++ { - if charClassItems[i] == minusItem { + if !escaped && (p[i] == '[' || p[i] == '*' || p[i] == '?') { return false } - if i+1 < len(charClassItems) { - if charClassItems[i+1] == minusItem { - i += 2 - if i >= len(charClassItems) || charClassItems[i] == minusItem { - return false - } - } - } + escaped = false } - return true + return false } diff --git a/tailer/glob/validator.go b/tailer/glob/validator.go new file mode 100644 index 00000000..69c84b81 --- /dev/null +++ b/tailer/glob/validator.go @@ -0,0 +1,79 @@ +// Copyright 2018 The grok_exporter Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package glob + +import ( + "runtime" +) + +type charClassItem int // produced by the lexer lexing character classes (like [a-z]) in a pattern + +const ( + charItem charClassItem = iota // regular character, including escaped special characters + minusItem // minus symbol in a character range, like in 'a-z' +) + +// If IsPatternValid(pattern) is true, filepath.Match(pattern, name) will not return an error. +// See also https://go-review.googlesource.com/c/go/+/143477 +func IsPatternValid(pattern string) bool { + p := []rune(pattern) + charClassItems := make([]charClassItem, 0) // captures content of '[...]' + insideCharClass := false // we saw a '[' but no ']' yet + escaped := false // p[i] is escaped by '\\' + for i := 0; i < len(p); i++ { + switch { + case p[i] == '\\' && !escaped && runtime.GOOS != "windows": + escaped = true + continue + case !insideCharClass && p[i] == '[' && !escaped: + insideCharClass = true + if i+1 < len(p) && p[i+1] == '^' { + i++ // It doesn't matter if the char class starts with '[' or '[^'. + } + case insideCharClass && !escaped && p[i] == '-': + charClassItems = append(charClassItems, minusItem) + case insideCharClass && !escaped && p[i] == ']': + if !isCharClassValid(charClassItems) { + return false + } + charClassItems = charClassItems[:0] + insideCharClass = false + case insideCharClass: + charClassItems = append(charClassItems, charItem) + } + escaped = false + } + return !escaped && !insideCharClass +} + +func isCharClassValid(charClassItems []charClassItem) bool { + if len(charClassItems) == 0 { + return false + } + for i := 0; i < len(charClassItems); i++ { + if charClassItems[i] == minusItem { + return false + } + if i+1 < len(charClassItems) { + if charClassItems[i+1] == minusItem { + i += 2 + if i >= len(charClassItems) || charClassItems[i] == minusItem { + return false + } + } + } + } + return true +} diff --git a/tailer/glob/glob_test.go b/tailer/glob/validator_test.go similarity index 100% rename from tailer/glob/glob_test.go rename to tailer/glob/validator_test.go diff --git a/tailer/pollingFileTailer.go b/tailer/pollingFileTailer.go index ba566e34..9c545a88 100644 --- a/tailer/pollingFileTailer.go +++ b/tailer/pollingFileTailer.go @@ -15,6 +15,7 @@ package tailer import ( + "github.com/sirupsen/logrus" "io" "time" ) @@ -83,7 +84,7 @@ func (l *pollingEventLoop) Events() chan Events { return l.events } -func (e *pollingEvent) Process(fileBefore *File, reader *lineReader, abspath string, logger simpleLogger) (file *File, lines []string, err error) { +func (e *pollingEvent) Process(fileBefore *File, reader *lineReader, abspath string, _ logrus.FieldLogger) (file *File, lines []string, err error) { var ( truncated, moved, eof bool filename, line string diff --git a/tailer/watcher.go b/tailer/watcher.go index 9024df55..aa1034f0 100644 --- a/tailer/watcher.go +++ b/tailer/watcher.go @@ -15,6 +15,7 @@ package tailer import ( + "github.com/sirupsen/logrus" "io" ) @@ -34,5 +35,5 @@ type EventLoop interface { // File system events. // The operating system may return more than one event for each read, so it's plural. type Events interface { - Process(fileBefore *File, reader *lineReader, abspath string, logger simpleLogger) (file *File, lines []string, err error) + Process(fileBefore *File, reader *lineReader, abspath string, logger logrus.FieldLogger) (file *File, lines []string, err error) }