Skip to content

Commit

Permalink
Do not add all contextual information to the same logger in `filestre…
Browse files Browse the repository at this point in the history
…am` prospector (#26977)

## What does this PR do?

This PR creates a separate logger for every FSEvent the prospector encounters to make sure contextual metadata of different files does not get mixed together.

Furthermore, now the human-readable format of the file system event is added to the metadata.

## Why is it important?

Previously after getting more than one file system event through the file watcher metadata started to pile up in the logger. Notice the duplicated keys "operation", "source_name", "os_id", "new_path" after accepting the second event form the channel:
```json
{
    "id": " 2248D961AFA2E8F",
    "prospector": "file_prospector",
    "operation": "create",
    "source_name": "native::8786365-65029",
    "os_id": "8786365-65029",
    "new_path": "/home/n/go/src/github.com/elastic/beats/filebeat/test.log",
    "operation": "write",
    "source_name": "native::8786363-65029",
    "os_id": "8786363-65029",
    "new_path": "/home/n/go/src/github.com/elastic/beats/filebeat/test.log",
    "old_path": "/home/n/go/src/github.com/elastic/beats/filebeat/test.log"
}
```
  • Loading branch information
kvch authored Jul 26, 2021
1 parent fa85b8a commit 877d8bc
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 88 deletions.
113 changes: 62 additions & 51 deletions filebeat/input/filestream/copytruncate_prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,78 +240,89 @@ func (p *copyTruncateFileProspector) Run(ctx input.Context, s loginp.StateMetada
}

src := p.identifier.GetSource(fe)
log = loggerWithEvent(log, fe, src)
p.onFSEvent(loggerWithEvent(log, fe, src), ctx, fe, src, s, hg, ignoreInactiveSince)

switch fe.Op {
case loginp.OpCreate, loginp.OpWrite:
if fe.Op == loginp.OpCreate {
log.Debugf("A new file %s has been found", fe.NewPath)
}
return nil
})

} else if fe.Op == loginp.OpWrite {
log.Debugf("File %s has been updated", fe.NewPath)
}
errs := tg.Wait()
if len(errs) > 0 {
log.Error("%s", sderr.WrapAll(errs, "running prospector failed"))
}
}

if p.fileProspector.isFileIgnored(log, fe, ignoreInactiveSince) {
continue
}
func (p *copyTruncateFileProspector) onFSEvent(
log *logp.Logger,
ctx input.Context,
event loginp.FSEvent,
src loginp.Source,
updater loginp.StateMetadataUpdater,
group loginp.HarvesterGroup,
ignoreSince time.Time,
) {

if fe.Op == loginp.OpCreate {
err := s.UpdateMetadata(src, fileMeta{Source: fe.NewPath, IdentifierName: p.identifier.Name()})
if err != nil {
log.Errorf("Failed to set cursor meta data of entry %s: %v", src.Name(), err)
}
}
switch event.Op {
case loginp.OpCreate, loginp.OpWrite:
if event.Op == loginp.OpCreate {
log.Debugf("A new file %s has been found", event.NewPath)

// check if the event belongs to a rotated file
if p.isRotated(fe) {
log.Debugf("File %s is rotated", fe.NewPath)
} else if event.Op == loginp.OpWrite {
log.Debugf("File %s has been updated", event.NewPath)
}

p.onRotatedFile(log, ctx, fe, src, hg)
if p.fileProspector.isFileIgnored(log, event, ignoreSince) {
return
}

} else {
log.Debugf("File %s is original", fe.NewPath)
// if file is original, add it to the bookeeper
p.rotatedFiles.addOriginalFile(fe.NewPath, src)
if event.Op == loginp.OpCreate {
err := updater.UpdateMetadata(src, fileMeta{Source: event.NewPath, IdentifierName: p.identifier.Name()})
if err != nil {
log.Errorf("Failed to set cursor meta data of entry %s: %v", src.Name(), err)
}
}

hg.Start(ctx, src)
}
// check if the event belongs to a rotated file
if p.isRotated(event) {
log.Debugf("File %s is rotated", event.NewPath)

case loginp.OpTruncate:
log.Debugf("File %s has been truncated", fe.NewPath)
p.onRotatedFile(log, ctx, event, src, group)

s.ResetCursor(src, state{Offset: 0})
hg.Restart(ctx, src)
} else {
log.Debugf("File %s is original", event.NewPath)
// if file is original, add it to the bookeeper
p.rotatedFiles.addOriginalFile(event.NewPath, src)

case loginp.OpDelete:
log.Debugf("File %s has been removed", fe.OldPath)
group.Start(ctx, src)
}

p.fileProspector.onRemove(log, fe, src, s, hg)
case loginp.OpTruncate:
log.Debugf("File %s has been truncated", event.NewPath)

case loginp.OpRename:
log.Debugf("File %s has been renamed to %s", fe.OldPath, fe.NewPath)
updater.ResetCursor(src, state{Offset: 0})
group.Restart(ctx, src)

// check if the event belongs to a rotated file
if p.isRotated(fe) {
log.Debugf("File %s is rotated", fe.NewPath)
case loginp.OpDelete:
log.Debugf("File %s has been removed", event.OldPath)

p.onRotatedFile(log, ctx, fe, src, hg)
}
p.fileProspector.onRemove(log, event, src, updater, group)

p.fileProspector.onRename(log, ctx, fe, src, s, hg)
case loginp.OpRename:
log.Debugf("File %s has been renamed to %s", event.OldPath, event.NewPath)

default:
log.Error("Unkown return value %v", fe.Op)
}
// check if the event belongs to a rotated file
if p.isRotated(event) {
log.Debugf("File %s is rotated", event.NewPath)

p.onRotatedFile(log, ctx, event, src, group)
}
return nil
})

errs := tg.Wait()
if len(errs) > 0 {
log.Error("%s", sderr.WrapAll(errs, "running prospector failed"))
p.fileProspector.onRename(log, ctx, event, src, updater, group)

default:
log.Error("Unkown return value %v", event.Op)
}
}

func (p *copyTruncateFileProspector) isRotated(event loginp.FSEvent) bool {
if p.rotatedSuffix.MatchString(event.NewPath) {
return true
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

func loggerWithEvent(logger *logp.Logger, event loginp.FSEvent, src loginp.Source) *logp.Logger {
log := logger.With(
"operation", event.Op,
"operation", event.Op.String(),
"source_name", src.Name(),
)
if event.Info != nil && event.Info.Sys() != nil {
Expand Down
83 changes: 47 additions & 36 deletions filebeat/input/filestream/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,54 +129,65 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h
}

src := p.identifier.GetSource(fe)
log = loggerWithEvent(log, fe, src)

switch fe.Op {
case loginp.OpCreate, loginp.OpWrite:
if fe.Op == loginp.OpCreate {
log.Debugf("A new file %s has been found", fe.NewPath)
p.onFSEvent(loggerWithEvent(log, fe, src), ctx, fe, src, s, hg, ignoreInactiveSince)
}
return nil
})

err := s.UpdateMetadata(src, fileMeta{Source: fe.NewPath, IdentifierName: p.identifier.Name()})
if err != nil {
log.Errorf("Failed to set cursor meta data of entry %s: %v", src.Name(), err)
}
errs := tg.Wait()
if len(errs) > 0 {
log.Error("%s", sderr.WrapAll(errs, "running prospector failed"))
}
}

} else if fe.Op == loginp.OpWrite {
log.Debugf("File %s has been updated", fe.NewPath)
}
func (p *fileProspector) onFSEvent(
log *logp.Logger,
ctx input.Context,
event loginp.FSEvent,
src loginp.Source,
updater loginp.StateMetadataUpdater,
group loginp.HarvesterGroup,
ignoreSince time.Time,
) {

switch event.Op {
case loginp.OpCreate, loginp.OpWrite:
if event.Op == loginp.OpCreate {
log.Debugf("A new file %s has been found", event.NewPath)

err := updater.UpdateMetadata(src, fileMeta{Source: event.NewPath, IdentifierName: p.identifier.Name()})
if err != nil {
log.Errorf("Failed to set cursor meta data of entry %s: %v", src.Name(), err)
}

if p.isFileIgnored(log, fe, ignoreInactiveSince) {
break
}
} else if event.Op == loginp.OpWrite {
log.Debugf("File %s has been updated", event.NewPath)
}

hg.Start(ctx, src)
if p.isFileIgnored(log, event, ignoreSince) {
return
}

case loginp.OpTruncate:
log.Debugf("File %s has been truncated", fe.NewPath)
group.Start(ctx, src)

s.ResetCursor(src, state{Offset: 0})
hg.Restart(ctx, src)
case loginp.OpTruncate:
log.Debugf("File %s has been truncated", event.NewPath)

case loginp.OpDelete:
log.Debugf("File %s has been removed", fe.OldPath)
updater.ResetCursor(src, state{Offset: 0})
group.Restart(ctx, src)

p.onRemove(log, fe, src, s, hg)
case loginp.OpDelete:
log.Debugf("File %s has been removed", event.OldPath)

case loginp.OpRename:
log.Debugf("File %s has been renamed to %s", fe.OldPath, fe.NewPath)
p.onRemove(log, event, src, updater, group)

p.onRename(log, ctx, fe, src, s, hg)
case loginp.OpRename:
log.Debugf("File %s has been renamed to %s", event.OldPath, event.NewPath)

default:
log.Error("Unkown return value %v", fe.Op)
}
}
return nil
})
p.onRename(log, ctx, event, src, updater, group)

errs := tg.Wait()
if len(errs) > 0 {
log.Error("%s", sderr.WrapAll(errs, "running prospector failed"))
default:
log.Error("Unkown return value %v", event.Op)
}
}

Expand Down

0 comments on commit 877d8bc

Please sign in to comment.