From c2360d08da718c2ebeca9757f9e0895d39e921c1 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Thu, 24 Aug 2017 11:53:35 +0200 Subject: [PATCH] Add option to enable/disable LS output slow start (#4972) * Add option to enable/disable LS output slow start - add support to disable slow start in LS output - disable slow start by default. LS with java rewrite returns some kind of heartbeat every few seconds, so beats can tell the batch is still actively processed. With this change in LS, the original purpose of limiting batch sizes has become somewhat superfluous and is not really required anymore. -> We disable slow start and windowing by default, but keep the setting in case we find it's still required (e.g. when sending to older LS instances). * Update docs and reference configs * Remove unused method * Update changelog * review --- CHANGELOG.asciidoc | 1 + auditbeat/auditbeat.reference.yml | 5 ++++ filebeat/filebeat.reference.yml | 5 ++++ heartbeat/heartbeat.reference.yml | 5 ++++ libbeat/_meta/config.reference.yml | 5 ++++ libbeat/docs/outputconfig.asciidoc | 8 +++++ libbeat/outputs/logstash/async.go | 45 +++++++++++++++++------------ libbeat/outputs/logstash/config.go | 2 ++ libbeat/outputs/logstash/sync.go | 36 +++++++++++++++++------ libbeat/outputs/logstash/window.go | 6 ++++ metricbeat/metricbeat.reference.yml | 5 ++++ packetbeat/packetbeat.reference.yml | 5 ++++ winlogbeat/winlogbeat.reference.yml | 5 ++++ 13 files changed, 106 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 0f3a5b31ea09..1409a2b4adf0 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -71,6 +71,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta1...master[Check the HEAD di *Affecting all Beats* +- Add setting to enable/disable the slow start in logstash output. {pull}4972[4972] - Update init scripts to use the `test config` subcommand instead of the deprecated `-configtest` flag. {issue}4600[4600] - Get by default the credentials for connecting to Kibana from the Elasticsearch output configuration. {pull}4867[4867] - Move TCP UDP start up into `server.Start()` {pull}4903[4903] diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml index 64c83f7bb630..2e3a127379c7 100644 --- a/auditbeat/auditbeat.reference.yml +++ b/auditbeat/auditbeat.reference.yml @@ -311,6 +311,11 @@ output.elasticsearch: # new batches. #pipelining: 5 + # If enabled only a subset of events in a batch of events is transfered per + # transaction. The number of events to sent increases up to `bulk_max_size` + # if no error is encountered. + #slow_start: false + # Optional index name. The default index name is set to name of the beat # in all lowercase. #index: 'auditbeat' diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 0271ea399ff9..1bd319ca0fc6 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -731,6 +731,11 @@ output.elasticsearch: # new batches. #pipelining: 5 + # If enabled only a subset of events in a batch of events is transfered per + # transaction. The number of events to sent increases up to `bulk_max_size` + # if no error is encountered. + #slow_start: false + # Optional index name. The default index name is set to name of the beat # in all lowercase. #index: 'filebeat' diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index bbf555c33c69..e17e962120e2 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -460,6 +460,11 @@ output.elasticsearch: # new batches. #pipelining: 5 + # If enabled only a subset of events in a batch of events is transfered per + # transaction. The number of events to sent increases up to `bulk_max_size` + # if no error is encountered. + #slow_start: false + # Optional index name. The default index name is set to name of the beat # in all lowercase. #index: 'heartbeat' diff --git a/libbeat/_meta/config.reference.yml b/libbeat/_meta/config.reference.yml index 4e80099b5408..28bf014bbb4e 100644 --- a/libbeat/_meta/config.reference.yml +++ b/libbeat/_meta/config.reference.yml @@ -246,6 +246,11 @@ output.elasticsearch: # new batches. #pipelining: 5 + # If enabled only a subset of events in a batch of events is transfered per + # transaction. The number of events to sent increases up to `bulk_max_size` + # if no error is encountered. + #slow_start: false + # Optional index name. The default index name is set to name of the beat # in all lowercase. #index: 'beatname' diff --git a/libbeat/docs/outputconfig.asciidoc b/libbeat/docs/outputconfig.asciidoc index 49a4ed763e05..c07e0037a2bd 100644 --- a/libbeat/docs/outputconfig.asciidoc +++ b/libbeat/docs/outputconfig.asciidoc @@ -532,6 +532,14 @@ Beats that publish single events (such as Packetbeat) send each event directly t Elasticsearch. Beats that publish data in batches (such as Filebeat) send events in batches based on the spooler size. +===== `slow_start` + +If enabled only a subset of events in a batch of events is transfered per transaction. +The number of events to be sent increases up to `bulk_max_size` if no error is encountered. +On error the number of events per transaction is reduced again. + +The default is `false`. + [[kafka-output]] === Configure the Kafka output diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index 92b9ea3813ac..10c0ed1fe23f 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -16,7 +16,7 @@ type asyncClient struct { *transport.Client stats *outputs.Stats client *v2.AsyncClient - win window + win *window connect func() error } @@ -36,10 +36,14 @@ func newAsyncClient( stats *outputs.Stats, config *Config, ) (*asyncClient, error) { - c := &asyncClient{} - c.Client = conn - c.stats = stats - c.win.init(defaultStartMaxWindowSize, config.BulkMaxSize) + c := &asyncClient{ + Client: conn, + stats: stats, + } + + if config.SlowStart { + c.win = newWindower(defaultStartMaxWindowSize, config.BulkMaxSize) + } if config.TTL != 0 { logp.Warn(`The async Logstash client does not support the "ttl" option`) @@ -99,10 +103,6 @@ func (c *asyncClient) Close() error { return c.Client.Close() } -func (c *asyncClient) BatchSize() int { - return c.win.get() -} - func (c *asyncClient) Publish(batch publisher.Batch) error { st := c.stats events := batch.Events() @@ -113,24 +113,29 @@ func (c *asyncClient) Publish(batch publisher.Batch) error { return nil } - window := make([]interface{}, len(events)) - for i := range events { - window[i] = &events[i] - } - ref := &msgRef{ client: c, count: atomic.MakeUint32(1), batch: batch, slice: events, batchSize: len(events), - win: &c.win, + win: c.win, err: nil, } defer ref.dec() for len(events) > 0 { - n, err := c.publishWindowed(ref, events) + var ( + n int + err error + ) + + if c.win == nil { + n = len(events) + err = c.sendEvents(ref, events) + } else { + n, err = c.publishWindowed(ref, events) + } debugf("%v events out of %v events sent to logstash. Continue sending", n, len(events)) @@ -188,7 +193,9 @@ func (r *msgRef) callback(seq uint32, err error) { func (r *msgRef) done(n uint32) { r.client.stats.Acked(int(n)) r.slice = r.slice[n:] - r.win.tryGrowWindow(r.batchSize) + if r.win != nil { + r.win.tryGrowWindow(r.batchSize) + } r.dec() } @@ -197,7 +204,9 @@ func (r *msgRef) fail(n uint32, err error) { r.err = err } r.slice = r.slice[n:] - r.win.shrinkWindow() + if r.win != nil { + r.win.shrinkWindow() + } r.client.stats.Acked(int(n)) diff --git a/libbeat/outputs/logstash/config.go b/libbeat/outputs/logstash/config.go index 6e8e1feb5e81..5bb6fcc69b5f 100644 --- a/libbeat/outputs/logstash/config.go +++ b/libbeat/outputs/logstash/config.go @@ -12,6 +12,7 @@ type Config struct { Port int `config:"port"` LoadBalance bool `config:"loadbalance"` BulkMaxSize int `config:"bulk_max_size"` + SlowStart bool `config:"slow_start"` Timeout time.Duration `config:"timeout"` TTL time.Duration `config:"ttl" validate:"min=0"` Pipelining int `config:"pipelining" validate:"min=0"` @@ -32,6 +33,7 @@ var defaultConfig = Config{ LoadBalance: false, Pipelining: 5, BulkMaxSize: 2048, + SlowStart: false, CompressionLevel: 3, Timeout: 30 * time.Second, MaxRetries: 3, diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index 9caf46cf495a..4bf2bdbd98b5 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -14,7 +14,7 @@ type syncClient struct { *transport.Client client *v2.SyncClient stats *outputs.Stats - win window + win *window ttl time.Duration ticker *time.Ticker } @@ -24,11 +24,15 @@ func newSyncClient( stats *outputs.Stats, config *Config, ) (*syncClient, error) { - c := &syncClient{} - c.Client = conn - c.ttl = config.TTL - c.stats = stats - c.win.init(defaultStartMaxWindowSize, config.BulkMaxSize) + c := &syncClient{ + Client: conn, + stats: stats, + ttl: config.TTL, + } + + if config.SlowStart { + c.win = newWindower(defaultStartMaxWindowSize, config.BulkMaxSize) + } if c.ttl > 0 { c.ticker = time.NewTicker(c.ttl) } @@ -95,13 +99,25 @@ func (c *syncClient) Publish(batch publisher.Batch) error { batch.Retry() return err } + // reset window size on reconnect - c.win.windowSize = int32(defaultStartMaxWindowSize) + if c.win != nil { + c.win.windowSize = int32(defaultStartMaxWindowSize) + } default: } } - n, err := c.publishWindowed(events) + var ( + n int + err error + ) + + if c.win == nil { + n, err = c.sendEvents(events) + } else { + n, err = c.publishWindowed(events) + } events = events[n:] st.Acked(n) @@ -112,7 +128,9 @@ func (c *syncClient) Publish(batch publisher.Batch) error { // return batch to pipeline before reporting/counting error batch.RetryEvents(events) - c.win.shrinkWindow() + if c.win != nil { + c.win.shrinkWindow() + } _ = c.Close() logp.Err("Failed to publish events caused by: %v", err) diff --git a/libbeat/outputs/logstash/window.go b/libbeat/outputs/logstash/window.go index 1786c53662fb..df7ff1cd4dd8 100644 --- a/libbeat/outputs/logstash/window.go +++ b/libbeat/outputs/logstash/window.go @@ -11,6 +11,12 @@ type window struct { maxWindowSize int } +func newWindower(start, max int) *window { + w := &window{} + w.init(start, max) + return w +} + func (w *window) init(start, max int) { *w = window{ windowSize: int32(start), diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 22180766a241..12766fa9a2bf 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -690,6 +690,11 @@ output.elasticsearch: # new batches. #pipelining: 5 + # If enabled only a subset of events in a batch of events is transfered per + # transaction. The number of events to sent increases up to `bulk_max_size` + # if no error is encountered. + #slow_start: false + # Optional index name. The default index name is set to name of the beat # in all lowercase. #index: 'metricbeat' diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index 676ab18cf984..0beec09d5429 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -698,6 +698,11 @@ output.elasticsearch: # new batches. #pipelining: 5 + # If enabled only a subset of events in a batch of events is transfered per + # transaction. The number of events to sent increases up to `bulk_max_size` + # if no error is encountered. + #slow_start: false + # Optional index name. The default index name is set to name of the beat # in all lowercase. #index: 'packetbeat' diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml index f3e130aa5639..4454e8b44d7d 100644 --- a/winlogbeat/winlogbeat.reference.yml +++ b/winlogbeat/winlogbeat.reference.yml @@ -275,6 +275,11 @@ output.elasticsearch: # new batches. #pipelining: 5 + # If enabled only a subset of events in a batch of events is transfered per + # transaction. The number of events to sent increases up to `bulk_max_size` + # if no error is encountered. + #slow_start: false + # Optional index name. The default index name is set to name of the beat # in all lowercase. #index: 'winlogbeat'