Skip to content

Commit

Permalink
[Heartbeat] Add functional-ish tests in golang (elastic#32542)
Browse files Browse the repository at this point in the history
Fixes elastic#31999

This PR turned out to be a bit of a monster. The main thing it does is add testing for functional 'scenarios', that is, testing full heartbeat configs and the events they output, going through all heartbeat-specific code using the libbeat factory interface. This lets us do something sort of close to functional testing, where instead of using the heartbeat binary we use this factory interface directly. This gets us pretty close with a much simpler testing experience that's easier to debug and faster to run. The trade-off is that it will probably miss error conditions that only arise when building and running the full binary.

This also cleans up a bunch of linter issues in the modified files, which makes the diff a bit distracting: things like missing error return values, shadowed variable names, etc.

Finally, the new scenario test this adds, which checks that all events always contain a check group, revealed two bugs:

Synthetics test runs create an empty event at the start of each run (with no check group)
Any synth events prior to a journey start event would be missing a check group
This PR fixes this by:

Moving the check group generation code for synthexec up to the stream enricher, rather than the journey enricher.
Creating the check group UUIDs after journeys end, rather than waiting for them to start, because the synthetics node agent can produce output before any journey starts.
  • Loading branch information
andrewvc authored Aug 8, 2022
1 parent dde2208 commit 3fe6871
Show file tree
Hide file tree
Showing 24 changed files with 754 additions and 213 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Reduced memory usage slightly for browser monitors. {pull}32317[32317]
- Automatically kill zombie-ish node processes. {pull}32393[32393]
- Added timeout for browser monitors. {pull}32434[32434]
- Fix bug where browser jobs would sometimes be missing check groups or send empty events. {pull}32542[32542]

*Metricbeat*

Expand Down
48 changes: 32 additions & 16 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import (
"syscall"
"time"

"github.com/elastic/beats/v7/libbeat/publisher/pipeline"

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/heartbeat/config"
"github.com/elastic/beats/v7/heartbeat/hbregistry"
"github.com/elastic/beats/v7/heartbeat/monitors"
Expand All @@ -33,8 +38,6 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/management"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"

_ "github.com/elastic/beats/v7/heartbeat/security"
)
Expand Down Expand Up @@ -67,23 +70,36 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) {
}
jobConfig := parsedConfig.Jobs

scheduler := scheduler.Create(limit, hbregistry.SchedulerRegistry, location, jobConfig, parsedConfig.RunOnce)
sched := scheduler.Create(limit, hbregistry.SchedulerRegistry, location, jobConfig, parsedConfig.RunOnce)

// pipelineClientFactory builds the client that monitors publish through.
// In run_once mode it uses pipeline.NewSyncClient; otherwise it wraps the
// result of p.Connect() in a monitors.SyncPipelineClientAdaptor.
pipelineClientFactory := func(p beat.Pipeline) (pipeline.ISyncClient, error) {
	if parsedConfig.RunOnce {
		client, err := pipeline.NewSyncClient(logp.L(), p, beat.ClientConfig{})
		if err != nil {
			return nil, fmt.Errorf("could not create pipeline sync client for run_once: %w", err)
		}
		return client, nil
	}

	client, err := p.Connect()
	if err != nil {
		// Do not hand back an adaptor wrapping an unusable client alongside the error.
		return nil, fmt.Errorf("could not connect to pipeline: %w", err)
	}
	return monitors.SyncPipelineClientAdaptor{C: client}, nil
}

bt := &Heartbeat{
done: make(chan struct{}),
config: parsedConfig,
scheduler: scheduler,
scheduler: sched,
// dynamicFactory is the factory used for dynamic configs, e.g. autodiscover / reload
dynamicFactory: monitors.NewFactory(b.Info, scheduler.Add, plugin.GlobalPluginsReg, parsedConfig.RunOnce),
dynamicFactory: monitors.NewFactory(b.Info, sched.Add, plugin.GlobalPluginsReg, pipelineClientFactory),
}
return bt, nil
}

// Run executes the beat.
func (bt *Heartbeat) Run(b *beat.Beat) error {
logp.Info("heartbeat is running! Hit CTRL-C to stop it.")
logp.L().Info("heartbeat is running! Hit CTRL-C to stop it.")
groups, _ := syscall.Getgroups()
logp.Info("Effective user/group ids: %d/%d, with groups: %v", syscall.Geteuid(), syscall.Getegid(), groups)
logp.L().Info("Effective user/group ids: %d/%d, with groups: %v", syscall.Geteuid(), syscall.Getegid(), groups)

// It is important this appear before we check for run once mode
// In run once mode we depend on these monitors being loaded, but not other more
Expand All @@ -96,7 +112,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {

if bt.config.RunOnce {
bt.scheduler.WaitForRunOnce()
logp.Info("Ending run_once run")
logp.L().Info("Ending run_once run")
return nil
}

Expand All @@ -108,7 +124,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
bt.monitorReloader = cfgfile.NewReloader(b.Publisher, bt.config.ConfigMonitors)
defer bt.monitorReloader.Stop()

err := bt.RunReloadableMonitors(b)
err := bt.RunReloadableMonitors()
if err != nil {
return err
}
Expand All @@ -134,7 +150,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {

<-bt.done

logp.Info("Shutting down.")
logp.L().Info("Shutting down.")
return nil
}

Expand All @@ -145,7 +161,7 @@ func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) {
created, err := bt.dynamicFactory.Create(b.Publisher, cfg)
if err != nil {
if errors.Is(err, monitors.ErrMonitorDisabled) {
logp.Info("skipping disabled monitor: %s", err)
logp.L().Info("skipping disabled monitor: %s", err)
continue // don't stop loading monitors just because they're disabled
}

Expand All @@ -166,14 +182,14 @@ func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) {

// RunCentralMgmtMonitors loads any central management configured configs.
func (bt *Heartbeat) RunCentralMgmtMonitors(b *beat.Beat) {
monitors := cfgfile.NewRunnerList(management.DebugK, bt.dynamicFactory, b.Publisher)
reload.Register.MustRegisterList(b.Info.Beat+".monitors", monitors)
mons := cfgfile.NewRunnerList(management.DebugK, bt.dynamicFactory, b.Publisher)
reload.Register.MustRegisterList(b.Info.Beat+".monitors", mons)
inputs := cfgfile.NewRunnerList(management.DebugK, bt.dynamicFactory, b.Publisher)
reload.Register.MustRegisterList("inputs", inputs)
}

// RunReloadableMonitors runs the `heartbeat.config.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunReloadableMonitors(b *beat.Beat) (err error) {
func (bt *Heartbeat) RunReloadableMonitors() (err error) {
// Check monitor configs
if err := bt.monitorReloader.Check(bt.dynamicFactory); err != nil {
logp.Error(fmt.Errorf("error loading reloadable monitors: %w", err))
Expand All @@ -187,7 +203,7 @@ func (bt *Heartbeat) RunReloadableMonitors(b *beat.Beat) (err error) {

// makeAutodiscover creates an autodiscover object ready to be started.
func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, error) {
autodiscover, err := autodiscover.NewAutodiscover(
ad, err := autodiscover.NewAutodiscover(
"heartbeat",
b.Publisher,
bt.dynamicFactory,
Expand All @@ -198,7 +214,7 @@ func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover,
if err != nil {
return nil, err
}
return autodiscover, nil
return ad, nil
}

// Stop stops the beat.
Expand Down
8 changes: 8 additions & 0 deletions heartbeat/ecserr/ecserr.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,11 @@ func NewCmdTimeoutStatusErr(timeout time.Duration, cmd string) *ECSErr {
fmt.Sprintf("command '%s' did not exit before extended timeout: %s", cmd, timeout.String()),
)
}

// NewSyntheticsCmdCouldNotStartErr returns an ECSErr of type ETYPE_IO with the
// code SYNTHETICS_CMD_COULD_NOT_START. The given reason is embedded in the
// error message.
func NewSyntheticsCmdCouldNotStartErr(reason error) *ECSErr {
	return NewECSErr(
		ETYPE_IO,
		"SYNTHETICS_CMD_COULD_NOT_START",
		// %v (rather than %s) keeps the message readable even if reason is nil.
		fmt.Sprintf("could not start command: %v", reason),
	)
}
8 changes: 8 additions & 0 deletions heartbeat/hbtestllext/isdefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,11 @@ var IsInt64 = isdef.Is("positiveInt64", func(path llpath.Path, v interface{}) *l
}
return llresult.ValidResult(path)
})

// IsUint16 is an isdef that passes only when the checked value has the
// dynamic type uint16; any other type yields an "expected a uint16" failure.
var IsUint16 = isdef.Is("positiveUInt16", func(path llpath.Path, v interface{}) *llresult.Results {
	if _, isU16 := v.(uint16); isU16 {
		return llresult.ValidResult(path)
	}
	return llresult.SimpleResult(path, false, "expected a uint16")
})
67 changes: 47 additions & 20 deletions heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
"fmt"
"sync"

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
"github.com/elastic/beats/v7/heartbeat/scheduler"
Expand All @@ -31,24 +35,24 @@ import (
"github.com/elastic/beats/v7/libbeat/processors/actions"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
"github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

// RunnerFactory that can be used to create cfg.Runner cast versions of Monitor
// suitable for config reloading.
type RunnerFactory struct {
info beat.Info
addTask scheduler.AddTask
byId map[string]*Monitor
mtx *sync.Mutex
pluginsReg *plugin.PluginsReg
logger *logp.Logger
runOnce bool
info beat.Info
addTask scheduler.AddTask
byId map[string]*Monitor
mtx *sync.Mutex
pluginsReg *plugin.PluginsReg
logger *logp.Logger
pipelineClientFactory PipelineClientFactory
}

// PipelineClientFactory creates the pipeline.ISyncClient that a monitor
// publishes through, given a beat.Pipeline. RunnerFactory.Create calls it
// before constructing each monitor.
//
// The parameter is named p so it does not reuse the identifier of the imported
// pipeline package that appears in the result type.
type PipelineClientFactory func(p beat.Pipeline) (pipeline.ISyncClient, error)

type publishSettings struct {
// Fields and tags to add to monitor.
EventMetadata mapstr.EventMetadata `config:",inline"`
Expand All @@ -69,20 +73,36 @@ type publishSettings struct {
}

// NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects.
func NewFactory(info beat.Info, addTask scheduler.AddTask, pluginsReg *plugin.PluginsReg, runOnce bool) *RunnerFactory {
func NewFactory(info beat.Info, addTask scheduler.AddTask, pluginsReg *plugin.PluginsReg, pcf PipelineClientFactory) *RunnerFactory {
return &RunnerFactory{
info: info,
addTask: addTask,
byId: map[string]*Monitor{},
mtx: &sync.Mutex{},
pluginsReg: pluginsReg,
logger: logp.L(),
runOnce: runOnce,
info: info,
addTask: addTask,
byId: map[string]*Monitor{},
mtx: &sync.Mutex{},
pluginsReg: pluginsReg,
logger: logp.L(),
pipelineClientFactory: pcf,
}
}

// NoopRunner is a runner whose Start and Stop do nothing. RunnerFactory.Create
// returns it for configs that are not enabled.
type NoopRunner struct{}

// String returns a fixed placeholder identifying the runner.
func (NoopRunner) String() string { return "<noop runner>" }

// Start is a no-op.
func (NoopRunner) Start() {}

// Stop is a no-op.
func (NoopRunner) Stop() {}

// Create makes a new Runner for a new monitor with the given Config.
func (f *RunnerFactory) Create(p beat.Pipeline, c *conf.C) (cfgfile.Runner, error) {
if !c.Enabled() {
return NoopRunner{}, nil
}

c, err := stdfields.UnnestStream(c)
if err != nil {
return nil, err
Expand Down Expand Up @@ -119,9 +139,13 @@ func (f *RunnerFactory) Create(p beat.Pipeline, c *conf.C) (cfgfile.Runner, erro
}
}()
}
monitor, err := newMonitor(c, f.pluginsReg, p, f.addTask, safeStop, f.runOnce)
pc, err := f.pipelineClientFactory(p)
if err != nil {
return nil, err
return nil, fmt.Errorf("could not create pipeline client via factory: %w", err)
}
monitor, err := newMonitor(c, f.pluginsReg, pc, f.addTask, safeStop)
if err != nil {
return nil, fmt.Errorf("factory could not create monitor: %w", err)
}

if mon, ok := f.byId[monitor.stdFields.ID]; ok {
Expand All @@ -137,6 +161,9 @@ func (f *RunnerFactory) Create(p beat.Pipeline, c *conf.C) (cfgfile.Runner, erro

// CheckConfig checks to see if the given monitor config is valid.
func (f *RunnerFactory) CheckConfig(config *conf.C) error {
if !config.Enabled() {
return nil
}
return checkMonitorConfig(config, plugin.GlobalPluginsReg)
}

Expand Down
38 changes: 29 additions & 9 deletions heartbeat/monitors/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@ package monitors
import (
"regexp"
"testing"
"time"

"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
)

var binfo = beat.Info{
Expand Down Expand Up @@ -160,18 +159,39 @@ func TestPreProcessors(t *testing.T) {
}
}

// TestDisabledMonitor checks that creating a monitor from a config with
// enabled=false succeeds, yields a NoopRunner, and leaves the built and closed
// counters returned by mockPluginsReg at zero.
func TestDisabledMonitor(t *testing.T) {
	confMap := map[string]interface{}{
		"type":    "test",
		"enabled": "false",
	}

	conf, err := config.NewConfigFrom(confMap)
	require.NoError(t, err)

	reg, built, closed := mockPluginsReg()
	f, sched, fClose := makeMockFactory(reg)
	defer fClose()
	defer sched.Stop()

	runner, err := f.Create(&MockPipeline{}, conf)
	require.NoError(t, err)
	// require.IsType takes the expected type first, then the object under test;
	// the correct order keeps failure messages accurate.
	require.IsType(t, NoopRunner{}, runner)

	require.Equal(t, 0, built.Load())
	require.Equal(t, 0, closed.Load())
}

func TestDuplicateMonitorIDs(t *testing.T) {
serverMonConf := mockPluginConf(t, "custom", "custom", "@every 1ms", "http://example.net")
badConf := mockBadPluginConf(t, "custom")
reg, built, closed := mockPluginsReg()
pipelineConnector := &MockPipelineConnector{}
mockPipeline := &MockPipeline{}

sched := scheduler.Create(1, monitoring.NewRegistry(), time.Local, nil, false)
f, sched, fClose := makeMockFactory(reg)
defer fClose()
defer sched.Stop()

f := NewFactory(binfo, sched.Add, reg, false)
makeTestMon := func() (*Monitor, error) {
mIface, err := f.Create(pipelineConnector, serverMonConf)
mIface, err := f.Create(mockPipeline, serverMonConf)
if mIface == nil {
return nil, err
} else {
Expand All @@ -180,7 +200,7 @@ func TestDuplicateMonitorIDs(t *testing.T) {
}

// Ensure that an error is returned on a bad config
_, m0Err := newMonitor(badConf, reg, pipelineConnector, sched.Add, nil, false)
_, m0Err := newMonitor(badConf, reg, mockPipeline.ConnectSync(), sched.Add, nil)
require.Error(t, m0Err)

// Would fail if the previous newMonitor didn't free the monitor.id
Expand Down
Loading

0 comments on commit 3fe6871

Please sign in to comment.