diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 41cb29fb403..acca8a8a4e8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -36,6 +36,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add support for GMT timezone offsets in `decode_cef`. {pull}20993[20993] *Heartbeat* +- Remove long deprecated `watch_poll` functionality. {pull}27166[27166] +- Change behavior in case of duplicate monitor IDs in configs to be last monitor wins. {pull}29041[29041] *Journalbeat* diff --git a/heartbeat/_meta/config/beat.reference.yml.tmpl b/heartbeat/_meta/config/beat.reference.yml.tmpl index 875e966d8f1..67145c7ac21 100644 --- a/heartbeat/_meta/config/beat.reference.yml.tmpl +++ b/heartbeat/_meta/config/beat.reference.yml.tmpl @@ -71,26 +71,6 @@ heartbeat.monitors: # sub-dictionary. Default is false. #fields_under_root: false - # NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE - # Configure file json file to be watched for changes to the monitor: - #watch.poll_file: - # Path to check for updates. - #path: - - # Interval between file file changed checks. - #interval: 5s - - # The ingest pipeline ID associated with this input. If this is set, it - # overwrites the pipeline option from the Elasticsearch output. - #pipeline: - - # The index name associated with this input. If this is set, it - # overwrites the index option from the Elasticsearch output. - #index: - - # Set to true to publish fields with null values in events. - #keep_null: false - # Define a directory to load monitor definitions from. Definitions take the form # of individual yaml files. # heartbeat.config.monitors: @@ -168,16 +148,7 @@ heartbeat.monitors: # Required TLS protocols #supported_protocols: ["TLSv1.0", "TLSv1.1", "TLSv1.2"] - # NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE - # Configure file json file to be watched for changes to the monitor: - #watch.poll_file: - # Path to check for updates. - #path: - - # Interval between file file changed checks. - #interval: 5s - - # The ingest pipeline ID associated with this input. If this is set, it + # The Ingest Node pipeline ID associated with this input. If this is set, it # overwrites the pipeline option from the Elasticsearch output. #pipeline: @@ -210,14 +181,6 @@ heartbeat.monitors: ipv6: true mode: any - # Configure file json file to be watched for changes to the monitor: - #watch.poll_file: - # Path to check for updates. - #path: - - # Interval between file file changed checks. - #interval: 5s - # Optional HTTP proxy url. #proxy_url: '' @@ -272,17 +235,7 @@ heartbeat.monitors: # equals: # myField: expectedValue - - # NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE - # Configure file json file to be watched for changes to the monitor: - #watch.poll_file: - # Path to check for updates. - #path: - - # Interval between file file changed checks. - #interval: 5s - - # The ingest pipeline ID associated with this input. If this is set, it + # The Ingest Node pipeline ID associated with this input. If this is set, it # overwrites the pipeline option from the Elasticsearch output. #pipeline: diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 538a3730b4a..c019e37ea72 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -78,7 +78,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { config: parsedConfig, scheduler: scheduler, // dynamicFactory is the factory used for dynamic configs, e.g. autodiscover / reload - dynamicFactory: monitors.NewFactory(b.Info, scheduler, false), + dynamicFactory: monitors.NewFactory(b.Info, scheduler, plugin.GlobalPluginsReg), } return bt, nil } @@ -198,11 +198,9 @@ func runRunOnceSingleConfig(cfg *common.Config, publishClient *core.SyncClient, // RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present. func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) { - factory := monitors.NewFactory(b.Info, bt.scheduler, true) - var runners []cfgfile.Runner for _, cfg := range bt.config.Monitors { - created, err := factory.Create(b.Publisher, cfg) + created, err := bt.dynamicFactory.Create(b.Publisher, cfg) if err != nil { if errors.Is(err, monitors.ErrMonitorDisabled) { logp.Info("skipping disabled monitor: %s", err) diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index 0f0c4d349cb..6ab29d59073 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -71,26 +71,6 @@ heartbeat.monitors: # sub-dictionary. Default is false. #fields_under_root: false - # NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE - # Configure file json file to be watched for changes to the monitor: - #watch.poll_file: - # Path to check for updates. - #path: - - # Interval between file file changed checks. - #interval: 5s - - # The ingest pipeline ID associated with this input. If this is set, it - # overwrites the pipeline option from the Elasticsearch output. - #pipeline: - - # The index name associated with this input. If this is set, it - # overwrites the index option from the Elasticsearch output. - #index: - - # Set to true to publish fields with null values in events. - #keep_null: false - # Define a directory to load monitor definitions from. Definitions take the form # of individual yaml files. # heartbeat.config.monitors: @@ -168,16 +148,7 @@ heartbeat.monitors: # Required TLS protocols #supported_protocols: ["TLSv1.0", "TLSv1.1", "TLSv1.2"] - # NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE - # Configure file json file to be watched for changes to the monitor: - #watch.poll_file: - # Path to check for updates. - #path: - - # Interval between file file changed checks. - #interval: 5s - - # The ingest pipeline ID associated with this input. If this is set, it + # The Ingest Node pipeline ID associated with this input. If this is set, it # overwrites the pipeline option from the Elasticsearch output. #pipeline: @@ -210,14 +181,6 @@ heartbeat.monitors: ipv6: true mode: any - # Configure file json file to be watched for changes to the monitor: - #watch.poll_file: - # Path to check for updates. - #path: - - # Interval between file file changed checks. - #interval: 5s - # Optional HTTP proxy url. #proxy_url: '' @@ -272,17 +235,7 @@ heartbeat.monitors: # equals: # myField: expectedValue - - # NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE - # Configure file json file to be watched for changes to the monitor: - #watch.poll_file: - # Path to check for updates. - #path: - - # Interval between file file changed checks. - #interval: 5s - - # The ingest pipeline ID associated with this input. If this is set, it + # The Ingest Node pipeline ID associated with this input. If this is set, it # overwrites the pipeline option from the Elasticsearch output. #pipeline: diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index a4d2ce02b6c..7f660cbb087 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -19,6 +19,7 @@ package monitors import ( "fmt" + "sync" "github.com/elastic/beats/v7/heartbeat/monitors/plugin" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" @@ -27,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fmtstr" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/processors/actions" "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" @@ -37,9 +39,12 @@ import ( // RunnerFactory that can be used to create cfg.Runner cast versions of Monitor // suitable for config reloading. type RunnerFactory struct { - info beat.Info - sched *scheduler.Scheduler - allowWatches bool + info beat.Info + sched *scheduler.Scheduler + byId map[string]*Monitor + mtx *sync.Mutex + pluginsReg *plugin.PluginsReg + logger *logp.Logger } type publishSettings struct { @@ -62,8 +67,15 @@ type publishSettings struct { } // NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects. -func NewFactory(info beat.Info, sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory { - return &RunnerFactory{info, sched, allowWatches} +func NewFactory(info beat.Info, sched *scheduler.Scheduler, pluginsReg *plugin.PluginsReg) *RunnerFactory { + return &RunnerFactory{ + info: info, + sched: sched, + byId: map[string]*Monitor{}, + mtx: &sync.Mutex{}, + pluginsReg: pluginsReg, + logger: logp.NewLogger("monitor-factory"), + } } // Create makes a new Runner for a new monitor with the given Config. @@ -79,13 +91,50 @@ func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config) (cfgfile.Runne } p = pipetool.WithClientConfigEdit(p, configEditor) - monitor, err := newMonitor(c, plugin.GlobalPluginsReg, p, f.sched, f.allowWatches) - return monitor, err + + f.mtx.Lock() + defer f.mtx.Unlock() + + // This is a callback executed on stop of a monitor, it ensures we delete the entry in + // byId. + // It's a little tricky, because it handles the problem of this function being + // occasionally invoked twice in one stack. + // f.mtx would be locked given that golang does not support reentrant locks. + // The important thing is clearing the map, not ensuring it stops exactly on time + // so we can defer its removal from the map with a goroutine, thus breaking out of the current stack + // and ensuring the cleanup happen soon enough. + safeStop := func(m *Monitor) { + go func() { + // We can safely relock now, since we're in a new goroutine. + f.mtx.Lock() + defer f.mtx.Unlock() + + // If this element hasn't already been removed or replaced with a new + // instance delete it from the map. Check monitor identity via pointer equality. + if curM, ok := f.byId[m.stdFields.ID]; ok && curM == m { + delete(f.byId, m.stdFields.ID) + } + }() + } + monitor, err := newMonitor(c, f.pluginsReg, p, f.sched, safeStop) + if err != nil { + return nil, err + } + + if mon, ok := f.byId[monitor.stdFields.ID]; ok { + f.logger.Warnf("monitor ID %s is configured for multiple monitors! IDs should be unique values, last seen config will win", monitor.stdFields.ID) + // Stop the old monitor, since we'll swap our new one in place + mon.Stop() + } + + f.byId[monitor.stdFields.ID] = monitor + + return monitor, nil } // CheckConfig checks to see if the given monitor config is valid. func (f *RunnerFactory) CheckConfig(config *common.Config) error { - return checkMonitorConfig(config, plugin.GlobalPluginsReg, f.allowWatches) + return checkMonitorConfig(config, plugin.GlobalPluginsReg) } func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.ConfigEditor, error) { diff --git a/heartbeat/monitors/factory_test.go b/heartbeat/monitors/factory_test.go index 27b119392ee..4849529cec4 100644 --- a/heartbeat/monitors/factory_test.go +++ b/heartbeat/monitors/factory_test.go @@ -23,19 +23,22 @@ import ( "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/heartbeat/scheduler" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fmtstr" + "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" ) +var binfo = beat.Info{ + Beat: "heartbeat", + IndexPrefix: "heartbeat", + Version: "8.0.0", +} + func TestPreProcessors(t *testing.T) { - binfo := beat.Info{ - Beat: "heartbeat", - IndexPrefix: "heartbeat", - Version: "8.0.0", - } tests := map[string]struct { settings publishSettings expectedIndex string @@ -143,3 +146,53 @@ func TestPreProcessors(t *testing.T) { }) } } + +func TestDuplicateMonitorIDs(t *testing.T) { + serverMonConf := mockPluginConf(t, "custom", "@every 1ms", "http://example.net") + badConf := mockBadPluginConf(t, "custom", "@every 1ms") + reg, built, closed := mockPluginsReg() + pipelineConnector := &MockPipelineConnector{} + + sched := scheduler.New(1, monitoring.NewRegistry()) + err := sched.Start() + require.NoError(t, err) + defer sched.Stop() + + f := NewFactory(binfo, sched, reg) + makeTestMon := func() (*Monitor, error) { + mIface, err := f.Create(pipelineConnector, serverMonConf) + if mIface == nil { + return nil, err + } else { + return mIface.(*Monitor), err + } + } + + // Ensure that an error is returned on a bad config + _, m0Err := newMonitor(badConf, reg, pipelineConnector, sched, nil) + require.Error(t, m0Err) + + // Would fail if the previous newMonitor didn't free the monitor.id + m1, m1Err := makeTestMon() + require.NoError(t, m1Err) + m1.Start() + m2, m2Err := makeTestMon() + require.NoError(t, m2Err) + m2.Start() + // Change the name so we can ensure that this is the currently active monitor + m2.stdFields.Name = "mon2" + // This used to trigger an error, but shouldn't any longer, we just log + // the error, and ensure the last monitor wins + require.NoError(t, m2Err) + + m, ok := f.byId[m2.stdFields.ID] + require.True(t, ok) + require.Equal(t, m2.stdFields.Name, m.stdFields.Name) + m1.Stop() + m2.Stop() + + // 3 are counted as built, even the bad config + require.Equal(t, 3, built.Load()) + // Only 2 closes, because the bad config isn't closed + require.Equal(t, 2, closed.Load()) +} diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index caac7b8aa0c..9cdbb8ecfd6 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -18,8 +18,6 @@ package monitors import ( - "bytes" - "encoding/json" "fmt" "sync" @@ -31,7 +29,6 @@ import ( "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" "github.com/elastic/beats/v7/heartbeat/scheduler" - "github.com/elastic/beats/v7/heartbeat/watcher" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" @@ -40,17 +37,22 @@ import ( // ErrMonitorDisabled is returned when the monitor plugin is marked as disabled. var ErrMonitorDisabled = errors.New("monitor not loaded, plugin is disabled") +const ( + MON_INIT = iota + MON_STARTED + MON_STOPPED +) + // Monitor represents a configured recurring monitoring configuredJob loaded from a config file. Starting it // will cause it to run with the given scheduler until Stop() is called. type Monitor struct { stdFields stdfields.StdMonitorFields pluginName string config *common.Config - registrar *plugin.PluginsReg - uniqueName string scheduler *scheduler.Scheduler configuredJobs []*configuredJob enabled bool + state int // endpoints is a count of endpoints this monitor measures. endpoints int // internalsMtx is used to synchronize access to critical @@ -58,10 +60,6 @@ type Monitor struct { internalsMtx sync.Mutex close func() error - // Watch related fields - watchPollTasks []*configuredJob - watch watcher.Watch - pipelineConnector beat.PipelineConnector // stats is the countersRecorder used to record lifecycle events @@ -75,37 +73,22 @@ func (m *Monitor) String() string { return fmt.Sprintf("Monitor", m.stdFields.Name, m.enabled) } -func checkMonitorConfig(config *common.Config, registrar *plugin.PluginsReg, allowWatches bool) error { - m, err := newMonitor(config, registrar, nil, nil, allowWatches) - if m != nil { - m.Stop() // Stop the monitor to free up the ID from uniqueness checks - } - return err -} - -// ErrWatchesDisabled is returned when the user attempts to declare a watch poll file in a -var ErrWatchesDisabled = errors.New("watch poll files are only allowed in heartbeat.yml, not dynamic configs") - -// uniqueMonitorIDs is used to keep track of explicitly configured monitor IDs and ensure no duplication within a -// given heartbeat instance. -var uniqueMonitorIDs sync.Map - -// ErrDuplicateMonitorID is returned when a monitor attempts to start using an ID already in use by another monitor. -type ErrDuplicateMonitorID struct{ ID string } +func checkMonitorConfig(config *common.Config, registrar *plugin.PluginsReg) error { + _, err := newMonitor(config, registrar, nil, nil, nil) -func (e ErrDuplicateMonitorID) Error() string { - return fmt.Sprintf("monitor ID %s is configured for multiple monitors! IDs must be unique values.", e.ID) + return err } -// newMonitor Creates a new monitor, without leaking resources in the event of an error. +// newMonitor creates a new monitor, without leaking resources in the event of an error. +// you do not need to call Stop(), it will be safely garbage collected unless Start is called. func newMonitor( config *common.Config, registrar *plugin.PluginsReg, pipelineConnector beat.PipelineConnector, scheduler *scheduler.Scheduler, - allowWatches bool, + onStop func(*Monitor), ) (*Monitor, error) { - m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler, allowWatches) + m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler, onStop) if m != nil && err != nil { m.Stop() } @@ -119,7 +102,7 @@ func newMonitorUnsafe( registrar *plugin.PluginsReg, pipelineConnector beat.PipelineConnector, scheduler *scheduler.Scheduler, - allowWatches bool, + onStop func(*Monitor), ) (*Monitor, error) { // Extract just the Id, Type, and Enabled fields from the config // We'll parse things more precisely later once we know what exact type of @@ -144,18 +127,13 @@ func newMonitorUnsafe( scheduler: scheduler, configuredJobs: []*configuredJob{}, pipelineConnector: pipelineConnector, - watchPollTasks: []*configuredJob{}, internalsMtx: sync.Mutex{}, config: config, stats: pluginFactory.Stats, + state: MON_INIT, } - if m.stdFields.ID != "" { - // Ensure we don't have duplicate IDs - if _, loaded := uniqueMonitorIDs.LoadOrStore(m.stdFields.ID, m); loaded { - return m, ErrDuplicateMonitorID{m.stdFields.ID} - } - } else { + if m.stdFields.ID == "" { // If there's no explicit ID generate one hash, err := m.configHash() if err != nil { @@ -165,7 +143,14 @@ func newMonitorUnsafe( } p, err := pluginFactory.Create(config) - m.close = p.Close + + m.close = func() error { + if onStop != nil { + onStop(m) + } + return p.Close() + } + wrappedJobs := wrappers.WrapCommon(p.Jobs, m.stdFields) m.endpoints = p.Endpoints @@ -178,20 +163,6 @@ func newMonitorUnsafe( return m, err } - err = m.makeWatchTasks(pluginFactory) - if err != nil { - return m, err - } - - if len(m.watchPollTasks) > 0 { - if !allowWatches { - return m, ErrWatchesDisabled - } - - logp.Info(`Obsolete option 'watch.poll_file' declared. This will be removed in a future release. -See https://www.elastic.co/guide/en/beats/heartbeat/current/configuration-heartbeat-options.html for more info`) - } - return m, nil } @@ -234,74 +205,6 @@ func (m *Monitor) makeTasks(config *common.Config, jobs []jobs.Job) ([]*configur return mTasks, nil } -func (m *Monitor) makeWatchTasks(pluginFactory plugin.PluginFactory) error { - watchCfg := watcher.DefaultWatchConfig - err := m.config.Unpack(&watchCfg) - if err != nil { - return err - } - - if len(watchCfg.Path) > 0 { - m.watch, err = watcher.NewFilePoller(watchCfg.Path, watchCfg.Poll, func(content []byte) { - var newTasks []*configuredJob - - dec := json.NewDecoder(bytes.NewBuffer(content)) - for dec.More() { - var obj map[string]interface{} - err = dec.Decode(&obj) - if err != nil { - logp.Err("Failed parsing JSON object: %v", err) - return - } - - cfg, err := common.NewConfigFrom(obj) - if err != nil { - logp.Err("Failed normalizing JSON input: %v", err) - return - } - - merged, err := common.MergeConfigs(m.config, cfg) - if err != nil { - logp.Err("Could not merge config: %v", err) - return - } - - p, err := pluginFactory.Create(merged) - m.close = p.Close - m.endpoints = p.Endpoints - if err != nil { - logp.Err("Could not create job from watch file: %v", err) - } - - watchTasks, err := m.makeTasks(merged, p.Jobs) - if err != nil { - logp.Err("Could not make configuredJob for config: %v", err) - return - } - - newTasks = append(newTasks, watchTasks...) - } - - m.internalsMtx.Lock() - defer m.internalsMtx.Unlock() - - for _, t := range m.watchPollTasks { - t.Stop() - } - m.watchPollTasks = newTasks - for _, t := range m.watchPollTasks { - t.Start() - } - }) - - if err != nil { - return err - } - } - - return nil -} - // Start starts the monitor's execution using its configured scheduler. func (m *Monitor) Start() { m.internalsMtx.Lock() @@ -311,25 +214,21 @@ func (m *Monitor) Start() { t.Start() } - for _, t := range m.watchPollTasks { - t.Start() - } - m.stats.StartMonitor(int64(m.endpoints)) + m.state = MON_STARTED } -// Stop stops the Monitor's execution in its configured scheduler. -// This is safe to call even if the Monitor was never started. +// Stop stops the monitor without freeing it in global dedup +// needed by dedup itself to avoid a reentrant lock. func (m *Monitor) Stop() { m.internalsMtx.Lock() defer m.internalsMtx.Unlock() - defer m.freeID() - for _, t := range m.configuredJobs { - t.Stop() + if m.state == MON_STOPPED { + return } - for _, t := range m.watchPollTasks { + for _, t := range m.configuredJobs { t.Stop() } @@ -341,9 +240,5 @@ func (m *Monitor) Stop() { } m.stats.StopMonitor(int64(m.endpoints)) -} - -func (m *Monitor) freeID() { - // Free up the monitor ID for reuse - uniqueMonitorIDs.Delete(m.stdFields.ID) + m.state = MON_STOPPED } diff --git a/heartbeat/monitors/monitor_test.go b/heartbeat/monitors/monitor_test.go index b0d02f819f6..9a0962ef8b2 100644 --- a/heartbeat/monitors/monitor_test.go +++ b/heartbeat/monitors/monitor_test.go @@ -39,7 +39,7 @@ func TestMonitor(t *testing.T) { require.NoError(t, err) defer sched.Stop() - mon, err := newMonitor(serverMonConf, reg, pipelineConnector, sched, false) + mon, err := newMonitor(serverMonConf, reg, pipelineConnector, sched, nil) require.NoError(t, err) mon.Start() @@ -78,43 +78,6 @@ func TestMonitor(t *testing.T) { assert.Equal(t, true, pcClient.closed) } -func TestDuplicateMonitorIDs(t *testing.T) { - serverMonConf := mockPluginConf(t, "custom", "@every 1ms", "http://example.net") - badConf := mockBadPluginConf(t, "custom", "@every 1ms") - reg, built, closed := mockPluginsReg() - pipelineConnector := &MockPipelineConnector{} - - sched := scheduler.New(1, monitoring.NewRegistry()) - err := sched.Start() - require.NoError(t, err) - defer sched.Stop() - - makeTestMon := func() (*Monitor, error) { - return newMonitor(serverMonConf, reg, pipelineConnector, sched, false) - } - - // Ensure that an error is returned on a bad config - _, m0Err := newMonitor(badConf, reg, pipelineConnector, sched, false) - require.Error(t, m0Err) - - // Would fail if the previous newMonitor didn't free the monitor.id - m1, m1Err := makeTestMon() - require.NoError(t, m1Err) - _, m2Err := makeTestMon() - require.Error(t, m2Err) - m1.Stop() - m3, m3Err := makeTestMon() - require.NoError(t, m3Err) - m3.Stop() - - // We count 3 because built doesn't count successful builds, - // just attempted creations of monitors - require.Equal(t, 3, built.Load()) - // Only one stops because the others errored on create - require.Equal(t, 2, closed.Load()) - require.NoError(t, m3Err) -} - func TestCheckInvalidConfig(t *testing.T) { serverMonConf := mockInvalidPluginConf(t) reg, built, closed := mockPluginsReg() @@ -125,7 +88,8 @@ func TestCheckInvalidConfig(t *testing.T) { require.NoError(t, err) defer sched.Stop() - m, err := newMonitor(serverMonConf, reg, pipelineConnector, sched, false) + m, err := newMonitor(serverMonConf, reg, pipelineConnector, sched, nil) + require.Error(t, err) // This could change if we decide the contract for newMonitor should always return a monitor require.Nil(t, m, "For this test to work we need a nil value for the monitor.") @@ -133,5 +97,5 @@ func TestCheckInvalidConfig(t *testing.T) { require.Equal(t, 0, built.Load()) require.Equal(t, 0, closed.Load()) - require.Error(t, checkMonitorConfig(serverMonConf, reg, false)) + require.Error(t, checkMonitorConfig(serverMonConf, reg)) } diff --git a/x-pack/heartbeat/heartbeat.reference.yml b/x-pack/heartbeat/heartbeat.reference.yml index 0f0c4d349cb..6ab29d59073 100644 --- a/x-pack/heartbeat/heartbeat.reference.yml +++ b/x-pack/heartbeat/heartbeat.reference.yml @@ -71,26 +71,6 @@ heartbeat.monitors: # sub-dictionary. Default is false. #fields_under_root: false - # NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE - # Configure file json file to be watched for changes to the monitor: - #watch.poll_file: - # Path to check for updates. - #path: - - # Interval between file file changed checks. - #interval: 5s - - # The ingest pipeline ID associated with this input. If this is set, it - # overwrites the pipeline option from the Elasticsearch output. - #pipeline: - - # The index name associated with this input. If this is set, it - # overwrites the index option from the Elasticsearch output. - #index: - - # Set to true to publish fields with null values in events. - #keep_null: false - # Define a directory to load monitor definitions from. Definitions take the form # of individual yaml files. # heartbeat.config.monitors: @@ -168,16 +148,7 @@ heartbeat.monitors: # Required TLS protocols #supported_protocols: ["TLSv1.0", "TLSv1.1", "TLSv1.2"] - # NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE - # Configure file json file to be watched for changes to the monitor: - #watch.poll_file: - # Path to check for updates. - #path: - - # Interval between file file changed checks. - #interval: 5s - - # The ingest pipeline ID associated with this input. If this is set, it + # The Ingest Node pipeline ID associated with this input. If this is set, it # overwrites the pipeline option from the Elasticsearch output. #pipeline: @@ -210,14 +181,6 @@ heartbeat.monitors: ipv6: true mode: any - # Configure file json file to be watched for changes to the monitor: - #watch.poll_file: - # Path to check for updates. - #path: - - # Interval between file file changed checks. - #interval: 5s - # Optional HTTP proxy url. #proxy_url: '' @@ -272,17 +235,7 @@ heartbeat.monitors: # equals: # myField: expectedValue - - # NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE - # Configure file json file to be watched for changes to the monitor: - #watch.poll_file: - # Path to check for updates. - #path: - - # Interval between file file changed checks. - #interval: 5s - - # The ingest pipeline ID associated with this input. If this is set, it + # The Ingest Node pipeline ID associated with this input. If this is set, it # overwrites the pipeline option from the Elasticsearch output. #pipeline: