diff --git a/libbeat/publisher/bulk.go b/libbeat/publisher/bulk.go index 01887c145415..a10dff6f6a17 100644 --- a/libbeat/publisher/bulk.go +++ b/libbeat/publisher/bulk.go @@ -13,6 +13,7 @@ type bulkWorker struct { queue chan message bulkQueue chan message + guaranteed bool flushTicker *time.Ticker maxBatchSize int @@ -58,9 +59,9 @@ func (b *bulkWorker) run() { case <-b.ws.done: return case m := <-b.queue: - b.onEvent(m.context.Signal, m.event) + b.onEvent(&m.context, m.event) case m := <-b.bulkQueue: - b.onEvents(m.context.Signal, m.events) + b.onEvents(&m.context, m.events) case <-b.flushTicker.C: if len(b.events) > 0 { b.publish() @@ -69,8 +70,11 @@ func (b *bulkWorker) run() { } } -func (b *bulkWorker) onEvent(signal outputs.Signaler, event common.MapStr) { +func (b *bulkWorker) onEvent(ctx *Context, event common.MapStr) { b.events = append(b.events, event) + b.guaranteed = b.guaranteed || ctx.Guaranteed + + signal := ctx.Signal if signal != nil { b.pending = append(b.pending, signal) } @@ -80,7 +84,7 @@ func (b *bulkWorker) onEvent(signal outputs.Signaler, event common.MapStr) { } } -func (b *bulkWorker) onEvents(signal outputs.Signaler, events []common.MapStr) { +func (b *bulkWorker) onEvents(ctx *Context, events []common.MapStr) { for len(events) > 0 { // split up bulk to match required bulk sizes. // If input events have been split up bufferFull will be set and @@ -88,6 +92,8 @@ func (b *bulkWorker) onEvents(signal outputs.Signaler, events []common.MapStr) { spaceLeft := cap(b.events) - len(b.events) consume := len(events) bufferFull := spaceLeft <= consume + signal := ctx.Signal + b.guaranteed = b.guaranteed || ctx.Guaranteed if spaceLeft < consume { consume = spaceLeft if signal != nil { @@ -114,13 +120,15 @@ func (b *bulkWorker) publish() { // TODO: remember/merge and forward context options to output worker b.output.send(message{ context: Context{ - Signal: outputs.NewCompositeSignaler(b.pending...), + publishOptions: publishOptions{Guaranteed: b.guaranteed}, + Signal: outputs.NewCompositeSignaler(b.pending...), }, event: nil, events: b.events, }) b.pending = nil + b.guaranteed = false b.events = make([]common.MapStr, 0, b.maxBatchSize) } diff --git a/libbeat/publisher/output.go b/libbeat/publisher/output.go index b948f20a2c15..da4ee3465bd6 100644 --- a/libbeat/publisher/output.go +++ b/libbeat/publisher/output.go @@ -86,7 +86,8 @@ func (o *outputWorker) sendBulk( ) { debug("output worker: publish %v events", len(events)) - err := o.out.BulkPublish(ctx.Signal, outputs.Options{ctx.Guaranteed}, events) + opts := outputs.Options{ctx.Guaranteed} + err := o.out.BulkPublish(ctx.Signal, opts, events) if err != nil { logp.Info("Error bulk publishing events: %s", err) }