diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 428291bd907..73616035556 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -28,6 +28,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
 
 *Affecting all Beats*
 - Fix logging issue with file based output where newlines could be misplaced during concurrent logging {pull}650[650]
+- Reduce memory usage by using separate queue sizes for single events and bulk events. {pull}649[649] {issue}516[516]
 
 *Packetbeat*
 - Fix setting direction to out and use its value to decide when dropping events if ignore_outgoing is enabled {pull}557[557]
diff --git a/filebeat/etc/filebeat.yml b/filebeat/etc/filebeat.yml
index 5c0747a78f9..e72593875f0 100644
--- a/filebeat/etc/filebeat.yml
+++ b/filebeat/etc/filebeat.yml
@@ -345,6 +345,9 @@ shipper:
   # refresh_topology_freq. The default is 15 seconds.
   #topology_expire: 15
 
+  # Internal queue size for single events in processing pipeline
+  #queue_size: 1000
+
   # Configure local GeoIP database support.
   # If no paths are not configured geoip is disabled.
   #geoip:
diff --git a/libbeat/docs/shipperconfig.asciidoc b/libbeat/docs/shipperconfig.asciidoc
index d4e7cfe4a63..46899baa26a 100644
--- a/libbeat/docs/shipperconfig.asciidoc
+++ b/libbeat/docs/shipperconfig.asciidoc
@@ -51,6 +51,7 @@ shipper:
     #paths:
     # - "/usr/share/GeoIP/GeoLiteCity.dat"
     # - "/usr/local/var/GeoIP/GeoLiteCity.dat"
+
 ------------------------------------------------------------------------------
 
 ==== Options
@@ -143,6 +144,15 @@ useful in case a Beat stops publishing its IP addresses. The IP addresses
 are removed automatically from the topology map after expiration. The default
 is 15 seconds.
 
+===== queue_size
+
+Configure the internal queue size for single events in the processing pipeline.
+The default value is 1000.
+
+===== bulk_queue_size
+
+(DO NOT TOUCH) Configure the internal queue size for bulk events in the processing pipeline. The default value is 0.
+
 ===== geoip.paths
 
 This configuration option is currently used by Packetbeat only.
diff --git a/libbeat/etc/libbeat.yml b/libbeat/etc/libbeat.yml
index 507e13a5bcf..e2c595a9e86 100644
--- a/libbeat/etc/libbeat.yml
+++ b/libbeat/etc/libbeat.yml
@@ -183,6 +183,9 @@ shipper:
   # refresh_topology_freq. The default is 15 seconds.
   #topology_expire: 15
 
+  # Internal queue size for single events in processing pipeline
+  #queue_size: 1000
+
   # Configure local GeoIP database support.
   # If no paths are not configured geoip is disabled.
   #geoip:
diff --git a/libbeat/publisher/async.go b/libbeat/publisher/async.go
index cb0b8004012..efbaa321656 100644
--- a/libbeat/publisher/async.go
+++ b/libbeat/publisher/async.go
@@ -20,18 +20,17 @@ const (
 	defaultBulkSize = 200
 )
 
-func newAsyncPublisher(pub *PublisherType) *asyncPublisher {
-
+func newAsyncPublisher(pub *PublisherType, hwm, bulkHWM int) *asyncPublisher {
 	p := &asyncPublisher{pub: pub}
 	p.ws.Init()
 
 	var outputs []worker
 	for _, out := range pub.Output {
-		outputs = append(outputs, asyncOutputer(&p.ws, out))
+		outputs = append(outputs, asyncOutputer(&p.ws, hwm, bulkHWM, out))
 	}
 	p.outputs = outputs
 
-	p.messageWorker.init(&pub.wsPublisher, 1000, newPreprocessor(pub, p))
+	p.messageWorker.init(&pub.wsPublisher, hwm, bulkHWM, newPreprocessor(pub, p))
 	return p
 }
 
@@ -67,7 +66,7 @@ func (p *asyncPublisher) PublishEvents(ctx context, events []common.MapStr) bool
 	return true
 }
 
-func asyncOutputer(ws *workerSignal, worker *outputWorker) worker {
+func asyncOutputer(ws *workerSignal, hwm, bulkHWM int, worker *outputWorker) worker {
 	config := worker.config
 
 	flushInterval := defaultFlushInterval
@@ -89,5 +88,5 @@ func asyncOutputer(ws *workerSignal, hwm, bulkHWM int, worker *outputWorker) worker {
 	debug("create bulk processing worker (interval=%v, bulk size=%v)",
 		flushInterval, maxBulkSize)
 
-	return newBulkWorker(ws, 1000, worker, flushInterval, maxBulkSize)
+	return newBulkWorker(ws, hwm, bulkHWM, worker, flushInterval, maxBulkSize)
 }
diff --git a/libbeat/publisher/bulk.go b/libbeat/publisher/bulk.go
index fda2228f331..5604c1ff1df 100644
--- a/libbeat/publisher/bulk.go
+++ b/libbeat/publisher/bulk.go
@@ -12,6 +12,7 @@ type bulkWorker struct {
 	ws *workerSignal
 
 	queue       chan message
+	bulkQueue   chan message
 	flushTicker *time.Ticker
 
 	maxBatchSize int
@@ -20,7 +21,8 @@
 }
 
 func newBulkWorker(
-	ws *workerSignal, hwm int, output worker,
+	ws *workerSignal, hwm int, bulkHWM int,
+	output worker,
 	flushInterval time.Duration, maxBatchSize int,
 ) *bulkWorker {
@@ -28,6 +30,7 @@
 		output:       output,
 		ws:           ws,
 		queue:        make(chan message, hwm),
+		bulkQueue:    make(chan message, bulkHWM),
 		flushTicker:  time.NewTicker(flushInterval),
 		maxBatchSize: maxBatchSize,
 		events:       make([]common.MapStr, 0, maxBatchSize),
@@ -40,7 +43,11 @@
 }
 
 func (b *bulkWorker) send(m message) {
-	b.queue <- m
+	if m.events == nil {
+		b.queue <- m
+	} else {
+		b.bulkQueue <- m
+	}
 }
 
 func (b *bulkWorker) run() {
@@ -51,16 +58,9 @@
 		case <-b.ws.done:
 			return
 		case m := <-b.queue:
-			if m.event != nil { // single event
-				b.onEvent(m.context.signal, m.event)
-			} else { // batch of events
-				b.onEvents(m.context.signal, m.events)
-			}
-
-			// buffer full?
-			if len(b.events) == cap(b.events) {
-				b.publish()
-			}
+			b.onEvent(m.context.signal, m.event)
+		case m := <-b.bulkQueue:
+			b.onEvents(m.context.signal, m.events)
 		case <-b.flushTicker.C:
 			if len(b.events) > 0 {
 				b.publish()
@@ -74,6 +74,10 @@ func (b *bulkWorker) onEvent(signal outputs.Signaler, event common.MapStr) {
 	if signal != nil {
 		b.pending = append(b.pending, signal)
 	}
+
+	if len(b.events) == cap(b.events) {
+		b.publish()
+	}
 }
 
 func (b *bulkWorker) onEvents(signal outputs.Signaler, events []common.MapStr) {
@@ -81,11 +85,10 @@ func (b *bulkWorker) onEvents(signal outputs.Signaler, events []common.MapStr) {
 		// split up bulk to match required bulk sizes.
 		// If input events have been split up bufferFull will be set and
 		// bulk request will be published.
-		bufferFull := false
 		spaceLeft := cap(b.events) - len(b.events)
 		consume := len(events)
+		bufferFull := spaceLeft <= consume
 		if spaceLeft < consume {
-			bufferFull = true
 			consume = spaceLeft
 			if signal != nil {
 				// creating cascading signaler chain for
@@ -113,6 +116,7 @@
 		context: context{
 			signal: outputs.NewCompositeSignaler(b.pending...),
 		},
+		event:  nil,
 		events: b.events,
 	})
 
@@ -123,5 +127,6 @@
 func (b *bulkWorker) shutdown() {
 	b.flushTicker.Stop()
 	stopQueue(b.queue)
+	stopQueue(b.bulkQueue)
 	b.ws.wg.Done()
 }
diff --git a/libbeat/publisher/bulk_test.go b/libbeat/publisher/bulk_test.go
index c8cfd482bcd..a28ce425f3a 100644
--- a/libbeat/publisher/bulk_test.go
+++ b/libbeat/publisher/bulk_test.go
@@ -12,6 +12,7 @@ const (
 	flushInterval time.Duration = 10 * time.Millisecond
 	maxBatchSize                = 10
 	queueSize                   = 4 * maxBatchSize
+	bulkQueueSize               = 1
 )
 
 // Send a single event to the bulkWorker and verify that the event
@@ -23,7 +24,7 @@ func TestBulkWorkerSendSingle(t *testing.T) {
 	}
 	ws := newWorkerSignal()
 	defer ws.stop()
-	bw := newBulkWorker(ws, queueSize, mh, flushInterval, maxBatchSize)
+	bw := newBulkWorker(ws, queueSize, bulkQueueSize, mh, flushInterval, maxBatchSize)
 
 	s := newTestSignaler()
 	m := testMessage(s, testEvent())
@@ -46,7 +47,7 @@ func TestBulkWorkerSendBatch(t *testing.T) {
 	}
 	ws := newWorkerSignal()
 	defer ws.stop()
-	bw := newBulkWorker(ws, queueSize, mh, time.Duration(time.Hour), maxBatchSize)
+	bw := newBulkWorker(ws, queueSize, 0, mh, time.Duration(time.Hour), maxBatchSize)
 
 	events := make([]common.MapStr, maxBatchSize)
 	for i := range events {
@@ -76,7 +77,7 @@ func TestBulkWorkerSendBatchGreaterThanMaxBatchSize(t *testing.T) {
 	}
 	ws := newWorkerSignal()
 	defer ws.stop()
-	bw := newBulkWorker(ws, queueSize, mh, flushInterval, maxBatchSize)
+	bw := newBulkWorker(ws, queueSize, 0, mh, flushInterval, maxBatchSize)
 
 	// Send
 	events := make([]common.MapStr, maxBatchSize+1)
diff --git a/libbeat/publisher/common_test.go b/libbeat/publisher/common_test.go
index f44a1d45147..a27e5ec3fc4 100644
--- a/libbeat/publisher/common_test.go
+++ b/libbeat/publisher/common_test.go
@@ -141,7 +141,7 @@ func newTestPublisher(bulkSize int, response OutputResponse) *testPublisher {
 	ow.config.BulkMaxSize = &bulkSize
 	ow.handler = mh
 	ws := workerSignal{}
-	ow.messageWorker.init(&ws, 1000, mh)
+	ow.messageWorker.init(&ws, defaultChanSize, defaultBulkChanSize, mh)
 
 	pub := &PublisherType{
 		Output: []*outputWorker{ow},
@@ -149,8 +149,8 @@
 	}
 	pub.wsOutput.Init()
 	pub.wsPublisher.Init()
-	pub.syncPublisher = newSyncPublisher(pub)
-	pub.asyncPublisher = newAsyncPublisher(pub)
+	pub.syncPublisher = newSyncPublisher(pub, defaultChanSize, defaultBulkChanSize)
+	pub.asyncPublisher = newAsyncPublisher(pub, defaultChanSize, defaultBulkChanSize)
 	return &testPublisher{
 		pub:              pub,
 		outputMsgHandler: mh,
diff --git a/libbeat/publisher/output.go b/libbeat/publisher/output.go
index 53d69bb23ed..e1955814712 100644
--- a/libbeat/publisher/output.go
+++ b/libbeat/publisher/output.go
@@ -20,6 +20,7 @@ func newOutputWorker(
 	out outputs.Outputer,
 	ws *workerSignal,
 	hwm int,
+	bulkHWM int,
 ) *outputWorker {
 	maxBulkSize := defaultBulkSize
 	if config.BulkMaxSize != nil {
@@ -31,14 +32,13 @@
 		config:      config,
 		maxBulkSize: maxBulkSize,
 	}
-	o.messageWorker.init(ws, hwm, o)
+	o.messageWorker.init(ws, hwm, bulkHWM, o)
 	return o
 }
 
 func (o *outputWorker) onStop() {}
 
 func (o *outputWorker) onMessage(m message) {
-
 	if m.event != nil {
 		o.onEvent(&m.context, m.event)
 	} else {
diff --git a/libbeat/publisher/output_test.go b/libbeat/publisher/output_test.go
index 46b0f154cad..97e851497c0 100644
--- a/libbeat/publisher/output_test.go
+++ b/libbeat/publisher/output_test.go
@@ -32,7 +32,7 @@ func TestOutputWorker(t *testing.T) {
 		outputs.MothershipConfig{},
 		outputer,
 		newWorkerSignal(),
-		1)
+		1, 0)
 
 	ow.onStop() // Noop
 
diff --git a/libbeat/publisher/publish.go b/libbeat/publisher/publish.go
index e9985f70233..63267eafc09 100644
--- a/libbeat/publisher/publish.go
+++ b/libbeat/publisher/publish.go
@@ -79,6 +79,10 @@ type ShipperConfig struct {
 	Topology_expire int
 	Tags            []string
 	Geoip           common.Geoip
+
+	// internal publisher queue sizes
+	QueueSize     *int `yaml:"queue_size"`
+	BulkQueueSize *int `yaml:"bulk_queue_size"`
 }
 
 type Topology struct {
@@ -86,6 +90,11 @@
 	Ip   string `json:"ip"`
 }
 
+const (
+	defaultChanSize     = 1000
+	defaultBulkChanSize = 0
+)
+
 func init() {
 	publishDisabled = flag.Bool("N", false, "Disable actual publishing for testing")
 }
@@ -191,6 +200,16 @@
 		logp.Info("Dry run mode. All output types except the file based one are disabled.")
 	}
 
+	hwm := defaultChanSize
+	if shipper.QueueSize != nil && *shipper.QueueSize > 0 {
+		hwm = *shipper.QueueSize
+	}
+
+	bulkHWM := defaultBulkChanSize
+	if shipper.BulkQueueSize != nil && *shipper.BulkQueueSize >= 0 {
+		bulkHWM = *shipper.BulkQueueSize
+	}
+
 	publisher.GeoLite = common.LoadGeoIPData(shipper.Geoip)
 
 	publisher.wsOutput.Init()
@@ -211,7 +230,8 @@
 		debug("Create output worker")
 
 		outputers = append(outputers,
-			newOutputWorker(config, output, &publisher.wsOutput, 1000))
+			newOutputWorker(config, output, &publisher.wsOutput,
+				hwm, bulkHWM))
 
 		if !config.Save_topology {
 			continue
@@ -289,8 +309,8 @@
 		go publisher.UpdateTopologyPeriodically()
 	}
 
-	publisher.asyncPublisher = newAsyncPublisher(publisher)
-	publisher.syncPublisher = newSyncPublisher(publisher)
+	publisher.asyncPublisher = newAsyncPublisher(publisher, hwm, bulkHWM)
+	publisher.syncPublisher = newSyncPublisher(publisher, hwm, bulkHWM)
 
 	return nil
 }
diff --git a/libbeat/publisher/sync.go b/libbeat/publisher/sync.go
index 475d8918c68..dad9ae7bce9 100644
--- a/libbeat/publisher/sync.go
+++ b/libbeat/publisher/sync.go
@@ -12,9 +12,9 @@ type syncPublisher struct {
 
 type syncClient func(message) bool
 
-func newSyncPublisher(pub *PublisherType) *syncPublisher {
+func newSyncPublisher(pub *PublisherType, hwm, bulkHWM int) *syncPublisher {
 	s := &syncPublisher{pub: pub}
-	s.messageWorker.init(&pub.wsPublisher, 1000, newPreprocessor(pub, s))
+	s.messageWorker.init(&pub.wsPublisher, hwm, bulkHWM, newPreprocessor(pub, s))
 	return s
 }
 
diff --git a/libbeat/publisher/worker.go b/libbeat/publisher/worker.go
index fd88151b4d1..51f4d520ab9 100644
--- a/libbeat/publisher/worker.go
+++ b/libbeat/publisher/worker.go
@@ -18,9 +18,10 @@ type worker interface {
 }
 
 type messageWorker struct {
-	queue   chan message
-	ws      *workerSignal
-	handler messageHandler
+	queue     chan message
+	bulkQueue chan message
+	ws        *workerSignal
+	handler   messageHandler
 }
 
 type message struct {
@@ -39,14 +40,15 @@ type messageHandler interface {
 	onStop()
 }
 
-func newMessageWorker(ws *workerSignal, hwm int, h messageHandler) *messageWorker {
+func newMessageWorker(ws *workerSignal, hwm, bulkHWM int, h messageHandler) *messageWorker {
 	p := &messageWorker{}
-	p.init(ws, hwm, h)
+	p.init(ws, hwm, bulkHWM, h)
 	return p
 }
 
-func (p *messageWorker) init(ws *workerSignal, hwm int, h messageHandler) {
+func (p *messageWorker) init(ws *workerSignal, hwm, bulkHWM int, h messageHandler) {
 	p.queue = make(chan message, hwm)
+	p.bulkQueue = make(chan message, bulkHWM)
 	p.ws = ws
 	p.handler = h
 	ws.wg.Add(1)
@@ -62,6 +64,9 @@
 		case m := <-p.queue:
 			messagesInWorkerQueues.Add(-1)
 			p.handler.onMessage(m)
+		case m := <-p.bulkQueue:
+			messagesInWorkerQueues.Add(-1)
+			p.handler.onMessage(m)
 		}
 	}
 }
@@ -69,11 +74,16 @@
 
 func (p *messageWorker) shutdown() {
 	p.handler.onStop()
 	stopQueue(p.queue)
+	stopQueue(p.bulkQueue)
 	p.ws.wg.Done()
 }
 
 func (p *messageWorker) send(m message) {
-	p.queue <- m
+	if m.event != nil {
+		p.queue <- m
+	} else {
+		p.bulkQueue <- m
+	}
 	messagesInWorkerQueues.Add(1)
 }
diff --git a/libbeat/publisher/worker_test.go b/libbeat/publisher/worker_test.go
index 4cc4b624bd3..9013d90d8fa 100644
--- a/libbeat/publisher/worker_test.go
+++ b/libbeat/publisher/worker_test.go
@@ -13,7 +13,7 @@ func TestMessageWorkerSend(t *testing.T) {
 	ws := &workerSignal{}
 	ws.Init()
 	mh := &testMessageHandler{msgs: make(chan message, 10), response: true}
-	mw := newMessageWorker(ws, 10, mh)
+	mw := newMessageWorker(ws, 10, 0, mh)
 
 	// Send an event.
 	s1 := newTestSignaler()
diff --git a/packetbeat/etc/packetbeat.yml b/packetbeat/etc/packetbeat.yml
index 4e9097dbdfb..814a88cb0f9 100644
--- a/packetbeat/etc/packetbeat.yml
+++ b/packetbeat/etc/packetbeat.yml
@@ -322,6 +322,9 @@ shipper:
   # refresh_topology_freq. The default is 15 seconds.
   #topology_expire: 15
 
+  # Internal queue size for single events in processing pipeline
+  #queue_size: 1000
+
   # Configure local GeoIP database support.
   # If no paths are not configured geoip is disabled.
   #geoip:
diff --git a/topbeat/etc/topbeat.yml b/topbeat/etc/topbeat.yml
index 8ebcc3886dc..bcecd4bf3fc 100644
--- a/topbeat/etc/topbeat.yml
+++ b/topbeat/etc/topbeat.yml
@@ -209,6 +209,9 @@ shipper:
   # refresh_topology_freq. The default is 15 seconds.
   #topology_expire: 15
 
+  # Internal queue size for single events in processing pipeline
+  #queue_size: 1000
+
   # Configure local GeoIP database support.
   # If no paths are not configured geoip is disabled.
   #geoip:
diff --git a/winlogbeat/etc/winlogbeat.yml b/winlogbeat/etc/winlogbeat.yml
index b3d4fdda93c..f8e6d886acb 100644
--- a/winlogbeat/etc/winlogbeat.yml
+++ b/winlogbeat/etc/winlogbeat.yml
@@ -208,6 +208,9 @@ shipper:
   # refresh_topology_freq. The default is 15 seconds.
   #topology_expire: 15
 
+  # Internal queue size for single events in processing pipeline
+  #queue_size: 1000
+
   # Configure local GeoIP database support.
   # If no paths are not configured geoip is disabled.
   #geoip:
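
For reference, both new options live under the existing `shipper:` section of each Beat's configuration file. A minimal sketch of setting them explicitly, not part of the patch itself — the option names, comments, and the 1000/0 defaults come from the change above; uncommenting them is only needed when tuning the publisher's memory usage:

shipper:
  # Internal queue size for single events in processing pipeline
  queue_size: 1000

  # (DO NOT TOUCH) Internal queue size for bulk events in processing pipeline
  bulk_queue_size: 0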