From 43ee7d78f2a09fc432865606dfcc7d2e3b14ee99 Mon Sep 17 00:00:00 2001
From: Steffen Siering
Date: Wed, 29 Aug 2018 18:21:19 +0200
Subject: [PATCH] Improve monitoring reporter (#8090)

Add backoff and failover support to the Elasticsearch monitoring
reporter.

The monitoring reporter runs in two phases. In the first phase it checks
whether monitoring is enabled in Elasticsearch. The check runs every 30s.
If multiple hosts are configured, one host is selected at random. Once
phase 1 succeeds, phase 2 (the collection phase) is started.

Before this change, phase 2 was configured to use load balancing without
a timeout if multiple hosts were configured. With events being dropped on
error and only one document being generated every 10s, this was
acceptable in most cases. Still, if one output is blocked, failover to
another host can take a long timeout to trigger, even if no error has
occurred yet. If the failover host also has errors, the reporter can end
up in a tight reconnect loop without any backoff behavior.

With recent changes in 6.4, Beats create many more documents, which the
original design did not take into account. This makes misbehaving
monitoring outputs much more likely:

=> Problems with the reporter

1. Failover was not handled correctly.
2. Creating more than one event, plus potentially spurious errors, raises
   the need for backoff.

This change configures the clients to failover mode only. Whenever the
connection to one host fails, another host is selected at random. On
failure, the reporter's output backs off exponentially. If the second
client (after failover) also fails, the backoff waiting times are
doubled, and so on.
---
 CHANGELOG.asciidoc                            |  1 +
 auditbeat/auditbeat.reference.yml             | 11 ++++++
 filebeat/filebeat.reference.yml               | 11 ++++++
 heartbeat/heartbeat.reference.yml             | 11 ++++++
 libbeat/_meta/config.reference.yml            | 11 ++++++
 .../monitoring/shared-monitor-config.asciidoc | 24 ++++++++++++-
 .../monitoring/report/elasticsearch/config.go | 10 ++++++
 .../report/elasticsearch/elasticsearch.go     | 35 ++++++++++++-------
 metricbeat/metricbeat.reference.yml           | 11 ++++++
 packetbeat/packetbeat.reference.yml           | 11 ++++++
 winlogbeat/winlogbeat.reference.yml           | 11 ++++++
 11 files changed, 133 insertions(+), 14 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index b92b7dffecb..73ed77cbaf4 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -42,6 +42,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
 - Deregister pipeline loader callback when inputsRunner is stopped. {pull}7893[7893]
 - Replace index patterns in TSVB visualizations. {pull}7929[7929]
 - Fixed Support `add_docker_metadata` in Windows by identifying systems' path separator. {issue}7797[7797]
+- Add backoff support to x-pack monitoring outputs. {issue}7966[7966]
 
 *Auditbeat*
 
diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml
index 35f73d6d471..3dfadd339c4 100644
--- a/auditbeat/auditbeat.reference.yml
+++ b/auditbeat/auditbeat.reference.yml
@@ -1148,6 +1148,17 @@ logging.files:
   # The default is 50.
   #bulk_max_size: 50
 
+  # The number of seconds to wait before trying to reconnect to Elasticsearch
+  # after a network error. After waiting backoff.init seconds, the Beat
+  # tries to reconnect. If the attempt fails, the backoff timer is increased
+  # exponentially up to backoff.max. After a successful connection, the backoff
+  # timer is reset. The default is 1s.
+  #backoff.init: 1s
+
+  # The maximum number of seconds to wait before attempting to connect to
+  # Elasticsearch after a network error. The default is 60s.
+  #backoff.max: 60s
+
   # Configure http request timeout before failing an request to Elasticsearch.
   #timeout: 90
 
diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml
index f798e4ed301..c580a75598a 100644
--- a/filebeat/filebeat.reference.yml
+++ b/filebeat/filebeat.reference.yml
@@ -1803,6 +1803,17 @@ logging.files:
   # The default is 50.
   #bulk_max_size: 50
 
+  # The number of seconds to wait before trying to reconnect to Elasticsearch
+  # after a network error. After waiting backoff.init seconds, the Beat
+  # tries to reconnect. If the attempt fails, the backoff timer is increased
+  # exponentially up to backoff.max. After a successful connection, the backoff
+  # timer is reset. The default is 1s.
+  #backoff.init: 1s
+
+  # The maximum number of seconds to wait before attempting to connect to
+  # Elasticsearch after a network error. The default is 60s.
+  #backoff.max: 60s
+
   # Configure http request timeout before failing an request to Elasticsearch.
   #timeout: 90
 
diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml
index ae75232c080..5247b15b81c 100644
--- a/heartbeat/heartbeat.reference.yml
+++ b/heartbeat/heartbeat.reference.yml
@@ -1250,6 +1250,17 @@ logging.files:
   # The default is 50.
   #bulk_max_size: 50
 
+  # The number of seconds to wait before trying to reconnect to Elasticsearch
+  # after a network error. After waiting backoff.init seconds, the Beat
+  # tries to reconnect. If the attempt fails, the backoff timer is increased
+  # exponentially up to backoff.max. After a successful connection, the backoff
+  # timer is reset. The default is 1s.
+  #backoff.init: 1s
+
+  # The maximum number of seconds to wait before attempting to connect to
+  # Elasticsearch after a network error. The default is 60s.
+  #backoff.max: 60s
+
   # Configure http request timeout before failing an request to Elasticsearch.
   #timeout: 90
 
diff --git a/libbeat/_meta/config.reference.yml b/libbeat/_meta/config.reference.yml
index 75caa03bc42..61066c4ac80 100644
--- a/libbeat/_meta/config.reference.yml
+++ b/libbeat/_meta/config.reference.yml
@@ -1036,6 +1036,17 @@ logging.files:
   # The default is 50.
   #bulk_max_size: 50
 
+  # The number of seconds to wait before trying to reconnect to Elasticsearch
+  # after a network error. After waiting backoff.init seconds, the Beat
+  # tries to reconnect. If the attempt fails, the backoff timer is increased
+  # exponentially up to backoff.max. After a successful connection, the backoff
+  # timer is reset. The default is 1s.
+  #backoff.init: 1s
+
+  # The maximum number of seconds to wait before attempting to connect to
+  # Elasticsearch after a network error. The default is 60s.
+  #backoff.max: 60s
+
   # Configure http request timeout before failing an request to Elasticsearch.
   #timeout: 90
 
diff --git a/libbeat/docs/monitoring/shared-monitor-config.asciidoc b/libbeat/docs/monitoring/shared-monitor-config.asciidoc
index 2990d8ef7e3..2ea94649b45 100644
--- a/libbeat/docs/monitoring/shared-monitor-config.asciidoc
+++ b/libbeat/docs/monitoring/shared-monitor-config.asciidoc
@@ -39,6 +39,21 @@ configuration option contains the following fields:
 The maximum number of metrics to bulk in a single {es} bulk API index request.
 The default is `50`. For more information, see <>.
 
+[float]
+==== `backoff.init`
+
+The number of seconds to wait before trying to reconnect to Elasticsearch after
+a network error. After waiting `backoff.init` seconds, {beatname_uc} tries to
+reconnect. If the attempt fails, the backoff timer is increased exponentially up
+to `backoff.max`. After a successful connection, the backoff timer is reset. The
+default is 1s.
+
+[float]
+==== `backoff.max`
+
+The maximum number of seconds to wait before attempting to connect to
+Elasticsearch after a network error. The default is 60s.
+
 [float]
 ==== `compression_level`
 
@@ -79,10 +94,17 @@ The password that {beatname_uc} uses to authenticate with the {es} instances for
 shipping monitoring data.
 
 [float]
-==== `period`
+==== `metrics.period`
 
 The time interval (in seconds) when metrics are sent to the {es} cluster. A new
 snapshot of {beatname_uc} metrics is generated and scheduled for publishing each
+period. The default value is 10 * time.Second.
+
+[float]
+==== `state.period`
+
+The time interval (in seconds) when state information is sent to the {es} cluster. A new
+snapshot of {beatname_uc} state is generated and scheduled for publishing each
 period. The default value is 60 * time.Second.
 
 [float]
diff --git a/libbeat/monitoring/report/elasticsearch/config.go b/libbeat/monitoring/report/elasticsearch/config.go
index 2856e6d88b8..8f59cf79bad 100644
--- a/libbeat/monitoring/report/elasticsearch/config.go
+++ b/libbeat/monitoring/report/elasticsearch/config.go
@@ -42,6 +42,12 @@ type config struct {
 	BulkMaxSize int           `config:"bulk_max_size" validate:"min=0"`
 	BufferSize  int           `config:"buffer_size"`
 	Tags        []string      `config:"tags"`
+	Backoff     backoff       `config:"backoff"`
+}
+
+type backoff struct {
+	Init time.Duration
+	Max  time.Duration
 }
 
 var defaultConfig = config{
@@ -61,4 +67,8 @@ var defaultConfig = config{
 	BulkMaxSize: 50,
 	BufferSize:  50,
 	Tags:        nil,
+	Backoff: backoff{
+		Init: 1 * time.Second,
+		Max:  60 * time.Second,
+	},
 }
diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go
index f64bad7432a..3bd1bf0b8b9 100644
--- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go
+++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go
@@ -55,7 +55,8 @@ type reporter struct {
 	// pipeline
 	pipeline *pipeline.Pipeline
 	client   beat.Client
-	out      outputs.Group
+
+	out []outputs.NetworkClient
 }
 
 const selector = "monitoring"
 
@@ -109,22 +110,21 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
 		params[k] = v
 	}
 
-	out := outputs.Group{
-		Clients:   nil,
-		BatchSize: windowSize,
-		Retry:     0, // no retry. on error drop events
-	}
-
 	hosts, err := outputs.ReadHostList(cfg)
 	if err != nil {
 		return nil, err
 	}
+	if len(hosts) == 0 {
+		return nil, errors.New("empty hosts list")
+	}
+
+	var clients []outputs.NetworkClient
 	for _, host := range hosts {
 		client, err := makeClient(host, params, proxyURL, tlsConfig, &config)
 		if err != nil {
 			return nil, err
 		}
-		out.Clients = append(out.Clients, client)
+		clients = append(clients, client)
 	}
 
 	queueFactory := func(e queue.Eventer) (queue.Queue, error) {
@@ -137,10 +137,19 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
 
 	monitoring := monitoring.Default.GetRegistry("xpack.monitoring")
 
+	outClient := outputs.NewFailoverClient(clients)
+	outClient = outputs.WithBackoff(outClient, config.Backoff.Init, config.Backoff.Max)
+
 	pipeline, err := pipeline.New(
 		beat, monitoring,
-		queueFactory, out, pipeline.Settings{
+		queueFactory,
+		outputs.Group{
+			Clients:   []outputs.Client{outClient},
+			BatchSize: windowSize,
+			Retry:     0, // no retry. Drop event on error.
+		},
+		pipeline.Settings{
 			WaitClose:     0,
 			WaitCloseMode: pipeline.NoWaitOnClose,
 		})
@@ -148,7 +157,7 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
 		return nil, err
 	}
 
-	client, err := pipeline.Connect()
+	pipeConn, err := pipeline.Connect()
 	if err != nil {
 		pipeline.Close()
 		return nil, err
@@ -161,8 +170,8 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
 		tags:       config.Tags,
 		checkRetry: checkRetry,
 		pipeline:   pipeline,
-		client:     client,
-		out:        out,
+		client:     pipeConn,
+		out:        clients,
 	}
 	go r.initLoop(config)
 	return r, nil
@@ -184,7 +193,7 @@ func (r *reporter) initLoop(c config) {
 
 	for {
 		// Select one configured endpoint by random and check if xpack is available
-		client := r.out.Clients[rand.Intn(len(r.out.Clients))].(outputs.NetworkClient)
+		client := r.out[rand.Intn(len(r.out))]
 		err := client.Connect()
 		if err == nil {
 			closing(log, client)
diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml
index eb95e3de41b..169c3b975c9 100644
--- a/metricbeat/metricbeat.reference.yml
+++ b/metricbeat/metricbeat.reference.yml
@@ -1713,6 +1713,17 @@ logging.files:
   # The default is 50.
   #bulk_max_size: 50
 
+  # The number of seconds to wait before trying to reconnect to Elasticsearch
+  # after a network error. After waiting backoff.init seconds, the Beat
+  # tries to reconnect. If the attempt fails, the backoff timer is increased
+  # exponentially up to backoff.max. After a successful connection, the backoff
+  # timer is reset. The default is 1s.
+  #backoff.init: 1s
+
+  # The maximum number of seconds to wait before attempting to connect to
+  # Elasticsearch after a network error. The default is 60s.
+  #backoff.max: 60s
+
   # Configure http request timeout before failing an request to Elasticsearch.
   #timeout: 90
 
diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml
index bafc5400a59..758c381055f 100644
--- a/packetbeat/packetbeat.reference.yml
+++ b/packetbeat/packetbeat.reference.yml
@@ -1521,6 +1521,17 @@ logging.files:
   # The default is 50.
   #bulk_max_size: 50
 
+  # The number of seconds to wait before trying to reconnect to Elasticsearch
+  # after a network error. After waiting backoff.init seconds, the Beat
+  # tries to reconnect. If the attempt fails, the backoff timer is increased
+  # exponentially up to backoff.max. After a successful connection, the backoff
+  # timer is reset. The default is 1s.
+  #backoff.init: 1s
+
+  # The maximum number of seconds to wait before attempting to connect to
+  # Elasticsearch after a network error. The default is 60s.
+  #backoff.max: 60s
+
   # Configure http request timeout before failing an request to Elasticsearch.
   #timeout: 90
 
diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml
index f0ac22bdd65..b686b5746d6 100644
--- a/winlogbeat/winlogbeat.reference.yml
+++ b/winlogbeat/winlogbeat.reference.yml
@@ -1065,6 +1065,17 @@ logging.files:
   # The default is 50.
   #bulk_max_size: 50
 
+  # The number of seconds to wait before trying to reconnect to Elasticsearch
+  # after a network error. After waiting backoff.init seconds, the Beat
+  # tries to reconnect. If the attempt fails, the backoff timer is increased
+  # exponentially up to backoff.max. After a successful connection, the backoff
+  # timer is reset. The default is 1s.
+  #backoff.init: 1s
+
+  # The maximum number of seconds to wait before attempting to connect to
+  # Elasticsearch after a network error. The default is 60s.
+  #backoff.max: 60s
+
   # Configure http request timeout before failing an request to Elasticsearch.
   #timeout: 90
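
For reference, the new settings slot into a monitoring configuration roughly as
follows. This is a minimal sketch, not taken from the patch: the host names are
placeholders, and the `xpack.monitoring.elasticsearch` nesting is assumed from
the reference files above.

```yaml
xpack.monitoring:
  enabled: true
  elasticsearch:
    hosts: ["es1:9200", "es2:9200"] # on failure, another host is picked at random
    backoff.init: 1s                # first wait after a network error
    backoff.max: 60s                # cap for the exponentially growing wait
```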