Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Log error on dupe monitor ID instead of strict req #29041

Merged
merged 14 commits into from
Nov 22, 2021
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Change `threatintel` module to use new `threat.*` ECS fields. {pull}29014[29014]

*Heartbeat*
- Change behavior in case of duplicate monitor IDs in configs to be last monitor wins. {pull}29041[29041]

*Journalbeat*

Expand Down
6 changes: 2 additions & 4 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
config: parsedConfig,
scheduler: scheduler,
// dynamicFactory is the factory used for dynamic configs, e.g. autodiscover / reload
dynamicFactory: monitors.NewFactory(b.Info, scheduler),
dynamicFactory: monitors.NewFactory(b.Info, scheduler, plugin.GlobalPluginsReg),
}
return bt, nil
}
Expand Down Expand Up @@ -198,11 +198,9 @@ func runRunOnceSingleConfig(cfg *common.Config, publishClient *core.SyncClient,

// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) {
factory := monitors.NewFactory(b.Info, bt.scheduler)

var runners []cfgfile.Runner
for _, cfg := range bt.config.Monitors {
created, err := factory.Create(b.Publisher, cfg)
created, err := bt.dynamicFactory.Create(b.Publisher, cfg)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleans everything up so that we only use a single factory instance in heartbeat

if err != nil {
if errors.Is(err, monitors.ErrMonitorDisabled) {
logp.Info("skipping disabled monitor: %s", err)
Expand Down
56 changes: 50 additions & 6 deletions heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package monitors

import (
"fmt"
"sync"

"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/actions"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
Expand All @@ -37,8 +39,12 @@ import (
// RunnerFactory that can be used to create cfg.Runner cast versions of Monitor
// suitable for config reloading.
type RunnerFactory struct {
info beat.Info
sched *scheduler.Scheduler
info beat.Info
sched *scheduler.Scheduler
byId map[string]*Monitor
mtx *sync.Mutex
pluginsReg *plugin.PluginsReg
logger *logp.Logger
}

type publishSettings struct {
Expand All @@ -61,8 +67,15 @@ type publishSettings struct {
}

// NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects.
func NewFactory(info beat.Info, sched *scheduler.Scheduler) *RunnerFactory {
return &RunnerFactory{info, sched}
func NewFactory(info beat.Info, sched *scheduler.Scheduler, pluginsReg *plugin.PluginsReg) *RunnerFactory {
return &RunnerFactory{
info: info,
sched: sched,
byId: map[string]*Monitor{},
mtx: &sync.Mutex{},
pluginsReg: pluginsReg,
logger: logp.NewLogger("monitor-factory"),
}
}

// Create makes a new Runner for a new monitor with the given Config.
Expand All @@ -78,8 +91,39 @@ func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config) (cfgfile.Runne
}

p = pipetool.WithClientConfigEdit(p, configEditor)
monitor, err := newMonitor(c, plugin.GlobalPluginsReg, p, f.sched)
return monitor, err

f.mtx.Lock()
defer f.mtx.Unlock()

// Handle the problem of this function being occasionally invoked from within
// f.mtx being locked given that golang does not support reentrant locks.
// The important thing is clearing the map, not ensuring it stops exactly on time
// so we can defer its removal from the map with a goroutine.
safeStop := func(m *Monitor) {
go func() {
// We can safely relock now, since we're in a new goroutine.
f.mtx.Lock()
defer f.mtx.Unlock()

// If this element hasn't already been removed or replaced with a new
// instance delete it from the map.
if curM, ok := f.byId[m.stdFields.ID]; ok && curM == m {
delete(f.byId, m.stdFields.ID)
}
}()
}
monitor, err := newMonitor(c, f.pluginsReg, p, f.sched, safeStop)
if err != nil {
return nil, err
}
if mon, ok := f.byId[monitor.stdFields.ID]; ok {
f.logger.Warnf("monitor ID %s is configured for multiple monitors! IDs should be unique values, last seen config will win", monitor.stdFields.ID)
mon.Stop()
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
}

f.byId[monitor.stdFields.ID] = monitor

return monitor, nil
}

// CheckConfig checks to see if the given monitor config is valid.
Expand Down
63 changes: 58 additions & 5 deletions heartbeat/monitors/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,23 @@ import (

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
)

var binfo = beat.Info{
Beat: "heartbeat",
IndexPrefix: "heartbeat",
Version: "8.0.0",
}

func TestPreProcessors(t *testing.T) {
binfo := beat.Info{
Beat: "heartbeat",
IndexPrefix: "heartbeat",
Version: "8.0.0",
}

tests := map[string]struct {
settings publishSettings
expectedIndex string
Expand Down Expand Up @@ -143,3 +147,52 @@ func TestPreProcessors(t *testing.T) {
})
}
}

func TestDuplicateMonitorIDs(t *testing.T) {
serverMonConf := mockPluginConf(t, "custom", "@every 1ms", "http://example.net")
badConf := mockBadPluginConf(t, "custom", "@every 1ms")
reg, built, closed := mockPluginsReg()
pipelineConnector := &MockPipelineConnector{}

sched := scheduler.New(1, monitoring.NewRegistry())
err := sched.Start()
require.NoError(t, err)
defer sched.Stop()

f := NewFactory(binfo, sched, reg)
makeTestMon := func() (*Monitor, error) {
mIface, err := f.Create(pipelineConnector, serverMonConf)
if mIface == nil {
return nil, err
} else {
return mIface.(*Monitor), err
}
}

// Ensure that an error is returned on a bad config
_, m0Err := newMonitor(badConf, reg, pipelineConnector, sched, nil)
require.Error(t, m0Err)

// Would fail if the previous newMonitor didn't free the monitor.id
m1, m1Err := makeTestMon()
require.NoError(t, m1Err)
m1.Start()
m2, m2Err := makeTestMon()
require.NoError(t, m2Err)
m2.Start()
// Change the name so we can ensure that this is the currently active monitor
m2.stdFields.Name = "MON2!!!"
// This used to trigger an error, but shouldn't any longer, we just log
// the error, and ensure the last monitor wins
require.NoError(t, m2Err)

m, ok := f.byId[m2.stdFields.ID]
require.True(t, ok)
require.Equal(t, m2.stdFields.Name, m.stdFields.Name)
m1.Stop()
m2.Stop()

// 3 are counted as built, even the bad config
require.Equal(t, 3, built.Load())
require.Equal(t, closed.Load(), 3)
}
51 changes: 18 additions & 33 deletions heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,32 +69,21 @@ func (m *Monitor) String() string {
}

func checkMonitorConfig(config *common.Config, registrar *plugin.PluginsReg) error {
m, err := newMonitor(config, registrar, nil, nil)
if m != nil {
m.Stop() // Stop the monitor to free up the ID from uniqueness checks
}
return err
}

// uniqueMonitorIDs is used to keep track of explicitly configured monitor IDs and ensure no duplication within a
// given heartbeat instance.
var uniqueMonitorIDs sync.Map
_, err := newMonitor(config, registrar, nil, nil, nil)

// ErrDuplicateMonitorID is returned when a monitor attempts to start using an ID already in use by another monitor.
type ErrDuplicateMonitorID struct{ ID string }

func (e ErrDuplicateMonitorID) Error() string {
return fmt.Sprintf("monitor ID %s is configured for multiple monitors! IDs must be unique values.", e.ID)
return err
}

// newMonitor Creates a new monitor, without leaking resources in the event of an error.
// newMonitor creates a new monitor, without leaking resources in the event of an error.
// you do not need to call Stop(), it will be safely garbage collected unless Start is called.
func newMonitor(
config *common.Config,
registrar *plugin.PluginsReg,
pipelineConnector beat.PipelineConnector,
scheduler *scheduler.Scheduler,
onStop func(*Monitor),
) (*Monitor, error) {
m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler)
m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler, onStop)
if m != nil && err != nil {
m.Stop()
}
Expand All @@ -108,6 +97,7 @@ func newMonitorUnsafe(
registrar *plugin.PluginsReg,
pipelineConnector beat.PipelineConnector,
scheduler *scheduler.Scheduler,
onStop func(*Monitor),
) (*Monitor, error) {
// Extract just the Id, Type, and Enabled fields from the config
// We'll parse things more precisely later once we know what exact type of
Expand Down Expand Up @@ -137,12 +127,7 @@ func newMonitorUnsafe(
stats: pluginFactory.Stats,
}

if m.stdFields.ID != "" {
// Ensure we don't have duplicate IDs
if _, loaded := uniqueMonitorIDs.LoadOrStore(m.stdFields.ID, m); loaded {
return m, ErrDuplicateMonitorID{m.stdFields.ID}
}
} else {
if m.stdFields.ID == "" {
// If there's no explicit ID generate one
hash, err := m.configHash()
if err != nil {
Expand All @@ -152,7 +137,14 @@ func newMonitorUnsafe(
}

p, err := pluginFactory.Create(config)
m.close = p.Close

m.close = func() error {
if onStop != nil {
onStop(m)
}
return p.Close()
}

wrappedJobs := wrappers.WrapCommon(p.Jobs, m.stdFields)
m.endpoints = p.Endpoints

Expand Down Expand Up @@ -211,20 +203,18 @@ func (m *Monitor) makeTasks(config *common.Config, jobs []jobs.Job) ([]*configur
func (m *Monitor) Start() {
m.internalsMtx.Lock()
defer m.internalsMtx.Unlock()

for _, t := range m.configuredJobs {
t.Start()
}

m.stats.StartMonitor(int64(m.endpoints))
}

// Stop stops the Monitor's execution in its configured scheduler.
// This is safe to call even if the Monitor was never started.
// stopUnsafe stops the monitor without freeing it in global dedup
// needed by dedup itself to avoid a reentrant lock.
func (m *Monitor) Stop() {
m.internalsMtx.Lock()
defer m.internalsMtx.Unlock()
defer m.freeID()

for _, t := range m.configuredJobs {
t.Stop()
Expand All @@ -239,8 +229,3 @@ func (m *Monitor) Stop() {

m.stats.StopMonitor(int64(m.endpoints))
}

func (m *Monitor) freeID() {
// Free up the monitor ID for reuse
uniqueMonitorIDs.Delete(m.stdFields.ID)
}
42 changes: 3 additions & 39 deletions heartbeat/monitors/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestMonitor(t *testing.T) {
require.NoError(t, err)
defer sched.Stop()

mon, err := newMonitor(serverMonConf, reg, pipelineConnector, sched)
mon, err := newMonitor(serverMonConf, reg, pipelineConnector, sched, nil)
require.NoError(t, err)

mon.Start()
Expand Down Expand Up @@ -78,43 +78,6 @@ func TestMonitor(t *testing.T) {
assert.Equal(t, true, pcClient.closed)
}

func TestDuplicateMonitorIDs(t *testing.T) {
serverMonConf := mockPluginConf(t, "custom", "@every 1ms", "http://example.net")
badConf := mockBadPluginConf(t, "custom", "@every 1ms")
reg, built, closed := mockPluginsReg()
pipelineConnector := &MockPipelineConnector{}

sched := scheduler.New(1, monitoring.NewRegistry())
err := sched.Start()
require.NoError(t, err)
defer sched.Stop()

makeTestMon := func() (*Monitor, error) {
return newMonitor(serverMonConf, reg, pipelineConnector, sched)
}

// Ensure that an error is returned on a bad config
_, m0Err := newMonitor(badConf, reg, pipelineConnector, sched)
require.Error(t, m0Err)

// Would fail if the previous newMonitor didn't free the monitor.id
m1, m1Err := makeTestMon()
require.NoError(t, m1Err)
_, m2Err := makeTestMon()
require.Error(t, m2Err)
m1.Stop()
m3, m3Err := makeTestMon()
require.NoError(t, m3Err)
m3.Stop()

// We count 3 because built doesn't count successful builds,
// just attempted creations of monitors
require.Equal(t, 3, built.Load())
// Only one stops because the others errored on create
require.Equal(t, 2, closed.Load())
require.NoError(t, m3Err)
}

func TestCheckInvalidConfig(t *testing.T) {
serverMonConf := mockInvalidPluginConf(t)
reg, built, closed := mockPluginsReg()
Expand All @@ -125,7 +88,8 @@ func TestCheckInvalidConfig(t *testing.T) {
require.NoError(t, err)
defer sched.Stop()

m, err := newMonitor(serverMonConf, reg, pipelineConnector, sched)
m, err := newMonitor(serverMonConf, reg, pipelineConnector, sched, nil)
require.Error(t, err)
// This could change if we decide the contract for newMonitor should always return a monitor
require.Nil(t, m, "For this test to work we need a nil value for the monitor.")

Expand Down