Skip to content

Commit

Permalink
use a unique per beat instance registry for hostname
Browse files Browse the repository at this point in the history
  • Loading branch information
leehinman committed Dec 5, 2024
1 parent 9c45417 commit b2ecb02
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 16 deletions.
14 changes: 11 additions & 3 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,18 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
waitEvents := newSignalWait()

// count active events for waiting on shutdown
var reg *monitoring.Registry

if b.MonitorNamespace != nil {
reg = b.MonitorNamespace.GetRegistry().GetRegistry("stats")
if reg == nil {
reg = b.MonitorNamespace.GetRegistry().NewRegistry("stats")
}
}
wgEvents := &eventCounter{
count: monitoring.NewInt(nil, "filebeat.events.active"), // Gauge
added: monitoring.NewUint(nil, "filebeat.events.added"),
done: monitoring.NewUint(nil, "filebeat.events.done"),
count: monitoring.NewInt(reg, "filebeat.events.active"), // Gauge
added: monitoring.NewUint(reg, "filebeat.events.added"),
done: monitoring.NewUint(reg, "filebeat.events.done"),
}
finishedLogger := newFinishedLogger(wgEvents)

Expand Down
6 changes: 4 additions & 2 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/keystore"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/useragent"
)

Expand Down Expand Up @@ -84,8 +85,9 @@ type Beat struct {

Instrumentation instrumentation.Instrumentation // instrumentation holds an APM agent for capturing and reporting traces

API *api.Server // API server. This is nil unless the http endpoint is enabled.
Registry *reload.Registry // input, & output registry for configuration manager, should be instantiated in NewBeat
API *api.Server // API server. This is nil unless the http endpoint is enabled.
Registry *reload.Registry // input, & output registry for configuration manager, should be instantiated in NewBeat
MonitorNamespace *monitoring.Namespace // a monitor namespace that is unique per beat instance
}

// GenerateUserAgent populates the UserAgent field on the beat.Info struct
Expand Down
41 changes: 30 additions & 11 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
config.OverwriteConfigOpts(configOpts(store))
}

b.Beat.MonitorNamespace = monitoring.GetNamespace(b.Info.Beat + "-" + b.Info.ID.String())
fmt.Fprintf(os.Stderr, "monitoring namespace is %s\n", b.Info.Beat+"-"+b.Info.ID.String())

instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version)
if err != nil {
return nil, fmt.Errorf("error setting up instrumentation: %w", err)
Expand Down Expand Up @@ -469,11 +472,6 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
return nil, fmt.Errorf("error creating processors: %w", err)
}

reg := monitoring.Default.GetRegistry(b.Info.Name)
if reg == nil {
reg = monitoring.Default.NewRegistry(b.Info.Name)
}

// This should be replaced with static config for otel consumer
// but need to figure out if we want the Queue settings from here.
outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled()
Expand All @@ -485,12 +483,14 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
}
}

tel := reg.GetRegistry("state")
uniq_reg := b.Beat.MonitorNamespace.GetRegistry()

tel := uniq_reg.GetRegistry("state")
if tel == nil {
tel = reg.NewRegistry("state")
tel = uniq_reg.NewRegistry("state")
}
monitors := pipeline.Monitors{
Metrics: reg,
Metrics: uniq_reg,
Telemetry: tel,
Logger: logp.L().Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
Expand All @@ -510,7 +510,6 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
b.Publisher = publisher

return b, nil

}

// InitWithSettings does initialization of things common to all actions (read confs, flags)
Expand Down Expand Up @@ -831,11 +830,31 @@ func (b *Beat) RegisterHostname(useFQDN bool) {
hostname := b.Info.FQDNAwareHostname(useFQDN)

// info.hostname
infoRegistry := monitoring.GetNamespace("info").GetRegistry()
var infoRegistry *monitoring.Registry
if b.MonitorNamespace != nil {
fmt.Fprintf(os.Stderr, "Not nil\n")
infoRegistry = b.MonitorNamespace.GetRegistry().GetRegistry("info")
if infoRegistry == nil {
infoRegistry = b.MonitorNamespace.GetRegistry().NewRegistry("info")
}
} else {
fmt.Fprintf(os.Stderr, "WTF it was nil\n")
infoRegistry = monitoring.GetNamespace("info").GetRegistry()
}
monitoring.NewString(infoRegistry, "hostname").Set(hostname)

// state.host
stateRegistry := monitoring.GetNamespace("state").GetRegistry()
var stateRegistry *monitoring.Registry
if b.MonitorNamespace != nil {
fmt.Fprintf(os.Stderr, "Nil nil again\n")
stateRegistry = b.MonitorNamespace.GetRegistry().GetRegistry("state")
if stateRegistry == nil {
stateRegistry = b.MonitorNamespace.GetRegistry().NewRegistry("state")
}
} else {
fmt.Fprintf(os.Stderr, "WTF it wasn't nil again\n")
stateRegistry = monitoring.GetNamespace("state").GetRegistry()
}
monitoring.NewFunc(stateRegistry, "host", host.ReportInfo(hostname), monitoring.Report)
}

Expand Down

0 comments on commit b2ecb02

Please sign in to comment.