Skip to content

Commit

Permalink
feature: tag events that come from a filestream with take_over: true (
Browse files Browse the repository at this point in the history
#39828)

* filestream: tag events with `take_over: true`

* filestream: modify test cases

* add comments

* update documentation

* Update filebeat/input/filestream/input_test.go

Co-authored-by: Tiago Queiroz <[email protected]>

* add changelog

---------

Co-authored-by: Tiago Queiroz <[email protected]>
  • Loading branch information
VihasMakwana and belimawr authored Jun 12, 2024
1 parent f8aedce commit 328670b
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Removed deprecated Sophos UTM from Beats. Use the https://docs.elastic.co/integrations/sophos[Sophos] Elastic integration instead. {pull}38037[38037]
- Introduce input/netmetrics and refactor netflow input metrics {pull}38055[38055]
- Update Salesforce module to use new Salesforce input. {pull}37509[37509]
- Tag events that come from a filestream in "take over" mode. {pull}39828[39828]
- Fix high IO and handling of a corrupted registry log file. {pull}35893[35893]

*Heartbeat*
Expand Down
5 changes: 5 additions & 0 deletions filebeat/docs/howto/migrate-to-filestream.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,8 @@ and return to old `log` inputs the files that were taken by `filestream` inputs,
6. Run Filebeat with the old configuration (no `filestream` inputs with `take_over: true`).

NOTE: Reverting to backups might cause some events to repeat, depends on the amount of time the new configuration was running.

=== Debugging on Kibana

Events produced by `filestream` with `take_over: true` contains `take_over` tag.
You can filter on this tag in Kibana and see the events which came from a filestream in the "take over" mode.
8 changes: 8 additions & 0 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

const pluginName = "filestream"
Expand All @@ -61,6 +62,7 @@ type filestream struct {
encodingFactory encoding.EncodingFactory
closerConfig closerConfig
parsers parser.Config
takeOver bool
}

// Plugin creates a new filestream input plugin for creating a stateful input.
Expand Down Expand Up @@ -101,6 +103,7 @@ func configure(cfg *conf.C) (loginp.Prospector, loginp.Harvester, error) {
encodingFactory: encodingFactory,
closerConfig: config.Close,
parsers: config.Reader.Parsers,
takeOver: config.TakeOver,
}

return prospector, filestream, nil
Expand Down Expand Up @@ -378,6 +381,11 @@ func (inp *filestream) readFromSource(

metrics.BytesProcessed.Add(uint64(message.Bytes))

// add "take_over" tag if `take_over` is set to true
if inp.takeOver {
_ = mapstr.AddTags(message.Fields, []string{"take_over"})
}

if err := p.Publish(message.ToEvent(), s); err != nil {
metrics.ProcessingErrors.Inc()
return err
Expand Down
43 changes: 43 additions & 0 deletions filebeat/input/filestream/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

func BenchmarkFilestream(b *testing.B) {
Expand Down Expand Up @@ -115,6 +116,48 @@ paths:
})
}

func TestTakeOverTags(t *testing.T) {
testCases := []struct {
name string
takeOver bool
testFunc func(t *testing.T, event beat.Event)
}{
{
name: "test-take_over-true",
takeOver: true,
testFunc: func(t *testing.T, event beat.Event) {
tags, err := event.GetValue("tags")
require.NoError(t, err)
require.Contains(t, tags, "take_over")
},
},
{
name: "test-take_over-false",
takeOver: false,
testFunc: func(t *testing.T, event beat.Event) {
_, err := event.GetValue("tags")
require.ErrorIs(t, err, mapstr.ErrKeyNotFound)
},
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
filename := generateFile(t, t.TempDir(), 5)
cfg := fmt.Sprintf(`
type: filestream
prospector.scanner.check_interval: 1s
take_over: %t
paths:
- %s`, testCase.takeOver, filename)
runner := createFilestreamTestRunner(context.Background(), t, testCase.name, cfg, 5, true)
events := runner(t)
for _, event := range events {
testCase.testFunc(t, event)
}
})
}
}

// runFilestreamBenchmark runs the entire filestream input with the in-memory registry and the test pipeline.
// `testID` must be unique for each test run
// `cfg` must be a valid YAML string containing valid filestream configuration
Expand Down

0 comments on commit 328670b

Please sign in to comment.