From ad6223251ea913ce359cd39eb958811b36b9b6aa Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Thu, 10 Sep 2020 16:14:33 +0200 Subject: [PATCH] Filebeat: Fix random error on harvester close (#21048) This fixes a race condition when a harvester is closed at the same time that its source file size is being calculated. Before this fix, `Error updating file size` errors are randomly printed, because the file handle becomes closed inside the os.Stat call. (cherry picked from commit af6222d04f4f125d4cc2130a838ddb1daabfb17c) --- CHANGELOG.next.asciidoc | 1 + filebeat/input/log/harvester.go | 11 +++++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 58d4d4efab81..1a2c866576cd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -71,6 +71,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fixed `cloudfoundry.access` to have the correct `cloudfoundry.app.id` contents. {pull}17847[17847] - Fixing `ingress_controller.` fields to be of type keyword instead of text. {issue}17834[17834] - Fixed typo in log message. {pull}17897[17897] +- Fix an error updating file size being logged when EOF is reached. {pull}21048[21048] *Heartbeat* diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 95043e94237a..0e6c42c22a11 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -83,6 +83,7 @@ type Harvester struct { // shutdown handling done chan struct{} + doneWg *sync.WaitGroup stopOnce sync.Once stopWg *sync.WaitGroup stopLock sync.Mutex @@ -138,6 +139,7 @@ func NewHarvester( publishState: publishState, done: make(chan struct{}), stopWg: &sync.WaitGroup{}, + doneWg: &sync.WaitGroup{}, id: id, outletFactory: outletFactory, } @@ -296,7 +298,11 @@ func (h *Harvester) Run() error { logp.Info("Harvester started for file: %s", h.state.Source) - go h.monitorFileSize() + h.doneWg.Add(1) + go func() { + h.monitorFileSize() + h.doneWg.Done() + }() for { select { @@ -375,7 +381,8 @@ func (h *Harvester) monitorFileSize() { func (h *Harvester) stop() { h.stopOnce.Do(func() { close(h.done) - + // Wait for goroutines monitoring h.done to terminate before closing source. + h.doneWg.Wait() filesMetrics.Remove(h.id.String()) }) }