From acad1cd838c7f9c538cc81f96183422dfc7f3cd7 Mon Sep 17 00:00:00 2001
From: urso
Date: Wed, 6 Jan 2016 17:42:08 +0100
Subject: [PATCH 1/2] Separate per-event and bulk-event queues

Use separate queues with different buffer sizes for single events and
bulk events in the publisher. The default queue size for single events
is 1000 elements; for bulk events it is 0. These defaults dramatically
reduce the total number of buffered events, and therefore memory usage,
in case Elasticsearch or Logstash becomes unresponsive.

Introduce (hidden) configuration options in the shipper section:
- queue_size
- bulk_queue_size

When using the async publisher, the number of buffered events is
approximately given by:

Q_e = queue size (default 1000)
Q_b = bulk queue size (default 0)
B   = bulk_max_size
W   = number of load balancing workers (only if load balancing is enabled)
N   = number of events in memory

N = 2*Q_e + (2 + Q_b + W)*B

Using the default values Q_e = 1000, Q_b = 0, B = 2048:

- topbeat/packetbeat with default values + single output:
  N = 2*1000 + 2*2048 = 6096 events
- topbeat/packetbeat with default values + load balancing with 4 workers:
  N = 2*1000 + 6*2048 = 14288 events
---
 libbeat/publisher/async.go       | 11 +++++------
 libbeat/publisher/bulk.go        | 33 ++++++++++++++++++--------------
 libbeat/publisher/bulk_test.go   |  7 ++++---
 libbeat/publisher/common_test.go |  6 +++---
 libbeat/publisher/output.go      |  4 ++--
 libbeat/publisher/output_test.go |  2 +-
 libbeat/publisher/publish.go     | 26 ++++++++++++++++++++++---
 libbeat/publisher/sync.go        |  4 ++--
 libbeat/publisher/worker.go      | 24 ++++++++++++++++-------
 libbeat/publisher/worker_test.go |  2 +-
 10 files changed, 77 insertions(+), 42 deletions(-)
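As a quick cross-check of the numbers above, here is a minimal sketch that evaluates the formula from the commit message. The function name and structure are illustrative only, not part of this patch:

```go
package main

import "fmt"

// bufferedEvents estimates the worst-case number of in-memory events
// for the async publisher: N = 2*Qe + (2 + Qb + W)*B.
func bufferedEvents(qe, qb, bulkMaxSize, workers int) int {
	return 2*qe + (2+qb+workers)*bulkMaxSize
}

func main() {
	fmt.Println(bufferedEvents(1000, 0, 2048, 0)) // single output: 6096
	fmt.Println(bufferedEvents(1000, 0, 2048, 4)) // 4 LB workers: 14288
}
```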
diff --git a/libbeat/publisher/async.go b/libbeat/publisher/async.go
index cb0b80040121..efbaa321656f 100644
--- a/libbeat/publisher/async.go
+++ b/libbeat/publisher/async.go
@@ -20,18 +20,17 @@ const (
 	defaultBulkSize = 200
 )
 
-func newAsyncPublisher(pub *PublisherType) *asyncPublisher {
-
+func newAsyncPublisher(pub *PublisherType, hwm, bulkHWM int) *asyncPublisher {
 	p := &asyncPublisher{pub: pub}
 	p.ws.Init()
 
 	var outputs []worker
 	for _, out := range pub.Output {
-		outputs = append(outputs, asyncOutputer(&p.ws, out))
+		outputs = append(outputs, asyncOutputer(&p.ws, hwm, bulkHWM, out))
 	}
 	p.outputs = outputs
-	p.messageWorker.init(&pub.wsPublisher, 1000, newPreprocessor(pub, p))
+	p.messageWorker.init(&pub.wsPublisher, hwm, bulkHWM, newPreprocessor(pub, p))
 	return p
 }
 
@@ -67,7 +66,7 @@ func (p *asyncPublisher) PublishEvents(ctx context, events []common.MapStr) bool
 	return true
 }
 
-func asyncOutputer(ws *workerSignal, worker *outputWorker) worker {
+func asyncOutputer(ws *workerSignal, hwm, bulkHWM int, worker *outputWorker) worker {
 	config := worker.config
 
 	flushInterval := defaultFlushInterval
@@ -89,5 +88,5 @@
 	debug("create bulk processing worker (interval=%v, bulk size=%v)",
 		flushInterval, maxBulkSize)
 
-	return newBulkWorker(ws, 1000, worker, flushInterval, maxBulkSize)
+	return newBulkWorker(ws, hwm, bulkHWM, worker, flushInterval, maxBulkSize)
 }
diff --git a/libbeat/publisher/bulk.go b/libbeat/publisher/bulk.go
index fda2228f331b..5604c1ff1df1 100644
--- a/libbeat/publisher/bulk.go
+++ b/libbeat/publisher/bulk.go
@@ -12,6 +12,7 @@ type bulkWorker struct {
 	ws *workerSignal
 
 	queue       chan message
+	bulkQueue   chan message
 	flushTicker *time.Ticker
 
 	maxBatchSize int
@@ -20,7 +21,8 @@ type bulkWorker struct {
 }
 
 func newBulkWorker(
-	ws *workerSignal, hwm int, output worker,
+	ws *workerSignal, hwm int, bulkHWM int,
+	output worker,
 	flushInterval time.Duration,
 	maxBatchSize int,
 ) *bulkWorker {
@@ -28,6 +30,7 @@ func newBulkWorker(
 		output:       output,
 		ws:           ws,
 		queue:        make(chan message, hwm),
+		bulkQueue:    make(chan message, bulkHWM),
 		flushTicker:  time.NewTicker(flushInterval),
 		maxBatchSize: maxBatchSize,
 		events:       make([]common.MapStr, 0, maxBatchSize),
@@ -40,7 +43,11 @@ func newBulkWorker(
 }
 
 func (b *bulkWorker) send(m message) {
-	b.queue <- m
+	if m.events == nil {
+		b.queue <- m
+	} else {
+		b.bulkQueue <- m
+	}
 }
 
 func (b *bulkWorker) run() {
@@ -51,16 +58,9 @@ func (b *bulkWorker) run() {
 		case <-b.ws.done:
 			return
 		case m := <-b.queue:
-			if m.event != nil { // single event
-				b.onEvent(m.context.signal, m.event)
-			} else { // batch of events
-				b.onEvents(m.context.signal, m.events)
-			}
-
-			// buffer full?
-			if len(b.events) == cap(b.events) {
-				b.publish()
-			}
+			b.onEvent(m.context.signal, m.event)
+		case m := <-b.bulkQueue:
+			b.onEvents(m.context.signal, m.events)
 		case <-b.flushTicker.C:
 			if len(b.events) > 0 {
 				b.publish()
@@ -74,6 +74,10 @@ func (b *bulkWorker) onEvent(signal outputs.Signaler, event common.MapStr) {
 	if signal != nil {
 		b.pending = append(b.pending, signal)
 	}
+
+	if len(b.events) == cap(b.events) {
+		b.publish()
+	}
 }
 
 func (b *bulkWorker) onEvents(signal outputs.Signaler, events []common.MapStr) {
@@ -81,11 +85,10 @@ func (b *bulkWorker) onEvents(signal outputs.Signaler, events []common.MapStr) {
 		// split up bulk to match required bulk sizes.
 		// If input events have been split up bufferFull will be set and
 		// bulk request will be published.
-		bufferFull := false
 		spaceLeft := cap(b.events) - len(b.events)
 		consume := len(events)
+		bufferFull := spaceLeft <= consume
 		if spaceLeft < consume {
-			bufferFull = true
 			consume = spaceLeft
 			if signal != nil {
 				// creating cascading signaler chain for
@@ -113,6 +116,7 @@ func (b *bulkWorker) publish() {
 		context: context{
 			signal: outputs.NewCompositeSignaler(b.pending...),
 		},
+		event:  nil,
 		events: b.events,
 	})
 
@@ -123,5 +127,6 @@ func (b *bulkWorker) shutdown() {
 	b.flushTicker.Stop()
 	stopQueue(b.queue)
+	stopQueue(b.bulkQueue)
 	b.ws.wg.Done()
 }
diff --git a/libbeat/publisher/bulk_test.go b/libbeat/publisher/bulk_test.go
index c8cfd482bcdb..a28ce425f3aa 100644
--- a/libbeat/publisher/bulk_test.go
+++ b/libbeat/publisher/bulk_test.go
@@ -12,6 +12,7 @@ const (
 	flushInterval time.Duration = 10 * time.Millisecond
 	maxBatchSize                = 10
 	queueSize                   = 4 * maxBatchSize
+	bulkQueueSize               = 1
 )
 
 // Send a single event to the bulkWorker and verify that the event
@@ -23,7 +24,7 @@ func TestBulkWorkerSendSingle(t *testing.T) {
 	}
 	ws := newWorkerSignal()
 	defer ws.stop()
-	bw := newBulkWorker(ws, queueSize, mh, flushInterval, maxBatchSize)
+	bw := newBulkWorker(ws, queueSize, bulkQueueSize, mh, flushInterval, maxBatchSize)
 
 	s := newTestSignaler()
 	m := testMessage(s, testEvent())
@@ -46,7 +47,7 @@ func TestBulkWorkerSendBatch(t *testing.T) {
 	}
 	ws := newWorkerSignal()
 	defer ws.stop()
-	bw := newBulkWorker(ws, queueSize, mh, time.Duration(time.Hour), maxBatchSize)
+	bw := newBulkWorker(ws, queueSize, 0, mh, time.Duration(time.Hour), maxBatchSize)
 
 	events := make([]common.MapStr, maxBatchSize)
 	for i := range events {
@@ -76,7 +77,7 @@ func TestBulkWorkerSendBatchGreaterThanMaxBatchSize(t *testing.T) {
 	}
 	ws := newWorkerSignal()
 	defer ws.stop()
-	bw := newBulkWorker(ws, queueSize, mh, flushInterval, maxBatchSize)
+	bw := newBulkWorker(ws, queueSize, 0, mh, flushInterval, maxBatchSize)
 
 	// Send
 	events := make([]common.MapStr, maxBatchSize+1)
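The onEvents path above splits oversized batches and, per the "cascading signaler chain" comment, chains completion signals so the producer hears back exactly once, after every sub-batch is acknowledged. A minimal sketch of that idea follows; the types and names (signaler, splitSignal) are simplified stand-ins, not the actual libbeat API:

```go
package main

import "fmt"

// signaler is a trimmed-down stand-in for outputs.Signaler.
type signaler interface {
	Completed()
	Failed()
}

// splitSignal fans one producer signal out over N sub-batches: it reports
// success only after all parts completed, and failure if any part failed.
// No locking needed here because the bulk worker runs in one goroutine.
type splitSignal struct {
	parts  int
	failed bool
	inner  signaler
}

func (s *splitSignal) Completed() { s.done(false) }
func (s *splitSignal) Failed()    { s.done(true) }

func (s *splitSignal) done(failed bool) {
	s.failed = s.failed || failed
	s.parts--
	if s.parts == 0 {
		if s.failed {
			s.inner.Failed()
		} else {
			s.inner.Completed()
		}
	}
}

// printSignaler reports the final outcome of the original publish call.
type printSignaler struct{ name string }

func (p *printSignaler) Completed() { fmt.Println(p.name, "completed") }
func (p *printSignaler) Failed()    { fmt.Println(p.name, "failed") }

func main() {
	// One batch split into 3 sub-batches: the producer is notified once.
	s := &splitSignal{parts: 3, inner: &printSignaler{name: "batch"}}
	s.Completed()
	s.Completed()
	s.Completed() // fires exactly one "batch completed"
}
```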
diff --git a/libbeat/publisher/common_test.go b/libbeat/publisher/common_test.go
index f44a1d45147f..a27e5ec3fc47 100644
--- a/libbeat/publisher/common_test.go
+++ b/libbeat/publisher/common_test.go
@@ -141,7 +141,7 @@ func newTestPublisher(bulkSize int, response OutputResponse) *testPublisher {
 	ow.config.BulkMaxSize = &bulkSize
 	ow.handler = mh
 	ws := workerSignal{}
-	ow.messageWorker.init(&ws, 1000, mh)
+	ow.messageWorker.init(&ws, defaultChanSize, defaultBulkChanSize, mh)
 
 	pub := &PublisherType{
 		Output: []*outputWorker{ow},
@@ -149,8 +149,8 @@
 	}
 	pub.wsOutput.Init()
 	pub.wsPublisher.Init()
-	pub.syncPublisher = newSyncPublisher(pub)
-	pub.asyncPublisher = newAsyncPublisher(pub)
+	pub.syncPublisher = newSyncPublisher(pub, defaultChanSize, defaultBulkChanSize)
+	pub.asyncPublisher = newAsyncPublisher(pub, defaultChanSize, defaultBulkChanSize)
 	return &testPublisher{
 		pub:              pub,
 		outputMsgHandler: mh,
diff --git a/libbeat/publisher/output.go b/libbeat/publisher/output.go
index 53d69bb23edc..e19558147129 100644
--- a/libbeat/publisher/output.go
+++ b/libbeat/publisher/output.go
@@ -20,6 +20,7 @@ func newOutputWorker(
 	out outputs.Outputer,
 	ws *workerSignal,
 	hwm int,
+	bulkHWM int,
 ) *outputWorker {
 	maxBulkSize := defaultBulkSize
 	if config.BulkMaxSize != nil {
@@ -31,14 +32,13 @@
 		config:      config,
 		maxBulkSize: maxBulkSize,
 	}
-	o.messageWorker.init(ws, hwm, o)
+	o.messageWorker.init(ws, hwm, bulkHWM, o)
 	return o
 }
 
 func (o *outputWorker) onStop() {}
 
 func (o *outputWorker) onMessage(m message) {
-
 	if m.event != nil {
 		o.onEvent(&m.context, m.event)
 	} else {
diff --git a/libbeat/publisher/output_test.go b/libbeat/publisher/output_test.go
index 46b0f154cad4..97e851497c01 100644
--- a/libbeat/publisher/output_test.go
+++ b/libbeat/publisher/output_test.go
@@ -32,7 +32,7 @@ func TestOutputWorker(t *testing.T) {
 		outputs.MothershipConfig{},
 		outputer,
 		newWorkerSignal(),
-		1)
+		1, 0)
 
 	ow.onStop() // Noop
diff --git a/libbeat/publisher/publish.go b/libbeat/publisher/publish.go
index e9985f70233f..63267eafc092 100644
--- a/libbeat/publisher/publish.go
+++ b/libbeat/publisher/publish.go
@@ -79,6 +79,10 @@ type ShipperConfig struct {
 	Topology_expire int
 	Tags            []string
 	Geoip           common.Geoip
+
+	// internal publisher queue sizes
+	QueueSize     *int `yaml:"queue_size"`
+	BulkQueueSize *int `yaml:"bulk_queue_size"`
 }
 
 type Topology struct {
@@ -86,6 +90,11 @@ type Topology struct {
 	Ip   string `json:"ip"`
 }
 
+const (
+	defaultChanSize     = 1000
+	defaultBulkChanSize = 0
+)
+
 func init() {
 	publishDisabled = flag.Bool("N", false, "Disable actual publishing for testing")
 }
@@ -191,6 +200,16 @@ func (publisher *PublisherType) init(
 		logp.Info("Dry run mode. All output types except the file based one are disabled.")
 	}
 
+	hwm := defaultChanSize
+	if shipper.QueueSize != nil && *shipper.QueueSize > 0 {
+		hwm = *shipper.QueueSize
+	}
+
+	bulkHWM := defaultBulkChanSize
+	if shipper.BulkQueueSize != nil && *shipper.BulkQueueSize >= 0 {
+		bulkHWM = *shipper.BulkQueueSize
+	}
+
 	publisher.GeoLite = common.LoadGeoIPData(shipper.Geoip)
 
 	publisher.wsOutput.Init()
@@ -211,7 +230,8 @@
 			debug("Create output worker")
 
 			outputers = append(outputers,
-				newOutputWorker(config, output, &publisher.wsOutput, 1000))
+				newOutputWorker(config, output, &publisher.wsOutput,
+					hwm, bulkHWM))
 
 			if !config.Save_topology {
 				continue
@@ -289,8 +309,8 @@
 		go publisher.UpdateTopologyPeriodically()
 	}
 
-	publisher.asyncPublisher = newAsyncPublisher(publisher)
-	publisher.syncPublisher = newSyncPublisher(publisher)
+	publisher.asyncPublisher = newAsyncPublisher(publisher, hwm, bulkHWM)
+	publisher.syncPublisher = newSyncPublisher(publisher, hwm, bulkHWM)
 
 	return nil
 }
diff --git a/libbeat/publisher/sync.go b/libbeat/publisher/sync.go
index 475d8918c683..dad9ae7bce95 100644
--- a/libbeat/publisher/sync.go
+++ b/libbeat/publisher/sync.go
@@ -12,9 +12,9 @@ type syncPublisher struct {
 
 type syncClient func(message) bool
 
-func newSyncPublisher(pub *PublisherType) *syncPublisher {
+func newSyncPublisher(pub *PublisherType, hwm, bulkHWM int) *syncPublisher {
 	s := &syncPublisher{pub: pub}
-	s.messageWorker.init(&pub.wsPublisher, 1000, newPreprocessor(pub, s))
+	s.messageWorker.init(&pub.wsPublisher, hwm, bulkHWM, newPreprocessor(pub, s))
 	return s
 }
 
diff --git a/libbeat/publisher/worker.go b/libbeat/publisher/worker.go
index fd88151b4d18..51f4d520ab94 100644
--- a/libbeat/publisher/worker.go
+++ b/libbeat/publisher/worker.go
@@ -18,9 +18,10 @@ type worker interface {
 }
 
 type messageWorker struct {
-	queue   chan message
-	ws      *workerSignal
-	handler messageHandler
+	queue     chan message
+	bulkQueue chan message
+	ws        *workerSignal
+	handler   messageHandler
 }
 
 type message struct {
@@ -39,14 +40,15 @@ type messageHandler interface {
 	onStop()
 }
 
-func newMessageWorker(ws *workerSignal, hwm int, h messageHandler) *messageWorker {
+func newMessageWorker(ws *workerSignal, hwm, bulkHWM int, h messageHandler) *messageWorker {
 	p := &messageWorker{}
-	p.init(ws, hwm, h)
+	p.init(ws, hwm, bulkHWM, h)
 	return p
 }
 
-func (p *messageWorker) init(ws *workerSignal, hwm int, h messageHandler) {
+func (p *messageWorker) init(ws *workerSignal, hwm, bulkHWM int, h messageHandler) {
 	p.queue = make(chan message, hwm)
+	p.bulkQueue = make(chan message, bulkHWM)
 	p.ws = ws
 	p.handler = h
 	ws.wg.Add(1)
@@ -62,6 +64,9 @@ func (p *messageWorker) run() {
 		case m := <-p.queue:
 			messagesInWorkerQueues.Add(-1)
 			p.handler.onMessage(m)
+		case m := <-p.bulkQueue:
+			messagesInWorkerQueues.Add(-1)
+			p.handler.onMessage(m)
 		}
 	}
 }
@@ -69,11 +74,16 @@ func (p *messageWorker) run() {
 func (p *messageWorker) shutdown() {
 	p.handler.onStop()
 	stopQueue(p.queue)
+	stopQueue(p.bulkQueue)
 	p.ws.wg.Done()
 }
 
 func (p *messageWorker) send(m message) {
-	p.queue <- m
+	if m.event != nil {
+		p.queue <- m
+	} else {
+		p.bulkQueue <- m
+	}
 	messagesInWorkerQueues.Add(1)
 }
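The messageWorker change above is the core pattern of this patch: a buffered channel for single events plus a second, by default unbuffered, channel for bulks, both drained by one select loop. A stripped-down, self-contained sketch of the pattern follows; the names are simplified and not the libbeat types:

```go
package main

import "fmt"

// message mirrors the idea of libbeat's message: one event or a bulk.
type message struct {
	event  string
	events []string
}

// twoQueueWorker drains a buffered single-event queue and a (by default
// unbuffered) bulk queue from a single select loop.
type twoQueueWorker struct {
	queue     chan message
	bulkQueue chan message
	handled   chan string // demo only: reports what was processed
}

func newTwoQueueWorker(hwm, bulkHWM int) *twoQueueWorker {
	w := &twoQueueWorker{
		queue:     make(chan message, hwm),
		bulkQueue: make(chan message, bulkHWM),
		handled:   make(chan string, 2), // buffered so the demo can't deadlock
	}
	go w.run()
	return w
}

// send routes by message type; with bulkHWM == 0, a bulk send blocks until
// the worker actually takes the batch, which is what caps memory usage.
func (w *twoQueueWorker) send(m message) {
	if m.events == nil {
		w.queue <- m
	} else {
		w.bulkQueue <- m
	}
}

func (w *twoQueueWorker) run() {
	for {
		select {
		case m := <-w.queue:
			w.handled <- "single event: " + m.event
		case m := <-w.bulkQueue:
			w.handled <- fmt.Sprintf("bulk of %d events", len(m.events))
		}
	}
}

func main() {
	w := newTwoQueueWorker(1000, 0)
	w.send(message{event: "a"})
	w.send(message{events: []string{"b", "c"}})
	fmt.Println(<-w.handled)
	fmt.Println(<-w.handled)
}
```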
diff --git a/libbeat/publisher/worker_test.go b/libbeat/publisher/worker_test.go
index 4cc4b624bd3a..9013d90d8faa 100644
--- a/libbeat/publisher/worker_test.go
+++ b/libbeat/publisher/worker_test.go
@@ -13,7 +13,7 @@ func TestMessageWorkerSend(t *testing.T) {
 	ws := &workerSignal{}
 	ws.Init()
 	mh := &testMessageHandler{msgs: make(chan message, 10), response: true}
-	mw := newMessageWorker(ws, 10, mh)
+	mw := newMessageWorker(ws, 10, 0, mh)
 
 	// Send an event.
 	s1 := newTestSignaler()

From 6865d1ac97e1c8ba202d7c3c49ed7ad0c7c958aa Mon Sep 17 00:00:00 2001
From: urso
Date: Thu, 7 Jan 2016 21:38:34 +0100
Subject: [PATCH 2/2] shipper queue size configuration

---
 CHANGELOG.asciidoc                  |  1 +
 filebeat/etc/filebeat.yml           |  3 +++
 libbeat/docs/shipperconfig.asciidoc | 10 ++++++++++
 libbeat/etc/libbeat.yml             |  3 +++
 packetbeat/etc/packetbeat.yml       |  3 +++
 topbeat/etc/topbeat.yml             |  3 +++
 winlogbeat/etc/winlogbeat.yml       |  3 +++
 7 files changed, 26 insertions(+)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 428291bd907a..736160355568 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -28,6 +28,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
 
 *Affecting all Beats*
 - Fix logging issue with file based output where newlines could be misplaced during concurrent logging {pull}650[650]
+- Reduce memory usage by using separate queue sizes for single events and bulk events. {pull}649[649] {issue}516[516]
 
 *Packetbeat*
 - Fix setting direction to out and use its value to decide when dropping events if ignore_outgoing is enabled {pull}557[557]
diff --git a/filebeat/etc/filebeat.yml b/filebeat/etc/filebeat.yml
index 5c0747a78f9f..e72593875f0d 100644
--- a/filebeat/etc/filebeat.yml
+++ b/filebeat/etc/filebeat.yml
@@ -345,6 +345,9 @@ shipper:
   # refresh_topology_freq. The default is 15 seconds.
   #topology_expire: 15
 
+  # Internal queue size for single events in the processing pipeline
+  #queue_size: 1000
+
   # Configure local GeoIP database support.
   # If no paths are configured, geoip is disabled.
   #geoip:
diff --git a/libbeat/docs/shipperconfig.asciidoc b/libbeat/docs/shipperconfig.asciidoc
index d4e7cfe4a63c..46899baa26af 100644
--- a/libbeat/docs/shipperconfig.asciidoc
+++ b/libbeat/docs/shipperconfig.asciidoc
@@ -51,6 +51,7 @@ shipper:
   #paths:
   # - "/usr/share/GeoIP/GeoLiteCity.dat"
   # - "/usr/local/var/GeoIP/GeoLiteCity.dat"
+
 ------------------------------------------------------------------------------
 
 ==== Options
@@ -143,6 +144,15 @@ useful in case a Beat stops publishing its IP addresses. The IP addresses
 are removed automatically from the topology map after expiration. The default
 is 15 seconds.
 
+===== queue_size
+
+The internal queue size for single events in the processing pipeline. The
+default value is 1000.
+
+===== bulk_queue_size
+
+(DO NOT TOUCH) The internal queue size for bulk events in the processing pipeline. The default value is 0.
+
 ===== geoip.paths
 
 This configuration option is currently used by Packetbeat only.
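To make the documented options concrete, a shipper section that sets both explicitly might look like the following. The values are illustrative only; as the docs above warn, bulk_queue_size is best left at its default:

```yaml
shipper:
  # Buffer up to 2000 single events in the publisher pipeline.
  queue_size: 2000

  # Keep bulk requests unbuffered so a slow output applies backpressure
  # instead of accumulating whole batches in memory.
  bulk_queue_size: 0
```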
diff --git a/libbeat/etc/libbeat.yml b/libbeat/etc/libbeat.yml
index 507e13a5bcf5..e2c595a9e86a 100644
--- a/libbeat/etc/libbeat.yml
+++ b/libbeat/etc/libbeat.yml
@@ -183,6 +183,9 @@ shipper:
   # refresh_topology_freq. The default is 15 seconds.
   #topology_expire: 15
 
+  # Internal queue size for single events in the processing pipeline
+  #queue_size: 1000
+
   # Configure local GeoIP database support.
   # If no paths are configured, geoip is disabled.
   #geoip:
diff --git a/packetbeat/etc/packetbeat.yml b/packetbeat/etc/packetbeat.yml
index 4e9097dbdfbf..814a88cb0f97 100644
--- a/packetbeat/etc/packetbeat.yml
+++ b/packetbeat/etc/packetbeat.yml
@@ -322,6 +322,9 @@ shipper:
   # refresh_topology_freq. The default is 15 seconds.
   #topology_expire: 15
 
+  # Internal queue size for single events in the processing pipeline
+  #queue_size: 1000
+
   # Configure local GeoIP database support.
   # If no paths are configured, geoip is disabled.
   #geoip:
diff --git a/topbeat/etc/topbeat.yml b/topbeat/etc/topbeat.yml
index 8ebcc3886dcc..bcecd4bf3fce 100644
--- a/topbeat/etc/topbeat.yml
+++ b/topbeat/etc/topbeat.yml
@@ -209,6 +209,9 @@ shipper:
   # refresh_topology_freq. The default is 15 seconds.
   #topology_expire: 15
 
+  # Internal queue size for single events in the processing pipeline
+  #queue_size: 1000
+
   # Configure local GeoIP database support.
   # If no paths are configured, geoip is disabled.
   #geoip:
diff --git a/winlogbeat/etc/winlogbeat.yml b/winlogbeat/etc/winlogbeat.yml
index b3d4fdda93c7..f8e6d886acb9 100644
--- a/winlogbeat/etc/winlogbeat.yml
+++ b/winlogbeat/etc/winlogbeat.yml
@@ -208,6 +208,9 @@ shipper:
   # refresh_topology_freq. The default is 15 seconds.
   #topology_expire: 15
 
+  # Internal queue size for single events in the processing pipeline
+  #queue_size: 1000
+
   # Configure local GeoIP database support.
   # If no paths are configured, geoip is disabled.
   #geoip:
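Why does a bulk queue size of 0 save memory? In Go, a capacity-0 channel is a rendezvous point: the sender blocks until the consumer is ready, so a slow output immediately backpressures the producer instead of letting batches pile up. A tiny standalone demonstration (the timings and sizes are illustrative, not from this patch):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	bulkQueue := make(chan []int, 0) // capacity 0: no batch is ever buffered
	done := make(chan struct{})

	// A deliberately slow consumer, standing in for an unresponsive output.
	go func() {
		for batch := range bulkQueue {
			time.Sleep(100 * time.Millisecond)
			fmt.Println("flushed batch of", len(batch), "events")
		}
		close(done)
	}()

	start := time.Now()
	for i := 0; i < 3; i++ {
		bulkQueue <- make([]int, 2048) // blocks until the consumer takes it
	}
	close(bulkQueue)
	// Roughly 200ms elapsed: the producer ran at the consumer's pace,
	// holding at most one batch in flight instead of queueing all three.
	fmt.Println("producer done after", time.Since(start).Round(10*time.Millisecond))
	<-done
}
```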