Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed lines miss issue when tailing new files #1306

Merged
merged 1 commit into from
Feb 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions pkg/logs/input/tailer/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ import (
// scanPeriod represents the period of scanning
const scanPeriod = 10 * time.Second

// tailingContext represents the context just before starting a new tailer
type tailingContext int

// different kind of contexts
const (
settingUp tailingContext = iota
didFileRotate
pickedNewFile
)

// Scanner checks all files provided by fileProvider and create new tailers
// or update the old ones if needed
type Scanner struct {
Expand Down Expand Up @@ -63,21 +73,32 @@ func (s *Scanner) setup() {
if _, ok := s.tailers[file.Path]; ok {
log.Warn("Can't tail file twice: ", file.Path)
} else {
s.setupTailer(file, false, s.pp.NextPipelineChan())
s.setupTailer(file, settingUp, s.pp.NextPipelineChan())
}
}
}

// setupTailer sets one tailer, making it tail from the beginning or the end
// returns true if the setup succeeded, false otherwise
func (s *Scanner) setupTailer(file *File, tailFromBeginning bool, outputChan chan message.Message) bool {
func (s *Scanner) setupTailer(file *File, context tailingContext, outputChan chan message.Message) bool {
t := NewTailer(outputChan, file.Source, file.Path, s.tailerSleepDuration)
var err error
if tailFromBeginning {
err = t.tailFromBeginning()
} else {
// resume tailing from last committed offset
switch context {
case settingUp:
// resume tailing from last committed offset if exists or start tailing from end of file
err = t.recoverTailing(s.auditor.GetLastCommittedOffset(t.Identifier()))
case didFileRotate:
// force reading file from beginning since it has been log-rotated
err = t.tailFromBeginning()
case pickedNewFile:
// check if an offset has been committed previously
// and tail either the file from the offset or the beginning
offset, whence := s.auditor.GetLastCommittedOffset(t.Identifier())
if offset != 0 {
err = t.recoverTailing(offset, whence)
} else {
err = t.tailFromBeginning()
}
}
if err != nil {
log.Warn(err)
Expand Down Expand Up @@ -146,7 +167,7 @@ func (s *Scanner) scan() {

if !isTailed && tailersLen < s.tailingLimit {
// create new tailer for file
succeeded := s.setupTailer(file, false, s.pp.NextPipelineChan())
succeeded := s.setupTailer(file, pickedNewFile, s.pp.NextPipelineChan())
if !succeeded {
// the setup failed, let's try to tail this file in the next scan
continue
Expand Down Expand Up @@ -192,7 +213,7 @@ func (s *Scanner) didFileRotate(file *File, tailer *Tailer) (bool, error) {
func (s *Scanner) onFileRotation(tailer *Tailer, file *File) bool {
log.Info("Log rotation happened to ", tailer.path)
tailer.StopAfterFileRotation()
return s.setupTailer(file, true, tailer.outputChan)
return s.setupTailer(file, didFileRotate, tailer.outputChan)
}

// stopTailer stops the tailer
Expand Down
42 changes: 42 additions & 0 deletions pkg/logs/input/tailer/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,48 @@ func TestScannerTestSuite(t *testing.T) {
suite.Run(t, new(ScannerTestSuite))
}

func TestScannerScanWithTheNewFile(t *testing.T) {
var err error
var path string
var file *os.File
var tailer *Tailer
var msg message.Message

testDir, err := ioutil.TempDir("", "log-scanner-test-")
assert.Nil(t, err)

// create scanner
path = fmt.Sprintf("%s/*.log", testDir)
sources := []*config.LogSource{config.NewLogSource("", &config.LogsConfig{Type: config.FileType, Path: path})}
openFilesLimit := 2
sleepDuration := 20 * time.Millisecond
scanner := New(sources, openFilesLimit, mock.NewMockProvider(), auditor.New(nil, ""), sleepDuration)

// setup scanner
scanner.setup()
assert.Equal(t, 0, len(scanner.tailers))

// create file
path = fmt.Sprintf("%s/test.log", testDir)
file, err = os.Create(path)
assert.Nil(t, err)

// add content
_, err = file.WriteString("hello\n")
assert.Nil(t, err)
_, err = file.WriteString("world\n")
assert.Nil(t, err)

// test scan from beginning
scanner.scan()
assert.Equal(t, 1, len(scanner.tailers))
tailer = scanner.tailers[path]
msg = <-tailer.outputChan
assert.Equal(t, "hello", string(msg.Content()))
msg = <-tailer.outputChan
assert.Equal(t, "world", string(msg.Content()))
}

func TestScannerScanWithTooManyFiles(t *testing.T) {
var err error
var path string
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
Fix line miss [issue](https://github.com/DataDog/datadog-agent/issues/1302) that could happen when tailing new files found when scanning