Skip to content

Commit

Permalink
Add basic metrics reporting to the memqueue (#31703)
Browse files Browse the repository at this point in the history
* first pass at having the memqueue report metrics

* add sort of a test

* format

* fix length for ring buffer, clean up tests

* add tests, occupiedRead

* change field names, separate tests
  • Loading branch information
fearful-symmetry authored May 31, 2022
1 parent eacde0d commit a160e52
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 6 deletions.
24 changes: 23 additions & 1 deletion libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/opt"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
c "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -76,6 +77,12 @@ type broker struct {
// if needed.
ackListener queue.ACKListener

// This channel is used to request/return metrics where such metrics require insight into
// the actual eventloop itself. This seems like it might be overkill, but it seems that
// all communication between the broker and the eventloops
// happens via channels, so we're doing it this way.
metricChan chan metricsRequest

// wait group for worker shutdown
wg sync.WaitGroup
}
Expand Down Expand Up @@ -184,6 +191,7 @@ func NewQueue(
scheduledACKs: make(chan chanList),

ackListener: settings.ACKListener,
metricChan: make(chan metricsRequest),
}

var eventLoop interface {
Expand Down Expand Up @@ -249,7 +257,21 @@ func (b *broker) Get(count int) (queue.Batch, error) {
}

// Metrics returns a snapshot of the queue's current metrics.
//
// The counters are owned by the event loop, so the snapshot is requested over
// metricChan and the reply is read back from a per-call response channel.
// io.EOF is returned when b.done is signalled before the request could be
// handed to the event loop.
func (b *broker) Metrics() (queue.Metrics, error) {
	// Buffered with capacity 1 so the single reply can be written without
	// waiting for this goroutine to be ready to receive it.
	responseChan := make(chan memQueueMetrics, 1)
	select {
	case <-b.done:
		return queue.Metrics{}, io.EOF
	case b.metricChan <- metricsRequest{responseChan: responseChan}:
	}
	resp := <-responseChan

	return queue.Metrics{
		EventCount:            opt.UintWith(uint64(resp.currentQueueSize)),
		EventLimit:            opt.UintWith(uint64(b.bufSize)),
		UnackedConsumedEvents: opt.UintWith(uint64(resp.occupiedRead)),
	}, nil
}

var ackChanPool = sync.Pool{
Expand Down
19 changes: 19 additions & 0 deletions libbeat/publisher/queue/memqueue/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type bufferingEventLoop struct {
// those that have not yet been acked.
eventCount int

// The number of events that have been read by a consumer but not yet acked
unackedEventCount int

minEvents int
maxEvents int
flushTimeout time.Duration
Expand Down Expand Up @@ -122,13 +125,20 @@ func (l *directEventLoop) run() {
case req := <-getChan: // consumer asking for next batch
l.handleGetRequest(&req)

case req := <-l.broker.metricChan: // broker asking for queue metrics
l.handleMetricsRequest(&req)

case schedACKs <- l.pendingACKs:
// on send complete list of pending batches has been forwarded -> clear list
l.pendingACKs = chanList{}
}
}
}

// handleMetricsRequest replies to a metrics request with the number of items
// in the ring buffer and the buffer's reserved count.
func (l *directEventLoop) handleMetricsRequest(req *metricsRequest) {
	snapshot := memQueueMetrics{
		currentQueueSize: l.buf.Items(),
		occupiedRead:     l.buf.reserved,
	}
	req.responseChan <- snapshot
}

// insert handles the insertion request. It does not return a value.
func (l *directEventLoop) insert(req *pushRequest) {
log := l.broker.logger
Expand Down Expand Up @@ -299,6 +309,9 @@ func (l *bufferingEventLoop) run() {
case count := <-l.broker.ackChan:
l.handleACK(count)

case req := <-l.broker.metricChan: // broker asking for queue metrics
l.handleMetricsRequest(&req)

case <-l.idleC:
l.idleC = nil
l.timer.Stop()
Expand All @@ -309,6 +322,10 @@ func (l *bufferingEventLoop) run() {
}
}

// handleMetricsRequest replies to a metrics request with the loop's current
// event count and the count of events read by a consumer but not yet acked.
func (l *bufferingEventLoop) handleMetricsRequest(req *metricsRequest) {
	snapshot := memQueueMetrics{
		currentQueueSize: l.eventCount,
		occupiedRead:     l.unackedEventCount,
	}
	req.responseChan <- snapshot
}

func (l *bufferingEventLoop) handleInsert(req *pushRequest) {
if l.insert(req) {
l.eventCount++
Expand Down Expand Up @@ -408,13 +425,15 @@ func (l *bufferingEventLoop) handleGetRequest(req *getRequest) {
req.responseChan <- getResponse{acker.ackChan, entries}
l.pendingACKs.append(acker)

l.unackedEventCount += len(entries)
buf.entries = buf.entries[count:]
if buf.length() == 0 {
l.advanceFlushList()
}
}

// handleACK records that count events have been acknowledged by lowering both
// the total event count and the read-but-unacked count by that amount.
func (l *bufferingEventLoop) handleACK(count int) {
	l.eventCount -= count
	l.unackedEventCount -= count
}

Expand Down
14 changes: 14 additions & 0 deletions libbeat/publisher/queue/memqueue/internal_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,17 @@ type getResponse struct {
}

type batchAckMsg struct{}

// Metrics API

// metricsRequest is the message the broker sends over its metricChan to ask
// the event loop for a metrics snapshot.
type metricsRequest struct {
// responseChan is where the event loop writes its memQueueMetrics reply.
responseChan chan memQueueMetrics
}

// memQueueMetrics holds the metrics that the individual memory queue event
// loops report back to the broker in response to a metricsRequest.
type memQueueMetrics struct {
// the number of events currently in the queue
currentQueueSize int
// the number of events that have been read by a consumer but not yet ack'ed
occupiedRead int
}
65 changes: 65 additions & 0 deletions libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package memqueue

import (
"flag"
"fmt"
"math"
"math/rand"
"testing"
Expand All @@ -28,6 +29,7 @@ import (

"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest"
"github.com/elastic/elastic-agent-libs/mapstr"
)

var seed int64
Expand Down Expand Up @@ -71,6 +73,69 @@ func TestProduceConsumer(t *testing.T) {
t.Run("flush", testWith(makeTestQueue(bufferSize, batchSize/2, 100*time.Millisecond)))
}

func TestQueueMetricsDirect(t *testing.T) {
eventsToTest := 5
maxEvents := 10

// Test the directEventLoop
directSettings := Settings{
Events: maxEvents,
FlushMinEvents: 1,
FlushTimeout: 0,
}
t.Logf("Testing directEventLoop")
queueTestWithSettings(t, directSettings, eventsToTest, "directEventLoop")

}

func TestQueueMetricsBuffer(t *testing.T) {
eventsToTest := 5
maxEvents := 10
// Test Buffered Event Loop
bufferedSettings := Settings{
Events: maxEvents,
FlushMinEvents: eventsToTest, // The buffered event loop can only return FlushMinEvents per Get()
FlushTimeout: time.Millisecond,
}
t.Logf("Testing bufferedEventLoop")
queueTestWithSettings(t, bufferedSettings, eventsToTest, "bufferedEventLoop")
}

// queueTestWithSettings builds a queue from settings, publishes eventsToTest
// events and checks the reported metrics at three points: after publishing,
// after the events were read but not ACKed, and after the batch was ACKed.
// testName is prefixed to every failure label.
func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, testName string) {
	testQueue := NewQueue(nil, settings)
	defer testQueue.Close()

	// Send events to queue
	producer := testQueue.Producer(queue.ProducerConfig{})
	for i := 0; i < eventsToTest; i++ {
		producer.Publish(queuetest.MakeEvent(mapstr.M{"count": i}))
	}
	// Expectations are derived from eventsToTest rather than a literal so the
	// helper stays correct for callers that publish a different number.
	queueMetricsAreValid(t, testQueue, eventsToTest, settings.Events, 0, fmt.Sprintf("%s - First send of metrics to queue", testName))

	// Read events, don't yet ack them
	batch, err := testQueue.Get(eventsToTest)
	assert.NilError(t, err, "error in Get")
	t.Logf("Got batch of %d events", batch.Count())

	queueMetricsAreValid(t, testQueue, eventsToTest, settings.Events, eventsToTest, fmt.Sprintf("%s - Producer Getting events, no ACK", testName))

	// Test metrics after ack
	batch.ACK()

	// Distinct label so a failure here is not confused with the pre-ACK check.
	queueMetricsAreValid(t, testQueue, 0, settings.Events, 0, fmt.Sprintf("%s - Events ACKed", testName))
}

// queueMetricsAreValid fetches the metrics of q and asserts that EventCount,
// EventLimit and UnackedConsumedEvents equal evtCount, evtLimit and occupied.
// test is a label that is included in every failure message.
// NOTE(review): ValueOr(0) presumably substitutes 0 for a metric that is not
// set, so an absent metric only matches an expected value of 0 — confirm.
func queueMetricsAreValid(t *testing.T, q queue.Queue, evtCount, evtLimit, occupied int, test string) {
// wait briefly to avoid races across all the queue channels
time.Sleep(time.Millisecond * 100)
testMetrics, err := q.Metrics()
assert.NilError(t, err, "error calling metrics for test %s", test)
assert.Equal(t, testMetrics.EventCount.ValueOr(0), uint64(evtCount), "incorrect EventCount for %s", test)
assert.Equal(t, testMetrics.EventLimit.ValueOr(0), uint64(evtLimit), "incorrect EventLimit for %s", test)
assert.Equal(t, testMetrics.UnackedConsumedEvents.ValueOr(0), uint64(occupied), "incorrect OccupiedRead for %s", test)
}

func TestProducerCancelRemovesEvents(t *testing.T) {
queuetest.TestProducerCancelRemovesEvents(t, makeTestQueue(1024, 0, 0))
}
Expand Down
5 changes: 5 additions & 0 deletions libbeat/publisher/queue/memqueue/ringbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,8 @@ func (b *ringBuffer) Full() bool {
func (b *ringBuffer) Size() int {
// This is the length of b.entries; the count of events currently held is
// reported separately by Items().
return len(b.entries)
}

// Items reports how many events the buffer currently holds, computed as the
// combined size of its two regions.
func (b *ringBuffer) Items() int {
	inA, inB := b.regA.size, b.regB.size
	return inA + inB
}
3 changes: 3 additions & 0 deletions libbeat/publisher/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type Metrics struct {
//EventLimit is the user-configured event limit of the queue
EventLimit opt.Uint

//UnackedConsumedEvents is the count of events that an output consumer has read, but not yet ack'ed
UnackedConsumedEvents opt.Uint

//OldestActiveTimestamp is the timestamp of the oldest item in the queue.
OldestActiveTimestamp common.Time
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/queuetest/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
)

func makeEvent(fields mapstr.M) publisher.Event {
func MakeEvent(fields mapstr.M) publisher.Event {
return publisher.Event{
Content: beat.Event{
Timestamp: time.Now(),
Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/queue/queuetest/producer_cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) {

for ; i < N1; i++ {
log.Debugf("send event %v to first producer", i)
producer.Publish(makeEvent(mapstr.M{
producer.Publish(MakeEvent(mapstr.M{
"value": i,
}))
}
Expand All @@ -67,7 +67,7 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) {
producer = b.Producer(queue.ProducerConfig{})
for ; i < N2; i++ {
log.Debugf("send event %v to new producer", i)
producer.Publish(makeEvent(mapstr.M{
producer.Publish(MakeEvent(mapstr.M{
"value": i,
}))
}
Expand Down
24 changes: 22 additions & 2 deletions libbeat/publisher/queue/queuetest/queuetest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
package queuetest

import (
"errors"
"io"
"sync"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -209,12 +213,28 @@ func runTestCases(t *testing.T, tests []testCase, queueFactory QueueFactory) {

go test.producers(&wg, nil, log, queue)()
go test.consumers(&wg, nil, log, queue)()

go testFetchMetrics(t, queue)
wg.Wait()
}))
}
}

// testFetchMetrics fetches the metrics of mon once and checks the result.
// Queues that report ErrMetricsNotImplemented are skipped. Any other error
// must be io.EOF, which is what a closing queue returns. On success at least
// one of EventCount or ByteCount must be set.
func testFetchMetrics(t *testing.T, mon queue.Queue) {
	metrics, err := mon.Metrics()
	if errors.Is(err, queue.ErrMetricsNotImplemented) {
		return
	}

	// EOF is returned if the queue is closing, so the only "good" error is that
	if err != nil {
		assert.ErrorIs(t, err, io.EOF)
		// The metrics value is meaningless when an error was returned, so it
		// must not be asserted on.
		return
	}

	assert.True(t, metrics.EventCount.Exists() || metrics.ByteCount.Exists())
}

func multiple(
fns ...workerFactory,
) workerFactory {
Expand Down Expand Up @@ -269,7 +289,7 @@ func makeProducer(
})
for i := 0; i < maxEvents; i++ {
log.Debug("publish event", i)
producer.Publish(makeEvent(makeFields(i)))
producer.Publish(MakeEvent(makeFields(i)))
}

ackWG.Wait()
Expand Down

0 comments on commit a160e52

Please sign in to comment.