diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0da17cc4a95..3d6874efc00 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -72,6 +72,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fixed `cloudfoundry.access` to have the correct `cloudfoundry.app.id` contents. {pull}17847[17847] - Fixing `ingress_controller.` fields to be of type keyword instead of text. {issue}17834[17834] - Fixed typo in log message. {pull}17897[17897] +- Fix an error updating file size being logged when EOF is reached. {pull}21048[21048] *Heartbeat* diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 95043e94237..0e6c42c22a1 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -83,6 +83,7 @@ type Harvester struct { // shutdown handling done chan struct{} + doneWg *sync.WaitGroup stopOnce sync.Once stopWg *sync.WaitGroup stopLock sync.Mutex @@ -138,6 +139,7 @@ func NewHarvester( publishState: publishState, done: make(chan struct{}), stopWg: &sync.WaitGroup{}, + doneWg: &sync.WaitGroup{}, id: id, outletFactory: outletFactory, } @@ -296,7 +298,11 @@ func (h *Harvester) Run() error { logp.Info("Harvester started for file: %s", h.state.Source) - go h.monitorFileSize() + h.doneWg.Add(1) + go func() { + h.monitorFileSize() + h.doneWg.Done() + }() for { select { @@ -375,7 +381,8 @@ func (h *Harvester) monitorFileSize() { func (h *Harvester) stop() { h.stopOnce.Do(func() { close(h.done) - + // Wait for goroutines monitoring h.done to terminate before closing source. + h.doneWg.Wait() filesMetrics.Remove(h.id.String()) }) }