Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Log error on dupe monitor ID instead of strict req #29041

Merged
merged 14 commits into from
Nov 22, 2021
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Change `threatintel` module to use new `threat.*` ECS fields. {pull}29014[29014]

*Heartbeat*
- Change behavior in case of duplicate monitor IDs in configs to be last monitor wins. {pull}29041[29041]

*Journalbeat*

Expand Down
65 changes: 65 additions & 0 deletions heartbeat/monitors/dedup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package monitors

import (
"sync"

"github.com/elastic/beats/v7/libbeat/logp"
)

func newDedup() dedup {
return dedup{
byId: map[string]*Monitor{},
mtx: &sync.Mutex{},
}
}

type dedup struct {
byId map[string]*Monitor
mtx *sync.Mutex
}

func (um dedup) register(m *Monitor) {
um.mtx.Lock()
defer um.mtx.Unlock()

stopped := um.stopUnsafe(m)
if stopped {
logp.Warn("monitor ID %s is configured for multiple monitors! IDs should be unique values, last seen config will win", m.stdFields.ID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we log like stopping previous monitor with same monitor id?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if that adds anything, because it's hard to tell them apart. Should I change the log message maybe?

andrewvc marked this conversation as resolved.
Show resolved Hide resolved
}

um.byId[m.stdFields.ID] = m
}

func (um dedup) unregister(m *Monitor) {
um.mtx.Lock()
defer um.mtx.Unlock()

um.stopUnsafe(m)

delete(um.byId, m.stdFields.ID)
}

func (um dedup) stopUnsafe(m *Monitor) bool {
if existing, ok := um.byId[m.stdFields.ID]; ok {
existing.stopUnsafe()
return ok
}
return false
}
35 changes: 14 additions & 21 deletions heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,9 @@ func checkMonitorConfig(config *common.Config, registrar *plugin.PluginsReg) err
return err
}

// uniqueMonitorIDs is used to keep track of explicitly configured monitor IDs and ensure no duplication within a
// globalDedup is used to keep track of explicitly configured monitor IDs and ensure no duplication within a
// given heartbeat instance.
var uniqueMonitorIDs sync.Map

// ErrDuplicateMonitorID is returned when a monitor attempts to start using an ID already in use by another monitor.
type ErrDuplicateMonitorID struct{ ID string }

func (e ErrDuplicateMonitorID) Error() string {
return fmt.Sprintf("monitor ID %s is configured for multiple monitors! IDs must be unique values.", e.ID)
}
var globalDedup = newDedup()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be part of the RunnerFactory to avoid needing a global object?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great idea, done!


// newMonitor Creates a new monitor, without leaking resources in the event of an error.
func newMonitor(
Expand Down Expand Up @@ -137,12 +130,7 @@ func newMonitorUnsafe(
stats: pluginFactory.Stats,
}

if m.stdFields.ID != "" {
// Ensure we don't have duplicate IDs
if _, loaded := uniqueMonitorIDs.LoadOrStore(m.stdFields.ID, m); loaded {
return m, ErrDuplicateMonitorID{m.stdFields.ID}
}
} else {
if m.stdFields.ID == "" {
// If there's no explicit ID generate one
hash, err := m.configHash()
if err != nil {
Expand All @@ -151,6 +139,10 @@ func newMonitorUnsafe(
m.stdFields.ID = fmt.Sprintf("auto-%s-%#X", m.stdFields.Type, hash)
}

// De-duplicate monitors with identical IDs
// last write wins
globalDedup.register(m)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newMonitor is used by CheckConfig. Checking configs shouldn't stop monitors existing monitors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed this, it is now the case that newMonitor is side effect free


p, err := pluginFactory.Create(config)
m.close = p.Close
wrappedJobs := wrappers.WrapCommon(p.Jobs, m.stdFields)
Expand Down Expand Up @@ -222,9 +214,15 @@ func (m *Monitor) Start() {
// Stop stops the Monitor's execution in its configured scheduler.
// This is safe to call even if the Monitor was never started.
func (m *Monitor) Stop() {
// later calls stopUnsafe
globalDedup.unregister(m)
}

// stopUnsafe stops the monitor without freeing it in global dedup
// needed by dedup itself to avoid a reentrant lock.
func (m *Monitor) stopUnsafe() {
m.internalsMtx.Lock()
defer m.internalsMtx.Unlock()
defer m.freeID()

for _, t := range m.configuredJobs {
t.Stop()
Expand All @@ -239,8 +237,3 @@ func (m *Monitor) Stop() {

m.stats.StopMonitor(int64(m.endpoints))
}

func (m *Monitor) freeID() {
// Free up the monitor ID for reuse
uniqueMonitorIDs.Delete(m.stdFields.ID)
}
25 changes: 15 additions & 10 deletions heartbeat/monitors/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,24 @@ func TestDuplicateMonitorIDs(t *testing.T) {
// Would fail if the previous newMonitor didn't free the monitor.id
m1, m1Err := makeTestMon()
require.NoError(t, m1Err)
_, m2Err := makeTestMon()
require.Error(t, m2Err)
m2, m2Err := makeTestMon()
// Change the name so we can ensure that this is the currently active monitor
m2.stdFields.Name = "MON2!!!"
// This used to trigger an error, but shouldn't any longer, we just log
// the error, and ensure the last monitor wins
require.NoError(t, m2Err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also check if previous monitor is stopped?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do check for this at the end of the test by checking the number of stops invoked


m, ok := globalDedup.byId[m2.stdFields.ID]
require.True(t, ok)
require.Equal(t, m2.stdFields.Name, m.stdFields.Name)
m1.Stop()
m3, m3Err := makeTestMon()
require.NoError(t, m3Err)
m3.Stop()
m2.Stop()

// We count 3 because built doesn't count successful builds,
// just attempted creations of monitors
// 3 are counted as built, even the bad config
require.Equal(t, 3, built.Load())
// Only one stops because the others errored on create
require.Equal(t, 2, closed.Load())
require.NoError(t, m3Err)
// Make sure each is closed at least once
// the bad config doesn't need to be closed
require.Equal(t, closed.Load(), 2)
}

func TestCheckInvalidConfig(t *testing.T) {
Expand Down