Skip to content

Commit

Permalink
Decouple fileset from inputs (#16701)
Browse files Browse the repository at this point in the history
* Make fileset package independent of filebeat.input

The fileset package used to depend on the filebeat.input package
to generate and load actual inputs. This change removes the dependency,
by passing a cfgfile.RunnerFactory instance to the fileset factory
constructor.
  • Loading branch information
Steffen Siering authored Mar 4, 2020
1 parent c858c7e commit b5465d8
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 68 deletions.
3 changes: 2 additions & 1 deletion filebeat/autodiscover/builder/hints/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/v7/libbeat/autodiscover"
"github.com/elastic/beats/v7/libbeat/autodiscover/builder"
"github.com/elastic/beats/v7/libbeat/autodiscover/template"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/bus"
"github.com/elastic/beats/v7/libbeat/logp"
Expand Down Expand Up @@ -60,7 +61,7 @@ func NewLogHints(cfg *common.Config) (autodiscover.Builder, error) {
return nil, fmt.Errorf("unable to unpack hints config due to error: %v", err)
}

moduleRegistry, err := fileset.NewModuleRegistry([]*common.Config{}, "", false)
moduleRegistry, err := fileset.NewModuleRegistry(nil, beat.Info{}, false)
if err != nil {
return nil, err
}
Expand Down
12 changes: 3 additions & 9 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
return nil, err
}

moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Info.Version, true)
moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Info, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -158,7 +158,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
// When running the subcommand setup, configuration from modules.d directories
// have to be loaded using cfg.Reloader. Otherwise those configurations are skipped.
pipelineLoaderFactory := newPipelineLoaderFactory(b.Config.Output.Config())
modulesFactory := fileset.NewSetupFactory(b.Info.Version, pipelineLoaderFactory)
modulesFactory := fileset.NewSetupFactory(b.Info, pipelineLoaderFactory)
if fb.config.ConfigModules.Enabled() {
modulesLoader := cfgfile.NewReloader(b.Publisher, fb.config.ConfigModules)
modulesLoader.Load(modulesFactory)
Expand Down Expand Up @@ -245,13 +245,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}

inputLoader := input.NewRunnerFactory(pipelineConnector, registrar, fb.done)
moduleLoader := fileset.NewFactory(pipelineConnector,
registrar,
b.Info.Version,
pipelineLoaderFactory,
config.OverwritePipelines,
fb.done,
)
moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines)

crawler, err := newCrawler(inputLoader, moduleLoader, pipelineConnector, config.Inputs, fb.done, *once)
if err != nil {
Expand Down
32 changes: 14 additions & 18 deletions filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ package fileset
import (
"github.com/gofrs/uuid"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/registrar"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
Expand All @@ -43,33 +40,33 @@ func init() {

// Factory for modules
type Factory struct {
outlet channel.Factory
registrar *registrar.Registrar
beatVersion string
beatInfo beat.Info
pipelineLoaderFactory PipelineLoaderFactory
overwritePipelines bool
pipelineCallbackID uuid.UUID
beatDone chan struct{}
inputFactory cfgfile.RunnerFactory
}

// Wrap an array of inputs and implements cfgfile.Runner interface
type inputsRunner struct {
id uint64
moduleRegistry *ModuleRegistry
inputs []*input.Runner
inputs []cfgfile.Runner
pipelineLoaderFactory PipelineLoaderFactory
pipelineCallbackID uuid.UUID
overwritePipelines bool
}

// NewFactory instantiates a new Factory
func NewFactory(outlet channel.Factory, registrar *registrar.Registrar, beatVersion string,
pipelineLoaderFactory PipelineLoaderFactory, overwritePipelines bool, beatDone chan struct{}) *Factory {
func NewFactory(
inputFactory cfgfile.RunnerFactory,
beatInfo beat.Info,
pipelineLoaderFactory PipelineLoaderFactory,
overwritePipelines bool,
) *Factory {
return &Factory{
outlet: outlet,
registrar: registrar,
beatVersion: beatVersion,
beatDone: beatDone,
inputFactory: inputFactory,
beatInfo: beatInfo,
pipelineLoaderFactory: pipelineLoaderFactory,
pipelineCallbackID: uuid.Nil,
overwritePipelines: overwritePipelines,
Expand All @@ -79,7 +76,7 @@ func NewFactory(outlet channel.Factory, registrar *registrar.Registrar, beatVers
// Create creates a module based on a config
func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
// Start a registry of one module:
m, err := NewModuleRegistry([]*common.Config{c}, f.beatVersion, false)
m, err := NewModuleRegistry([]*common.Config{c}, f.beatInfo, false)
if err != nil {
return nil, err
}
Expand All @@ -97,10 +94,9 @@ func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP
return nil, err
}

inputs := make([]*input.Runner, len(pConfigs))
connector := f.outlet(p)
inputs := make([]cfgfile.Runner, len(pConfigs))
for i, pConfig := range pConfigs {
inputs[i], err = input.New(pConfig, connector, f.beatDone, f.registrar.GetStates(), meta)
inputs[i], err = f.inputFactory.Create(p, pConfig, meta)
if err != nil {
logp.Err("Error creating input: %s", err)
return nil, err
Expand Down
34 changes: 17 additions & 17 deletions filebeat/fileset/fileset.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
// specific language governing permissions and limitations
// under the License.

/*
Package fileset contains the code that loads Filebeat modules (which are
composed of filesets).
*/

// Package fileset contains the code that loads Filebeat modules (which are
// composed of filesets).
package fileset

import (
Expand All @@ -39,6 +36,7 @@ import (
errw "github.com/pkg/errors"
"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/logp"
Expand Down Expand Up @@ -69,7 +67,7 @@ func New(

modulePath := filepath.Join(modulesPath, mcfg.Module)
if _, err := os.Stat(modulePath); os.IsNotExist(err) {
return nil, fmt.Errorf("Module %s (%s) doesn't exist.", mcfg.Module, modulePath)
return nil, fmt.Errorf("module %s (%s) doesn't exist", mcfg.Module, modulePath)
}

return &Fileset{
Expand All @@ -86,19 +84,19 @@ func (fs *Fileset) String() string {
}

// Read reads the manifest file and evaluates the variables.
func (fs *Fileset) Read(beatVersion string) error {
func (fs *Fileset) Read(info beat.Info) error {
var err error
fs.manifest, err = fs.readManifest()
if err != nil {
return err
}

fs.vars, err = fs.evaluateVars(beatVersion)
fs.vars, err = fs.evaluateVars(info)
if err != nil {
return err
}

fs.pipelineIDs, err = fs.getPipelineIDs(beatVersion)
fs.pipelineIDs, err = fs.getPipelineIDs(info)
if err != nil {
return err
}
Expand Down Expand Up @@ -153,10 +151,10 @@ func (fs *Fileset) readManifest() (*manifest, error) {
}

// evaluateVars resolves the fileset variables.
func (fs *Fileset) evaluateVars(beatVersion string) (map[string]interface{}, error) {
func (fs *Fileset) evaluateVars(info beat.Info) (map[string]interface{}, error) {
var err error
vars := map[string]interface{}{}
vars["builtin"], err = fs.getBuiltinVars(beatVersion)
vars["builtin"], err = fs.getBuiltinVars(info)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -294,6 +292,7 @@ func getTemplateFunctions(vars map[string]interface{}) (template.FuncMap, error)
return template.FuncMap{
"IngestPipeline": func(shortID string) string {
return formatPipelineID(
builtinVars["prefix"].(string),
builtinVars["module"].(string),
builtinVars["fileset"].(string),
shortID,
Expand All @@ -305,7 +304,7 @@ func getTemplateFunctions(vars map[string]interface{}) (template.FuncMap, error)

// getBuiltinVars computes the supported built in variables and groups them
// in a dictionary
func (fs *Fileset) getBuiltinVars(beatVersion string) (map[string]interface{}, error) {
func (fs *Fileset) getBuiltinVars(info beat.Info) (map[string]interface{}, error) {
host, err := os.Hostname()
if err != nil || len(host) == 0 {
return nil, fmt.Errorf("Error getting the hostname: %v", err)
Expand All @@ -318,11 +317,12 @@ func (fs *Fileset) getBuiltinVars(beatVersion string) (map[string]interface{}, e
}

return map[string]interface{}{
"prefix": info.IndexPrefix,
"hostname": hostname,
"domain": domain,
"module": fs.mcfg.Module,
"fileset": fs.name,
"beatVersion": beatVersion,
"beatVersion": info.Version,
}, nil
}

Expand Down Expand Up @@ -390,15 +390,15 @@ func (fs *Fileset) getInputConfig() (*common.Config, error) {
}

// getPipelineIDs returns the Ingest Node pipeline IDs
func (fs *Fileset) getPipelineIDs(beatVersion string) ([]string, error) {
func (fs *Fileset) getPipelineIDs(info beat.Info) ([]string, error) {
var pipelineIDs []string
for _, ingestPipeline := range fs.manifest.IngestPipeline {
path, err := applyTemplate(fs.vars, ingestPipeline, false)
if err != nil {
return nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
}

pipelineIDs = append(pipelineIDs, formatPipelineID(fs.mcfg.Module, fs.name, path, beatVersion))
pipelineIDs = append(pipelineIDs, formatPipelineID(info.IndexPrefix, fs.mcfg.Module, fs.name, path, info.Version))
}

return pipelineIDs, nil
Expand Down Expand Up @@ -493,8 +493,8 @@ func fixYAMLMaps(elem interface{}) (_ interface{}, err error) {
}

// formatPipelineID generates the ID to be used for the pipeline ID in Elasticsearch
func formatPipelineID(module, fileset, path, beatVersion string) string {
return fmt.Sprintf("filebeat-%s-%s-%s-%s", beatVersion, module, fileset, removeExt(filepath.Base(path)))
func formatPipelineID(prefix, module, fileset, path, version string) string {
return fmt.Sprintf("%s-%s-%s-%s-%s", prefix, version, module, fileset, removeExt(filepath.Base(path)))
}

// removeExt returns the file name without the extension. If no dot is found,
Expand Down
22 changes: 15 additions & 7 deletions filebeat/fileset/fileset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,17 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
)

func makeTestInfo(version string) beat.Info {
return beat.Info{
IndexPrefix: "filebeat",
Version: version,
}
}

func getModuleForTesting(t *testing.T, module, fileset string) *Fileset {
modulesPath, err := filepath.Abs("../module")
assert.NoError(t, err)
Expand Down Expand Up @@ -60,7 +68,7 @@ func TestLoadManifestNginx(t *testing.T) {
func TestGetBuiltinVars(t *testing.T) {
fs := getModuleForTesting(t, "nginx", "access")

vars, err := fs.getBuiltinVars("6.6.0")
vars, err := fs.getBuiltinVars(makeTestInfo("6.6.0"))
assert.NoError(t, err)

assert.IsType(t, vars["hostname"], "a-mac-with-esc-key")
Expand All @@ -77,7 +85,7 @@ func TestEvaluateVarsNginx(t *testing.T) {
fs.manifest, err = fs.readManifest()
assert.NoError(t, err)

vars, err := fs.evaluateVars("6.6.0")
vars, err := fs.evaluateVars(makeTestInfo("6.6.0"))
assert.NoError(t, err)

builtin := vars["builtin"].(map[string]interface{})
Expand All @@ -100,7 +108,7 @@ func TestEvaluateVarsNginxOverride(t *testing.T) {
fs.manifest, err = fs.readManifest()
assert.NoError(t, err)

vars, err := fs.evaluateVars("6.6.0")
vars, err := fs.evaluateVars(makeTestInfo("6.6.0"))
assert.NoError(t, err)

assert.Equal(t, "no_plugins", vars["pipeline"])
Expand All @@ -113,7 +121,7 @@ func TestEvaluateVarsMySQL(t *testing.T) {
fs.manifest, err = fs.readManifest()
assert.NoError(t, err)

vars, err := fs.evaluateVars("6.6.0")
vars, err := fs.evaluateVars(makeTestInfo("6.6.0"))
assert.NoError(t, err)

builtin := vars["builtin"].(map[string]interface{})
Expand Down Expand Up @@ -171,7 +179,7 @@ func TestResolveVariable(t *testing.T) {

func TestGetInputConfigNginx(t *testing.T) {
fs := getModuleForTesting(t, "nginx", "access")
assert.NoError(t, fs.Read("5.2.0"))
assert.NoError(t, fs.Read(makeTestInfo("5.2.0")))

cfg, err := fs.getInputConfig()
assert.NoError(t, err)
Expand Down Expand Up @@ -236,7 +244,7 @@ func TestGetInputConfigNginxOverrides(t *testing.T) {
})
assert.NoError(t, err)

assert.NoError(t, fs.Read("5.2.0"))
assert.NoError(t, fs.Read(makeTestInfo("5.2.0")))

cfg, err := fs.getInputConfig()
assert.NoError(t, err)
Expand All @@ -260,7 +268,7 @@ func TestGetInputConfigNginxOverrides(t *testing.T) {

func TestGetPipelineNginx(t *testing.T) {
fs := getModuleForTesting(t, "nginx", "access")
assert.NoError(t, fs.Read("5.2.0"))
assert.NoError(t, fs.Read(makeTestInfo("5.2.0")))

version := common.MustNewVersion("5.2.0")
pipelines, err := fs.GetPipelines(*version)
Expand Down
12 changes: 7 additions & 5 deletions filebeat/fileset/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pkg/errors"
yaml "gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/paths"
Expand All @@ -46,7 +47,8 @@ type ModuleRegistry struct {
func newModuleRegistry(modulesPath string,
moduleConfigs []*ModuleConfig,
overrides *ModuleOverrides,
beatVersion string) (*ModuleRegistry, error) {
beatInfo beat.Info,
) (*ModuleRegistry, error) {

var reg ModuleRegistry
reg.registry = map[string]map[string]*Fileset{}
Expand Down Expand Up @@ -87,7 +89,7 @@ func newModuleRegistry(modulesPath string,
if err != nil {
return nil, err
}
err = fileset.Read(beatVersion)
err = fileset.Read(beatInfo)
if err != nil {
return nil, fmt.Errorf("Error reading fileset %s/%s: %v", mcfg.Module, filesetName, err)
}
Expand All @@ -106,7 +108,7 @@ func newModuleRegistry(modulesPath string,
}
}
if !found {
return nil, fmt.Errorf("Fileset %s/%s is configured but doesn't exist", mcfg.Module, filesetName)
return nil, fmt.Errorf("fileset %s/%s is configured but doesn't exist", mcfg.Module, filesetName)
}
}
}
Expand All @@ -115,7 +117,7 @@ func newModuleRegistry(modulesPath string,
}

// NewModuleRegistry reads and loads the configured module into the registry.
func NewModuleRegistry(moduleConfigs []*common.Config, beatVersion string, init bool) (*ModuleRegistry, error) {
func NewModuleRegistry(moduleConfigs []*common.Config, beatInfo beat.Info, init bool) (*ModuleRegistry, error) {
modulesPath := paths.Resolve(paths.Home, "module")

stat, err := os.Stat(modulesPath)
Expand Down Expand Up @@ -151,7 +153,7 @@ func NewModuleRegistry(moduleConfigs []*common.Config, beatVersion string, init
return nil, err
}

return newModuleRegistry(modulesPath, mcfgs, modulesOverrides, beatVersion)
return newModuleRegistry(modulesPath, mcfgs, modulesOverrides, beatInfo)
}

func mcfgFromConfig(cfg *common.Config) (*ModuleConfig, error) {
Expand Down
Loading

0 comments on commit b5465d8

Please sign in to comment.