diff --git a/libbeat/publisher/pipeline/batch.go b/libbeat/publisher/pipeline/batch.go index ddb222f0b02d..8d5ffaef9d58 100644 --- a/libbeat/publisher/pipeline/batch.go +++ b/libbeat/publisher/pipeline/batch.go @@ -15,7 +15,7 @@ type Batch struct { } type batchContext struct { - observer *observer + observer outputObserver retryer *retryer } diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index d6e75e770e13..bc92b4da91f6 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -12,7 +12,7 @@ import ( // - reload type outputController struct { logger *logp.Logger - observer *observer + observer outputObserver queue queue.Queue @@ -40,7 +40,7 @@ type outputWorker interface { func newOutputController( log *logp.Logger, - observer *observer, + observer outputObserver, b queue.Queue, ) *outputController { c := &outputController{ @@ -52,7 +52,7 @@ func newOutputController( ctx := &batchContext{} c.consumer = newEventConsumer(log, b, ctx) c.retryer = newRetryer(log, observer, nil, c.consumer) - ctx.observer = c.observer + ctx.observer = observer ctx.retryer = c.retryer c.consumer.sigContinue() diff --git a/libbeat/publisher/pipeline/monitoring.go b/libbeat/publisher/pipeline/monitoring.go index 91c2a80cd9f4..8ba2ad373494 100644 --- a/libbeat/publisher/pipeline/monitoring.go +++ b/libbeat/publisher/pipeline/monitoring.go @@ -2,13 +2,48 @@ package pipeline import "github.com/elastic/beats/libbeat/monitoring" -// observer is used by many component in the publisher pipeline, to report +type observer interface { + pipelineObserver + clientObserver + queueObserver + outputObserver + + cleanup() +} + +type pipelineObserver interface { + clientConnected() + clientClosing() + clientClosed() +} + +type clientObserver interface { + newEvent() + filteredEvent() + publishedEvent() + failedPublishEvent() +} + +type queueObserver interface { + queueACKed(n int) +} + +type outputObserver interface { + updateOutputGroup() + eventsFailed(int) + eventsDropped(int) + eventsRetry(int) + outBatchSend(int) + outBatchACKed(int) +} + +// metricsObserver is used by many component in the publisher pipeline, to report // internal events. The oberserver can call registered global event handlers or // updated shared counters/metrics for reporting. // All events required for reporting events/metrics on the pipeline-global level // are defined by observer. The components are only allowed to serve localized // event-handlers only (e.g. the client centric events callbacks) -type observer struct { +type metricsObserver struct { metrics *monitoring.Registry // clients metrics @@ -23,14 +58,13 @@ type observer struct { ackedQueue *monitoring.Uint } -func (o *observer) init(metrics *monitoring.Registry) { - o.metrics = metrics +func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver { reg := metrics.GetRegistry("pipeline") if reg == nil { reg = metrics.NewRegistry("pipeline") } - *o = observer{ + return &metricsObserver{ metrics: metrics, clients: monitoring.NewUint(reg, "clients"), @@ -47,8 +81,10 @@ func (o *observer) init(metrics *monitoring.Registry) { } } -func (o *observer) cleanup() { - o.metrics.Remove("pipeline") // drop all metrics from registry +func (o *metricsObserver) cleanup() { + if o.metrics != nil { + o.metrics.Remove("pipeline") // drop all metrics from registry + } } // @@ -56,37 +92,37 @@ func (o *observer) cleanup() { // // (pipeline) pipeline did finish creating a new client instance -func (o *observer) clientConnected() { o.clients.Inc() } +func (o *metricsObserver) clientConnected() { o.clients.Inc() } // (client) close being called on client -func (o *observer) clientClosing() {} +func (o *metricsObserver) clientClosing() {} // (client) client finished processing close -func (o *observer) clientClosed() { o.clients.Dec() } +func (o *metricsObserver) clientClosed() { o.clients.Dec() } // // client publish events // // (client) client is trying to publish a new event -func (o *observer) newEvent() { +func (o *metricsObserver) newEvent() { o.events.Inc() o.activeEvents.Inc() } // (client) event is filtered out (on purpose or failed) -func (o *observer) filteredEvent() { +func (o *metricsObserver) filteredEvent() { o.filtered.Inc() o.activeEvents.Dec() } // (client) managed to push an event into the publisher pipeline -func (o *observer) publishedEvent() { +func (o *metricsObserver) publishedEvent() { o.published.Inc() } // (client) client closing down or DropIfFull is set -func (o *observer) failedPublishEvent() { +func (o *metricsObserver) failedPublishEvent() { o.failed.Inc() o.activeEvents.Dec() } @@ -96,7 +132,7 @@ func (o *observer) failedPublishEvent() { // // (queue) number of events ACKed by the queue/broker in use -func (o *observer) queueACKed(n int) { +func (o *metricsObserver) queueACKed(n int) { o.ackedQueue.Add(uint64(n)) o.activeEvents.Sub(uint64(n)) } @@ -106,23 +142,43 @@ func (o *observer) queueACKed(n int) { // // (controller) new output group is about to be loaded -func (o *observer) updateOutputGroup() {} +func (o *metricsObserver) updateOutputGroup() {} // (retryer) new failed batch has been received -func (o *observer) eventsFailed(int) {} +func (o *metricsObserver) eventsFailed(int) {} // (retryer) number of events dropped by retryer -func (o *observer) eventsDropped(n int) { +func (o *metricsObserver) eventsDropped(n int) { o.dropped.Add(uint64(n)) } // (retryer) number of events pushed to the output worker queue -func (o *observer) eventsRetry(n int) { +func (o *metricsObserver) eventsRetry(n int) { o.retry.Add(uint64(n)) } // (output) number of events to be forwarded to the output client -func (o *observer) outBatchSend(int) {} +func (o *metricsObserver) outBatchSend(int) {} // (output) number of events acked by the output batch -func (o *observer) outBatchACKed(int) {} +func (o *metricsObserver) outBatchACKed(int) {} + +type emptyObserver struct{} + +var nilObserver observer = (*emptyObserver)(nil) + +func (*emptyObserver) cleanup() {} +func (*emptyObserver) clientConnected() {} +func (*emptyObserver) clientClosing() {} +func (*emptyObserver) clientClosed() {} +func (*emptyObserver) newEvent() {} +func (*emptyObserver) filteredEvent() {} +func (*emptyObserver) publishedEvent() {} +func (*emptyObserver) failedPublishEvent() {} +func (*emptyObserver) queueACKed(n int) {} +func (*emptyObserver) updateOutputGroup() {} +func (*emptyObserver) eventsFailed(int) {} +func (*emptyObserver) eventsDropped(int) {} +func (*emptyObserver) eventsRetry(int) {} +func (*emptyObserver) outBatchSend(int) {} +func (*emptyObserver) outBatchACKed(int) {} diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 11f8e39990bb..7995f8dbfef8 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -8,7 +8,7 @@ import ( // clientWorker manages output client of type outputs.Client, not supporting reconnect. type clientWorker struct { - observer *observer + observer outputObserver qu workQueue client outputs.Client closed atomic.Bool @@ -16,7 +16,7 @@ type clientWorker struct { // netClientWorker manages reconnectable output clients of type outputs.NetworkClient. type netClientWorker struct { - observer *observer + observer outputObserver qu workQueue client outputs.NetworkClient closed atomic.Bool @@ -25,13 +25,13 @@ type netClientWorker struct { batchSizer func() int } -func makeClientWorker(observer *observer, qu workQueue, client outputs.Client) outputWorker { +func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client) outputWorker { if nc, ok := client.(outputs.NetworkClient); ok { c := &netClientWorker{observer: observer, qu: qu, client: nc} go c.run() return c } - c := &clientWorker{qu: qu, client: client} + c := &clientWorker{observer: observer, qu: qu, client: client} go c.run() return c } diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index fee988641e36..e3ac209580cf 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -111,7 +111,7 @@ type pipelineEventer struct { mutex sync.Mutex modifyable bool - observer *observer + observer queueObserver waitClose *waitCloser cb *pipelineEventCB } @@ -157,6 +157,7 @@ func New( log := defaultLogger p := &Pipeline{ logger: log, + observer: nilObserver, waitCloseMode: settings.WaitCloseMode, waitCloseTimeout: settings.WaitClose, processors: pipelineProcessors{ @@ -169,7 +170,10 @@ func New( p.ackBuilder = &pipelineEmptyACK{p} p.ackActive = atomic.MakeBool(true) - p.eventer.observer = &p.observer + if metrics != nil { + p.observer = newMetricsObserver(metrics) + } + p.eventer.observer = p.observer p.eventer.modifyable = true if settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0 { @@ -185,9 +189,7 @@ func New( } p.eventSema = newSema(p.queue.BufferConfig().Events) - p.observer.init(metrics) - - p.output = newOutputController(log, &p.observer, p.queue) + p.output = newOutputController(log, p.observer, p.queue) p.output.Set(out) return p, nil diff --git a/libbeat/publisher/pipeline/retry.go b/libbeat/publisher/pipeline/retry.go index c9fab386f9ae..b15905bb07f5 100644 --- a/libbeat/publisher/pipeline/retry.go +++ b/libbeat/publisher/pipeline/retry.go @@ -14,7 +14,7 @@ import ( // outputs. type retryer struct { logger *logp.Logger - observer *observer + observer outputObserver done chan struct{} @@ -54,7 +54,7 @@ const ( func newRetryer( log *logp.Logger, - observer *observer, + observer outputObserver, out workQueue, c *eventConsumer, ) *retryer {