From 6aeff15a7a9d53c4ad1e7959390ea7ec6c27281f Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Tue, 15 Sep 2020 00:51:09 +0200 Subject: [PATCH] Filebeat: Fix random error on harvester close (#21048) (#21056) This fixes a race condition when a harvester is closed at the same time that its source file size is being calculated. Before this fix, `Error updating file size` errors are randomly printed, because the file handle becomes closed inside the os.Stat call. (cherry picked from commit 918f17a7c772167c536e5f7c97e7ebc8b8a68712) --- CHANGELOG.next.asciidoc | 1 + filebeat/input/log/harvester.go | 11 +++++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0da17cc4a95..3d6874efc00 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -72,6 +72,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fixed `cloudfoundry.access` to have the correct `cloudfoundry.app.id` contents. {pull}17847[17847] - Fixing `ingress_controller.` fields to be of type keyword instead of text. {issue}17834[17834] - Fixed typo in log message. {pull}17897[17897] +- Fix an error updating file size being logged when EOF is reached. {pull}21048[21048] *Heartbeat* diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 95043e94237..0e6c42c22a1 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -83,6 +83,7 @@ type Harvester struct { // shutdown handling done chan struct{} + doneWg *sync.WaitGroup stopOnce sync.Once stopWg *sync.WaitGroup stopLock sync.Mutex @@ -138,6 +139,7 @@ func NewHarvester( publishState: publishState, done: make(chan struct{}), stopWg: &sync.WaitGroup{}, + doneWg: &sync.WaitGroup{}, id: id, outletFactory: outletFactory, } @@ -296,7 +298,11 @@ func (h *Harvester) Run() error { logp.Info("Harvester started for file: %s", h.state.Source) - go h.monitorFileSize() + h.doneWg.Add(1) + go func() { + h.monitorFileSize() + h.doneWg.Done() + }() for { select { @@ -375,7 +381,8 @@ func (h *Harvester) monitorFileSize() { func (h *Harvester) stop() { h.stopOnce.Do(func() { close(h.done) - + // Wait for goroutines monitoring h.done to terminate before closing source. + h.doneWg.Wait() filesMetrics.Remove(h.id.String()) }) }