From 0e73241a294b10af10aeace1fc164ebf8760b317 Mon Sep 17 00:00:00 2001 From: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Wed, 10 Jul 2024 19:42:49 +0530 Subject: [PATCH] [filebeat][filestream] Enable status reporter for filestream input (#40121) * initial commit filestream status * fix: test cleanup * fix: move the statusReporter to correct place * fix: remove test cases for now * chore: add changelog * fix: address review comments (cherry picked from commit 0e9c9dee86348fdc987d570e19a7c322d11010ce) --- CHANGELOG.next.asciidoc | 1 + filebeat/input/filestream/input.go | 7 ++++++- .../input/filestream/internal/input-logfile/harvester.go | 6 +++++- filebeat/input/filestream/internal/input-logfile/input.go | 6 ++++++ 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 854111cff17..e911b742bad 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -44,6 +44,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Update Salesforce module to use new Salesforce input. {pull}37509[37509] - Tag events that come from a filestream in "take over" mode. {pull}39828[39828] - Fix high IO and handling of a corrupted registry log file. {pull}35893[35893] +- Filebeat, when running with Elastic-Agent, reports status for Filestream input. {pull}40121[40121] *Heartbeat* diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index ea761ec177f..f1f9b764600 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -34,6 +34,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/file" "github.com/elastic/beats/v7/libbeat/common/match" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/libbeat/reader" "github.com/elastic/beats/v7/libbeat/reader/debug" "github.com/elastic/beats/v7/libbeat/reader/parser" @@ -163,7 +164,11 @@ func (inp *filestream) Run( }) defer streamCancel() - return inp.readFromSource(ctx, log, r, fs.newPath, state, publisher, metrics) + if err := inp.readFromSource(ctx, log, r, fs.newPath, state, publisher, metrics); err != nil { + ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while reading from source: %v", err)) + return err + } + return nil } func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state { diff --git a/filebeat/input/filestream/internal/input-logfile/harvester.go b/filebeat/input/filestream/internal/input-logfile/harvester.go index 41cfc83857f..a7f70c6d31f 100644 --- a/filebeat/input/filestream/internal/input-logfile/harvester.go +++ b/filebeat/input/filestream/internal/input-logfile/harvester.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/filebeat/input/filestream/internal/task" inputv2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/go-concert/ctxtool" ) @@ -204,7 +205,7 @@ func startHarvester( if errors.Is(err, ErrHarvesterAlreadyRunning) { return nil } - + ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while adding new reader to the bookkeeper %v", err)) return fmt.Errorf("error while adding new reader to the bookkeeper %w", err) } @@ -214,6 +215,7 @@ func startHarvester( resource, err := lock(ctx, hg.store, srcID) if err != nil { hg.readers.remove(srcID) + ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while locking resource: %v", err)) return fmt.Errorf("error while locking resource: %w", err) } defer releaseResource(resource) @@ -223,6 +225,7 @@ func startHarvester( }) if err != nil { hg.readers.remove(srcID) + ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while connecting to output with pipeline: %v", err)) return fmt.Errorf("error while connecting to output with pipeline: %w", err) } defer client.Close() @@ -234,6 +237,7 @@ func startHarvester( err = hg.harvester.Run(ctx, src, cursor, publisher, metrics) if err != nil && !errors.Is(err, context.Canceled) { hg.readers.remove(srcID) + ctx.UpdateStatus(status.Degraded, fmt.Sprintf("error while running harvester: %v", err)) return fmt.Errorf("error while running harvester: %w", err) } // If the context was not cancelled it means that the Harvester is stopping because of diff --git a/filebeat/input/filestream/internal/input-logfile/input.go b/filebeat/input/filestream/internal/input-logfile/input.go index db3e713fbdd..88adb5622ce 100644 --- a/filebeat/input/filestream/internal/input-logfile/input.go +++ b/filebeat/input/filestream/internal/input-logfile/input.go @@ -25,6 +25,7 @@ import ( input "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/acker" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/go-concert/ctxtool" ) @@ -53,6 +54,7 @@ func (inp *managedInput) Run( ctx input.Context, pipeline beat.PipelineConnector, ) (err error) { + ctx.UpdateStatus(status.Starting, "") groupStore := inp.manager.getRetainedStore() defer groupStore.Release() @@ -85,6 +87,10 @@ func (inp *managedInput) Run( defer prospectorStore.Release() sourceStore := newSourceStore(prospectorStore, inp.sourceIdentifier) + // Mark it as running for now. + // Any errors encountered by harverter will change state to Degraded + ctx.UpdateStatus(status.Running, "") + inp.prospector.Run(ctx, sourceStore, hg) // Notify the manager the input has stopped, currently that is used to