Skip to content

Commit

Permalink
Fixed lines miss issue when tailing new files
Browse files Browse the repository at this point in the history
  • Loading branch information
ajacquemot committed Feb 20, 2018
1 parent 0b8a507 commit 5bf4ee5
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 8 deletions.
37 changes: 29 additions & 8 deletions pkg/logs/input/tailer/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ import (
// scanPeriod represents the period of scanning
const scanPeriod = 10 * time.Second

// tailingContext represents the context just before starting a new tailer
type tailingContext int

// different kind of contexts
const (
settingUp tailingContext = iota
didFileRotate
pickedNewFile
)

// Scanner checks all files provided by fileProvider and create new tailers
// or update the old ones if needed
type Scanner struct {
Expand Down Expand Up @@ -63,21 +73,32 @@ func (s *Scanner) setup() {
if _, ok := s.tailers[file.Path]; ok {
log.Warn("Can't tail file twice: ", file.Path)
} else {
s.setupTailer(file, false, s.pp.NextPipelineChan())
s.setupTailer(file, settingUp, s.pp.NextPipelineChan())
}
}
}

// setupTailer sets one tailer, making it tail from the beginning or the end
// returns true if the setup succeeded, false otherwise
func (s *Scanner) setupTailer(file *File, tailFromBeginning bool, outputChan chan message.Message) bool {
func (s *Scanner) setupTailer(file *File, context tailingContext, outputChan chan message.Message) bool {
t := NewTailer(outputChan, file.Source, file.Path, s.tailerSleepDuration)
var err error
if tailFromBeginning {
err = t.tailFromBeginning()
} else {
// resume tailing from last committed offset
switch context {
case settingUp:
// resume tailing from last committed offset if exists or start tailing from end of file
err = t.recoverTailing(s.auditor.GetLastCommittedOffset(t.Identifier()))
case didFileRotate:
// force reading file from beginning since it has been log-rotated
err = t.tailFromBeginning()
case pickedNewFile:
// check if an offset has been committed previously
// and tail either the file from the offset or the beginning
offset, whence := s.auditor.GetLastCommittedOffset(t.Identifier())
if offset != 0 {
err = t.recoverTailing(offset, whence)
} else {
err = t.tailFromBeginning()
}
}
if err != nil {
log.Warn(err)
Expand Down Expand Up @@ -146,7 +167,7 @@ func (s *Scanner) scan() {

if !isTailed && tailersLen < s.tailingLimit {
// create new tailer for file
succeeded := s.setupTailer(file, false, s.pp.NextPipelineChan())
succeeded := s.setupTailer(file, pickedNewFile, s.pp.NextPipelineChan())
if !succeeded {
// the setup failed, let's try to tail this file in the next scan
continue
Expand Down Expand Up @@ -192,7 +213,7 @@ func (s *Scanner) didFileRotate(file *File, tailer *Tailer) (bool, error) {
func (s *Scanner) onFileRotation(tailer *Tailer, file *File) bool {
log.Info("Log rotation happened to ", tailer.path)
tailer.StopAfterFileRotation()
return s.setupTailer(file, true, tailer.outputChan)
return s.setupTailer(file, didFileRotate, tailer.outputChan)
}

// stopTailer stops the tailer
Expand Down
42 changes: 42 additions & 0 deletions pkg/logs/input/tailer/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,48 @@ func TestScannerTestSuite(t *testing.T) {
suite.Run(t, new(ScannerTestSuite))
}

func TestScannerScanWithTheNewFile(t *testing.T) {
var err error
var path string
var file *os.File
var tailer *Tailer
var msg message.Message

testDir, err := ioutil.TempDir("", "log-scanner-test-")
assert.Nil(t, err)

// create scanner
path = fmt.Sprintf("%s/*.log", testDir)
sources := []*config.LogSource{config.NewLogSource("", &config.LogsConfig{Type: config.FileType, Path: path})}
openFilesLimit := 2
sleepDuration := 20 * time.Millisecond
scanner := New(sources, openFilesLimit, mock.NewMockProvider(), auditor.New(nil, ""), sleepDuration)

// setup scanner
scanner.setup()
assert.Equal(t, 0, len(scanner.tailers))

// create file
path = fmt.Sprintf("%s/test.log", testDir)
file, err = os.Create(path)
assert.Nil(t, err)

// add content
_, err = file.WriteString("hello\n")
assert.Nil(t, err)
_, err = file.WriteString("world\n")
assert.Nil(t, err)

// test scan from beginning
scanner.scan()
assert.Equal(t, 1, len(scanner.tailers))
tailer = scanner.tailers[path]
msg = <-tailer.outputChan
assert.Equal(t, "hello", string(msg.Content()))
msg = <-tailer.outputChan
assert.Equal(t, "world", string(msg.Content()))
}

func TestScannerScanWithTooManyFiles(t *testing.T) {
var err error
var path string
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
Fix line miss [issue](https://github.com/DataDog/datadog-agent/issues/1302) that could happen when tailing new files found when scanning

0 comments on commit 5bf4ee5

Please sign in to comment.