Skip to content

Commit

Permalink
Improve monitoring reporter (#8090)
Browse files Browse the repository at this point in the history
Add backoff and failover support to the Elasticsearch monitoring
reporter.
The monitoring reporter runs in 2 phases. First phase it checks for
monitoring being enabled in Elasticsearch. The check runs every 30s.
If multiple hosts are configured, one host is selected by random.
Once phase 1 succeeds, phase 2 (collection phase) is started.

Before this change, phase 2 was configured to use load-balancing without
timeout if multiple hosts are configured. With events being dropped on
error and only one document being generated every 10s, this was ok in
most cases. Still, if one output is blocked, waiting for a long timeout
failover to another host can happen, even if no error occured yet.
If the failover host has errors, it might end up in a tight
reconnect-loop without any backoff behavior.
With recent changes to 6.4 beats creates a many more documents, which
was not taken into account in original design. Due to this misbehaving
monitoring outputs are much more likely:
=> Problems with reporter
1. Failover was not handled correctly
2. Creating more then one event and potentially spurious errors raise the need for backoff

This changes configures the clients to failover mode only. Whenever the
connection to one host fails, another host is selected by random.
On failure the reporters output will backoff exponentially. If the second client
(after failover) also fails, then the backoff waiting times are doubled.
And so on.
  • Loading branch information
Steffen Siering authored Aug 29, 2018
1 parent bfd2843 commit 43ee7d7
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Deregister pipeline loader callback when inputsRunner is stopped. {pull}7893[7893]
- Replace index patterns in TSVB visualizations. {pull}7929[7929]
- Fixed Support `add_docker_metadata` in Windows by identifying systems' path separator. {issue}7797[7797]
- Add backoff support to x-pack monitoring outputs. {issue}7966[7966]

*Auditbeat*

Expand Down
11 changes: 11 additions & 0 deletions auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,17 @@ logging.files:
# The default is 50.
#bulk_max_size: 50

# The number of seconds to wait before trying to reconnect to Elasticsearch
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# Configure http request timeout before failing an request to Elasticsearch.
#timeout: 90

Expand Down
11 changes: 11 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1803,6 +1803,17 @@ logging.files:
# The default is 50.
#bulk_max_size: 50

# The number of seconds to wait before trying to reconnect to Elasticsearch
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# Configure http request timeout before failing an request to Elasticsearch.
#timeout: 90

Expand Down
11 changes: 11 additions & 0 deletions heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,17 @@ logging.files:
# The default is 50.
#bulk_max_size: 50

# The number of seconds to wait before trying to reconnect to Elasticsearch
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# Configure http request timeout before failing an request to Elasticsearch.
#timeout: 90

Expand Down
11 changes: 11 additions & 0 deletions libbeat/_meta/config.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,17 @@ logging.files:
# The default is 50.
#bulk_max_size: 50

# The number of seconds to wait before trying to reconnect to Elasticsearch
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# Configure http request timeout before failing an request to Elasticsearch.
#timeout: 90

Expand Down
24 changes: 23 additions & 1 deletion libbeat/docs/monitoring/shared-monitor-config.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ configuration option contains the following fields:
The maximum number of metrics to bulk in a single {es} bulk API index request.
The default is `50`. For more information, see <<elasticsearch-output>>.

[float]
==== `backoff.init`

The number of seconds to wait before trying to reconnect to Elasticsearch after
a network error. After waiting `backoff.init` seconds, {beatname_uc} tries to
reconnect. If the attempt fails, the backoff timer is increased exponentially up
to `backoff.max`. After a successful connection, the backoff timer is reset. The
default is 1s.

[float]
===== `backoff.max`

The maximum number of seconds to wait before attempting to connect to
Elasticsearch after a network error. The default is 60s.

[float]
==== `compression_level`

Expand Down Expand Up @@ -79,10 +94,17 @@ The password that {beatname_uc} uses to authenticate with the {es} instances for
shipping monitoring data.

[float]
==== `period`
==== `metrics.period`

The time interval (in seconds) when metrics are sent to the {es} cluster. A new
snapshot of {beatname_uc} metrics is generated and scheduled for publishing each
period. The default value is 10 * time.Second.

[float]
==== `state.period`

The time interval (in seconds) when state information are sent to the {es} cluster. A new
snapshot of {beatname_uc} state is generated and scheduled for publishing each
period. The default value is 60 * time.Second.

[float]
Expand Down
10 changes: 10 additions & 0 deletions libbeat/monitoring/report/elasticsearch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ type config struct {
BulkMaxSize int `config:"bulk_max_size" validate:"min=0"`
BufferSize int `config:"buffer_size"`
Tags []string `config:"tags"`
Backoff backoff `config:"backoff"`
}

type backoff struct {
Init time.Duration
Max time.Duration
}

var defaultConfig = config{
Expand All @@ -61,4 +67,8 @@ var defaultConfig = config{
BulkMaxSize: 50,
BufferSize: 50,
Tags: nil,
Backoff: backoff{
Init: 1 * time.Second,
Max: 60 * time.Second,
},
}
35 changes: 22 additions & 13 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ type reporter struct {
// pipeline
pipeline *pipeline.Pipeline
client beat.Client
out outputs.Group

out []outputs.NetworkClient
}

const selector = "monitoring"
Expand Down Expand Up @@ -109,22 +110,21 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
params[k] = v
}

out := outputs.Group{
Clients: nil,
BatchSize: windowSize,
Retry: 0, // no retry. on error drop events
}

hosts, err := outputs.ReadHostList(cfg)
if err != nil {
return nil, err
}
if len(hosts) == 0 {
return nil, errors.New("empty hosts list")
}

var clients []outputs.NetworkClient
for _, host := range hosts {
client, err := makeClient(host, params, proxyURL, tlsConfig, &config)
if err != nil {
return nil, err
}
out.Clients = append(out.Clients, client)
clients = append(clients, client)
}

queueFactory := func(e queue.Eventer) (queue.Queue, error) {
Expand All @@ -137,18 +137,27 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {

monitoring := monitoring.Default.GetRegistry("xpack.monitoring")

outClient := outputs.NewFailoverClient(clients)
outClient = outputs.WithBackoff(outClient, config.Backoff.Init, config.Backoff.Max)

pipeline, err := pipeline.New(
beat,
monitoring,
queueFactory, out, pipeline.Settings{
queueFactory,
outputs.Group{
Clients: []outputs.Client{outClient},
BatchSize: windowSize,
Retry: 0, // no retry. Drop event on error.
},
pipeline.Settings{
WaitClose: 0,
WaitCloseMode: pipeline.NoWaitOnClose,
})
if err != nil {
return nil, err
}

client, err := pipeline.Connect()
pipeConn, err := pipeline.Connect()
if err != nil {
pipeline.Close()
return nil, err
Expand All @@ -161,8 +170,8 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
tags: config.Tags,
checkRetry: checkRetry,
pipeline: pipeline,
client: client,
out: out,
client: pipeConn,
out: clients,
}
go r.initLoop(config)
return r, nil
Expand All @@ -184,7 +193,7 @@ func (r *reporter) initLoop(c config) {

for {
// Select one configured endpoint by random and check if xpack is available
client := r.out.Clients[rand.Intn(len(r.out.Clients))].(outputs.NetworkClient)
client := r.out[rand.Intn(len(r.out))]
err := client.Connect()
if err == nil {
closing(log, client)
Expand Down
11 changes: 11 additions & 0 deletions metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1713,6 +1713,17 @@ logging.files:
# The default is 50.
#bulk_max_size: 50

# The number of seconds to wait before trying to reconnect to Elasticsearch
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# Configure http request timeout before failing an request to Elasticsearch.
#timeout: 90

Expand Down
11 changes: 11 additions & 0 deletions packetbeat/packetbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1521,6 +1521,17 @@ logging.files:
# The default is 50.
#bulk_max_size: 50

# The number of seconds to wait before trying to reconnect to Elasticsearch
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# Configure http request timeout before failing an request to Elasticsearch.
#timeout: 90

Expand Down
11 changes: 11 additions & 0 deletions winlogbeat/winlogbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,17 @@ logging.files:
# The default is 50.
#bulk_max_size: 50

# The number of seconds to wait before trying to reconnect to Elasticsearch
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# Configure http request timeout before failing an request to Elasticsearch.
#timeout: 90

Expand Down

0 comments on commit 43ee7d7

Please sign in to comment.