
Commit

Cherry-pick #21444 to 7.x: Add implementation of FSWatcher and FSScanner for filestream (#21468)

* Add implementation of FSWatcher and FSScanner for filestream (#21444)

## What does this PR do?

This PR adds implementations of `FSWatcher` and `FSScanner` for the `filestream` input.

The implementation of `FSScanner` is called `fileScanner`. It is responsible for (a minimal usage sketch follows this list):
* resolving recursive globs on creation
* normalizing glob patterns on creation
* finding files which match the configured paths and returning their `FileInfo`
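
A minimal usage sketch of the scanner (hypothetical glob path, error handling shortened; assumes it runs inside the `filestream` package with `fmt` imported):

```go
// Hypothetical usage; the glob below is an example path, not part of the PR.
cfg := fileScannerConfig{
	Symlinks:      false,
	RecursiveGlob: true,
}
scanner, err := newFileScanner([]string{"/var/log/*.log"}, cfg)
if err != nil {
	// handle the error
}
// GetFiles returns the matching paths mapped to their os.FileInfo.
for path, info := range scanner.GetFiles() {
	fmt.Printf("%s (%d bytes)\n", path, info.Size())
}
```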

This is a refactored version of the `log` input's scanner and globber functions.

The implementation of `FSWatcher` is called `fileWatcher`. It compares the file list returned by `fileScanner` between consecutive scans and creates events based on the differences (the interfaces implemented are sketched below).
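
For context, a sketch of the interfaces these types implement, inferred from this change (the authoritative definitions live in `filebeat/input/filestream/internal/input-logfile`):

```go
// Sketch of the interfaces implemented by fileScanner and fileWatcher below;
// inferred from their usage in this diff, not copied from input-logfile.
type FSScanner interface {
	// GetFiles returns the matching paths mapped to their os.FileInfo.
	GetFiles() map[string]os.FileInfo
}

type FSWatcher interface {
	// Run polls the file system until the canceler is done.
	Run(ctx unison.Canceler)
	// Event blocks until the next create/write/rename/delete event.
	Event() FSEvent
}
```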

## Why is it important?

It is required for the `filestream` input.

## Related issues

Related #20243

(cherry picked from commit a119083)

* Do not run symlink tests on Windows (#21472)
kvch authored Oct 5, 2020

1 parent 5d60433 commit 9c00415
Showing 3 changed files with 787 additions and 0 deletions.
375 changes: 375 additions & 0 deletions filebeat/input/filestream/fswatch.go
@@ -0,0 +1,375 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package filestream

import (
"fmt"
"os"
"path/filepath"
"time"

"github.com/elastic/beats/v7/filebeat/input/file"
loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/match"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert/unison"
)

const (
recursiveGlobDepth = 8
scannerName = "scanner"
watcherDebugKey = "file_watcher"
)

var (
watcherFactories = map[string]watcherFactory{
scannerName: newScannerWatcher,
}
)

type watcherFactory func(paths []string, cfg *common.Config) (loginp.FSWatcher, error)

// fileScanner looks for files which match the patterns in paths.
// It is able to exclude files and symlinks.
type fileScanner struct {
paths []string
excludedFiles []match.Matcher
symlinks bool

log *logp.Logger
}

type fileWatcherConfig struct {
// Interval is the time between two scans.
Interval time.Duration
// Scanner is the configuration of the scanner.
Scanner fileScannerConfig
}

// fileWatcher gets the list of files from an FSScanner and creates events by
// comparing the file lists of its last two runs.
type fileWatcher struct {
interval time.Duration
prev map[string]os.FileInfo
scanner loginp.FSScanner
log *logp.Logger
events chan loginp.FSEvent
}

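// newFileWatcher creates the configured loginp.FSWatcher implementation;
// if no namespace is configured, the scanner based watcher is used.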
func newFileWatcher(paths []string, ns *common.ConfigNamespace) (loginp.FSWatcher, error) {
if ns == nil {
return newScannerWatcher(paths, nil)
}

watcherType := ns.Name()
f, ok := watcherFactories[watcherType]
if !ok {
return nil, fmt.Errorf("no such file watcher: %s", watcherType)
}

return f(paths, ns.Config())
}

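// newScannerWatcher creates a fileWatcher that uses a fileScanner to poll the file system.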
func newScannerWatcher(paths []string, c *common.Config) (loginp.FSWatcher, error) {
config := defaultFileWatcherConfig()
err := c.Unpack(&config)
if err != nil {
return nil, err
}
scanner, err := newFileScanner(paths, config.Scanner)
if err != nil {
return nil, err
}
return &fileWatcher{
log: logp.NewLogger(watcherDebugKey),
interval: config.Interval,
prev: make(map[string]os.FileInfo),
scanner: scanner,
events: make(chan loginp.FSEvent),
}, nil
}

func defaultFileWatcherConfig() fileWatcherConfig {
return fileWatcherConfig{
Interval: 10 * time.Second,
Scanner: defaultFileScannerConfig(),
}
}

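// Run periodically scans the file system until ctx is cancelled, then closes the events channel.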
func (w *fileWatcher) Run(ctx unison.Canceler) {
defer close(w.events)

ticker := time.NewTicker(w.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
w.watch(ctx)
}
}
}

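// watch compares the latest scan result with the previous one and emits
// create, write, rename and delete events.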
func (w *fileWatcher) watch(ctx unison.Canceler) {
w.log.Info("Start next scan")

paths := w.scanner.GetFiles()

newFiles := make(map[string]os.FileInfo)

for path, info := range paths {

prevInfo, ok := w.prev[path]
if !ok {
newFiles[path] = paths[path]
continue
}

if prevInfo.ModTime() != info.ModTime() {
select {
case <-ctx.Done():
return
case w.events <- writeEvent(path, info):
}
}

// delete from previous state, as we have more up to date info
delete(w.prev, path)
}

// files remaining in the prev map are the ones that are missing,
// either because they have been deleted or renamed
for removedPath, removedInfo := range w.prev {
for newPath, newInfo := range newFiles {
if os.SameFile(removedInfo, newInfo) {
select {
case <-ctx.Done():
return
case w.events <- renamedEvent(removedPath, newPath, newInfo):
delete(newFiles, newPath)
goto CHECK_NEXT_REMOVED
}
}
}

select {
case <-ctx.Done():
return
case w.events <- deleteEvent(removedPath, removedInfo):
}
CHECK_NEXT_REMOVED:
}

// remaining files in newFiles are new
for path, info := range newFiles {
select {
case <-ctx.Done():
return
case w.events <- createEvent(path, info):
}

}

w.log.Debugf("Found %d paths", len(paths))
w.prev = paths
}

func createEvent(path string, fi os.FileInfo) loginp.FSEvent {
return loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: path, Info: fi}
}

func writeEvent(path string, fi os.FileInfo) loginp.FSEvent {
return loginp.FSEvent{Op: loginp.OpWrite, OldPath: path, NewPath: path, Info: fi}
}

func renamedEvent(oldPath, path string, fi os.FileInfo) loginp.FSEvent {
return loginp.FSEvent{Op: loginp.OpRename, OldPath: oldPath, NewPath: path, Info: fi}
}

func deleteEvent(path string, fi os.FileInfo) loginp.FSEvent {
return loginp.FSEvent{Op: loginp.OpDelete, OldPath: path, NewPath: "", Info: fi}
}

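// Event returns the next file system event, blocking until one is available.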
func (w *fileWatcher) Event() loginp.FSEvent {
return <-w.events
}

type fileScannerConfig struct {
Paths []string
ExcludedFiles []match.Matcher
Symlinks bool
RecursiveGlob bool
}

func defaultFileScannerConfig() fileScannerConfig {
return fileScannerConfig{
Symlinks: false,
RecursiveGlob: true,
}
}

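// newFileScanner creates a loginp.FSScanner for the configured paths, resolving
// recursive globs and normalizing the patterns to absolute paths.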
func newFileScanner(paths []string, cfg fileScannerConfig) (loginp.FSScanner, error) {
fs := fileScanner{
paths: paths,
excludedFiles: cfg.ExcludedFiles,
symlinks: cfg.Symlinks,
log: logp.NewLogger(scannerName),
}
err := fs.resolveRecursiveGlobs(cfg)
if err != nil {
return nil, err
}
err = fs.normalizeGlobPatterns()
if err != nil {
return nil, err
}

return &fs, nil
}

// resolveRecursiveGlobs expands `**` in the configured glob patterns into multiple patterns up to a fixed depth
func (s *fileScanner) resolveRecursiveGlobs(c fileScannerConfig) error {
if !c.RecursiveGlob {
s.log.Debug("recursive glob disabled")
return nil
}

s.log.Debug("recursive glob enabled")
var paths []string
for _, path := range s.paths {
patterns, err := file.GlobPatterns(path, recursiveGlobDepth)
if err != nil {
return err
}
if len(patterns) > 1 {
s.log.Debugf("%q expanded to %#v", path, patterns)
}
paths = append(paths, patterns...)
}
s.paths = paths
return nil
}

// normalizeGlobPatterns calls `filepath.Abs` on all the globs from config
func (s *fileScanner) normalizeGlobPatterns() error {
var paths []string
for _, path := range s.paths {
pathAbs, err := filepath.Abs(path)
if err != nil {
return fmt.Errorf("failed to get the absolute path for %s: %v", path, err)
}
paths = append(paths, pathAbs)
}
s.paths = paths
return nil
}

// GetFiles returns a map of files and fileinfos which
// match the configured paths.
func (s *fileScanner) GetFiles() map[string]os.FileInfo {
pathInfo := map[string]os.FileInfo{}

for _, path := range s.paths {
matches, err := filepath.Glob(path)
if err != nil {
s.log.Errorf("glob(%s) failed: %v", path, err)
continue
}

for _, file := range matches {
if s.shouldSkipFile(file) {
continue
}

// If symlinks are enabled, check that the original file is not already part of the same input.
// If the original is harvested by another input, the states can overwrite each other.
if s.isOriginalAndSymlinkConfigured(file, pathInfo) {
continue
}

fileInfo, err := os.Stat(file)
if err != nil {
s.log.Debugf("stat(%s) failed: %s", file, err)
continue
}
pathInfo[file] = fileInfo
}
}

return pathInfo
}

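// shouldSkipFile returns true if the file is excluded, is a directory,
// or is a symlink while symlinks are disabled.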
func (s *fileScanner) shouldSkipFile(file string) bool {
if s.isFileExcluded(file) {
s.log.Debugf("Exclude file: %s", file)
return true
}

fileInfo, err := os.Lstat(file)
if err != nil {
s.log.Debugf("lstat(%s) failed: %s", file, err)
return true
}

if fileInfo.IsDir() {
s.log.Debugf("Skipping directory: %s", file)
return true
}

isSymlink := fileInfo.Mode()&os.ModeSymlink > 0
if isSymlink && !s.symlinks {
s.log.Debugf("File %s skipped as it is a symlink", file)
return true
}

return false
}

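// isOriginalAndSymlinkConfigured returns true if symlinks are enabled and the same
// file has already been collected under another path, so each file is returned only once.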
func (s *fileScanner) isOriginalAndSymlinkConfigured(file string, paths map[string]os.FileInfo) bool {
if s.symlinks {
fileInfo, err := os.Stat(file)
if err != nil {
s.log.Debugf("stat(%s) failed: %s", file, err)
return false
}

for _, finfo := range paths {
if os.SameFile(finfo, fileInfo) {
s.log.Infof("Same file found as symlink and original. Skipping file: %s (as it is the same as %s)", file, finfo.Name())
return true
}
}
}
return false
}

func (s *fileScanner) isFileExcluded(file string) bool {
return len(s.excludedFiles) > 0 && s.matchAny(s.excludedFiles, file)
}

// matchAny checks if the text matches any of the regular expressions
func (s *fileScanner) matchAny(matchers []match.Matcher, text string) bool {
for _, m := range matchers {
if m.MatchString(text) {
return true
}
}
return false
}
268 changes: 268 additions & 0 deletions filebeat/input/filestream/fswatch_test.go
@@ -0,0 +1,268 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package filestream

import (
"context"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
"github.com/elastic/beats/v7/libbeat/common/match"
"github.com/elastic/beats/v7/libbeat/logp"
)

var (
excludedFilePath = filepath.Join("testdata", "excluded_file")
includedFilePath = filepath.Join("testdata", "included_file")
directoryPath = filepath.Join("testdata", "unharvestable_dir")
)

func TestFileScanner(t *testing.T) {
testCases := map[string]struct {
paths []string
excludedFiles []match.Matcher
symlinks bool
expectedFiles []string
}{
"select all files": {
paths: []string{excludedFilePath, includedFilePath},
expectedFiles: []string{
mustAbsPath(excludedFilePath),
mustAbsPath(includedFilePath),
},
},
"skip excluded files": {
paths: []string{excludedFilePath, includedFilePath},
excludedFiles: []match.Matcher{
match.MustCompile("excluded_file"),
},
expectedFiles: []string{
mustAbsPath(includedFilePath),
},
},
"skip directories": {
paths: []string{directoryPath},
expectedFiles: []string{},
},
}

setupFilesForScannerTest(t)
defer removeFilesOfScannerTest(t)

for name, test := range testCases {
test := test

t.Run(name, func(t *testing.T) {
cfg := fileScannerConfig{
ExcludedFiles: test.excludedFiles,
Symlinks: test.symlinks,
RecursiveGlob: false,
}
fs, err := newFileScanner(test.paths, cfg)
if err != nil {
t.Fatal(err)
}
files := fs.GetFiles()
paths := make([]string, 0)
for p := range files {
paths = append(paths, p)
}
assert.True(t, checkIfSameContents(test.expectedFiles, paths))
})
}
}

func setupFilesForScannerTest(t *testing.T) {
err := os.MkdirAll(directoryPath, 0750)
if err != nil {
t.Fatal(err)
}

for _, path := range []string{excludedFilePath, includedFilePath} {
f, err := os.Create(path)
if err != nil {
t.Fatalf("file %s, error %v", path, err)
}
f.Close()
}
}

func removeFilesOfScannerTest(t *testing.T) {
err := os.RemoveAll("testdata")
if err != nil {
t.Fatal(err)
}
}

// checkIfSameContents reports whether both slices contain the same elements; it only handles sets (no duplicates)
func checkIfSameContents(one, other []string) bool {
if len(one) != len(other) {
return false
}

mustFind := len(one)
for _, oneElem := range one {
for _, otherElem := range other {
if oneElem == otherElem {
mustFind--
}
}
}
return mustFind == 0
}

func TestFileWatchNewDeleteModified(t *testing.T) {
oldTs := time.Now()
newTs := oldTs.Add(5 * time.Second)
testCases := map[string]struct {
prevFiles map[string]os.FileInfo
nextFiles map[string]os.FileInfo
expectedEvents []loginp.FSEvent
}{
"one new file": {
prevFiles: map[string]os.FileInfo{},
nextFiles: map[string]os.FileInfo{
"new_path": testFileInfo{"new_path", 5, oldTs},
},
expectedEvents: []loginp.FSEvent{
loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: "new_path", Info: testFileInfo{"new_path", 5, oldTs}},
},
},
"one deleted file": {
prevFiles: map[string]os.FileInfo{
"old_path": testFileInfo{"old_path", 5, oldTs},
},
nextFiles: map[string]os.FileInfo{},
expectedEvents: []loginp.FSEvent{
loginp.FSEvent{Op: loginp.OpDelete, OldPath: "old_path", NewPath: "", Info: testFileInfo{"old_path", 5, oldTs}},
},
},
"one modified file": {
prevFiles: map[string]os.FileInfo{
"path": testFileInfo{"path", 5, oldTs},
},
nextFiles: map[string]os.FileInfo{
"path": testFileInfo{"path", 10, newTs},
},
expectedEvents: []loginp.FSEvent{
loginp.FSEvent{Op: loginp.OpWrite, OldPath: "path", NewPath: "path", Info: testFileInfo{"path", 10, newTs}},
},
},
"two modified files": {
prevFiles: map[string]os.FileInfo{
"path1": testFileInfo{"path1", 5, oldTs},
"path2": testFileInfo{"path2", 5, oldTs},
},
nextFiles: map[string]os.FileInfo{
"path1": testFileInfo{"path1", 10, newTs},
"path2": testFileInfo{"path2", 10, newTs},
},
expectedEvents: []loginp.FSEvent{
loginp.FSEvent{Op: loginp.OpWrite, OldPath: "path1", NewPath: "path1", Info: testFileInfo{"path1", 10, newTs}},
loginp.FSEvent{Op: loginp.OpWrite, OldPath: "path2", NewPath: "path2", Info: testFileInfo{"path2", 10, newTs}},
},
},
"one modified file, one new file": {
prevFiles: map[string]os.FileInfo{
"path1": testFileInfo{"path1", 5, oldTs},
},
nextFiles: map[string]os.FileInfo{
"path1": testFileInfo{"path1", 10, newTs},
"path2": testFileInfo{"path2", 10, newTs},
},
expectedEvents: []loginp.FSEvent{
loginp.FSEvent{Op: loginp.OpWrite, OldPath: "path1", NewPath: "path1", Info: testFileInfo{"path1", 10, newTs}},
loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: "path2", Info: testFileInfo{"path2", 10, newTs}},
},
},
"one new file, one deleted file": {
prevFiles: map[string]os.FileInfo{
"path_deleted": testFileInfo{"path_deleted", 5, oldTs},
},
nextFiles: map[string]os.FileInfo{
"path_new": testFileInfo{"path_new", 10, newTs},
},
expectedEvents: []loginp.FSEvent{
loginp.FSEvent{Op: loginp.OpDelete, OldPath: "path_deleted", NewPath: "", Info: testFileInfo{"path_deleted", 5, oldTs}},
loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: "path_new", Info: testFileInfo{"path_new", 10, newTs}},
},
},
}

for name, test := range testCases {
test := test

t.Run(name, func(t *testing.T) {
w := fileWatcher{
log: logp.L(),
prev: test.prevFiles,
scanner: &mockScanner{test.nextFiles},
events: make(chan loginp.FSEvent),
}

go w.watch(context.Background())

for _, expectedEvent := range test.expectedEvents {
evt := w.Event()
assert.Equal(t, expectedEvent, evt)
}
})
}
}

type mockScanner struct {
files map[string]os.FileInfo
}

func (m *mockScanner) GetFiles() map[string]os.FileInfo {
return m.files
}

type testFileInfo struct {
path string
size int64
time time.Time
}

func (t testFileInfo) Name() string { return t.path }
func (t testFileInfo) Size() int64 { return t.size }
func (t testFileInfo) Mode() os.FileMode { return 0 }
func (t testFileInfo) ModTime() time.Time { return t.time }
func (t testFileInfo) IsDir() bool { return false }
func (t testFileInfo) Sys() interface{} { return nil }

func mustAbsPath(path string) string {
p, err := filepath.Abs(path)
if err != nil {
panic(err)
}
return p
}

func mustDuration(durStr string) time.Duration {
dur, err := time.ParseDuration(durStr)
if err != nil {
panic(err)
}
return dur
}
144 changes: 144 additions & 0 deletions filebeat/input/filestream/fswatch_test_non_windows.go
@@ -0,0 +1,144 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build !windows

package filestream

import (
"context"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
"github.com/elastic/beats/v7/libbeat/common/match"
"github.com/elastic/beats/v7/libbeat/logp"
)

func TestFileScannerSymlinks(t *testing.T) {
testCases := map[string]struct {
paths []string
excludedFiles []match.Matcher
symlinks bool
expectedFiles []string
}{
// covers test_input.py/test_skip_symlinks
"skip symlinks": {
paths: []string{
filepath.Join("testdata", "symlink_to_included_file"),
filepath.Join("testdata", "included_file"),
},
symlinks: false,
expectedFiles: []string{
mustAbsPath(filepath.Join("testdata", "included_file")),
},
},
"return a file once if symlinks are enabled": {
paths: []string{
filepath.Join("testdata", "symlink_to_included_file"),
filepath.Join("testdata", "included_file"),
},
symlinks: true,
expectedFiles: []string{
mustAbsPath(filepath.Join("testdata", "included_file")),
},
},
}

err := os.Symlink(
mustAbsPath(filepath.Join("testdata", "included_file")),
mustAbsPath(filepath.Join("testdata", "symlink_to_included_file")),
)
if err != nil {
t.Fatal(err)
}

for name, test := range testCases {
test := test

t.Run(name, func(t *testing.T) {
cfg := fileScannerConfig{
ExcludedFiles: test.excludedFiles,
Symlinks: true,
RecursiveGlob: false,
}
fs, err := newFileScanner(test.paths, cfg)
if err != nil {
t.Fatal(err)
}
files := fs.GetFiles()
paths := make([]string, 0)
for p := range files {
paths = append(paths, p)
}
assert.Equal(t, test.expectedFiles, paths)
})
}
}

func TestFileWatcherRenamedFile(t *testing.T) {
testPath := mustAbsPath("first_name")
renamedPath := mustAbsPath("renamed")

f, err := os.Create(testPath)
if err != nil {
t.Fatal(err)
}
f.Close()
fi, err := os.Stat(testPath)
if err != nil {
t.Fatal(err)
}

cfg := fileScannerConfig{
ExcludedFiles: nil,
Symlinks: false,
RecursiveGlob: false,
}
scanner, err := newFileScanner([]string{testPath, renamedPath}, cfg)
if err != nil {
t.Fatal(err)
}
w := fileWatcher{
log: logp.L(),
scanner: scanner,
events: make(chan loginp.FSEvent),
}

go w.watch(context.Background())
assert.Equal(t, loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: testPath, Info: fi}, w.Event())

err = os.Rename(testPath, renamedPath)
if err != nil {
t.Fatal(err)
}
defer os.Remove(renamedPath)
fi, err = os.Stat(renamedPath)
if err != nil {
t.Fatal(err)
}

go w.watch(context.Background())
evt := w.Event()

assert.Equal(t, loginp.OpRename, evt.Op)
assert.Equal(t, testPath, evt.OldPath)
assert.Equal(t, renamedPath, evt.NewPath)
}
