Skip to content

Commit

Permalink
Load pipelines on new module instances
Browse files Browse the repository at this point in the history
  • Loading branch information
exekias committed Jul 5, 2017
1 parent 9ba3d40 commit 798f41b
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 23 deletions.
23 changes: 18 additions & 5 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import (
_ "github.com/elastic/beats/filebeat/processor/add_kubernetes_metadata"
)

// pipelinesWarning is logged when module Ingest Node pipelines cannot be
// loaded because the configured output is not Elasticsearch; in that case
// the user is expected to have loaded the pipelines by other means.
const pipelinesWarning = "Filebeat is unable to load the Ingest Node pipelines for the configured" +
	" modules because the Elasticsearch output is not configured/enabled. If you have" +
	" already loaded the Ingest Node pipelines or are using Logstash pipelines, you" +
	" can ignore this warning."

// Command-line flags registered by this beater.
var (
	// once makes Filebeat exit after all harvesters reach EOF instead of
	// running continuously.
	once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF")
)
Expand Down Expand Up @@ -99,10 +104,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// setup.
func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
if b.Config.Output.Name() != "elasticsearch" {
logp.Warn("Filebeat is unable to load the Ingest Node pipelines for the configured" +
" modules because the Elasticsearch output is not configured/enabled. If you have" +
" already loaded the Ingest Node pipelines or are using Logstash pipelines, you" +
" can ignore this warning.")
logp.Warn(pipelinesWarning)
return nil
}

Expand Down Expand Up @@ -219,7 +221,18 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
spooler.Stop()
}()

err = crawler.Start(registrar, config.ConfigProspector, config.ConfigModules)
var esClient fileset.PipelineLoader
if b.Config.Output.Name() == "elasticsearch" {
esConfig := b.Config.Output.Config()
esClient, err = elasticsearch.NewConnectedClient(esConfig)
if err != nil {
return errors.Wrap(err, "Error creating Elasticsearch client")
}
} else {
logp.Warn(pipelinesWarning)
}

err = crawler.Start(registrar, config.ConfigProspector, config.ConfigModules, esClient)
if err != nil {
crawler.Stop()
return err
Expand Down
4 changes: 2 additions & 2 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func New(out channel.Outleter, prospectorConfigs []*common.Config, beatVersion s
}

// Start starts the crawler with all prospectors
func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config, configModules *common.Config) error {
func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config, configModules *common.Config, pipelineLoader fileset.PipelineLoader) error {

logp.Info("Loading Prospectors: %v", len(c.prospectorConfigs))

Expand All @@ -64,7 +64,7 @@ func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config
logp.Beta("Loading separate modules is enabled.")

c.reloader = cfgfile.NewReloader(configModules)
factory := fileset.NewFactory(c.out, r, c.beatVersion, c.beatDone)
factory := fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoader, c.beatDone)
go func() {
c.reloader.Run(factory)
}()
Expand Down
50 changes: 36 additions & 14 deletions filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,39 @@ import (
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/elasticsearch"
"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/filebeat/registrar"
)

// Factory is a factory for registrars
// Factory for modules
type Factory struct {
outlet channel.Outleter
registrar *registrar.Registrar
beatVersion string
beatDone chan struct{}
outlet channel.Outleter
registrar *registrar.Registrar
beatVersion string
pipelineLoader PipelineLoader
beatDone chan struct{}
}

// Wrap an array of prospectors and implements cfgfile.Runner interface
type prospectorsRunner struct {
id uint64
prospectors []*prospector.Prospector
id uint64
moduleRegistry *ModuleRegistry
prospectors []*prospector.Prospector
pipelineLoader PipelineLoader
}

// NewFactory instantiates a new Factory
func NewFactory(outlet channel.Outleter, registrar *registrar.Registrar, beatVersion string, beatDone chan struct{}) *Factory {
func NewFactory(outlet channel.Outleter, registrar *registrar.Registrar, beatVersion string, pipelineLoader PipelineLoader, beatDone chan struct{}) *Factory {
return &Factory{
outlet: outlet,
registrar: registrar,
beatVersion: beatVersion,
beatDone: beatDone,
outlet: outlet,
registrar: registrar,
beatVersion: beatVersion,
beatDone: beatDone,
pipelineLoader: pipelineLoader,
}
}

Expand Down Expand Up @@ -66,12 +71,29 @@ func (f *Factory) Create(c *common.Config) (cfgfile.Runner, error) {
}

return &prospectorsRunner{
id: id,
prospectors: prospectors,
id: id,
moduleRegistry: m,
prospectors: prospectors,
pipelineLoader: f.pipelineLoader,
}, nil
}

func (p *prospectorsRunner) Start() {
// Load pipelines
if p.pipelineLoader != nil {
// Setup a callback & load now too, as we are already connected
callback := func(esClient *elasticsearch.Client) error {
return p.moduleRegistry.LoadPipelines(p.pipelineLoader)
}
elasticsearch.RegisterConnectCallback(callback)

err := p.moduleRegistry.LoadPipelines(p.pipelineLoader)
if err != nil {
// Log error and continue
logp.Err("Error loading pipeline: %s", err)
}
}

for _, prospector := range p.prospectors {
prospector.Start()
}
Expand Down
4 changes: 4 additions & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,12 @@ processors:
#------------------------------- Elasticsearch output ----------------------------
output.elasticsearch:
hosts: ["{{ elasticsearch.host }}"]
{% if elasticsearch.pipeline %}
pipeline: {{elasticsearch.pipeline}}
{% endif %}
{% if elasticsearch.index %}
index: {{elasticsearch.index}}
{% endif %}
{%- elif logstash %}
#------------------------------- Logstash output ---------------------------------
output.logstash:
Expand Down
8 changes: 8 additions & 0 deletions filebeat/tests/system/module/test/test/ingest/default.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"description": "Test pipeline.",
"processors": [{
"remove":{
"field": "message"
}
}]
}

This file was deleted.

2 changes: 1 addition & 1 deletion filebeat/tests/system/module/test/test/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ var:
default:
- test.log

ingest_pipeline: ingest/pipeline.json
ingest_pipeline: ingest/default.json
prospector: config/test.yml
36 changes: 36 additions & 0 deletions filebeat/tests/system/test_reload_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
import os
import shutil
import time

from filebeat import BaseTest
from beat.beat import INTEGRATION_TESTS
from elasticsearch import Elasticsearch


moduleConfigTemplate = """
Expand All @@ -24,6 +27,7 @@ class Test(BaseTest):

def setUp(self):
super(BaseTest, self).setUp()
self.es = Elasticsearch([self.get_elasticsearch_url()])
# Copy system module
shutil.copytree(os.path.join("module", "test"),
os.path.join(self.working_dir, "module", "test"))
Expand Down Expand Up @@ -57,6 +61,38 @@ def test_reload(self):
assert self.output_has_message("Hello world")
proc.check_kill_and_wait()

@unittest.skipUnless(INTEGRATION_TESTS, "integration test")
def test_reload_writes_pipeline(self):
    """
    Test modules reload brings pipelines
    """
    # Enable module reloading and point the Elasticsearch output at the
    # integration-test cluster so pipeline loading is triggered.
    self.render_config_template(
        reload=True,
        reload_path=self.working_dir + "/configs/*.yml",
        reload_type="modules",
        prospectors=False,
        elasticsearch={"host": self.get_elasticsearch_url()}
    )

    proc = self.start_beat()

    os.mkdir(self.working_dir + "/logs/")
    logfile = self.working_dir + "/logs/test.log"
    os.mkdir(self.working_dir + "/configs/")

    # Write to a temporary name first and then rename, so the config
    # reloader never observes a partially written module config file.
    with open(self.working_dir + "/configs/system.yml.test", 'w') as f:
        f.write(moduleConfigTemplate.format(self.working_dir + "/logs/*"))
    os.rename(self.working_dir + "/configs/system.yml.test",
              self.working_dir + "/configs/system.yml")

    with open(logfile, 'w') as f:
        f.write("Hello world\n")

    # Check pipeline is present
    # NOTE(review): this method uses re.match and unittest.skipUnless, but
    # `import re` / `import unittest` are not visible in the shown import
    # hunk -- confirm they exist at the top of the file.
    self.wait_until(lambda: any(re.match("filebeat-.*-test-test-default", key)
                                for key in self.es.transport.perform_request("GET", "/_ingest/pipeline/").keys()))
    proc.check_kill_and_wait()

def test_start_stop(self):
"""
Test basic modules start and stop
Expand Down

0 comments on commit 798f41b

Please sign in to comment.