Skip to content

Commit

Permalink
Cherry-pick elastic#18797 to 7.7: Fix panic on `metricbeat test modul…
Browse files Browse the repository at this point in the history
…es` (elastic#18853)

Since metricbeat light modules support processors (elastic#15923), module
initialization requires a publisher in the beat so modules can attach
their processors. `metricbeat test modules` is not initializing as
normal metricbeat commands, and it is not initializing any output or
publisher pipeline, so metricbeat panics when trying to initialize
modules with the new method.

This change adds a dummy publisher for this case, and fixes also a
condition that was adding a `nil` module option, causing additional
panics. A test that reproduced the issues is also added.

(cherry picked from commit 316793d)

Add nil pipeline from elastic#16715, required for the fix.

(cherry picked from commit 257fdfc)

Co-authored-by: Steffen Siering <[email protected]>
  • Loading branch information
jsoriano and Steffen Siering authored Jun 4, 2020
1 parent 6d5251e commit 3737eb1
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix storage metricset to allow config without region/zone. {issue}17623[17623] {pull}17624[17624]
- Fix overflow on Prometheus rates when new buckets are added on the go. {pull}17753[17753]
- Fix tags_filter for cloudwatch metricset in aws. {pull}18524[18524]
- Fix panic on `metricbeat test modules` when modules are configured in `metricbeat.modules`. {issue}18789[18789] {pull}18797[18797]
- Add missing network.sent_packets_count metric into compute metricset in googlecloud module. {pull}18802[18802]

*Packetbeat*
Expand Down
83 changes: 83 additions & 0 deletions libbeat/publisher/pipeline/nilpipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pipeline

import "github.com/elastic/beats/v7/libbeat/beat"

type nilPipeline struct{}

type nilClient struct {
eventer beat.ClientEventer
ackCount func(int)
ackEvents func([]interface{})
ackLastEvent func(interface{})
}

var _nilPipeline = (*nilPipeline)(nil)

// NewNilPipeline returns a new pipeline that is compatible with
// beats.PipelineConnector. The pipeline will discard all events that have been
// published. Client ACK handlers will still be executed, but the callbacks
// will be executed immediately when the event is published.
func NewNilPipeline() beat.PipelineConnector { return _nilPipeline }

func (p *nilPipeline) Connect() (beat.Client, error) {
return p.ConnectWith(beat.ClientConfig{})
}

func (p *nilPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
return &nilClient{
eventer: cfg.Events,
ackCount: cfg.ACKCount,
ackEvents: cfg.ACKEvents,
ackLastEvent: cfg.ACKLastEvent,
}, nil
}

func (c *nilClient) Publish(event beat.Event) {
c.PublishAll([]beat.Event{event})
}

func (c *nilClient) PublishAll(events []beat.Event) {
L := len(events)
if L == 0 {
return
}

if c.ackLastEvent != nil {
c.ackLastEvent(events[L-1].Private)
}
if c.ackEvents != nil {
tmp := make([]interface{}, L)
for i := range events {
tmp[i] = events[i].Private
}
c.ackEvents(tmp)
}
if c.ackCount != nil {
c.ackCount(L)
}
}

func (c *nilClient) Close() error {
if c.eventer != nil {
c.eventer.Closing()
c.eventer.Closed()
}
return nil
}
17 changes: 17 additions & 0 deletions metricbeat/cmd/test/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
"github.com/elastic/beats/v7/libbeat/testing"
"github.com/elastic/beats/v7/metricbeat/beater"
)
Expand All @@ -49,6 +50,8 @@ func GenTestModulesCmd(name, beatVersion string, create beat.Creator) *cobra.Com
os.Exit(1)
}

// A publisher is needed for modules that add their own pipelines
b.Beat.Publisher = newPublisher()
mb, err := create(&b.Beat, b.Beat.BeatConfig)
if err != nil {
fmt.Fprintf(os.Stderr, "Error initializing metricbeat: %s\n", err)
Expand Down Expand Up @@ -78,3 +81,17 @@ func GenTestModulesCmd(name, beatVersion string, create beat.Creator) *cobra.Com
},
}
}

type publisher struct {
beat.PipelineConnector
}

// newPublisher returns a functional publisher that does nothing.
func newPublisher() *publisher {
return &publisher{pipeline.NewNilPipeline()}
}

// SetACKHandler is a dummy implementation of the ack handler for the test publisher.
func (*publisher) SetACKHandler(beat.PipelineACKHandler) error {
return nil
}
2 changes: 1 addition & 1 deletion metricbeat/mb/module/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func ConfiguredModules(modulesData []*common.Config, configModulesData *common.C
var modules []*Wrapper

for _, moduleCfg := range modulesData {
module, err := NewWrapper(moduleCfg, mb.Registry, nil)
module, err := NewWrapper(moduleCfg, mb.Registry, moduleOptions...)
if err != nil {
return nil, err
}
Expand Down
15 changes: 15 additions & 0 deletions metricbeat/tests/system/test_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,21 @@ def test_modules_test(self):
assert self.log_contains("cpu...OK")
assert self.log_contains("memory...OK")

def test_modules_test_with_module_in_main_config(self):
self.render_config_template(reload=False, modules=[{
"name": "system",
"metricsets": ["cpu", "memory"],
"period": "10s",
}])

exit_code = self.run_beat(
logging_args=None,
extra_args=["test", "modules"])

assert exit_code == 0
assert self.log_contains("cpu...OK")
assert self.log_contains("memory...OK")

def test_modules_test_error(self):
"""
Test test modules command with an error result
Expand Down

0 comments on commit 3737eb1

Please sign in to comment.