Skip to content

Commit

Permalink
Filebeat modules: first step of Go implementation
Browse files Browse the repository at this point in the history
This rewrites the following parts of the prototype in Go:

* Reading of the filesets
* Evaluation of variables, including the builtin variables
* Creation of the prospectors configurations and starting them
* Overriding of the variables from the config file and from the command line
* The -modules and -M CLI flags
* The ability to override variables from multiple filesets using wildcards,
  for example:
    `-M "nginx.*.var.paths=[/var/log/syslog*]"`

At this point, the system tests are still using the filebeat.py wrapper, but
only for loading the ingest node pipeline. The prospector configuration part
and the overriding of settings are done by Filebeat itself.

Part of #3159.
  • Loading branch information
Tudor Golubenco committed Jan 16, 2017
1 parent 5af5f3e commit ea58bdb
Show file tree
Hide file tree
Showing 27 changed files with 1,261 additions and 169 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
/beats.iml
*.dev.yml
*.generated.yml
coverage.out

# Editor swap files
*.swp
Expand Down
30 changes: 26 additions & 4 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package beater

import (
"errors"
"flag"
"fmt"
"sync"
Expand All @@ -11,6 +12,7 @@ import (

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/crawler"
"github.com/elastic/beats/filebeat/fileset"
"github.com/elastic/beats/filebeat/publisher"
"github.com/elastic/beats/filebeat/registrar"
"github.com/elastic/beats/filebeat/spooler"
Expand All @@ -20,8 +22,9 @@ var once = flag.Bool("once", false, "Run filebeat only once until all harvesters

// Filebeat is a beater object. Contains all objects needed to run the beat
type Filebeat struct {
config *cfg.Config
done chan struct{}
config *cfg.Config
moduleRegistry *fileset.ModuleRegistry
done chan struct{}
}

// New creates a new Filebeat pointer instance.
Expand All @@ -30,13 +33,32 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
if err := rawConfig.Unpack(&config); err != nil {
return nil, fmt.Errorf("Error reading config file: %v", err)
}

moduleRegistry, err := fileset.NewModuleRegistry(config.Modules)
if err != nil {
return nil, err
}

moduleProspectors, err := moduleRegistry.GetProspectorConfigs()
if err != nil {
return nil, err
}

if err := config.FetchConfigs(); err != nil {
return nil, err
}

// Add prospectors created by the modules
config.Prospectors = append(config.Prospectors, moduleProspectors...)

if len(config.Prospectors) == 0 {
return nil, errors.New("No prospectors defined. What files do you want me to watch?")
}

fb := &Filebeat{
done: make(chan struct{}),
config: &config,
done: make(chan struct{}),
config: &config,
moduleRegistry: moduleRegistry,
}
return fb, nil
}
Expand Down
8 changes: 1 addition & 7 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package config

import (
"errors"
"log"
"os"
"path/filepath"
Expand All @@ -26,6 +25,7 @@ type Config struct {
RegistryFile string `config:"registry_file"`
ConfigDir string `config:"config_dir"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
Modules []*common.Config `config:"modules"`
}

var (
Expand Down Expand Up @@ -125,11 +125,5 @@ func (config *Config) FetchConfigs() error {
return err
}

if len(config.Prospectors) == 0 {
err := errors.New("No paths given. What files do you want me to watch?")
log.Fatalf("%v", err)
return err
}

return nil
}
66 changes: 20 additions & 46 deletions filebeat/filebeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def main():
print("You need to specify at least a module")
sys.exit(1)

load_dashboards(args)
# load_dashboards(args)
load_datasets(args, modules)


Expand All @@ -49,7 +49,6 @@ def load_dashboards(args):


def load_datasets(args, modules):
prospectors = ""
for module in modules:
path = os.path.join("module", module)
if not os.path.isdir(path):
Expand All @@ -63,56 +62,45 @@ def load_datasets(args, modules):
print("Found filesets: {}".format(filesets))

for fileset in filesets:
prospectors += load_fileset(args, module, fileset,
os.path.join(path, fileset))
load_fileset(args, module, fileset,
os.path.join(path, fileset))

print("Generated configuration: {}".format(prospectors))
run_filebeat(args, prospectors)
run_filebeat(args)


def load_fileset(args, module, fileset, path):
manifest = yaml.load(file(os.path.join(path, "manifest.yml"), "r"))
var = evaluate_vars(args, manifest["vars"], module, fileset)
var = evaluate_vars(args, manifest["var"], module, fileset)
var["beat"] = dict(module=module, fileset=fileset, path=path, args=args)
print("Evaluated variables: {}".format(var))

load_pipeline(var, manifest["ingest_pipeline"])
generate_prospectors(var, manifest["prospectors"])

return var["beat"]["prospectors"]


def evaluate_vars(args, var_in, module, fileset):
var = {
"builtin": get_builtin_vars()
}
for name, vals in var_in.items():
for vals in var_in:
name = vals["name"]
var[name] = vals["default"]
if sys.platform == "darwin" and "os.darwin" in vals:
var[name] = vals["os.darwin"]
elif sys.platform == "windows" and "os.windows" in vals:
var[name] = vals["os.windows"]

if isinstance(var[name], basestring):
var[name] = Template(var[name]).render(var)
var[name] = apply_template(var[name], var)
elif isinstance(var[name], list):
# only supports array of strings atm
var[name] = [Template(x).render(var) for x in var[name]]
var[name] = [apply_template(x, var) for x in var[name]]

# overrides
if args.M is not None:
for pair in args.M:
key, val = pair.partition("=")[::2]
if key.startswith("{}.{}.".format(module, fileset)):
key = key[len("{}.{}.".format(module, fileset)):]
return var

# this is a hack in the prototype only, because
# here we don't know the type of each variable type.
if key == "paths":
val = val.split(",")
var[key] = val

return var
def apply_template(tpl, var):
tpl = tpl.replace("{{.", "{{") # Go templates
return Template(tpl).render(var)


def get_builtin_vars():
Expand All @@ -126,7 +114,7 @@ def get_builtin_vars():


def load_pipeline(var, pipeline):
path = os.path.join(var["beat"]["path"], Template(pipeline).render(var))
path = os.path.join(var["beat"]["path"], apply_template(pipeline, var))
print("Loading ingest pipeline: {}".format(path))
var["beat"]["pipeline_id"] = var["beat"]["module"] + '-' + var["beat"]["fileset"] + \
'-' + os.path.splitext(os.path.basename(path))[0]
Expand All @@ -144,11 +132,8 @@ def load_pipeline(var, pipeline):
sys.exit(1)


def run_filebeat(args, prospectors):
def run_filebeat(args):
cfg_template = """
filebeat.prospectors:
{{prospectors}}
output.elasticsearch.hosts: ["{{es}}"]
output.elasticsearch.pipeline: "%{[fields.pipeline_id]}"
"""
Expand All @@ -165,33 +150,22 @@ def run_filebeat(args, prospectors):
text=True)
with open(fname, "w") as cfgfile:
cfgfile.write(Template(cfg_template).render(
dict(prospectors=prospectors, es=args.es)))
dict(es=args.es)))
print("Wrote configuration file: {}".format(cfgfile.name))
os.close(fd)

cmd = ["./filebeat.test", "-systemTest",
"-modules", args.modules,
"-e", "-c", cfgfile.name, "-d", "*"]
for override in args.M:
cmd.extend(["-M", override])
if args.once:
cmd.extend(["-M", "*.*.prospector.close_eof=true"])
cmd.append("-once")
print("Starting filebeat: " + " ".join(cmd))

subprocess.Popen(cmd).wait()


def generate_prospectors(var, prospectors):
var["beat"]["prospectors"] = ""
for pr in prospectors:
path = os.path.join(var["beat"]["path"], Template(pr).render(var))
with open(path, "r") as f:
contents = Template(f.read()).render(var)
if var["beat"]["args"].once:
contents += "\n close_eof: true"
contents += "\n scan_frequency: 0.2s"
if "multiline" in contents:
contents += "\n multiline.timeout: 0.2s"

var["beat"]["prospectors"] += "\n" + contents


if __name__ == "__main__":
sys.exit(main())
19 changes: 19 additions & 0 deletions filebeat/fileset/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package fileset

// ModuleConfig contains the configuration file options for a module.
type ModuleConfig struct {
// Module is read from the "module" key and is mandatory
// (validate:"required").
Module string `config:"module" validate:"required"`
// Enabled is read from the "enabled" key.
// NOTE(review): presumably a pointer so that an unset value can be told
// apart from an explicit false; confirm against the code that reads it.
Enabled *bool `config:"enabled"`

// Filesets carries no config tag: it is not filled by the generic unpacking
// but is inlined by code, see mcfgFromConfig.
// NOTE(review): the map key is presumably the fileset name; confirm in
// mcfgFromConfig.
Filesets map[string]*FilesetConfig
}

// FilesetConfig contains the configuration file options for a fileset.
type FilesetConfig struct {
// Enabled is read from the "enabled" key.
// NOTE(review): presumably a pointer so that an unset value can be told
// apart from an explicit false; confirm against the code that reads it.
Enabled *bool `config:"enabled"`
// Var is read from the "var" key as an untyped map.
// NOTE(review): presumably holds overrides for the fileset variables
// (e.g. var.paths); confirm against the code that evaluates variables.
Var map[string]interface{} `config:"var"`
// Prospector is read from the "prospector" key as an untyped map.
// NOTE(review): presumably holds overrides for the generated prospector
// settings (e.g. prospector.close_eof); confirm against the caller.
Prospector map[string]interface{} `config:"prospector"`
}

// defaultFilesetConfig is the zero-value FilesetConfig: Enabled, Var and
// Prospector are all nil.
// NOTE(review): presumably used as the starting value before a fileset's
// settings are unpacked on top of it; confirm against its users.
var defaultFilesetConfig = FilesetConfig{}
Loading

0 comments on commit ea58bdb

Please sign in to comment.