From 3737eb140d3f5f626af08f7b7b3efb644fa20741 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 4 Jun 2020 13:24:50 +0200 Subject: [PATCH] Cherry-pick #18797 to 7.7: Fix panic on `metricbeat test modules` (#18853) Since metricbeat light modules support processors (#15923), module initialization requires a publisher in the beat so modules can attach their processors. `metricbeat test modules` is not initializing as normal metricbeat commands, and it is not initializing any output or publisher pipeline, so metricbeat panics when trying to initialize modules with the new method. This change adds a dummy publisher for this case, and fixes also a condition that was adding a `nil` module option, causing additional panics. A test that reproduced the issues is also added. (cherry picked from commit 316793d9a56fd1243982814173a1882ce69772f0) Add nil pipeline from #16715, required for the fix. (cherry picked from commit 257fdfc7fa3b350dc9eb09d406f44c0bdb8da7c5) Co-authored-by: Steffen Siering --- CHANGELOG.next.asciidoc | 1 + libbeat/publisher/pipeline/nilpipeline.go | 83 +++++++++++++++++++++++ metricbeat/cmd/test/modules.go | 17 +++++ metricbeat/mb/module/configuration.go | 2 +- metricbeat/tests/system/test_cmd.py | 15 ++++ 5 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 libbeat/publisher/pipeline/nilpipeline.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1c9642b83cf..c3d2e047075 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -121,6 +121,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix storage metricset to allow config without region/zone. {issue}17623[17623] {pull}17624[17624] - Fix overflow on Prometheus rates when new buckets are added on the go. {pull}17753[17753] - Fix tags_filter for cloudwatch metricset in aws. {pull}18524[18524] +- Fix panic on `metricbeat test modules` when modules are configured in `metricbeat.modules`. {issue}18789[18789] {pull}18797[18797] - Add missing network.sent_packets_count metric into compute metricset in googlecloud module. {pull}18802[18802] *Packetbeat* diff --git a/libbeat/publisher/pipeline/nilpipeline.go b/libbeat/publisher/pipeline/nilpipeline.go new file mode 100644 index 00000000000..f32785a8d22 --- /dev/null +++ b/libbeat/publisher/pipeline/nilpipeline.go @@ -0,0 +1,83 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pipeline + +import "github.com/elastic/beats/v7/libbeat/beat" + +type nilPipeline struct{} + +type nilClient struct { + eventer beat.ClientEventer + ackCount func(int) + ackEvents func([]interface{}) + ackLastEvent func(interface{}) +} + +var _nilPipeline = (*nilPipeline)(nil) + +// NewNilPipeline returns a new pipeline that is compatible with +// beats.PipelineConnector. The pipeline will discard all events that have been +// published. Client ACK handlers will still be executed, but the callbacks +// will be executed immediately when the event is published. +func NewNilPipeline() beat.PipelineConnector { return _nilPipeline } + +func (p *nilPipeline) Connect() (beat.Client, error) { + return p.ConnectWith(beat.ClientConfig{}) +} + +func (p *nilPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { + return &nilClient{ + eventer: cfg.Events, + ackCount: cfg.ACKCount, + ackEvents: cfg.ACKEvents, + ackLastEvent: cfg.ACKLastEvent, + }, nil +} + +func (c *nilClient) Publish(event beat.Event) { + c.PublishAll([]beat.Event{event}) +} + +func (c *nilClient) PublishAll(events []beat.Event) { + L := len(events) + if L == 0 { + return + } + + if c.ackLastEvent != nil { + c.ackLastEvent(events[L-1].Private) + } + if c.ackEvents != nil { + tmp := make([]interface{}, L) + for i := range events { + tmp[i] = events[i].Private + } + c.ackEvents(tmp) + } + if c.ackCount != nil { + c.ackCount(L) + } +} + +func (c *nilClient) Close() error { + if c.eventer != nil { + c.eventer.Closing() + c.eventer.Closed() + } + return nil +} diff --git a/metricbeat/cmd/test/modules.go b/metricbeat/cmd/test/modules.go index 93950973a5d..6bf312d17b9 100644 --- a/metricbeat/cmd/test/modules.go +++ b/metricbeat/cmd/test/modules.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cmd/instance" + "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/elastic/beats/v7/libbeat/testing" "github.com/elastic/beats/v7/metricbeat/beater" ) @@ -49,6 +50,8 @@ func GenTestModulesCmd(name, beatVersion string, create beat.Creator) *cobra.Com os.Exit(1) } + // A publisher is needed for modules that add their own pipelines + b.Beat.Publisher = newPublisher() mb, err := create(&b.Beat, b.Beat.BeatConfig) if err != nil { fmt.Fprintf(os.Stderr, "Error initializing metricbeat: %s\n", err) @@ -78,3 +81,17 @@ func GenTestModulesCmd(name, beatVersion string, create beat.Creator) *cobra.Com }, } } + +type publisher struct { + beat.PipelineConnector +} + +// newPublisher returns a functional publisher that does nothing. +func newPublisher() *publisher { + return &publisher{pipeline.NewNilPipeline()} +} + +// SetACKHandler is a dummy implementation of the ack handler for the test publisher. +func (*publisher) SetACKHandler(beat.PipelineACKHandler) error { + return nil +} diff --git a/metricbeat/mb/module/configuration.go b/metricbeat/mb/module/configuration.go index 8b0701c0ae9..d5049690b48 100644 --- a/metricbeat/mb/module/configuration.go +++ b/metricbeat/mb/module/configuration.go @@ -30,7 +30,7 @@ func ConfiguredModules(modulesData []*common.Config, configModulesData *common.C var modules []*Wrapper for _, moduleCfg := range modulesData { - module, err := NewWrapper(moduleCfg, mb.Registry, nil) + module, err := NewWrapper(moduleCfg, mb.Registry, moduleOptions...) if err != nil { return nil, err } diff --git a/metricbeat/tests/system/test_cmd.py b/metricbeat/tests/system/test_cmd.py index 740beedb97a..ad9a507d08c 100644 --- a/metricbeat/tests/system/test_cmd.py +++ b/metricbeat/tests/system/test_cmd.py @@ -124,6 +124,21 @@ def test_modules_test(self): assert self.log_contains("cpu...OK") assert self.log_contains("memory...OK") + def test_modules_test_with_module_in_main_config(self): + self.render_config_template(reload=False, modules=[{ + "name": "system", + "metricsets": ["cpu", "memory"], + "period": "10s", + }]) + + exit_code = self.run_beat( + logging_args=None, + extra_args=["test", "modules"]) + + assert exit_code == 0 + assert self.log_contains("cpu...OK") + assert self.log_contains("memory...OK") + def test_modules_test_error(self): """ Test test modules command with an error result