diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go
index dafcf1f08ed..e5e7a984046 100644
--- a/libbeat/publisher/queue/memqueue/broker.go
+++ b/libbeat/publisher/queue/memqueue/broker.go
@@ -23,6 +23,7 @@ import (
 	"time"
 
 	"github.com/elastic/beats/v7/libbeat/feature"
+	"github.com/elastic/beats/v7/libbeat/opt"
 	"github.com/elastic/beats/v7/libbeat/publisher/queue"
 	c "github.com/elastic/elastic-agent-libs/config"
 	"github.com/elastic/elastic-agent-libs/logp"
@@ -76,6 +77,12 @@ type broker struct {
 	// if needed.
 	ackListener queue.ACKListener
 
+	// metricChan is used to request and return metrics that require insight into
+	// the event loop itself. This may look like overkill, but all communication
+	// between the broker and the event loops happens via channels, so the
+	// metrics request/response follows the same pattern.
+	metricChan chan metricsRequest
+
 	// wait group for worker shutdown
 	wg sync.WaitGroup
 }
@@ -184,6 +191,7 @@ func NewQueue(
 		scheduledACKs: make(chan chanList),
 
 		ackListener: settings.ACKListener,
+		metricChan:  make(chan metricsRequest),
 	}
 
 	var eventLoop interface {
@@ -249,7 +257,21 @@ func (b *broker) Get(count int) (queue.Batch, error) {
 }
 
 func (b *broker) Metrics() (queue.Metrics, error) {
-	return queue.Metrics{}, queue.ErrMetricsNotImplemented
+
+	responseChan := make(chan memQueueMetrics, 1)
+	select {
+	case <-b.done:
+		return queue.Metrics{}, io.EOF
+	case b.metricChan <- metricsRequest{
+		responseChan: responseChan}:
+	}
+	resp := <-responseChan
+
+	return queue.Metrics{
+		EventCount:            opt.UintWith(uint64(resp.currentQueueSize)),
+		EventLimit:            opt.UintWith(uint64(b.bufSize)),
+		UnackedConsumedEvents: opt.UintWith(uint64(resp.occupiedRead)),
+	}, nil
 }
 
 var ackChanPool = sync.Pool{
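
The shape of Metrics() above, reduced to its essentials: the broker hands the event loop a response channel and waits for the answer, giving up with io.EOF if the queue is already shutting down. A minimal, self-contained sketch of that round trip, using illustrative loopRequest/loopMetrics types rather than the memqueue ones:

package main

import (
	"fmt"
	"io"
)

// loopMetrics and loopRequest are illustrative stand-ins for the memqueue types.
type loopMetrics struct {
	queued, unacked int
}

type loopRequest struct {
	responseChan chan loopMetrics
}

// fetchMetrics mirrors the broker.Metrics() shape: try to hand the event loop a
// request, but give up with io.EOF if the queue is already shutting down.
func fetchMetrics(done <-chan struct{}, requests chan<- loopRequest) (loopMetrics, error) {
	responseChan := make(chan loopMetrics, 1)
	select {
	case <-done:
		// The event loop has stopped; nobody will ever answer the request.
		return loopMetrics{}, io.EOF
	case requests <- loopRequest{responseChan: responseChan}:
	}
	return <-responseChan, nil
}

func main() {
	done := make(chan struct{})
	requests := make(chan loopRequest)

	// Stand-in "event loop" that owns the counters and answers metric requests.
	go func() {
		for req := range requests {
			req.responseChan <- loopMetrics{queued: 5, unacked: 2}
		}
	}()

	m, err := fetchMetrics(done, requests)
	fmt.Println(m, err) // {5 2} <nil>
}

Buffering the response channel with capacity 1 means the event loop can always deliver its reply without blocking, even if the requester has already moved on.
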
diff --git a/libbeat/publisher/queue/memqueue/eventloop.go b/libbeat/publisher/queue/memqueue/eventloop.go
index 6b16cd7aefd..c7e8ec07f12 100644
--- a/libbeat/publisher/queue/memqueue/eventloop.go
+++ b/libbeat/publisher/queue/memqueue/eventloop.go
@@ -49,6 +49,9 @@ type bufferingEventLoop struct {
 	// those that have not yet been acked.
 	eventCount int
 
+	// The number of events that have been read by a consumer but not yet acked
+	unackedEventCount int
+
 	minEvents    int
 	maxEvents    int
 	flushTimeout time.Duration
@@ -122,6 +125,9 @@ func (l *directEventLoop) run() {
 		case req := <-getChan: // consumer asking for next batch
 			l.handleGetRequest(&req)
 
+		case req := <-l.broker.metricChan: // broker asking for queue metrics
+			l.handleMetricsRequest(&req)
+
 		case schedACKs <- l.pendingACKs:
 			// on send complete list of pending batches has been forwarded -> clear list
 			l.pendingACKs = chanList{}
@@ -129,6 +135,10 @@
 	}
 }
 
+func (l *directEventLoop) handleMetricsRequest(req *metricsRequest) {
+	req.responseChan <- memQueueMetrics{currentQueueSize: l.buf.Items(), occupiedRead: l.buf.reserved}
+}
+
 // Returns true if the queue is full after handling the insertion request.
 func (l *directEventLoop) insert(req *pushRequest) {
 	log := l.broker.logger
@@ -299,6 +309,9 @@ func (l *bufferingEventLoop) run() {
 		case count := <-l.broker.ackChan:
 			l.handleACK(count)
 
+		case req := <-l.broker.metricChan: // broker asking for queue metrics
+			l.handleMetricsRequest(&req)
+
 		case <-l.idleC:
 			l.idleC = nil
 			l.timer.Stop()
@@ -309,6 +322,10 @@
 	}
 }
 
+func (l *bufferingEventLoop) handleMetricsRequest(req *metricsRequest) {
+	req.responseChan <- memQueueMetrics{currentQueueSize: l.eventCount, occupiedRead: l.unackedEventCount}
+}
+
 func (l *bufferingEventLoop) handleInsert(req *pushRequest) {
 	if l.insert(req) {
 		l.eventCount++
@@ -408,6 +425,7 @@ func (l *bufferingEventLoop) handleGetRequest(req *getRequest) {
 	req.responseChan <- getResponse{acker.ackChan, entries}
 	l.pendingACKs.append(acker)
+	l.unackedEventCount += len(entries)
 
 	buf.entries = buf.entries[count:]
 	if buf.length() == 0 {
 		l.advanceFlushList()
@@ -415,6 +433,7 @@
 	}
 }
 
 func (l *bufferingEventLoop) handleACK(count int) {
+	l.unackedEventCount -= count
 	l.eventCount -= count
 }
diff --git a/libbeat/publisher/queue/memqueue/internal_api.go b/libbeat/publisher/queue/memqueue/internal_api.go
index 00580d4d39a..4059b71aadd 100644
--- a/libbeat/publisher/queue/memqueue/internal_api.go
+++ b/libbeat/publisher/queue/memqueue/internal_api.go
@@ -47,3 +47,17 @@ type getResponse struct {
 }
 
 type batchAckMsg struct{}
+
+// Metrics API
+
+type metricsRequest struct {
+	responseChan chan memQueueMetrics
+}
+
+// memQueueMetrics tracks metrics that are returned by the individual memory queue implementations
+type memQueueMetrics struct {
+	// the size of items in the queue
+	currentQueueSize int
+	// the number of items that have been read by a consumer but not yet ack'ed
+	occupiedRead int
+}
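
Both handleMetricsRequest variants can read their counters without locking because each run() loop is the only goroutine that touches them; the metrics case is served from the same select as inserts, gets, and ACKs. A stripped-down sketch of that shape, with hypothetical channel and field names rather than the real event-loop state:

package main

import "fmt"

type metricsReply struct{ queued, unacked int }

type metricsRequestSketch struct{ responseChan chan metricsReply }

// run owns the counters outright: every mutation and every metrics read is a
// case in the same select, so no mutex is needed.
func run(pushes, gets, acks <-chan int, metrics <-chan metricsRequestSketch, done <-chan struct{}) {
	queued, unacked := 0, 0
	for {
		select {
		case <-done:
			return
		case n := <-pushes: // producer inserted n events
			queued += n
		case n := <-gets: // consumer read n events but has not acked them yet
			unacked += n
		case n := <-acks: // consumer acked n events
			queued -= n
			unacked -= n
		case req := <-metrics: // broker asking for a snapshot
			req.responseChan <- metricsReply{queued: queued, unacked: unacked}
		}
	}
}

func main() {
	pushes, gets, acks := make(chan int), make(chan int), make(chan int)
	metrics := make(chan metricsRequestSketch)
	done := make(chan struct{})
	go run(pushes, gets, acks, metrics, done)

	pushes <- 5
	gets <- 5

	resp := make(chan metricsReply, 1)
	metrics <- metricsRequestSketch{responseChan: resp}
	fmt.Println(<-resp) // {5 5}
	close(done)
}
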
diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go
index 84c353e5cd7..54747883ce9 100644
--- a/libbeat/publisher/queue/memqueue/queue_test.go
+++ b/libbeat/publisher/queue/memqueue/queue_test.go
@@ -19,6 +19,7 @@ package memqueue
 
 import (
 	"flag"
+	"fmt"
 	"math"
 	"math/rand"
 	"testing"
@@ -28,6 +29,7 @@
 
 	"github.com/elastic/beats/v7/libbeat/publisher/queue"
 	"github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest"
+	"github.com/elastic/elastic-agent-libs/mapstr"
 )
 
 var seed int64
@@ -74,6 +76,69 @@ func TestProduceConsumer(t *testing.T) {
 	t.Run("flush", testWith(makeTestQueue(bufferSize, batchSize/2, 100*time.Millisecond)))
 }
 
+func TestQueueMetricsDirect(t *testing.T) {
+	eventsToTest := 5
+	maxEvents := 10
+
+	// Test the directEventLoop
+	directSettings := Settings{
+		Events:         maxEvents,
+		FlushMinEvents: 1,
+		FlushTimeout:   0,
+	}
+	t.Logf("Testing directEventLoop")
+	queueTestWithSettings(t, directSettings, eventsToTest, "directEventLoop")
+
+}
+
+func TestQueueMetricsBuffer(t *testing.T) {
+	eventsToTest := 5
+	maxEvents := 10
+	// Test the buffered event loop
+	bufferedSettings := Settings{
+		Events:         maxEvents,
+		FlushMinEvents: eventsToTest, // The buffered event loop can only return FlushMinEvents per Get()
+		FlushTimeout:   time.Millisecond,
+	}
+	t.Logf("Testing bufferedEventLoop")
+	queueTestWithSettings(t, bufferedSettings, eventsToTest, "bufferedEventLoop")
+}
+
+func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, testName string) {
+	testQueue := NewQueue(nil, settings)
+	defer testQueue.Close()
+
+	// Send events to the queue
+	producer := testQueue.Producer(queue.ProducerConfig{})
+	for i := 0; i < eventsToTest; i++ {
+		producer.Publish(queuetest.MakeEvent(mapstr.M{"count": i}))
+	}
+	queueMetricsAreValid(t, testQueue, 5, settings.Events, 0, fmt.Sprintf("%s - First send of events to queue", testName))
+
+	// Read events, don't yet ack them
+	batch, err := testQueue.Get(eventsToTest)
+	assert.NilError(t, err, "error in Get")
+	t.Logf("Got batch of %d events", batch.Count())
+
+	queueMetricsAreValid(t, testQueue, 5, settings.Events, 5, fmt.Sprintf("%s - Producer Getting events, no ACK", testName))
+
+	// Test metrics after ack
+	batch.ACK()
+
+	queueMetricsAreValid(t, testQueue, 0, settings.Events, 0, fmt.Sprintf("%s - Metrics after ACK", testName))
+
+}
+
+func queueMetricsAreValid(t *testing.T, q queue.Queue, evtCount, evtLimit, occupied int, test string) {
+	// wait briefly to avoid races across all the queue channels
+	time.Sleep(time.Millisecond * 100)
+	testMetrics, err := q.Metrics()
+	assert.NilError(t, err, "error calling metrics for test %s", test)
+	assert.Equal(t, testMetrics.EventCount.ValueOr(0), uint64(evtCount), "incorrect EventCount for %s", test)
+	assert.Equal(t, testMetrics.EventLimit.ValueOr(0), uint64(evtLimit), "incorrect EventLimit for %s", test)
+	assert.Equal(t, testMetrics.UnackedConsumedEvents.ValueOr(0), uint64(occupied), "incorrect OccupiedRead for %s", test)
+}
+
 func TestProducerCancelRemovesEvents(t *testing.T) {
 	queuetest.TestProducerCancelRemovesEvents(t, makeTestQueue(1024, 0, 0))
 }
diff --git a/libbeat/publisher/queue/memqueue/ringbuf.go b/libbeat/publisher/queue/memqueue/ringbuf.go
index 8cbae774f62..aceea52ec1c 100644
--- a/libbeat/publisher/queue/memqueue/ringbuf.go
+++ b/libbeat/publisher/queue/memqueue/ringbuf.go
@@ -209,3 +209,8 @@ func (b *ringBuffer) Full() bool {
 func (b *ringBuffer) Size() int {
 	return len(b.entries)
 }
+
+// Items returns the count of events currently in the buffer
+func (b *ringBuffer) Items() int {
+	return b.regA.size + b.regB.size
+}
diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go
index 9ca660ba28e..8184e31ef2b 100644
--- a/libbeat/publisher/queue/queue.go
+++ b/libbeat/publisher/queue/queue.go
@@ -46,6 +46,9 @@ type Metrics struct {
 	//EventLimit is the user-configured event limit of the queue
 	EventLimit opt.Uint
 
+	//UnackedConsumedEvents is the count of events that an output consumer has read, but not yet ack'ed
+	UnackedConsumedEvents opt.Uint
+
 	//OldestActiveTimestamp is the timestamp of the oldest item in the queue.
 	OldestActiveTimestamp common.Time
 }
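
UnackedConsumedEvents is an opt.Uint like the existing fields, so callers should check for presence rather than relying on a zero value; only the memory queue populates it so far. A hedged sketch of how a consumer of queue.Metrics might read the new field — the report helper is illustrative, while ValueOr and Exists are the opt accessors the tests above already use:

package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/opt"
	"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

// report prints the queue gauges, skipping any field the queue implementation
// does not populate.
func report(m queue.Metrics) {
	fmt.Println("events in queue:", m.EventCount.ValueOr(0), "of", m.EventLimit.ValueOr(0))
	if m.UnackedConsumedEvents.Exists() {
		fmt.Println("consumed but not yet acked:", m.UnackedConsumedEvents.ValueOr(0))
	}
}

func main() {
	// Values mirroring the test above: 5 events queued, all 5 read but not acked.
	report(queue.Metrics{
		EventCount:            opt.UintWith(5),
		EventLimit:            opt.UintWith(10),
		UnackedConsumedEvents: opt.UintWith(5),
	})
}
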
diff --git a/libbeat/publisher/queue/queuetest/event.go b/libbeat/publisher/queue/queuetest/event.go
index dbe6841d6b6..9cfc0de11bd 100644
--- a/libbeat/publisher/queue/queuetest/event.go
+++ b/libbeat/publisher/queue/queuetest/event.go
@@ -25,7 +25,7 @@ import (
 	"github.com/elastic/elastic-agent-libs/mapstr"
 )
 
-func makeEvent(fields mapstr.M) publisher.Event {
+func MakeEvent(fields mapstr.M) publisher.Event {
 	return publisher.Event{
 		Content: beat.Event{
 			Timestamp: time.Now(),
diff --git a/libbeat/publisher/queue/queuetest/producer_cancel.go b/libbeat/publisher/queue/queuetest/producer_cancel.go
index 628080854cb..2b44f538854 100644
--- a/libbeat/publisher/queue/queuetest/producer_cancel.go
+++ b/libbeat/publisher/queue/queuetest/producer_cancel.go
@@ -53,7 +53,7 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) {
 
 	for ; i < N1; i++ {
 		log.Debugf("send event %v to first producer", i)
-		producer.Publish(makeEvent(mapstr.M{
+		producer.Publish(MakeEvent(mapstr.M{
 			"value": i,
 		}))
 	}
@@ -67,7 +67,7 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) {
 	producer = b.Producer(queue.ProducerConfig{})
 	for ; i < N2; i++ {
 		log.Debugf("send event %v to new producer", i)
-		producer.Publish(makeEvent(mapstr.M{
+		producer.Publish(MakeEvent(mapstr.M{
 			"value": i,
 		}))
 	}
diff --git a/libbeat/publisher/queue/queuetest/queuetest.go b/libbeat/publisher/queue/queuetest/queuetest.go
index eb65e0259d0..0fb95b1dfbe 100644
--- a/libbeat/publisher/queue/queuetest/queuetest.go
+++ b/libbeat/publisher/queue/queuetest/queuetest.go
@@ -18,9 +18,13 @@
 package queuetest
 
 import (
+	"errors"
+	"io"
 	"sync"
 	"testing"
 
+	"github.com/stretchr/testify/assert"
+
 	"github.com/elastic/beats/v7/libbeat/publisher/queue"
 	"github.com/elastic/elastic-agent-libs/mapstr"
 )
@@ -209,12 +213,28 @@ func runTestCases(t *testing.T, tests []testCase, queueFactory QueueFactory) {
 
 			go test.producers(&wg, nil, log, queue)()
 			go test.consumers(&wg, nil, log, queue)()
-
+			go testFetchMetrics(t, queue)
 			wg.Wait()
 		}))
 	}
 }
 
+func testFetchMetrics(t *testing.T, mon queue.Queue) {
+	_, err := mon.Metrics()
+	if errors.Is(err, queue.ErrMetricsNotImplemented) {
+		return
+	}
+
+	metrics, err := mon.Metrics()
+	// io.EOF is returned while the queue is shutting down, so that is the only acceptable error
+	if err != nil {
+		assert.ErrorIs(t, err, io.EOF)
+	}
+
+	assert.True(t, metrics.EventCount.Exists() || metrics.ByteCount.Exists())
+
+}
+
 func multiple(
 	fns ...workerFactory,
 ) workerFactory {
@@ -269,7 +289,7 @@ func makeProducer(
 	})
 	for i := 0; i < maxEvents; i++ {
 		log.Debug("publish event", i)
-		producer.Publish(makeEvent(makeFields(i)))
+		producer.Publish(MakeEvent(makeFields(i)))
 	}
 	ackWG.Wait()
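
testFetchMetrics deliberately tolerates both queue.ErrMetricsNotImplemented (queues without metrics support) and io.EOF (a queue that is already shutting down). A queue implementation that wants a stricter, single-shot check could follow the same tolerant shape, roughly as sketched here; checkMetrics is illustrative and not part of this change:

package queuetest

import (
	"errors"
	"io"
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

// checkMetrics is an illustrative helper: it skips queues without metrics
// support and tolerates io.EOF from a queue that is closing, but otherwise
// expects at least one gauge to be populated.
func checkMetrics(t *testing.T, q queue.Queue) {
	metrics, err := q.Metrics()
	if errors.Is(err, queue.ErrMetricsNotImplemented) {
		t.Skip("queue does not implement Metrics()")
	}
	if err != nil {
		assert.ErrorIs(t, err, io.EOF)
		return
	}
	assert.True(t, metrics.EventCount.Exists() || metrics.ByteCount.Exists())
}
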