Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove watch poll feature #27166

Merged
merged 4 commits into from
Aug 5, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Heartbeat*
- Add support for screenshot blocks and use newer synthetics flags that only works in newer synthetics betas. {pull}25808[25808]
- Remove long deprecated `watch_poll` functionality. {pull}27166[27166]

*Journalbeat*

Expand Down
47 changes: 0 additions & 47 deletions heartbeat/_meta/config/beat.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,6 @@ heartbeat.monitors:
# sub-dictionary. Default is false.
#fields_under_root: false

# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# The Ingest Node pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

# The index name associated with this input. If this is set, it
# overwrites the index option from the Elasticsearch output.
#index:

# Set to true to publish fields with null values in events.
#keep_null: false

# Define a directory to load monitor definitions from. Definitions take the form
# of individual yaml files.
# heartbeat.config.monitors:
Expand Down Expand Up @@ -168,15 +148,6 @@ heartbeat.monitors:
# Required TLS protocols
#supported_protocols: ["TLSv1.0", "TLSv1.1", "TLSv1.2"]

# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# The Ingest Node pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:
Expand Down Expand Up @@ -210,14 +181,6 @@ heartbeat.monitors:
ipv6: true
mode: any

# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# Optional HTTP proxy url.
#proxy_url: ''

Expand Down Expand Up @@ -267,16 +230,6 @@ heartbeat.monitors:
# equals:
# myField: expectedValue


# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
# Path to check for updates.
#path:

# Interval between file file changed checks.
#interval: 5s

# The Ingest Node pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:
Expand Down
4 changes: 2 additions & 2 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
config: parsedConfig,
scheduler: scheduler,
// dynamicFactory is the factory used for dynamic configs, e.g. autodiscover / reload
dynamicFactory: monitors.NewFactory(b.Info, scheduler, false),
dynamicFactory: monitors.NewFactory(b.Info, scheduler),
}
return bt, nil
}
Expand Down Expand Up @@ -125,7 +125,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {

// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) {
factory := monitors.NewFactory(b.Info, bt.scheduler, true)
factory := monitors.NewFactory(b.Info, bt.scheduler)

var runners []cfgfile.Runner
for _, cfg := range bt.config.Monitors {
Expand Down
13 changes: 6 additions & 7 deletions heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ import (
// RunnerFactory that can be used to create cfg.Runner cast versions of Monitor
// suitable for config reloading.
type RunnerFactory struct {
info beat.Info
sched *scheduler.Scheduler
allowWatches bool
info beat.Info
sched *scheduler.Scheduler
}

type publishSettings struct {
Expand All @@ -59,8 +58,8 @@ type publishSettings struct {
}

// NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects.
func NewFactory(info beat.Info, sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory {
return &RunnerFactory{info, sched, allowWatches}
func NewFactory(info beat.Info, sched *scheduler.Scheduler) *RunnerFactory {
return &RunnerFactory{info, sched}
}

// Create makes a new Runner for a new monitor with the given Config.
Expand All @@ -76,13 +75,13 @@ func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config) (cfgfile.Runne
}

p = pipetool.WithClientConfigEdit(p, configEditor)
monitor, err := newMonitor(c, plugin.GlobalPluginsReg, p, f.sched, f.allowWatches)
monitor, err := newMonitor(c, plugin.GlobalPluginsReg, p, f.sched)
return monitor, err
}

// CheckConfig checks to see if the given monitor config is valid.
func (f *RunnerFactory) CheckConfig(config *common.Config) error {
return checkMonitorConfig(config, plugin.GlobalPluginsReg, f.allowWatches)
return checkMonitorConfig(config, plugin.GlobalPluginsReg)
}

func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.ConfigEditor, error) {
Expand Down
109 changes: 3 additions & 106 deletions heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package monitors

import (
"bytes"
"encoding/json"
"fmt"
"sync"

Expand All @@ -31,7 +29,6 @@ import (
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/heartbeat/watcher"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
Expand All @@ -55,10 +52,6 @@ type Monitor struct {
internalsMtx sync.Mutex
close func() error

// Watch related fields
watchPollTasks []*configuredJob
watch watcher.Watch

pipelineConnector beat.PipelineConnector

// stats is the countersRecorder used to record lifecycle events
Expand All @@ -72,17 +65,14 @@ func (m *Monitor) String() string {
return fmt.Sprintf("Monitor<pluginName: %s, enabled: %t>", m.stdFields.Name, m.enabled)
}

func checkMonitorConfig(config *common.Config, registrar *plugin.PluginsReg, allowWatches bool) error {
m, err := newMonitor(config, registrar, nil, nil, allowWatches)
func checkMonitorConfig(config *common.Config, registrar *plugin.PluginsReg) error {
m, err := newMonitor(config, registrar, nil, nil)
if m != nil {
m.Stop() // Stop the monitor to free up the ID from uniqueness checks
}
return err
}

// ErrWatchesDisabled is returned when the user attempts to declare a watch poll file in a
var ErrWatchesDisabled = errors.New("watch poll files are only allowed in heartbeat.yml, not dynamic configs")

// uniqueMonitorIDs is used to keep track of explicitly configured monitor IDs and ensure no duplication within a
// given heartbeat instance.
var uniqueMonitorIDs sync.Map
Expand All @@ -100,9 +90,8 @@ func newMonitor(
registrar *plugin.PluginsReg,
pipelineConnector beat.PipelineConnector,
scheduler *scheduler.Scheduler,
allowWatches bool,
) (*Monitor, error) {
m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler, allowWatches)
m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler)
if m != nil && err != nil {
m.Stop()
}
Expand All @@ -116,7 +105,6 @@ func newMonitorUnsafe(
registrar *plugin.PluginsReg,
pipelineConnector beat.PipelineConnector,
scheduler *scheduler.Scheduler,
allowWatches bool,
) (*Monitor, error) {
// Extract just the Id, Type, and Enabled fields from the config
// We'll parse things more precisely later once we know what exact type of
Expand All @@ -137,7 +125,6 @@ func newMonitorUnsafe(
scheduler: scheduler,
configuredJobs: []*configuredJob{},
pipelineConnector: pipelineConnector,
watchPollTasks: []*configuredJob{},
internalsMtx: sync.Mutex{},
config: config,
stats: pluginFactory.Stats,
Expand Down Expand Up @@ -171,20 +158,6 @@ func newMonitorUnsafe(
return m, err
}

err = m.makeWatchTasks(pluginFactory)
if err != nil {
return m, err
}

if len(m.watchPollTasks) > 0 {
if !allowWatches {
return m, ErrWatchesDisabled
}

logp.Info(`Obsolete option 'watch.poll_file' declared. This will be removed in a future release.
See https://www.elastic.co/guide/en/beats/heartbeat/current/configuration-heartbeat-options.html for more info`)
}

return m, nil
}

Expand Down Expand Up @@ -227,74 +200,6 @@ func (m *Monitor) makeTasks(config *common.Config, jobs []jobs.Job) ([]*configur
return mTasks, nil
}

func (m *Monitor) makeWatchTasks(pluginFactory plugin.PluginFactory) error {
watchCfg := watcher.DefaultWatchConfig
err := m.config.Unpack(&watchCfg)
if err != nil {
return err
}

if len(watchCfg.Path) > 0 {
m.watch, err = watcher.NewFilePoller(watchCfg.Path, watchCfg.Poll, func(content []byte) {
var newTasks []*configuredJob

dec := json.NewDecoder(bytes.NewBuffer(content))
for dec.More() {
var obj map[string]interface{}
err = dec.Decode(&obj)
if err != nil {
logp.Err("Failed parsing JSON object: %v", err)
return
}

cfg, err := common.NewConfigFrom(obj)
if err != nil {
logp.Err("Failed normalizing JSON input: %v", err)
return
}

merged, err := common.MergeConfigs(m.config, cfg)
if err != nil {
logp.Err("Could not merge config: %v", err)
return
}

p, err := pluginFactory.Create(merged)
m.close = p.Close
m.endpoints = p.Endpoints
if err != nil {
logp.Err("Could not create job from watch file: %v", err)
}

watchTasks, err := m.makeTasks(merged, p.Jobs)
if err != nil {
logp.Err("Could not make configuredJob for config: %v", err)
return
}

newTasks = append(newTasks, watchTasks...)
}

m.internalsMtx.Lock()
defer m.internalsMtx.Unlock()

for _, t := range m.watchPollTasks {
t.Stop()
}
m.watchPollTasks = newTasks
for _, t := range m.watchPollTasks {
t.Start()
}
})

if err != nil {
return err
}
}

return nil
}

// Start starts the monitor's execution using its configured scheduler.
func (m *Monitor) Start() {
m.internalsMtx.Lock()
Expand All @@ -304,10 +209,6 @@ func (m *Monitor) Start() {
t.Start()
}

for _, t := range m.watchPollTasks {
t.Start()
}

m.stats.StartMonitor(int64(m.endpoints))
}

Expand All @@ -322,10 +223,6 @@ func (m *Monitor) Stop() {
t.Stop()
}

for _, t := range m.watchPollTasks {
t.Stop()
}

if m.close != nil {
err := m.close()
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions heartbeat/monitors/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestMonitor(t *testing.T) {
require.NoError(t, err)
defer sched.Stop()

mon, err := newMonitor(serverMonConf, reg, pipelineConnector, sched, false)
mon, err := newMonitor(serverMonConf, reg, pipelineConnector, sched)
require.NoError(t, err)

mon.Start()
Expand Down Expand Up @@ -90,11 +90,11 @@ func TestDuplicateMonitorIDs(t *testing.T) {
defer sched.Stop()

makeTestMon := func() (*Monitor, error) {
return newMonitor(serverMonConf, reg, pipelineConnector, sched, false)
return newMonitor(serverMonConf, reg, pipelineConnector, sched)
}

// Ensure that an error is returned on a bad config
_, m0Err := newMonitor(badConf, reg, pipelineConnector, sched, false)
_, m0Err := newMonitor(badConf, reg, pipelineConnector, sched)
require.Error(t, m0Err)

// Would fail if the previous newMonitor didn't free the monitor.id
Expand Down Expand Up @@ -125,13 +125,13 @@ func TestCheckInvalidConfig(t *testing.T) {
require.NoError(t, err)
defer sched.Stop()

m, err := newMonitor(serverMonConf, reg, pipelineConnector, sched, false)
m, err := newMonitor(serverMonConf, reg, pipelineConnector, sched)
// This could change if we decide the contract for newMonitor should always return a monitor
require.Nil(t, m, "For this test to work we need a nil value for the monitor.")

// These counters are both zero since this fails at config parse time
require.Equal(t, 0, built.Load())
require.Equal(t, 0, closed.Load())

require.Error(t, checkMonitorConfig(serverMonConf, reg, false))
require.Error(t, checkMonitorConfig(serverMonConf, reg))
}