diff --git a/pkg/logs/input/tailer/scanner.go b/pkg/logs/input/tailer/scanner.go index cf40fff2f3801f..c9cbdb216deeea 100644 --- a/pkg/logs/input/tailer/scanner.go +++ b/pkg/logs/input/tailer/scanner.go @@ -20,6 +20,16 @@ import ( // scanPeriod represents the period of scanning const scanPeriod = 10 * time.Second +// tailingContext represents the context just before starting a new tailer +type tailingContext int + +// different kind of contexts +const ( + settingUp tailingContext = iota + didFileRotate + pickedNewFile +) + // Scanner checks all files provided by fileProvider and create new tailers // or update the old ones if needed type Scanner struct { @@ -63,21 +73,32 @@ func (s *Scanner) setup() { if _, ok := s.tailers[file.Path]; ok { log.Warn("Can't tail file twice: ", file.Path) } else { - s.setupTailer(file, false, s.pp.NextPipelineChan()) + s.setupTailer(file, settingUp, s.pp.NextPipelineChan()) } } } // setupTailer sets one tailer, making it tail from the beginning or the end // returns true if the setup succeeded, false otherwise -func (s *Scanner) setupTailer(file *File, tailFromBeginning bool, outputChan chan message.Message) bool { +func (s *Scanner) setupTailer(file *File, context tailingContext, outputChan chan message.Message) bool { t := NewTailer(outputChan, file.Source, file.Path, s.tailerSleepDuration) var err error - if tailFromBeginning { - err = t.tailFromBeginning() - } else { - // resume tailing from last committed offset + switch context { + case settingUp: + // resume tailing from last committed offset if exists or start tailing from end of file err = t.recoverTailing(s.auditor.GetLastCommittedOffset(t.Identifier())) + case didFileRotate: + // force reading file from beginning since it has been log-rotated + err = t.tailFromBeginning() + case pickedNewFile: + // check if an offset has been committed previously + // and tail either the file from the offset or the beginning + offset, whence := s.auditor.GetLastCommittedOffset(t.Identifier()) + if offset != 0 { + err = t.recoverTailing(offset, whence) + } else { + err = t.tailFromBeginning() + } } if err != nil { log.Warn(err) @@ -146,7 +167,7 @@ func (s *Scanner) scan() { if !isTailed && tailersLen < s.tailingLimit { // create new tailer for file - succeeded := s.setupTailer(file, false, s.pp.NextPipelineChan()) + succeeded := s.setupTailer(file, pickedNewFile, s.pp.NextPipelineChan()) if !succeeded { // the setup failed, let's try to tail this file in the next scan continue @@ -192,7 +213,7 @@ func (s *Scanner) didFileRotate(file *File, tailer *Tailer) (bool, error) { func (s *Scanner) onFileRotation(tailer *Tailer, file *File) bool { log.Info("Log rotation happened to ", tailer.path) tailer.StopAfterFileRotation() - return s.setupTailer(file, true, tailer.outputChan) + return s.setupTailer(file, didFileRotate, tailer.outputChan) } // stopTailer stops the tailer diff --git a/pkg/logs/input/tailer/scanner_test.go b/pkg/logs/input/tailer/scanner_test.go index 8b32126c3df533..c62b32759c134c 100644 --- a/pkg/logs/input/tailer/scanner_test.go +++ b/pkg/logs/input/tailer/scanner_test.go @@ -193,6 +193,48 @@ func TestScannerTestSuite(t *testing.T) { suite.Run(t, new(ScannerTestSuite)) } +func TestScannerScanWithTheNewFile(t *testing.T) { + var err error + var path string + var file *os.File + var tailer *Tailer + var msg message.Message + + testDir, err := ioutil.TempDir("", "log-scanner-test-") + assert.Nil(t, err) + + // create scanner + path = fmt.Sprintf("%s/*.log", testDir) + sources := []*config.LogSource{config.NewLogSource("", &config.LogsConfig{Type: config.FileType, Path: path})} + openFilesLimit := 2 + sleepDuration := 20 * time.Millisecond + scanner := New(sources, openFilesLimit, mock.NewMockProvider(), auditor.New(nil, ""), sleepDuration) + + // setup scanner + scanner.setup() + assert.Equal(t, 0, len(scanner.tailers)) + + // create file + path = fmt.Sprintf("%s/test.log", testDir) + file, err = os.Create(path) + assert.Nil(t, err) + + // add content + _, err = file.WriteString("hello\n") + assert.Nil(t, err) + _, err = file.WriteString("world\n") + assert.Nil(t, err) + + // test scan from beginning + scanner.scan() + assert.Equal(t, 1, len(scanner.tailers)) + tailer = scanner.tailers[path] + msg = <-tailer.outputChan + assert.Equal(t, "hello", string(msg.Content())) + msg = <-tailer.outputChan + assert.Equal(t, "world", string(msg.Content())) +} + func TestScannerScanWithTooManyFiles(t *testing.T) { var err error var path string diff --git a/releasenotes/notes/logs-fix-lines-miss-issue-7bc7046f4e6b77bc.yaml b/releasenotes/notes/logs-fix-lines-miss-issue-7bc7046f4e6b77bc.yaml new file mode 100644 index 00000000000000..f74cf6e842291e --- /dev/null +++ b/releasenotes/notes/logs-fix-lines-miss-issue-7bc7046f4e6b77bc.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + Fix line miss [issue](https://github.com/DataDog/datadog-agent/issues/1302) that could happen when tailing new files found when scanning