From bcff0fca81b0ddabf6395b6b7c4b4954eba69706 Mon Sep 17 00:00:00 2001 From: Vijay Samuel Date: Tue, 23 Jun 2020 03:53:55 -0700 Subject: [PATCH] Add support for multiple sets of hints on autodiscover (#18883) Allows defining multiple sets of annotations similar to how processors are defined. This functionality already exists in heartbeat. This change standardizes the utility and adds it to logs and metrics hints builder as well. (cherry picked from commit a5da85c6439bd16130090a8dff8e3ad8f0c09f24) --- CHANGELOG.next.asciidoc | 1 + filebeat/autodiscover/builder/hints/logs.go | 121 +++-- .../autodiscover/builder/hints/logs_test.go | 414 +++++++++++------- filebeat/docs/autodiscover-hints.asciidoc | 16 + .../autodiscover/builder/hints/monitors.go | 42 +- heartbeat/docs/autodiscover-hints.asciidoc | 15 + libbeat/autodiscover/builder/helper.go | 39 ++ libbeat/autodiscover/builder/helper_test.go | 94 ++++ .../autodiscover/builder/hints/metrics.go | 124 +++--- .../builder/hints/metrics_test.go | 251 +++++++---- metricbeat/docs/autodiscover-hints.asciidoc | 19 + 11 files changed, 734 insertions(+), 402 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d1e969cfeb92..7886ab599efa 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -359,6 +359,7 @@ field. You can revert this change by configuring tags for the module and omittin - Update RPM packages contained in Beat Docker images. {issue}17035[17035] - Add TLS support to Kerberos authentication in Elasticsearch. {pull}18607[18607] - Upgrade k8s.io/client-go and k8s keystore tests. {pull}18817[18817] +- Add support for multiple sets of hints on autodiscover {pull}18883[18883] *Auditbeat* diff --git a/filebeat/autodiscover/builder/hints/logs.go b/filebeat/autodiscover/builder/hints/logs.go index 70758a8a0280..05014134106c 100644 --- a/filebeat/autodiscover/builder/hints/logs.go +++ b/filebeat/autodiscover/builder/hints/logs.go @@ -79,7 +79,7 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*comm hints, _ = hIface.(common.MapStr) } - inputConfig := l.getInputs(hints) + inputConfig := l.getInputsConfigs(hints) // If default config is disabled return nothing unless it's explicty enabled if !l.config.DefaultConfig.Enabled() && !builder.IsEnabled(hints, l.config.Key) { @@ -87,16 +87,12 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*comm return []*common.Config{} } - // If explicty disabled, return nothing + // If explictly disabled, return nothing if builder.IsDisabled(hints, l.config.Key) { logp.Debug("hints.builder", "logs disabled by hint: %+v", event) return []*common.Config{} } - // Clone original config, enable it if disabled - config, _ := common.NewConfigFrom(l.config.DefaultConfig) - config.Remove("enabled", -1) - host, _ := event["host"].(string) if host == "" { return []*common.Config{} @@ -114,58 +110,67 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*comm return template.ApplyConfigTemplate(event, configs) } - tempCfg := common.MapStr{} - mline := l.getMultiline(hints) - if len(mline) != 0 { - tempCfg.Put(multiline, mline) - } - if ilines := l.getIncludeLines(hints); len(ilines) != 0 { - tempCfg.Put(includeLines, ilines) - } - if elines := l.getExcludeLines(hints); len(elines) != 0 { - tempCfg.Put(excludeLines, elines) - } - - if procs := l.getProcessors(hints); len(procs) != 0 { - tempCfg.Put(processors, procs) - } - - if jsonOpts := l.getJSONOptions(hints); len(jsonOpts) != 0 { - tempCfg.Put(json, jsonOpts) - } - // Merge config template with the configs from the annotations - if err := config.Merge(tempCfg); err != nil { - logp.Debug("hints.builder", "config merge failed with error: %v", err) - return []*common.Config{config} - } + var configs []*common.Config + inputs := l.getInputs(hints) + for _, h := range inputs { + // Clone original config, enable it if disabled + config, _ := common.NewConfigFrom(l.config.DefaultConfig) + config.Remove("enabled", -1) + + tempCfg := common.MapStr{} + mline := l.getMultiline(h) + if len(mline) != 0 { + tempCfg.Put(multiline, mline) + } + if ilines := l.getIncludeLines(h); len(ilines) != 0 { + tempCfg.Put(includeLines, ilines) + } + if elines := l.getExcludeLines(h); len(elines) != 0 { + tempCfg.Put(excludeLines, elines) + } - module := l.getModule(hints) - if module != "" { - moduleConf := map[string]interface{}{ - "module": module, + if procs := l.getProcessors(h); len(procs) != 0 { + tempCfg.Put(processors, procs) } - filesets := l.getFilesets(hints, module) - for fileset, conf := range filesets { - filesetConf, _ := common.NewConfigFrom(config) + if jsonOpts := l.getJSONOptions(h); len(jsonOpts) != 0 { + tempCfg.Put(json, jsonOpts) + } + // Merge config template with the configs from the annotations + if err := config.Merge(tempCfg); err != nil { + logp.Debug("hints.builder", "config merge failed with error: %v", err) + continue + } - if inputType, _ := filesetConf.String("type", -1); inputType == harvester.ContainerType { - filesetConf.SetString("stream", -1, conf.Stream) - } else { - filesetConf.SetString("containers.stream", -1, conf.Stream) + module := l.getModule(hints) + if module != "" { + moduleConf := map[string]interface{}{ + "module": module, } - moduleConf[fileset+".enabled"] = conf.Enabled - moduleConf[fileset+".input"] = filesetConf + filesets := l.getFilesets(hints, module) + for fileset, conf := range filesets { + filesetConf, _ := common.NewConfigFrom(config) + + if inputType, _ := filesetConf.String("type", -1); inputType == harvester.ContainerType { + filesetConf.SetString("stream", -1, conf.Stream) + } else { + filesetConf.SetString("containers.stream", -1, conf.Stream) + } - logp.Debug("hints.builder", "generated config %+v", moduleConf) + moduleConf[fileset+".enabled"] = conf.Enabled + moduleConf[fileset+".input"] = filesetConf + + logp.Debug("hints.builder", "generated config %+v", moduleConf) + } + config, _ = common.NewConfigFrom(moduleConf) } - config, _ = common.NewConfigFrom(moduleConf) + logp.Debug("hints.builder", "generated config %+v", config) + configs = append(configs, config) } - logp.Debug("hints.builder", "generated config %+v", config) // Apply information in event to the template to generate the final config - return template.ApplyConfigTemplate(event, []*common.Config{config}) + return template.ApplyConfigTemplate(event, configs) } func (l *logHints) getMultiline(hints common.MapStr) common.MapStr { @@ -186,7 +191,7 @@ func (l *logHints) getModule(hints common.MapStr) string { return validModuleNames.ReplaceAllString(module, "") } -func (l *logHints) getInputs(hints common.MapStr) []common.MapStr { +func (l *logHints) getInputsConfigs(hints common.MapStr) []common.MapStr { return builder.GetHintAsConfigs(hints, l.config.Key) } @@ -248,3 +253,23 @@ func (l *logHints) getFilesets(hints common.MapStr, module string) map[string]*f return filesets } + +func (l *logHints) getInputs(hints common.MapStr) []common.MapStr { + modules := builder.GetHintsAsList(hints, l.config.Key) + var output []common.MapStr + + for _, mod := range modules { + output = append(output, common.MapStr{ + l.config.Key: mod, + }) + } + + // Generate this so that no hints with completely valid templates work + if len(output) == 0 { + output = append(output, common.MapStr{ + l.config.Key: common.MapStr{}, + }) + } + + return output +} diff --git a/filebeat/autodiscover/builder/hints/logs_test.go b/filebeat/autodiscover/builder/hints/logs_test.go index b316cdb506c2..0dadfe54798c 100644 --- a/filebeat/autodiscover/builder/hints/logs_test.go +++ b/filebeat/autodiscover/builder/hints/logs_test.go @@ -54,7 +54,7 @@ func TestGenerateHints(t *testing.T) { config *common.Config event bus.Event len int - result common.MapStr + result []common.MapStr }{ { msg: "Default config is correct", @@ -73,9 +73,11 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "paths": []interface{}{"/var/lib/docker/containers/abc/*-json.log"}, - "type": "container", + result: []common.MapStr{ + { + "paths": []interface{}{"/var/lib/docker/containers/abc/*-json.log"}, + "type": "container", + }, }, }, { @@ -94,7 +96,8 @@ func TestGenerateHints(t *testing.T) { "id": "abc", }, }, - len: 0, + len: 0, + result: []common.MapStr{}, }, { msg: "Hint to enable when disabled by default works", @@ -119,10 +122,12 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "type": "container", - "paths": []interface{}{"/var/lib/docker/containers/abc/*-json.log"}, - "exclude_lines": []interface{}{"^test2", "^test3"}, + result: []common.MapStr{ + { + "type": "container", + "paths": []interface{}{"/var/lib/docker/containers/abc/*-json.log"}, + "exclude_lines": []interface{}{"^test2", "^test3"}, + }, }, }, { @@ -136,7 +141,7 @@ func TestGenerateHints(t *testing.T) { }, }, len: 0, - result: common.MapStr{}, + result: []common.MapStr{}, }, { msg: "Hints with logs.disable should return nothing", @@ -149,7 +154,7 @@ func TestGenerateHints(t *testing.T) { }, }, len: 0, - result: common.MapStr{}, + result: []common.MapStr{}, }, { msg: "Empty event hints should return default config", @@ -168,12 +173,14 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "type": "docker", - "containers": map[string]interface{}{ - "ids": []interface{}{"abc"}, + result: []common.MapStr{ + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, }, { @@ -199,14 +206,62 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "type": "docker", - "containers": map[string]interface{}{ - "ids": []interface{}{"abc"}, + result: []common.MapStr{ + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "include_lines": []interface{}{"^test", "^test1"}, + "exclude_lines": []interface{}{"^test2", "^test3"}, + "close_timeout": "true", + }, + }, + }, + { + msg: "Hints with two sets of include|exclude_lines must be part of the input config", + config: customCfg, + event: bus.Event{ + "host": "1.2.3.4", + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + }, + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + "hints": common.MapStr{ + "logs": common.MapStr{ + "1": common.MapStr{ + "exclude_lines": "^test1, ^test2", + }, + "2": common.MapStr{ + "include_lines": "^test1, ^test2", + }, + }, + }, + }, + len: 2, + result: []common.MapStr{ + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "exclude_lines": []interface{}{"^test1", "^test2"}, + "close_timeout": "true", + }, + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "include_lines": []interface{}{"^test1", "^test2"}, + "close_timeout": "true", }, - "include_lines": []interface{}{"^test", "^test1"}, - "exclude_lines": []interface{}{"^test2", "^test3"}, - "close_timeout": "true", }, }, { @@ -234,16 +289,18 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "type": "docker", - "containers": map[string]interface{}{ - "ids": []interface{}{"abc"}, - }, - "multiline": map[string]interface{}{ - "pattern": "^test", - "negate": "true", + result: []common.MapStr{ + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "multiline": map[string]interface{}{ + "pattern": "^test", + "negate": "true", + }, + "close_timeout": "true", }, - "close_timeout": "true", }, }, { @@ -268,14 +325,16 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "type": "docker", - "containers": map[string]interface{}{ - "ids": []interface{}{"abc"}, - }, - "multiline": map[string]interface{}{ - "pattern": "^test", - "negate": "true", + result: []common.MapStr{ + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "multiline": map[string]interface{}{ + "pattern": "^test", + "negate": "true", + }, }, }, }, @@ -308,20 +367,22 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "type": "docker", - "containers": map[string]interface{}{ - "ids": []interface{}{"abc"}, - }, - "close_timeout": "true", - "processors": []interface{}{ - map[string]interface{}{ - "dissect": map[string]interface{}{ - "tokenizer": "%{key1} %{key2}", + result: []common.MapStr{ + { + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", + "processors": []interface{}{ + map[string]interface{}{ + "dissect": map[string]interface{}{ + "tokenizer": "%{key1} %{key2}", + }, + }, + map[string]interface{}{ + "drop_event": nil, }, - }, - map[string]interface{}{ - "drop_event": nil, }, }, }, @@ -348,28 +409,30 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "apache2", - "error": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "docker", - "containers": map[string]interface{}{ - "stream": "all", - "ids": []interface{}{"abc"}, + result: []common.MapStr{ + { + "module": "apache2", + "error": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "stream": "all", + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, - }, - "access": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "docker", - "containers": map[string]interface{}{ - "stream": "all", - "ids": []interface{}{"abc"}, + "access": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "stream": "all", + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, }, }, @@ -397,28 +460,30 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "apache2", - "access": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "docker", - "containers": map[string]interface{}{ - "stream": "all", - "ids": []interface{}{"abc"}, + result: []common.MapStr{ + { + "module": "apache2", + "access": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "stream": "all", + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, - }, - "error": map[string]interface{}{ - "enabled": false, - "input": map[string]interface{}{ - "type": "docker", - "containers": map[string]interface{}{ - "stream": "all", - "ids": []interface{}{"abc"}, + "error": map[string]interface{}{ + "enabled": false, + "input": map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "stream": "all", + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, }, }, @@ -447,28 +512,30 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "apache2", - "access": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "docker", - "containers": map[string]interface{}{ - "stream": "stdout", - "ids": []interface{}{"abc"}, + result: []common.MapStr{ + { + "module": "apache2", + "access": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "stream": "stdout", + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, - }, - "error": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "docker", - "containers": map[string]interface{}{ - "stream": "stderr", - "ids": []interface{}{"abc"}, + "error": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "stream": "stderr", + "ids": []interface{}{"abc"}, + }, + "close_timeout": "true", }, - "close_timeout": "true", }, }, }, @@ -495,25 +562,27 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "apache2", - "error": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "container", - "stream": "all", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + result: []common.MapStr{ + { + "module": "apache2", + "error": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "container", + "stream": "all", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, }, }, - }, - "access": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "container", - "stream": "all", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + "access": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "container", + "stream": "all", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, }, }, }, @@ -542,25 +611,27 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "apache2", - "access": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "container", - "stream": "all", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + result: []common.MapStr{ + { + "module": "apache2", + "access": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "container", + "stream": "all", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, }, }, - }, - "error": map[string]interface{}{ - "enabled": false, - "input": map[string]interface{}{ - "type": "container", - "stream": "all", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + "error": map[string]interface{}{ + "enabled": false, + "input": map[string]interface{}{ + "type": "container", + "stream": "all", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, }, }, }, @@ -590,25 +661,27 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "apache2", - "access": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "container", - "stream": "stdout", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + result: []common.MapStr{ + { + "module": "apache2", + "access": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "container", + "stream": "stdout", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, }, }, - }, - "error": map[string]interface{}{ - "enabled": true, - "input": map[string]interface{}{ - "type": "container", - "stream": "stderr", - "paths": []interface{}{ - "/var/lib/docker/containers/abc/*-json.log", + "error": map[string]interface{}{ + "enabled": true, + "input": map[string]interface{}{ + "type": "container", + "stream": "stderr", + "paths": []interface{}{ + "/var/lib/docker/containers/abc/*-json.log", + }, }, }, }, @@ -629,15 +702,18 @@ func TestGenerateHints(t *testing.T) { } cfgs := l.CreateConfig(test.event) - assert.Equal(t, len(cfgs), test.len, test.msg) - if test.len != 0 { + assert.Equal(t, test.len, len(cfgs), test.msg) + configs := make([]common.MapStr, 0) + for _, cfg := range cfgs { config := common.MapStr{} - err := cfgs[0].Unpack(&config) - assert.Nil(t, err, test.msg) - - assert.Equal(t, test.result, config, test.msg) + err := cfg.Unpack(&config) + ok := assert.Nil(t, err, test.msg) + if !ok { + break + } + configs = append(configs, config) } - + assert.Equal(t, test.result, configs, test.msg) } } @@ -861,7 +937,7 @@ func TestGenerateHintsWithPaths(t *testing.T) { } cfgs := l.CreateConfig(test.event) - assert.Equal(t, len(cfgs), test.len, test.msg) + assert.Equal(t, test.len, len(cfgs), test.msg) if test.len != 0 { config := common.MapStr{} err := cfgs[0].Unpack(&config) diff --git a/filebeat/docs/autodiscover-hints.asciidoc b/filebeat/docs/autodiscover-hints.asciidoc index 8201de0e6204..9c1893c83677 100644 --- a/filebeat/docs/autodiscover-hints.asciidoc +++ b/filebeat/docs/autodiscover-hints.asciidoc @@ -155,6 +155,22 @@ annotations: co.elastic.logs.sidecar/exclude_lines: '^DBG' ----- +[float] +===== Multiple sets of hints +When a container needs multiple inputs to be defined on it, sets of annotations can be provided with numeric prefixes. +If there are hints that don't have a numeric prefix then they get grouped together into a single configuration. + +["source","yaml",subs="attributes"] +------------------------------------------------------------------------------------- +annotations: + co.elastic.logs/exclude_lines: '^DBG' + co.elastic.logs/1.include_lines: '^DBG' + co.elastic.logs/1.processors.dissect.tokenizer: "%{key2} %{key1}" +------------------------------------------------------------------------------------- + +The above configuration would generate two input configurations. The first input handles only debug logs and passes it through a dissect +tokenizer. The second input handles everything but debug logs. + [float] ===== Configuring Namespace Defaults diff --git a/heartbeat/autodiscover/builder/hints/monitors.go b/heartbeat/autodiscover/builder/hints/monitors.go index f9fe8847d3ec..ca182fe37f26 100644 --- a/heartbeat/autodiscover/builder/hints/monitors.go +++ b/heartbeat/autodiscover/builder/hints/monitors.go @@ -19,8 +19,6 @@ package hints import ( "fmt" - "sort" - "strconv" "strings" "github.com/elastic/go-ucfg" @@ -97,7 +95,7 @@ func (hb *heartbeatHints) CreateConfig(event bus.Event, options ...ucfg.Option) } tempCfg := common.MapStr{} - monitors := hb.getMonitors(hints) + monitors := builder.GetHintsAsList(hints, hb.config.Key) var configs []*common.Config for _, monitor := range monitors { @@ -138,44 +136,6 @@ func (hb *heartbeatHints) getRawConfigs(hints common.MapStr) []common.MapStr { return builder.GetHintAsConfigs(hints, hb.config.Key) } -func (hb *heartbeatHints) getMonitors(hints common.MapStr) []common.MapStr { - raw := builder.GetHintMapStr(hints, hb.config.Key, "") - if raw == nil { - return nil - } - - var words, nums []string - - for key := range raw { - if _, err := strconv.Atoi(key); err != nil { - words = append(words, key) - continue - } else { - nums = append(nums, key) - } - } - - sort.Strings(nums) - - var configs []common.MapStr - for _, key := range nums { - rawCfg, _ := raw[key] - if config, ok := rawCfg.(common.MapStr); ok { - configs = append(configs, config) - } - } - - defaultMap := common.MapStr{} - for _, word := range words { - defaultMap[word] = raw[word] - } - - if len(defaultMap) != 0 { - configs = append(configs, defaultMap) - } - return configs -} - func (hb *heartbeatHints) getProcessors(hints common.MapStr) []common.MapStr { return builder.GetConfigs(hints, "", "processors") } diff --git a/heartbeat/docs/autodiscover-hints.asciidoc b/heartbeat/docs/autodiscover-hints.asciidoc index d89b52f85084..0b6a44115c9c 100644 --- a/heartbeat/docs/autodiscover-hints.asciidoc +++ b/heartbeat/docs/autodiscover-hints.asciidoc @@ -92,6 +92,21 @@ annotations: ----- +[float] +===== Multiple sets of hints +When a container needs multiple monitors to be defined on it, sets of annotations can be provided with numeric prefixes. +Annotations without numeric prefixes would default into a single monitor configuration. + +["source","yaml",subs="attributes"] +------------------------------------------------------------------------------------- +annotations: + co.elastic.monitor/type: http + co.elastic.monitor/hosts: ${data.host}:8080/healtlz + co.elastic.monitor/schedule: "@every 5s" + co.elastic.monitor/1.type: tcp + co.elastic.monitor/1.hosts: ${data.host}:8080 + co.elastic.monitor/1.schedule: "@every 5s" +------------------------------------------------------------------------------------- [float] diff --git a/libbeat/autodiscover/builder/helper.go b/libbeat/autodiscover/builder/helper.go index b6d52a08eb58..faaa112c65f3 100644 --- a/libbeat/autodiscover/builder/helper.go +++ b/libbeat/autodiscover/builder/helper.go @@ -248,3 +248,42 @@ func GenerateHints(annotations common.MapStr, container, prefix string) common.M return hints } + +// GetHintsAsList gets a set of hints and tries to convert them into a list of hints +func GetHintsAsList(hints common.MapStr, key string) []common.MapStr { + raw := GetHintMapStr(hints, key, "") + if raw == nil { + return nil + } + + var words, nums []string + + for key := range raw { + if _, err := strconv.Atoi(key); err != nil { + words = append(words, key) + continue + } else { + nums = append(nums, key) + } + } + + sort.Strings(nums) + + var configs []common.MapStr + for _, key := range nums { + rawCfg, _ := raw[key] + if config, ok := rawCfg.(common.MapStr); ok { + configs = append(configs, config) + } + } + + defaultMap := common.MapStr{} + for _, word := range words { + defaultMap[word] = raw[word] + } + + if len(defaultMap) != 0 { + configs = append(configs, defaultMap) + } + return configs +} diff --git a/libbeat/autodiscover/builder/helper_test.go b/libbeat/autodiscover/builder/helper_test.go index e8b5ce521791..dee7c95ef13d 100644 --- a/libbeat/autodiscover/builder/helper_test.go +++ b/libbeat/autodiscover/builder/helper_test.go @@ -219,3 +219,97 @@ func TestGenerateHints(t *testing.T) { assert.Equal(t, test.result, GenerateHints(annMap, "foobar", "co.elastic")) } } +func TestGetHintsAsList(t *testing.T) { + tests := []struct { + input common.MapStr + output []common.MapStr + message string + }{ + { + input: common.MapStr{ + "metrics": common.MapStr{ + "module": "prometheus", + "period": "15s", + }, + }, + output: []common.MapStr{ + { + "module": "prometheus", + "period": "15s", + }, + }, + message: "Single hint should return a single set of configs", + }, + { + input: common.MapStr{ + "metrics": common.MapStr{ + "1": common.MapStr{ + "module": "prometheus", + "period": "15s", + }, + }, + }, + output: []common.MapStr{ + { + "module": "prometheus", + "period": "15s", + }, + }, + message: "Single hint with numeric prefix should return a single set of configs", + }, + { + input: common.MapStr{ + "metrics": common.MapStr{ + "1": common.MapStr{ + "module": "prometheus", + "period": "15s", + }, + "2": common.MapStr{ + "module": "dropwizard", + "period": "20s", + }, + }, + }, + output: []common.MapStr{ + { + "module": "prometheus", + "period": "15s", + }, + { + "module": "dropwizard", + "period": "20s", + }, + }, + message: "Multiple hints with numeric prefix should return configs in numeric ordering", + }, + { + input: common.MapStr{ + "metrics": common.MapStr{ + "1": common.MapStr{ + "module": "prometheus", + "period": "15s", + }, + "module": "dropwizard", + "period": "20s", + }, + }, + output: []common.MapStr{ + { + "module": "prometheus", + "period": "15s", + }, + { + "module": "dropwizard", + "period": "20s", + }, + }, + message: "Multiple hints with numeric prefix and default should return configs with defaults at the last", + }, + } + + for _, test := range tests { + t.Run(test.message, func(t *testing.T) { + assert.Equal(t, test.output, GetHintsAsList(test.input, "metrics")) + }) + } +} diff --git a/metricbeat/autodiscover/builder/hints/metrics.go b/metricbeat/autodiscover/builder/hints/metrics.go index 84ce8d65a861..52eba34a1ff5 100644 --- a/metricbeat/autodiscover/builder/hints/metrics.go +++ b/metricbeat/autodiscover/builder/hints/metrics.go @@ -75,12 +75,12 @@ func NewMetricHints(cfg *common.Config) (autodiscover.Builder, error) { // Create configs based on hints passed from providers func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*common.Config { var ( - config []*common.Config - noPort bool + configs []*common.Config + noPort bool ) host, _ := event["host"].(string) if host == "" { - return config + return configs } port, ok := common.TryToInt(event["port"]) @@ -90,10 +90,10 @@ func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*c hints, ok := event["hints"].(common.MapStr) if !ok { - return config + return configs } - modulesConfig := m.getModules(hints) + modulesConfig := m.getModuleConfigs(hints) // here we handle raw configs if provided if modulesConfig != nil { configs := []*common.Config{} @@ -108,64 +108,67 @@ func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*c } - mod := m.getModule(hints) - if mod == "" { - return config - } + modules := m.getModules(hints) + for _, hint := range modules { + mod := m.getModule(hint) + if mod == "" { + continue + } - hosts, ok := m.getHostsWithPort(hints, port, noPort) - if !ok { - return config - } + hosts, ok := m.getHostsWithPort(hint, port, noPort) + if !ok { + continue + } - ns := m.getNamespace(hints) - msets := m.getMetricSets(hints, mod) - tout := m.getTimeout(hints) - ival := m.getPeriod(hints) - sslConf := m.getSSLConfig(hints) - procs := m.getProcessors(hints) - metricspath := m.getMetricPath(hints) - username := m.getUsername(hints) - password := m.getPassword(hints) - - moduleConfig := common.MapStr{ - "module": mod, - "metricsets": msets, - "hosts": hosts, - "timeout": tout, - "period": ival, - "enabled": true, - "ssl": sslConf, - "processors": procs, - } + ns := m.getNamespace(hint) + msets := m.getMetricSets(hint, mod) + tout := m.getTimeout(hint) + ival := m.getPeriod(hint) + sslConf := m.getSSLConfig(hint) + procs := m.getProcessors(hint) + metricspath := m.getMetricPath(hint) + username := m.getUsername(hint) + password := m.getPassword(hint) + + moduleConfig := common.MapStr{ + "module": mod, + "metricsets": msets, + "hosts": hosts, + "timeout": tout, + "period": ival, + "enabled": true, + "ssl": sslConf, + "processors": procs, + } - if ns != "" { - moduleConfig["namespace"] = ns - } - if metricspath != "" { - moduleConfig["metrics_path"] = metricspath - } - if username != "" { - moduleConfig["username"] = username - } - if password != "" { - moduleConfig["password"] = password - } + if ns != "" { + moduleConfig["namespace"] = ns + } + if metricspath != "" { + moduleConfig["metrics_path"] = metricspath + } + if username != "" { + moduleConfig["username"] = username + } + if password != "" { + moduleConfig["password"] = password + } - m.logger.Debug("generated config: %v", moduleConfig) + logp.Debug("hints.builder", "generated config: %v", moduleConfig) - // Create config object - cfg, err := common.NewConfigFrom(moduleConfig) - if err != nil { - logp.Debug("", "config merge failed with error: %v", err) + // Create config object + cfg, err := common.NewConfigFrom(moduleConfig) + if err != nil { + logp.Debug("hints.builder", "config merge failed with error: %v", err) + } + logp.Debug("hints.builder", "generated config: %+v", common.DebugString(cfg, true)) + configs = append(configs, cfg) } - m.logger.Debug("generated config: %+v", common.DebugString(cfg, true)) - config = append(config, cfg) // Apply information in event to the template to generate the final config // This especially helps in a scenario where endpoints are configured as: // co.elastic.metrics/hosts= "${data.host}:9090" - return template.ApplyConfigTemplate(event, config, options...) + return template.ApplyConfigTemplate(event, configs, options...) } func (m *metricHints) getModule(hints common.MapStr) string { @@ -267,7 +270,7 @@ func (m *metricHints) getSSLConfig(hints common.MapStr) common.MapStr { return builder.GetHintMapStr(hints, m.Key, ssl) } -func (m *metricHints) getModules(hints common.MapStr) []common.MapStr { +func (m *metricHints) getModuleConfigs(hints common.MapStr) []common.MapStr { return builder.GetHintAsConfigs(hints, m.Key) } @@ -275,3 +278,16 @@ func (m *metricHints) getProcessors(hints common.MapStr) []common.MapStr { return builder.GetProcessors(hints, m.Key) } + +func (m *metricHints) getModules(hints common.MapStr) []common.MapStr { + modules := builder.GetHintsAsList(hints, m.Key) + var output []common.MapStr + + for _, mod := range modules { + output = append(output, common.MapStr{ + m.Key: mod, + }) + } + + return output +} diff --git a/metricbeat/autodiscover/builder/hints/metrics_test.go b/metricbeat/autodiscover/builder/hints/metrics_test.go index 3b306ac62c07..d2d0462a0476 100644 --- a/metricbeat/autodiscover/builder/hints/metrics_test.go +++ b/metricbeat/autodiscover/builder/hints/metrics_test.go @@ -38,7 +38,7 @@ func TestGenerateHints(t *testing.T) { message string event bus.Event len int - result common.MapStr + result []common.MapStr }{ { message: "Empty event hints should return empty config", @@ -58,7 +58,7 @@ func TestGenerateHints(t *testing.T) { }, }, len: 0, - result: common.MapStr{}, + result: []common.MapStr{}, }, { message: "Hints without host should return nothing", @@ -70,7 +70,7 @@ func TestGenerateHints(t *testing.T) { }, }, len: 0, - result: common.MapStr{}, + result: []common.MapStr{}, }, { message: "Hints without matching port should return nothing", @@ -85,7 +85,7 @@ func TestGenerateHints(t *testing.T) { }, }, len: 0, - result: common.MapStr{}, + result: []common.MapStr{}, }, { message: "Hints with multiple hosts return only the matching one", @@ -100,13 +100,15 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "metricsets": []string{"default"}, - "timeout": "3s", - "period": "1m", - "enabled": true, - "hosts": []interface{}{"1.2.3.4:9090"}, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090"}, + }, }, }, { @@ -122,13 +124,15 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "metricsets": []string{"default"}, - "timeout": "3s", - "period": "1m", - "enabled": true, - "hosts": []interface{}{"1.2.3.4:9090"}, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090"}, + }, }, }, { @@ -142,12 +146,14 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmodule", - "metricsets": []string{"one", "two"}, - "timeout": "3s", - "period": "1m", - "enabled": true, + result: []common.MapStr{ + { + "module": "mockmodule", + "metricsets": []string{"one", "two"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, }, }, { @@ -162,12 +168,14 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmodule", - "metricsets": []string{"one"}, - "timeout": "3s", - "period": "1m", - "enabled": true, + result: []common.MapStr{ + { + "module": "mockmodule", + "metricsets": []string{"one"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, }, }, { @@ -181,12 +189,14 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "metricsets": []string{"default"}, - "timeout": "3s", - "period": "1m", - "enabled": true, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, }, }, { @@ -200,12 +210,14 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "metricsets": []string{"default"}, - "timeout": "3s", - "period": "1m", - "enabled": true, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, }, }, { @@ -222,14 +234,16 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "namespace": "test", - "metricsets": []string{"default"}, - "timeout": "3s", - "period": "1m", - "enabled": true, - "hosts": []interface{}{"1.2.3.4:9090"}, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090"}, + }, }, }, { @@ -250,18 +264,20 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "namespace": "test", - "metricsets": []string{"default"}, - "timeout": "3s", - "period": "1m", - "enabled": true, - "hosts": []interface{}{"1.2.3.4:9090"}, - "processors": []interface{}{ - map[string]interface{}{ - "add_locale": map[string]interface{}{ - "abbrevation": "MST", + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090"}, + "processors": []interface{}{ + map[string]interface{}{ + "add_locale": map[string]interface{}{ + "abbrevation": "MST", + }, }, }, }, @@ -296,14 +312,16 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "namespace": "test", - "metricsets": []string{"default"}, - "hosts": []interface{}{"1.2.3.4:9090"}, - "timeout": "3s", - "period": "1m", - "enabled": true, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "hosts": []interface{}{"1.2.3.4:9090"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, }, }, { @@ -311,15 +329,18 @@ func TestGenerateHints(t *testing.T) { event: bus.Event{ "host": "1.2.3.4", "port": 80, - "hints": common.MapStr{ - "metrics": common.MapStr{ - "module": "mockmoduledefaults", - "namespace": "test", - "hosts": "${data.host}:8080", + "hints": []common.MapStr{ + { + "metrics": common.MapStr{ + "module": "mockmoduledefaults", + "namespace": "test", + "hosts": "${data.host}:8080", + }, }, }, }, - len: 0, + len: 0, + result: []common.MapStr{}, }, { message: "Non http URLs with valid host port combination should return a valid config", @@ -335,14 +356,57 @@ func TestGenerateHints(t *testing.T) { }, }, len: 1, - result: common.MapStr{ - "module": "mockmoduledefaults", - "namespace": "test", - "metricsets": []string{"default"}, - "hosts": []interface{}{"tcp(1.2.3.4:3306)/"}, - "timeout": "3s", - "period": "1m", - "enabled": true, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "hosts": []interface{}{"tcp(1.2.3.4:3306)/"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, + }, + }, + { + message: "Module with mutliple sets of hints must return the right configs", + event: bus.Event{ + "host": "1.2.3.4", + "hints": common.MapStr{ + "metrics": common.MapStr{ + "1": common.MapStr{ + "module": "mockmoduledefaults", + "namespace": "test", + "hosts": "${data.host}:9090", + }, + "2": common.MapStr{ + "module": "mockmoduledefaults", + "namespace": "test1", + "hosts": "${data.host}:9090/fake", + }, + }, + }, + }, + len: 2, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090"}, + }, + { + "module": "mockmoduledefaults", + "namespace": "test1", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090/fake"}, + }, }, }, } @@ -361,11 +425,18 @@ func TestGenerateHints(t *testing.T) { cfgs := m.CreateConfig(test.event) assert.Equal(t, len(cfgs), test.len) - if len(cfgs) != 0 { + // The check below helps skipping config validation if there is no config supposed to be emitted. + if len(cfgs) == 0 { + continue + } + configs := make([]common.MapStr, 0) + for _, cfg := range cfgs { config := common.MapStr{} - err := cfgs[0].Unpack(&config) - assert.Nil(t, err, test.message) - + err := cfg.Unpack(&config) + ok := assert.Nil(t, err, test.message) + if !ok { + break + } // metricsets order is random, order it for tests if v, err := config.GetValue("metricsets"); err == nil { if msets, ok := v.([]interface{}); ok { @@ -377,9 +448,9 @@ func TestGenerateHints(t *testing.T) { config["metricsets"] = metricsets } } - - assert.Equal(t, test.result, config, test.message) + configs = append(configs, config) } + assert.Equal(t, test.result, configs, test.message) } } diff --git a/metricbeat/docs/autodiscover-hints.asciidoc b/metricbeat/docs/autodiscover-hints.asciidoc index 25029a61d05d..ef99de73fc69 100644 --- a/metricbeat/docs/autodiscover-hints.asciidoc +++ b/metricbeat/docs/autodiscover-hints.asciidoc @@ -129,6 +129,25 @@ annotations: co.elastic.metrics.sidecar/hosts: '${data.host}:8080' ------------------------------------------------------------------------------------- +[float] +===== Multiple sets of hints +When a container port needs multiple modules to be defined on it, sets of annotations can be provided with numeric prefixes. +If there are hints that don't have a numeric prefix then they get grouped together into a single configuration. + +["source","yaml",subs="attributes"] +------------------------------------------------------------------------------------- +annotations: + co.elastic.metrics/1.module: prometheus + co.elastic.metrics/1.hosts: '${data.host}:80/metrics' + co.elastic.metrics/1.period: 60s + co.elastic.metrics/module: prometheus + co.elastic.metrics/hosts: '${data.host}:80/metrics/p1' + co.elastic.metrics/period: 5s +------------------------------------------------------------------------------------- + +The above configuration would spin up two metricbeat module configurations to ensure that the endpoint "/metrics/p1" is +polled every 60s whereas the "/metrics" endpoint is polled every 60s. + [float] ===== Configuring Namespace Defaults