Skip to content

Commit

Permalink
[heartbeat][libbeat] Fix potential race in runner lists
Browse files Browse the repository at this point in the history
Fixes elastic#28518 most likely; it's one I haven't personally been able to
repro. This makes the runner list synchronous, thus preventing rapid
adds/removes from creating simultaneously live plugins of the same type.

There are definitely ways to do this more concurrently, but that sort of
optimization/complexity isn't warranted here. Heartbeat can tear down
monitors near-instantaneously.
  • Loading branch information
andrewvc committed Nov 3, 2021
1 parent dc02c08 commit a435abf
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
19 changes: 17 additions & 2 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package beater

import (
"encoding/json"
"errors"
"fmt"
"os"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -89,6 +91,19 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
groups, _ := syscall.Getgroups()
logp.Info("Effective user/group ids: %d/%d, with groups: %v", syscall.Geteuid(), syscall.Getegid(), groups)

for _, m := range bt.config.Monitors {
target := make(map[string]interface{})
m.Unpack(target)
bytes, err := json.Marshal(target)
if err != nil {
panic(fmt.Sprintf("got err: %s", err))
}
fmt.Printf("Found monitor")
os.Stdout.Write(bytes)
fmt.Printf("\n")
syscall.Exit(0)
}

if bt.config.RunOnce {
err := bt.runRunOnce(b)
if err != nil {
Expand Down Expand Up @@ -226,9 +241,9 @@ func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) {

// RunCentralMgmtMonitors loads any central management configured configs.
//
// It builds two runner lists from bt.dynamicFactory and b.Publisher and
// registers them with reload.Register: one under "<beat name>.monitors"
// (using b.Info.Beat as the prefix) and one under "inputs".
//
// NOTE(review): this block comes from a rendered diff without +/- markers.
// Each NewRunnerList line followed by a NewSyncRunnerList line appears to be
// the removed and the added version of the same statement; confirm against
// the real file before treating both as live code.
func (bt *Heartbeat) RunCentralMgmtMonitors(b *beat.Beat) {
// Runner list registered under the beat-specific ".monitors" key.
monitors := cfgfile.NewRunnerList(management.DebugK, bt.dynamicFactory, b.Publisher)
monitors := cfgfile.NewSyncRunnerList(management.DebugK, bt.dynamicFactory, b.Publisher)
// MustRegisterList presumably panics on registration failure (Must- prefix convention) — TODO confirm.
reload.Register.MustRegisterList(b.Info.Beat+".monitors", monitors)
// Second runner list, built the same way, registered under the fixed "inputs" key.
inputs := cfgfile.NewRunnerList(management.DebugK, bt.dynamicFactory, b.Publisher)
inputs := cfgfile.NewSyncRunnerList(management.DebugK, bt.dynamicFactory, b.Publisher)
reload.Register.MustRegisterList("inputs", inputs)
}

Expand Down
16 changes: 15 additions & 1 deletion libbeat/cfgfile/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type RunnerList struct {
factory RunnerFactory
pipeline beat.PipelineConnector
logger *logp.Logger
syncStop bool
}

// NewRunnerList builds and returns a RunnerList
Expand All @@ -50,6 +51,15 @@ func NewRunnerList(name string, factory RunnerFactory, pipeline beat.PipelineCon
}
}

// NewSyncRunnerList builds a RunnerList the same way NewRunnerList does, then
// switches it to synchronous stops: when a reload removes a runner, its Stop
// is called inline rather than in a separate goroutine. Use it when stops and
// starts must be strictly sequenced; it is only appropriate where Stop does
// not block excessively.
func NewSyncRunnerList(name string, factory RunnerFactory, pipeline beat.PipelineConnector) *RunnerList {
	list := NewRunnerList(name, factory, pipeline)
	// Flag consulted when stopping runners: stop inline instead of spawning a goroutine.
	list.syncStop = true
	return list
}

// Reload the list of runners to match the given state
func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error {
r.mutex.Lock()
Expand Down Expand Up @@ -84,7 +94,11 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error {
for hash, runner := range stopList {
r.logger.Debugf("Stopping runner: %s", runner)
delete(r.runners, hash)
go runner.Stop()
if r.syncStop {
runner.Stop()
} else {
go runner.Stop()
}
moduleStops.Add(1)
}

Expand Down

0 comments on commit a435abf

Please sign in to comment.