-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Heartbeat] Log error on dupe monitor ID instead of strict req #29041
Merged
Merged
Changes from all commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
6c59c25
[Heartbeat] Log error on dupe monitor ID instead of strict req
andrewvc d9f64c6
Tweaks |+ changelog
andrewvc ec7d4be
Fix threadsafety
andrewvc 6d4a78d
Stop not close
andrewvc f361348
Check IDs
andrewvc 8dd4878
Fix reentrant lock issue causing deadlock
andrewvc 6f2df91
Fix tests
andrewvc 665cf1f
Move uniqueness test to start
andrewvc 9685ba5
It all works
andrewvc cd10b27
Cleanup factory
andrewvc 637d439
Cleanup logging
andrewvc 0f81f53
Refactors
andrewvc 1074805
Cleanup
andrewvc fad0683
Cleanup
andrewvc File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -37,17 +37,22 @@ import ( | |||||
// ErrMonitorDisabled is returned when the monitor plugin is marked as disabled. | ||||||
var ErrMonitorDisabled = errors.New("monitor not loaded, plugin is disabled") | ||||||
|
||||||
const ( | ||||||
MON_INIT = iota | ||||||
MON_STARTED | ||||||
MON_STOPPED | ||||||
) | ||||||
|
||||||
// Monitor represents a configured recurring monitoring configuredJob loaded from a config file. Starting it | ||||||
// will cause it to run with the given scheduler until Stop() is called. | ||||||
type Monitor struct { | ||||||
stdFields stdfields.StdMonitorFields | ||||||
pluginName string | ||||||
config *common.Config | ||||||
registrar *plugin.PluginsReg | ||||||
uniqueName string | ||||||
scheduler *scheduler.Scheduler | ||||||
configuredJobs []*configuredJob | ||||||
enabled bool | ||||||
state int | ||||||
// endpoints is a count of endpoints this monitor measures. | ||||||
endpoints int | ||||||
// internalsMtx is used to synchronize access to critical | ||||||
|
@@ -69,32 +74,21 @@ func (m *Monitor) String() string { | |||||
} | ||||||
|
||||||
func checkMonitorConfig(config *common.Config, registrar *plugin.PluginsReg) error { | ||||||
m, err := newMonitor(config, registrar, nil, nil) | ||||||
if m != nil { | ||||||
m.Stop() // Stop the monitor to free up the ID from uniqueness checks | ||||||
} | ||||||
return err | ||||||
} | ||||||
_, err := newMonitor(config, registrar, nil, nil, nil) | ||||||
|
||||||
// uniqueMonitorIDs is used to keep track of explicitly configured monitor IDs and ensure no duplication within a | ||||||
// given heartbeat instance. | ||||||
var uniqueMonitorIDs sync.Map | ||||||
|
||||||
// ErrDuplicateMonitorID is returned when a monitor attempts to start using an ID already in use by another monitor. | ||||||
type ErrDuplicateMonitorID struct{ ID string } | ||||||
|
||||||
func (e ErrDuplicateMonitorID) Error() string { | ||||||
return fmt.Sprintf("monitor ID %s is configured for multiple monitors! IDs must be unique values.", e.ID) | ||||||
return err | ||||||
} | ||||||
|
||||||
// newMonitor Creates a new monitor, without leaking resources in the event of an error. | ||||||
// newMonitor creates a new monitor, without leaking resources in the event of an error. | ||||||
// you do not need to call Stop(), it will be safely garbage collected unless Start is called. | ||||||
func newMonitor( | ||||||
config *common.Config, | ||||||
registrar *plugin.PluginsReg, | ||||||
pipelineConnector beat.PipelineConnector, | ||||||
scheduler *scheduler.Scheduler, | ||||||
onStop func(*Monitor), | ||||||
) (*Monitor, error) { | ||||||
m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler) | ||||||
m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler, onStop) | ||||||
if m != nil && err != nil { | ||||||
m.Stop() | ||||||
} | ||||||
|
@@ -108,6 +102,7 @@ func newMonitorUnsafe( | |||||
registrar *plugin.PluginsReg, | ||||||
pipelineConnector beat.PipelineConnector, | ||||||
scheduler *scheduler.Scheduler, | ||||||
onStop func(*Monitor), | ||||||
) (*Monitor, error) { | ||||||
// Extract just the Id, Type, and Enabled fields from the config | ||||||
// We'll parse things more precisely later once we know what exact type of | ||||||
|
@@ -135,14 +130,10 @@ func newMonitorUnsafe( | |||||
internalsMtx: sync.Mutex{}, | ||||||
config: config, | ||||||
stats: pluginFactory.Stats, | ||||||
state: MON_INIT, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prevents weird situations with dual stops, makes |
||||||
} | ||||||
|
||||||
if m.stdFields.ID != "" { | ||||||
// Ensure we don't have duplicate IDs | ||||||
if _, loaded := uniqueMonitorIDs.LoadOrStore(m.stdFields.ID, m); loaded { | ||||||
return m, ErrDuplicateMonitorID{m.stdFields.ID} | ||||||
} | ||||||
} else { | ||||||
if m.stdFields.ID == "" { | ||||||
// If there's no explicit ID generate one | ||||||
hash, err := m.configHash() | ||||||
if err != nil { | ||||||
|
@@ -152,7 +143,14 @@ func newMonitorUnsafe( | |||||
} | ||||||
|
||||||
p, err := pluginFactory.Create(config) | ||||||
m.close = p.Close | ||||||
|
||||||
m.close = func() error { | ||||||
if onStop != nil { | ||||||
onStop(m) | ||||||
} | ||||||
return p.Close() | ||||||
} | ||||||
|
||||||
wrappedJobs := wrappers.WrapCommon(p.Jobs, m.stdFields) | ||||||
m.endpoints = p.Endpoints | ||||||
|
||||||
|
@@ -217,14 +215,18 @@ func (m *Monitor) Start() { | |||||
} | ||||||
|
||||||
m.stats.StartMonitor(int64(m.endpoints)) | ||||||
m.state = MON_STARTED | ||||||
} | ||||||
|
||||||
// Stop stops the Monitor's execution in its configured scheduler. | ||||||
// This is safe to call even if the Monitor was never started. | ||||||
// Stop stops the monitor without freeing it in global dedup | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
I know the function name is a proper noun here but it feels kinda redundant. |
||||||
// needed by dedup itself to avoid a reentrant lock. | ||||||
func (m *Monitor) Stop() { | ||||||
m.internalsMtx.Lock() | ||||||
defer m.internalsMtx.Unlock() | ||||||
defer m.freeID() | ||||||
|
||||||
if m.state == MON_STOPPED { | ||||||
return | ||||||
} | ||||||
|
||||||
for _, t := range m.configuredJobs { | ||||||
t.Stop() | ||||||
|
@@ -238,9 +240,5 @@ func (m *Monitor) Stop() { | |||||
} | ||||||
|
||||||
m.stats.StopMonitor(int64(m.endpoints)) | ||||||
} | ||||||
|
||||||
func (m *Monitor) freeID() { | ||||||
// Free up the monitor ID for reuse | ||||||
uniqueMonitorIDs.Delete(m.stdFields.ID) | ||||||
m.state = MON_STOPPED | ||||||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cleans everything up so that we only use a single factory instance in heartbeat