From a30ac42ee0f89d1fe2b3aee441681f7856554cf0 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Fri, 31 Jan 2020 17:05:13 +0100 Subject: [PATCH] [Metricbeat] Support processors defined for light modules (#15923) (#15992) * Adjust test cases first * Setup processors for light modules * Fix: comments, mage check * Adjust code * Fix: missing comment * Fix: mage check * Test light modules * Test connector * Test runner * Fix: ToLower * Adjust code after review * Fix: mage check * Adjust code after review * Adjust code after review * Fix: check error * Fix: imports * Increase test coverage * Add unit tests * Fix: hound * beater: use factory * Beater: modules * Fix: system tests * Fix: implements interface * Add changelog entry * Verify if processors are setup (cherry picked from commit a70d6e8051c8572480ab47efca523d6dfb39ad28) --- CHANGELOG.next.asciidoc | 1 + metricbeat/beater/metricbeat.go | 98 ++++--------------- metricbeat/docs/modules/activemq.asciidoc | 30 ------ metricbeat/docs/modules/ibmmq.asciidoc | 20 ---- metricbeat/mb/lightmetricset.go | 2 + metricbeat/mb/lightmodules.go | 14 +++ metricbeat/mb/lightmodules_test.go | 26 +++++ metricbeat/mb/module/configuration.go | 65 ++++++++++++ metricbeat/mb/module/connector.go | 27 +++++ metricbeat/mb/module/connector_test.go | 41 ++++++++ metricbeat/mb/module/factory.go | 42 ++++---- metricbeat/mb/module/runner_group.go | 62 ++++++++++++ metricbeat/mb/module/runner_group_test.go | 88 +++++++++++++++++ metricbeat/mb/module/wrapper.go | 23 +++-- metricbeat/mb/module/wrapper_test.go | 71 +++++++++----- metricbeat/mb/registry.go | 24 +++++ metricbeat/mb/registry_test.go | 86 ++++++++-------- .../testdata/lightmodules/unpack/module.yml | 4 + .../unpack/noprocessors/manifest.yml | 6 ++ .../unpack/withprocessors/manifest.yml | 8 ++ metricbeat/tests/system/test_autodiscover.py | 12 +-- x-pack/metricbeat/metricbeat.reference.yml | 50 ---------- .../module/activemq/_meta/config.yml | 30 ------ .../module/activemq/broker/manifest.yml | 20 ++++ .../module/activemq/queue/manifest.yml | 10 ++ .../module/activemq/topic/manifest.yml | 10 ++ .../metricbeat/module/ibmmq/_meta/config.yml | 20 ---- .../metricbeat/module/ibmmq/qmgr/manifest.yml | 24 +++++ x-pack/metricbeat/module/ibmmq/test_ibmmq.py | 5 + .../modules.d/activemq.yml.disabled | 30 ------ .../metricbeat/modules.d/ibmmq.yml.disabled | 20 ---- 31 files changed, 586 insertions(+), 383 deletions(-) create mode 100644 metricbeat/mb/module/configuration.go create mode 100644 metricbeat/mb/module/runner_group.go create mode 100644 metricbeat/mb/module/runner_group_test.go create mode 100644 metricbeat/mb/testdata/lightmodules/unpack/module.yml create mode 100644 metricbeat/mb/testdata/lightmodules/unpack/noprocessors/manifest.yml create mode 100644 metricbeat/mb/testdata/lightmodules/unpack/withprocessors/manifest.yml diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c9b6af75ff04..da1cf124af8a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -199,6 +199,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add `key/value` mode for SQL module. {issue}15770[15770] {pull]15845[15845] - Add support for Unix socket in Memcached metricbeat module. {issue}13685[13685] {pull}15822[15822] - Make the `system/cpu` metricset collect normalized CPU metrics by default. {issue}15618[15618] {pull}15729[15729] +- Add support for processors in light modules. {issue}14740[14740] {pull}15923[15923] *Packetbeat* diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index 8d27d8066664..f7f79f7e42a0 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -20,18 +20,16 @@ package beater import ( "sync" - "github.com/elastic/beats/libbeat/common/reload" - "github.com/elastic/beats/libbeat/management" - "github.com/elastic/beats/libbeat/paths" - - "github.com/joeshaw/multierror" "github.com/pkg/errors" "github.com/elastic/beats/libbeat/autodiscover" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/reload" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/management" + "github.com/elastic/beats/libbeat/paths" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/module" @@ -44,8 +42,8 @@ import ( // Metricbeat implements the Beater interface for metricbeat. type Metricbeat struct { - done chan struct{} // Channel used to initiate shutdown. - modules []staticModule // Active list of modules. + done chan struct{} // Channel used to initiate shutdown. + runners []module.Runner // Active list of module runners. config Config autodiscover *autodiscover.Autodiscover @@ -53,11 +51,6 @@ type Metricbeat struct { moduleOptions []module.Option } -type staticModule struct { - connector *module.Connector - module *module.Wrapper -} - // Option specifies some optional arguments used for configuring the behavior // of the Metricbeat framework. type Option func(mb *Metricbeat) @@ -162,46 +155,28 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe moduleOptions := append( []module.Option{module.WithMaxStartDelay(config.MaxStartDelay)}, metricbeat.moduleOptions...) - var errs multierror.Errors + + factory := module.NewFactory(b.Info, moduleOptions...) + for _, moduleCfg := range config.Modules { if !moduleCfg.Enabled() { continue } - failed := false - - connector, err := module.NewConnector(b.Info, b.Publisher, moduleCfg, nil) + runner, err := factory.Create(b.Publisher, moduleCfg, nil) if err != nil { - errs = append(errs, err) - failed = true - } - - module, err := module.NewWrapper(moduleCfg, mb.Registry, moduleOptions...) - if err != nil { - errs = append(errs, err) - failed = true - } - - if failed { - continue + return nil, err } - metricbeat.modules = append(metricbeat.modules, staticModule{ - connector: connector, - module: module, - }) + metricbeat.runners = append(metricbeat.runners, runner) } - if err := errs.Err(); err != nil { - return nil, err - } - if len(metricbeat.modules) == 0 && !dynamicCfgEnabled { + if len(metricbeat.runners) == 0 && !dynamicCfgEnabled { return nil, mb.ErrAllModulesDisabled } if config.Autodiscover != nil { var err error - factory := module.NewFactory(b.Info, metricbeat.moduleOptions...) adapter := autodiscover.NewFactoryAdapter(factory) metricbeat.autodiscover, err = autodiscover.NewAutodiscover("metricbeat", b.Publisher, adapter, config.Autodiscover) if err != nil { @@ -220,20 +195,16 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe func (bt *Metricbeat) Run(b *beat.Beat) error { var wg sync.WaitGroup - // Static modules (metricbeat.modules) - for _, m := range bt.modules { - client, err := m.connector.Connect() - if err != nil { - return err - } - - r := module.NewRunner(client, m.module) + // Static modules (metricbeat.runners) + for _, r := range bt.runners { r.Start() wg.Add(1) + + thatRunner := r go func() { defer wg.Done() <-bt.done - r.Stop() + thatRunner.Stop() }() } @@ -289,38 +260,7 @@ func (bt *Metricbeat) Stop() { close(bt.done) } -// Modules return a list of all configured modules, including anyone present -// under dynamic config settings. +// Modules return a list of all configured modules. func (bt *Metricbeat) Modules() ([]*module.Wrapper, error) { - var modules []*module.Wrapper - for _, m := range bt.modules { - modules = append(modules, m.module) - } - - // Add dynamic modules - if bt.config.ConfigModules.Enabled() { - config := cfgfile.DefaultDynamicConfig - bt.config.ConfigModules.Unpack(&config) - - modulesManager, err := cfgfile.NewGlobManager(config.Path, ".yml", ".disabled") - if err != nil { - return nil, errors.Wrap(err, "initialization error") - } - - for _, file := range modulesManager.ListEnabled() { - confs, err := cfgfile.LoadList(file.Path) - if err != nil { - return nil, errors.Wrap(err, "error loading config files") - } - for _, conf := range confs { - m, err := module.NewWrapper(conf, mb.Registry, bt.moduleOptions...) - if err != nil { - return nil, errors.Wrap(err, "module initialization error") - } - modules = append(modules, m) - } - } - } - - return modules, nil + return module.ConfiguredModules(bt.config.Modules, bt.config.ConfigModules, bt.moduleOptions) } diff --git a/metricbeat/docs/modules/activemq.asciidoc b/metricbeat/docs/modules/activemq.asciidoc index 3bd03addf9cc..8f3e0551fd16 100644 --- a/metricbeat/docs/modules/activemq.asciidoc +++ b/metricbeat/docs/modules/activemq.asciidoc @@ -35,36 +35,6 @@ metricbeat.modules: path: '/api/jolokia/?ignoreErrors=true&canonicalNaming=false' username: admin # default username password: admin # default password - processors: - - script: - lang: javascript - source: > - function process(event) { - var broker_memory_broker_pct = event.Get("activemq.broker.memory.broker.pct") - if (broker_memory_broker_pct != null) { - event.Put("activemq.broker.memory.broker.pct", broker_memory_broker_pct / 100.0) - } - - var broker_memory_temp_pct = event.Get("activemq.broker.memory.temp.pct") - if (broker_memory_temp_pct != null) { - event.Put("activemq.broker.memory.temp.pct", broker_memory_temp_pct / 100.0) - } - - var broker_memory_store_pct = event.Get("activemq.broker.memory.store.pct") - if (broker_memory_store_pct != null) { - event.Put("activemq.broker.memory.store.pct", broker_memory_store_pct / 100.0) - } - - var queue_memory_broker_pct = event.Get("activemq.queue.memory.broker.pct") - if (queue_memory_broker_pct != null) { - event.Put("activemq.queue.memory.broker.pct", queue_memory_broker_pct / 100.0) - } - - var topic_memory_broker_pct = event.Get("activemq.topic.memory.broker.pct") - if (topic_memory_broker_pct != null) { - event.Put("activemq.topic.memory.broker.pct", topic_memory_broker_pct / 100.0) - } - } ---- [float] diff --git a/metricbeat/docs/modules/ibmmq.asciidoc b/metricbeat/docs/modules/ibmmq.asciidoc index 4912139f9dc1..911659d37d47 100644 --- a/metricbeat/docs/modules/ibmmq.asciidoc +++ b/metricbeat/docs/modules/ibmmq.asciidoc @@ -55,26 +55,6 @@ metricbeat.modules: # This module uses the Prometheus collector metricset, all # the options for this metricset are also available here. metrics_path: /metrics - - # The custom processor is responsible for filtering Prometheus metrics - # not stricly related to the IBM MQ domain, e.g. system load, process, - # metrics HTTP server. - processors: - - script: - lang: javascript - source: > - function process(event) { - var metrics = event.Get("prometheus.metrics"); - Object.keys(metrics).forEach(function(key) { - if (!(key.match(/^ibmmq_.*$/))) { - event.Delete("prometheus.metrics." + key); - } - }); - metrics = event.Get("prometheus.metrics"); - if (Object.keys(metrics).length == 0) { - event.Cancel(); - } - } ---- It also supports the options described in <>. diff --git a/metricbeat/mb/lightmetricset.go b/metricbeat/mb/lightmetricset.go index eb0ec0749184..dee1d0afb7ba 100644 --- a/metricbeat/mb/lightmetricset.go +++ b/metricbeat/mb/lightmetricset.go @@ -21,6 +21,7 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" ) // LightMetricSet contains the definition of a non-registered metric set @@ -33,6 +34,7 @@ type LightMetricSet struct { MetricSet string `config:"metricset" validate:"required"` Defaults interface{} `config:"defaults"` } `config:"input" validate:"required"` + Processors processors.PluginConfig `config:"processors"` } // Registration obtains a metric set registration for this light metric set, this registration diff --git a/metricbeat/mb/lightmodules.go b/metricbeat/mb/lightmodules.go index 51269c363942..0f0fb9f23cfa 100644 --- a/metricbeat/mb/lightmodules.go +++ b/metricbeat/mb/lightmodules.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" ) const ( @@ -150,6 +151,19 @@ type lightModuleConfig struct { MetricSets []string `config:"metricsets"` } +// ProcessorsForMetricSet returns processors defined for the light metricset. +func (s *LightModulesSource) ProcessorsForMetricSet(r *Register, moduleName string, metricSetName string) (*processors.Processors, error) { + module, err := s.loadModule(r, moduleName) + if err != nil { + return nil, errors.Wrapf(err, "reading processors for metricset '%s' in module '%s' failed", metricSetName, moduleName) + } + metricSet, ok := module.MetricSets[metricSetName] + if !ok { + return nil, fmt.Errorf("unknown metricset '%s' in module '%s'", metricSetName, moduleName) + } + return processors.New(metricSet.Processors) +} + // LightModule contains the definition of a light module type LightModule struct { Name string diff --git a/metricbeat/mb/lightmodules_test.go b/metricbeat/mb/lightmodules_test.go index 4278adc049b0..a1bb999f8b1c 100644 --- a/metricbeat/mb/lightmodules_test.go +++ b/metricbeat/mb/lightmodules_test.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + _ "github.com/elastic/beats/libbeat/processors/add_id" ) // TestLightModulesAsModuleSource checks that registry correctly lists @@ -302,6 +303,31 @@ func TestNewModulesCallModuleFactory(t *testing.T) { assert.True(t, called, "module factory must be called if registered") } +func TestProcessorsForMetricSet_UnknownModule(t *testing.T) { + r := NewRegister() + source := NewLightModulesSource("testdata/lightmodules") + procs, err := source.ProcessorsForMetricSet(r, "nonexisting", "fake") + require.Error(t, err) + require.Nil(t, procs) +} + +func TestProcessorsForMetricSet_UnknownMetricSet(t *testing.T) { + r := NewRegister() + source := NewLightModulesSource("testdata/lightmodules") + procs, err := source.ProcessorsForMetricSet(r, "unpack", "nonexisting") + require.Error(t, err) + require.Nil(t, procs) +} + +func TestProcessorsForMetricSet_ProcessorsRead(t *testing.T) { + r := NewRegister() + source := NewLightModulesSource("testdata/lightmodules") + procs, err := source.ProcessorsForMetricSet(r, "unpack", "withprocessors") + require.NoError(t, err) + require.NotNil(t, procs) + require.Len(t, procs.List, 1) +} + type metricSetWithOption struct { BaseMetricSet Option string diff --git a/metricbeat/mb/module/configuration.go b/metricbeat/mb/module/configuration.go new file mode 100644 index 000000000000..2ccd0bd99aca --- /dev/null +++ b/metricbeat/mb/module/configuration.go @@ -0,0 +1,65 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package module + +import ( + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/cfgfile" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/mb" +) + +// ConfiguredModules returns a list of all configured modules, including anyone present under dynamic config settings. +func ConfiguredModules(modulesData []*common.Config, configModulesData *common.Config, moduleOptions []Option) ([]*Wrapper, error) { + var modules []*Wrapper + + for _, moduleCfg := range modulesData { + module, err := NewWrapper(moduleCfg, mb.Registry, nil) + if err != nil { + return nil, err + } + modules = append(modules, module) + } + + // Add dynamic modules + if configModulesData.Enabled() { + config := cfgfile.DefaultDynamicConfig + configModulesData.Unpack(&config) + + modulesManager, err := cfgfile.NewGlobManager(config.Path, ".yml", ".disabled") + if err != nil { + return nil, errors.Wrap(err, "initialization error") + } + + for _, file := range modulesManager.ListEnabled() { + confs, err := cfgfile.LoadList(file.Path) + if err != nil { + return nil, errors.Wrap(err, "error loading config files") + } + for _, conf := range confs { + m, err := NewWrapper(conf, mb.Registry, moduleOptions...) + if err != nil { + return nil, errors.Wrap(err, "module initialization error") + } + modules = append(modules, m) + } + } + } + return modules, nil +} diff --git a/metricbeat/mb/module/connector.go b/metricbeat/mb/module/connector.go index b85aecc93285..15456c160a8e 100644 --- a/metricbeat/mb/module/connector.go +++ b/metricbeat/mb/module/connector.go @@ -18,6 +18,8 @@ package module import ( + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/fmtstr" @@ -47,6 +49,10 @@ type connectorConfig struct { common.EventMetadata `config:",inline"` // Fields and tags to add to events. } +type metricSetRegister interface { + ProcessorsForMetricSet(moduleName, metricSetName string) (*processors.Processors, error) +} + func NewConnector( beatInfo beat.Info, pipeline beat.Pipeline, c *common.Config, dynFields *common.MapStrPointer, @@ -70,6 +76,27 @@ func NewConnector( }, nil } +// UseMetricSetProcessors appends processors defined in metricset configuration to the connector properties. +func (c *Connector) UseMetricSetProcessors(r metricSetRegister, moduleName, metricSetName string) error { + metricSetProcessors, err := r.ProcessorsForMetricSet(moduleName, metricSetName) + if err != nil { + return errors.Wrapf(err, "reading metricset processors failed (module: %s, metricset: %s)", + moduleName, metricSetName) + } + + if metricSetProcessors == nil || len(metricSetProcessors.List) == 0 { + return nil // no processors are defined + } + + procs := processors.NewList(nil) + procs.AddProcessors(*metricSetProcessors) + for _, p := range c.processors.List { + procs.AddProcessor(p) + } + c.processors = procs + return nil +} + func (c *Connector) Connect() (beat.Client, error) { return c.pipeline.ConnectWith(beat.ClientConfig{ Processing: beat.ProcessingConfig{ diff --git a/metricbeat/mb/module/connector_test.go b/metricbeat/mb/module/connector_test.go index 40789374bf81..9ca3e48cd17e 100644 --- a/metricbeat/mb/module/connector_test.go +++ b/metricbeat/mb/module/connector_test.go @@ -18,13 +18,16 @@ package module import ( + "errors" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" ) func TestProcessorsForConfig(t *testing.T) { @@ -91,6 +94,44 @@ func TestProcessorsForConfig(t *testing.T) { } } +type fakeMetricSetRegister struct { + success bool +} + +func (fmsr *fakeMetricSetRegister) ProcessorsForMetricSet(moduleName, metricSetName string) (*processors.Processors, error) { + if !fmsr.success { + return nil, errors.New("failure") + } + + procs := new(processors.Processors) + procs.List = []processors.Processor{nil, nil} + return procs, nil +} + +func TestUseMetricSetProcessors_ReadingProcessorsFailed(t *testing.T) { + r := new(fakeMetricSetRegister) + + var connector Connector + err := connector.UseMetricSetProcessors(r, "module", "metricset") + require.Error(t, err) + require.Nil(t, connector.processors) +} + +func TestUseMetricSetProcessors_ReadingProcessorsSucceeded(t *testing.T) { + r := &fakeMetricSetRegister{ + success: true, + } + + connector := Connector{ + processors: &processors.Processors{ + List: []processors.Processor{}, + }, + } + err := connector.UseMetricSetProcessors(r, "module", "metricset") + require.NoError(t, err) + require.Len(t, connector.processors.List, 2) +} + // Helper function to convert from YML input string to an unpacked // connectorConfig func connectorConfigFromString(s string) (connectorConfig, error) { diff --git a/metricbeat/mb/module/factory.go b/metricbeat/mb/module/factory.go index 8661bee3d035..08aafb36875f 100644 --- a/metricbeat/mb/module/factory.go +++ b/metricbeat/mb/module/factory.go @@ -18,8 +18,6 @@ package module import ( - "github.com/joeshaw/multierror" - "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" @@ -43,28 +41,36 @@ func NewFactory(beatInfo beat.Info, options ...Option) *Factory { // Create creates a new metricbeat module runner reporting events to the passed pipeline. func (r *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { - var errs multierror.Errors - - connector, err := NewConnector(r.beatInfo, p, c, meta) - if err != nil { - errs = append(errs, err) - } - w, err := NewWrapper(c, mb.Registry, r.options...) + module, metricSets, err := mb.NewModule(c, mb.Registry) if err != nil { - errs = append(errs, err) - } - - if err := errs.Err(); err != nil { return nil, err } - client, err := connector.Connect() - if err != nil { - return nil, err + var runners []Runner + for _, metricSet := range metricSets { + wrapper, err := NewWrapperForMetricSet(module, metricSet, r.options...) + if err != nil { + return nil, err + } + + connector, err := NewConnector(r.beatInfo, p, c, meta) + if err != nil { + return nil, err + } + + err = connector.UseMetricSetProcessors(mb.Registry, module.Name(), metricSet.Name()) + if err != nil { + return nil, err + } + + client, err := connector.Connect() + if err != nil { + return nil, err + } + runners = append(runners, NewRunner(client, wrapper)) } - mr := NewRunner(client, w) - return mr, nil + return newRunnerGroup(runners), nil } // CheckConfig checks if a config is valid or not diff --git a/metricbeat/mb/module/runner_group.go b/metricbeat/mb/module/runner_group.go new file mode 100644 index 000000000000..e242a0281c85 --- /dev/null +++ b/metricbeat/mb/module/runner_group.go @@ -0,0 +1,62 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package module + +import ( + "strings" + "sync" +) + +type runnerGroup struct { + runners []Runner + + startOnce sync.Once + stopOnce sync.Once +} + +var _ Runner = new(runnerGroup) + +func newRunnerGroup(runners []Runner) Runner { + return &runnerGroup{ + runners: runners, + } +} + +func (rg *runnerGroup) Start() { + rg.startOnce.Do(func() { + for _, runner := range rg.runners { + runner.Start() + } + }) +} + +func (rg *runnerGroup) Stop() { + rg.stopOnce.Do(func() { + for _, runner := range rg.runners { + runner.Stop() + } + }) +} + +func (rg *runnerGroup) String() string { + var entries []string + for _, runner := range rg.runners { + entries = append(entries, runner.String()) + } + return "RunnerGroup{" + strings.Join(entries, ", ") + "}" +} diff --git a/metricbeat/mb/module/runner_group_test.go b/metricbeat/mb/module/runner_group_test.go new file mode 100644 index 000000000000..cbf2f310a87f --- /dev/null +++ b/metricbeat/mb/module/runner_group_test.go @@ -0,0 +1,88 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package module + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common/atomic" +) + +const ( + fakeRunnersNum = 3 + fakeRunnerName = "fakeRunner" +) + +type fakeRunner struct { + id int + + startCounter *atomic.Int + stopCounter *atomic.Int +} + +func (fr *fakeRunner) Start() { + if fr.startCounter != nil { + fr.startCounter.Inc() + } +} + +func (fr *fakeRunner) Stop() { + if fr.stopCounter != nil { + fr.stopCounter.Inc() + } +} + +func (fr *fakeRunner) String() string { + return fmt.Sprintf("%s-%d", fakeRunnerName, fr.id) +} + +func TestStartStop(t *testing.T) { + startCounter := atomic.NewInt(0) + stopCounter := atomic.NewInt(0) + + var runners []Runner + for i := 0; i < fakeRunnersNum; i++ { + runners = append(runners, &fakeRunner{ + id: i, + startCounter: startCounter, + stopCounter: stopCounter, + }) + } + + runnerGroup := newRunnerGroup(runners) + runnerGroup.Start() + + runnerGroup.Stop() + + assert.Equal(t, fakeRunnersNum, startCounter.Load()) + assert.Equal(t, fakeRunnersNum, stopCounter.Load()) +} + +func TestString(t *testing.T) { + var runners []Runner + for i := 0; i < fakeRunnersNum; i++ { + runners = append(runners, &fakeRunner{ + id: i, + }) + } + runnerGroup := newRunnerGroup(runners) + assert.Equal(t, "RunnerGroup{fakeRunner-0, fakeRunner-1, fakeRunner-2}", runnerGroup.String()) +} diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index e0bd7d12dbda..dfb547dcd27d 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -78,30 +78,37 @@ type stats struct { events *monitoring.Int // Total events published. } -// NewWrapper create a new Module and its associated MetricSets based -// on the given configuration. +// NewWrapper creates a new module and its associated metricsets based on the given configuration. func NewWrapper(config *common.Config, r *mb.Register, options ...Option) (*Wrapper, error) { - module, metricsets, err := mb.NewModule(config, r) + module, metricSets, err := mb.NewModule(config, r) if err != nil { return nil, err } + return createWrapper(module, metricSets, options...) +} + +// NewWrapperForMetricSet creates a wrapper for the selected module and metricset. +func NewWrapperForMetricSet(module mb.Module, metricSet mb.MetricSet, options ...Option) (*Wrapper, error) { + return createWrapper(module, []mb.MetricSet{metricSet}, options...) +} +func createWrapper(module mb.Module, metricSets []mb.MetricSet, options ...Option) (*Wrapper, error) { wrapper := &Wrapper{ Module: module, - metricSets: make([]*metricSetWrapper, len(metricsets)), + metricSets: make([]*metricSetWrapper, len(metricSets)), } + for _, applyOption := range options { applyOption(wrapper) } - for i, ms := range metricsets { + for i, metricSet := range metricSets { wrapper.metricSets[i] = &metricSetWrapper{ - MetricSet: ms, + MetricSet: metricSet, module: wrapper, - stats: getMetricSetStats(wrapper.Name(), ms.Name()), + stats: getMetricSetStats(wrapper.Name(), metricSet.Name()), } } - return wrapper, nil } diff --git a/metricbeat/mb/module/wrapper_test.go b/metricbeat/mb/module/wrapper_test.go index 8901937fadc4..b83db1ff6bf8 100644 --- a/metricbeat/mb/module/wrapper_test.go +++ b/metricbeat/mb/module/wrapper_test.go @@ -23,11 +23,12 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/module" - - "github.com/stretchr/testify/assert" ) const ( @@ -107,24 +108,18 @@ func newFakePushMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { func newTestRegistry(t testing.TB) *mb.Register { r := mb.NewRegister() - if err := r.AddMetricSet(moduleName, eventFetcherName, newFakeEventFetcher); err != nil { - t.Fatal(err) - } - if err := r.AddMetricSet(moduleName, reportingFetcherName, newFakeReportingFetcher); err != nil { - t.Fatal(err) - } - if err := r.AddMetricSet(moduleName, pushMetricSetName, newFakePushMetricSet); err != nil { - t.Fatal(err) - } - + err := r.AddMetricSet(moduleName, eventFetcherName, newFakeEventFetcher) + require.NoError(t, err) + err = r.AddMetricSet(moduleName, reportingFetcherName, newFakeReportingFetcher) + require.NoError(t, err) + err = r.AddMetricSet(moduleName, pushMetricSetName, newFakePushMetricSet) + require.NoError(t, err) return r } func newConfig(t testing.TB, moduleConfig interface{}) *common.Config { config, err := common.NewConfigFrom(moduleConfig) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) return config } @@ -139,9 +134,7 @@ func TestWrapperOfEventFetcher(t *testing.T) { }) m, err := module.NewWrapper(c, newTestRegistry(t)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) done := make(chan struct{}) output := m.Start(done) @@ -172,9 +165,7 @@ func TestWrapperOfReportingFetcher(t *testing.T) { }) m, err := module.NewWrapper(c, newTestRegistry(t)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) done := make(chan struct{}) output := m.Start(done) @@ -205,9 +196,7 @@ func TestWrapperOfPushMetricSet(t *testing.T) { }) m, err := module.NewWrapper(c, newTestRegistry(t)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) done := make(chan struct{}) output := m.Start(done) @@ -254,9 +243,7 @@ func TestPeriodIsAddedToEvent(t *testing.T) { }) m, err := module.NewWrapper(config, registry, module.WithMetricSetInfo()) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) done := make(chan struct{}) defer close(done) @@ -270,3 +257,33 @@ func TestPeriodIsAddedToEvent(t *testing.T) { }) } } + +func TestNewWrapperForMetricSet(t *testing.T) { + hosts := []string{"alpha"} + c := newConfig(t, map[string]interface{}{ + "module": moduleName, + "metricsets": []string{eventFetcherName}, + "hosts": hosts, + }) + + aModule, metricSets, err := mb.NewModule(c, newTestRegistry(t)) + require.NoError(t, err) + + m, err := module.NewWrapperForMetricSet(aModule, metricSets[0], module.WithMetricSetInfo()) + require.NoError(t, err) + + done := make(chan struct{}) + output := m.Start(done) + + <-output + close(done) + + // Validate that the channel is closed after receiving the event. + select { + case _, ok := <-output: + if !ok { + return // Channel is closed. + } + assert.Fail(t, "received unexpected event") + } +} diff --git a/metricbeat/mb/registry.go b/metricbeat/mb/registry.go index d57d1999a5e6..c7b13b3a60bc 100644 --- a/metricbeat/mb/registry.go +++ b/metricbeat/mb/registry.go @@ -26,6 +26,7 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" ) const initialSize = 20 // initialSize specifies the initial size of the Register. @@ -121,6 +122,7 @@ type ModulesSource interface { HasMetricSet(module, name string) bool MetricSetRegistration(r *Register, module, name string) (MetricSetRegistration, error) ModulesInfo(r *Register) string + ProcessorsForMetricSet(r *Register, module, name string) (*processors.Processors, error) } // NewRegister creates and returns a new Register. @@ -362,6 +364,28 @@ func (r *Register) MetricSets(module string) []string { return metricsets } +// ProcessorsForMetricSet returns a list of processors defined in manifest of the registered metricset. +func (r *Register) ProcessorsForMetricSet(module, name string) (*processors.Processors, error) { + r.lock.RLock() + defer r.lock.RUnlock() + + module = strings.ToLower(module) + name = strings.ToLower(name) + + metricSets, exists := r.metricSets[module] + if exists { + _, exists := metricSets[name] + if exists { + return processors.NewList(nil), nil // Standard metricsets don't have processor definitions. + } + } + + if source := r.secondarySource; source != nil { + return source.ProcessorsForMetricSet(r, module, name) + } + return nil, fmt.Errorf(`metricset "%s" is not registered (module: %s)'`, name, module) +} + // SetSecondarySource sets an additional source of modules func (r *Register) SetSecondarySource(source ModulesSource) { r.lock.Lock() diff --git a/metricbeat/mb/registry_test.go b/metricbeat/mb/registry_test.go index eb0a6a32fdab..f4ba90f4dd40 100644 --- a/metricbeat/mb/registry_test.go +++ b/metricbeat/mb/registry_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const ( @@ -52,9 +53,7 @@ func TestAddModuleNilFactory(t *testing.T) { func TestAddModuleDuplicateName(t *testing.T) { registry := NewRegister() err := registry.AddModule(moduleName, fakeModuleFactory) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) err = registry.AddModule(moduleName, fakeModuleFactory) if assert.Error(t, err) { @@ -65,9 +64,7 @@ func TestAddModuleDuplicateName(t *testing.T) { func TestAddModule(t *testing.T) { registry := NewRegister() err := registry.AddModule(moduleName, fakeModuleFactory) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) factory, found := registry.modules[moduleName] assert.True(t, found, "module not found") assert.NotNil(t, factory, "factory fuction is nil") @@ -100,9 +97,7 @@ func TestAddMetricSetNilFactory(t *testing.T) { func TestAddMetricSetDuplicateName(t *testing.T) { registry := NewRegister() err := registry.AddMetricSet(moduleName, metricSetName, fakeMetricSetFactory) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) err = registry.AddMetricSet(moduleName, metricSetName, fakeMetricSetFactory) if assert.Error(t, err) { @@ -113,9 +108,7 @@ func TestAddMetricSetDuplicateName(t *testing.T) { func TestAddMetricSet(t *testing.T) { registry := NewRegister() err := registry.AddMetricSet(moduleName, metricSetName, fakeMetricSetFactory) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) f, found := registry.metricSets[moduleName][metricSetName] assert.True(t, found, "metricset not found") assert.NotNil(t, f, "factory function is nil") @@ -139,14 +132,10 @@ func TestMetricSetFactory(t *testing.T) { t.Run("without HostParser", func(t *testing.T) { registry := NewRegister() err := registry.AddMetricSet(moduleName, metricSetName, fakeMetricSetFactory) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) reg, err := registry.metricSetRegistration(moduleName, metricSetName) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) assert.Equal(t, metricSetName, reg.Name) assert.NotNil(t, reg.Factory) assert.Nil(t, reg.HostParser) @@ -158,14 +147,10 @@ func TestMetricSetFactory(t *testing.T) { registry := NewRegister() hostParser := func(Module, string) (HostData, error) { return HostData{}, nil } err := registry.AddMetricSet(moduleName, metricSetName, fakeMetricSetFactory, hostParser) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) reg, err := registry.metricSetRegistration(moduleName, metricSetName) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) assert.NotNil(t, reg.HostParser) // Can't compare functions in Go so just check for non-nil. }) @@ -173,14 +158,10 @@ func TestMetricSetFactory(t *testing.T) { registry := NewRegister() hostParser := func(Module, string) (HostData, error) { return HostData{}, nil } err := registry.addMetricSet(moduleName, metricSetName, fakeMetricSetFactory, WithHostParser(hostParser)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) reg, err := registry.metricSetRegistration(moduleName, metricSetName) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) assert.NotNil(t, reg.HostParser) // Can't compare functions in Go so just check for non-nil. }) @@ -189,14 +170,10 @@ func TestMetricSetFactory(t *testing.T) { registry := NewRegister() err := registry.addMetricSet(moduleName, metricSetName, fakeMetricSetFactory, WithNamespace(ns)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) reg, err := registry.metricSetRegistration(moduleName, metricSetName) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) assert.Equal(t, metricSetName, reg.Name) assert.NotNil(t, reg.Factory) assert.Nil(t, reg.HostParser) @@ -208,23 +185,17 @@ func TestMetricSetFactory(t *testing.T) { func TestDefaultMetricSet(t *testing.T) { registry := NewRegister() err := registry.addMetricSet(moduleName, metricSetName, fakeMetricSetFactory, DefaultMetricSet()) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) names, err := registry.DefaultMetricSets(moduleName) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) assert.Contains(t, names, metricSetName) } func TestMetricSetQuery(t *testing.T) { registry := NewRegister() err := registry.AddMetricSet(moduleName, metricSetName, fakeMetricSetFactory) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) metricsets := registry.MetricSets(moduleName) assert.Equal(t, len(metricsets), 1) @@ -242,3 +213,28 @@ func TestModuleQuery(t *testing.T) { assert.Equal(t, len(modules), 1) assert.Equal(t, modules[0], moduleName) } + +func TestProcessorsForMetricSet_StandardMetricSet(t *testing.T) { + registry := NewRegister() + err := registry.AddMetricSet(moduleName, metricSetName, fakeMetricSetFactory) + procs, err := registry.ProcessorsForMetricSet(moduleName, metricSetName) + require.NotNil(t, procs) + require.Empty(t, procs.List) + require.NoError(t, err) +} + +func TestProcessorsForMetricSet_UndefinedSecondarySource(t *testing.T) { + registry := NewRegister() + procs, err := registry.ProcessorsForMetricSet(moduleName, metricSetName) + require.Nil(t, procs) + require.Error(t, err) +} + +func TestProcessorsForMetricSet_FromSource(t *testing.T) { + registry := NewRegister() + registry.SetSecondarySource(NewLightModulesSource("testdata/lightmodules")) + procs, err := registry.ProcessorsForMetricSet("unpack", "withprocessors") + require.NoError(t, err) + require.NotNil(t, procs) + require.Len(t, procs.List, 1) +} diff --git a/metricbeat/mb/testdata/lightmodules/unpack/module.yml b/metricbeat/mb/testdata/lightmodules/unpack/module.yml new file mode 100644 index 000000000000..d86e9c4ae450 --- /dev/null +++ b/metricbeat/mb/testdata/lightmodules/unpack/module.yml @@ -0,0 +1,4 @@ +name: service +metricsets: +- withprocessors +- noprocessors diff --git a/metricbeat/mb/testdata/lightmodules/unpack/noprocessors/manifest.yml b/metricbeat/mb/testdata/lightmodules/unpack/noprocessors/manifest.yml new file mode 100644 index 000000000000..5291cac44e68 --- /dev/null +++ b/metricbeat/mb/testdata/lightmodules/unpack/noprocessors/manifest.yml @@ -0,0 +1,6 @@ +default: true +input: + module: foo + metricset: bar + defaults: + option: test diff --git a/metricbeat/mb/testdata/lightmodules/unpack/withprocessors/manifest.yml b/metricbeat/mb/testdata/lightmodules/unpack/withprocessors/manifest.yml new file mode 100644 index 000000000000..63984b9bb3d9 --- /dev/null +++ b/metricbeat/mb/testdata/lightmodules/unpack/withprocessors/manifest.yml @@ -0,0 +1,8 @@ +default: true +input: + module: foo + metricset: bar + defaults: + option: test +processors: + - add_id: diff --git a/metricbeat/tests/system/test_autodiscover.py b/metricbeat/tests/system/test_autodiscover.py index ae135a8f7fc5..db627c0516fe 100644 --- a/metricbeat/tests/system/test_autodiscover.py +++ b/metricbeat/tests/system/test_autodiscover.py @@ -41,12 +41,12 @@ def test_docker(self): docker_client.images.pull('memcached:latest') container = docker_client.containers.run('memcached:latest', detach=True) - self.wait_until(lambda: self.log_contains('Starting runner: memcached')) + self.wait_until(lambda: self.log_contains('Starting runner: RunnerGroup{memcached')) self.wait_until(lambda: self.output_count(lambda x: x >= 1)) container.stop() - self.wait_until(lambda: self.log_contains('Stopping runner: memcached')) + self.wait_until(lambda: self.log_contains('Stopping runner: RunnerGroup{memcached')) output = self.read_output_json() proc.check_kill_and_wait() @@ -85,12 +85,12 @@ def test_docker_labels(self): } container = docker_client.containers.run('memcached:latest', labels=labels, detach=True) - self.wait_until(lambda: self.log_contains('Starting runner: memcached')) + self.wait_until(lambda: self.log_contains('Starting runner: RunnerGroup{memcached')) self.wait_until(lambda: self.output_count(lambda x: x >= 1)) container.stop() - self.wait_until(lambda: self.log_contains('Stopping runner: memcached')) + self.wait_until(lambda: self.log_contains('Stopping runner: RunnerGroup{memcached')) output = self.read_output_json() proc.check_kill_and_wait() @@ -136,12 +136,12 @@ def test_config_appender(self): } container = docker_client.containers.run('memcached:latest', labels=labels, detach=True) - self.wait_until(lambda: self.log_contains('Starting runner: memcached')) + self.wait_until(lambda: self.log_contains('Starting runner: RunnerGroup{memcached')) self.wait_until(lambda: self.output_count(lambda x: x >= 1)) container.stop() - self.wait_until(lambda: self.log_contains('Stopping runner: memcached')) + self.wait_until(lambda: self.log_contains('Stopping runner: RunnerGroup{memcached')) output = self.read_output_json() proc.check_kill_and_wait() diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index 9e94fc905b21..eb06b910b71d 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -143,36 +143,6 @@ metricbeat.modules: path: '/api/jolokia/?ignoreErrors=true&canonicalNaming=false' username: admin # default username password: admin # default password - processors: - - script: - lang: javascript - source: > - function process(event) { - var broker_memory_broker_pct = event.Get("activemq.broker.memory.broker.pct") - if (broker_memory_broker_pct != null) { - event.Put("activemq.broker.memory.broker.pct", broker_memory_broker_pct / 100.0) - } - - var broker_memory_temp_pct = event.Get("activemq.broker.memory.temp.pct") - if (broker_memory_temp_pct != null) { - event.Put("activemq.broker.memory.temp.pct", broker_memory_temp_pct / 100.0) - } - - var broker_memory_store_pct = event.Get("activemq.broker.memory.store.pct") - if (broker_memory_store_pct != null) { - event.Put("activemq.broker.memory.store.pct", broker_memory_store_pct / 100.0) - } - - var queue_memory_broker_pct = event.Get("activemq.queue.memory.broker.pct") - if (queue_memory_broker_pct != null) { - event.Put("activemq.queue.memory.broker.pct", queue_memory_broker_pct / 100.0) - } - - var topic_memory_broker_pct = event.Get("activemq.topic.memory.broker.pct") - if (topic_memory_broker_pct != null) { - event.Put("activemq.topic.memory.broker.pct", topic_memory_broker_pct / 100.0) - } - } #------------------------------ Aerospike Module ------------------------------ - module: aerospike @@ -517,26 +487,6 @@ metricbeat.modules: # the options for this metricset are also available here. metrics_path: /metrics - # The custom processor is responsible for filtering Prometheus metrics - # not stricly related to the IBM MQ domain, e.g. system load, process, - # metrics HTTP server. - processors: - - script: - lang: javascript - source: > - function process(event) { - var metrics = event.Get("prometheus.metrics"); - Object.keys(metrics).forEach(function(key) { - if (!(key.match(/^ibmmq_.*$/))) { - event.Delete("prometheus.metrics." + key); - } - }); - metrics = event.Get("prometheus.metrics"); - if (Object.keys(metrics).length == 0) { - event.Cancel(); - } - } - #-------------------------------- Istio Module -------------------------------- # Istio mesh. To collect all all Mixer-generated metrics - module: istio diff --git a/x-pack/metricbeat/module/activemq/_meta/config.yml b/x-pack/metricbeat/module/activemq/_meta/config.yml index cddafae3352d..08791c5dc069 100644 --- a/x-pack/metricbeat/module/activemq/_meta/config.yml +++ b/x-pack/metricbeat/module/activemq/_meta/config.yml @@ -5,33 +5,3 @@ path: '/api/jolokia/?ignoreErrors=true&canonicalNaming=false' username: admin # default username password: admin # default password - processors: - - script: - lang: javascript - source: > - function process(event) { - var broker_memory_broker_pct = event.Get("activemq.broker.memory.broker.pct") - if (broker_memory_broker_pct != null) { - event.Put("activemq.broker.memory.broker.pct", broker_memory_broker_pct / 100.0) - } - - var broker_memory_temp_pct = event.Get("activemq.broker.memory.temp.pct") - if (broker_memory_temp_pct != null) { - event.Put("activemq.broker.memory.temp.pct", broker_memory_temp_pct / 100.0) - } - - var broker_memory_store_pct = event.Get("activemq.broker.memory.store.pct") - if (broker_memory_store_pct != null) { - event.Put("activemq.broker.memory.store.pct", broker_memory_store_pct / 100.0) - } - - var queue_memory_broker_pct = event.Get("activemq.queue.memory.broker.pct") - if (queue_memory_broker_pct != null) { - event.Put("activemq.queue.memory.broker.pct", queue_memory_broker_pct / 100.0) - } - - var topic_memory_broker_pct = event.Get("activemq.topic.memory.broker.pct") - if (topic_memory_broker_pct != null) { - event.Put("activemq.topic.memory.broker.pct", topic_memory_broker_pct / 100.0) - } - } diff --git a/x-pack/metricbeat/module/activemq/broker/manifest.yml b/x-pack/metricbeat/module/activemq/broker/manifest.yml index b266e4a054de..bb62a9adc13b 100644 --- a/x-pack/metricbeat/module/activemq/broker/manifest.yml +++ b/x-pack/metricbeat/module/activemq/broker/manifest.yml @@ -27,3 +27,23 @@ input: field: messages.count - attr: TotalProducerCount field: producers.count +processors: + - script: + lang: javascript + source: > + function process(event) { + var broker_memory_broker_pct = event.Get("activemq.broker.memory.broker.pct") + if (broker_memory_broker_pct != null) { + event.Put("activemq.broker.memory.broker.pct", broker_memory_broker_pct / 100.0) + } + + var broker_memory_temp_pct = event.Get("activemq.broker.memory.temp.pct") + if (broker_memory_temp_pct != null) { + event.Put("activemq.broker.memory.temp.pct", broker_memory_temp_pct / 100.0) + } + + var broker_memory_store_pct = event.Get("activemq.broker.memory.store.pct") + if (broker_memory_store_pct != null) { + event.Put("activemq.broker.memory.store.pct", broker_memory_store_pct / 100.0) + } + } diff --git a/x-pack/metricbeat/module/activemq/queue/manifest.yml b/x-pack/metricbeat/module/activemq/queue/manifest.yml index 5521f5e68862..84ea800aa45c 100644 --- a/x-pack/metricbeat/module/activemq/queue/manifest.yml +++ b/x-pack/metricbeat/module/activemq/queue/manifest.yml @@ -35,3 +35,13 @@ input: field: messages.enqueue.time.min - attr: ProducerCount field: producers.count +processors: + - script: + lang: javascript + source: > + function process(event) { + var queue_memory_broker_pct = event.Get("activemq.queue.memory.broker.pct") + if (queue_memory_broker_pct != null) { + event.Put("activemq.queue.memory.broker.pct", queue_memory_broker_pct / 100.0) + } + } diff --git a/x-pack/metricbeat/module/activemq/topic/manifest.yml b/x-pack/metricbeat/module/activemq/topic/manifest.yml index 7f9bc9f3ae5e..a2e64f0868f6 100644 --- a/x-pack/metricbeat/module/activemq/topic/manifest.yml +++ b/x-pack/metricbeat/module/activemq/topic/manifest.yml @@ -33,3 +33,13 @@ input: field: messages.enqueue.time.min - attr: ProducerCount field: producers.count +processors: + - script: + lang: javascript + source: > + function process(event) { + var topic_memory_broker_pct = event.Get("activemq.topic.memory.broker.pct") + if (topic_memory_broker_pct != null) { + event.Put("activemq.topic.memory.broker.pct", topic_memory_broker_pct / 100.0) + } + } diff --git a/x-pack/metricbeat/module/ibmmq/_meta/config.yml b/x-pack/metricbeat/module/ibmmq/_meta/config.yml index 2f8973d97302..f16580b07afe 100644 --- a/x-pack/metricbeat/module/ibmmq/_meta/config.yml +++ b/x-pack/metricbeat/module/ibmmq/_meta/config.yml @@ -6,23 +6,3 @@ # This module uses the Prometheus collector metricset, all # the options for this metricset are also available here. metrics_path: /metrics - - # The custom processor is responsible for filtering Prometheus metrics - # not stricly related to the IBM MQ domain, e.g. system load, process, - # metrics HTTP server. - processors: - - script: - lang: javascript - source: > - function process(event) { - var metrics = event.Get("prometheus.metrics"); - Object.keys(metrics).forEach(function(key) { - if (!(key.match(/^ibmmq_.*$/))) { - event.Delete("prometheus.metrics." + key); - } - }); - metrics = event.Get("prometheus.metrics"); - if (Object.keys(metrics).length == 0) { - event.Cancel(); - } - } diff --git a/x-pack/metricbeat/module/ibmmq/qmgr/manifest.yml b/x-pack/metricbeat/module/ibmmq/qmgr/manifest.yml index ec802f1ca1b3..21f660d8482a 100644 --- a/x-pack/metricbeat/module/ibmmq/qmgr/manifest.yml +++ b/x-pack/metricbeat/module/ibmmq/qmgr/manifest.yml @@ -4,3 +4,27 @@ input: metricset: collector defaults: metrics_path: /metrics + +# The custom processor is responsible for filtering Prometheus metrics +# not stricly related to the IBM MQ domain, e.g. system load, process, +# metrics HTTP server. +processors: + - script: + lang: javascript + source: > + function process(event) { + var metrics = event.Get("prometheus.metrics"); + if (metrics == null) { + event.Cancel(); + return; + } + Object.keys(metrics).forEach(function(key) { + if (!(key.match(/^ibmmq_.*$/))) { + event.Delete("prometheus.metrics." + key); + } + }); + metrics = event.Get("prometheus.metrics"); + if (Object.keys(metrics).length == 0) { + event.Cancel(); + } + } diff --git a/x-pack/metricbeat/module/ibmmq/test_ibmmq.py b/x-pack/metricbeat/module/ibmmq/test_ibmmq.py index c882860d6bda..436f15aa7dba 100644 --- a/x-pack/metricbeat/module/ibmmq/test_ibmmq.py +++ b/x-pack/metricbeat/module/ibmmq/test_ibmmq.py @@ -33,3 +33,8 @@ def test_qmgr(self): self.assert_fields_are_documented(evt) self.assertIn("prometheus", evt.keys(), evt) self.assertIn("metrics", evt["prometheus"].keys(), evt) + self.assertGreater(len(evt["prometheus"]["metrics"].keys()), 0) + + # Verify if processors are correctly setup. + for metric in evt["prometheus"]["metrics"].keys(): + assert metric.startswith("ibmmq_") diff --git a/x-pack/metricbeat/modules.d/activemq.yml.disabled b/x-pack/metricbeat/modules.d/activemq.yml.disabled index db028c03cad9..16756a9c40c7 100644 --- a/x-pack/metricbeat/modules.d/activemq.yml.disabled +++ b/x-pack/metricbeat/modules.d/activemq.yml.disabled @@ -8,33 +8,3 @@ path: '/api/jolokia/?ignoreErrors=true&canonicalNaming=false' username: admin # default username password: admin # default password - processors: - - script: - lang: javascript - source: > - function process(event) { - var broker_memory_broker_pct = event.Get("activemq.broker.memory.broker.pct") - if (broker_memory_broker_pct != null) { - event.Put("activemq.broker.memory.broker.pct", broker_memory_broker_pct / 100.0) - } - - var broker_memory_temp_pct = event.Get("activemq.broker.memory.temp.pct") - if (broker_memory_temp_pct != null) { - event.Put("activemq.broker.memory.temp.pct", broker_memory_temp_pct / 100.0) - } - - var broker_memory_store_pct = event.Get("activemq.broker.memory.store.pct") - if (broker_memory_store_pct != null) { - event.Put("activemq.broker.memory.store.pct", broker_memory_store_pct / 100.0) - } - - var queue_memory_broker_pct = event.Get("activemq.queue.memory.broker.pct") - if (queue_memory_broker_pct != null) { - event.Put("activemq.queue.memory.broker.pct", queue_memory_broker_pct / 100.0) - } - - var topic_memory_broker_pct = event.Get("activemq.topic.memory.broker.pct") - if (topic_memory_broker_pct != null) { - event.Put("activemq.topic.memory.broker.pct", topic_memory_broker_pct / 100.0) - } - } diff --git a/x-pack/metricbeat/modules.d/ibmmq.yml.disabled b/x-pack/metricbeat/modules.d/ibmmq.yml.disabled index c64dff1daeef..f26e218d0bfd 100644 --- a/x-pack/metricbeat/modules.d/ibmmq.yml.disabled +++ b/x-pack/metricbeat/modules.d/ibmmq.yml.disabled @@ -9,23 +9,3 @@ # This module uses the Prometheus collector metricset, all # the options for this metricset are also available here. metrics_path: /metrics - - # The custom processor is responsible for filtering Prometheus metrics - # not stricly related to the IBM MQ domain, e.g. system load, process, - # metrics HTTP server. - processors: - - script: - lang: javascript - source: > - function process(event) { - var metrics = event.Get("prometheus.metrics"); - Object.keys(metrics).forEach(function(key) { - if (!(key.match(/^ibmmq_.*$/))) { - event.Delete("prometheus.metrics." + key); - } - }); - metrics = event.Get("prometheus.metrics"); - if (Object.keys(metrics).length == 0) { - event.Cancel(); - } - }