diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 28b49fb98146..386d4c0fa4a6 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -68,6 +68,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD d - Add support for loading Xpack Machine Learning configurations from the modules, and added sample configurations for the Nginx module. {pull}4506[4506] - Add udp prospector type. {pull}4452[4452] - Enabled Cgo which means libc is dynamically compiled. {pull}4546[4546] +- Add module config reloading mechanism {pull}4566[4566] *Heartbeat* diff --git a/filebeat/_meta/common.reference.p2.yml b/filebeat/_meta/common.reference.p2.yml index 2339507e3fbd..77b357ba4a2c 100644 --- a/filebeat/_meta/common.reference.p2.yml +++ b/filebeat/_meta/common.reference.p2.yml @@ -262,8 +262,14 @@ filebeat.prospectors: #filebeat.shutdown_timeout: 0 # Enable filebeat config reloading -#filebeat.config.prospectors: - #enabled: false - #path: configs/*.yml - #reload.enabled: true - #reload.period: 10s +#filebeat.config: + #prospectors: + #enabled: false + #path: prospectors.d/*.yml + #reload.enabled: true + #reload.period: 10s + #modules: + #enabled: false + #path: modules.d/*.yml + #reload.enabled: true + #reload.period: 10s diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 1d092288a1e7..a2f1c3fadad6 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -67,7 +67,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { } } - if !config.ConfigProspector.Enabled() && !haveEnabledProspectors { + if !config.ConfigProspector.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledProspectors { if !b.InSetupCmd { return nil, errors.New("No modules or prospectors enabled and configuration reloading disabled. What files do you want me to watch?") } else { @@ -76,7 +76,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { } } - if *once && config.ConfigProspector.Enabled() { + if *once && config.ConfigProspector.Enabled() && config.ConfigModules.Enabled() { return nil, errors.New("prospector configs and -once cannot be used together") } @@ -176,7 +176,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error { return err } - crawler, err := crawler.New(channel.NewOutlet(fb.done, spooler.Channel, wgEvents), config.Prospectors, fb.done, *once) + outlet := channel.NewOutlet(fb.done, spooler.Channel, wgEvents) + crawler, err := crawler.New(outlet, config.Prospectors, b.Info.Version, fb.done, *once) if err != nil { logp.Err("Could not init crawler: %v", err) return err @@ -218,7 +219,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { spooler.Stop() }() - err = crawler.Start(registrar, config.ConfigProspector) + err = crawler.Start(registrar, config.ConfigProspector, config.ConfigModules) if err != nil { crawler.Stop() return err diff --git a/filebeat/config/config.go b/filebeat/config/config.go index e69b6b77f087..eec92518c597 100644 --- a/filebeat/config/config.go +++ b/filebeat/config/config.go @@ -28,6 +28,7 @@ type Config struct { ShutdownTimeout time.Duration `config:"shutdown_timeout"` Modules []*common.Config `config:"modules"` ConfigProspector *common.Config `config:"config.prospectors"` + ConfigModules *common.Config `config:"config.modules"` } var ( diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index 5a54784085aa..96bffe372667 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/elastic/beats/filebeat/channel" + "github.com/elastic/beats/filebeat/fileset" "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/filebeat/prospector" "github.com/elastic/beats/filebeat/registrar" @@ -20,22 +21,24 @@ type Crawler struct { wg sync.WaitGroup reloader *cfgfile.Reloader once bool + beatVersion string beatDone chan struct{} } -func New(out channel.Outleter, prospectorConfigs []*common.Config, beatDone chan struct{}, once bool) (*Crawler, error) { +func New(out channel.Outleter, prospectorConfigs []*common.Config, beatVersion string, beatDone chan struct{}, once bool) (*Crawler, error) { return &Crawler{ out: out, prospectors: map[uint64]*prospector.Prospector{}, prospectorConfigs: prospectorConfigs, once: once, + beatVersion: beatVersion, beatDone: beatDone, }, nil } // Start starts the crawler with all prospectors -func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config) error { +func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config, configModules *common.Config) error { logp.Info("Loading Prospectors: %v", len(c.prospectorConfigs)) @@ -57,6 +60,17 @@ func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config }() } + if configModules.Enabled() { + logp.Beta("Loading separate prospectors is enabled.") + + c.reloader = cfgfile.NewReloader(configModules) + // TODO add beatVersion here + factory := fileset.NewFactory(c.out, r, "", c.beatDone) + go func() { + c.reloader.Run(factory) + }() + } + logp.Info("Loading and starting Prospectors completed. Enabled prospectors: %v", len(c.prospectors)) return nil diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index d4bcf8a60a4b..816adeb4ea46 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -431,11 +431,17 @@ filebeat.prospectors: #filebeat.shutdown_timeout: 0 # Enable filebeat config reloading -#filebeat.config.prospectors: - #enabled: false - #path: configs/*.yml - #reload.enabled: true - #reload.period: 10s +#filebeat.config: + #prospectors: + #enabled: false + #path: prospectors.d/*.yml + #reload.enabled: true + #reload.period: 10s + #modules: + #enabled: false + #path: modules.d/*.yml + #reload.enabled: true + #reload.period: 10s #================================ General ====================================== diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go new file mode 100644 index 000000000000..7b223f69d41e --- /dev/null +++ b/filebeat/fileset/factory.go @@ -0,0 +1,86 @@ +package fileset + +import ( + "github.com/elastic/beats/libbeat/cfgfile" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/mitchellh/hashstructure" + + "github.com/elastic/beats/filebeat/channel" + "github.com/elastic/beats/filebeat/prospector" + "github.com/elastic/beats/filebeat/registrar" +) + +// Factory is a factory for registrars +type Factory struct { + outlet channel.Outleter + registrar *registrar.Registrar + beatVersion string + beatDone chan struct{} +} + +// Wrap an array of prospectors and implements cfgfile.Runner interface +type prospectorsRunner struct { + id uint64 + prospectors []*prospector.Prospector +} + +// NewFactory instantiates a new Factory +func NewFactory(outlet channel.Outleter, registrar *registrar.Registrar, beatVersion string, beatDone chan struct{}) *Factory { + return &Factory{ + outlet: outlet, + registrar: registrar, + beatVersion: beatVersion, + beatDone: beatDone, + } +} + +// Create creates a module based on a config +func (f *Factory) Create(c *common.Config) (cfgfile.Runner, error) { + // Start a registry of one module: + m, err := NewModuleRegistry([]*common.Config{c}, f.beatVersion) + if err != nil { + return nil, err + } + + pConfigs, err := m.GetProspectorConfigs() + if err != nil { + return nil, err + } + + // Hash module ID + var h map[string]interface{} + c.Unpack(&h) + id, err := hashstructure.Hash(h, nil) + if err != nil { + return nil, err + } + + prospectors := make([]*prospector.Prospector, len(pConfigs)) + for i, pConfig := range pConfigs { + prospectors[i], err = prospector.NewProspector(pConfig, f.outlet, f.beatDone, f.registrar.GetStates()) + if err != nil { + logp.Err("Error creating prospector: %s", err) + return nil, err + } + } + + return &prospectorsRunner{ + id: id, + prospectors: prospectors, + }, nil +} + +func (p *prospectorsRunner) Start() { + for _, prospector := range p.prospectors { + prospector.Start() + } +} +func (p *prospectorsRunner) Stop() { + for _, prospector := range p.prospectors { + prospector.Stop() + } +} +func (p *prospectorsRunner) ID() uint64 { + return p.id +} diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index cfe5fb64d54c..4961a452cf98 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -98,7 +98,7 @@ filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("reg filebeat.publish_async: {{publish_async}} {% if reload or reload_path -%} -filebeat.config.prospectors: +filebeat.config.{{ reload_type|default("prospectors") }}: enabled: true path: {{ reload_path }} {% if reload -%} diff --git a/filebeat/tests/system/module/test/test/config/test.yml b/filebeat/tests/system/module/test/test/config/test.yml new file mode 100644 index 000000000000..21548bbaf93d --- /dev/null +++ b/filebeat/tests/system/module/test/test/config/test.yml @@ -0,0 +1,5 @@ +type: log +paths: +{{ range $i, $path := .paths }} + - {{$path}} +{{ end }} diff --git a/filebeat/tests/system/module/test/test/ingest/pipeline.json b/filebeat/tests/system/module/test/test/ingest/pipeline.json new file mode 100644 index 000000000000..0967ef424bce --- /dev/null +++ b/filebeat/tests/system/module/test/test/ingest/pipeline.json @@ -0,0 +1 @@ +{} diff --git a/filebeat/tests/system/module/test/test/manifest.yml b/filebeat/tests/system/module/test/test/manifest.yml new file mode 100644 index 000000000000..8b90842943f1 --- /dev/null +++ b/filebeat/tests/system/module/test/test/manifest.yml @@ -0,0 +1,9 @@ +module_version: "1.0" + +var: + - name: paths + default: + - test.log + +ingest_pipeline: ingest/pipeline.json +prospector: config/test.yml diff --git a/filebeat/tests/system/test_reload_modules.py b/filebeat/tests/system/test_reload_modules.py new file mode 100644 index 000000000000..d6b3f183ff52 --- /dev/null +++ b/filebeat/tests/system/test_reload_modules.py @@ -0,0 +1,152 @@ +import re +import sys +import unittest +import os +import shutil +import time +from filebeat import BaseTest + + +moduleConfigTemplate = """ +- module: test + test: + enabled: true + var.paths: + - {} + prospector: + scan_frequency: 1s + auth: + enabled: false +""" + + +class Test(BaseTest): + + def setUp(self): + super(BaseTest, self).setUp() + # Copy system module + shutil.copytree(os.path.join("module", "test"), + os.path.join(self.working_dir, "module", "test")) + + def test_reload(self): + """ + Test modules basic reload + """ + self.render_config_template( + reload=True, + reload_path=self.working_dir + "/configs/*.yml", + reload_type="modules", + prospectors=False, + ) + + proc = self.start_beat() + + os.mkdir(self.working_dir + "/logs/") + logfile = self.working_dir + "/logs/test.log" + os.mkdir(self.working_dir + "/configs/") + + with open(self.working_dir + "/configs/system.yml.test", 'w') as f: + f.write(moduleConfigTemplate.format(self.working_dir + "/logs/*")) + os.rename(self.working_dir + "/configs/system.yml.test", + self.working_dir + "/configs/system.yml") + + with open(logfile, 'w') as f: + f.write("Hello world\n") + + self.wait_until(lambda: self.output_lines() > 0) + assert self.output_has_message("Hello world") + proc.check_kill_and_wait() + + def test_start_stop(self): + """ + Test basic modules start and stop + """ + self.render_config_template( + reload=True, + reload_path=self.working_dir + "/configs/*.yml", + reload_type="modules", + prospectors=False, + ) + + proc = self.start_beat() + + os.mkdir(self.working_dir + "/logs/") + logfile = self.working_dir + "/logs/test.log" + os.mkdir(self.working_dir + "/configs/") + + with open(self.working_dir + "/configs/system.yml.test", 'w') as f: + f.write(moduleConfigTemplate.format(self.working_dir + "/logs/*")) + os.rename(self.working_dir + "/configs/system.yml.test", + self.working_dir + "/configs/system.yml") + + with open(logfile, 'w') as f: + f.write("Hello world\n") + + self.wait_until(lambda: self.output_lines() == 1, max_timeout=10) + print(self.output_lines()) + + # Remove prospector + with open(self.working_dir + "/configs/system.yml", 'w') as f: + f.write("") + + # Wait until prospector is stopped + self.wait_until( + lambda: self.log_contains("Runner stopped:"), + max_timeout=15) + + with open(logfile, 'a') as f: + f.write("Hello world\n") + + # Wait to give a change to pick up the new line (it shouldn't) + time.sleep(1) + + self.wait_until(lambda: self.output_lines() == 1, max_timeout=5) + proc.check_kill_and_wait() + + def test_load_configs(self): + """ + Test loading separate module configs + """ + self.render_config_template( + reload_path=self.working_dir + "/configs/*.yml", + reload_type="modules", + prospectors=False, + ) + + os.mkdir(self.working_dir + "/logs/") + os.mkdir(self.working_dir + "/configs/") + logfile1 = self.working_dir + "/logs/test1.log" + logfile2 = self.working_dir + "/logs/test2.log" + + with open(self.working_dir + "/configs/module1.yml", 'w') as f: + f.write(moduleConfigTemplate.format( + self.working_dir + "/logs/test1.log")) + + with open(self.working_dir + "/configs/module2.yml", 'w') as f: + f.write(moduleConfigTemplate.format( + self.working_dir + "/logs/test2.log")) + + proc = self.start_beat() + + with open(logfile1, 'w') as f: + f.write("Hello 1\n") + + self.wait_until(lambda: self.output_lines() == 1) + + with open(logfile2, 'w') as f: + f.write("Hello 2\n") + + self.wait_until(lambda: self.output_lines() == 2) + + output = self.read_output() + + # Reloading stopped. + self.wait_until( + lambda: self.log_contains("Loading of config files completed."), + max_timeout=15) + + # Make sure the correct lines were picked up + assert self.output_lines() == 2 + assert output[0]["message"] == "Hello 1" + assert output[1]["message"] == "Hello 2" + proc.check_kill_and_wait() diff --git a/filebeat/tests/system/test_reload.py b/filebeat/tests/system/test_reload_prospectors.py similarity index 98% rename from filebeat/tests/system/test_reload.py rename to filebeat/tests/system/test_reload_prospectors.py index 084d2a01eab5..d9db94c8c50f 100644 --- a/filebeat/tests/system/test_reload.py +++ b/filebeat/tests/system/test_reload_prospectors.py @@ -18,7 +18,7 @@ class Test(BaseTest): def test_reload(self): """ - Test basic reload + Test basic prospectors reload """ self.render_config_template( reload=True, @@ -43,7 +43,7 @@ def test_reload(self): def test_start_stop(self): """ - Test basic start and stop + Test basic prospectors start and stop """ self.render_config_template( reload=True, @@ -86,7 +86,7 @@ def test_start_stop(self): def test_start_stop_replace(self): """ - Test basic start and replace with an other prospecto + Test basic start and replace with another prospector """ self.render_config_template( reload=True,