Add option to enable/disable LS output slow start (#4972)
* Add option to enable/disable LS output slow start

- Add support for disabling slow start in the LS output.
- Disable slow start by default.

Since the Java rewrite, LS returns a heartbeat every few seconds, so
Beats can tell that a batch is still being actively processed. With this
change in LS, the original purpose of limiting batch sizes has become
largely superfluous and is no longer really required.
-> We disable slow start and windowing by default, but keep the setting
in case we find it's still required (e.g. when sending to older LS
instances).

* Update docs and reference configs

* Remove unused method

* Update changelog

* Address review feedback
Steffen Siering authored and tsg committed Aug 24, 2017
1 parent 66097bd commit c2360d0
Showing 13 changed files with 106 additions and 27 deletions.
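Before the diff, a minimal, self-contained Go sketch of the slow-start windowing this commit makes optional: the output starts with a small window, grows it toward `bulk_max_size` while transactions succeed, and shrinks it again on error. The exact grow/shrink policy below is an assumption for illustration; the real logic lives in libbeat/outputs/logstash/window.go, which this diff only touches to add a constructor.

package main

import "fmt"

// window mirrors the shape of the type in window.go: a current size
// bounded by a configured maximum.
type window struct {
	windowSize    int
	maxWindowSize int
}

// tryGrowWindow enlarges the window after a successful transaction.
// Doubling is illustrative, not the verbatim libbeat policy.
func (w *window) tryGrowWindow(batchSize int) {
	next := w.windowSize * 2
	if next > w.maxWindowSize {
		next = w.maxWindowSize
	}
	if next > batchSize {
		next = batchSize
	}
	w.windowSize = next
}

// shrinkWindow backs off after an error, never dropping below one event.
func (w *window) shrinkWindow() {
	if w.windowSize > 1 {
		w.windowSize /= 2
	}
}

func main() {
	w := window{windowSize: 10, maxWindowSize: 2048}
	w.tryGrowWindow(2048)
	fmt.Println("after success:", w.windowSize) // 20: growing toward the max
	w.shrinkWindow()
	fmt.Println("after error:", w.windowSize) // 10: backing off again
}

With slow_start disabled (the new default), the clients skip this window entirely and send whole batches, relying on the Logstash heartbeat instead.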
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -71,6 +71,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta1...master[Check the HEAD diff]

*Affecting all Beats*

- Add setting to enable/disable the slow start in logstash output. {pull}4972[4972]
- Update init scripts to use the `test config` subcommand instead of the deprecated `-configtest` flag. {issue}4600[4600]
- Get by default the credentials for connecting to Kibana from the Elasticsearch output configuration. {pull}4867[4867]
- Move TCP UDP start up into `server.Start()` {pull}4903[4903]
5 changes: 5 additions & 0 deletions auditbeat/auditbeat.reference.yml
@@ -311,6 +311,11 @@ output.elasticsearch:
# new batches.
#pipelining: 5

# If enabled, only a subset of the events in a batch is transferred per
# transaction. The number of events to be sent increases up to `bulk_max_size`
# if no error is encountered.
#slow_start: false

# Optional index name. The default index name is set to name of the beat
# in all lowercase.
#index: 'auditbeat'
5 changes: 5 additions & 0 deletions filebeat/filebeat.reference.yml
@@ -731,6 +731,11 @@ output.elasticsearch:
# new batches.
#pipelining: 5

# If enabled, only a subset of the events in a batch is transferred per
# transaction. The number of events to be sent increases up to `bulk_max_size`
# if no error is encountered.
#slow_start: false

# Optional index name. The default index name is set to name of the beat
# in all lowercase.
#index: 'filebeat'
5 changes: 5 additions & 0 deletions heartbeat/heartbeat.reference.yml
@@ -460,6 +460,11 @@ output.elasticsearch:
# new batches.
#pipelining: 5

# If enabled, only a subset of the events in a batch is transferred per
# transaction. The number of events to be sent increases up to `bulk_max_size`
# if no error is encountered.
#slow_start: false

# Optional index name. The default index name is set to name of the beat
# in all lowercase.
#index: 'heartbeat'
5 changes: 5 additions & 0 deletions libbeat/_meta/config.reference.yml
@@ -246,6 +246,11 @@ output.elasticsearch:
# new batches.
#pipelining: 5

# If enabled, only a subset of the events in a batch is transferred per
# transaction. The number of events to be sent increases up to `bulk_max_size`
# if no error is encountered.
#slow_start: false

# Optional index name. The default index name is set to name of the beat
# in all lowercase.
#index: 'beatname'
8 changes: 8 additions & 0 deletions libbeat/docs/outputconfig.asciidoc
@@ -532,6 +532,14 @@ Beats that publish single events (such as Packetbeat) send each event directly to
Elasticsearch. Beats that publish data in batches (such as Filebeat) send events in batches based on the
spooler size.

===== `slow_start`

If enabled, only a subset of the events in a batch is transferred per transaction.
The number of events to be sent increases up to `bulk_max_size` if no error is encountered.
On error, the number of events per transaction is reduced again.

The default is `false`.
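
For example, to re-enable slow start when sending to an older Logstash instance (a hypothetical excerpt: the host address is illustrative, and `bulk_max_size` is shown at its default of 2048):

[source,yaml]
----
output.logstash:
  hosts: ["localhost:5044"]
  # Start with small transactions and let the window grow toward
  # bulk_max_size while no errors occur; shrink it again on error.
  slow_start: true
  bulk_max_size: 2048
----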

[[kafka-output]]
=== Configure the Kafka output

45 changes: 27 additions & 18 deletions libbeat/outputs/logstash/async.go
@@ -16,7 +16,7 @@ type asyncClient struct {
*transport.Client
stats *outputs.Stats
client *v2.AsyncClient
win window
win *window

connect func() error
}
@@ -36,10 +36,14 @@ func newAsyncClient(
stats *outputs.Stats,
config *Config,
) (*asyncClient, error) {
c := &asyncClient{}
c.Client = conn
c.stats = stats
c.win.init(defaultStartMaxWindowSize, config.BulkMaxSize)
c := &asyncClient{
Client: conn,
stats: stats,
}

if config.SlowStart {
c.win = newWindower(defaultStartMaxWindowSize, config.BulkMaxSize)
}

if config.TTL != 0 {
logp.Warn(`The async Logstash client does not support the "ttl" option`)
@@ -99,10 +103,6 @@ func (c *asyncClient) Close() error {
return c.Client.Close()
}

func (c *asyncClient) BatchSize() int {
return c.win.get()
}

func (c *asyncClient) Publish(batch publisher.Batch) error {
st := c.stats
events := batch.Events()
@@ -113,24 +113,29 @@ func (c *asyncClient) Publish(batch publisher.Batch) error {
return nil
}

window := make([]interface{}, len(events))
for i := range events {
window[i] = &events[i]
}

ref := &msgRef{
client: c,
count: atomic.MakeUint32(1),
batch: batch,
slice: events,
batchSize: len(events),
win: &c.win,
win: c.win,
err: nil,
}
defer ref.dec()

for len(events) > 0 {
n, err := c.publishWindowed(ref, events)
var (
n int
err error
)

if c.win == nil {
n = len(events)
err = c.sendEvents(ref, events)
} else {
n, err = c.publishWindowed(ref, events)
}

debugf("%v events out of %v events sent to logstash. Continue sending",
n, len(events))
@@ -188,7 +193,9 @@ func (r *msgRef) callback(seq uint32, err error) {
func (r *msgRef) done(n uint32) {
r.client.stats.Acked(int(n))
r.slice = r.slice[n:]
r.win.tryGrowWindow(r.batchSize)
if r.win != nil {
r.win.tryGrowWindow(r.batchSize)
}
r.dec()
}

@@ -197,7 +204,9 @@ func (r *msgRef) fail(n uint32, err error) {
r.err = err
}
r.slice = r.slice[n:]
r.win.shrinkWindow()
if r.win != nil {
r.win.shrinkWindow()
}

r.client.stats.Acked(int(n))

2 changes: 2 additions & 0 deletions libbeat/outputs/logstash/config.go
@@ -12,6 +12,7 @@ type Config struct {
Port int `config:"port"`
LoadBalance bool `config:"loadbalance"`
BulkMaxSize int `config:"bulk_max_size"`
SlowStart bool `config:"slow_start"`
Timeout time.Duration `config:"timeout"`
TTL time.Duration `config:"ttl" validate:"min=0"`
Pipelining int `config:"pipelining" validate:"min=0"`
@@ -32,6 +33,7 @@ var defaultConfig = Config{
LoadBalance: false,
Pipelining: 5,
BulkMaxSize: 2048,
SlowStart: false,
CompressionLevel: 3,
Timeout: 30 * time.Second,
MaxRetries: 3,
36 changes: 27 additions & 9 deletions libbeat/outputs/logstash/sync.go
@@ -14,7 +14,7 @@ type syncClient struct {
*transport.Client
client *v2.SyncClient
stats *outputs.Stats
win window
win *window
ttl time.Duration
ticker *time.Ticker
}
@@ -24,11 +24,15 @@ func newSyncClient(
stats *outputs.Stats,
config *Config,
) (*syncClient, error) {
c := &syncClient{}
c.Client = conn
c.ttl = config.TTL
c.stats = stats
c.win.init(defaultStartMaxWindowSize, config.BulkMaxSize)
c := &syncClient{
Client: conn,
stats: stats,
ttl: config.TTL,
}

if config.SlowStart {
c.win = newWindower(defaultStartMaxWindowSize, config.BulkMaxSize)
}
if c.ttl > 0 {
c.ticker = time.NewTicker(c.ttl)
}
@@ -95,13 +99,25 @@ func (c *syncClient) Publish(batch publisher.Batch) error {
batch.Retry()
return err
}

// reset window size on reconnect
c.win.windowSize = int32(defaultStartMaxWindowSize)
if c.win != nil {
c.win.windowSize = int32(defaultStartMaxWindowSize)
}
default:
}
}

n, err := c.publishWindowed(events)
var (
n int
err error
)

if c.win == nil {
n, err = c.sendEvents(events)
} else {
n, err = c.publishWindowed(events)
}
events = events[n:]
st.Acked(n)

@@ -112,7 +128,9 @@ func (c *syncClient) Publish(batch publisher.Batch) error {
// return batch to pipeline before reporting/counting error
batch.RetryEvents(events)

c.win.shrinkWindow()
if c.win != nil {
c.win.shrinkWindow()
}
_ = c.Close()

logp.Err("Failed to publish events caused by: %v", err)
Expand Down
6 changes: 6 additions & 0 deletions libbeat/outputs/logstash/window.go
@@ -11,6 +11,12 @@ type window struct {
maxWindowSize int
}

func newWindower(start, max int) *window {
w := &window{}
w.init(start, max)
return w
}

func (w *window) init(start, max int) {
*w = window{
windowSize: int32(start),
5 changes: 5 additions & 0 deletions metricbeat/metricbeat.reference.yml
@@ -690,6 +690,11 @@ output.elasticsearch:
# new batches.
#pipelining: 5

# If enabled, only a subset of the events in a batch is transferred per
# transaction. The number of events to be sent increases up to `bulk_max_size`
# if no error is encountered.
#slow_start: false

# Optional index name. The default index name is set to name of the beat
# in all lowercase.
#index: 'metricbeat'
5 changes: 5 additions & 0 deletions packetbeat/packetbeat.reference.yml
@@ -698,6 +698,11 @@ output.elasticsearch:
# new batches.
#pipelining: 5

# If enabled, only a subset of the events in a batch is transferred per
# transaction. The number of events to be sent increases up to `bulk_max_size`
# if no error is encountered.
#slow_start: false

# Optional index name. The default index name is set to name of the beat
# in all lowercase.
#index: 'packetbeat'
5 changes: 5 additions & 0 deletions winlogbeat/winlogbeat.reference.yml
@@ -275,6 +275,11 @@ output.elasticsearch:
# new batches.
#pipelining: 5

# If enabled, only a subset of the events in a batch is transferred per
# transaction. The number of events to be sent increases up to `bulk_max_size`
# if no error is encountered.
#slow_start: false

# Optional index name. The default index name is set to name of the beat
# in all lowercase.
#index: 'winlogbeat'
