From 6c59c2544505ba31b7e2223b84efcab8548b645c Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Thu, 18 Nov 2021 16:36:27 -0600 Subject: [PATCH 01/14] [Heartbeat] Log error on dupe monitor ID instead of strict req Currently Heartbeat will not start > 1 monitor with the same ID. This can create unintuitive scenarios in situations where a user changes a monitor by adding a new entry via some reload mechanism (say k8s autodiscover) before the old one is deleted. This PR changes the behavior to only log errors in this situation. Only one monitor will be active, but it will be the newest monitor. --- heartbeat/monitors/monitor.go | 11 ++++++++--- heartbeat/monitors/monitor_test.go | 22 ++++++++++++---------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index f93483d79334..c212a4750c00 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -84,7 +84,7 @@ var uniqueMonitorIDs sync.Map type ErrDuplicateMonitorID struct{ ID string } func (e ErrDuplicateMonitorID) Error() string { - return fmt.Sprintf("monitor ID %s is configured for multiple monitors! IDs must be unique values.", e.ID) + return fmt.Sprintf("monitor ID %s is configured for multiple monitors! IDs should be unique values.", e.ID) } // newMonitor Creates a new monitor, without leaking resources in the event of an error. @@ -139,8 +139,13 @@ func newMonitorUnsafe( if m.stdFields.ID != "" { // Ensure we don't have duplicate IDs - if _, loaded := uniqueMonitorIDs.LoadOrStore(m.stdFields.ID, m); loaded { - return m, ErrDuplicateMonitorID{m.stdFields.ID} + if existingMIface, loaded := uniqueMonitorIDs.LoadOrStore(m.stdFields.ID, m); loaded { + // We now only log duplicate monitor id errors, there are too many + // odd situations that can happen where users might temporarily have duplicate + // IDs for a short time when changing things. + logp.Warn("monitor ID %s is configured for multiple monitors! IDs should be unique values, last seen config will win", m.stdFields.ID) + existingMIface.(*Monitor).close() + uniqueMonitorIDs.Store(m.stdFields.ID, m) } } else { // If there's no explicit ID generate one diff --git a/heartbeat/monitors/monitor_test.go b/heartbeat/monitors/monitor_test.go index 0f00828bf9b2..a7079f9180d6 100644 --- a/heartbeat/monitors/monitor_test.go +++ b/heartbeat/monitors/monitor_test.go @@ -100,19 +100,21 @@ func TestDuplicateMonitorIDs(t *testing.T) { // Would fail if the previous newMonitor didn't free the monitor.id m1, m1Err := makeTestMon() require.NoError(t, m1Err) - _, m2Err := makeTestMon() - require.Error(t, m2Err) + m2, m2Err := makeTestMon() + // Change the name so we can ensure that this is the currently active monitor + m2.stdFields.Name = "MON2!!!" + // This used to trigger an error, but shouldn't any longer, we just log + // the error, and ensure the last monitor wins + require.NoError(t, m2Err) + + m, ok := uniqueMonitorIDs.Load(m2.stdFields.ID) + require.True(t, ok) + require.Equal(t, m2.stdFields.Name, m.(*Monitor).stdFields.Name) m1.Stop() - m3, m3Err := makeTestMon() - require.NoError(t, m3Err) - m3.Stop() + m2.Stop() - // We count 3 because built doesn't count successful builds, - // just attempted creations of monitors require.Equal(t, 3, built.Load()) - // Only one stops because the others errored on create - require.Equal(t, 2, closed.Load()) - require.NoError(t, m3Err) + require.Equal(t, 3, closed.Load()) } func TestCheckInvalidConfig(t *testing.T) { From d9f64c6e9afa5c6902ea4bfacaf4c2a4cc357473 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Thu, 18 Nov 2021 16:56:04 -0600 Subject: [PATCH 02/14] Tweaks |+ changelog --- CHANGELOG.next.asciidoc | 1 + heartbeat/monitors/monitor.go | 7 - heartbeat/service/synthetics_service.go | 274 ++++++++++++++++++++++++ 3 files changed, 275 insertions(+), 7 deletions(-) create mode 100644 heartbeat/service/synthetics_service.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6bb189ee3016..af1027086397 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -65,6 +65,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Change `threatintel` module to use new `threat.*` ECS fields. {pull}29014[29014] *Heartbeat* +- Change behavior in case of duplicate monitor IDs in configs to be last monitor wins. {pull}29041[29041] *Journalbeat* diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index c212a4750c00..d69bbacf6f17 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -80,13 +80,6 @@ func checkMonitorConfig(config *common.Config, registrar *plugin.PluginsReg) err // given heartbeat instance. var uniqueMonitorIDs sync.Map -// ErrDuplicateMonitorID is returned when a monitor attempts to start using an ID already in use by another monitor. -type ErrDuplicateMonitorID struct{ ID string } - -func (e ErrDuplicateMonitorID) Error() string { - return fmt.Sprintf("monitor ID %s is configured for multiple monitors! IDs should be unique values.", e.ID) -} - // newMonitor Creates a new monitor, without leaking resources in the event of an error. func newMonitor( config *common.Config, diff --git a/heartbeat/service/synthetics_service.go b/heartbeat/service/synthetics_service.go new file mode 100644 index 000000000000..dddf836dcaf8 --- /dev/null +++ b/heartbeat/service/synthetics_service.go @@ -0,0 +1,274 @@ +package service + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "github.com/elastic/beats/v7/heartbeat/config" + "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" + "github.com/elastic/beats/v7/heartbeat/scheduler/schedule" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "io/ioutil" + "net/http" + "sync" + "time" +) + +type HTTPClient interface { + Do(req *http.Request) (*http.Response, error) +} + +var ( + Client HTTPClient +) + +func init() { + Client = &http.Client{} +} + +type Output struct { + Hosts []string `config:"hosts"` + Username string `config:"username"` + Password string `config:"password"` +} + +type SyntheticServicePayload struct { + Monitors []map[string]interface{} `json:"monitors"` + Output Output `json:"output"` +} + +type SyntheticService struct { + config config.Config + monitorReloader *cfgfile.Reloader + servicePushTicker *time.Ticker + servicePushWait sync.WaitGroup + MonitorRunnerFactory *MonitorRunnerFactory +} + +func NewSyntheticService(c config.Config) *SyntheticService { + return &SyntheticService{ + config: c, + servicePushTicker: nil, + servicePushWait: sync.WaitGroup{}, + MonitorRunnerFactory: NewRunnerFactory(), + } +} + +func (service *SyntheticService) Run(b *beat.Beat) error { + logp.Info("Starting run via synthetics service. This is an experimental feature and may be changed or removed in the future!") + + validationErr := service.validateMonitorsSchedule() + if validationErr != nil { + return validationErr + } + + serviceManifest, sErr := service.getSyntheticServiceManifest() + if sErr != nil { + return sErr + } + + serviceLocations := serviceManifest.Locations + + pushInterval := 30 * time.Second + service.servicePushTicker = time.NewTicker(pushInterval) + + output := Output{} + err := b.Config.Output.Config().Unpack(&output) + if err != nil { + logp.Info("Unable to parse output param") + return err + } + + // first we need to push at start, and then ticker will take over + for locationKey, serviceLocation := range serviceLocations { + service.servicePushWait.Add(1) + go service.pushConfigsToSyntheticsService(locationKey, serviceLocation, output) + } + go service.schedulePushConfig(serviceLocations, output) + if service.config.ConfigMonitors.Enabled(){ + go service.scheduleReloadPushConfig(serviceLocations, output) + } + return nil + +} + +func (service *SyntheticService) Wait() { + service.servicePushWait.Wait() +} + +func (service *SyntheticService) Stop() { + service.servicePushTicker.Stop() +} + +func (service *SyntheticService) schedulePushConfig(serviceLocations map[string]config.ServiceLocation, output Output) { + service.servicePushWait.Add(1) + for { + <-service.servicePushTicker.C: + for locationKey, serviceLocation := range serviceLocations { + service.servicePushWait.Add(1) + // first we need to do at start, and then ticker will take over + go service.pushConfigsToSyntheticsService(locationKey, serviceLocation, output) + } + + defer service.servicePushWait.Done() + } +} + +func (service *SyntheticService) scheduleReloadPushConfig(serviceLocations map[string]config.ServiceLocation, output Output) { + reloadPushTicker := time.NewTicker(1 * time.Hour) + for { + select { + case <-service.MonitorRunnerFactory.Update: + if reloadPushTicker == nil { + reloadPushTicker = time.NewTicker(1 * time.Second) + } else { + reloadPushTicker.Reset(1 * time.Second) + } + + case <-reloadPushTicker.C: + for locationKey, serviceLocation := range serviceLocations { + service.servicePushWait.Add(1) + // first we need to do at start, and then ticker will take over + go service.pushConfigsToSyntheticsService(locationKey, serviceLocation, output) + } + reloadPushTicker.Stop() + } + } +} + +func (service *SyntheticService) getSyntheticServiceManifest() (config.ServiceManifest, error) { + serviceCfg := service.config.Service + var err error + + if serviceCfg.Username == "" { + err = errors.New("synthetic service username is required for authentication") + } + + if serviceCfg.Password == "" { + err = errors.New("synthetic service password is required for authentication") + } + + if serviceCfg.ManifestURL == "" { + err = errors.New("synthetic service manifest url is required") + } + + if err != nil { + return config.ServiceManifest{}, err + } + + req, err := http.NewRequest("GET", serviceCfg.ManifestURL, nil) + + resp, err := Client.Do(req) + + if err != nil { + return config.ServiceManifest{}, err + } + + serviceManifest := config.ServiceManifest{} + + read, err := ioutil.ReadAll(resp.Body) + + err = json.Unmarshal(read, &serviceManifest) + + return serviceManifest, err + +} + +func (service *SyntheticService) validateMonitorsSchedule() error { + for _, m := range service.config.Monitors { + monitorFields, _ := stdfields.ConfigToStdMonitorFields(m) + monitorSchedule, _ := schedule.ParseSchedule(monitorFields.ScheduleStr) + if monitorSchedule.Seconds() < 60 { + return errors.New("schedule can't be less than 1 minute while using synthetics service") + } + } + return nil +} + +func (service *SyntheticService) pushConfigsToSyntheticsService(locationKey string, serviceLocation config.ServiceLocation, output Output) { + defer service.servicePushWait.Done() + + payload := SyntheticServicePayload{Output: output} + + addToPayload := func(monCfg *common.Config) { + monitorFields, _ := stdfields.ConfigToStdMonitorFields(monCfg) + if locationInServiceLocation(locationKey, monitorFields.ServiceLocations) { + target := map[string]interface{}{} + err := monCfg.Unpack(target) + if err != nil { + logp.Info("error unpacking monitor plugin config") + return + } + payload.Monitors = append(payload.Monitors, target) + } + } + monitorsById := map[string]*common.Config{} + for _, monCfg := range service.config.Monitors { + monitorFields, _ := stdfields.ConfigToStdMonitorFields(monCfg) + monitorsById[monitorFields.ID] = monCfg + } + + if service.config.ConfigMonitors.Enabled() { + for monId, monitor := range service.MonitorRunnerFactory.GetMonitorsById() { + monitorsById[monId] = monitor + } + } + + for _, monitor := range monitorsById { + addToPayload(monitor) + } + + if len(payload.Monitors) == 0 { + logp.Info("No monitor found for service: %s, to push configuration.", serviceLocation.Geo.Name) + return + } + + serviceCfg := service.config.Service + + jsonValue, _ := json.Marshal(payload) + url := fmt.Sprintf("%s/cronjob", serviceLocation.Url) + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonValue)) + req.Header.Set("Content-Type", "application/json") + + req.SetBasicAuth(serviceCfg.Username, serviceCfg.Password) + + resp, err := Client.Do(req) + if err != nil { + logp.Info("Failed to push configurations to the synthetics service: %s for %d monitors", + serviceLocation.Geo.Name, len(payload.Monitors)) + logp.Error(err) + return + } + + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + bodyBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + logp.Info("Failed to push configurations to the synthetics service: %s for %d monitors", + serviceLocation.Geo.Name, len(payload.Monitors)) + logp.Error(err) + } + bodyString := string(bodyBytes) + + if bodyString == "success" { + logp.Info("Successfully pushed configurations to the synthetics service: %s for %d monitors", + serviceLocation.Geo.Name, len(payload.Monitors)) + } + } + +} + +func locationInServiceLocation(location string, locationsList []string) bool { + for _, b := range locationsList { + if b == location { + return true + } + } + return false +} From ec7d4be5cf467c7e6da9a896aa9576895c09fbc0 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 19 Nov 2021 09:53:04 -0600 Subject: [PATCH 03/14] Fix threadsafety --- heartbeat/monitors/dedup.go | 65 ++++++ heartbeat/monitors/monitor.go | 32 ++- heartbeat/monitors/monitor_test.go | 7 +- heartbeat/service/synthetics_service.go | 274 ------------------------ 4 files changed, 81 insertions(+), 297 deletions(-) create mode 100644 heartbeat/monitors/dedup.go delete mode 100644 heartbeat/service/synthetics_service.go diff --git a/heartbeat/monitors/dedup.go b/heartbeat/monitors/dedup.go new file mode 100644 index 000000000000..f345b2b89841 --- /dev/null +++ b/heartbeat/monitors/dedup.go @@ -0,0 +1,65 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package monitors + +import ( + "sync" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +func newDedup() dedup { + return dedup{ + byId: map[string]*Monitor{}, + mtx: &sync.Mutex{}, + } +} + +type dedup struct { + byId map[string]*Monitor + mtx *sync.Mutex +} + +func (um dedup) register(m *Monitor) { + um.mtx.Lock() + defer um.mtx.Unlock() + + closed := um.closeUnsafe(m) + if closed { + logp.Warn("monitor ID %s is configured for multiple monitors! IDs should be unique values, last seen config will win", m.stdFields.ID) + } + + um.byId[m.stdFields.ID] = m +} + +func (um dedup) unregister(m *Monitor) { + um.mtx.Lock() + defer um.mtx.Unlock() + + um.closeUnsafe(m) + + delete(um.byId, m.stdFields.ID) +} + +func (um dedup) closeUnsafe(m *Monitor) bool { + if existing, ok := um.byId[m.stdFields.ID]; ok { + existing.close() + return ok + } + return false +} diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index d69bbacf6f17..b04266aeb92d 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -76,9 +76,9 @@ func checkMonitorConfig(config *common.Config, registrar *plugin.PluginsReg) err return err } -// uniqueMonitorIDs is used to keep track of explicitly configured monitor IDs and ensure no duplication within a +// globalDedup is used to keep track of explicitly configured monitor IDs and ensure no duplication within a // given heartbeat instance. -var uniqueMonitorIDs sync.Map +var globalDedup = newDedup() // newMonitor Creates a new monitor, without leaking resources in the event of an error. func newMonitor( @@ -130,24 +130,16 @@ func newMonitorUnsafe( stats: pluginFactory.Stats, } - if m.stdFields.ID != "" { - // Ensure we don't have duplicate IDs - if existingMIface, loaded := uniqueMonitorIDs.LoadOrStore(m.stdFields.ID, m); loaded { - // We now only log duplicate monitor id errors, there are too many - // odd situations that can happen where users might temporarily have duplicate - // IDs for a short time when changing things. - logp.Warn("monitor ID %s is configured for multiple monitors! IDs should be unique values, last seen config will win", m.stdFields.ID) - existingMIface.(*Monitor).close() - uniqueMonitorIDs.Store(m.stdFields.ID, m) - } - } else { - // If there's no explicit ID generate one - hash, err := m.configHash() - if err != nil { - return m, err - } - m.stdFields.ID = fmt.Sprintf("auto-%s-%#X", m.stdFields.Type, hash) + // If there's no explicit ID generate one + hash, err := m.configHash() + if err != nil { + return m, err } + m.stdFields.ID = fmt.Sprintf("auto-%s-%#X", m.stdFields.Type, hash) + + // De-duplicate monitors with identical IDs + // last write wins + globalDedup.register(m) p, err := pluginFactory.Create(config) m.close = p.Close @@ -240,5 +232,5 @@ func (m *Monitor) Stop() { func (m *Monitor) freeID() { // Free up the monitor ID for reuse - uniqueMonitorIDs.Delete(m.stdFields.ID) + globalDedup.unregister(m) } diff --git a/heartbeat/monitors/monitor_test.go b/heartbeat/monitors/monitor_test.go index a7079f9180d6..f1f9e0fc2b8a 100644 --- a/heartbeat/monitors/monitor_test.go +++ b/heartbeat/monitors/monitor_test.go @@ -107,14 +107,15 @@ func TestDuplicateMonitorIDs(t *testing.T) { // the error, and ensure the last monitor wins require.NoError(t, m2Err) - m, ok := uniqueMonitorIDs.Load(m2.stdFields.ID) + m, ok := globalDedup.byId[m2.stdFields.ID] require.True(t, ok) - require.Equal(t, m2.stdFields.Name, m.(*Monitor).stdFields.Name) + require.Equal(t, m2.stdFields.Name, m.stdFields.Name) m1.Stop() m2.Stop() require.Equal(t, 3, built.Load()) - require.Equal(t, 3, closed.Load()) + // Make sure each is closed at least once + require.GreaterOrEqual(t, closed.Load(), 3) } func TestCheckInvalidConfig(t *testing.T) { diff --git a/heartbeat/service/synthetics_service.go b/heartbeat/service/synthetics_service.go deleted file mode 100644 index dddf836dcaf8..000000000000 --- a/heartbeat/service/synthetics_service.go +++ /dev/null @@ -1,274 +0,0 @@ -package service - -import ( - "bytes" - "encoding/json" - "errors" - "fmt" - "github.com/elastic/beats/v7/heartbeat/config" - "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" - "github.com/elastic/beats/v7/heartbeat/scheduler/schedule" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/cfgfile" - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/logp" - "io/ioutil" - "net/http" - "sync" - "time" -) - -type HTTPClient interface { - Do(req *http.Request) (*http.Response, error) -} - -var ( - Client HTTPClient -) - -func init() { - Client = &http.Client{} -} - -type Output struct { - Hosts []string `config:"hosts"` - Username string `config:"username"` - Password string `config:"password"` -} - -type SyntheticServicePayload struct { - Monitors []map[string]interface{} `json:"monitors"` - Output Output `json:"output"` -} - -type SyntheticService struct { - config config.Config - monitorReloader *cfgfile.Reloader - servicePushTicker *time.Ticker - servicePushWait sync.WaitGroup - MonitorRunnerFactory *MonitorRunnerFactory -} - -func NewSyntheticService(c config.Config) *SyntheticService { - return &SyntheticService{ - config: c, - servicePushTicker: nil, - servicePushWait: sync.WaitGroup{}, - MonitorRunnerFactory: NewRunnerFactory(), - } -} - -func (service *SyntheticService) Run(b *beat.Beat) error { - logp.Info("Starting run via synthetics service. This is an experimental feature and may be changed or removed in the future!") - - validationErr := service.validateMonitorsSchedule() - if validationErr != nil { - return validationErr - } - - serviceManifest, sErr := service.getSyntheticServiceManifest() - if sErr != nil { - return sErr - } - - serviceLocations := serviceManifest.Locations - - pushInterval := 30 * time.Second - service.servicePushTicker = time.NewTicker(pushInterval) - - output := Output{} - err := b.Config.Output.Config().Unpack(&output) - if err != nil { - logp.Info("Unable to parse output param") - return err - } - - // first we need to push at start, and then ticker will take over - for locationKey, serviceLocation := range serviceLocations { - service.servicePushWait.Add(1) - go service.pushConfigsToSyntheticsService(locationKey, serviceLocation, output) - } - go service.schedulePushConfig(serviceLocations, output) - if service.config.ConfigMonitors.Enabled(){ - go service.scheduleReloadPushConfig(serviceLocations, output) - } - return nil - -} - -func (service *SyntheticService) Wait() { - service.servicePushWait.Wait() -} - -func (service *SyntheticService) Stop() { - service.servicePushTicker.Stop() -} - -func (service *SyntheticService) schedulePushConfig(serviceLocations map[string]config.ServiceLocation, output Output) { - service.servicePushWait.Add(1) - for { - <-service.servicePushTicker.C: - for locationKey, serviceLocation := range serviceLocations { - service.servicePushWait.Add(1) - // first we need to do at start, and then ticker will take over - go service.pushConfigsToSyntheticsService(locationKey, serviceLocation, output) - } - - defer service.servicePushWait.Done() - } -} - -func (service *SyntheticService) scheduleReloadPushConfig(serviceLocations map[string]config.ServiceLocation, output Output) { - reloadPushTicker := time.NewTicker(1 * time.Hour) - for { - select { - case <-service.MonitorRunnerFactory.Update: - if reloadPushTicker == nil { - reloadPushTicker = time.NewTicker(1 * time.Second) - } else { - reloadPushTicker.Reset(1 * time.Second) - } - - case <-reloadPushTicker.C: - for locationKey, serviceLocation := range serviceLocations { - service.servicePushWait.Add(1) - // first we need to do at start, and then ticker will take over - go service.pushConfigsToSyntheticsService(locationKey, serviceLocation, output) - } - reloadPushTicker.Stop() - } - } -} - -func (service *SyntheticService) getSyntheticServiceManifest() (config.ServiceManifest, error) { - serviceCfg := service.config.Service - var err error - - if serviceCfg.Username == "" { - err = errors.New("synthetic service username is required for authentication") - } - - if serviceCfg.Password == "" { - err = errors.New("synthetic service password is required for authentication") - } - - if serviceCfg.ManifestURL == "" { - err = errors.New("synthetic service manifest url is required") - } - - if err != nil { - return config.ServiceManifest{}, err - } - - req, err := http.NewRequest("GET", serviceCfg.ManifestURL, nil) - - resp, err := Client.Do(req) - - if err != nil { - return config.ServiceManifest{}, err - } - - serviceManifest := config.ServiceManifest{} - - read, err := ioutil.ReadAll(resp.Body) - - err = json.Unmarshal(read, &serviceManifest) - - return serviceManifest, err - -} - -func (service *SyntheticService) validateMonitorsSchedule() error { - for _, m := range service.config.Monitors { - monitorFields, _ := stdfields.ConfigToStdMonitorFields(m) - monitorSchedule, _ := schedule.ParseSchedule(monitorFields.ScheduleStr) - if monitorSchedule.Seconds() < 60 { - return errors.New("schedule can't be less than 1 minute while using synthetics service") - } - } - return nil -} - -func (service *SyntheticService) pushConfigsToSyntheticsService(locationKey string, serviceLocation config.ServiceLocation, output Output) { - defer service.servicePushWait.Done() - - payload := SyntheticServicePayload{Output: output} - - addToPayload := func(monCfg *common.Config) { - monitorFields, _ := stdfields.ConfigToStdMonitorFields(monCfg) - if locationInServiceLocation(locationKey, monitorFields.ServiceLocations) { - target := map[string]interface{}{} - err := monCfg.Unpack(target) - if err != nil { - logp.Info("error unpacking monitor plugin config") - return - } - payload.Monitors = append(payload.Monitors, target) - } - } - monitorsById := map[string]*common.Config{} - for _, monCfg := range service.config.Monitors { - monitorFields, _ := stdfields.ConfigToStdMonitorFields(monCfg) - monitorsById[monitorFields.ID] = monCfg - } - - if service.config.ConfigMonitors.Enabled() { - for monId, monitor := range service.MonitorRunnerFactory.GetMonitorsById() { - monitorsById[monId] = monitor - } - } - - for _, monitor := range monitorsById { - addToPayload(monitor) - } - - if len(payload.Monitors) == 0 { - logp.Info("No monitor found for service: %s, to push configuration.", serviceLocation.Geo.Name) - return - } - - serviceCfg := service.config.Service - - jsonValue, _ := json.Marshal(payload) - url := fmt.Sprintf("%s/cronjob", serviceLocation.Url) - - req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonValue)) - req.Header.Set("Content-Type", "application/json") - - req.SetBasicAuth(serviceCfg.Username, serviceCfg.Password) - - resp, err := Client.Do(req) - if err != nil { - logp.Info("Failed to push configurations to the synthetics service: %s for %d monitors", - serviceLocation.Geo.Name, len(payload.Monitors)) - logp.Error(err) - return - } - - defer resp.Body.Close() - - if resp.StatusCode == http.StatusOK { - bodyBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - logp.Info("Failed to push configurations to the synthetics service: %s for %d monitors", - serviceLocation.Geo.Name, len(payload.Monitors)) - logp.Error(err) - } - bodyString := string(bodyBytes) - - if bodyString == "success" { - logp.Info("Successfully pushed configurations to the synthetics service: %s for %d monitors", - serviceLocation.Geo.Name, len(payload.Monitors)) - } - } - -} - -func locationInServiceLocation(location string, locationsList []string) bool { - for _, b := range locationsList { - if b == location { - return true - } - } - return false -} From 6d4a78d0b7435ff4471e05259e4152ee9041740b Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 19 Nov 2021 09:55:09 -0600 Subject: [PATCH 04/14] Stop not close --- heartbeat/monitors/dedup.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/heartbeat/monitors/dedup.go b/heartbeat/monitors/dedup.go index f345b2b89841..7162580a524d 100644 --- a/heartbeat/monitors/dedup.go +++ b/heartbeat/monitors/dedup.go @@ -39,7 +39,7 @@ func (um dedup) register(m *Monitor) { um.mtx.Lock() defer um.mtx.Unlock() - closed := um.closeUnsafe(m) + closed := um.stopUnsafe(m) if closed { logp.Warn("monitor ID %s is configured for multiple monitors! IDs should be unique values, last seen config will win", m.stdFields.ID) } @@ -51,14 +51,14 @@ func (um dedup) unregister(m *Monitor) { um.mtx.Lock() defer um.mtx.Unlock() - um.closeUnsafe(m) + um.stopUnsafe(m) delete(um.byId, m.stdFields.ID) } -func (um dedup) closeUnsafe(m *Monitor) bool { +func (um dedup) stopUnsafe(m *Monitor) bool { if existing, ok := um.byId[m.stdFields.ID]; ok { - existing.close() + existing.Stop() return ok } return false From f36134895d194543f762695deab53561b54a0f39 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 19 Nov 2021 11:10:45 -0600 Subject: [PATCH 05/14] Check IDs --- heartbeat/monitors/monitor.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index b04266aeb92d..eaca5cd6b9f2 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -130,12 +130,14 @@ func newMonitorUnsafe( stats: pluginFactory.Stats, } - // If there's no explicit ID generate one - hash, err := m.configHash() - if err != nil { - return m, err + if m.stdFields.ID == "" { + // If there's no explicit ID generate one + hash, err := m.configHash() + if err != nil { + return m, err + } + m.stdFields.ID = fmt.Sprintf("auto-%s-%#X", m.stdFields.Type, hash) } - m.stdFields.ID = fmt.Sprintf("auto-%s-%#X", m.stdFields.Type, hash) // De-duplicate monitors with identical IDs // last write wins From 8dd48781795a947203223b3ad50bebe7f4c02073 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 19 Nov 2021 11:28:59 -0600 Subject: [PATCH 06/14] Fix reentrant lock issue causing deadlock --- heartbeat/monitors/dedup.go | 6 +++--- heartbeat/monitors/monitor.go | 13 +++++++------ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/heartbeat/monitors/dedup.go b/heartbeat/monitors/dedup.go index 7162580a524d..931e40ffc531 100644 --- a/heartbeat/monitors/dedup.go +++ b/heartbeat/monitors/dedup.go @@ -39,8 +39,8 @@ func (um dedup) register(m *Monitor) { um.mtx.Lock() defer um.mtx.Unlock() - closed := um.stopUnsafe(m) - if closed { + stopped := um.stopUnsafe(m) + if stopped { logp.Warn("monitor ID %s is configured for multiple monitors! IDs should be unique values, last seen config will win", m.stdFields.ID) } @@ -58,7 +58,7 @@ func (um dedup) unregister(m *Monitor) { func (um dedup) stopUnsafe(m *Monitor) bool { if existing, ok := um.byId[m.stdFields.ID]; ok { - existing.Stop() + existing.stopUnsafe() return ok } return false diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index eaca5cd6b9f2..d3368279f3c0 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -214,9 +214,15 @@ func (m *Monitor) Start() { // Stop stops the Monitor's execution in its configured scheduler. // This is safe to call even if the Monitor was never started. func (m *Monitor) Stop() { + defer globalDedup.unregister(m) + m.stopUnsafe() +} + +// stopUnsafe stops the monitor without freeing it in global dedup +// needed by dedup itself to avoid a reentrant lock. +func (m *Monitor) stopUnsafe() { m.internalsMtx.Lock() defer m.internalsMtx.Unlock() - defer m.freeID() for _, t := range m.configuredJobs { t.Stop() @@ -231,8 +237,3 @@ func (m *Monitor) Stop() { m.stats.StopMonitor(int64(m.endpoints)) } - -func (m *Monitor) freeID() { - // Free up the monitor ID for reuse - globalDedup.unregister(m) -} From 6f2df9106ef0b47454faf8b8649cda968b6689b0 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 19 Nov 2021 11:40:37 -0600 Subject: [PATCH 07/14] Fix tests --- heartbeat/monitors/monitor.go | 4 ++-- heartbeat/monitors/monitor_test.go | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index d3368279f3c0..3903deb24290 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -214,8 +214,8 @@ func (m *Monitor) Start() { // Stop stops the Monitor's execution in its configured scheduler. // This is safe to call even if the Monitor was never started. func (m *Monitor) Stop() { - defer globalDedup.unregister(m) - m.stopUnsafe() + // later calls stopUnsafe + globalDedup.unregister(m) } // stopUnsafe stops the monitor without freeing it in global dedup diff --git a/heartbeat/monitors/monitor_test.go b/heartbeat/monitors/monitor_test.go index f1f9e0fc2b8a..1775a1fc0aa1 100644 --- a/heartbeat/monitors/monitor_test.go +++ b/heartbeat/monitors/monitor_test.go @@ -113,9 +113,11 @@ func TestDuplicateMonitorIDs(t *testing.T) { m1.Stop() m2.Stop() + // 3 are counted as built, even the bad config require.Equal(t, 3, built.Load()) // Make sure each is closed at least once - require.GreaterOrEqual(t, closed.Load(), 3) + // the bad config doesn't need to be closed + require.Equal(t, closed.Load(), 2) } func TestCheckInvalidConfig(t *testing.T) { From 665cf1f4d52fa033ae629b3a5020efd81b21e4f9 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 19 Nov 2021 15:24:35 -0600 Subject: [PATCH 08/14] Move uniqueness test to start --- heartbeat/monitors/monitor.go | 17 ++++++++--------- heartbeat/monitors/monitor_test.go | 3 +++ 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index 3903deb24290..e6304811d8b6 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -69,10 +69,8 @@ func (m *Monitor) String() string { } func checkMonitorConfig(config *common.Config, registrar *plugin.PluginsReg) error { - m, err := newMonitor(config, registrar, nil, nil) - if m != nil { - m.Stop() // Stop the monitor to free up the ID from uniqueness checks - } + _, err := newMonitor(config, registrar, nil, nil) + return err } @@ -80,7 +78,8 @@ func checkMonitorConfig(config *common.Config, registrar *plugin.PluginsReg) err // given heartbeat instance. var globalDedup = newDedup() -// newMonitor Creates a new monitor, without leaking resources in the event of an error. +// newMonitor creates a new monitor, without leaking resources in the event of an error. +// you do not need to call Stop(), it will be safely garbage collected unless Start is called. func newMonitor( config *common.Config, registrar *plugin.PluginsReg, @@ -139,10 +138,6 @@ func newMonitorUnsafe( m.stdFields.ID = fmt.Sprintf("auto-%s-%#X", m.stdFields.Type, hash) } - // De-duplicate monitors with identical IDs - // last write wins - globalDedup.register(m) - p, err := pluginFactory.Create(config) m.close = p.Close wrappedJobs := wrappers.WrapCommon(p.Jobs, m.stdFields) @@ -204,6 +199,10 @@ func (m *Monitor) Start() { m.internalsMtx.Lock() defer m.internalsMtx.Unlock() + // De-duplicate monitors with identical IDs + // last write wins + globalDedup.register(m) + for _, t := range m.configuredJobs { t.Start() } diff --git a/heartbeat/monitors/monitor_test.go b/heartbeat/monitors/monitor_test.go index 1775a1fc0aa1..780cbf901be3 100644 --- a/heartbeat/monitors/monitor_test.go +++ b/heartbeat/monitors/monitor_test.go @@ -99,8 +99,10 @@ func TestDuplicateMonitorIDs(t *testing.T) { // Would fail if the previous newMonitor didn't free the monitor.id m1, m1Err := makeTestMon() + m1.Start() require.NoError(t, m1Err) m2, m2Err := makeTestMon() + m2.Start() // Change the name so we can ensure that this is the currently active monitor m2.stdFields.Name = "MON2!!!" // This used to trigger an error, but shouldn't any longer, we just log @@ -131,6 +133,7 @@ func TestCheckInvalidConfig(t *testing.T) { defer sched.Stop() m, err := newMonitor(serverMonConf, reg, pipelineConnector, sched) + require.Error(t, err) // This could change if we decide the contract for newMonitor should always return a monitor require.Nil(t, m, "For this test to work we need a nil value for the monitor.") From 9685ba5425cd6299448b58b23980213d54fafb85 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 19 Nov 2021 16:32:40 -0600 Subject: [PATCH 09/14] It all works --- heartbeat/beater/heartbeat.go | 4 +- heartbeat/monitors/dedup.go | 65 ------------------------------ heartbeat/monitors/factory.go | 54 ++++++++++++++++++++++--- heartbeat/monitors/factory_test.go | 63 ++++++++++++++++++++++++++--- heartbeat/monitors/monitor.go | 33 ++++++--------- heartbeat/monitors/monitor_test.go | 48 +--------------------- 6 files changed, 123 insertions(+), 144 deletions(-) delete mode 100644 heartbeat/monitors/dedup.go diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index f9623753d3a2..3511df898d4c 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -78,7 +78,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { config: parsedConfig, scheduler: scheduler, // dynamicFactory is the factory used for dynamic configs, e.g. autodiscover / reload - dynamicFactory: monitors.NewFactory(b.Info, scheduler), + dynamicFactory: monitors.NewFactory(b.Info, scheduler, plugin.GlobalPluginsReg), } return bt, nil } @@ -198,7 +198,7 @@ func runRunOnceSingleConfig(cfg *common.Config, publishClient *core.SyncClient, // RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present. func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) { - factory := monitors.NewFactory(b.Info, bt.scheduler) + factory := monitors.NewFactory(b.Info, bt.scheduler, plugin.GlobalPluginsReg) var runners []cfgfile.Runner for _, cfg := range bt.config.Monitors { diff --git a/heartbeat/monitors/dedup.go b/heartbeat/monitors/dedup.go deleted file mode 100644 index 931e40ffc531..000000000000 --- a/heartbeat/monitors/dedup.go +++ /dev/null @@ -1,65 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package monitors - -import ( - "sync" - - "github.com/elastic/beats/v7/libbeat/logp" -) - -func newDedup() dedup { - return dedup{ - byId: map[string]*Monitor{}, - mtx: &sync.Mutex{}, - } -} - -type dedup struct { - byId map[string]*Monitor - mtx *sync.Mutex -} - -func (um dedup) register(m *Monitor) { - um.mtx.Lock() - defer um.mtx.Unlock() - - stopped := um.stopUnsafe(m) - if stopped { - logp.Warn("monitor ID %s is configured for multiple monitors! IDs should be unique values, last seen config will win", m.stdFields.ID) - } - - um.byId[m.stdFields.ID] = m -} - -func (um dedup) unregister(m *Monitor) { - um.mtx.Lock() - defer um.mtx.Unlock() - - um.stopUnsafe(m) - - delete(um.byId, m.stdFields.ID) -} - -func (um dedup) stopUnsafe(m *Monitor) bool { - if existing, ok := um.byId[m.stdFields.ID]; ok { - existing.stopUnsafe() - return ok - } - return false -} diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index d2b013d3c015..07739cd2deb6 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -19,6 +19,7 @@ package monitors import ( "fmt" + "sync" "github.com/elastic/beats/v7/heartbeat/monitors/plugin" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" @@ -27,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fmtstr" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/processors/actions" "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" @@ -37,8 +39,11 @@ import ( // RunnerFactory that can be used to create cfg.Runner cast versions of Monitor // suitable for config reloading. type RunnerFactory struct { - info beat.Info - sched *scheduler.Scheduler + info beat.Info + sched *scheduler.Scheduler + byId map[string]*Monitor + mtx *sync.Mutex + pluginsReg *plugin.PluginsReg } type publishSettings struct { @@ -61,8 +66,14 @@ type publishSettings struct { } // NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects. -func NewFactory(info beat.Info, sched *scheduler.Scheduler) *RunnerFactory { - return &RunnerFactory{info, sched} +func NewFactory(info beat.Info, sched *scheduler.Scheduler, pluginsReg *plugin.PluginsReg) *RunnerFactory { + return &RunnerFactory{ + info: info, + sched: sched, + byId: map[string]*Monitor{}, + mtx: &sync.Mutex{}, + pluginsReg: pluginsReg, + } } // Create makes a new Runner for a new monitor with the given Config. @@ -78,8 +89,39 @@ func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config) (cfgfile.Runne } p = pipetool.WithClientConfigEdit(p, configEditor) - monitor, err := newMonitor(c, plugin.GlobalPluginsReg, p, f.sched) - return monitor, err + + f.mtx.Lock() + defer f.mtx.Unlock() + + // Handle the problem of this function being occasionally invoked from within + // f.mtx being locked given that golang does not support reentrant locks. + // The important thing is clearing the map, not ensuring it stops exactly on time + // so we can defer its removal from the map with a goroutine. + safeStop := func(m *Monitor) { + go func() { + // We can safely relock now, since we're in a new goroutine. + f.mtx.Lock() + defer f.mtx.Unlock() + + // If this element hasn't already been removed or replaced with a new + // instance delete it from the map. + if curM, ok := f.byId[m.stdFields.ID]; ok && curM == m { + delete(f.byId, m.stdFields.ID) + } + }() + } + monitor, err := newMonitor(c, f.pluginsReg, p, f.sched, safeStop) + if err != nil { + return nil, err + } + if mon, ok := f.byId[monitor.stdFields.ID]; ok { + logp.Warn("monitor ID %s is configured for multiple monitors! IDs should be unique values, last seen config will win", monitor.stdFields.ID) + mon.Stop() + } + + f.byId[monitor.stdFields.ID] = monitor + + return monitor, nil } // CheckConfig checks to see if the given monitor config is valid. diff --git a/heartbeat/monitors/factory_test.go b/heartbeat/monitors/factory_test.go index 27b119392eec..54f02bce3db1 100644 --- a/heartbeat/monitors/factory_test.go +++ b/heartbeat/monitors/factory_test.go @@ -23,19 +23,23 @@ import ( "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/heartbeat/scheduler" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fmtstr" + "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" ) +var binfo = beat.Info{ + Beat: "heartbeat", + IndexPrefix: "heartbeat", + Version: "8.0.0", +} + func TestPreProcessors(t *testing.T) { - binfo := beat.Info{ - Beat: "heartbeat", - IndexPrefix: "heartbeat", - Version: "8.0.0", - } + tests := map[string]struct { settings publishSettings expectedIndex string @@ -143,3 +147,52 @@ func TestPreProcessors(t *testing.T) { }) } } + +func TestDuplicateMonitorIDs(t *testing.T) { + serverMonConf := mockPluginConf(t, "custom", "@every 1ms", "http://example.net") + badConf := mockBadPluginConf(t, "custom", "@every 1ms") + reg, built, closed := mockPluginsReg() + pipelineConnector := &MockPipelineConnector{} + + sched := scheduler.New(1, monitoring.NewRegistry()) + err := sched.Start() + require.NoError(t, err) + defer sched.Stop() + + f := NewFactory(binfo, sched, reg) + makeTestMon := func() (*Monitor, error) { + mIface, err := f.Create(pipelineConnector, serverMonConf) + if mIface == nil { + return nil, err + } else { + return mIface.(*Monitor), err + } + } + + // Ensure that an error is returned on a bad config + _, m0Err := newMonitor(badConf, reg, pipelineConnector, sched, nil) + require.Error(t, m0Err) + + // Would fail if the previous newMonitor didn't free the monitor.id + m1, m1Err := makeTestMon() + require.NoError(t, m1Err) + m1.Start() + m2, m2Err := makeTestMon() + require.NoError(t, m2Err) + m2.Start() + // Change the name so we can ensure that this is the currently active monitor + m2.stdFields.Name = "MON2!!!" + // This used to trigger an error, but shouldn't any longer, we just log + // the error, and ensure the last monitor wins + require.NoError(t, m2Err) + + m, ok := f.byId[m2.stdFields.ID] + require.True(t, ok) + require.Equal(t, m2.stdFields.Name, m.stdFields.Name) + m1.Stop() + m2.Stop() + + // 3 are counted as built, even the bad config + require.Equal(t, 3, built.Load()) + require.Equal(t, closed.Load(), 3) +} diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index e6304811d8b6..923c769a548b 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -69,15 +69,11 @@ func (m *Monitor) String() string { } func checkMonitorConfig(config *common.Config, registrar *plugin.PluginsReg) error { - _, err := newMonitor(config, registrar, nil, nil) + _, err := newMonitor(config, registrar, nil, nil, nil) return err } -// globalDedup is used to keep track of explicitly configured monitor IDs and ensure no duplication within a -// given heartbeat instance. -var globalDedup = newDedup() - // newMonitor creates a new monitor, without leaking resources in the event of an error. // you do not need to call Stop(), it will be safely garbage collected unless Start is called. func newMonitor( @@ -85,8 +81,9 @@ func newMonitor( registrar *plugin.PluginsReg, pipelineConnector beat.PipelineConnector, scheduler *scheduler.Scheduler, + onStop func(*Monitor), ) (*Monitor, error) { - m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler) + m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler, onStop) if m != nil && err != nil { m.Stop() } @@ -100,6 +97,7 @@ func newMonitorUnsafe( registrar *plugin.PluginsReg, pipelineConnector beat.PipelineConnector, scheduler *scheduler.Scheduler, + onStop func(*Monitor), ) (*Monitor, error) { // Extract just the Id, Type, and Enabled fields from the config // We'll parse things more precisely later once we know what exact type of @@ -139,7 +137,14 @@ func newMonitorUnsafe( } p, err := pluginFactory.Create(config) - m.close = p.Close + + m.close = func() error { + if onStop != nil { + onStop(m) + } + return p.Close() + } + wrappedJobs := wrappers.WrapCommon(p.Jobs, m.stdFields) m.endpoints = p.Endpoints @@ -198,11 +203,6 @@ func (m *Monitor) makeTasks(config *common.Config, jobs []jobs.Job) ([]*configur func (m *Monitor) Start() { m.internalsMtx.Lock() defer m.internalsMtx.Unlock() - - // De-duplicate monitors with identical IDs - // last write wins - globalDedup.register(m) - for _, t := range m.configuredJobs { t.Start() } @@ -210,16 +210,9 @@ func (m *Monitor) Start() { m.stats.StartMonitor(int64(m.endpoints)) } -// Stop stops the Monitor's execution in its configured scheduler. -// This is safe to call even if the Monitor was never started. -func (m *Monitor) Stop() { - // later calls stopUnsafe - globalDedup.unregister(m) -} - // stopUnsafe stops the monitor without freeing it in global dedup // needed by dedup itself to avoid a reentrant lock. -func (m *Monitor) stopUnsafe() { +func (m *Monitor) Stop() { m.internalsMtx.Lock() defer m.internalsMtx.Unlock() diff --git a/heartbeat/monitors/monitor_test.go b/heartbeat/monitors/monitor_test.go index 780cbf901be3..9a0962ef8b27 100644 --- a/heartbeat/monitors/monitor_test.go +++ b/heartbeat/monitors/monitor_test.go @@ -39,7 +39,7 @@ func TestMonitor(t *testing.T) { require.NoError(t, err) defer sched.Stop() - mon, err := newMonitor(serverMonConf, reg, pipelineConnector, sched) + mon, err := newMonitor(serverMonConf, reg, pipelineConnector, sched, nil) require.NoError(t, err) mon.Start() @@ -78,50 +78,6 @@ func TestMonitor(t *testing.T) { assert.Equal(t, true, pcClient.closed) } -func TestDuplicateMonitorIDs(t *testing.T) { - serverMonConf := mockPluginConf(t, "custom", "@every 1ms", "http://example.net") - badConf := mockBadPluginConf(t, "custom", "@every 1ms") - reg, built, closed := mockPluginsReg() - pipelineConnector := &MockPipelineConnector{} - - sched := scheduler.New(1, monitoring.NewRegistry()) - err := sched.Start() - require.NoError(t, err) - defer sched.Stop() - - makeTestMon := func() (*Monitor, error) { - return newMonitor(serverMonConf, reg, pipelineConnector, sched) - } - - // Ensure that an error is returned on a bad config - _, m0Err := newMonitor(badConf, reg, pipelineConnector, sched) - require.Error(t, m0Err) - - // Would fail if the previous newMonitor didn't free the monitor.id - m1, m1Err := makeTestMon() - m1.Start() - require.NoError(t, m1Err) - m2, m2Err := makeTestMon() - m2.Start() - // Change the name so we can ensure that this is the currently active monitor - m2.stdFields.Name = "MON2!!!" - // This used to trigger an error, but shouldn't any longer, we just log - // the error, and ensure the last monitor wins - require.NoError(t, m2Err) - - m, ok := globalDedup.byId[m2.stdFields.ID] - require.True(t, ok) - require.Equal(t, m2.stdFields.Name, m.stdFields.Name) - m1.Stop() - m2.Stop() - - // 3 are counted as built, even the bad config - require.Equal(t, 3, built.Load()) - // Make sure each is closed at least once - // the bad config doesn't need to be closed - require.Equal(t, closed.Load(), 2) -} - func TestCheckInvalidConfig(t *testing.T) { serverMonConf := mockInvalidPluginConf(t) reg, built, closed := mockPluginsReg() @@ -132,7 +88,7 @@ func TestCheckInvalidConfig(t *testing.T) { require.NoError(t, err) defer sched.Stop() - m, err := newMonitor(serverMonConf, reg, pipelineConnector, sched) + m, err := newMonitor(serverMonConf, reg, pipelineConnector, sched, nil) require.Error(t, err) // This could change if we decide the contract for newMonitor should always return a monitor require.Nil(t, m, "For this test to work we need a nil value for the monitor.") From cd10b2708675174d19ef2ca11bb113565103a3d6 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 19 Nov 2021 16:34:01 -0600 Subject: [PATCH 10/14] Cleanup factory --- heartbeat/beater/heartbeat.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 3511df898d4c..c019e37ea72a 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -198,11 +198,9 @@ func runRunOnceSingleConfig(cfg *common.Config, publishClient *core.SyncClient, // RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present. func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) { - factory := monitors.NewFactory(b.Info, bt.scheduler, plugin.GlobalPluginsReg) - var runners []cfgfile.Runner for _, cfg := range bt.config.Monitors { - created, err := factory.Create(b.Publisher, cfg) + created, err := bt.dynamicFactory.Create(b.Publisher, cfg) if err != nil { if errors.Is(err, monitors.ErrMonitorDisabled) { logp.Info("skipping disabled monitor: %s", err) From 637d439ee212af31aa59245b611e639e4676d62b Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 19 Nov 2021 19:20:20 -0600 Subject: [PATCH 11/14] Cleanup logging --- heartbeat/monitors/factory.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index 07739cd2deb6..4bd057bb5935 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -44,6 +44,7 @@ type RunnerFactory struct { byId map[string]*Monitor mtx *sync.Mutex pluginsReg *plugin.PluginsReg + logger *logp.Logger } type publishSettings struct { @@ -73,6 +74,7 @@ func NewFactory(info beat.Info, sched *scheduler.Scheduler, pluginsReg *plugin.P byId: map[string]*Monitor{}, mtx: &sync.Mutex{}, pluginsReg: pluginsReg, + logger: logp.NewLogger("monitor-factory"), } } @@ -115,7 +117,7 @@ func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config) (cfgfile.Runne return nil, err } if mon, ok := f.byId[monitor.stdFields.ID]; ok { - logp.Warn("monitor ID %s is configured for multiple monitors! IDs should be unique values, last seen config will win", monitor.stdFields.ID) + f.logger.Warnf("monitor ID %s is configured for multiple monitors! IDs should be unique values, last seen config will win", monitor.stdFields.ID) mon.Stop() } From 0f81f53c565b14fc5395a72755988ca228b20f6d Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 19 Nov 2021 19:50:05 -0600 Subject: [PATCH 12/14] Refactors --- heartbeat/monitors/factory.go | 14 ++++++++++---- heartbeat/monitors/factory_test.go | 6 +++--- heartbeat/monitors/monitor.go | 18 ++++++++++++++++-- 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index 4bd057bb5935..7f660cbb087a 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -95,10 +95,14 @@ func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config) (cfgfile.Runne f.mtx.Lock() defer f.mtx.Unlock() - // Handle the problem of this function being occasionally invoked from within - // f.mtx being locked given that golang does not support reentrant locks. + // This is a callback executed on stop of a monitor, it ensures we delete the entry in + // byId. + // It's a little tricky, because it handles the problem of this function being + // occasionally invoked twice in one stack. + // f.mtx would be locked given that golang does not support reentrant locks. // The important thing is clearing the map, not ensuring it stops exactly on time - // so we can defer its removal from the map with a goroutine. + // so we can defer its removal from the map with a goroutine, thus breaking out of the current stack + // and ensuring the cleanup happen soon enough. safeStop := func(m *Monitor) { go func() { // We can safely relock now, since we're in a new goroutine. @@ -106,7 +110,7 @@ func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config) (cfgfile.Runne defer f.mtx.Unlock() // If this element hasn't already been removed or replaced with a new - // instance delete it from the map. + // instance delete it from the map. Check monitor identity via pointer equality. if curM, ok := f.byId[m.stdFields.ID]; ok && curM == m { delete(f.byId, m.stdFields.ID) } @@ -116,8 +120,10 @@ func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config) (cfgfile.Runne if err != nil { return nil, err } + if mon, ok := f.byId[monitor.stdFields.ID]; ok { f.logger.Warnf("monitor ID %s is configured for multiple monitors! IDs should be unique values, last seen config will win", monitor.stdFields.ID) + // Stop the old monitor, since we'll swap our new one in place mon.Stop() } diff --git a/heartbeat/monitors/factory_test.go b/heartbeat/monitors/factory_test.go index 54f02bce3db1..4849529cec45 100644 --- a/heartbeat/monitors/factory_test.go +++ b/heartbeat/monitors/factory_test.go @@ -39,7 +39,6 @@ var binfo = beat.Info{ } func TestPreProcessors(t *testing.T) { - tests := map[string]struct { settings publishSettings expectedIndex string @@ -181,7 +180,7 @@ func TestDuplicateMonitorIDs(t *testing.T) { require.NoError(t, m2Err) m2.Start() // Change the name so we can ensure that this is the currently active monitor - m2.stdFields.Name = "MON2!!!" + m2.stdFields.Name = "mon2" // This used to trigger an error, but shouldn't any longer, we just log // the error, and ensure the last monitor wins require.NoError(t, m2Err) @@ -194,5 +193,6 @@ func TestDuplicateMonitorIDs(t *testing.T) { // 3 are counted as built, even the bad config require.Equal(t, 3, built.Load()) - require.Equal(t, closed.Load(), 3) + // Only 2 closes, because the bad config isn't closed + require.Equal(t, 2, closed.Load()) } diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index 923c769a548b..293cd48bae49 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -37,14 +37,18 @@ import ( // ErrMonitorDisabled is returned when the monitor plugin is marked as disabled. var ErrMonitorDisabled = errors.New("monitor not loaded, plugin is disabled") +const ( + MON_INIT = iota + MON_STARTED + MON_STOPPED +) + // Monitor represents a configured recurring monitoring configuredJob loaded from a config file. Starting it // will cause it to run with the given scheduler until Stop() is called. type Monitor struct { stdFields stdfields.StdMonitorFields pluginName string config *common.Config - registrar *plugin.PluginsReg - uniqueName string scheduler *scheduler.Scheduler configuredJobs []*configuredJob enabled bool @@ -60,6 +64,8 @@ type Monitor struct { // stats is the countersRecorder used to record lifecycle events // for global metrics + telemetry stats plugin.RegistryRecorder + + state int } // String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe @@ -125,6 +131,7 @@ func newMonitorUnsafe( internalsMtx: sync.Mutex{}, config: config, stats: pluginFactory.Stats, + state: MON_INIT, } if m.stdFields.ID == "" { @@ -203,11 +210,13 @@ func (m *Monitor) makeTasks(config *common.Config, jobs []jobs.Job) ([]*configur func (m *Monitor) Start() { m.internalsMtx.Lock() defer m.internalsMtx.Unlock() + for _, t := range m.configuredJobs { t.Start() } m.stats.StartMonitor(int64(m.endpoints)) + m.state = MON_STARTED } // stopUnsafe stops the monitor without freeing it in global dedup @@ -216,6 +225,10 @@ func (m *Monitor) Stop() { m.internalsMtx.Lock() defer m.internalsMtx.Unlock() + if m.state == MON_STOPPED { + return + } + for _, t := range m.configuredJobs { t.Stop() } @@ -228,4 +241,5 @@ func (m *Monitor) Stop() { } m.stats.StopMonitor(int64(m.endpoints)) + m.state = MON_STOPPED } From 10748053b3a4c423bc047da27f1602ba7b222ea1 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 19 Nov 2021 19:54:18 -0600 Subject: [PATCH 13/14] Cleanup --- heartbeat/monitors/monitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index 293cd48bae49..94e186b30286 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -219,7 +219,7 @@ func (m *Monitor) Start() { m.state = MON_STARTED } -// stopUnsafe stops the monitor without freeing it in global dedup +// Stop stops the monitor without freeing it in global dedup // needed by dedup itself to avoid a reentrant lock. func (m *Monitor) Stop() { m.internalsMtx.Lock() From fad06833f9aed894b8f3ed1adef23b525a605426 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 19 Nov 2021 19:56:05 -0600 Subject: [PATCH 14/14] Cleanup --- heartbeat/monitors/monitor.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index 94e186b30286..9cdbb8ecfd68 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -52,6 +52,7 @@ type Monitor struct { scheduler *scheduler.Scheduler configuredJobs []*configuredJob enabled bool + state int // endpoints is a count of endpoints this monitor measures. endpoints int // internalsMtx is used to synchronize access to critical @@ -64,8 +65,6 @@ type Monitor struct { // stats is the countersRecorder used to record lifecycle events // for global metrics + telemetry stats plugin.RegistryRecorder - - state int } // String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe