From 138c704383ca8ad64cf6b207ae26c8fa1452025f Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Fri, 20 May 2022 11:23:57 -0700 Subject: [PATCH 1/6] first pass at having the memqueue report metrics --- libbeat/publisher/queue/memqueue/broker.go | 20 ++++++++++++++++++- libbeat/publisher/queue/memqueue/eventloop.go | 7 +++++++ .../publisher/queue/memqueue/internal_api.go | 10 ++++++++++ 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index 3a99664e827b..41400dc0f883 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -23,6 +23,7 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/opt" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" c "github.com/elastic/elastic-agent-libs/config" @@ -77,6 +78,12 @@ type broker struct { // if needed. ackListener queue.ACKListener + // This channel is used to request/return metrics where such metrics require insight into + // the actual eventloop itself. This seems like it might be overkill, but it seems that + // all communication between the broker and the eventloops + // happens via channels, so we're doing it this way. + metricChan chan metricsRequest + // wait group for worker shutdown wg sync.WaitGroup } @@ -185,6 +192,7 @@ func NewQueue( scheduledACKs: make(chan chanList), ackListener: settings.ACKListener, + metricChan: make(chan metricsRequest), } var eventLoop interface { @@ -256,7 +264,17 @@ func (b *broker) Get(count int) (queue.Batch, error) { } func (b *broker) Metrics() (queue.Metrics, error) { - return queue.Metrics{}, queue.ErrMetricsNotImplemented + + responseChan := make(chan memQueueMetrics, 1) + select { + case <-b.done: + return queue.Metrics{}, io.EOF + case b.metricChan <- metricsRequest{ + responseChan: responseChan}: + } + resp := <-responseChan + + return queue.Metrics{EventCount: opt.UintWith(uint64(resp.currentQueueSize))}, nil } var ackChanPool = sync.Pool{ diff --git a/libbeat/publisher/queue/memqueue/eventloop.go b/libbeat/publisher/queue/memqueue/eventloop.go index a2a3a16dd7d1..df01c65afaf9 100644 --- a/libbeat/publisher/queue/memqueue/eventloop.go +++ b/libbeat/publisher/queue/memqueue/eventloop.go @@ -121,6 +121,9 @@ func (l *directEventLoop) run() { case req := <-getChan: // consumer asking for next batch l.handleGetRequest(&req) + case req := <-l.broker.metricChan: + l.handleMetricsRequest(&req) + case schedACKs <- l.pendingACKs: // on send complete list of pending batches has been forwarded -> clear list l.pendingACKs = chanList{} @@ -128,6 +131,10 @@ func (l *directEventLoop) run() { } } +func (l *directEventLoop) handleMetricsRequest(req *metricsRequest) { + req.responseChan <- memQueueMetrics{currentQueueSize: len(l.buf.entries)} +} + // Returns true if the queue is full after handling the insertion request. func (l *directEventLoop) insert(req *pushRequest) { log := l.broker.logger diff --git a/libbeat/publisher/queue/memqueue/internal_api.go b/libbeat/publisher/queue/memqueue/internal_api.go index 8d4571ce225f..5f454dbdfbc1 100644 --- a/libbeat/publisher/queue/memqueue/internal_api.go +++ b/libbeat/publisher/queue/memqueue/internal_api.go @@ -49,3 +49,13 @@ type getResponse struct { } type batchAckMsg struct{} + +// Metrics API + +type metricsRequest struct { + responseChan chan memQueueMetrics +} + +type memQueueMetrics struct { + currentQueueSize int +} From c145fefb02b8be39c2e7389be46703768ae85506 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Fri, 20 May 2022 15:32:27 -0700 Subject: [PATCH 2/6] add sort of a test --- libbeat/publisher/queue/memqueue/eventloop.go | 9 +++++++- .../publisher/queue/queuetest/queuetest.go | 23 ++++++++++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/libbeat/publisher/queue/memqueue/eventloop.go b/libbeat/publisher/queue/memqueue/eventloop.go index df01c65afaf9..6b5ae795697d 100644 --- a/libbeat/publisher/queue/memqueue/eventloop.go +++ b/libbeat/publisher/queue/memqueue/eventloop.go @@ -121,7 +121,7 @@ func (l *directEventLoop) run() { case req := <-getChan: // consumer asking for next batch l.handleGetRequest(&req) - case req := <-l.broker.metricChan: + case req := <-l.broker.metricChan: // broker asking for queue metrics l.handleMetricsRequest(&req) case schedACKs <- l.pendingACKs: @@ -309,6 +309,9 @@ func (l *bufferingEventLoop) run() { case count := <-l.broker.ackChan: l.handleACK(count) + case req := <-l.broker.metricChan: // broker asking for queue metrics + l.handleMetricsRequest(&req) + case <-l.idleC: l.idleC = nil l.timer.Stop() @@ -319,6 +322,10 @@ func (l *bufferingEventLoop) run() { } } +func (l *bufferingEventLoop) handleMetricsRequest(req *metricsRequest) { + req.responseChan <- memQueueMetrics{currentQueueSize: l.eventCount} +} + func (l *bufferingEventLoop) handleInsert(req *pushRequest) { if l.insert(req) { l.eventCount++ diff --git a/libbeat/publisher/queue/queuetest/queuetest.go b/libbeat/publisher/queue/queuetest/queuetest.go index 29b1da7a7d28..88d4d937fea4 100644 --- a/libbeat/publisher/queue/queuetest/queuetest.go +++ b/libbeat/publisher/queue/queuetest/queuetest.go @@ -18,11 +18,15 @@ package queuetest import ( + "errors" + "io" "sync" "testing" + "time" "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/stretchr/testify/assert" ) // QueueFactory is used to create a per test queue instance. @@ -209,12 +213,29 @@ func runTestCases(t *testing.T, tests []testCase, queueFactory QueueFactory) { go test.producers(&wg, nil, log, queue)() go test.consumers(&wg, nil, log, queue)() - + go testFetchMetrics(t, queue) wg.Wait() })) } } +func testFetchMetrics(t *testing.T, mon queue.Queue) { + _, err := mon.Metrics() + if errors.Is(err, queue.ErrMetricsNotImplemented) { + return + } + + for { + metrics, err := mon.Metrics() + if errors.Is(err, io.EOF) { + continue + } + t.Logf("Got event count: %d/%d", metrics.EventCount.ValueOr(0), metrics.ByteCount.ValueOr(0)) + assert.True(t, metrics.EventCount.Exists() || metrics.ByteCount.Exists()) + time.Sleep(time.Second) + } +} + func multiple( fns ...workerFactory, ) workerFactory { From b529717872402815cf68c0327c214ec79b8afc14 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Mon, 23 May 2022 11:59:40 -0700 Subject: [PATCH 3/6] format --- libbeat/publisher/queue/queuetest/queuetest.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libbeat/publisher/queue/queuetest/queuetest.go b/libbeat/publisher/queue/queuetest/queuetest.go index 88d4d937fea4..2897c4a38421 100644 --- a/libbeat/publisher/queue/queuetest/queuetest.go +++ b/libbeat/publisher/queue/queuetest/queuetest.go @@ -24,9 +24,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/stretchr/testify/assert" ) // QueueFactory is used to create a per test queue instance. From 5ec8af1e8a2f5678a0d73df6a5659d9501f64bd3 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Thu, 26 May 2022 11:07:14 -0700 Subject: [PATCH 4/6] fix length for ring buffer, clean up tests --- libbeat/publisher/queue/memqueue/eventloop.go | 2 +- libbeat/publisher/queue/memqueue/ringbuf.go | 5 +++++ libbeat/publisher/queue/queuetest/queuetest.go | 16 +++++++--------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/libbeat/publisher/queue/memqueue/eventloop.go b/libbeat/publisher/queue/memqueue/eventloop.go index c1c503d47c04..c89bd355ece0 100644 --- a/libbeat/publisher/queue/memqueue/eventloop.go +++ b/libbeat/publisher/queue/memqueue/eventloop.go @@ -133,7 +133,7 @@ func (l *directEventLoop) run() { } func (l *directEventLoop) handleMetricsRequest(req *metricsRequest) { - req.responseChan <- memQueueMetrics{currentQueueSize: len(l.buf.entries)} + req.responseChan <- memQueueMetrics{currentQueueSize: l.buf.Items()} } // Returns true if the queue is full after handling the insertion request. diff --git a/libbeat/publisher/queue/memqueue/ringbuf.go b/libbeat/publisher/queue/memqueue/ringbuf.go index 8cbae774f625..aceea52ec1c7 100644 --- a/libbeat/publisher/queue/memqueue/ringbuf.go +++ b/libbeat/publisher/queue/memqueue/ringbuf.go @@ -209,3 +209,8 @@ func (b *ringBuffer) Full() bool { func (b *ringBuffer) Size() int { return len(b.entries) } + +// Items returns the count of events currently in the buffer +func (b *ringBuffer) Items() int { + return b.regA.size + b.regB.size +} diff --git a/libbeat/publisher/queue/queuetest/queuetest.go b/libbeat/publisher/queue/queuetest/queuetest.go index ba5fe2bdf5bd..7c69fcf242bb 100644 --- a/libbeat/publisher/queue/queuetest/queuetest.go +++ b/libbeat/publisher/queue/queuetest/queuetest.go @@ -22,7 +22,6 @@ import ( "io" "sync" "testing" - "time" "github.com/stretchr/testify/assert" @@ -226,15 +225,14 @@ func testFetchMetrics(t *testing.T, mon queue.Queue) { return } - for { - metrics, err := mon.Metrics() - if errors.Is(err, io.EOF) { - continue - } - t.Logf("Got event count: %d/%d", metrics.EventCount.ValueOr(0), metrics.ByteCount.ValueOr(0)) - assert.True(t, metrics.EventCount.Exists() || metrics.ByteCount.Exists()) - time.Sleep(time.Second) + metrics, err := mon.Metrics() + // EOF is returned if the queue is closing, so the only "good" error is that + if err != nil { + assert.ErrorIs(t, err, io.EOF) } + + assert.True(t, metrics.EventCount.Exists() || metrics.ByteCount.Exists()) + } func multiple( From 833454a3ef8ab7694b41d95fda82555bd1d80869 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Thu, 26 May 2022 15:06:13 -0700 Subject: [PATCH 5/6] add tests, occupiedRead --- libbeat/publisher/queue/memqueue/broker.go | 6 +- libbeat/publisher/queue/memqueue/eventloop.go | 9 ++- .../publisher/queue/memqueue/internal_api.go | 4 ++ .../publisher/queue/memqueue/queue_test.go | 60 +++++++++++++++++++ libbeat/publisher/queue/queue.go | 3 + libbeat/publisher/queue/queuetest/event.go | 2 +- .../queue/queuetest/producer_cancel.go | 4 +- .../publisher/queue/queuetest/queuetest.go | 2 +- 8 files changed, 83 insertions(+), 7 deletions(-) diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index b713edb9c317..e367d8c8426f 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -267,7 +267,11 @@ func (b *broker) Metrics() (queue.Metrics, error) { } resp := <-responseChan - return queue.Metrics{EventCount: opt.UintWith(uint64(resp.currentQueueSize))}, nil + return queue.Metrics{ + EventCount: opt.UintWith(uint64(resp.currentQueueSize)), + EventLimit: opt.UintWith(uint64(b.bufSize)), + OccupiedRead: opt.UintWith(uint64(resp.occupiedRead)), + }, nil } var ackChanPool = sync.Pool{ diff --git a/libbeat/publisher/queue/memqueue/eventloop.go b/libbeat/publisher/queue/memqueue/eventloop.go index c89bd355ece0..c7e8ec07f12e 100644 --- a/libbeat/publisher/queue/memqueue/eventloop.go +++ b/libbeat/publisher/queue/memqueue/eventloop.go @@ -49,6 +49,9 @@ type bufferingEventLoop struct { // those that have not yet been acked. eventCount int + // The number of events that have been read by a consumer but not yet acked + unackedEventCount int + minEvents int maxEvents int flushTimeout time.Duration @@ -133,7 +136,7 @@ func (l *directEventLoop) run() { } func (l *directEventLoop) handleMetricsRequest(req *metricsRequest) { - req.responseChan <- memQueueMetrics{currentQueueSize: l.buf.Items()} + req.responseChan <- memQueueMetrics{currentQueueSize: l.buf.Items(), occupiedRead: l.buf.reserved} } // Returns true if the queue is full after handling the insertion request. @@ -320,7 +323,7 @@ func (l *bufferingEventLoop) run() { } func (l *bufferingEventLoop) handleMetricsRequest(req *metricsRequest) { - req.responseChan <- memQueueMetrics{currentQueueSize: l.eventCount} + req.responseChan <- memQueueMetrics{currentQueueSize: l.eventCount, occupiedRead: l.unackedEventCount} } func (l *bufferingEventLoop) handleInsert(req *pushRequest) { @@ -422,6 +425,7 @@ func (l *bufferingEventLoop) handleGetRequest(req *getRequest) { req.responseChan <- getResponse{acker.ackChan, entries} l.pendingACKs.append(acker) + l.unackedEventCount += len(entries) buf.entries = buf.entries[count:] if buf.length() == 0 { l.advanceFlushList() @@ -429,6 +433,7 @@ func (l *bufferingEventLoop) handleGetRequest(req *getRequest) { } func (l *bufferingEventLoop) handleACK(count int) { + l.unackedEventCount -= count l.eventCount -= count } diff --git a/libbeat/publisher/queue/memqueue/internal_api.go b/libbeat/publisher/queue/memqueue/internal_api.go index a11c36eb93c9..4059b71aadde 100644 --- a/libbeat/publisher/queue/memqueue/internal_api.go +++ b/libbeat/publisher/queue/memqueue/internal_api.go @@ -54,6 +54,10 @@ type metricsRequest struct { responseChan chan memQueueMetrics } +// memQueueMetrics tracks metrics that are returned by the individual memory queue implementations type memQueueMetrics struct { + // the size of items in the queue currentQueueSize int + // the number of items that have been read by a consumer but not yet ack'ed + occupiedRead int } diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 84c353e5cd7a..28d520e576bf 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -19,6 +19,7 @@ package memqueue import ( "flag" + "fmt" "math" "math/rand" "testing" @@ -28,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest" + "github.com/elastic/elastic-agent-libs/mapstr" ) var seed int64 @@ -74,6 +76,64 @@ func TestProduceConsumer(t *testing.T) { t.Run("flush", testWith(makeTestQueue(bufferSize, batchSize/2, 100*time.Millisecond))) } +func TestQueueMetrics(t *testing.T) { + eventsToTest := 5 + maxEvents := 10 + + // Test the directEventLoop + directSettings := Settings{ + Events: maxEvents, + FlushMinEvents: 1, + FlushTimeout: 0, + } + t.Logf("Testing directEventLoop") + queueTestWithSettings(t, directSettings, eventsToTest, "directEventLoop") + + // Test Buffered Event Loop + bufferedSettings := Settings{ + Events: maxEvents, + FlushMinEvents: eventsToTest, // The buffered event loop can only return FlushMinEvents per Get() + FlushTimeout: time.Millisecond, + } + t.Logf("Testing bufferedEventLoop") + queueTestWithSettings(t, bufferedSettings, eventsToTest, "bufferedEventLoop") +} + +func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, testName string) { + testQueue := NewQueue(nil, settings) + defer testQueue.Close() + + // Send events to queue + producer := testQueue.Producer(queue.ProducerConfig{}) + for i := 0; i < eventsToTest; i++ { + producer.Publish(queuetest.MakeEvent(mapstr.M{"count": i})) + } + queueMetricsAreValid(t, testQueue, 5, settings.Events, 0, fmt.Sprintf("%s - First send of metrics to queue", testName)) + + // Read events, don't yet ack them + batch, err := testQueue.Get(eventsToTest) + assert.NilError(t, err, "error in Get") + t.Logf("Got batch of %d events", batch.Count()) + + queueMetricsAreValid(t, testQueue, 5, settings.Events, 5, fmt.Sprintf("%s - Producer Getting events, no ACK", testName)) + + // Test metrics after ack + batch.ACK() + + queueMetricsAreValid(t, testQueue, 0, settings.Events, 0, fmt.Sprintf("%s - Producer Getting events, no ACK", testName)) + +} + +func queueMetricsAreValid(t *testing.T, q queue.Queue, evtCount, evtLimit, occupied int, test string) { + // wait briefly to avoid races across all the queue channels + time.Sleep(time.Millisecond * 100) + testMetrics, err := q.Metrics() + assert.NilError(t, err, "error calling metrics for test %s", test) + assert.Equal(t, testMetrics.EventCount.ValueOr(0), uint64(evtCount), "incorrect EventCount for %s", test) + assert.Equal(t, testMetrics.EventLimit.ValueOr(0), uint64(evtLimit), "incorrect EventLimit for %s", test) + assert.Equal(t, testMetrics.OccupiedRead.ValueOr(0), uint64(occupied), "incorrect OccupiedRead for %s", test) +} + func TestProducerCancelRemovesEvents(t *testing.T) { queuetest.TestProducerCancelRemovesEvents(t, makeTestQueue(1024, 0, 0)) } diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index 9ca660ba28ea..d9bb4da8ab36 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -46,6 +46,9 @@ type Metrics struct { //EventLimit is the user-configured event limit of the queue EventLimit opt.Uint + //OccupiedRead is the count of events that an output consumer has read, but not yet ack'ed + OccupiedRead opt.Uint + //OldestActiveTimestamp is the timestamp of the oldest item in the queue. OldestActiveTimestamp common.Time } diff --git a/libbeat/publisher/queue/queuetest/event.go b/libbeat/publisher/queue/queuetest/event.go index dbe6841d6b61..9cfc0de11bd0 100644 --- a/libbeat/publisher/queue/queuetest/event.go +++ b/libbeat/publisher/queue/queuetest/event.go @@ -25,7 +25,7 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" ) -func makeEvent(fields mapstr.M) publisher.Event { +func MakeEvent(fields mapstr.M) publisher.Event { return publisher.Event{ Content: beat.Event{ Timestamp: time.Now(), diff --git a/libbeat/publisher/queue/queuetest/producer_cancel.go b/libbeat/publisher/queue/queuetest/producer_cancel.go index 628080854cb8..2b44f538854c 100644 --- a/libbeat/publisher/queue/queuetest/producer_cancel.go +++ b/libbeat/publisher/queue/queuetest/producer_cancel.go @@ -53,7 +53,7 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) { for ; i < N1; i++ { log.Debugf("send event %v to first producer", i) - producer.Publish(makeEvent(mapstr.M{ + producer.Publish(MakeEvent(mapstr.M{ "value": i, })) } @@ -67,7 +67,7 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) { producer = b.Producer(queue.ProducerConfig{}) for ; i < N2; i++ { log.Debugf("send event %v to new producer", i) - producer.Publish(makeEvent(mapstr.M{ + producer.Publish(MakeEvent(mapstr.M{ "value": i, })) } diff --git a/libbeat/publisher/queue/queuetest/queuetest.go b/libbeat/publisher/queue/queuetest/queuetest.go index 7c69fcf242bb..0fb95b1dfbe1 100644 --- a/libbeat/publisher/queue/queuetest/queuetest.go +++ b/libbeat/publisher/queue/queuetest/queuetest.go @@ -289,7 +289,7 @@ func makeProducer( }) for i := 0; i < maxEvents; i++ { log.Debug("publish event", i) - producer.Publish(makeEvent(makeFields(i))) + producer.Publish(MakeEvent(makeFields(i))) } ackWG.Wait() From 50d7031abae04a0f6439af6c6e6a9c8101beb477 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Tue, 31 May 2022 10:38:15 -0700 Subject: [PATCH 6/6] change field names, separate tests --- libbeat/publisher/queue/memqueue/broker.go | 6 +++--- libbeat/publisher/queue/memqueue/queue_test.go | 9 +++++++-- libbeat/publisher/queue/queue.go | 4 ++-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index e367d8c8426f..e5e7a984046f 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -268,9 +268,9 @@ func (b *broker) Metrics() (queue.Metrics, error) { resp := <-responseChan return queue.Metrics{ - EventCount: opt.UintWith(uint64(resp.currentQueueSize)), - EventLimit: opt.UintWith(uint64(b.bufSize)), - OccupiedRead: opt.UintWith(uint64(resp.occupiedRead)), + EventCount: opt.UintWith(uint64(resp.currentQueueSize)), + EventLimit: opt.UintWith(uint64(b.bufSize)), + UnackedConsumedEvents: opt.UintWith(uint64(resp.occupiedRead)), }, nil } diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 28d520e576bf..54747883ce97 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -76,7 +76,7 @@ func TestProduceConsumer(t *testing.T) { t.Run("flush", testWith(makeTestQueue(bufferSize, batchSize/2, 100*time.Millisecond))) } -func TestQueueMetrics(t *testing.T) { +func TestQueueMetricsDirect(t *testing.T) { eventsToTest := 5 maxEvents := 10 @@ -89,6 +89,11 @@ func TestQueueMetrics(t *testing.T) { t.Logf("Testing directEventLoop") queueTestWithSettings(t, directSettings, eventsToTest, "directEventLoop") +} + +func TestQueueMetricsBuffer(t *testing.T) { + eventsToTest := 5 + maxEvents := 10 // Test Buffered Event Loop bufferedSettings := Settings{ Events: maxEvents, @@ -131,7 +136,7 @@ func queueMetricsAreValid(t *testing.T, q queue.Queue, evtCount, evtLimit, occup assert.NilError(t, err, "error calling metrics for test %s", test) assert.Equal(t, testMetrics.EventCount.ValueOr(0), uint64(evtCount), "incorrect EventCount for %s", test) assert.Equal(t, testMetrics.EventLimit.ValueOr(0), uint64(evtLimit), "incorrect EventLimit for %s", test) - assert.Equal(t, testMetrics.OccupiedRead.ValueOr(0), uint64(occupied), "incorrect OccupiedRead for %s", test) + assert.Equal(t, testMetrics.UnackedConsumedEvents.ValueOr(0), uint64(occupied), "incorrect OccupiedRead for %s", test) } func TestProducerCancelRemovesEvents(t *testing.T) { diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index d9bb4da8ab36..8184e31ef2b4 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -46,8 +46,8 @@ type Metrics struct { //EventLimit is the user-configured event limit of the queue EventLimit opt.Uint - //OccupiedRead is the count of events that an output consumer has read, but not yet ack'ed - OccupiedRead opt.Uint + //UnackedConsumedEvents is the count of events that an output consumer has read, but not yet ack'ed + UnackedConsumedEvents opt.Uint //OldestActiveTimestamp is the timestamp of the oldest item in the queue. OldestActiveTimestamp common.Time