Skip to content

Commit

Permalink
Add Beats state reporting (#7075)
Browse files Browse the repository at this point in the history
* Add Beats state reporting

Currently in monitoring or the API endpoint it's not visible which modules are running. This PR adds an endpoint `/state` which shows the following data:

```
{
  "module": {
    "count": 3,
    "names": [
      "system"
    ]
  }
}
```

The names are a unique list of module names that are running and the count is a total of all the modules that are running. In the above case it's 3 as the system module is started by default 3 times with different configs / periods. Currently this reporting only works for Metricbeat.

The same state metrics are reported to X-Pack Monitoring. The data is reported every minute by default.

Additional changes:

* Introduce ArraySlice support in metrics
* Improve namespace handling to prevent races
* Add monitoring reporter for state endpoint
* Add config options for reporting frequency. The old config option was removed. I don't consider this a breaking change as the old option was never documented and not intended to be changed.

* rename to metrics.period and implement one bulk per event.

* rename to metrics

* local params

* fix invalid units
  • Loading branch information
ruflin authored and Steffen Siering committed Jul 4, 2018
1 parent 9960220 commit 0db2369
Show file tree
Hide file tree
Showing 18 changed files with 215 additions and 62 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Add support to disable html escaping in outputs. {pull}7445[7445]
- Refactor error handling in schema.Apply(). {pull}7335[7335]
- Add additional types to kubernetes metadata {pull}7457[7457]
- Add module state reporting for X-Pack Monitoring. {pull}7075[7075]

*Auditbeat*

Expand Down
3 changes: 3 additions & 0 deletions auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,9 @@ logging.files:
# never, once, and freely. Default is never.
#ssl.renegotiation: never

#metrics.period: 10s
#state.period: 1m

#================================ HTTP Endpoint ======================================
# Each beat can expose internal metrics through a HTTP endpoint. For security
# reasons the endpoint is disabled by default. This feature is currently experimental.
Expand Down
3 changes: 3 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1803,6 +1803,9 @@ logging.files:
# never, once, and freely. Default is never.
#ssl.renegotiation: never

#metrics.period: 10s
#state.period: 1m

#================================ HTTP Endpoint ======================================
# Each beat can expose internal metrics through a HTTP endpoint. For security
# reasons the endpoint is disabled by default. This feature is currently experimental.
Expand Down
3 changes: 3 additions & 0 deletions heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,9 @@ logging.files:
# never, once, and freely. Default is never.
#ssl.renegotiation: never

#metrics.period: 10s
#state.period: 1m

#================================ HTTP Endpoint ======================================
# Each beat can expose internal metrics through a HTTP endpoint. For security
# reasons the endpoint is disabled by default. This feature is currently experimental.
Expand Down
3 changes: 3 additions & 0 deletions libbeat/_meta/config.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,9 @@ logging.files:
# never, once, and freely. Default is never.
#ssl.renegotiation: never

#metrics.period: 10s
#state.period: 1m

#================================ HTTP Endpoint ======================================
# Each beat can expose internal metrics through a HTTP endpoint. For security
# reasons the endpoint is disabled by default. This feature is currently experimental.
Expand Down
12 changes: 11 additions & 1 deletion libbeat/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func Start(cfg *common.Config) {

// register handlers
mux.HandleFunc("/", rootHandler())
mux.HandleFunc("/state", stateHandler)
mux.HandleFunc("/stats", statsHandler)
mux.HandleFunc("/dataset", datasetHandler)

Expand All @@ -61,12 +62,21 @@ func rootHandler() func(http.ResponseWriter, *http.Request) {

w.Header().Set("Content-Type", "application/json; charset=utf-8")

data := monitoring.CollectStructSnapshot(monitoring.GetNamespace("state").GetRegistry(), monitoring.Full, false)
data := monitoring.CollectStructSnapshot(monitoring.GetNamespace("info").GetRegistry(), monitoring.Full, false)

print(w, data, r.URL)
}
}

// stateHandler writes a snapshot of the "state" monitoring namespace to the
// response. The snapshot is collected from that namespace's registry in
// monitoring.Full mode and handed to print together with the request URL.
func stateHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=utf-8")

	stateRegistry := monitoring.GetNamespace("state").GetRegistry()
	snapshot := monitoring.CollectStructSnapshot(stateRegistry, monitoring.Full, false)

	print(w, snapshot, r.URL)
}

// statsHandler reports expvar and all libbeat/monitoring metrics
func statsHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
Expand Down
3 changes: 1 addition & 2 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ func Run(name, idxPrefix, version string, bt beat.Creator) error {
return err
}

registry := monitoring.NewRegistry()
monitoring.GetNamespace("state").SetRegistry(registry)
registry := monitoring.GetNamespace("info").GetRegistry()
monitoring.NewString(registry, "version").Set(b.Info.Version)
monitoring.NewString(registry, "beat").Set(b.Info.Beat)
monitoring.NewString(registry, "name").Set(b.Info.Name)
Expand Down
53 changes: 35 additions & 18 deletions libbeat/monitoring/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,27 @@

package monitoring

import (
"sync"
)
import "sync"

var namespaces = struct {
sync.Mutex
m map[string]*Namespace
}{
m: make(map[string]*Namespace),
}
var namespaces = NewNamespaces()

// Namespace contains the name of the namespace and its registry.
// Namespaces are looked up (and created on first use) by name through
// GetNamespace.
type Namespace struct {
name string
registry *Registry
}

// newNamespace returns a new Namespace carrying the given name. No registry
// is assigned here; the registry field is left at its zero value.
func newNamespace(name string) *Namespace {
	return &Namespace{name: name}
}

// GetNamespace gets the namespace with the given name.
// If the namespace does not exist yet, a new one is created.
func GetNamespace(name string) *Namespace {
namespaces.Lock()
defer namespaces.Unlock()

n, ok := namespaces.m[name]
if !ok {
n = &Namespace{name: name}
namespaces.m[name] = n
}
return n
return namespaces.Get(name)
}

// SetRegistry sets the registry of the namespace
Expand All @@ -60,3 +52,28 @@ func (n *Namespace) GetRegistry() *Registry {
}
return n.registry
}

// Namespaces is a collection of Namespace structs keyed by namespace name.
// The embedded mutex is held by Get while it reads or updates the map.
type Namespaces struct {
sync.Mutex
namespaces map[string]*Namespace
}

// NewNamespaces returns an empty, ready-to-use Namespaces collection.
func NewNamespaces() *Namespaces {
	byName := make(map[string]*Namespace)
	return &Namespaces{namespaces: byName}
}

// Get returns the namespace registered under key. If no namespace exists for
// the key yet, a new one is created, stored and returned. The lookup and the
// insertion both happen while the Namespaces lock is held.
func (n *Namespaces) Get(key string) *Namespace {
	n.Lock()
	defer n.Unlock()

	ns, found := n.namespaces[key]
	if !found {
		ns = newNamespace(key)
		n.namespaces[key] = ns
	}
	return ns
}
58 changes: 46 additions & 12 deletions libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring/report"
esout "github.com/elastic/beats/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/libbeat/publisher"
Expand All @@ -38,7 +39,6 @@ var (
actMonitoringBeats = common.MapStr{
"index": common.MapStr{
"_index": "",
"_type": "beats_stats",
"_routing": nil,
},
}
Expand Down Expand Up @@ -96,23 +96,57 @@ func (c *publishClient) Close() error {

func (c *publishClient) Publish(batch publisher.Batch) error {
events := batch.Events()
bulk := make([]interface{}, 0, 2*len(events))
var failed []publisher.Event
var reason error
for _, event := range events {
bulk = append(bulk,
actMonitoringBeats, report.Event{

// Extract time
t, err := event.Content.Meta.GetValue("type")
if err != nil {
logp.Err("Type not available in monitoring reported. Please report this error: %s", err)
continue
}

var params = map[string]string{}
// Copy params
for k, v := range c.params {
params[k] = v
}
// Extract potential additional params
p, err := event.Content.Meta.GetValue("params")
if err == nil {
p2, ok := p.(map[string]string)
if ok {
for k, v := range p2 {
params[k] = v
}
}
}
actMonitoringBeats.Put("index._type", t)

bulk := [2]interface{}{
actMonitoringBeats,
report.Event{
Timestamp: event.Content.Timestamp,
Fields: event.Content.Fields,
})
}
},
}

_, err := c.es.BulkWith("_xpack", "monitoring", c.params, nil, bulk)
if err != nil {
batch.Retry()
return err
// Currently one request per event is sent. Reason is that each event can contain different
// interval params and X-Pack requires to send the interval param.
_, err = c.es.BulkWith("_xpack", "monitoring", params, nil, bulk[:])
if err != nil {
failed = append(failed, event)
reason = err
}
}

batch.ACK()
return nil
if len(failed) > 0 {
batch.RetryEvents(failed)
} else {
batch.ACK()
}
return reason
}

func (c *publishClient) Test(d testing.Driver) {
Expand Down
6 changes: 4 additions & 2 deletions libbeat/monitoring/report/elasticsearch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ type config struct {
TLS *tlscommon.Config `config:"ssl"`
MaxRetries int `config:"max_retries"`
Timeout time.Duration `config:"timeout"`
Period time.Duration `config:"period"`
MetricsPeriod time.Duration `config:"metrics.period"`
StatePeriod time.Duration `config:"state.period"`
BulkMaxSize int `config:"bulk_max_size" validate:"min=0"`
BufferSize int `config:"buffer_size"`
Tags []string `config:"tags"`
Expand All @@ -55,7 +56,8 @@ var defaultConfig = config{
TLS: nil,
MaxRetries: 3,
Timeout: 60 * time.Second,
Period: 10 * time.Second,
MetricsPeriod: 10 * time.Second,
StatePeriod: 1 * time.Minute,
BulkMaxSize: 50,
BufferSize: 50,
Tags: nil,
Expand Down
33 changes: 19 additions & 14 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strings"
"time"

"strconv"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
Expand All @@ -43,7 +45,6 @@ import (
type reporter struct {
done *stopper

period time.Duration
checkRetry time.Duration

// event metadata
Expand Down Expand Up @@ -102,7 +103,6 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
for k, v := range config.Params {
params[k] = v
}
params["interval"] = config.Period.String()

out := outputs.Group{
Clients: nil,
Expand All @@ -129,7 +129,7 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
}), nil
}

monitoring := monitoring.Default.NewRegistry("xpack.monitoring")
monitoring := monitoring.Default.GetRegistry("xpack.monitoring")

pipeline, err := pipeline.New(
beat,
Expand All @@ -150,15 +150,14 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {

r := &reporter{
done: newStopper(),
period: config.Period,
beatMeta: makeMeta(beat),
tags: config.Tags,
checkRetry: checkRetry,
pipeline: pipeline,
client: client,
out: out,
}
go r.initLoop()
go r.initLoop(config)
return r, nil
}

Expand All @@ -168,7 +167,7 @@ func (r *reporter) Stop() {
r.pipeline.Close()
}

func (r *reporter) initLoop() {
func (r *reporter) initLoop(c config) {
debugf("Start monitoring endpoint init loop.")
defer debugf("Finish monitoring endpoint init loop.")

Expand Down Expand Up @@ -199,15 +198,16 @@ func (r *reporter) initLoop() {
logp.Info("Successfully connected to X-Pack Monitoring endpoint.")

// Start collector and send loop if monitoring endpoint has been found.
go r.snapshotLoop()
go r.snapshotLoop("state", c.StatePeriod)
go r.snapshotLoop("stats", c.MetricsPeriod)
}

func (r *reporter) snapshotLoop() {
ticker := time.NewTicker(r.period)
func (r *reporter) snapshotLoop(namespace string, period time.Duration) {
ticker := time.NewTicker(period)
defer ticker.Stop()

logp.Info("Start monitoring metrics snapshot loop.")
defer logp.Info("Stop monitoring metrics snapshot loop.")
logp.Info("Start monitoring %s metrics snapshot loop with period %s.", namespace, period)
defer logp.Info("Stop monitoring %s metrics snapshot loop.", namespace)

for {
var ts time.Time
Expand All @@ -218,23 +218,28 @@ func (r *reporter) snapshotLoop() {
case ts = <-ticker.C:
}

snapshot := makeSnapshot(monitoring.Default)
snapshot := makeSnapshot(monitoring.GetNamespace(namespace).GetRegistry())
if snapshot == nil {
debugf("Empty snapshot.")
continue
}

fields := common.MapStr{
"beat": r.beatMeta,
"metrics": snapshot,
namespace: snapshot,
}
if len(r.tags) > 0 {
fields["tags"] = r.tags
}

r.client.Publish(beat.Event{
Timestamp: ts,
Fields: fields,
Meta: common.MapStr{
"type": "beats_" + namespace,
"interval_ms": int64(period / time.Millisecond),
// Converting to seconds as interval only accepts `s` as unit
"params": map[string]string{"interval": strconv.Itoa(int(period/time.Second)) + "s"},
},
})
}
}
Expand Down
Loading

0 comments on commit 0db2369

Please sign in to comment.