Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent to start filebeat when STDIN is used with other inputs #6463

Merged
merged 1 commit into from
Mar 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
the system test. {pull}6121[6121]
- Add IIS module to parse access log and error log. {pull}6127[6127]
- Remove the undefined `username` option from the Redis input and clarify the documentation. {pull}6662[6662]
- Add validation for Stdin, when Filebeat is configured with Stdin and any other inputs, Filebeat
will now refuses to start. {pull}6463[6463]

*Heartbeat*

Expand Down
15 changes: 9 additions & 6 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package beater
import (
"flag"
"fmt"
"strings"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"
Expand Down Expand Up @@ -93,12 +94,10 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// Add inputs created by the modules
config.Inputs = append(config.Inputs, moduleInputs...)

haveEnabledInputs := false
for _, input := range config.Inputs {
if input.Enabled() {
haveEnabledInputs = true
break
}
enabledInputs := config.ListEnabledInputs()
var haveEnabledInputs bool
if len(enabledInputs) > 0 {
haveEnabledInputs = true
}

if !config.ConfigInput.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledInputs && config.Autodiscover == nil {
Expand All @@ -114,6 +113,10 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
return nil, errors.New("input configs and -once cannot be used together")
}

if config.IsInputEnabled("stdin") && len(enabledInputs) > 1 {
return nil, fmt.Errorf("stdin requires to be run in exclusive mode, configured inputs: %s", strings.Join(enabledInputs, ", "))
}

fb := &Filebeat{
done: make(chan struct{}),
config: &config,
Expand Down
28 changes: 28 additions & 0 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"os"
"path/filepath"
"sort"
"time"

"github.com/elastic/beats/libbeat/autodiscover"
Expand Down Expand Up @@ -132,3 +133,30 @@ func (config *Config) FetchConfigs() error {

return nil
}

// ListEnabledInputs returns a list of enabled inputs sorted by alphabetical order.
func (config *Config) ListEnabledInputs() []string {
t := struct {
Type string `config:"type"`
}{}
var inputs []string
for _, input := range config.Inputs {
if input.Enabled() {
input.Unpack(&t)
inputs = append(inputs, t.Type)
}
}
sort.Strings(inputs)
return inputs
}

// IsInputEnabled returns true if the plugin name is enabled.
func (config *Config) IsInputEnabled(name string) bool {
enabledInputs := config.ListEnabledInputs()
for _, input := range enabledInputs {
if name == input {
return true
}
}
return false
}
79 changes: 79 additions & 0 deletions filebeat/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
)

func TestReadConfig2(t *testing.T) {
Expand Down Expand Up @@ -95,3 +96,81 @@ func TestMergeConfigFiles(t *testing.T) {

assert.Equal(t, 4, len(config.Inputs))
}

func TestEnabledInputs(t *testing.T) {
stdinEnabled, err := common.NewConfigFrom(map[string]interface{}{
"type": "stdin",
"enabled": true,
})
if !assert.NoError(t, err) {
return
}

udpDisabled, err := common.NewConfigFrom(map[string]interface{}{
"type": "udp",
"enabled": false,
})
if !assert.NoError(t, err) {
return
}

logDisabled, err := common.NewConfigFrom(map[string]interface{}{
"type": "log",
"enabled": false,
})
if !assert.NoError(t, err) {
return
}

t.Run("ListEnabledInputs", func(t *testing.T) {
tests := []struct {
name string
config *Config
expected []string
}{
{
name: "all inputs disabled",
config: &Config{Inputs: []*common.Config{udpDisabled, logDisabled}},
expected: []string{},
},
{
name: "all inputs enabled",
config: &Config{Inputs: []*common.Config{stdinEnabled}},
expected: []string{"stdin"},
},
{
name: "disabled and enabled inputs",
config: &Config{Inputs: []*common.Config{stdinEnabled, udpDisabled, logDisabled}},
expected: []string{"stdin"},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert.ElementsMatch(t, test.expected, test.config.ListEnabledInputs())
})
}
})

t.Run("IsInputEnabled", func(t *testing.T) {
config := &Config{Inputs: []*common.Config{stdinEnabled, udpDisabled, logDisabled}}

tests := []struct {
name string
input string
expected bool
config *Config
}{
{name: "input exists and enabled", input: "stdin", expected: true, config: config},
{name: "input exists and disabled", input: "udp", expected: false, config: config},
{name: "input doesn't exist", input: "redis", expected: false, config: config},
{name: "no inputs are enabled", input: "redis", expected: false, config: &Config{}},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert.Equal(t, test.expected, config.IsInputEnabled(test.input))
})
}
})
}
2 changes: 2 additions & 0 deletions filebeat/docs/inputs/input-stdin.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

Use the `stdin` input to read events from standard in.

Note: This input cannot be run at the same time with other input types.

Example configuration:

["source","yaml",subs="attributes"]
Expand Down
71 changes: 0 additions & 71 deletions filebeat/tests/system/test_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,77 +76,6 @@ def test_not_ignore_old_files(self):
objs = self.read_output()
assert len(objs) == 5

def test_stdin(self):
"""
Test stdin input. Checks if reading is continued after the first read.
"""
self.render_config_template(
type="stdin"
)

proc = self.start_beat()

self.wait_until(
lambda: self.log_contains(
"Harvester started for file: -"),
max_timeout=10)

iterations1 = 5
for n in range(0, iterations1):
os.write(proc.stdin_write, "Hello World\n")

self.wait_until(
lambda: self.output_has(lines=iterations1),
max_timeout=15)

iterations2 = 10
for n in range(0, iterations2):
os.write(proc.stdin_write, "Hello World\n")

self.wait_until(
lambda: self.output_has(lines=iterations1 + iterations2),
max_timeout=15)

proc.check_kill_and_wait()

objs = self.read_output()
assert len(objs) == iterations1 + iterations2

def test_stdin_eof(self):
"""
Test that Filebeat works when stdin is closed.
"""
self.render_config_template(
type="stdin",
close_eof="true",
)

args = [self.test_binary,
"-systemTest",
"-test.coverprofile",
os.path.join(self.working_dir, "coverage.cov"),
"-c", os.path.join(self.working_dir, "filebeat.yml"),
"-e", "-v", "-d", "*",
]
proc = Proc(args, os.path.join(self.working_dir, "filebeat.log"))
os.write(proc.stdin_write, "Hello World\n")

proc.start()
self.wait_until(lambda: self.output_has(lines=1))

# Continue writing after end was reached
os.write(proc.stdin_write, "Hello World2\n")
os.close(proc.stdin_write)

self.wait_until(lambda: self.output_has(lines=2))

proc.proc.terminate()
proc.proc.wait()

objs = self.read_output()
assert objs[0]["message"] == "Hello World"
assert objs[1]["message"] == "Hello World2"

def test_rotating_close_inactive_larger_write_rate(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
Expand Down
106 changes: 106 additions & 0 deletions filebeat/tests/system/test_stdin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#!/usr/bin/env python

from filebeat import BaseTest
import os

from beat.beat import Proc

"""
Tests for the stdin functionality.
"""


class Test(BaseTest):

def test_stdin(self):
"""
Test stdin input. Checks if reading is continued after the first read.
"""
self.render_config_template(
type="stdin"
)

proc = self.start_beat()

self.wait_until(
lambda: self.log_contains(
"Harvester started for file: -"),
max_timeout=10)

iterations1 = 5
for n in range(0, iterations1):
os.write(proc.stdin_write, "Hello World\n")

self.wait_until(
lambda: self.output_has(lines=iterations1),
max_timeout=15)

iterations2 = 10
for n in range(0, iterations2):
os.write(proc.stdin_write, "Hello World\n")

self.wait_until(
lambda: self.output_has(lines=iterations1 + iterations2),
max_timeout=15)

proc.check_kill_and_wait()

objs = self.read_output()
assert len(objs) == iterations1 + iterations2

def test_stdin_eof(self):
"""
Test that Filebeat works when stdin is closed.
"""
self.render_config_template(
type="stdin",
close_eof="true",
)

args = [self.test_binary,
"-systemTest",
"-test.coverprofile",
os.path.join(self.working_dir, "coverage.cov"),
"-c", os.path.join(self.working_dir, "filebeat.yml"),
"-e", "-v", "-d", "*",
]
proc = Proc(args, os.path.join(self.working_dir, "filebeat.log"))
os.write(proc.stdin_write, "Hello World\n")

proc.start()
self.wait_until(lambda: self.output_has(lines=1))

# Continue writing after end was reached
os.write(proc.stdin_write, "Hello World2\n")
os.close(proc.stdin_write)

self.wait_until(lambda: self.output_has(lines=2))

proc.proc.terminate()
proc.proc.wait()

objs = self.read_output()
assert objs[0]["message"] == "Hello World"
assert objs[1]["message"] == "Hello World2"

def test_stdin_is_exclusive(self):
"""
Test that Filebeat run Stdin in exclusive mode.
"""

input_raw = """
- type: stdin
enabled: true
- type: udp
host: 127.0.0.0:10000
enabled: true
"""

self.render_config_template(
input_raw=input_raw,
inputs=False,
)

filebeat = self.start_beat()
filebeat.check_wait(exit_code=1)
assert self.log_contains("Exiting: stdin requires to be run in exclusive mode, configured inputs: stdin, udp")