
Separate per event and bulk-event queues #649

Merged (2 commits) on Jan 7, 2016
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -28,6 +28,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
*Affecting all Beats*
- Fix logging issue with file based output where newlines could be misplaced
during concurrent logging {pull}650[650]
- Reduce memory usage by using separate queue sizes for single events and bulk events. {pull}649[649] {issue}516[516]

*Packetbeat*
- Fix setting direction to out and use its value to decide when dropping events if ignore_outgoing is enabled {pull}557[557]
3 changes: 3 additions & 0 deletions filebeat/etc/filebeat.yml
@@ -345,6 +345,9 @@ shipper:
# refresh_topology_freq. The default is 15 seconds.
#topology_expire: 15

# Internal queue size for single events in processing pipeline
#queue_size: 1000

# Configure local GeoIP database support.
# If no paths are configured, geoip is disabled.
#geoip:
10 changes: 10 additions & 0 deletions libbeat/docs/shipperconfig.asciidoc
@@ -51,6 +51,7 @@ shipper:
#paths:
# - "/usr/share/GeoIP/GeoLiteCity.dat"
# - "/usr/local/var/GeoIP/GeoLiteCity.dat"

------------------------------------------------------------------------------

==== Options
@@ -143,6 +144,15 @@ useful in case a Beat stops publishing its IP addresses. The IP addresses
are removed automatically from the topology map after expiration. The default
is 15 seconds.

===== queue_size

Sets the size of the internal queue for single events in the processing
pipeline. The default is 1000.

===== bulk_queue_size

(DO NOT TOUCH) Sets the size of the internal queue for bulk events in the processing pipeline. The default is 0.
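
A hypothetical shipper section that raises the single-event queue might look like this (the values are illustrative only, not recommendations):

```yaml
shipper:
  # Buffer up to 5000 single events before publishers block.
  queue_size: 5000

  # Bulk-event queue; leave at its default of 0 unless you know
  # exactly why you are changing it.
  #bulk_queue_size: 0
```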

===== geoip.paths

This configuration option is currently used by Packetbeat only.
3 changes: 3 additions & 0 deletions libbeat/etc/libbeat.yml
@@ -183,6 +183,9 @@ shipper:
# refresh_topology_freq. The default is 15 seconds.
#topology_expire: 15

# Internal queue size for single events in processing pipeline
#queue_size: 1000

# Configure local GeoIP database support.
# If no paths are configured, geoip is disabled.
#geoip:
11 changes: 5 additions & 6 deletions libbeat/publisher/async.go
@@ -20,18 +20,17 @@ const (
defaultBulkSize = 200
)

func newAsyncPublisher(pub *PublisherType) *asyncPublisher {

func newAsyncPublisher(pub *PublisherType, hwm, bulkHWM int) *asyncPublisher {
p := &asyncPublisher{pub: pub}
p.ws.Init()

var outputs []worker
for _, out := range pub.Output {
outputs = append(outputs, asyncOutputer(&p.ws, out))
outputs = append(outputs, asyncOutputer(&p.ws, hwm, bulkHWM, out))
}

p.outputs = outputs
p.messageWorker.init(&pub.wsPublisher, 1000, newPreprocessor(pub, p))
p.messageWorker.init(&pub.wsPublisher, hwm, bulkHWM, newPreprocessor(pub, p))
return p
}

@@ -67,7 +66,7 @@ func (p *asyncPublisher) PublishEvents(ctx context, events []common.MapStr) bool
return true
}

func asyncOutputer(ws *workerSignal, worker *outputWorker) worker {
func asyncOutputer(ws *workerSignal, hwm, bulkHWM int, worker *outputWorker) worker {
config := worker.config

flushInterval := defaultFlushInterval
@@ -89,5 +88,5 @@ func asyncOutputer(ws *workerSignal, hwm, bulkHWM int, worker *outputWorker) worker {

debug("create bulk processing worker (interval=%v, bulk size=%v)",
flushInterval, maxBulkSize)
return newBulkWorker(ws, 1000, worker, flushInterval, maxBulkSize)
return newBulkWorker(ws, hwm, bulkHWM, worker, flushInterval, maxBulkSize)
}
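
For context on what these high-water marks mean at runtime: hwm and bulkHWM end up as buffered-channel capacities, and a full buffered channel blocks its senders. A minimal standalone sketch of that backpressure behavior (illustrative names, not the Beats API):

```go
package main

import "fmt"

type message struct{ id int }

func main() {
	hwm := 2 // illustrative high-water mark
	queue := make(chan message, hwm)
	done := make(chan struct{})

	// Consumer drains the queue, freeing capacity for blocked senders.
	go func() {
		for m := range queue {
			fmt.Println("consumed", m.id)
		}
		close(done)
	}()

	for i := 1; i <= 5; i++ {
		queue <- message{i} // blocks whenever hwm messages are already queued
	}
	close(queue)
	<-done
}
```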
33 changes: 19 additions & 14 deletions libbeat/publisher/bulk.go
@@ -12,6 +12,7 @@ type bulkWorker struct {
ws *workerSignal

queue chan message
bulkQueue chan message
flushTicker *time.Ticker

maxBatchSize int
@@ -20,14 +21,16 @@
}

func newBulkWorker(
ws *workerSignal, hwm int, output worker,
ws *workerSignal, hwm int, bulkHWM int,
output worker,
flushInterval time.Duration,
maxBatchSize int,
) *bulkWorker {
b := &bulkWorker{
output: output,
ws: ws,
queue: make(chan message, hwm),
bulkQueue: make(chan message, bulkHWM),
flushTicker: time.NewTicker(flushInterval),
maxBatchSize: maxBatchSize,
events: make([]common.MapStr, 0, maxBatchSize),
@@ -40,7 +43,11 @@
}

func (b *bulkWorker) send(m message) {
b.queue <- m
if m.events == nil {
b.queue <- m
} else {
b.bulkQueue <- m
}
}

func (b *bulkWorker) run() {
@@ -51,16 +58,9 @@
case <-b.ws.done:
return
case m := <-b.queue:
if m.event != nil { // single event
b.onEvent(m.context.signal, m.event)
} else { // batch of events
b.onEvents(m.context.signal, m.events)
}

// buffer full?
if len(b.events) == cap(b.events) {
b.publish()
}
b.onEvent(m.context.signal, m.event)
case m := <-b.bulkQueue:
b.onEvents(m.context.signal, m.events)
case <-b.flushTicker.C:
if len(b.events) > 0 {
b.publish()
@@ -74,18 +74,21 @@ func (b *bulkWorker) onEvent(signal outputs.Signaler, event common.MapStr) {
if signal != nil {
b.pending = append(b.pending, signal)
}

if len(b.events) == cap(b.events) {
b.publish()
Contributor: Are bulk events only sent when the queue is completely full, or also after a timeout?

Author: Both: a full buffer and the flush timeout (line 64) force a publish.

Contributor: 👍

}
}

func (b *bulkWorker) onEvents(signal outputs.Signaler, events []common.MapStr) {
for len(events) > 0 {
// split up bulk to match required bulk sizes.
// If input events have been split up bufferFull will be set and
// bulk request will be published.
bufferFull := false
spaceLeft := cap(b.events) - len(b.events)
consume := len(events)
bufferFull := spaceLeft <= consume
if spaceLeft < consume {
bufferFull = true
consume = spaceLeft
if signal != nil {
// creating cascading signaler chain for
@@ -113,6 +116,7 @@ func (b *bulkWorker) publish() {
context: context{
signal: outputs.NewCompositeSignaler(b.pending...),
},
event: nil,
events: b.events,
})

@@ -123,5 +127,6 @@
func (b *bulkWorker) shutdown() {
b.flushTicker.Stop()
stopQueue(b.queue)
stopQueue(b.bulkQueue)
b.ws.wg.Done()
}
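
The splitting arithmetic in onEvents is easy to verify with a toy version. Below is a self-contained sketch, not the Beats code, that uses the same spaceLeft/consume/bufferFull logic: a batch is consumed in chunks of at most the remaining buffer space, the buffer is published whenever it fills, and a partial tail stays buffered for the flush ticker.

```go
package main

import "fmt"

// publishChunks mimics bulkWorker.onEvents: append incoming events to a
// buffer of fixed capacity and "publish" whenever the buffer fills.
func publishChunks(events []int, capacity int) (published [][]int, buffered int) {
	buf := make([]int, 0, capacity)
	for len(events) > 0 {
		spaceLeft := capacity - len(buf)
		consume := len(events)
		bufferFull := spaceLeft <= consume // same condition as in the diff
		if spaceLeft < consume {
			consume = spaceLeft
		}
		buf = append(buf, events[:consume]...)
		events = events[consume:]
		if bufferFull {
			chunk := make([]int, len(buf))
			copy(chunk, buf)
			published = append(published, chunk)
			buf = buf[:0]
		}
	}
	return published, len(buf)
}

func main() {
	// 25 events against a max batch size of 10: two full batches are
	// published; 5 events stay buffered until the flush timeout fires.
	chunks, rest := publishChunks(make([]int, 25), 10)
	for _, c := range chunks {
		fmt.Println("published batch of", len(c))
	}
	fmt.Println("still buffered:", rest)
}
```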
7 changes: 4 additions & 3 deletions libbeat/publisher/bulk_test.go
@@ -12,6 +12,7 @@ const (
flushInterval time.Duration = 10 * time.Millisecond
maxBatchSize = 10
queueSize = 4 * maxBatchSize
bulkQueueSize = 1
)

// Send a single event to the bulkWorker and verify that the event
@@ -23,7 +24,7 @@ func TestBulkWorkerSendSingle(t *testing.T) {
}
ws := newWorkerSignal()
defer ws.stop()
bw := newBulkWorker(ws, queueSize, mh, flushInterval, maxBatchSize)
bw := newBulkWorker(ws, queueSize, bulkQueueSize, mh, flushInterval, maxBatchSize)

s := newTestSignaler()
m := testMessage(s, testEvent())
@@ -46,7 +47,7 @@ func TestBulkWorkerSendBatch(t *testing.T) {
}
ws := newWorkerSignal()
defer ws.stop()
bw := newBulkWorker(ws, queueSize, mh, time.Duration(time.Hour), maxBatchSize)
bw := newBulkWorker(ws, queueSize, 0, mh, time.Duration(time.Hour), maxBatchSize)

events := make([]common.MapStr, maxBatchSize)
for i := range events {
@@ -76,7 +77,7 @@ func TestBulkWorkerSendBatchGreaterThanMaxBatchSize(t *testing.T) {
}
ws := newWorkerSignal()
defer ws.stop()
bw := newBulkWorker(ws, queueSize, mh, flushInterval, maxBatchSize)
bw := newBulkWorker(ws, queueSize, 0, mh, flushInterval, maxBatchSize)

// Send
events := make([]common.MapStr, maxBatchSize+1)
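
Note the bulk queue size of 0 passed in the batch tests above: a Go channel created with capacity 0 is unbuffered, so each bulk send is a direct hand-off to the worker instead of sitting in a channel buffer. That is also why the bulk_queue_size default of 0 keeps memory usage down. A minimal sketch of the semantics:

```go
package main

import "fmt"

func main() {
	bulkQueue := make(chan []int) // capacity 0: unbuffered
	done := make(chan struct{})

	go func() {
		batch := <-bulkQueue
		fmt.Println("worker received batch of", len(batch))
		close(done)
	}()

	// This send blocks until the worker is ready to receive; batches
	// hand off one at a time instead of accumulating in channel buffers.
	bulkQueue <- make([]int, 200)
	<-done
}
```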
6 changes: 3 additions & 3 deletions libbeat/publisher/common_test.go
@@ -141,16 +141,16 @@ func newTestPublisher(bulkSize int, response OutputResponse) *testPublisher {
ow.config.BulkMaxSize = &bulkSize
ow.handler = mh
ws := workerSignal{}
ow.messageWorker.init(&ws, 1000, mh)
ow.messageWorker.init(&ws, defaultChanSize, defaultBulkChanSize, mh)

pub := &PublisherType{
Output: []*outputWorker{ow},
wsOutput: ws,
}
pub.wsOutput.Init()
pub.wsPublisher.Init()
pub.syncPublisher = newSyncPublisher(pub)
pub.asyncPublisher = newAsyncPublisher(pub)
pub.syncPublisher = newSyncPublisher(pub, defaultChanSize, defaultBulkChanSize)
pub.asyncPublisher = newAsyncPublisher(pub, defaultChanSize, defaultBulkChanSize)
return &testPublisher{
pub: pub,
outputMsgHandler: mh,
4 changes: 2 additions & 2 deletions libbeat/publisher/output.go
@@ -20,6 +20,7 @@ func newOutputWorker(
out outputs.Outputer,
ws *workerSignal,
hwm int,
bulkHWM int,
) *outputWorker {
maxBulkSize := defaultBulkSize
if config.BulkMaxSize != nil {
@@ -31,14 +32,13 @@
config: config,
maxBulkSize: maxBulkSize,
}
o.messageWorker.init(ws, hwm, o)
o.messageWorker.init(ws, hwm, bulkHWM, o)
return o
}

func (o *outputWorker) onStop() {}

func (o *outputWorker) onMessage(m message) {

if m.event != nil {
o.onEvent(&m.context, m.event)
} else {
2 changes: 1 addition & 1 deletion libbeat/publisher/output_test.go
@@ -32,7 +32,7 @@ func TestOutputWorker(t *testing.T) {
outputs.MothershipConfig{},
outputer,
newWorkerSignal(),
1)
1, 0)

ow.onStop() // Noop

26 changes: 23 additions & 3 deletions libbeat/publisher/publish.go
@@ -79,13 +79,22 @@ type ShipperConfig struct {
Topology_expire int
Tags []string
Geoip common.Geoip

// internal publisher queue sizes
QueueSize *int `yaml:"queue_size"`
Contributor: I'm +1 on making this also visible in the config file, but with a warning that people who change it must understand what they are doing.

BulkQueueSize *int `yaml:"bulk_queue_size"`
}

type Topology struct {
Name string `json:"name"`
Ip string `json:"ip"`
}

const (
defaultChanSize = 1000
defaultBulkChanSize = 0
)

func init() {
publishDisabled = flag.Bool("N", false, "Disable actual publishing for testing")
}
@@ -191,6 +200,16 @@ func (publisher *PublisherType) init(
logp.Info("Dry run mode. All output types except the file based one are disabled.")
}

hwm := defaultChanSize
if shipper.QueueSize != nil && *shipper.QueueSize > 0 {
hwm = *shipper.QueueSize
}

bulkHWM := defaultBulkChanSize
if shipper.BulkQueueSize != nil && *shipper.BulkQueueSize >= 0 {
bulkHWM = *shipper.BulkQueueSize
}

publisher.GeoLite = common.LoadGeoIPData(shipper.Geoip)

publisher.wsOutput.Init()
@@ -211,7 +230,8 @@
debug("Create output worker")

outputers = append(outputers,
newOutputWorker(config, output, &publisher.wsOutput, 1000))
newOutputWorker(config, output, &publisher.wsOutput,
hwm, bulkHWM))

if !config.Save_topology {
continue
@@ -289,8 +309,8 @@ func (publisher *PublisherType) init(
go publisher.UpdateTopologyPeriodically()
}

publisher.asyncPublisher = newAsyncPublisher(publisher)
publisher.syncPublisher = newSyncPublisher(publisher)
publisher.asyncPublisher = newAsyncPublisher(publisher, hwm, bulkHWM)
publisher.syncPublisher = newSyncPublisher(publisher, hwm, bulkHWM)

return nil
}
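
The *int config fields rely on a common Go pattern for optional settings: nil means the option was not set, so the defaults apply only when the user omitted it. A standalone sketch of the resolution logic above (shortened names, illustrative usage):

```go
package main

import "fmt"

const (
	defaultChanSize     = 1000
	defaultBulkChanSize = 0
)

type shipperConfig struct {
	QueueSize     *int // nil means "not set in the config file"
	BulkQueueSize *int
}

// resolveQueueSizes applies the defaults unless the user set a valid value.
func resolveQueueSizes(c shipperConfig) (hwm, bulkHWM int) {
	hwm = defaultChanSize
	if c.QueueSize != nil && *c.QueueSize > 0 {
		hwm = *c.QueueSize
	}
	bulkHWM = defaultBulkChanSize
	if c.BulkQueueSize != nil && *c.BulkQueueSize >= 0 {
		bulkHWM = *c.BulkQueueSize
	}
	return hwm, bulkHWM
}

func main() {
	size := 5000
	hwm, bulkHWM := resolveQueueSizes(shipperConfig{QueueSize: &size})
	fmt.Println(hwm, bulkHWM) // prints: 5000 0
}
```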
4 changes: 2 additions & 2 deletions libbeat/publisher/sync.go
@@ -12,9 +12,9 @@ type syncPublisher struct {

type syncClient func(message) bool

func newSyncPublisher(pub *PublisherType) *syncPublisher {
func newSyncPublisher(pub *PublisherType, hwm, bulkHWM int) *syncPublisher {
s := &syncPublisher{pub: pub}
s.messageWorker.init(&pub.wsPublisher, 1000, newPreprocessor(pub, s))
s.messageWorker.init(&pub.wsPublisher, hwm, bulkHWM, newPreprocessor(pub, s))
return s
}
