Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Log error on dupe monitor ID instead of strict req #29041

Merged
merged 14 commits into from
Nov 22, 2021
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Change `threatintel` module to use new `threat.*` ECS fields. {pull}29014[29014]

*Heartbeat*
- Change behavior in case of duplicate monitor IDs in configs to be last monitor wins. {pull}29041[29041]

*Journalbeat*

Expand Down
6 changes: 2 additions & 4 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
config: parsedConfig,
scheduler: scheduler,
// dynamicFactory is the factory used for dynamic configs, e.g. autodiscover / reload
dynamicFactory: monitors.NewFactory(b.Info, scheduler),
dynamicFactory: monitors.NewFactory(b.Info, scheduler, plugin.GlobalPluginsReg),
}
return bt, nil
}
Expand Down Expand Up @@ -198,11 +198,9 @@ func runRunOnceSingleConfig(cfg *common.Config, publishClient *core.SyncClient,

// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) {
factory := monitors.NewFactory(b.Info, bt.scheduler)

var runners []cfgfile.Runner
for _, cfg := range bt.config.Monitors {
created, err := factory.Create(b.Publisher, cfg)
created, err := bt.dynamicFactory.Create(b.Publisher, cfg)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleans everything up so that we only use a single factory instance in heartbeat

if err != nil {
if errors.Is(err, monitors.ErrMonitorDisabled) {
logp.Info("skipping disabled monitor: %s", err)
Expand Down
62 changes: 56 additions & 6 deletions heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package monitors

import (
"fmt"
"sync"

"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/actions"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
Expand All @@ -37,8 +39,12 @@ import (
// RunnerFactory that can be used to create cfg.Runner cast versions of Monitor
// suitable for config reloading.
type RunnerFactory struct {
info beat.Info
sched *scheduler.Scheduler
info beat.Info
sched *scheduler.Scheduler
byId map[string]*Monitor
mtx *sync.Mutex
pluginsReg *plugin.PluginsReg
logger *logp.Logger
}

type publishSettings struct {
Expand All @@ -61,8 +67,15 @@ type publishSettings struct {
}

// NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects.
func NewFactory(info beat.Info, sched *scheduler.Scheduler) *RunnerFactory {
return &RunnerFactory{info, sched}
func NewFactory(info beat.Info, sched *scheduler.Scheduler, pluginsReg *plugin.PluginsReg) *RunnerFactory {
return &RunnerFactory{
info: info,
sched: sched,
byId: map[string]*Monitor{},
mtx: &sync.Mutex{},
pluginsReg: pluginsReg,
logger: logp.NewLogger("monitor-factory"),
}
}

// Create makes a new Runner for a new monitor with the given Config.
Expand All @@ -78,8 +91,45 @@ func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config) (cfgfile.Runne
}

p = pipetool.WithClientConfigEdit(p, configEditor)
monitor, err := newMonitor(c, plugin.GlobalPluginsReg, p, f.sched)
return monitor, err

f.mtx.Lock()
defer f.mtx.Unlock()

// This is a callback executed on stop of a monitor, it ensures we delete the entry in
// byId.
// It's a little tricky, because it handles the problem of this function being
// occasionally invoked twice in one stack.
// f.mtx would be locked given that golang does not support reentrant locks.
// The important thing is clearing the map, not ensuring it stops exactly on time
// so we can defer its removal from the map with a goroutine, thus breaking out of the current stack
// and ensuring the cleanup happen soon enough.
safeStop := func(m *Monitor) {
go func() {
// We can safely relock now, since we're in a new goroutine.
f.mtx.Lock()
defer f.mtx.Unlock()

// If this element hasn't already been removed or replaced with a new
// instance delete it from the map. Check monitor identity via pointer equality.
if curM, ok := f.byId[m.stdFields.ID]; ok && curM == m {
delete(f.byId, m.stdFields.ID)
}
}()
}
monitor, err := newMonitor(c, f.pluginsReg, p, f.sched, safeStop)
if err != nil {
return nil, err
}

if mon, ok := f.byId[monitor.stdFields.ID]; ok {
f.logger.Warnf("monitor ID %s is configured for multiple monitors! IDs should be unique values, last seen config will win", monitor.stdFields.ID)
// Stop the old monitor, since we'll swap our new one in place
mon.Stop()
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
}

f.byId[monitor.stdFields.ID] = monitor

return monitor, nil
}

// CheckConfig checks to see if the given monitor config is valid.
Expand Down
63 changes: 58 additions & 5 deletions heartbeat/monitors/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,22 @@ import (

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
)

var binfo = beat.Info{
Beat: "heartbeat",
IndexPrefix: "heartbeat",
Version: "8.0.0",
}

func TestPreProcessors(t *testing.T) {
binfo := beat.Info{
Beat: "heartbeat",
IndexPrefix: "heartbeat",
Version: "8.0.0",
}
tests := map[string]struct {
settings publishSettings
expectedIndex string
Expand Down Expand Up @@ -143,3 +146,53 @@ func TestPreProcessors(t *testing.T) {
})
}
}

func TestDuplicateMonitorIDs(t *testing.T) {
serverMonConf := mockPluginConf(t, "custom", "@every 1ms", "http://example.net")
badConf := mockBadPluginConf(t, "custom", "@every 1ms")
reg, built, closed := mockPluginsReg()
pipelineConnector := &MockPipelineConnector{}

sched := scheduler.New(1, monitoring.NewRegistry())
err := sched.Start()
require.NoError(t, err)
defer sched.Stop()

f := NewFactory(binfo, sched, reg)
makeTestMon := func() (*Monitor, error) {
mIface, err := f.Create(pipelineConnector, serverMonConf)
if mIface == nil {
return nil, err
} else {
return mIface.(*Monitor), err
}
}

// Ensure that an error is returned on a bad config
_, m0Err := newMonitor(badConf, reg, pipelineConnector, sched, nil)
require.Error(t, m0Err)

// Would fail if the previous newMonitor didn't free the monitor.id
m1, m1Err := makeTestMon()
require.NoError(t, m1Err)
m1.Start()
m2, m2Err := makeTestMon()
require.NoError(t, m2Err)
m2.Start()
// Change the name so we can ensure that this is the currently active monitor
m2.stdFields.Name = "mon2"
// This used to trigger an error, but shouldn't any longer, we just log
// the error, and ensure the last monitor wins
require.NoError(t, m2Err)

m, ok := f.byId[m2.stdFields.ID]
require.True(t, ok)
require.Equal(t, m2.stdFields.Name, m.stdFields.Name)
m1.Stop()
m2.Stop()

// 3 are counted as built, even the bad config
require.Equal(t, 3, built.Load())
// Only 2 closes, because the bad config isn't closed
require.Equal(t, 2, closed.Load())
}
66 changes: 32 additions & 34 deletions heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,22 @@ import (
// ErrMonitorDisabled is returned when the monitor plugin is marked as disabled.
var ErrMonitorDisabled = errors.New("monitor not loaded, plugin is disabled")

const (
MON_INIT = iota
MON_STARTED
MON_STOPPED
)

// Monitor represents a configured recurring monitoring configuredJob loaded from a config file. Starting it
// will cause it to run with the given scheduler until Stop() is called.
type Monitor struct {
stdFields stdfields.StdMonitorFields
pluginName string
config *common.Config
registrar *plugin.PluginsReg
uniqueName string
scheduler *scheduler.Scheduler
configuredJobs []*configuredJob
enabled bool
state int
// endpoints is a count of endpoints this monitor measures.
endpoints int
// internalsMtx is used to synchronize access to critical
Expand All @@ -69,32 +74,21 @@ func (m *Monitor) String() string {
}

func checkMonitorConfig(config *common.Config, registrar *plugin.PluginsReg) error {
m, err := newMonitor(config, registrar, nil, nil)
if m != nil {
m.Stop() // Stop the monitor to free up the ID from uniqueness checks
}
return err
}
_, err := newMonitor(config, registrar, nil, nil, nil)

// uniqueMonitorIDs is used to keep track of explicitly configured monitor IDs and ensure no duplication within a
// given heartbeat instance.
var uniqueMonitorIDs sync.Map

// ErrDuplicateMonitorID is returned when a monitor attempts to start using an ID already in use by another monitor.
type ErrDuplicateMonitorID struct{ ID string }

func (e ErrDuplicateMonitorID) Error() string {
return fmt.Sprintf("monitor ID %s is configured for multiple monitors! IDs must be unique values.", e.ID)
return err
}

// newMonitor Creates a new monitor, without leaking resources in the event of an error.
// newMonitor creates a new monitor, without leaking resources in the event of an error.
// you do not need to call Stop(), it will be safely garbage collected unless Start is called.
func newMonitor(
config *common.Config,
registrar *plugin.PluginsReg,
pipelineConnector beat.PipelineConnector,
scheduler *scheduler.Scheduler,
onStop func(*Monitor),
) (*Monitor, error) {
m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler)
m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler, onStop)
if m != nil && err != nil {
m.Stop()
}
Expand All @@ -108,6 +102,7 @@ func newMonitorUnsafe(
registrar *plugin.PluginsReg,
pipelineConnector beat.PipelineConnector,
scheduler *scheduler.Scheduler,
onStop func(*Monitor),
) (*Monitor, error) {
// Extract just the Id, Type, and Enabled fields from the config
// We'll parse things more precisely later once we know what exact type of
Expand Down Expand Up @@ -135,14 +130,10 @@ func newMonitorUnsafe(
internalsMtx: sync.Mutex{},
config: config,
stats: pluginFactory.Stats,
state: MON_INIT,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prevents weird situations with dual stops, makes stop() idempotent.

}

if m.stdFields.ID != "" {
// Ensure we don't have duplicate IDs
if _, loaded := uniqueMonitorIDs.LoadOrStore(m.stdFields.ID, m); loaded {
return m, ErrDuplicateMonitorID{m.stdFields.ID}
}
} else {
if m.stdFields.ID == "" {
// If there's no explicit ID generate one
hash, err := m.configHash()
if err != nil {
Expand All @@ -152,7 +143,14 @@ func newMonitorUnsafe(
}

p, err := pluginFactory.Create(config)
m.close = p.Close

m.close = func() error {
if onStop != nil {
onStop(m)
}
return p.Close()
}

wrappedJobs := wrappers.WrapCommon(p.Jobs, m.stdFields)
m.endpoints = p.Endpoints

Expand Down Expand Up @@ -217,14 +215,18 @@ func (m *Monitor) Start() {
}

m.stats.StartMonitor(int64(m.endpoints))
m.state = MON_STARTED
}

// Stop stops the Monitor's execution in its configured scheduler.
// This is safe to call even if the Monitor was never started.
// Stop stops the monitor without freeing it in global dedup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Stop stops the monitor without freeing it in global dedup
// Stop the monitor without freeing it in global dedup

I know the function name is a proper noun here but it feels kinda redundant.

// needed by dedup itself to avoid a reentrant lock.
func (m *Monitor) Stop() {
m.internalsMtx.Lock()
defer m.internalsMtx.Unlock()
defer m.freeID()

if m.state == MON_STOPPED {
return
}

for _, t := range m.configuredJobs {
t.Stop()
Expand All @@ -238,9 +240,5 @@ func (m *Monitor) Stop() {
}

m.stats.StopMonitor(int64(m.endpoints))
}

func (m *Monitor) freeID() {
// Free up the monitor ID for reuse
uniqueMonitorIDs.Delete(m.stdFields.ID)
m.state = MON_STOPPED
}
Loading