Skip to content

Commit

Permalink
Filebeat: Fix random error on harvester close (elastic#21048)
Browse files Browse the repository at this point in the history
This fixes a race condition when a harvester is closed at the same time
that its source file size is being calculated.

Before this fix, `Error updating file size` errors are randomly printed, because
the file handle becomes closed inside the os.Stat call.

(cherry picked from commit af6222d)
  • Loading branch information
adriansr committed Sep 10, 2020
1 parent a914041 commit 6ce4595
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ field. You can revert this change by configuring tags for the module and omittin
- Fix event types and categories in auditd module to comply with ECS {pull}20652[20652]
- Update documentation in the azure module filebeat. {pull}20815[20815]
- Remove wrongly mapped `tls.client.server_name` from `fortinet/firewall` fileset. {pull}20983[20983]
- Fix an error updating file size being logged when EOF is reached. {pull}21048[21048]

*Heartbeat*

Expand Down
11 changes: 9 additions & 2 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type Harvester struct {

// shutdown handling
done chan struct{}
doneWg *sync.WaitGroup
stopOnce sync.Once
stopWg *sync.WaitGroup
stopLock sync.Mutex
Expand Down Expand Up @@ -138,6 +139,7 @@ func NewHarvester(
publishState: publishState,
done: make(chan struct{}),
stopWg: &sync.WaitGroup{},
doneWg: &sync.WaitGroup{},
id: id,
outletFactory: outletFactory,
}
Expand Down Expand Up @@ -299,7 +301,11 @@ func (h *Harvester) Run() error {

logp.Info("Harvester started for file: %s", h.state.Source)

go h.monitorFileSize()
h.doneWg.Add(1)
go func() {
h.monitorFileSize()
h.doneWg.Done()
}()

for {
select {
Expand Down Expand Up @@ -378,7 +384,8 @@ func (h *Harvester) monitorFileSize() {
func (h *Harvester) stop() {
h.stopOnce.Do(func() {
close(h.done)

// Wait for goroutines monitoring h.done to terminate before closing source.
h.doneWg.Wait()
filesMetrics.Remove(h.id.String())
})
}
Expand Down

0 comments on commit 6ce4595

Please sign in to comment.