Skip to content

Commit

Permalink
[Metricbeat] Support processors defined for light modules (#15923) (#…
Browse files Browse the repository at this point in the history
…15992)

* Adjust test cases first

* Setup processors for light modules

* Fix: comments, mage check

* Adjust code

* Fix: missing comment

* Fix: mage check

* Test light modules

* Test connector

* Test runner

* Fix: ToLower

* Adjust code after review

* Fix: mage check

* Adjust code after review

* Adjust code after review

* Fix: check error

* Fix: imports

* Increase test coverage

* Add unit tests

* Fix: hound

* beater: use factory

* Beater: modules

* Fix: system tests

* Fix: implements interface

* Add changelog entry

* Verify if processors are setup

(cherry picked from commit a70d6e8)
  • Loading branch information
mtojek authored Jan 31, 2020
1 parent 6e093e9 commit a30ac42
Show file tree
Hide file tree
Showing 31 changed files with 586 additions and 383 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add `key/value` mode for SQL module. {issue}15770[15770] {pull]15845[15845]
- Add support for Unix socket in Memcached metricbeat module. {issue}13685[13685] {pull}15822[15822]
- Make the `system/cpu` metricset collect normalized CPU metrics by default. {issue}15618[15618] {pull}15729[15729]
- Add support for processors in light modules. {issue}14740[14740] {pull}15923[15923]

*Packetbeat*

Expand Down
98 changes: 19 additions & 79 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,16 @@ package beater
import (
"sync"

"github.com/elastic/beats/libbeat/common/reload"
"github.com/elastic/beats/libbeat/management"
"github.com/elastic/beats/libbeat/paths"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/autodiscover"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/reload"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/management"
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/module"

Expand All @@ -44,20 +42,15 @@ import (

// Metricbeat implements the Beater interface for metricbeat.
type Metricbeat struct {
done chan struct{} // Channel used to initiate shutdown.
modules []staticModule // Active list of modules.
done chan struct{} // Channel used to initiate shutdown.
runners []module.Runner // Active list of module runners.
config Config
autodiscover *autodiscover.Autodiscover

// Options
moduleOptions []module.Option
}

type staticModule struct {
connector *module.Connector
module *module.Wrapper
}

// Option specifies some optional arguments used for configuring the behavior
// of the Metricbeat framework.
type Option func(mb *Metricbeat)
Expand Down Expand Up @@ -162,46 +155,28 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe
moduleOptions := append(
[]module.Option{module.WithMaxStartDelay(config.MaxStartDelay)},
metricbeat.moduleOptions...)
var errs multierror.Errors

factory := module.NewFactory(b.Info, moduleOptions...)

for _, moduleCfg := range config.Modules {
if !moduleCfg.Enabled() {
continue
}

failed := false

connector, err := module.NewConnector(b.Info, b.Publisher, moduleCfg, nil)
runner, err := factory.Create(b.Publisher, moduleCfg, nil)
if err != nil {
errs = append(errs, err)
failed = true
}

module, err := module.NewWrapper(moduleCfg, mb.Registry, moduleOptions...)
if err != nil {
errs = append(errs, err)
failed = true
}

if failed {
continue
return nil, err
}

metricbeat.modules = append(metricbeat.modules, staticModule{
connector: connector,
module: module,
})
metricbeat.runners = append(metricbeat.runners, runner)
}

if err := errs.Err(); err != nil {
return nil, err
}
if len(metricbeat.modules) == 0 && !dynamicCfgEnabled {
if len(metricbeat.runners) == 0 && !dynamicCfgEnabled {
return nil, mb.ErrAllModulesDisabled
}

if config.Autodiscover != nil {
var err error
factory := module.NewFactory(b.Info, metricbeat.moduleOptions...)
adapter := autodiscover.NewFactoryAdapter(factory)
metricbeat.autodiscover, err = autodiscover.NewAutodiscover("metricbeat", b.Publisher, adapter, config.Autodiscover)
if err != nil {
Expand All @@ -220,20 +195,16 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe
func (bt *Metricbeat) Run(b *beat.Beat) error {
var wg sync.WaitGroup

// Static modules (metricbeat.modules)
for _, m := range bt.modules {
client, err := m.connector.Connect()
if err != nil {
return err
}

r := module.NewRunner(client, m.module)
// Static modules (metricbeat.runners)
for _, r := range bt.runners {
r.Start()
wg.Add(1)

thatRunner := r
go func() {
defer wg.Done()
<-bt.done
r.Stop()
thatRunner.Stop()
}()
}

Expand Down Expand Up @@ -289,38 +260,7 @@ func (bt *Metricbeat) Stop() {
close(bt.done)
}

// Modules return a list of all configured modules, including anyone present
// under dynamic config settings.
// Modules return a list of all configured modules.
func (bt *Metricbeat) Modules() ([]*module.Wrapper, error) {
var modules []*module.Wrapper
for _, m := range bt.modules {
modules = append(modules, m.module)
}

// Add dynamic modules
if bt.config.ConfigModules.Enabled() {
config := cfgfile.DefaultDynamicConfig
bt.config.ConfigModules.Unpack(&config)

modulesManager, err := cfgfile.NewGlobManager(config.Path, ".yml", ".disabled")
if err != nil {
return nil, errors.Wrap(err, "initialization error")
}

for _, file := range modulesManager.ListEnabled() {
confs, err := cfgfile.LoadList(file.Path)
if err != nil {
return nil, errors.Wrap(err, "error loading config files")
}
for _, conf := range confs {
m, err := module.NewWrapper(conf, mb.Registry, bt.moduleOptions...)
if err != nil {
return nil, errors.Wrap(err, "module initialization error")
}
modules = append(modules, m)
}
}
}

return modules, nil
return module.ConfiguredModules(bt.config.Modules, bt.config.ConfigModules, bt.moduleOptions)
}
30 changes: 0 additions & 30 deletions metricbeat/docs/modules/activemq.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,36 +35,6 @@ metricbeat.modules:
path: '/api/jolokia/?ignoreErrors=true&canonicalNaming=false'
username: admin # default username
password: admin # default password
processors:
- script:
lang: javascript
source: >
function process(event) {
var broker_memory_broker_pct = event.Get("activemq.broker.memory.broker.pct")
if (broker_memory_broker_pct != null) {
event.Put("activemq.broker.memory.broker.pct", broker_memory_broker_pct / 100.0)
}
var broker_memory_temp_pct = event.Get("activemq.broker.memory.temp.pct")
if (broker_memory_temp_pct != null) {
event.Put("activemq.broker.memory.temp.pct", broker_memory_temp_pct / 100.0)
}
var broker_memory_store_pct = event.Get("activemq.broker.memory.store.pct")
if (broker_memory_store_pct != null) {
event.Put("activemq.broker.memory.store.pct", broker_memory_store_pct / 100.0)
}
var queue_memory_broker_pct = event.Get("activemq.queue.memory.broker.pct")
if (queue_memory_broker_pct != null) {
event.Put("activemq.queue.memory.broker.pct", queue_memory_broker_pct / 100.0)
}
var topic_memory_broker_pct = event.Get("activemq.topic.memory.broker.pct")
if (topic_memory_broker_pct != null) {
event.Put("activemq.topic.memory.broker.pct", topic_memory_broker_pct / 100.0)
}
}
----

[float]
Expand Down
20 changes: 0 additions & 20 deletions metricbeat/docs/modules/ibmmq.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -55,26 +55,6 @@ metricbeat.modules:
# This module uses the Prometheus collector metricset, all
# the options for this metricset are also available here.
metrics_path: /metrics
# The custom processor is responsible for filtering Prometheus metrics
# not stricly related to the IBM MQ domain, e.g. system load, process,
# metrics HTTP server.
processors:
- script:
lang: javascript
source: >
function process(event) {
var metrics = event.Get("prometheus.metrics");
Object.keys(metrics).forEach(function(key) {
if (!(key.match(/^ibmmq_.*$/))) {
event.Delete("prometheus.metrics." + key);
}
});
metrics = event.Get("prometheus.metrics");
if (Object.keys(metrics).length == 0) {
event.Cancel();
}
}
----

It also supports the options described in <<module-http-config-options>>.
Expand Down
2 changes: 2 additions & 0 deletions metricbeat/mb/lightmetricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)

// LightMetricSet contains the definition of a non-registered metric set
Expand All @@ -33,6 +34,7 @@ type LightMetricSet struct {
MetricSet string `config:"metricset" validate:"required"`
Defaults interface{} `config:"defaults"`
} `config:"input" validate:"required"`
Processors processors.PluginConfig `config:"processors"`
}

// Registration obtains a metric set registration for this light metric set, this registration
Expand Down
14 changes: 14 additions & 0 deletions metricbeat/mb/lightmodules.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
)

const (
Expand Down Expand Up @@ -150,6 +151,19 @@ type lightModuleConfig struct {
MetricSets []string `config:"metricsets"`
}

// ProcessorsForMetricSet returns processors defined for the light metricset.
func (s *LightModulesSource) ProcessorsForMetricSet(r *Register, moduleName string, metricSetName string) (*processors.Processors, error) {
module, err := s.loadModule(r, moduleName)
if err != nil {
return nil, errors.Wrapf(err, "reading processors for metricset '%s' in module '%s' failed", metricSetName, moduleName)
}
metricSet, ok := module.MetricSets[metricSetName]
if !ok {
return nil, fmt.Errorf("unknown metricset '%s' in module '%s'", metricSetName, moduleName)
}
return processors.New(metricSet.Processors)
}

// LightModule contains the definition of a light module
type LightModule struct {
Name string
Expand Down
26 changes: 26 additions & 0 deletions metricbeat/mb/lightmodules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
_ "github.com/elastic/beats/libbeat/processors/add_id"
)

// TestLightModulesAsModuleSource checks that registry correctly lists
Expand Down Expand Up @@ -302,6 +303,31 @@ func TestNewModulesCallModuleFactory(t *testing.T) {
assert.True(t, called, "module factory must be called if registered")
}

func TestProcessorsForMetricSet_UnknownModule(t *testing.T) {
r := NewRegister()
source := NewLightModulesSource("testdata/lightmodules")
procs, err := source.ProcessorsForMetricSet(r, "nonexisting", "fake")
require.Error(t, err)
require.Nil(t, procs)
}

func TestProcessorsForMetricSet_UnknownMetricSet(t *testing.T) {
r := NewRegister()
source := NewLightModulesSource("testdata/lightmodules")
procs, err := source.ProcessorsForMetricSet(r, "unpack", "nonexisting")
require.Error(t, err)
require.Nil(t, procs)
}

func TestProcessorsForMetricSet_ProcessorsRead(t *testing.T) {
r := NewRegister()
source := NewLightModulesSource("testdata/lightmodules")
procs, err := source.ProcessorsForMetricSet(r, "unpack", "withprocessors")
require.NoError(t, err)
require.NotNil(t, procs)
require.Len(t, procs.List, 1)
}

type metricSetWithOption struct {
BaseMetricSet
Option string
Expand Down
65 changes: 65 additions & 0 deletions metricbeat/mb/module/configuration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package module

import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/mb"
)

// ConfiguredModules returns a list of all configured modules, including anyone present under dynamic config settings.
func ConfiguredModules(modulesData []*common.Config, configModulesData *common.Config, moduleOptions []Option) ([]*Wrapper, error) {
var modules []*Wrapper

for _, moduleCfg := range modulesData {
module, err := NewWrapper(moduleCfg, mb.Registry, nil)
if err != nil {
return nil, err
}
modules = append(modules, module)
}

// Add dynamic modules
if configModulesData.Enabled() {
config := cfgfile.DefaultDynamicConfig
configModulesData.Unpack(&config)

modulesManager, err := cfgfile.NewGlobManager(config.Path, ".yml", ".disabled")
if err != nil {
return nil, errors.Wrap(err, "initialization error")
}

for _, file := range modulesManager.ListEnabled() {
confs, err := cfgfile.LoadList(file.Path)
if err != nil {
return nil, errors.Wrap(err, "error loading config files")
}
for _, conf := range confs {
m, err := NewWrapper(conf, mb.Registry, moduleOptions...)
if err != nil {
return nil, errors.Wrap(err, "module initialization error")
}
modules = append(modules, m)
}
}
}
return modules, nil
}
Loading

0 comments on commit a30ac42

Please sign in to comment.