Skip to content

Commit

Permalink
[7.16](backport #29041) [Heartbeat] Log error on dupe monitor ID inst…
Browse files Browse the repository at this point in the history
…ead of strict req (#29082)

* Remove watch poll feature (#27166)

* Remove watch poll feature

* Changelog

* Update YML

* [Heartbeat] Log error on dupe monitor ID instead of strict req (#29041)

Co-authored-by: Andrew Cholakian <[email protected]>
  • Loading branch information
mergify[bot] and andrewvc authored Nov 23, 2021
1 parent f8196da commit 1badd70
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 341 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add support for GMT timezone offsets in `decode_cef`. {pull}20993[20993]

*Heartbeat*
- Remove long deprecated `watch_poll` functionality. {pull}27166[27166]
- Change behavior in case of duplicate monitor IDs in configs to be last monitor wins. {pull}29041[29041]

*Journalbeat*

Expand Down
51 changes: 2 additions & 49 deletions heartbeat/_meta/config/beat.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,6 @@ heartbeat.monitors:
# sub-dictionary. Default is false.
#fields_under_root: false

# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# The ingest pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

# The index name associated with this input. If this is set, it
# overwrites the index option from the Elasticsearch output.
#index:

# Set to true to publish fields with null values in events.
#keep_null: false

# Define a directory to load monitor definitions from. Definitions take the form
# of individual yaml files.
# heartbeat.config.monitors:
Expand Down Expand Up @@ -168,16 +148,7 @@ heartbeat.monitors:
# Required TLS protocols
#supported_protocols: ["TLSv1.0", "TLSv1.1", "TLSv1.2"]

# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# The ingest pipeline ID associated with this input. If this is set, it
# The Ingest Node pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

Expand Down Expand Up @@ -210,14 +181,6 @@ heartbeat.monitors:
ipv6: true
mode: any

# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# Optional HTTP proxy url.
#proxy_url: ''

Expand Down Expand Up @@ -272,17 +235,7 @@ heartbeat.monitors:
# equals:
# myField: expectedValue


# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# The ingest pipeline ID associated with this input. If this is set, it
# The Ingest Node pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

Expand Down
6 changes: 2 additions & 4 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
config: parsedConfig,
scheduler: scheduler,
// dynamicFactory is the factory used for dynamic configs, e.g. autodiscover / reload
dynamicFactory: monitors.NewFactory(b.Info, scheduler, false),
dynamicFactory: monitors.NewFactory(b.Info, scheduler, plugin.GlobalPluginsReg),
}
return bt, nil
}
Expand Down Expand Up @@ -198,11 +198,9 @@ func runRunOnceSingleConfig(cfg *common.Config, publishClient *core.SyncClient,

// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) {
factory := monitors.NewFactory(b.Info, bt.scheduler, true)

var runners []cfgfile.Runner
for _, cfg := range bt.config.Monitors {
created, err := factory.Create(b.Publisher, cfg)
created, err := bt.dynamicFactory.Create(b.Publisher, cfg)
if err != nil {
if errors.Is(err, monitors.ErrMonitorDisabled) {
logp.Info("skipping disabled monitor: %s", err)
Expand Down
51 changes: 2 additions & 49 deletions heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,6 @@ heartbeat.monitors:
# sub-dictionary. Default is false.
#fields_under_root: false

# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# The ingest pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

# The index name associated with this input. If this is set, it
# overwrites the index option from the Elasticsearch output.
#index:

# Set to true to publish fields with null values in events.
#keep_null: false

# Define a directory to load monitor definitions from. Definitions take the form
# of individual yaml files.
# heartbeat.config.monitors:
Expand Down Expand Up @@ -168,16 +148,7 @@ heartbeat.monitors:
# Required TLS protocols
#supported_protocols: ["TLSv1.0", "TLSv1.1", "TLSv1.2"]

# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# The ingest pipeline ID associated with this input. If this is set, it
# The Ingest Node pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

Expand Down Expand Up @@ -210,14 +181,6 @@ heartbeat.monitors:
ipv6: true
mode: any

# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# Optional HTTP proxy url.
#proxy_url: ''

Expand Down Expand Up @@ -272,17 +235,7 @@ heartbeat.monitors:
# equals:
# myField: expectedValue


# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# The ingest pipeline ID associated with this input. If this is set, it
# The Ingest Node pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

Expand Down
65 changes: 57 additions & 8 deletions heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package monitors

import (
"fmt"
"sync"

"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/actions"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
Expand All @@ -37,9 +39,12 @@ import (
// RunnerFactory that can be used to create cfg.Runner cast versions of Monitor
// suitable for config reloading.
type RunnerFactory struct {
info beat.Info
sched *scheduler.Scheduler
allowWatches bool
info beat.Info
sched *scheduler.Scheduler
byId map[string]*Monitor
mtx *sync.Mutex
pluginsReg *plugin.PluginsReg
logger *logp.Logger
}

type publishSettings struct {
Expand All @@ -62,8 +67,15 @@ type publishSettings struct {
}

// NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects.
func NewFactory(info beat.Info, sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory {
return &RunnerFactory{info, sched, allowWatches}
func NewFactory(info beat.Info, sched *scheduler.Scheduler, pluginsReg *plugin.PluginsReg) *RunnerFactory {
return &RunnerFactory{
info: info,
sched: sched,
byId: map[string]*Monitor{},
mtx: &sync.Mutex{},
pluginsReg: pluginsReg,
logger: logp.NewLogger("monitor-factory"),
}
}

// Create makes a new Runner for a new monitor with the given Config.
Expand All @@ -79,13 +91,50 @@ func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config) (cfgfile.Runne
}

p = pipetool.WithClientConfigEdit(p, configEditor)
monitor, err := newMonitor(c, plugin.GlobalPluginsReg, p, f.sched, f.allowWatches)
return monitor, err

f.mtx.Lock()
defer f.mtx.Unlock()

// This is a callback executed on stop of a monitor, it ensures we delete the entry in
// byId.
// It's a little tricky, because it handles the problem of this function being
// occasionally invoked twice in one stack.
// f.mtx would be locked given that golang does not support reentrant locks.
// The important thing is clearing the map, not ensuring it stops exactly on time
// so we can defer its removal from the map with a goroutine, thus breaking out of the current stack
// and ensuring the cleanup happen soon enough.
safeStop := func(m *Monitor) {
go func() {
// We can safely relock now, since we're in a new goroutine.
f.mtx.Lock()
defer f.mtx.Unlock()

// If this element hasn't already been removed or replaced with a new
// instance delete it from the map. Check monitor identity via pointer equality.
if curM, ok := f.byId[m.stdFields.ID]; ok && curM == m {
delete(f.byId, m.stdFields.ID)
}
}()
}
monitor, err := newMonitor(c, f.pluginsReg, p, f.sched, safeStop)
if err != nil {
return nil, err
}

if mon, ok := f.byId[monitor.stdFields.ID]; ok {
f.logger.Warnf("monitor ID %s is configured for multiple monitors! IDs should be unique values, last seen config will win", monitor.stdFields.ID)
// Stop the old monitor, since we'll swap our new one in place
mon.Stop()
}

f.byId[monitor.stdFields.ID] = monitor

return monitor, nil
}

// CheckConfig checks to see if the given monitor config is valid.
func (f *RunnerFactory) CheckConfig(config *common.Config) error {
return checkMonitorConfig(config, plugin.GlobalPluginsReg, f.allowWatches)
return checkMonitorConfig(config, plugin.GlobalPluginsReg)
}

func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.ConfigEditor, error) {
Expand Down
63 changes: 58 additions & 5 deletions heartbeat/monitors/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,22 @@ import (

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
)

var binfo = beat.Info{
Beat: "heartbeat",
IndexPrefix: "heartbeat",
Version: "8.0.0",
}

func TestPreProcessors(t *testing.T) {
binfo := beat.Info{
Beat: "heartbeat",
IndexPrefix: "heartbeat",
Version: "8.0.0",
}
tests := map[string]struct {
settings publishSettings
expectedIndex string
Expand Down Expand Up @@ -143,3 +146,53 @@ func TestPreProcessors(t *testing.T) {
})
}
}

func TestDuplicateMonitorIDs(t *testing.T) {
serverMonConf := mockPluginConf(t, "custom", "@every 1ms", "http://example.net")
badConf := mockBadPluginConf(t, "custom", "@every 1ms")
reg, built, closed := mockPluginsReg()
pipelineConnector := &MockPipelineConnector{}

sched := scheduler.New(1, monitoring.NewRegistry())
err := sched.Start()
require.NoError(t, err)
defer sched.Stop()

f := NewFactory(binfo, sched, reg)
makeTestMon := func() (*Monitor, error) {
mIface, err := f.Create(pipelineConnector, serverMonConf)
if mIface == nil {
return nil, err
} else {
return mIface.(*Monitor), err
}
}

// Ensure that an error is returned on a bad config
_, m0Err := newMonitor(badConf, reg, pipelineConnector, sched, nil)
require.Error(t, m0Err)

// Would fail if the previous newMonitor didn't free the monitor.id
m1, m1Err := makeTestMon()
require.NoError(t, m1Err)
m1.Start()
m2, m2Err := makeTestMon()
require.NoError(t, m2Err)
m2.Start()
// Change the name so we can ensure that this is the currently active monitor
m2.stdFields.Name = "mon2"
// This used to trigger an error, but shouldn't any longer, we just log
// the error, and ensure the last monitor wins
require.NoError(t, m2Err)

m, ok := f.byId[m2.stdFields.ID]
require.True(t, ok)
require.Equal(t, m2.stdFields.Name, m.stdFields.Name)
m1.Stop()
m2.Stop()

// 3 are counted as built, even the bad config
require.Equal(t, 3, built.Load())
// Only 2 closes, because the bad config isn't closed
require.Equal(t, 2, closed.Load())
}
Loading

0 comments on commit 1badd70

Please sign in to comment.