From fc6376413211a824e46ae5a659f46fff5c382262 Mon Sep 17 00:00:00 2001 From: ruflin Date: Tue, 25 Oct 2016 14:54:33 +0200 Subject: [PATCH] Move tail_files to prospector level tail_files is now only applied on the first run and after that ignored. Also the state for all files falling under tail_files and not having a state, a state will directly be written. * Implement tail_files by setting ignore_older to 1ns for the first run * Fix typo in stats variable names Closes https://github.com/elastic/beats/issues/2613 and https://github.com/elastic/beats/issues/2788 --- CHANGELOG.asciidoc | 1 + filebeat/harvester/config.go | 2 -- filebeat/harvester/log.go | 6 ----- filebeat/prospector/config.go | 2 ++ filebeat/prospector/prospector.go | 6 ++--- filebeat/prospector/prospector_log.go | 24 +++++++++++++++---- .../prospector/prospector_log_other_test.go | 4 +++- filebeat/prospector/prospector_stdin.go | 2 +- filebeat/prospector/prospector_test.go | 4 +++- 9 files changed, 32 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 3953d39c2a5..02eb63f7954 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -24,6 +24,7 @@ https://github.com/elastic/beats/compare/v5.0.0...master[Check the HEAD diff] - If a file is falling under ignore_older during startup, offset is now set to end of file instead of 0. With the previous logic the whole file was sent in case a line was added and it was inconsitent with files which were harvested previously. {pull}2907[2907] +- tail_files is now only applied on the first scan and not for all new files. {pull}2932[2932] *Winlogbeat* diff --git a/filebeat/harvester/config.go b/filebeat/harvester/config.go index 563250ac1fb..e5fcded5292 100644 --- a/filebeat/harvester/config.go +++ b/filebeat/harvester/config.go @@ -18,7 +18,6 @@ var ( BufferSize: 16 * humanize.KiByte, DocumentType: "log", InputType: cfg.DefaultInputType, - TailFiles: false, Backoff: 1 * time.Second, BackoffFactor: 2, MaxBackoff: 10 * time.Second, @@ -38,7 +37,6 @@ type harvesterConfig struct { DocumentType string `config:"document_type"` Encoding string `config:"encoding"` InputType string `config:"input_type"` - TailFiles bool `config:"tail_files"` Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` BackoffFactor int `config:"backoff_factor" validate:"min=1"` MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index c0c056d867f..2a07d68ae3a 100644 --- a/filebeat/harvester/log.go +++ b/filebeat/harvester/log.go @@ -252,12 +252,6 @@ func (h *Harvester) initFileOffset(file *os.File) (int64, error) { return file.Seek(h.state.Offset, os.SEEK_SET) } - // tail file if file is new and tail_files config is set - if h.config.TailFiles { - logp.Debug("harvester", "Setting offset for tailing file: %s.", h.state.Source) - return file.Seek(0, os.SEEK_END) - } - // get offset from file in case of encoding factory was required to read some data. logp.Debug("harvester", "Setting offset for file based on seek: %s", h.state.Source) return file.Seek(0, os.SEEK_CUR) diff --git a/filebeat/prospector/config.go b/filebeat/prospector/config.go index ee063c97c94..c73b28d4cbf 100644 --- a/filebeat/prospector/config.go +++ b/filebeat/prospector/config.go @@ -17,6 +17,7 @@ var ( CleanRemoved: true, HarvesterLimit: 0, Symlinks: false, + TailFiles: false, } ) @@ -30,6 +31,7 @@ type prospectorConfig struct { CleanRemoved bool `config:"clean_removed"` HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"` Symlinks bool `config:"symlinks"` + TailFiles bool `config:"tail_files"` } func (config *prospectorConfig) Validate() error { diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index ddb3545f7a4..bb4d7c7707b 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -34,7 +34,7 @@ type Prospector struct { } type Prospectorer interface { - Init(states []file.State) error + Init(states file.States) error Run() } @@ -61,7 +61,7 @@ func NewProspector(cfg *common.Config, states file.States, outlet Outlet) (*Pros return nil, err } - err := prospector.Init(states.GetStates()) + err := prospector.Init(states) if err != nil { return nil, err } @@ -72,7 +72,7 @@ func NewProspector(cfg *common.Config, states file.States, outlet Outlet) (*Pros } // Init sets up default config for prospector -func (p *Prospector) Init(states []file.State) error { +func (p *Prospector) Init(states file.States) error { var prospectorer Prospectorer var err error diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/prospector_log.go index 3e779555d40..29d3d09bbac 100644 --- a/filebeat/prospector/prospector_log.go +++ b/filebeat/prospector/prospector_log.go @@ -15,8 +15,8 @@ import ( ) var ( - filesRenamed = expvar.NewInt("filebeat.prospector.log.files.renamed") - filesTrucated = expvar.NewInt("filebeat.prospector.log.files.truncated") + filesRenamed = expvar.NewInt("filebeat.prospector.log.files.renamed") + filesTruncated = expvar.NewInt("filebeat.prospector.log.files.truncated") ) type ProspectorLog struct { @@ -37,13 +37,14 @@ func NewProspectorLog(p *Prospector) (*ProspectorLog, error) { // Init sets up the prospector // It goes through all states coming from the registry. Only the states which match the glob patterns of // the prospector will be loaded and updated. All other states will not be touched. -func (p *ProspectorLog) Init(states []file.State) error { +func (p *ProspectorLog) Init(states file.States) error { logp.Debug("prospector", "exclude_files: %s", p.config.ExcludeFiles) - for _, state := range states { + for _, state := range states.GetStates() { // Check if state source belongs to this prospector. If yes, update the state. if p.matchesFile(state.Source) { state.TTL = -1 + // Update prospector states and send new states to registry err := p.Prospector.updateState(input.NewEvent(state)) if err != nil { @@ -60,6 +61,19 @@ func (p *ProspectorLog) Init(states []file.State) error { func (p *ProspectorLog) Run() { logp.Debug("prospector", "Start next scan") + // TailFiles is like ignore_older = 1ns and only on startup + if p.config.TailFiles { + ignoreOlder := p.config.IgnoreOlder + + // Overwrite ignore_older for the first scan + p.config.IgnoreOlder = 1 + defer func() { + // Reset ignore_older after first run + p.config.IgnoreOlder = ignoreOlder + // Disable tail_files after the first run + p.config.TailFiles = false + }() + } p.scan() // It is important that a first scan is run before cleanup to make sure all new states are read first @@ -246,7 +260,7 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S logp.Err("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err) } - filesTrucated.Add(1) + filesTruncated.Add(1) return } diff --git a/filebeat/prospector/prospector_log_other_test.go b/filebeat/prospector/prospector_log_other_test.go index 06990455b53..71a42ec4eb3 100644 --- a/filebeat/prospector/prospector_log_other_test.go +++ b/filebeat/prospector/prospector_log_other_test.go @@ -138,7 +138,9 @@ func TestInit(t *testing.T) { Paths: test.paths, }, } - err := p.Init(test.states) + states := file.NewStates() + states.SetStates(test.states) + err := p.Init(*states) assert.NoError(t, err) assert.Equal(t, test.count, p.Prospector.states.Count()) } diff --git a/filebeat/prospector/prospector_stdin.go b/filebeat/prospector/prospector_stdin.go index 10b671d6b73..9a574a2a801 100644 --- a/filebeat/prospector/prospector_stdin.go +++ b/filebeat/prospector/prospector_stdin.go @@ -29,7 +29,7 @@ func NewProspectorStdin(p *Prospector) (*ProspectorStdin, error) { return prospectorer, nil } -func (p *ProspectorStdin) Init(states []file.State) error { +func (p *ProspectorStdin) Init(states file.States) error { p.started = false return nil } diff --git a/filebeat/prospector/prospector_test.go b/filebeat/prospector/prospector_test.go index 096daf27a19..b07d37ec1cd 100644 --- a/filebeat/prospector/prospector_test.go +++ b/filebeat/prospector/prospector_test.go @@ -17,7 +17,9 @@ func TestProspectorInitInputTypeLogError(t *testing.T) { config: prospectorConfig{}, } - err := prospector.Init([]file.State{}) + states := file.NewStates() + states.SetStates([]file.State{}) + err := prospector.Init(*states) // Error should be returned because no path is set assert.Error(t, err) }