diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 6a28c321add7..645d611e6030 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -49,6 +49,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di the system test. {pull}6121[6121] - Add IIS module to parse access log and error log. {pull}6127[6127] - Remove the undefined `username` option from the Redis input and clarify the documentation. {pull}6662[6662] +- Add validation for Stdin, when Filebeat is configured with Stdin and any other inputs, Filebeat + will now refuses to start. {pull}6463[6463] *Heartbeat* diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 5b59e62a81a3..20e253f43a81 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -3,6 +3,7 @@ package beater import ( "flag" "fmt" + "strings" "github.com/joeshaw/multierror" "github.com/pkg/errors" @@ -93,12 +94,10 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { // Add inputs created by the modules config.Inputs = append(config.Inputs, moduleInputs...) - haveEnabledInputs := false - for _, input := range config.Inputs { - if input.Enabled() { - haveEnabledInputs = true - break - } + enabledInputs := config.ListEnabledInputs() + var haveEnabledInputs bool + if len(enabledInputs) > 0 { + haveEnabledInputs = true } if !config.ConfigInput.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledInputs && config.Autodiscover == nil { @@ -114,6 +113,10 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { return nil, errors.New("input configs and -once cannot be used together") } + if config.IsInputEnabled("stdin") && len(enabledInputs) > 1 { + return nil, fmt.Errorf("stdin requires to be run in exclusive mode, configured inputs: %s", strings.Join(enabledInputs, ", ")) + } + fb := &Filebeat{ done: make(chan struct{}), config: &config, diff --git a/filebeat/config/config.go b/filebeat/config/config.go index df01d9a5841a..2ebeb7a8a697 100644 --- a/filebeat/config/config.go +++ b/filebeat/config/config.go @@ -5,6 +5,7 @@ import ( "log" "os" "path/filepath" + "sort" "time" "github.com/elastic/beats/libbeat/autodiscover" @@ -132,3 +133,30 @@ func (config *Config) FetchConfigs() error { return nil } + +// ListEnabledInputs returns a list of enabled inputs sorted by alphabetical order. +func (config *Config) ListEnabledInputs() []string { + t := struct { + Type string `config:"type"` + }{} + var inputs []string + for _, input := range config.Inputs { + if input.Enabled() { + input.Unpack(&t) + inputs = append(inputs, t.Type) + } + } + sort.Strings(inputs) + return inputs +} + +// IsInputEnabled returns true if the plugin name is enabled. +func (config *Config) IsInputEnabled(name string) bool { + enabledInputs := config.ListEnabledInputs() + for _, input := range enabledInputs { + if name == input { + return true + } + } + return false +} diff --git a/filebeat/config/config_test.go b/filebeat/config/config_test.go index c8df1b614368..6c7937836a5b 100644 --- a/filebeat/config/config_test.go +++ b/filebeat/config/config_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/cfgfile" + "github.com/elastic/beats/libbeat/common" ) func TestReadConfig2(t *testing.T) { @@ -95,3 +96,81 @@ func TestMergeConfigFiles(t *testing.T) { assert.Equal(t, 4, len(config.Inputs)) } + +func TestEnabledInputs(t *testing.T) { + stdinEnabled, err := common.NewConfigFrom(map[string]interface{}{ + "type": "stdin", + "enabled": true, + }) + if !assert.NoError(t, err) { + return + } + + udpDisabled, err := common.NewConfigFrom(map[string]interface{}{ + "type": "udp", + "enabled": false, + }) + if !assert.NoError(t, err) { + return + } + + logDisabled, err := common.NewConfigFrom(map[string]interface{}{ + "type": "log", + "enabled": false, + }) + if !assert.NoError(t, err) { + return + } + + t.Run("ListEnabledInputs", func(t *testing.T) { + tests := []struct { + name string + config *Config + expected []string + }{ + { + name: "all inputs disabled", + config: &Config{Inputs: []*common.Config{udpDisabled, logDisabled}}, + expected: []string{}, + }, + { + name: "all inputs enabled", + config: &Config{Inputs: []*common.Config{stdinEnabled}}, + expected: []string{"stdin"}, + }, + { + name: "disabled and enabled inputs", + config: &Config{Inputs: []*common.Config{stdinEnabled, udpDisabled, logDisabled}}, + expected: []string{"stdin"}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.ElementsMatch(t, test.expected, test.config.ListEnabledInputs()) + }) + } + }) + + t.Run("IsInputEnabled", func(t *testing.T) { + config := &Config{Inputs: []*common.Config{stdinEnabled, udpDisabled, logDisabled}} + + tests := []struct { + name string + input string + expected bool + config *Config + }{ + {name: "input exists and enabled", input: "stdin", expected: true, config: config}, + {name: "input exists and disabled", input: "udp", expected: false, config: config}, + {name: "input doesn't exist", input: "redis", expected: false, config: config}, + {name: "no inputs are enabled", input: "redis", expected: false, config: &Config{}}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.expected, config.IsInputEnabled(test.input)) + }) + } + }) +} diff --git a/filebeat/docs/inputs/input-stdin.asciidoc b/filebeat/docs/inputs/input-stdin.asciidoc index f86f8736474e..1e9b6acaf061 100644 --- a/filebeat/docs/inputs/input-stdin.asciidoc +++ b/filebeat/docs/inputs/input-stdin.asciidoc @@ -9,6 +9,8 @@ Use the `stdin` input to read events from standard in. +Note: This input cannot be run at the same time with other input types. + Example configuration: ["source","yaml",subs="attributes"] diff --git a/filebeat/tests/system/test_input.py b/filebeat/tests/system/test_input.py index eab8b963c34c..4a9dd9e58c3b 100644 --- a/filebeat/tests/system/test_input.py +++ b/filebeat/tests/system/test_input.py @@ -76,77 +76,6 @@ def test_not_ignore_old_files(self): objs = self.read_output() assert len(objs) == 5 - def test_stdin(self): - """ - Test stdin input. Checks if reading is continued after the first read. - """ - self.render_config_template( - type="stdin" - ) - - proc = self.start_beat() - - self.wait_until( - lambda: self.log_contains( - "Harvester started for file: -"), - max_timeout=10) - - iterations1 = 5 - for n in range(0, iterations1): - os.write(proc.stdin_write, "Hello World\n") - - self.wait_until( - lambda: self.output_has(lines=iterations1), - max_timeout=15) - - iterations2 = 10 - for n in range(0, iterations2): - os.write(proc.stdin_write, "Hello World\n") - - self.wait_until( - lambda: self.output_has(lines=iterations1 + iterations2), - max_timeout=15) - - proc.check_kill_and_wait() - - objs = self.read_output() - assert len(objs) == iterations1 + iterations2 - - def test_stdin_eof(self): - """ - Test that Filebeat works when stdin is closed. - """ - self.render_config_template( - type="stdin", - close_eof="true", - ) - - args = [self.test_binary, - "-systemTest", - "-test.coverprofile", - os.path.join(self.working_dir, "coverage.cov"), - "-c", os.path.join(self.working_dir, "filebeat.yml"), - "-e", "-v", "-d", "*", - ] - proc = Proc(args, os.path.join(self.working_dir, "filebeat.log")) - os.write(proc.stdin_write, "Hello World\n") - - proc.start() - self.wait_until(lambda: self.output_has(lines=1)) - - # Continue writing after end was reached - os.write(proc.stdin_write, "Hello World2\n") - os.close(proc.stdin_write) - - self.wait_until(lambda: self.output_has(lines=2)) - - proc.proc.terminate() - proc.proc.wait() - - objs = self.read_output() - assert objs[0]["message"] == "Hello World" - assert objs[1]["message"] == "Hello World2" - def test_rotating_close_inactive_larger_write_rate(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", diff --git a/filebeat/tests/system/test_stdin.py b/filebeat/tests/system/test_stdin.py new file mode 100644 index 000000000000..6a79c050d00b --- /dev/null +++ b/filebeat/tests/system/test_stdin.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python + +from filebeat import BaseTest +import os + +from beat.beat import Proc + +""" +Tests for the stdin functionality. +""" + + +class Test(BaseTest): + + def test_stdin(self): + """ + Test stdin input. Checks if reading is continued after the first read. + """ + self.render_config_template( + type="stdin" + ) + + proc = self.start_beat() + + self.wait_until( + lambda: self.log_contains( + "Harvester started for file: -"), + max_timeout=10) + + iterations1 = 5 + for n in range(0, iterations1): + os.write(proc.stdin_write, "Hello World\n") + + self.wait_until( + lambda: self.output_has(lines=iterations1), + max_timeout=15) + + iterations2 = 10 + for n in range(0, iterations2): + os.write(proc.stdin_write, "Hello World\n") + + self.wait_until( + lambda: self.output_has(lines=iterations1 + iterations2), + max_timeout=15) + + proc.check_kill_and_wait() + + objs = self.read_output() + assert len(objs) == iterations1 + iterations2 + + def test_stdin_eof(self): + """ + Test that Filebeat works when stdin is closed. + """ + self.render_config_template( + type="stdin", + close_eof="true", + ) + + args = [self.test_binary, + "-systemTest", + "-test.coverprofile", + os.path.join(self.working_dir, "coverage.cov"), + "-c", os.path.join(self.working_dir, "filebeat.yml"), + "-e", "-v", "-d", "*", + ] + proc = Proc(args, os.path.join(self.working_dir, "filebeat.log")) + os.write(proc.stdin_write, "Hello World\n") + + proc.start() + self.wait_until(lambda: self.output_has(lines=1)) + + # Continue writing after end was reached + os.write(proc.stdin_write, "Hello World2\n") + os.close(proc.stdin_write) + + self.wait_until(lambda: self.output_has(lines=2)) + + proc.proc.terminate() + proc.proc.wait() + + objs = self.read_output() + assert objs[0]["message"] == "Hello World" + assert objs[1]["message"] == "Hello World2" + + def test_stdin_is_exclusive(self): + """ + Test that Filebeat run Stdin in exclusive mode. + """ + + input_raw = """ +- type: stdin + enabled: true +- type: udp + host: 127.0.0.0:10000 + enabled: true +""" + + self.render_config_template( + input_raw=input_raw, + inputs=False, + ) + + filebeat = self.start_beat() + filebeat.check_wait(exit_code=1) + assert self.log_contains("Exiting: stdin requires to be run in exclusive mode, configured inputs: stdin, udp")