Skip to content

Commit

Permalink
[x-pack/filebeat/netflow] add netflow status reporting under Agent ma…
Browse files Browse the repository at this point in the history
…nagement (#40080)

* feat: add netflow status reporting under Agent management

* doc: update CHANGELOG.next.asciidoc

* fix: ignore acknowledgment on delete stream

* fix: use ES_SUPERUSER_USER and ES_SUPERUSER_PASS to support data_stream access through http

* fix: avoid race condition of nil channel

* fix: replace Allow with Wait for rate limit

* fix: specify time threshold in constants

* fix: exit directly in debugPrintProcessor when log doesn't have debug level enabled

* fix: rework on ratelimit to batch up requests and make code lighter

* feat: check also for malformed events

* feat: add deterministic check for expected events number

* Revert "fix: exit directly in debugPrintProcessor when log doesn't have debug level enabled"

This reverts commit c7978ee.

* fix: check err from json unmarshal of expectedFlows
  • Loading branch information
pkoutsovasilis authored Jul 4, 2024
1 parent c111789 commit 8e607a3
Show file tree
Hide file tree
Showing 3 changed files with 425 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
- Relax constraint on Base DN in entity analytics Active Directory provider. {pull}40054[40054]
- Implement Elastic Agent status and health reporting for Netflow Filebeat input. {pull}40080[40080]
- Enhance input state reporting for CEL evaluations that return a single error object in events. {pull}40083[40083]
- Allow absent credentials when using GCS with Application Default Credentials. {issue}39977[39977] {pull}40072[40072]

Expand Down
11 changes: 10 additions & 1 deletion x-pack/filebeat/input/netflow/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/elastic/beats/v7/filebeat/inputsource/udp"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields"

Expand Down Expand Up @@ -110,6 +111,7 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err
n.started = true
n.mtx.Unlock()

ctx.UpdateStatus(status.Starting, "Starting netflow input")
n.logger.Info("Starting netflow input")

n.logger.Info("Connecting to beat event publishing")
Expand All @@ -121,6 +123,7 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err
EventListener: nil,
})
if err != nil {
ctx.UpdateStatus(status.Failed, fmt.Sprintf("Failed connecting to beat event publishing: %v", err))
n.logger.Errorw("Failed connecting to beat event publishing", "error", err)
n.stop()
return err
Expand All @@ -142,11 +145,13 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err
WithSharedTemplates(n.cfg.ShareTemplates).
WithActiveSessionsMetric(flowMetrics.ActiveSessions()))
if err != nil {
ctx.UpdateStatus(status.Failed, fmt.Sprintf("Failed to initialize netflow decoder: %v", err))
return fmt.Errorf("error initializing netflow decoder: %w", err)
}

n.logger.Info("Starting netflow decoder")
if err := n.decoder.Start(); err != nil {
ctx.UpdateStatus(status.Failed, fmt.Sprintf("Failed to start netflow decoder: %v", err))
n.logger.Errorw("Failed to start netflow decoder", "error", err)
n.stop()
return err
Expand All @@ -167,7 +172,9 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err
})
err = udpServer.Start()
if err != nil {
n.logger.Errorf("Failed to start udp server: %v", err)
errorMsg := fmt.Sprintf("Failed to start udp server: %v", err)
n.logger.Errorf(errorMsg)
ctx.UpdateStatus(status.Failed, errorMsg)
n.stop()
return err
}
Expand All @@ -178,6 +185,8 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err
n.stop()
}()

ctx.UpdateStatus(status.Running, "")

for packet := range n.queueC {
flows, err := n.decoder.Read(bytes.NewBuffer(packet.data), packet.source)
if err != nil {
Expand Down
Loading

0 comments on commit 8e607a3

Please sign in to comment.