Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.16](backport #29041) [Heartbeat] Log error on dupe monitor ID instead of strict req #29082

Merged
merged 2 commits into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add support for GMT timezone offsets in `decode_cef`. {pull}20993[20993]

*Heartbeat*
- Remove long deprecated `watch_poll` functionality. {pull}27166[27166]
- Change behavior in case of duplicate monitor IDs in configs to be last monitor wins. {pull}29041[29041]

*Journalbeat*

Expand Down
51 changes: 2 additions & 49 deletions heartbeat/_meta/config/beat.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,6 @@ heartbeat.monitors:
# sub-dictionary. Default is false.
#fields_under_root: false

# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# The ingest pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

# The index name associated with this input. If this is set, it
# overwrites the index option from the Elasticsearch output.
#index:

# Set to true to publish fields with null values in events.
#keep_null: false

# Define a directory to load monitor definitions from. Definitions take the form
# of individual yaml files.
# heartbeat.config.monitors:
Expand Down Expand Up @@ -168,16 +148,7 @@ heartbeat.monitors:
# Required TLS protocols
#supported_protocols: ["TLSv1.0", "TLSv1.1", "TLSv1.2"]

# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# The ingest pipeline ID associated with this input. If this is set, it
# The Ingest Node pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

Expand Down Expand Up @@ -210,14 +181,6 @@ heartbeat.monitors:
ipv6: true
mode: any

# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# Optional HTTP proxy url.
#proxy_url: ''

Expand Down Expand Up @@ -272,17 +235,7 @@ heartbeat.monitors:
# equals:
# myField: expectedValue


# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# The ingest pipeline ID associated with this input. If this is set, it
# The Ingest Node pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

Expand Down
6 changes: 2 additions & 4 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
config: parsedConfig,
scheduler: scheduler,
// dynamicFactory is the factory used for dynamic configs, e.g. autodiscover / reload
dynamicFactory: monitors.NewFactory(b.Info, scheduler, false),
dynamicFactory: monitors.NewFactory(b.Info, scheduler, plugin.GlobalPluginsReg),
}
return bt, nil
}
Expand Down Expand Up @@ -198,11 +198,9 @@ func runRunOnceSingleConfig(cfg *common.Config, publishClient *core.SyncClient,

// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) {
factory := monitors.NewFactory(b.Info, bt.scheduler, true)

var runners []cfgfile.Runner
for _, cfg := range bt.config.Monitors {
created, err := factory.Create(b.Publisher, cfg)
created, err := bt.dynamicFactory.Create(b.Publisher, cfg)
if err != nil {
if errors.Is(err, monitors.ErrMonitorDisabled) {
logp.Info("skipping disabled monitor: %s", err)
Expand Down
51 changes: 2 additions & 49 deletions heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,6 @@ heartbeat.monitors:
# sub-dictionary. Default is false.
#fields_under_root: false

# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# The ingest pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

# The index name associated with this input. If this is set, it
# overwrites the index option from the Elasticsearch output.
#index:

# Set to true to publish fields with null values in events.
#keep_null: false

# Define a directory to load monitor definitions from. Definitions take the form
# of individual yaml files.
# heartbeat.config.monitors:
Expand Down Expand Up @@ -168,16 +148,7 @@ heartbeat.monitors:
# Required TLS protocols
#supported_protocols: ["TLSv1.0", "TLSv1.1", "TLSv1.2"]

# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# The ingest pipeline ID associated with this input. If this is set, it
# The Ingest Node pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

Expand Down Expand Up @@ -210,14 +181,6 @@ heartbeat.monitors:
ipv6: true
mode: any

# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# Optional HTTP proxy url.
#proxy_url: ''

Expand Down Expand Up @@ -272,17 +235,7 @@ heartbeat.monitors:
# equals:
# myField: expectedValue


# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# The ingest pipeline ID associated with this input. If this is set, it
# The Ingest Node pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

Expand Down
65 changes: 57 additions & 8 deletions heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package monitors

import (
"fmt"
"sync"

"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/actions"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
Expand All @@ -37,9 +39,12 @@ import (
// RunnerFactory that can be used to create cfg.Runner cast versions of Monitor
// suitable for config reloading.
type RunnerFactory struct {
info beat.Info
sched *scheduler.Scheduler
allowWatches bool
info beat.Info
sched *scheduler.Scheduler
byId map[string]*Monitor
mtx *sync.Mutex
pluginsReg *plugin.PluginsReg
logger *logp.Logger
}

type publishSettings struct {
Expand All @@ -62,8 +67,15 @@ type publishSettings struct {
}

// NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects.
func NewFactory(info beat.Info, sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory {
return &RunnerFactory{info, sched, allowWatches}
func NewFactory(info beat.Info, sched *scheduler.Scheduler, pluginsReg *plugin.PluginsReg) *RunnerFactory {
return &RunnerFactory{
info: info,
sched: sched,
byId: map[string]*Monitor{},
mtx: &sync.Mutex{},
pluginsReg: pluginsReg,
logger: logp.NewLogger("monitor-factory"),
}
}

// Create makes a new Runner for a new monitor with the given Config.
Expand All @@ -79,13 +91,50 @@ func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config) (cfgfile.Runne
}

p = pipetool.WithClientConfigEdit(p, configEditor)
monitor, err := newMonitor(c, plugin.GlobalPluginsReg, p, f.sched, f.allowWatches)
return monitor, err

f.mtx.Lock()
defer f.mtx.Unlock()

// This is a callback executed on stop of a monitor, it ensures we delete the entry in
// byId.
// It's a little tricky, because it handles the problem of this function being
// occasionally invoked twice in one stack.
// f.mtx would be locked given that golang does not support reentrant locks.
// The important thing is clearing the map, not ensuring it stops exactly on time
// so we can defer its removal from the map with a goroutine, thus breaking out of the current stack
// and ensuring the cleanup happen soon enough.
safeStop := func(m *Monitor) {
go func() {
// We can safely relock now, since we're in a new goroutine.
f.mtx.Lock()
defer f.mtx.Unlock()

// If this element hasn't already been removed or replaced with a new
// instance delete it from the map. Check monitor identity via pointer equality.
if curM, ok := f.byId[m.stdFields.ID]; ok && curM == m {
delete(f.byId, m.stdFields.ID)
}
}()
}
monitor, err := newMonitor(c, f.pluginsReg, p, f.sched, safeStop)
if err != nil {
return nil, err
}

if mon, ok := f.byId[monitor.stdFields.ID]; ok {
f.logger.Warnf("monitor ID %s is configured for multiple monitors! IDs should be unique values, last seen config will win", monitor.stdFields.ID)
// Stop the old monitor, since we'll swap our new one in place
mon.Stop()
}

f.byId[monitor.stdFields.ID] = monitor

return monitor, nil
}

// CheckConfig checks to see if the given monitor config is valid.
func (f *RunnerFactory) CheckConfig(config *common.Config) error {
return checkMonitorConfig(config, plugin.GlobalPluginsReg, f.allowWatches)
return checkMonitorConfig(config, plugin.GlobalPluginsReg)
}

func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.ConfigEditor, error) {
Expand Down
63 changes: 58 additions & 5 deletions heartbeat/monitors/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,22 @@ import (

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
)

var binfo = beat.Info{
Beat: "heartbeat",
IndexPrefix: "heartbeat",
Version: "8.0.0",
}

func TestPreProcessors(t *testing.T) {
binfo := beat.Info{
Beat: "heartbeat",
IndexPrefix: "heartbeat",
Version: "8.0.0",
}
tests := map[string]struct {
settings publishSettings
expectedIndex string
Expand Down Expand Up @@ -143,3 +146,53 @@ func TestPreProcessors(t *testing.T) {
})
}
}

func TestDuplicateMonitorIDs(t *testing.T) {
serverMonConf := mockPluginConf(t, "custom", "@every 1ms", "http://example.net")
badConf := mockBadPluginConf(t, "custom", "@every 1ms")
reg, built, closed := mockPluginsReg()
pipelineConnector := &MockPipelineConnector{}

sched := scheduler.New(1, monitoring.NewRegistry())
err := sched.Start()
require.NoError(t, err)
defer sched.Stop()

f := NewFactory(binfo, sched, reg)
makeTestMon := func() (*Monitor, error) {
mIface, err := f.Create(pipelineConnector, serverMonConf)
if mIface == nil {
return nil, err
} else {
return mIface.(*Monitor), err
}
}

// Ensure that an error is returned on a bad config
_, m0Err := newMonitor(badConf, reg, pipelineConnector, sched, nil)
require.Error(t, m0Err)

// Would fail if the previous newMonitor didn't free the monitor.id
m1, m1Err := makeTestMon()
require.NoError(t, m1Err)
m1.Start()
m2, m2Err := makeTestMon()
require.NoError(t, m2Err)
m2.Start()
// Change the name so we can ensure that this is the currently active monitor
m2.stdFields.Name = "mon2"
// This used to trigger an error, but shouldn't any longer, we just log
// the error, and ensure the last monitor wins
require.NoError(t, m2Err)

m, ok := f.byId[m2.stdFields.ID]
require.True(t, ok)
require.Equal(t, m2.stdFields.Name, m.stdFields.Name)
m1.Stop()
m2.Stop()

// 3 are counted as built, even the bad config
require.Equal(t, 3, built.Load())
// Only 2 closes, because the bad config isn't closed
require.Equal(t, 2, closed.Load())
}
Loading