From 36a81843f6c4e7ea2fd2bbffac23bffec34be883 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Thu, 21 May 2020 15:18:26 +0200 Subject: [PATCH 01/18] Add initial support for multiple file identity trackers --- .../config/filebeat.inputs.reference.yml.tmpl | 4 + .../inputs/input-common-file-options.asciidoc | 43 ++++ filebeat/docs/inputs/input-log.asciidoc | 45 ++++ filebeat/filebeat.reference.yml | 4 + filebeat/input/file/file.go | 6 +- filebeat/input/file/file_test.go | 63 ------ filebeat/input/file/identifier.go | 213 ++++++++++++++++++ filebeat/input/file/identifier_test.go | 197 ++++++++++++++++ filebeat/input/file/state.go | 68 ++---- filebeat/input/file/states.go | 20 +- .../input/file/testdata/identifier_marker | 1 + filebeat/input/log/config.go | 97 ++++---- filebeat/input/log/config_test.go | 2 +- filebeat/input/log/harvester.go | 2 +- filebeat/input/log/input.go | 74 +++--- filebeat/registrar/registrar.go | 17 -- filebeat/tests/system/test_input.py | 111 +++++++++ x-pack/filebeat/filebeat.reference.yml | 4 + 18 files changed, 752 insertions(+), 219 deletions(-) delete mode 100644 filebeat/input/file/file_test.go create mode 100644 filebeat/input/file/identifier.go create mode 100644 filebeat/input/file/identifier_test.go create mode 100644 filebeat/input/file/testdata/identifier_marker diff --git a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl index ae4816f4f82b..310c0fe4abca 100644 --- a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl +++ b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl @@ -50,6 +50,10 @@ filebeat.inputs: # are matching any regular expression from the list. By default, no files are dropped. #exclude_files: ['.gz$'] + # Method to determine if two files are the same or not. By default + # the Beat considers two files the same if their inode and device id are the same. + #file_identity.inode_deviceid: ~ + # Optional additional fields. These fields can be freely picked # to add additional information to the crawled log files for filtering #fields: diff --git a/filebeat/docs/inputs/input-common-file-options.asciidoc b/filebeat/docs/inputs/input-common-file-options.asciidoc index 1947e7cdda76..be64eb20ad2e 100644 --- a/filebeat/docs/inputs/input-common-file-options.asciidoc +++ b/filebeat/docs/inputs/input-common-file-options.asciidoc @@ -129,6 +129,10 @@ file is renamed or moved in such a way that it's no longer matched by the file patterns specified for the , the file will not be picked up again. {beatname_uc} will not finish reading the file. +Do not use this option when `path` based `file_identity` is configured. It does +not make sense to enable the option, as Filebeat cannot detect renames using +path names as unique identifiers. + WINDOWS: If your Windows log rotation system shows errors because it can't rotate the files, you should enable this option. @@ -397,3 +401,42 @@ file that hasn't been harvested for a longer period of time. This configuration option applies per input. You can use this option to indirectly set higher priorities on certain inputs by assigning a higher limit of harvesters. + +[float] +===== `file_identity` + +Different `file_identity` methods can be configured to suit the +environment you are collecting log messages from. + + +*`inode_deviceid`*:: The default behaviour of {beatname_uc} is to differentiate +between files using their inodes and device ids. + +[source,yaml] +---- +file_identity.inode_deviceid: ~ +---- + +*`path`*:: To identify files based on their paths use this strategy. + +WARNING: Only use this strategy if your log files are rotated to a folder +outside of the scope of your input or not at all. Otherwise you end up +with duplicated events. + +WARNING: This strategy does not support renaming files. +If an input file is renamed, {beatname_uc} will read it again if the new path +matches the settings of the input. + +[source,yaml] +---- +file_identity.path: ~ +---- + +*`inode_marker`*:: If the device id changes from time to time, you must use +this method to tell apart files. Set the location of the marker file the following way: + +[source,yaml] +---- +file_identity.inode_marker.path: /logs/.filebeat-marker +---- + diff --git a/filebeat/docs/inputs/input-log.asciidoc b/filebeat/docs/inputs/input-log.asciidoc index 95670734b02d..1837cf79768f 100644 --- a/filebeat/docs/inputs/input-log.asciidoc +++ b/filebeat/docs/inputs/input-log.asciidoc @@ -57,6 +57,48 @@ multiple input sections: IMPORTANT: Make sure a file is not defined more than once across all inputs because this can lead to unexpected behaviour. +[[file-identity]] +==== Reading files on network shares and cloud providers + +By default, {beatname_uc} identifies files based on their inodes and +device IDs. However, on network shares and cloud providers these +values might change during the lifetime of the file. If this happens +{beatname_uc} thinks that file is new and resends the whole content +of the file. To solve this problem you can configure `file_identity` option. Possible +values are besides the default `inode_deviceid` are `path` and `inode_marker`. + +Selecting `path` instructs {beatname_uc} to identify files based on their +paths. This is a quick way to aviod rereading files if inode and device ids +might change. However, keep in mind if the files are rotated (renamed), they +are going to be reread and resent. + +The option `inode_marker` can be used if the inodes stay the same even if +the device id is changed. You should choose this method if your files are +rotated instead of `path` if possible. You have to configure a marker file +readable by {beatname_uc} and set the path in the option `path` of `inode_marker`. + +The content of this file must be unique to the device. You can put the +UUID of the device or mountpoint where the input is stored. The following +example oneliner generates a hidden marker file for the selected mountpoint `/logs`: + +["source","sh",subs="attributes"] +---- +$ lsblk -o MOUNTPOINT,UUD | grep /logs | awk '{print $2}' >> /logs/.filebeat-marker +---- + +To set the generated file as a marker for `file_identity` you should configure +the input the following way: + +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: log + paths: + - /logs/*.log + file_identity.inode_marker.path: /logs/.filebeat-marker +---- + + [[rotating-logs]] ==== Reading from rotating logs @@ -66,6 +108,9 @@ a pattern that matches the file you want to harvest and all of its rotated files. Also make sure your log rotation strategy prevents lost or duplicate messages. For more information, see <>. +Furthermore, do not use `path` method for `file_identity` to avoid duplication +of the rotated log messages. + [id="{beatname_lc}-input-{type}-options"] ==== Configuration options diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index e57e9cbfd43d..decc9d584e5e 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -437,6 +437,10 @@ filebeat.inputs: # are matching any regular expression from the list. By default, no files are dropped. #exclude_files: ['.gz$'] + # Method to determine if two files are the same or not. By default + # the Beat considers two files the same if their inode and device id are the same. + #file_identity.inode_deviceid: ~ + # Optional additional fields. These fields can be freely picked # to add additional information to the crawled log files for filtering #fields: diff --git a/filebeat/input/file/file.go b/filebeat/input/file/file.go index 676a2d5cfcb6..963a1015fb8c 100644 --- a/filebeat/input/file/file.go +++ b/filebeat/input/file/file.go @@ -30,12 +30,8 @@ type File struct { State *State } -// Checks if the two files are the same. -func (f *File) IsSameFile(f2 *File) bool { - return os.SameFile(f.FileInfo, f2.FileInfo) -} - // IsSameFile checks if the given File path corresponds with the FileInfo given +// It is used to check if the file has been renamed. func IsSameFile(path string, info os.FileInfo) bool { fileInfo, err := os.Stat(path) diff --git a/filebeat/input/file/file_test.go b/filebeat/input/file/file_test.go deleted file mode 100644 index 1e2bc94d4bfc..000000000000 --- a/filebeat/input/file/file_test.go +++ /dev/null @@ -1,63 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// +build !integration - -package file - -import ( - "os" - "path/filepath" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestIsSameFile(t *testing.T) { - absPath, err := filepath.Abs("../../tests/files/") - - assert.NotNil(t, absPath) - assert.Nil(t, err) - - fileInfo1, err := os.Stat(absPath + "/logs/test.log") - fileInfo2, err := os.Stat(absPath + "/logs/system.log") - - assert.Nil(t, err) - assert.NotNil(t, fileInfo1) - assert.NotNil(t, fileInfo2) - - file1 := &File{ - FileInfo: fileInfo1, - } - - file2 := &File{ - FileInfo: fileInfo2, - } - - file3 := &File{ - FileInfo: fileInfo2, - } - - assert.False(t, file1.IsSameFile(file2)) - assert.False(t, file2.IsSameFile(file1)) - - assert.True(t, file1.IsSameFile(file1)) - assert.True(t, file2.IsSameFile(file2)) - - assert.True(t, file3.IsSameFile(file2)) - assert.True(t, file2.IsSameFile(file3)) -} diff --git a/filebeat/input/file/identifier.go b/filebeat/input/file/identifier.go new file mode 100644 index 000000000000..fbe119d5af35 --- /dev/null +++ b/filebeat/input/file/identifier.go @@ -0,0 +1,213 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package file + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/mitchellh/hashstructure" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +const ( + inodeDeviceIDName = "inode_deviceid" + pathName = "path" + inodeMarkerName = "inode_marker" + + DefaultIdentifierName = inodeDeviceIDName + identitySep = "::" +) + +var ( + identifierFactories = map[string]IdentifierFactory{ + inodeDeviceIDName: newINodeDeviceIdentifier, + pathName: newPathIdentifier, + inodeMarkerName: newINodeMarkerIdentifier, + } +) + +type IdentifierFactory func(*common.Config) (StateIdentifier, error) + +// StateIdentifier generates an ID for a State. +type StateIdentifier interface { + // GenerateID generates and returns the ID of the state + GenerateID(*State) + // Name return the name of the identifier method for comparison. + Name() string +} + +// NewStateIdentifier creates a new state identifier for a log input. +func NewStateIdentifier(ns *common.ConfigNamespace) (StateIdentifier, error) { + if ns == nil { + return newINodeDeviceIdentifier(nil) + } + + identifierType := ns.Name() + f, ok := identifierFactories[identifierType] + if !ok { + return nil, fmt.Errorf("no such file_identity generator: %s", identifierType) + } + + return f(ns.Config()) +} + +type inodeDeviceIdentifier struct { + name string +} + +func newINodeDeviceIdentifier(_ *common.Config) (StateIdentifier, error) { + return &inodeDeviceIdentifier{ + name: inodeDeviceIDName, + }, nil +} + +func (i *inodeDeviceIdentifier) GenerateID(s *State) { + stateID := i.name + identitySep + s.FileStateOS.String() + if len(s.Meta) == 0 { + s.Id = stateID + } else { + s.Id = genIDWithHash(s.Meta, stateID) + } + s.IdentifierName = i.name +} + +func (i *inodeDeviceIdentifier) Name() string { + return i.name +} + +type pathIdentifier struct { + name string +} + +func newPathIdentifier(_ *common.Config) (StateIdentifier, error) { + return &pathIdentifier{ + name: pathName, + }, nil +} + +func (p *pathIdentifier) GenerateID(s *State) { + stateID := p.name + identitySep + s.Source + if len(s.Meta) == 0 { + s.Id = stateID + } else { + s.Id = genIDWithHash(s.Meta, stateID) + } + s.IdentifierName = p.name +} + +func (p *pathIdentifier) Name() string { + return p.name +} + +type inodeMarkerIdentifier struct { + log *logp.Logger + name string + markerPath string + + markerFileLastModifitaion time.Time + markerTxt string +} + +func newINodeMarkerIdentifier(cfg *common.Config) (StateIdentifier, error) { + var config struct { + MarkerPath string `config:"path" validate:"required"` + } + err := cfg.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("error while reading configuration of INode + marker file configuration: %v", err) + } + + fi, err := os.Stat(config.MarkerPath) + if err != nil { + return nil, fmt.Errorf("error while opening marker file at %s: %v", config.MarkerPath, err) + } + markerContent, err := ioutil.ReadFile(config.MarkerPath) + if err != nil { + return nil, fmt.Errorf("error while reading marker file at %s: %v", config.MarkerPath, err) + } + return &inodeMarkerIdentifier{ + log: logp.NewLogger("inode_marker_identifier_" + filepath.Base(config.MarkerPath)), + name: inodeMarkerName, + markerPath: config.MarkerPath, + markerFileLastModifitaion: fi.ModTime(), + markerTxt: string(markerContent), + }, nil +} + +func (i *inodeMarkerIdentifier) markerContents() string { + f, err := os.Open(i.markerPath) + if err != nil { + i.log.Errorf("Failed to open marker file %s: %v", i.markerPath, err) + return "" + } + defer f.Close() + + fi, err := f.Stat() + if err != nil { + i.log.Errorf("Failed to fetch file information for %s: %v", i.markerPath, err) + return "" + } + if i.markerFileLastModifitaion.Before(fi.ModTime()) { + contents, err := ioutil.ReadFile(i.markerPath) + if err != nil { + i.log.Errorf("Error while reading contents of marker file: %v", err) + return "" + } + i.markerTxt = string(contents) + } + + return i.markerTxt +} + +func (i *inodeMarkerIdentifier) GenerateID(s *State) { + m := i.markerContents() + + fileID := fmt.Sprintf("%s%s%d-%s", i.name, identitySep, s.FileStateOS.Inode, m) + if len(s.Meta) == 0 { + s.Id = fileID + } else { + s.Id = genIDWithHash(s.Meta, fileID) + } + s.IdentifierName = i.name +} + +func (i *inodeMarkerIdentifier) Name() string { + return i.name +} + +func genIDWithHash(meta map[string]string, fileID string) string { + hashValue, _ := hashstructure.Hash(meta, nil) + var hashBuf [17]byte + hash := strconv.AppendUint(hashBuf[:0], hashValue, 16) + hash = append(hash, '-') + + var b strings.Builder + b.Grow(len(hash) + len(fileID)) + b.Write(hash) + b.WriteString(fileID) + + return b.String() +} diff --git a/filebeat/input/file/identifier_test.go b/filebeat/input/file/identifier_test.go new file mode 100644 index 000000000000..f4fa56fc51c7 --- /dev/null +++ b/filebeat/input/file/identifier_test.go @@ -0,0 +1,197 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package file + +import ( + "fmt" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/file" +) + +type stateTestCase struct { + states [2]State + isSame bool +} + +func TestINodeDeviceIdentifier(t *testing.T) { + tests := map[string]stateTestCase{ + "two states poiting to the same file": { + [2]State{ + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + State{ + Source: "/path/to/this/file/2", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + }, + true, + }, + "two states poiting to different files": { + [2]State{ + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + State{ + Source: "/path/to/this/file/2", + FileStateOS: file.StateOS{Inode: 2, Device: 1}, + }, + }, + false, + }, + } + + identifier, _ := newINodeDeviceIdentifier(nil) + for name, test := range tests { + test := test + for i := 0; i < len(test.states); i++ { + identifier.GenerateID(&test.states[i]) + } + + t.Run(name, func(t *testing.T) { + isSame := test.states[0].IsEqual(&test.states[1]) + assert.Equal(t, isSame, test.isSame) + }) + } +} + +func TestPathIdentifier(t *testing.T) { + tests := map[string]stateTestCase{ + "two states poiting to the same file": { + [2]State{ + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + }, + true, + }, + "two states poiting to different files": { + [2]State{ + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + State{ + Source: "/path/to/this/file/2", + FileStateOS: file.StateOS{Inode: 2, Device: 1}, + }, + }, + false, + }, + } + + identifier, _ := newPathIdentifier(nil) + for name, test := range tests { + test := test + for i := 0; i < len(test.states); i++ { + identifier.GenerateID(&test.states[i]) + } + t.Run(name, func(t *testing.T) { + isSame := test.states[0].IsEqual(&test.states[1]) + assert.Equal(t, isSame, test.isSame) + }) + } +} + +func TestInodeMarkerIdentifier(t *testing.T) { + tests := map[string]stateTestCase{ + "two states poiting to the same file i.": { + [2]State{ + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + }, + true, + }, + "two states poiting to the same file ii.": { + [2]State{ + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 2}, + }, + }, + true, + }, + "two states poiting to different files i.": { + [2]State{ + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + State{ + Source: "/path/to/this/file/2", + FileStateOS: file.StateOS{Inode: 2, Device: 1}, + }, + }, + false, + }, + "two states poiting to different files ii.": { + [2]State{ + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 2, Device: 3}, + }, + }, + false, + }, + } + + identifier := newMockInodeMarkerIdentifier() + for name, test := range tests { + test := test + for i := 0; i < len(test.states); i++ { + identifier.GenerateID(&test.states[i]) + } + t.Run(name, func(t *testing.T) { + isSame := test.states[0].IsEqual(&test.states[1]) + assert.Equal(t, isSame, test.isSame) + }) + } +} + +func newMockInodeMarkerIdentifier() StateIdentifier { + cfg := common.MustNewConfigFrom(map[string]string{"path": filepath.Join("testdata", "identifier_marker")}) + i, err := newINodeMarkerIdentifier(cfg) + fmt.Println(err) + return i +} diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index ef255243b4c1..857da3e4f38a 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -19,35 +19,33 @@ package file import ( "os" - "strconv" - "strings" "time" - "github.com/mitchellh/hashstructure" - "github.com/elastic/beats/v7/libbeat/common/file" ) // State is used to communicate the reading state of a file type State struct { - Id string `json:"-" struct:"-"` // local unique id to make comparison more efficient - Finished bool `json:"-" struct:"-"` // harvester state - Fileinfo os.FileInfo `json:"-" struct:"-"` // the file info - Source string `json:"source" struct:"source"` - Offset int64 `json:"offset" struct:"offset"` - Timestamp time.Time `json:"timestamp" struct:"timestamp"` - TTL time.Duration `json:"ttl" struct:"ttl"` - Type string `json:"type" struct:"type"` - Meta map[string]string `json:"meta" struct:"meta,omitempty"` - FileStateOS file.StateOS `json:"FileStateOS" struct:"FileStateOS"` + Id string `json:"id" struct:"id"` + Finished bool `json:"-" struct:"-"` // harvester state + Fileinfo os.FileInfo `json:"-" struct:"-"` // the file info + Source string `json:"source" struct:"source"` + Offset int64 `json:"offset" struct:"offset"` + Timestamp time.Time `json:"timestamp" struct:"timestamp"` + TTL time.Duration `json:"ttl" struct:"ttl"` + Type string `json:"type" struct:"type"` + Meta map[string]string `json:"meta" struct:"meta,omitempty"` + FileStateOS file.StateOS `json:"FileStateOS" struct:"FileStateOS"` + IdentifierName string `json:"identifier_name" struct:"identifier_name"` } // NewState creates a new file state -func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]string) State { +func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]string, identifier StateIdentifier) State { if len(meta) == 0 { meta = nil } - return State{ + + s := State{ Fileinfo: fileInfo, Source: path, Finished: false, @@ -57,43 +55,13 @@ func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]strin Type: t, Meta: meta, } -} - -// ID returns a unique id for the state as a string -func (s *State) ID() string { - // Generate id on first request. This is needed as id is not set when converting back from json - if s.Id == "" { - if len(s.Meta) == 0 { - s.Id = s.FileStateOS.String() - } else { - hashValue, _ := hashstructure.Hash(s.Meta, nil) - var hashBuf [17]byte - hash := strconv.AppendUint(hashBuf[:0], hashValue, 16) - hash = append(hash, '-') - - fileID := s.FileStateOS.String() - var b strings.Builder - b.Grow(len(hash) + len(fileID)) - b.Write(hash) - b.WriteString(fileID) + identifier.GenerateID(&s) - s.Id = b.String() - } - } - - return s.Id + return s } -// IsEqual compares the state to an other state supporting stringer based on the unique string +// IsEqual checks if the two states point to the same file. func (s *State) IsEqual(c *State) bool { - return s.ID() == c.ID() -} - -// IsEmpty returns true if the state is empty -func (s *State) IsEmpty() bool { - return s.FileStateOS == file.StateOS{} && - s.Source == "" && - len(s.Meta) == 0 && - s.Timestamp.IsZero() + return s.Id == c.Id } diff --git a/filebeat/input/file/states.go b/filebeat/input/file/states.go index 34704b41dba1..48cf338f80f9 100644 --- a/filebeat/input/file/states.go +++ b/filebeat/input/file/states.go @@ -55,7 +55,7 @@ func (s *States) UpdateWithTs(newState State, ts time.Time) { s.Lock() defer s.Unlock() - id := newState.ID() + id := newState.Id index := s.findPrevious(id) newState.Timestamp = ts @@ -74,13 +74,20 @@ func (s *States) UpdateWithTs(newState State, ts time.Time) { func (s *States) FindPrevious(newState State) State { s.RLock() defer s.RUnlock() - i := s.findPrevious(newState.ID()) + i := s.findPrevious(newState.Id) if i < 0 { return State{} } return s.states[i] } +func (s *States) IsNew(state State) bool { + s.RLock() + defer s.RUnlock() + i := s.findPrevious(state.Id) + return i < 0 +} + // findPrevious returns the previous state for the file. // In case no previous state exists, index -1 is returned func (s *States) findPrevious(id string) int { @@ -120,17 +127,16 @@ func (s *States) CleanupWith(fn func(string)) (int, int) { continue } - id := state.ID() - delete(s.idx, id) + delete(s.idx, state.Id) if fn != nil { - fn(id) + fn(state.Id) } logp.Debug("state", "State removed for %v because of older: %v", state.Source, state.TTL) L-- if L != i { s.states[i] = s.states[L] - s.idx[s.states[i].ID()] = i + s.idx[s.states[i].Id] = i } } else { i++ @@ -172,7 +178,7 @@ func (s *States) SetStates(states []State) { // create new index s.idx = map[string]int{} for i := range states { - s.idx[states[i].ID()] = i + s.idx[states[i].Id] = i } } diff --git a/filebeat/input/file/testdata/identifier_marker b/filebeat/input/file/testdata/identifier_marker new file mode 100644 index 000000000000..2effed19113a --- /dev/null +++ b/filebeat/input/file/testdata/identifier_marker @@ -0,0 +1 @@ +1234-1234-1234-1234 diff --git a/filebeat/input/log/config.go b/filebeat/input/log/config.go index 835358b4e66d..c5f9f2049da9 100644 --- a/filebeat/input/log/config.go +++ b/filebeat/input/log/config.go @@ -27,6 +27,7 @@ import ( cfg "github.com/elastic/beats/v7/filebeat/config" "github.com/elastic/beats/v7/filebeat/harvester" "github.com/elastic/beats/v7/filebeat/input/file" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/common/match" "github.com/elastic/beats/v7/libbeat/logp" @@ -35,43 +36,6 @@ import ( "github.com/elastic/beats/v7/libbeat/reader/readjson" ) -var ( - defaultConfig = config{ - // Common - ForwarderConfig: harvester.ForwarderConfig{ - Type: cfg.DefaultType, - }, - CleanInactive: 0, - - // Input - Enabled: true, - IgnoreOlder: 0, - ScanFrequency: 10 * time.Second, - CleanRemoved: true, - HarvesterLimit: 0, - Symlinks: false, - TailFiles: false, - ScanSort: "", - ScanOrder: "asc", - RecursiveGlob: true, - - // Harvester - BufferSize: 16 * humanize.KiByte, - MaxBytes: 10 * humanize.MiByte, - LineTerminator: readfile.AutoLineTerminator, - LogConfig: LogConfig{ - Backoff: 1 * time.Second, - BackoffFactor: 2, - MaxBackoff: 10 * time.Second, - CloseInactive: 5 * time.Minute, - CloseRemoved: true, - CloseRenamed: false, - CloseEOF: false, - CloseTimeout: 0, - }, - } -) - type config struct { harvester.ForwarderConfig `config:",inline"` LogConfig `config:",inline"` @@ -81,16 +45,17 @@ type config struct { CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"` // Input - Enabled bool `config:"enabled"` - ExcludeFiles []match.Matcher `config:"exclude_files"` - IgnoreOlder time.Duration `config:"ignore_older"` - Paths []string `config:"paths"` - ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` - CleanRemoved bool `config:"clean_removed"` - HarvesterLimit uint32 `config:"harvester_limit" validate:"min=0"` - Symlinks bool `config:"symlinks"` - TailFiles bool `config:"tail_files"` - RecursiveGlob bool `config:"recursive_glob.enabled"` + Enabled bool `config:"enabled"` + ExcludeFiles []match.Matcher `config:"exclude_files"` + IgnoreOlder time.Duration `config:"ignore_older"` + Paths []string `config:"paths"` + ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` + CleanRemoved bool `config:"clean_removed"` + HarvesterLimit uint32 `config:"harvester_limit" validate:"min=0"` + Symlinks bool `config:"symlinks"` + TailFiles bool `config:"tail_files"` + RecursiveGlob bool `config:"recursive_glob.enabled"` + FileIdentity *common.ConfigNamespace `config:"file_identity"` // Harvester BufferSize int `config:"harvester_buffer_size"` @@ -147,6 +112,44 @@ var ValidScanSort = map[string]struct{}{ ScanSortFilename: {}, } +func defaultConfig() config { + return config{ + // Common + ForwarderConfig: harvester.ForwarderConfig{ + Type: cfg.DefaultType, + }, + CleanInactive: 0, + + // Input + Enabled: true, + IgnoreOlder: 0, + ScanFrequency: 10 * time.Second, + CleanRemoved: true, + HarvesterLimit: 0, + Symlinks: false, + TailFiles: false, + ScanSort: "", + ScanOrder: "asc", + RecursiveGlob: true, + FileIdentity: nil, + + // Harvester + BufferSize: 16 * humanize.KiByte, + MaxBytes: 10 * humanize.MiByte, + LineTerminator: readfile.AutoLineTerminator, + LogConfig: LogConfig{ + Backoff: 1 * time.Second, + BackoffFactor: 2, + MaxBackoff: 10 * time.Second, + CloseInactive: 5 * time.Minute, + CloseRemoved: true, + CloseRenamed: false, + CloseEOF: false, + CloseTimeout: 0, + }, + } +} + func (c *config) Validate() error { // DEPRECATED 6.0.0: warning is already outputted on input level if c.InputType != "" { diff --git a/filebeat/input/log/config_test.go b/filebeat/input/log/config_test.go index 7406014d049e..f8160a830f76 100644 --- a/filebeat/input/log/config_test.go +++ b/filebeat/input/log/config_test.go @@ -59,7 +59,7 @@ func TestCleanOlderIgnoreOlderErrorEqual(t *testing.T) { func TestCleanOlderIgnoreOlder(t *testing.T) { config := config{ - CleanInactive: 10*time.Hour + defaultConfig.ScanFrequency + 1*time.Second, + CleanInactive: 10*time.Hour + defaultConfig().ScanFrequency + 1*time.Second, IgnoreOlder: 10 * time.Hour, Paths: []string{"hello"}, ForwarderConfig: harvester.ForwarderConfig{ diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 94162ebfec97..95043e94237a 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -132,7 +132,7 @@ func NewHarvester( } h := &Harvester{ - config: defaultConfig, + config: defaultConfig(), state: state, states: states, publishState: publishState, diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 032f5c11c92f..fa498c91442b 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -60,16 +60,17 @@ func init() { // Input contains the input and its config type Input struct { - cfg *common.Config - config config - states *file.States - harvesters *harvester.Registry - outlet channel.Outleter - stateOutlet channel.Outleter - done chan struct{} - numHarvesters atomic.Uint32 - meta map[string]string - stopOnce sync.Once + cfg *common.Config + config config + states *file.States + harvesters *harvester.Registry + outlet channel.Outleter + stateOutlet channel.Outleter + done chan struct{} + numHarvesters atomic.Uint32 + meta map[string]string + stopOnce sync.Once + fileStateIdentifier file.StateIdentifier } // NewInput instantiates a new Log @@ -85,7 +86,7 @@ func NewInput( } } - inputConfig := defaultConfig + inputConfig := defaultConfig() if err := cfg.Unpack(&inputConfig); err != nil { return nil, err @@ -101,6 +102,11 @@ func NewInput( return nil, fmt.Errorf("each input must have at least one path defined") } + identifier, err := file.NewStateIdentifier(inputConfig.FileIdentity) + if err != nil { + return nil, fmt.Errorf("failed to initialize file identity generator: %+v", err) + } + // Note: underlying output. // The input and harvester do have different requirements // on the timings the outlets must be closed/unblocked. @@ -125,14 +131,15 @@ func NewInput( } p := &Input{ - config: inputConfig, - cfg: cfg, - harvesters: harvester.NewRegistry(), - outlet: out, - stateOutlet: stateOut, - states: file.NewStates(), - done: context.Done, - meta: meta, + config: inputConfig, + cfg: cfg, + harvesters: harvester.NewRegistry(), + outlet: out, + stateOutlet: stateOut, + states: file.NewStates(), + done: context.Done, + meta: meta, + fileStateIdentifier: identifier, } // Create empty harvester to check if configs are fine @@ -171,6 +178,11 @@ func (p *Input) loadStates(states []file.State) error { return fmt.Errorf("Can only start an input when all related states are finished: %+v", state) } + // Convert state to current identifier if different + if state.IdentifierName != p.fileStateIdentifier.Name() { + p.fileStateIdentifier.GenerateID(&state) + } + // Update input states and send new states to registry err := p.updateState(state) if err != nil { @@ -225,10 +237,14 @@ func (p *Input) Run() { } } else { // Check if existing source on disk and state are the same. Remove if not the case. - newState := file.NewState(stat, state.Source, p.config.Type, p.meta) - if !newState.FileStateOS.IsSame(state.FileStateOS) { + newState := file.NewState(stat, state.Source, p.config.Type, p.meta, p.fileStateIdentifier) + if state.IdentifierName != newState.IdentifierName { + logp.Debug("input", "file_identity configuration for file has changed from %s to %s, generating new id", state.IdentifierName, newState.IdentifierName) + p.fileStateIdentifier.GenerateID(&state) + } + if state.IsEqual(&newState) { p.removeState(state) - logp.Debug("input", "Remove state for file as file removed or renamed: %s", state.Source) + logp.Debug("input", "Remove state of file as its identity has changed: %s", state.Source) } } } @@ -418,7 +434,7 @@ func getFileState(path string, info os.FileInfo, p *Input) (file.State, error) { } logp.Debug("input", "Check file for harvesting: %s", absolutePath) // Create new state for comparison - newState := file.NewState(info, absolutePath, p.config.Type, p.meta) + newState := file.NewState(info, absolutePath, p.config.Type, p.meta, p.fileStateIdentifier) return newState, nil } @@ -476,11 +492,11 @@ func (p *Input) scan() { } // Load last state - lastState := p.states.FindPrevious(newState) + isNewState := p.states.IsNew(newState) // Ignores all files which fall under ignore_older if p.isIgnoreOlder(newState) { - err := p.handleIgnoreOlder(lastState, newState) + err := p.handleIgnoreOlder(isNewState, newState) if err != nil { logp.Err("Updating ignore_older state error: %s", err) } @@ -488,7 +504,7 @@ func (p *Input) scan() { } // Decides if previous state exists - if lastState.IsEmpty() { + if isNewState { logp.Debug("input", "Start harvester for new file: %s", newState.Source) err := p.startHarvester(newState, 0) if err == errHarvesterLimit { @@ -499,6 +515,7 @@ func (p *Input) scan() { logp.Err(harvesterErrMsg, newState.Source, err) } } else { + lastState := p.states.FindPrevious(newState) p.harvestExistingFile(newState, lastState) } } @@ -566,10 +583,11 @@ func (p *Input) harvestExistingFile(newState file.State, oldState file.State) { // handleIgnoreOlder handles states which fall under ignore older // Based on the state information it is decided if the state information has to be updated or not -func (p *Input) handleIgnoreOlder(lastState, newState file.State) error { +func (p *Input) handleIgnoreOlder(isNewState bool, newState file.State) error { logp.Debug("input", "Ignore file because ignore_older reached: %s", newState.Source) - if !lastState.IsEmpty() { + if !isNewState { + lastState := p.states.FindPrevious(newState) if !lastState.Finished { logp.Info("File is falling under ignore_older before harvesting is finished. Adjust your close_* settings: %s", newState.Source) } diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index 118f7c276db0..c9d6a5353d0a 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -18,7 +18,6 @@ package registrar import ( - "fmt" "strings" "sync" "time" @@ -110,22 +109,6 @@ func (r *Registrar) loadStates() error { return nil } -func (r *Registrar) Start() error { - // Load the previous log file locations now, for use in input - err := r.loadStates() - if err != nil { - return fmt.Errorf("Error loading state: %v", err) - } - - r.wg.Add(1) - go func() { - defer r.wg.Done() - r.Run() - }() - - return nil -} - // Stop stops the registry. It waits until Run function finished. func (r *Registrar) Stop() { r.log.Info("Stopping Registrar") diff --git a/filebeat/tests/system/test_input.py b/filebeat/tests/system/test_input.py index 684f4f852af0..f3c114a4ed72 100644 --- a/filebeat/tests/system/test_input.py +++ b/filebeat/tests/system/test_input.py @@ -682,3 +682,114 @@ def test_input_processing_pipeline_disable_host(self): output = self.read_output() assert "host.name" not in output[0] + + def test_path_based_identity_tracking(self): + """ + Renamed files are picked up again as the path of the file has changed. + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + close_eof="true", + input_raw=" file_identity.path: ~", + ) + + testfile = self.working_dir + "/log/test.log" + self.__write_hello_word_to_test_input_file(testfile) + + proc = self.start_beat() + + # wait until the file is picked up + self.wait_until(lambda: self.log_contains("Start harvester for new file: " + testfile)) + + time.sleep(2) + renamedfile = self.working_dir + "/log/renamed.log" + os.rename(testfile, renamedfile) + + # wait until the renamed file is picked up + self.wait_until(lambda: self.log_contains("Start harvester for new file: " + renamedfile)) + + # wait until the both messages are received by the output + self.wait_until(lambda: self.output_has(lines=2)) + proc.check_kill_and_wait() + + # assert that renaming of the file went undetected + assert not self.log_contains("File rename was detected:" + testfile + " -> " + renamedfile) + + def test_inode_marker_based_identity_tracking(self): + """ + File is picked up again if the contents of the marker file changes. + """ + + marker_location = self.working_dir + "/marker" + with open(marker_location, 'w') as m: + m.write("very-unique-string") + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + close_eof="true", + input_raw=" file_identity.inode_marker.path: " + marker_location, + ) + + testfile = self.working_dir + "/log/test.log" + self.__write_hello_word_to_test_input_file(testfile) + + proc = self.start_beat() + + # wait until the file is picked up + self.wait_until(lambda: self.log_contains("Start harvester for new file: " + testfile)) + + # change the ID in the marker file to simulate a new file + with open(marker_location, 'w') as m: + m.write("different-very-unique-id") + + self.wait_until(lambda: self.log_contains("Start harvester for new file: " + testfile)) + + # wait until the both messages are received by the output + self.wait_until(lambda: self.output_has(lines=2)) + proc.check_kill_and_wait() + + def test_inode_marker_based_identity_tracking_to_path_based(self): + """ + File reading can be continued after file_identity is changed. + """ + + marker_location = self.working_dir + "/marker" + with open(marker_location, 'w') as m: + m.write("very-unique-string") + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + input_raw=" file_identity.inode_marker.path: " + marker_location, + ) + + testfile = self.working_dir + "/log/test.log" + self.__write_hello_word_to_test_input_file(testfile) + + proc = self.start_beat() + + # wait until the file is picked up + self.wait_until(lambda: self.log_contains("Start harvester for new file: " + testfile)) + + self.wait_until(lambda: self.output_has(lines=1)) + proc.check_kill_and_wait() + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + rotateonstartup="false", + input_raw=" file_identity.path: ~", + ) + + with open(testfile, 'w+') as f: + f.write("hello world again\n") + + proc = self.start_beat() + + # on startup output is rotated + self.wait_until(lambda: self.output_has(lines=1, output_file="output/filebeat.1")) + self.wait_until(lambda: self.output_has(lines=1)) + proc.check_kill_and_wait() + + def __write_hello_word_to_test_input_file(self, testfile): + os.mkdir(self.working_dir + "/log/") + with open(testfile, 'w') as f: + f.write("hello world\n") diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 853eec3f8271..276476d83f52 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -1545,6 +1545,10 @@ filebeat.inputs: # are matching any regular expression from the list. By default, no files are dropped. #exclude_files: ['.gz$'] + # Method to determine if two files are the same or not. By default + # the Beat considers two files the same if their inode and device id are the same. + #file_identity.inode_deviceid: ~ + # Optional additional fields. These fields can be freely picked # to add additional information to the crawled log files for filtering #fields: From 80ab8c4bc5486fa1438d97c6c03e8a459d946168 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Mon, 6 Jul 2020 17:08:14 +0200 Subject: [PATCH 02/18] fix log input tests --- filebeat/input/file/identifier.go | 6 ++++++ filebeat/input/log/input_other_test.go | 5 +++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/filebeat/input/file/identifier.go b/filebeat/input/file/identifier.go index fbe119d5af35..1437a56e3204 100644 --- a/filebeat/input/file/identifier.go +++ b/filebeat/input/file/identifier.go @@ -211,3 +211,9 @@ func genIDWithHash(meta map[string]string, fileID string) string { return b.String() } + +// mockIdentifier is used for testing +type MockIdentifier struct{} + +func (m *MockIdentifier) GenerateID(s *State) { return } +func (m *MockIdentifier) Name() string { return "mock" } diff --git a/filebeat/input/log/input_other_test.go b/filebeat/input/log/input_other_test.go index e37b4d0c1f21..0910bd2b2919 100644 --- a/filebeat/input/log/input_other_test.go +++ b/filebeat/input/log/input_other_test.go @@ -147,8 +147,9 @@ func TestInit(t *testing.T) { config: config{ Paths: test.paths, }, - states: file.NewStates(), - outlet: TestOutlet{}, + states: file.NewStates(), + outlet: TestOutlet{}, + fileStateIdentifier: &file.MockIdentifier{}, } // Set states to finished From 1b31e96d0eab569c959bcd03a7c96d2b0165904e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Mon, 6 Jul 2020 17:14:36 +0200 Subject: [PATCH 03/18] add changelog entry --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6f219db6f7fb..f50dcd806478 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -462,6 +462,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add experimental dataset sonicwall/firewall for Sonicwall Firewalls logs {pull}19713[19713] - Add experimental dataset squid/log for Squid Proxy Server logs {pull}19713[19713] - Add experimental dataset zscaler/zia for Zscaler Internet Access logs {pull}19713[19713] +- Add initial support for configurable file identity tracking. {pull}18748[18748] *Heartbeat* From 7b6204b11a1206d289c95dbd172f6624660b758c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 7 Jul 2020 13:56:58 +0200 Subject: [PATCH 04/18] support close_renamed && close_removed --- filebeat/input/log/harvester.go | 50 +++++++++++++++++++++++++++------ filebeat/input/log/log.go | 43 +--------------------------- 2 files changed, 43 insertions(+), 50 deletions(-) diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 95043e94237a..737e2d1723d0 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -67,7 +67,6 @@ var ( ErrFileTruncate = errors.New("detected file being truncated") ErrRenamed = errors.New("file was renamed") - ErrRemoved = errors.New("file was removed") ErrInactive = errors.New("file inactive") ErrClosed = errors.New("reader closed") ) @@ -88,9 +87,10 @@ type Harvester struct { stopLock sync.Mutex // internal harvester state - state file.State - states *file.States - log *Log + state file.State + states *file.States + identifier file.StateIdentifier + log *Log // file reader pipeline reader reader.Reader @@ -122,6 +122,7 @@ func NewHarvester( config *common.Config, state file.State, states *file.States, + ider file.StateIdentifier, publishState func(file.State) bool, outletFactory OutletFactory, ) (*Harvester, error) { @@ -135,6 +136,7 @@ func NewHarvester( config: defaultConfig(), state: state, states: states, + identifier: ider, publishState: publishState, done: make(chan struct{}), stopWg: &sync.WaitGroup{}, @@ -312,10 +314,6 @@ func (h *Harvester) Run() error { logp.Info("File was truncated. Begin reading file from offset 0: %s", h.state.Source) h.state.Offset = 0 filesTruncated.Add(1) - case ErrRemoved: - logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.state.Source) - case ErrRenamed: - logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.state.Source) case ErrClosed: logp.Info("Reader was closed: %s. Closing.", h.state.Source) case io.EOF: @@ -336,6 +334,11 @@ func (h *Harvester) Run() error { // This is important in case sending is not successful so on shutdown // the old offset is reported state := h.getState() + + if h.shouldBeClosed(state) { + return nil + } + startingOffset := state.Offset state.Offset += int64(message.Bytes) @@ -354,6 +357,35 @@ func (h *Harvester) Run() error { } } +// shouldBeClosed checks if the underlying file has been renamed or removed. +// then decides based on close_renamed and close_removed if the harverster should be stopped. +func (h *Harvester) shouldBeClosed(state *file.State) bool { + if !h.config.CloseRenamed && !h.config.CloseRemoved { + return false + } + + // check if identifier of state has changed + if state.IsEqual(h.state) { + if h.config.CloseRenamed { + info, err := h.source.Stat() + if err != nil { + logp.Err("Unexpected error reading from %s; error: %s", h.source.Name(), err) + return false + } + if state.Source != info.Name() { + logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.state.Source) + return true + } + } + } else { + if h.config.CloseRemoved { + logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.state.Source) + return true + } + } + return false +} + func (h *Harvester) monitorFileSize() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() @@ -584,6 +616,8 @@ func (h *Harvester) getState() file.State { // refreshes the values in State with the values from the harvester itself state.FileStateOS = file_helper.GetOSState(h.state.Fileinfo) + h.identifier.GenerateID(&state) + return state } diff --git a/filebeat/input/log/log.go b/filebeat/input/log/log.go index 1a89c5bc8d14..efd95162b2a6 100644 --- a/filebeat/input/log/log.go +++ b/filebeat/input/log/log.go @@ -23,7 +23,6 @@ import ( "time" "github.com/elastic/beats/v7/filebeat/harvester" - "github.com/elastic/beats/v7/filebeat/input/file" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -35,6 +34,7 @@ type Log struct { lastTimeRead time.Time backoff time.Duration done chan struct{} + id string } // NewLog creates a new log instance to read log sources @@ -73,11 +73,6 @@ func (f *Log) Read(buf []byte) (int, error) { default: } - err := f.checkFileDisappearedErrors() - if err != nil { - return totalN, err - } - n, err := f.fs.Read(buf) if n > 0 { f.offset += int64(n) @@ -154,42 +149,6 @@ func (f *Log) errorChecks(err error) error { return nil } -// checkFileDisappearedErrors checks if the log file has been removed or renamed (rotated). -func (f *Log) checkFileDisappearedErrors() error { - // No point doing a stat call on the file if configuration options are - // not enabled - if !f.config.CloseRenamed && !f.config.CloseRemoved { - return nil - } - - // Refetch fileinfo to check if the file was renamed or removed. - // Errors if the file was removed/rotated after reading and before - // calling the stat function - info, statErr := f.fs.Stat() - if statErr != nil { - logp.Err("Unexpected error reading from %s; error: %s", f.fs.Name(), statErr) - return statErr - } - - if f.config.CloseRenamed { - // Check if the file can still be found under the same path - if !file.IsSameFile(f.fs.Name(), info) { - logp.Debug("harvester", "close_renamed is enabled and file %s has been renamed", f.fs.Name()) - return ErrRenamed - } - } - - if f.config.CloseRemoved { - // Check if the file name exists. See https://github.com/elastic/filebeat/issues/93 - if f.fs.Removed() { - logp.Debug("harvester", "close_removed is enabled and file %s has been removed", f.fs.Name()) - return ErrRemoved - } - } - - return nil -} - func (f *Log) wait() { // Wait before trying to read file again. File reached EOF. select { From ef8860d9446132c59cb526e04f243d020db76b09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 7 Jul 2020 14:30:47 +0200 Subject: [PATCH 05/18] change interface --- filebeat/input/file/identifier.go | 54 +++++++------------------- filebeat/input/file/identifier_test.go | 6 +-- filebeat/input/file/state.go | 2 +- filebeat/input/log/harvester.go | 6 +-- filebeat/input/log/input.go | 9 +++-- filebeat/input/stdin/input.go | 2 +- 6 files changed, 28 insertions(+), 51 deletions(-) diff --git a/filebeat/input/file/identifier.go b/filebeat/input/file/identifier.go index 1437a56e3204..b495f3490569 100644 --- a/filebeat/input/file/identifier.go +++ b/filebeat/input/file/identifier.go @@ -53,10 +53,8 @@ type IdentifierFactory func(*common.Config) (StateIdentifier, error) // StateIdentifier generates an ID for a State. type StateIdentifier interface { - // GenerateID generates and returns the ID of the state - GenerateID(*State) - // Name return the name of the identifier method for comparison. - Name() string + // GenerateID generates and returns the ID of the state and its type + GenerateID(State) (id, identifierType string) } // NewStateIdentifier creates a new state identifier for a log input. @@ -84,18 +82,9 @@ func newINodeDeviceIdentifier(_ *common.Config) (StateIdentifier, error) { }, nil } -func (i *inodeDeviceIdentifier) GenerateID(s *State) { +func (i *inodeDeviceIdentifier) GenerateID(s State) (id, identifierType string) { stateID := i.name + identitySep + s.FileStateOS.String() - if len(s.Meta) == 0 { - s.Id = stateID - } else { - s.Id = genIDWithHash(s.Meta, stateID) - } - s.IdentifierName = i.name -} - -func (i *inodeDeviceIdentifier) Name() string { - return i.name + return genIDWithHash(s.Meta, stateID), i.name } type pathIdentifier struct { @@ -108,18 +97,9 @@ func newPathIdentifier(_ *common.Config) (StateIdentifier, error) { }, nil } -func (p *pathIdentifier) GenerateID(s *State) { +func (p *pathIdentifier) GenerateID(s State) (id, identifierType string) { stateID := p.name + identitySep + s.Source - if len(s.Meta) == 0 { - s.Id = stateID - } else { - s.Id = genIDWithHash(s.Meta, stateID) - } - s.IdentifierName = p.name -} - -func (p *pathIdentifier) Name() string { - return p.name + return genIDWithHash(s.Meta, stateID), p.name } type inodeMarkerIdentifier struct { @@ -182,23 +162,18 @@ func (i *inodeMarkerIdentifier) markerContents() string { return i.markerTxt } -func (i *inodeMarkerIdentifier) GenerateID(s *State) { +func (i *inodeMarkerIdentifier) GenerateID(s State) (id, identifierType string) { m := i.markerContents() - fileID := fmt.Sprintf("%s%s%d-%s", i.name, identitySep, s.FileStateOS.Inode, m) - if len(s.Meta) == 0 { - s.Id = fileID - } else { - s.Id = genIDWithHash(s.Meta, fileID) - } - s.IdentifierName = i.name -} - -func (i *inodeMarkerIdentifier) Name() string { - return i.name + stateID := fmt.Sprintf("%s%s%d-%s", i.name, identitySep, s.FileStateOS.Inode, m) + return genIDWithHash(s.Meta, stateID), i.name } func genIDWithHash(meta map[string]string, fileID string) string { + if len(meta) == 0 { + return fileID + } + hashValue, _ := hashstructure.Hash(meta, nil) var hashBuf [17]byte hash := strconv.AppendUint(hashBuf[:0], hashValue, 16) @@ -215,5 +190,4 @@ func genIDWithHash(meta map[string]string, fileID string) string { // mockIdentifier is used for testing type MockIdentifier struct{} -func (m *MockIdentifier) GenerateID(s *State) { return } -func (m *MockIdentifier) Name() string { return "mock" } +func (m *MockIdentifier) GenerateID(s State) (string, string) { return s.Id, "mock" } diff --git a/filebeat/input/file/identifier_test.go b/filebeat/input/file/identifier_test.go index f4fa56fc51c7..870e0106f2e2 100644 --- a/filebeat/input/file/identifier_test.go +++ b/filebeat/input/file/identifier_test.go @@ -67,7 +67,7 @@ func TestINodeDeviceIdentifier(t *testing.T) { for name, test := range tests { test := test for i := 0; i < len(test.states); i++ { - identifier.GenerateID(&test.states[i]) + test.states[i].Id, test.states[i].IdentifierName = identifier.GenerateID(test.states[i]) } t.Run(name, func(t *testing.T) { @@ -111,7 +111,7 @@ func TestPathIdentifier(t *testing.T) { for name, test := range tests { test := test for i := 0; i < len(test.states); i++ { - identifier.GenerateID(&test.states[i]) + test.states[i].Id, test.states[i].IdentifierName = identifier.GenerateID(test.states[i]) } t.Run(name, func(t *testing.T) { isSame := test.states[0].IsEqual(&test.states[1]) @@ -180,7 +180,7 @@ func TestInodeMarkerIdentifier(t *testing.T) { for name, test := range tests { test := test for i := 0; i < len(test.states); i++ { - identifier.GenerateID(&test.states[i]) + test.states[i].Id, test.states[i].IdentifierName = identifier.GenerateID(test.states[i]) } t.Run(name, func(t *testing.T) { isSame := test.states[0].IsEqual(&test.states[1]) diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index 857da3e4f38a..b6b081b9ddf5 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -56,7 +56,7 @@ func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]strin Meta: meta, } - identifier.GenerateID(&s) + s.Id, s.IdentifierName = identifier.GenerateID(s) return s } diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 737e2d1723d0..26b5a4a1aa18 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -335,7 +335,7 @@ func (h *Harvester) Run() error { // the old offset is reported state := h.getState() - if h.shouldBeClosed(state) { + if h.shouldBeClosed(&state) { return nil } @@ -365,7 +365,7 @@ func (h *Harvester) shouldBeClosed(state *file.State) bool { } // check if identifier of state has changed - if state.IsEqual(h.state) { + if state.IsEqual(&h.state) { if h.config.CloseRenamed { info, err := h.source.Stat() if err != nil { @@ -616,7 +616,7 @@ func (h *Harvester) getState() file.State { // refreshes the values in State with the values from the harvester itself state.FileStateOS = file_helper.GetOSState(h.state.Fileinfo) - h.identifier.GenerateID(&state) + state.Id, state.IdentifierName = h.identifier.GenerateID(state) return state } diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index fa498c91442b..bc3c63ff26e2 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -179,8 +179,10 @@ func (p *Input) loadStates(states []file.State) error { } // Convert state to current identifier if different - if state.IdentifierName != p.fileStateIdentifier.Name() { - p.fileStateIdentifier.GenerateID(&state) + newId, identifierName := p.fileStateIdentifier.GenerateID(state) + if state.IdentifierName != identifierName { + state.Id = newId + state.IdentifierName = identifierName } // Update input states and send new states to registry @@ -240,7 +242,7 @@ func (p *Input) Run() { newState := file.NewState(stat, state.Source, p.config.Type, p.meta, p.fileStateIdentifier) if state.IdentifierName != newState.IdentifierName { logp.Debug("input", "file_identity configuration for file has changed from %s to %s, generating new id", state.IdentifierName, newState.IdentifierName) - p.fileStateIdentifier.GenerateID(&state) + state.Id, state.IdentifierName = p.fileStateIdentifier.GenerateID(state) } if state.IsEqual(&newState) { p.removeState(state) @@ -670,6 +672,7 @@ func (p *Input) createHarvester(state file.State, onTerminate func()) (*Harveste p.cfg, state, p.states, + p.fileStateIdentifier, func(state file.State) bool { return p.stateOutlet.OnEvent(beat.Event{Private: state}) }, diff --git a/filebeat/input/stdin/input.go b/filebeat/input/stdin/input.go index 0e8fbd0fc104..05e05d6fc733 100644 --- a/filebeat/input/stdin/input.go +++ b/filebeat/input/stdin/input.go @@ -89,7 +89,7 @@ func (p *Input) createHarvester(state file.State) (*log.Harvester, error) { // Each harvester gets its own copy of the outlet h, err := log.NewHarvester( p.cfg, - state, nil, nil, + state, nil, nil, nil, func() channel.Outleter { return p.outlet }, From 154bea2fb9c9a2323451f4050d46ab6d07b2db30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 7 Jul 2020 16:06:18 +0200 Subject: [PATCH 06/18] fix tests --- filebeat/input/log/harvester.go | 50 ++++--------------------- filebeat/input/log/input.go | 3 +- filebeat/input/log/log.go | 43 ++++++++++++++++++++- filebeat/input/stdin/input.go | 2 +- filebeat/tests/system/test_harvester.py | 2 +- 5 files changed, 53 insertions(+), 47 deletions(-) diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 26b5a4a1aa18..95043e94237a 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -67,6 +67,7 @@ var ( ErrFileTruncate = errors.New("detected file being truncated") ErrRenamed = errors.New("file was renamed") + ErrRemoved = errors.New("file was removed") ErrInactive = errors.New("file inactive") ErrClosed = errors.New("reader closed") ) @@ -87,10 +88,9 @@ type Harvester struct { stopLock sync.Mutex // internal harvester state - state file.State - states *file.States - identifier file.StateIdentifier - log *Log + state file.State + states *file.States + log *Log // file reader pipeline reader reader.Reader @@ -122,7 +122,6 @@ func NewHarvester( config *common.Config, state file.State, states *file.States, - ider file.StateIdentifier, publishState func(file.State) bool, outletFactory OutletFactory, ) (*Harvester, error) { @@ -136,7 +135,6 @@ func NewHarvester( config: defaultConfig(), state: state, states: states, - identifier: ider, publishState: publishState, done: make(chan struct{}), stopWg: &sync.WaitGroup{}, @@ -314,6 +312,10 @@ func (h *Harvester) Run() error { logp.Info("File was truncated. Begin reading file from offset 0: %s", h.state.Source) h.state.Offset = 0 filesTruncated.Add(1) + case ErrRemoved: + logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.state.Source) + case ErrRenamed: + logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.state.Source) case ErrClosed: logp.Info("Reader was closed: %s. Closing.", h.state.Source) case io.EOF: @@ -334,11 +336,6 @@ func (h *Harvester) Run() error { // This is important in case sending is not successful so on shutdown // the old offset is reported state := h.getState() - - if h.shouldBeClosed(&state) { - return nil - } - startingOffset := state.Offset state.Offset += int64(message.Bytes) @@ -357,35 +354,6 @@ func (h *Harvester) Run() error { } } -// shouldBeClosed checks if the underlying file has been renamed or removed. -// then decides based on close_renamed and close_removed if the harverster should be stopped. -func (h *Harvester) shouldBeClosed(state *file.State) bool { - if !h.config.CloseRenamed && !h.config.CloseRemoved { - return false - } - - // check if identifier of state has changed - if state.IsEqual(&h.state) { - if h.config.CloseRenamed { - info, err := h.source.Stat() - if err != nil { - logp.Err("Unexpected error reading from %s; error: %s", h.source.Name(), err) - return false - } - if state.Source != info.Name() { - logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.state.Source) - return true - } - } - } else { - if h.config.CloseRemoved { - logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.state.Source) - return true - } - } - return false -} - func (h *Harvester) monitorFileSize() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() @@ -616,8 +584,6 @@ func (h *Harvester) getState() file.State { // refreshes the values in State with the values from the harvester itself state.FileStateOS = file_helper.GetOSState(h.state.Fileinfo) - state.Id, state.IdentifierName = h.identifier.GenerateID(state) - return state } diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index bc3c63ff26e2..9bce8b0687a1 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -244,7 +244,7 @@ func (p *Input) Run() { logp.Debug("input", "file_identity configuration for file has changed from %s to %s, generating new id", state.IdentifierName, newState.IdentifierName) state.Id, state.IdentifierName = p.fileStateIdentifier.GenerateID(state) } - if state.IsEqual(&newState) { + if !state.IsEqual(&newState) { p.removeState(state) logp.Debug("input", "Remove state of file as its identity has changed: %s", state.Source) } @@ -672,7 +672,6 @@ func (p *Input) createHarvester(state file.State, onTerminate func()) (*Harveste p.cfg, state, p.states, - p.fileStateIdentifier, func(state file.State) bool { return p.stateOutlet.OnEvent(beat.Event{Private: state}) }, diff --git a/filebeat/input/log/log.go b/filebeat/input/log/log.go index efd95162b2a6..1a89c5bc8d14 100644 --- a/filebeat/input/log/log.go +++ b/filebeat/input/log/log.go @@ -23,6 +23,7 @@ import ( "time" "github.com/elastic/beats/v7/filebeat/harvester" + "github.com/elastic/beats/v7/filebeat/input/file" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -34,7 +35,6 @@ type Log struct { lastTimeRead time.Time backoff time.Duration done chan struct{} - id string } // NewLog creates a new log instance to read log sources @@ -73,6 +73,11 @@ func (f *Log) Read(buf []byte) (int, error) { default: } + err := f.checkFileDisappearedErrors() + if err != nil { + return totalN, err + } + n, err := f.fs.Read(buf) if n > 0 { f.offset += int64(n) @@ -149,6 +154,42 @@ func (f *Log) errorChecks(err error) error { return nil } +// checkFileDisappearedErrors checks if the log file has been removed or renamed (rotated). +func (f *Log) checkFileDisappearedErrors() error { + // No point doing a stat call on the file if configuration options are + // not enabled + if !f.config.CloseRenamed && !f.config.CloseRemoved { + return nil + } + + // Refetch fileinfo to check if the file was renamed or removed. + // Errors if the file was removed/rotated after reading and before + // calling the stat function + info, statErr := f.fs.Stat() + if statErr != nil { + logp.Err("Unexpected error reading from %s; error: %s", f.fs.Name(), statErr) + return statErr + } + + if f.config.CloseRenamed { + // Check if the file can still be found under the same path + if !file.IsSameFile(f.fs.Name(), info) { + logp.Debug("harvester", "close_renamed is enabled and file %s has been renamed", f.fs.Name()) + return ErrRenamed + } + } + + if f.config.CloseRemoved { + // Check if the file name exists. See https://github.com/elastic/filebeat/issues/93 + if f.fs.Removed() { + logp.Debug("harvester", "close_removed is enabled and file %s has been removed", f.fs.Name()) + return ErrRemoved + } + } + + return nil +} + func (f *Log) wait() { // Wait before trying to read file again. File reached EOF. select { diff --git a/filebeat/input/stdin/input.go b/filebeat/input/stdin/input.go index 05e05d6fc733..0e8fbd0fc104 100644 --- a/filebeat/input/stdin/input.go +++ b/filebeat/input/stdin/input.go @@ -89,7 +89,7 @@ func (p *Input) createHarvester(state file.State) (*log.Harvester, error) { // Each harvester gets its own copy of the outlet h, err := log.NewHarvester( p.cfg, - state, nil, nil, nil, + state, nil, nil, func() channel.Outleter { return p.outlet }, diff --git a/filebeat/tests/system/test_harvester.py b/filebeat/tests/system/test_harvester.py index be2e4f42b8ff..a6e7004f8d06 100644 --- a/filebeat/tests/system/test_harvester.py +++ b/filebeat/tests/system/test_harvester.py @@ -79,7 +79,7 @@ def test_close_renamed(self): def test_close_removed(self): """ - Checks that a file is closed if removed + Checks that a file is closed if removed with inode_deviceid file identifier """ self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/test.log", From 37bc62827f241d61d8dd79ac60af48a6dc6d1492 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 8 Jul 2020 13:12:49 +0200 Subject: [PATCH 07/18] follow-up changes --- filebeat/input/log/input.go | 2 ++ filebeat/registrar/migrate.go | 11 ++++++++--- filebeat/registrar/registrar.go | 19 ++++++++++++++++++- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 9bce8b0687a1..2324fb2fe042 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -179,8 +179,10 @@ func (p *Input) loadStates(states []file.State) error { } // Convert state to current identifier if different + // and remove outdated state newId, identifierName := p.fileStateIdentifier.GenerateID(state) if state.IdentifierName != identifierName { + p.removeState(state) state.Id = newId state.IdentifierName = identifierName } diff --git a/filebeat/registrar/migrate.go b/filebeat/registrar/migrate.go index 4a76771878af..16e7b14744fa 100644 --- a/filebeat/registrar/migrate.go +++ b/filebeat/registrar/migrate.go @@ -340,10 +340,9 @@ func fixStates(states []file.State) []file.State { state := &states[i] fixState(state) - id := state.ID() - old, exists := idx[id] + old, exists := idx[state.Id] if !exists { - idx[id] = state + idx[state.Id] = state } else { mergeStates(old, state) // overwrite the entry in 'old' } @@ -364,10 +363,16 @@ func fixStates(states []file.State) []file.State { // fixState updates a read state to fullfil required invariantes: // - "Meta" must be nil if len(Meta) == 0 +// - "Id" must be initialized func fixState(st *file.State) { if len(st.Meta) == 0 { st.Meta = nil } + + if len(st.IdentifierName) == 0 { + identifier, _ := file.NewStateIdentifier(nil) + st.Id, st.IdentifierName = identifier.GenerateID(*st) + } } // resetStates sets all states to finished and disable TTL on restart diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index c9d6a5353d0a..0faa8a388903 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -18,6 +18,7 @@ package registrar import ( + "fmt" "strings" "sync" "time" @@ -109,6 +110,22 @@ func (r *Registrar) loadStates() error { return nil } +func (r *Registrar) Start() error { + // Load the previous log file locations now, for use in input + err := r.loadStates() + if err != nil { + return fmt.Errorf("error loading state: %v", err) + } + + r.wg.Add(1) + go func() { + defer r.wg.Done() + r.Run() + }() + + return nil +} + // Stop stops the registry. It waits until Run function finished. func (r *Registrar) Stop() { r.log.Info("Stopping Registrar") @@ -285,7 +302,7 @@ func readStatesFrom(store *statestore.Store) ([]file.State, error) { func writeStates(store *statestore.Store, states []file.State) error { for i := range states { - key := fileStatePrefix + states[i].ID() + key := fileStatePrefix + states[i].Id if err := store.Set(key, states[i]); err != nil { return err } From 00af6edba1d0095bafcdce268188470f9b93b1ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 8 Jul 2020 15:53:16 +0200 Subject: [PATCH 08/18] add warning about network shares --- filebeat/docs/inputs/input-log.asciidoc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/filebeat/docs/inputs/input-log.asciidoc b/filebeat/docs/inputs/input-log.asciidoc index 1837cf79768f..5f25c79a7be6 100644 --- a/filebeat/docs/inputs/input-log.asciidoc +++ b/filebeat/docs/inputs/input-log.asciidoc @@ -60,6 +60,11 @@ because this can lead to unexpected behaviour. [[file-identity]] ==== Reading files on network shares and cloud providers +:WARNING: Filebeat does not support reading from network shares and cloud providers. + +However, one of the limitations of these data sources can be mitigated +if you configure Filebeat adequately. + By default, {beatname_uc} identifies files based on their inodes and device IDs. However, on network shares and cloud providers these values might change during the lifetime of the file. If this happens From 89cf3954940295195b35a0ee7442a2c2c6c9c004 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 8 Jul 2020 16:17:10 +0200 Subject: [PATCH 09/18] get inode-like info for identification --- filebeat/docs/inputs/input-log.asciidoc | 1 + filebeat/input/file/identifier.go | 2 +- filebeat/tests/system/test_input.py | 2 ++ libbeat/common/file/file_other.go | 5 +++++ libbeat/common/file/file_windows.go | 9 +++++++++ 5 files changed, 18 insertions(+), 1 deletion(-) diff --git a/filebeat/docs/inputs/input-log.asciidoc b/filebeat/docs/inputs/input-log.asciidoc index 5f25c79a7be6..42ff673c924c 100644 --- a/filebeat/docs/inputs/input-log.asciidoc +++ b/filebeat/docs/inputs/input-log.asciidoc @@ -85,6 +85,7 @@ readable by {beatname_uc} and set the path in the option `path` of `inode_marker The content of this file must be unique to the device. You can put the UUID of the device or mountpoint where the input is stored. The following example oneliner generates a hidden marker file for the selected mountpoint `/logs`: +Please note that you should not use this option on Windows. ["source","sh",subs="attributes"] ---- diff --git a/filebeat/input/file/identifier.go b/filebeat/input/file/identifier.go index b495f3490569..51c6b1de0095 100644 --- a/filebeat/input/file/identifier.go +++ b/filebeat/input/file/identifier.go @@ -165,7 +165,7 @@ func (i *inodeMarkerIdentifier) markerContents() string { func (i *inodeMarkerIdentifier) GenerateID(s State) (id, identifierType string) { m := i.markerContents() - stateID := fmt.Sprintf("%s%s%d-%s", i.name, identitySep, s.FileStateOS.Inode, m) + stateID := fmt.Sprintf("%s%s%d-%s", i.name, identitySep, s.FileStateOS.InodeString(), m) return genIDWithHash(s.Meta, stateID), i.name } diff --git a/filebeat/tests/system/test_input.py b/filebeat/tests/system/test_input.py index f3c114a4ed72..98f1b0619b8a 100644 --- a/filebeat/tests/system/test_input.py +++ b/filebeat/tests/system/test_input.py @@ -715,6 +715,7 @@ def test_path_based_identity_tracking(self): # assert that renaming of the file went undetected assert not self.log_contains("File rename was detected:" + testfile + " -> " + renamedfile) + @unittest.skipIf(sys.platform.startswith("win")) def test_inode_marker_based_identity_tracking(self): """ File is picked up again if the contents of the marker file changes. @@ -748,6 +749,7 @@ def test_inode_marker_based_identity_tracking(self): self.wait_until(lambda: self.output_has(lines=2)) proc.check_kill_and_wait() + @unittest.skipIf(sys.platform.startswith("win")) def test_inode_marker_based_identity_tracking_to_path_based(self): """ File reading can be continued after file_identity is changed. diff --git a/libbeat/common/file/file_other.go b/libbeat/common/file/file_other.go index 599108f480b3..fa2082da8ac1 100644 --- a/libbeat/common/file/file_other.go +++ b/libbeat/common/file/file_other.go @@ -68,3 +68,8 @@ func IsRemoved(f *os.File) bool { _, err := os.Stat(f.Name()) return err != nil } + +// InodeString returns the inode in string. +func (s *StateOS) InodeString() string { + return strconv.FormatUint(s.Inode, 10) +} diff --git a/libbeat/common/file/file_windows.go b/libbeat/common/file/file_windows.go index 1a9ac6e1c769..783e45d9afd6 100644 --- a/libbeat/common/file/file_windows.go +++ b/libbeat/common/file/file_windows.go @@ -146,3 +146,12 @@ func IsRemoved(f *os.File) bool { } return info.DeletePending } + +// InodeString returns idxhi and idxlo as a string. +func InodeString() string { + var buf [61]byte + current := strconv.AppendUint(buf[:0], fs.IdxHi, 10) + current = append(current, '-') + current = strconv.AppendUint(current, fs.IdxLo, 10) + return string(current) +} From a35b38fe876f35150f5954cc4608024b1468f638 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 8 Jul 2020 17:46:40 +0200 Subject: [PATCH 10/18] remove states with previous id --- filebeat/input/file/identifier.go | 2 +- filebeat/input/file/state.go | 1 + filebeat/input/log/input.go | 21 +++++++++++++++++++-- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/filebeat/input/file/identifier.go b/filebeat/input/file/identifier.go index 51c6b1de0095..c2c49f297b0f 100644 --- a/filebeat/input/file/identifier.go +++ b/filebeat/input/file/identifier.go @@ -165,7 +165,7 @@ func (i *inodeMarkerIdentifier) markerContents() string { func (i *inodeMarkerIdentifier) GenerateID(s State) (id, identifierType string) { m := i.markerContents() - stateID := fmt.Sprintf("%s%s%d-%s", i.name, identitySep, s.FileStateOS.InodeString(), m) + stateID := fmt.Sprintf("%s%s%s-%s", i.name, identitySep, s.FileStateOS.InodeString(), m) return genIDWithHash(s.Meta, stateID), i.name } diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index b6b081b9ddf5..18fcfee583b0 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -27,6 +27,7 @@ import ( // State is used to communicate the reading state of a file type State struct { Id string `json:"id" struct:"id"` + PrevId string `json:"prev_id" struct:"prev_id"` Finished bool `json:"-" struct:"-"` // harvester state Fileinfo os.FileInfo `json:"-" struct:"-"` // the file info Source string `json:"source" struct:"source"` diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 2324fb2fe042..d7bd4f180170 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -182,7 +182,7 @@ func (p *Input) loadStates(states []file.State) error { // and remove outdated state newId, identifierName := p.fileStateIdentifier.GenerateID(state) if state.IdentifierName != identifierName { - p.removeState(state) + state.PrevId = state.Id state.Id = newId state.IdentifierName = identifierName } @@ -733,8 +733,26 @@ func (p *Input) updateState(state file.State) error { state.Meta = nil } + err := p.doUpdate(state) + if err != nil { + return err + } + + if state.PrevId != "" { + stateToRemove := file.State{Id: state.PrevId, TTL: 0, Finished: true, Meta: nil} + err := p.doUpdate(stateToRemove) + if err != nil { + return fmt.Errorf("failed to remove outdated states based on prev_id: %v", err) + } + } + + return nil +} + +func (p *Input) doUpdate(state file.State) error { // Update first internal state p.states.Update(state) + ok := p.outlet.OnEvent(beat.Event{ Private: state, }) @@ -742,7 +760,6 @@ func (p *Input) updateState(state file.State) error { logp.Info("input outlet closed") return errors.New("input outlet closed") } - return nil } From 8c2f355f0712717e82a2217e6468c60f6a9efb24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Thu, 9 Jul 2020 13:48:48 +0200 Subject: [PATCH 11/18] fix minor issues --- filebeat/tests/system/test_input.py | 1 + libbeat/common/file/file_windows.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/filebeat/tests/system/test_input.py b/filebeat/tests/system/test_input.py index 98f1b0619b8a..c8d55682779b 100644 --- a/filebeat/tests/system/test_input.py +++ b/filebeat/tests/system/test_input.py @@ -3,6 +3,7 @@ from filebeat import BaseTest import os import time +import unittest from beat.beat import Proc diff --git a/libbeat/common/file/file_windows.go b/libbeat/common/file/file_windows.go index 783e45d9afd6..1b8a9da49dea 100644 --- a/libbeat/common/file/file_windows.go +++ b/libbeat/common/file/file_windows.go @@ -148,7 +148,7 @@ func IsRemoved(f *os.File) bool { } // InodeString returns idxhi and idxlo as a string. -func InodeString() string { +func (fs *StateOS) InodeString() string { var buf [61]byte current := strconv.AppendUint(buf[:0], fs.IdxHi, 10) current = append(current, '-') From 3ef6550579a9136be8a80543bfba008d1b1ca6c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Thu, 9 Jul 2020 15:37:31 +0200 Subject: [PATCH 12/18] disable unsupported test for windows --- filebeat/input/file/identifier_test.go | 2 ++ filebeat/tests/system/test_input.py | 1 + 2 files changed, 3 insertions(+) diff --git a/filebeat/input/file/identifier_test.go b/filebeat/input/file/identifier_test.go index 870e0106f2e2..f47f4a37fb98 100644 --- a/filebeat/input/file/identifier_test.go +++ b/filebeat/input/file/identifier_test.go @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +// +build !windows + package file import ( diff --git a/filebeat/tests/system/test_input.py b/filebeat/tests/system/test_input.py index c8d55682779b..f20a637b0ba8 100644 --- a/filebeat/tests/system/test_input.py +++ b/filebeat/tests/system/test_input.py @@ -2,6 +2,7 @@ from filebeat import BaseTest import os +import sys import time import unittest From 42206a66545505b24a098e18465781132bd0ba58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Mon, 13 Jul 2020 19:23:42 +0200 Subject: [PATCH 13/18] disable tests --- filebeat/tests/system/test_input.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/filebeat/tests/system/test_input.py b/filebeat/tests/system/test_input.py index f20a637b0ba8..7aae7de43086 100644 --- a/filebeat/tests/system/test_input.py +++ b/filebeat/tests/system/test_input.py @@ -717,7 +717,7 @@ def test_path_based_identity_tracking(self): # assert that renaming of the file went undetected assert not self.log_contains("File rename was detected:" + testfile + " -> " + renamedfile) - @unittest.skipIf(sys.platform.startswith("win")) + @unittest.skipIf(sys.platform.startswith("win"), "inode_marker is not supported on windows") def test_inode_marker_based_identity_tracking(self): """ File is picked up again if the contents of the marker file changes. @@ -751,7 +751,7 @@ def test_inode_marker_based_identity_tracking(self): self.wait_until(lambda: self.output_has(lines=2)) proc.check_kill_and_wait() - @unittest.skipIf(sys.platform.startswith("win")) + @unittest.skipIf(sys.platform.startswith("win"), "inode_marker is not supported on windows") def test_inode_marker_based_identity_tracking_to_path_based(self): """ File reading can be continued after file_identity is changed. From fd370e714c871541e1864fd039f8ea641545fbcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 14 Jul 2020 10:03:26 +0200 Subject: [PATCH 14/18] change sleep to wait --- filebeat/tests/system/test_input.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/tests/system/test_input.py b/filebeat/tests/system/test_input.py index 7aae7de43086..11f9f60b0b8f 100644 --- a/filebeat/tests/system/test_input.py +++ b/filebeat/tests/system/test_input.py @@ -702,8 +702,8 @@ def test_path_based_identity_tracking(self): # wait until the file is picked up self.wait_until(lambda: self.log_contains("Start harvester for new file: " + testfile)) + self.wait_until(lambda: self.output_has(lines=1)) - time.sleep(2) renamedfile = self.working_dir + "/log/renamed.log" os.rename(testfile, renamedfile) From 11fef2ceeebb0ceed214071c49b80bb2c2f6ed3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 14 Jul 2020 10:13:22 +0200 Subject: [PATCH 15/18] doc fixes --- .../docs/inputs/input-common-file-options.asciidoc | 4 ++-- filebeat/docs/inputs/input-log.asciidoc | 12 +++++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/filebeat/docs/inputs/input-common-file-options.asciidoc b/filebeat/docs/inputs/input-common-file-options.asciidoc index be64eb20ad2e..b52e4660d44d 100644 --- a/filebeat/docs/inputs/input-common-file-options.asciidoc +++ b/filebeat/docs/inputs/input-common-file-options.asciidoc @@ -406,7 +406,7 @@ limit of harvesters. ===== `file_identity` Different `file_identity` methods can be configured to suit the -environment you are collecting log messages from. +environment where you are collecting log messages. *`inode_deviceid`*:: The default behaviour of {beatname_uc} is to differentiate @@ -433,7 +433,7 @@ file_identity.path: ~ ---- *`inode_marker`*:: If the device id changes from time to time, you must use -this method to tell apart files. Set the location of the marker file the following way: +this method to distinguish files. Set the location of the marker file the following way: [source,yaml] ---- diff --git a/filebeat/docs/inputs/input-log.asciidoc b/filebeat/docs/inputs/input-log.asciidoc index 42ff673c924c..0a69a9b65c29 100644 --- a/filebeat/docs/inputs/input-log.asciidoc +++ b/filebeat/docs/inputs/input-log.asciidoc @@ -70,12 +70,12 @@ device IDs. However, on network shares and cloud providers these values might change during the lifetime of the file. If this happens {beatname_uc} thinks that file is new and resends the whole content of the file. To solve this problem you can configure `file_identity` option. Possible -values are besides the default `inode_deviceid` are `path` and `inode_marker`. +values besides the default `inode_deviceid` are `path` and `inode_marker`. Selecting `path` instructs {beatname_uc} to identify files based on their paths. This is a quick way to aviod rereading files if inode and device ids might change. However, keep in mind if the files are rotated (renamed), they -are going to be reread and resent. +will be reread and resubmitted. The option `inode_marker` can be used if the inodes stay the same even if the device id is changed. You should choose this method if your files are @@ -85,7 +85,8 @@ readable by {beatname_uc} and set the path in the option `path` of `inode_marker The content of this file must be unique to the device. You can put the UUID of the device or mountpoint where the input is stored. The following example oneliner generates a hidden marker file for the selected mountpoint `/logs`: -Please note that you should not use this option on Windows. +Please note that you should not use this option on Windows as file identifiers might be +more volatile. ["source","sh",subs="attributes"] ---- @@ -114,8 +115,9 @@ a pattern that matches the file you want to harvest and all of its rotated files. Also make sure your log rotation strategy prevents lost or duplicate messages. For more information, see <>. -Furthermore, do not use `path` method for `file_identity` to avoid duplication -of the rotated log messages. +Furthermore, to avoid duplicate of rotated log messages, do not use the +`path` method for `file_identity`. Or exclude the rotated files with `exclude_files` +option. [id="{beatname_lc}-input-{type}-options"] ==== Configuration options From 2f8d4db4c8b4fec782dec8bfe6e67efcec894214 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 14 Jul 2020 12:13:43 +0200 Subject: [PATCH 16/18] use os.path.join instead of + --- filebeat/tests/system/test_input.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/filebeat/tests/system/test_input.py b/filebeat/tests/system/test_input.py index 11f9f60b0b8f..b3b832c451ce 100644 --- a/filebeat/tests/system/test_input.py +++ b/filebeat/tests/system/test_input.py @@ -695,21 +695,17 @@ def test_path_based_identity_tracking(self): input_raw=" file_identity.path: ~", ) - testfile = self.working_dir + "/log/test.log" + testfile = os.path.join(self.working_dir, "log", "test.log") self.__write_hello_word_to_test_input_file(testfile) proc = self.start_beat() # wait until the file is picked up - self.wait_until(lambda: self.log_contains("Start harvester for new file: " + testfile)) self.wait_until(lambda: self.output_has(lines=1)) - renamedfile = self.working_dir + "/log/renamed.log" + renamedfile = os.path.join(self.working_dir, "log", "renamed.log") os.rename(testfile, renamedfile) - # wait until the renamed file is picked up - self.wait_until(lambda: self.log_contains("Start harvester for new file: " + renamedfile)) - # wait until the both messages are received by the output self.wait_until(lambda: self.output_has(lines=2)) proc.check_kill_and_wait() @@ -733,7 +729,7 @@ def test_inode_marker_based_identity_tracking(self): input_raw=" file_identity.inode_marker.path: " + marker_location, ) - testfile = self.working_dir + "/log/test.log" + testfile = os.path.join(self.working_dir, "log", "test.log") self.__write_hello_word_to_test_input_file(testfile) proc = self.start_beat() @@ -766,7 +762,7 @@ def test_inode_marker_based_identity_tracking_to_path_based(self): input_raw=" file_identity.inode_marker.path: " + marker_location, ) - testfile = self.working_dir + "/log/test.log" + testfile = os.path.join(self.working_dir, "log", "test.log") self.__write_hello_word_to_test_input_file(testfile) proc = self.start_beat() From 75b772580e7e5ff3f8493d8c9bee106feae815d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 14 Jul 2020 12:19:32 +0200 Subject: [PATCH 17/18] more fix --- filebeat/tests/system/test_input.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/filebeat/tests/system/test_input.py b/filebeat/tests/system/test_input.py index b3b832c451ce..8aebe49f4d0a 100644 --- a/filebeat/tests/system/test_input.py +++ b/filebeat/tests/system/test_input.py @@ -719,7 +719,7 @@ def test_inode_marker_based_identity_tracking(self): File is picked up again if the contents of the marker file changes. """ - marker_location = self.working_dir + "/marker" + marker_location = os.path.join(self.working_dir, "marker") with open(marker_location, 'w') as m: m.write("very-unique-string") @@ -753,7 +753,7 @@ def test_inode_marker_based_identity_tracking_to_path_based(self): File reading can be continued after file_identity is changed. """ - marker_location = self.working_dir + "/marker" + marker_location = os.path.join(self.working_dir, "marker") with open(marker_location, 'w') as m: m.write("very-unique-string") From a9b61f2fe50bd954c166a6a2eb5157c79e211a02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 14 Jul 2020 13:19:14 +0200 Subject: [PATCH 18/18] rename inode_deviceid to natice && remove support for inode_marker on windows --- .../config/filebeat.inputs.reference.yml.tmpl | 2 +- .../inputs/input-common-file-options.asciidoc | 8 +- filebeat/filebeat.reference.yml | 2 +- filebeat/input/file/identifier.go | 88 ++--------------- .../input/file/identifier_inode_deviceid.go | 98 +++++++++++++++++++ .../file/identifier_inode_deviceid_windows.go | 30 ++++++ .../input/file/identifier_test_windows.go | 29 ++++++ filebeat/tests/system/test_harvester.py | 2 +- x-pack/filebeat/filebeat.reference.yml | 2 +- 9 files changed, 174 insertions(+), 87 deletions(-) create mode 100644 filebeat/input/file/identifier_inode_deviceid.go create mode 100644 filebeat/input/file/identifier_inode_deviceid_windows.go create mode 100644 filebeat/input/file/identifier_test_windows.go diff --git a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl index 310c0fe4abca..c920b7dbec82 100644 --- a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl +++ b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl @@ -52,7 +52,7 @@ filebeat.inputs: # Method to determine if two files are the same or not. By default # the Beat considers two files the same if their inode and device id are the same. - #file_identity.inode_deviceid: ~ + #file_identity.native: ~ # Optional additional fields. These fields can be freely picked # to add additional information to the crawled log files for filtering diff --git a/filebeat/docs/inputs/input-common-file-options.asciidoc b/filebeat/docs/inputs/input-common-file-options.asciidoc index b52e4660d44d..9bd93e97efc4 100644 --- a/filebeat/docs/inputs/input-common-file-options.asciidoc +++ b/filebeat/docs/inputs/input-common-file-options.asciidoc @@ -409,12 +409,12 @@ Different `file_identity` methods can be configured to suit the environment where you are collecting log messages. -*`inode_deviceid`*:: The default behaviour of {beatname_uc} is to differentiate +*`native`*:: The default behaviour of {beatname_uc} is to differentiate between files using their inodes and device ids. [source,yaml] ---- -file_identity.inode_deviceid: ~ +file_identity.native: ~ ---- *`path`*:: To identify files based on their paths use this strategy. @@ -433,7 +433,9 @@ file_identity.path: ~ ---- *`inode_marker`*:: If the device id changes from time to time, you must use -this method to distinguish files. Set the location of the marker file the following way: +this method to distinguish files. This option is not supported on Windows. + +Set the location of the marker file the following way: [source,yaml] ---- diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index decc9d584e5e..f52b731140f1 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -439,7 +439,7 @@ filebeat.inputs: # Method to determine if two files are the same or not. By default # the Beat considers two files the same if their inode and device id are the same. - #file_identity.inode_deviceid: ~ + #file_identity.native: ~ # Optional additional fields. These fields can be freely picked # to add additional information to the crawled log files for filtering diff --git a/filebeat/input/file/identifier.go b/filebeat/input/file/identifier.go index c2c49f297b0f..c16535f3e19e 100644 --- a/filebeat/input/file/identifier.go +++ b/filebeat/input/file/identifier.go @@ -19,33 +19,28 @@ package file import ( "fmt" - "io/ioutil" - "os" - "path/filepath" "strconv" "strings" - "time" "github.com/mitchellh/hashstructure" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/logp" ) const ( - inodeDeviceIDName = "inode_deviceid" - pathName = "path" - inodeMarkerName = "inode_marker" + nativeName = "native" + pathName = "path" + inodeMarkerName = "inode_marker" - DefaultIdentifierName = inodeDeviceIDName + DefaultIdentifierName = nativeName identitySep = "::" ) var ( identifierFactories = map[string]IdentifierFactory{ - inodeDeviceIDName: newINodeDeviceIdentifier, - pathName: newPathIdentifier, - inodeMarkerName: newINodeMarkerIdentifier, + nativeName: newINodeDeviceIdentifier, + pathName: newPathIdentifier, + inodeMarkerName: newINodeMarkerIdentifier, } ) @@ -78,7 +73,7 @@ type inodeDeviceIdentifier struct { func newINodeDeviceIdentifier(_ *common.Config) (StateIdentifier, error) { return &inodeDeviceIdentifier{ - name: inodeDeviceIDName, + name: nativeName, }, nil } @@ -102,73 +97,6 @@ func (p *pathIdentifier) GenerateID(s State) (id, identifierType string) { return genIDWithHash(s.Meta, stateID), p.name } -type inodeMarkerIdentifier struct { - log *logp.Logger - name string - markerPath string - - markerFileLastModifitaion time.Time - markerTxt string -} - -func newINodeMarkerIdentifier(cfg *common.Config) (StateIdentifier, error) { - var config struct { - MarkerPath string `config:"path" validate:"required"` - } - err := cfg.Unpack(&config) - if err != nil { - return nil, fmt.Errorf("error while reading configuration of INode + marker file configuration: %v", err) - } - - fi, err := os.Stat(config.MarkerPath) - if err != nil { - return nil, fmt.Errorf("error while opening marker file at %s: %v", config.MarkerPath, err) - } - markerContent, err := ioutil.ReadFile(config.MarkerPath) - if err != nil { - return nil, fmt.Errorf("error while reading marker file at %s: %v", config.MarkerPath, err) - } - return &inodeMarkerIdentifier{ - log: logp.NewLogger("inode_marker_identifier_" + filepath.Base(config.MarkerPath)), - name: inodeMarkerName, - markerPath: config.MarkerPath, - markerFileLastModifitaion: fi.ModTime(), - markerTxt: string(markerContent), - }, nil -} - -func (i *inodeMarkerIdentifier) markerContents() string { - f, err := os.Open(i.markerPath) - if err != nil { - i.log.Errorf("Failed to open marker file %s: %v", i.markerPath, err) - return "" - } - defer f.Close() - - fi, err := f.Stat() - if err != nil { - i.log.Errorf("Failed to fetch file information for %s: %v", i.markerPath, err) - return "" - } - if i.markerFileLastModifitaion.Before(fi.ModTime()) { - contents, err := ioutil.ReadFile(i.markerPath) - if err != nil { - i.log.Errorf("Error while reading contents of marker file: %v", err) - return "" - } - i.markerTxt = string(contents) - } - - return i.markerTxt -} - -func (i *inodeMarkerIdentifier) GenerateID(s State) (id, identifierType string) { - m := i.markerContents() - - stateID := fmt.Sprintf("%s%s%s-%s", i.name, identitySep, s.FileStateOS.InodeString(), m) - return genIDWithHash(s.Meta, stateID), i.name -} - func genIDWithHash(meta map[string]string, fileID string) string { if len(meta) == 0 { return fileID diff --git a/filebeat/input/file/identifier_inode_deviceid.go b/filebeat/input/file/identifier_inode_deviceid.go new file mode 100644 index 000000000000..f5e191744d60 --- /dev/null +++ b/filebeat/input/file/identifier_inode_deviceid.go @@ -0,0 +1,98 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !windows + +package file + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "time" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +type inodeMarkerIdentifier struct { + log *logp.Logger + name string + markerPath string + + markerFileLastModifitaion time.Time + markerTxt string +} + +func newINodeMarkerIdentifier(cfg *common.Config) (StateIdentifier, error) { + var config struct { + MarkerPath string `config:"path" validate:"required"` + } + err := cfg.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("error while reading configuration of INode + marker file configuration: %v", err) + } + + fi, err := os.Stat(config.MarkerPath) + if err != nil { + return nil, fmt.Errorf("error while opening marker file at %s: %v", config.MarkerPath, err) + } + markerContent, err := ioutil.ReadFile(config.MarkerPath) + if err != nil { + return nil, fmt.Errorf("error while reading marker file at %s: %v", config.MarkerPath, err) + } + return &inodeMarkerIdentifier{ + log: logp.NewLogger("inode_marker_identifier_" + filepath.Base(config.MarkerPath)), + name: inodeMarkerName, + markerPath: config.MarkerPath, + markerFileLastModifitaion: fi.ModTime(), + markerTxt: string(markerContent), + }, nil +} + +func (i *inodeMarkerIdentifier) markerContents() string { + f, err := os.Open(i.markerPath) + if err != nil { + i.log.Errorf("Failed to open marker file %s: %v", i.markerPath, err) + return "" + } + defer f.Close() + + fi, err := f.Stat() + if err != nil { + i.log.Errorf("Failed to fetch file information for %s: %v", i.markerPath, err) + return "" + } + if i.markerFileLastModifitaion.Before(fi.ModTime()) { + contents, err := ioutil.ReadFile(i.markerPath) + if err != nil { + i.log.Errorf("Error while reading contents of marker file: %v", err) + return "" + } + i.markerTxt = string(contents) + } + + return i.markerTxt +} + +func (i *inodeMarkerIdentifier) GenerateID(s State) (id, identifierType string) { + m := i.markerContents() + + stateID := fmt.Sprintf("%s%s%s-%s", i.name, identitySep, s.FileStateOS.InodeString(), m) + return genIDWithHash(s.Meta, stateID), i.name +} diff --git a/filebeat/input/file/identifier_inode_deviceid_windows.go b/filebeat/input/file/identifier_inode_deviceid_windows.go new file mode 100644 index 000000000000..9fb1152a33c8 --- /dev/null +++ b/filebeat/input/file/identifier_inode_deviceid_windows.go @@ -0,0 +1,30 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build windows + +package file + +import ( + "fmt" + + "github.com/elastic/beats/v7/libbeat/common" +) + +func newINodeMarkerIdentifier(cfg *common.Config) (StateIdentifier, error) { + return nil, fmt.Errorf("inode_deviceid is not supported on Windows") +} diff --git a/filebeat/input/file/identifier_test_windows.go b/filebeat/input/file/identifier_test_windows.go new file mode 100644 index 000000000000..544dbad2546c --- /dev/null +++ b/filebeat/input/file/identifier_test_windows.go @@ -0,0 +1,29 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build windows + +package file + +import "testing" + +func TestInodeMarkerError(t *testing.T) { + _, err := newINodeMarkerIdentifier(nil) + if err == nil { + t.Fatal("inode_marker should not be supported on windows") + } +} diff --git a/filebeat/tests/system/test_harvester.py b/filebeat/tests/system/test_harvester.py index a6e7004f8d06..cb30dc4976b6 100644 --- a/filebeat/tests/system/test_harvester.py +++ b/filebeat/tests/system/test_harvester.py @@ -79,7 +79,7 @@ def test_close_renamed(self): def test_close_removed(self): """ - Checks that a file is closed if removed with inode_deviceid file identifier + Checks that a file is closed if removed with native file identifier """ self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/test.log", diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 276476d83f52..879676c267ce 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -1547,7 +1547,7 @@ filebeat.inputs: # Method to determine if two files are the same or not. By default # the Beat considers two files the same if their inode and device id are the same. - #file_identity.inode_deviceid: ~ + #file_identity.native: ~ # Optional additional fields. These fields can be freely picked # to add additional information to the crawled log files for filtering