Skip to content

Commit

Permalink
[Heartbeat] Log error on dupe monitor ID instead of strict req (#29041)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewvc committed Nov 22, 2021
1 parent ef83c59 commit 96cbf50
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 88 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Heartbeat*
- Remove long deprecated `watch_poll` functionality. {pull}27166[27166]
- Change behavior in case of duplicate monitor IDs in configs to be last monitor wins. {pull}29041[29041]

*Journalbeat*

Expand Down
6 changes: 2 additions & 4 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
config: parsedConfig,
scheduler: scheduler,
// dynamicFactory is the factory used for dynamic configs, e.g. autodiscover / reload
dynamicFactory: monitors.NewFactory(b.Info, scheduler),
dynamicFactory: monitors.NewFactory(b.Info, scheduler, plugin.GlobalPluginsReg),
}
return bt, nil
}
Expand Down Expand Up @@ -198,11 +198,9 @@ func runRunOnceSingleConfig(cfg *common.Config, publishClient *core.SyncClient,

// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) {
factory := monitors.NewFactory(b.Info, bt.scheduler)

var runners []cfgfile.Runner
for _, cfg := range bt.config.Monitors {
created, err := factory.Create(b.Publisher, cfg)
created, err := bt.dynamicFactory.Create(b.Publisher, cfg)
if err != nil {
if errors.Is(err, monitors.ErrMonitorDisabled) {
logp.Info("skipping disabled monitor: %s", err)
Expand Down
62 changes: 56 additions & 6 deletions heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package monitors

import (
"fmt"
"sync"

"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/actions"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
Expand All @@ -37,8 +39,12 @@ import (
// RunnerFactory that can be used to create cfg.Runner cast versions of Monitor
// suitable for config reloading.
type RunnerFactory struct {
info beat.Info
sched *scheduler.Scheduler
info beat.Info
sched *scheduler.Scheduler
byId map[string]*Monitor
mtx *sync.Mutex
pluginsReg *plugin.PluginsReg
logger *logp.Logger
}

type publishSettings struct {
Expand All @@ -61,8 +67,15 @@ type publishSettings struct {
}

// NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects.
func NewFactory(info beat.Info, sched *scheduler.Scheduler) *RunnerFactory {
return &RunnerFactory{info, sched}
func NewFactory(info beat.Info, sched *scheduler.Scheduler, pluginsReg *plugin.PluginsReg) *RunnerFactory {
return &RunnerFactory{
info: info,
sched: sched,
byId: map[string]*Monitor{},
mtx: &sync.Mutex{},
pluginsReg: pluginsReg,
logger: logp.NewLogger("monitor-factory"),
}
}

// Create makes a new Runner for a new monitor with the given Config.
Expand All @@ -78,8 +91,45 @@ func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config) (cfgfile.Runne
}

p = pipetool.WithClientConfigEdit(p, configEditor)
monitor, err := newMonitor(c, plugin.GlobalPluginsReg, p, f.sched)
return monitor, err

f.mtx.Lock()
defer f.mtx.Unlock()

// This is a callback executed on stop of a monitor, it ensures we delete the entry in
// byId.
// It's a little tricky, because it handles the problem of this function being
// occasionally invoked twice in one stack.
// f.mtx would be locked given that golang does not support reentrant locks.
// The important thing is clearing the map, not ensuring it stops exactly on time
// so we can defer its removal from the map with a goroutine, thus breaking out of the current stack
// and ensuring the cleanup happen soon enough.
safeStop := func(m *Monitor) {
go func() {
// We can safely relock now, since we're in a new goroutine.
f.mtx.Lock()
defer f.mtx.Unlock()

// If this element hasn't already been removed or replaced with a new
// instance delete it from the map. Check monitor identity via pointer equality.
if curM, ok := f.byId[m.stdFields.ID]; ok && curM == m {
delete(f.byId, m.stdFields.ID)
}
}()
}
monitor, err := newMonitor(c, f.pluginsReg, p, f.sched, safeStop)
if err != nil {
return nil, err
}

if mon, ok := f.byId[monitor.stdFields.ID]; ok {
f.logger.Warnf("monitor ID %s is configured for multiple monitors! IDs should be unique values, last seen config will win", monitor.stdFields.ID)
// Stop the old monitor, since we'll swap our new one in place
mon.Stop()
}

f.byId[monitor.stdFields.ID] = monitor

return monitor, nil
}

// CheckConfig checks to see if the given monitor config is valid.
Expand Down
63 changes: 58 additions & 5 deletions heartbeat/monitors/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,22 @@ import (

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
)

var binfo = beat.Info{
Beat: "heartbeat",
IndexPrefix: "heartbeat",
Version: "8.0.0",
}

func TestPreProcessors(t *testing.T) {
binfo := beat.Info{
Beat: "heartbeat",
IndexPrefix: "heartbeat",
Version: "8.0.0",
}
tests := map[string]struct {
settings publishSettings
expectedIndex string
Expand Down Expand Up @@ -143,3 +146,53 @@ func TestPreProcessors(t *testing.T) {
})
}
}

func TestDuplicateMonitorIDs(t *testing.T) {
serverMonConf := mockPluginConf(t, "custom", "@every 1ms", "http://example.net")
badConf := mockBadPluginConf(t, "custom", "@every 1ms")
reg, built, closed := mockPluginsReg()
pipelineConnector := &MockPipelineConnector{}

sched := scheduler.New(1, monitoring.NewRegistry())
err := sched.Start()
require.NoError(t, err)
defer sched.Stop()

f := NewFactory(binfo, sched, reg)
makeTestMon := func() (*Monitor, error) {
mIface, err := f.Create(pipelineConnector, serverMonConf)
if mIface == nil {
return nil, err
} else {
return mIface.(*Monitor), err
}
}

// Ensure that an error is returned on a bad config
_, m0Err := newMonitor(badConf, reg, pipelineConnector, sched, nil)
require.Error(t, m0Err)

// Would fail if the previous newMonitor didn't free the monitor.id
m1, m1Err := makeTestMon()
require.NoError(t, m1Err)
m1.Start()
m2, m2Err := makeTestMon()
require.NoError(t, m2Err)
m2.Start()
// Change the name so we can ensure that this is the currently active monitor
m2.stdFields.Name = "mon2"
// This used to trigger an error, but shouldn't any longer, we just log
// the error, and ensure the last monitor wins
require.NoError(t, m2Err)

m, ok := f.byId[m2.stdFields.ID]
require.True(t, ok)
require.Equal(t, m2.stdFields.Name, m.stdFields.Name)
m1.Stop()
m2.Stop()

// 3 are counted as built, even the bad config
require.Equal(t, 3, built.Load())
// Only 2 closes, because the bad config isn't closed
require.Equal(t, 2, closed.Load())
}
66 changes: 32 additions & 34 deletions heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,22 @@ import (
// ErrMonitorDisabled is returned when the monitor plugin is marked as disabled.
var ErrMonitorDisabled = errors.New("monitor not loaded, plugin is disabled")

const (
MON_INIT = iota
MON_STARTED
MON_STOPPED
)

// Monitor represents a configured recurring monitoring configuredJob loaded from a config file. Starting it
// will cause it to run with the given scheduler until Stop() is called.
type Monitor struct {
stdFields stdfields.StdMonitorFields
pluginName string
config *common.Config
registrar *plugin.PluginsReg
uniqueName string
scheduler *scheduler.Scheduler
configuredJobs []*configuredJob
enabled bool
state int
// endpoints is a count of endpoints this monitor measures.
endpoints int
// internalsMtx is used to synchronize access to critical
Expand All @@ -69,32 +74,21 @@ func (m *Monitor) String() string {
}

func checkMonitorConfig(config *common.Config, registrar *plugin.PluginsReg) error {
m, err := newMonitor(config, registrar, nil, nil)
if m != nil {
m.Stop() // Stop the monitor to free up the ID from uniqueness checks
}
return err
}
_, err := newMonitor(config, registrar, nil, nil, nil)

// uniqueMonitorIDs is used to keep track of explicitly configured monitor IDs and ensure no duplication within a
// given heartbeat instance.
var uniqueMonitorIDs sync.Map

// ErrDuplicateMonitorID is returned when a monitor attempts to start using an ID already in use by another monitor.
type ErrDuplicateMonitorID struct{ ID string }

func (e ErrDuplicateMonitorID) Error() string {
return fmt.Sprintf("monitor ID %s is configured for multiple monitors! IDs must be unique values.", e.ID)
return err
}

// newMonitor Creates a new monitor, without leaking resources in the event of an error.
// newMonitor creates a new monitor, without leaking resources in the event of an error.
// you do not need to call Stop(), it will be safely garbage collected unless Start is called.
func newMonitor(
config *common.Config,
registrar *plugin.PluginsReg,
pipelineConnector beat.PipelineConnector,
scheduler *scheduler.Scheduler,
onStop func(*Monitor),
) (*Monitor, error) {
m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler)
m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler, onStop)
if m != nil && err != nil {
m.Stop()
}
Expand All @@ -108,6 +102,7 @@ func newMonitorUnsafe(
registrar *plugin.PluginsReg,
pipelineConnector beat.PipelineConnector,
scheduler *scheduler.Scheduler,
onStop func(*Monitor),
) (*Monitor, error) {
// Extract just the Id, Type, and Enabled fields from the config
// We'll parse things more precisely later once we know what exact type of
Expand Down Expand Up @@ -135,14 +130,10 @@ func newMonitorUnsafe(
internalsMtx: sync.Mutex{},
config: config,
stats: pluginFactory.Stats,
state: MON_INIT,
}

if m.stdFields.ID != "" {
// Ensure we don't have duplicate IDs
if _, loaded := uniqueMonitorIDs.LoadOrStore(m.stdFields.ID, m); loaded {
return m, ErrDuplicateMonitorID{m.stdFields.ID}
}
} else {
if m.stdFields.ID == "" {
// If there's no explicit ID generate one
hash, err := m.configHash()
if err != nil {
Expand All @@ -152,7 +143,14 @@ func newMonitorUnsafe(
}

p, err := pluginFactory.Create(config)
m.close = p.Close

m.close = func() error {
if onStop != nil {
onStop(m)
}
return p.Close()
}

wrappedJobs := wrappers.WrapCommon(p.Jobs, m.stdFields)
m.endpoints = p.Endpoints

Expand Down Expand Up @@ -217,14 +215,18 @@ func (m *Monitor) Start() {
}

m.stats.StartMonitor(int64(m.endpoints))
m.state = MON_STARTED
}

// Stop stops the Monitor's execution in its configured scheduler.
// This is safe to call even if the Monitor was never started.
// Stop stops the monitor without freeing it in global dedup
// needed by dedup itself to avoid a reentrant lock.
func (m *Monitor) Stop() {
m.internalsMtx.Lock()
defer m.internalsMtx.Unlock()
defer m.freeID()

if m.state == MON_STOPPED {
return
}

for _, t := range m.configuredJobs {
t.Stop()
Expand All @@ -238,9 +240,5 @@ func (m *Monitor) Stop() {
}

m.stats.StopMonitor(int64(m.endpoints))
}

func (m *Monitor) freeID() {
// Free up the monitor ID for reuse
uniqueMonitorIDs.Delete(m.stdFields.ID)
m.state = MON_STOPPED
}
Loading

0 comments on commit 96cbf50

Please sign in to comment.