Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic metrics reporting to the memqueue #31703

Merged
merged 8 commits into from
May 31, 2022
24 changes: 23 additions & 1 deletion libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/opt"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
c "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -76,6 +77,12 @@ type broker struct {
// if needed.
ackListener queue.ACKListener

// This channel is used to request/return metrics where such metrics require insight into
// the actual eventloop itself. This might look like overkill, but all communication
// between the broker and the eventloops happens via channels, so metrics requests
// follow the same pattern.
metricChan chan metricsRequest

// wait group for worker shutdown
wg sync.WaitGroup
}
Expand Down Expand Up @@ -184,6 +191,7 @@ func NewQueue(
scheduledACKs: make(chan chanList),

ackListener: settings.ACKListener,
metricChan: make(chan metricsRequest),
}

var eventLoop interface {
Expand Down Expand Up @@ -249,7 +257,21 @@ func (b *broker) Get(count int) (queue.Batch, error) {
}

// Metrics returns a snapshot of the memory queue's current metrics.
//
// The counters are owned by the event loop, so the request is sent over
// b.metricChan and the answer is read back from a per-call response channel.
// If b.done fires before the request is accepted, io.EOF is returned.
func (b *broker) Metrics() (queue.Metrics, error) {
	// Capacity of one lets the event loop reply without blocking on this caller.
	responseChan := make(chan memQueueMetrics, 1)
	select {
	case <-b.done:
		return queue.Metrics{}, io.EOF
	case b.metricChan <- metricsRequest{responseChan: responseChan}:
	}
	resp := <-responseChan

	return queue.Metrics{
		EventCount:            opt.UintWith(uint64(resp.currentQueueSize)),
		EventLimit:            opt.UintWith(uint64(b.bufSize)),
		UnackedConsumedEvents: opt.UintWith(uint64(resp.occupiedRead)),
	}, nil
}

var ackChanPool = sync.Pool{
Expand Down
19 changes: 19 additions & 0 deletions libbeat/publisher/queue/memqueue/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type bufferingEventLoop struct {
// those that have not yet been acked.
eventCount int

// The number of events that have been read by a consumer but not yet acked
unackedEventCount int

minEvents int
maxEvents int
flushTimeout time.Duration
Expand Down Expand Up @@ -122,13 +125,20 @@ func (l *directEventLoop) run() {
case req := <-getChan: // consumer asking for next batch
l.handleGetRequest(&req)

case req := <-l.broker.metricChan: // broker asking for queue metrics
l.handleMetricsRequest(&req)

case schedACKs <- l.pendingACKs:
// on send complete list of pending batches has been forwarded -> clear list
l.pendingACKs = chanList{}
}
}
}

// handleMetricsRequest replies to a broker metrics request with the number of
// items currently in the ring buffer and the buffer's reserved count.
func (l *directEventLoop) handleMetricsRequest(req *metricsRequest) {
	snapshot := memQueueMetrics{
		currentQueueSize: l.buf.Items(),
		occupiedRead:     l.buf.reserved,
	}
	req.responseChan <- snapshot
}

// insert handles the given insertion request; it does not return a value.
func (l *directEventLoop) insert(req *pushRequest) {
log := l.broker.logger
Expand Down Expand Up @@ -299,6 +309,9 @@ func (l *bufferingEventLoop) run() {
case count := <-l.broker.ackChan:
l.handleACK(count)

case req := <-l.broker.metricChan: // broker asking for queue metrics
l.handleMetricsRequest(&req)

case <-l.idleC:
l.idleC = nil
l.timer.Stop()
Expand All @@ -309,6 +322,10 @@ func (l *bufferingEventLoop) run() {
}
}

// handleMetricsRequest replies to a broker metrics request with the loop's
// current event count and its count of consumed-but-unacked events.
func (l *bufferingEventLoop) handleMetricsRequest(req *metricsRequest) {
	snapshot := memQueueMetrics{
		currentQueueSize: l.eventCount,
		occupiedRead:     l.unackedEventCount,
	}
	req.responseChan <- snapshot
}

func (l *bufferingEventLoop) handleInsert(req *pushRequest) {
if l.insert(req) {
l.eventCount++
Expand Down Expand Up @@ -408,13 +425,15 @@ func (l *bufferingEventLoop) handleGetRequest(req *getRequest) {
req.responseChan <- getResponse{acker.ackChan, entries}
l.pendingACKs.append(acker)

l.unackedEventCount += len(entries)
buf.entries = buf.entries[count:]
if buf.length() == 0 {
l.advanceFlushList()
}
}

// handleACK records that count events were acknowledged: they no longer count
// as queued, nor as read-but-unacked.
func (l *bufferingEventLoop) handleACK(count int) {
	l.eventCount, l.unackedEventCount = l.eventCount-count, l.unackedEventCount-count
}

Expand Down
14 changes: 14 additions & 0 deletions libbeat/publisher/queue/memqueue/internal_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,17 @@ type getResponse struct {
}

type batchAckMsg struct{}

// Metrics API

// metricsRequest is sent to an event loop over the broker's metricChan to ask
// for the loop's current metrics.
type metricsRequest struct {
// responseChan is where the event loop sends its memQueueMetrics reply.
responseChan chan memQueueMetrics
}

// memQueueMetrics holds the metrics that the individual memory queue event loop
// implementations report back in response to a metricsRequest.
type memQueueMetrics struct {
// the number of items currently in the queue
currentQueueSize int
// the number of items that have been read by a consumer but not yet ack'ed
occupiedRead int
}
65 changes: 65 additions & 0 deletions libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package memqueue

import (
"flag"
"fmt"
"math"
"math/rand"
"testing"
Expand All @@ -28,6 +29,7 @@ import (

"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest"
"github.com/elastic/elastic-agent-libs/mapstr"
)

var seed int64
Expand Down Expand Up @@ -74,6 +76,69 @@ func TestProduceConsumer(t *testing.T) {
t.Run("flush", testWith(makeTestQueue(bufferSize, batchSize/2, 100*time.Millisecond)))
}

func TestQueueMetricsDirect(t *testing.T) {
eventsToTest := 5
maxEvents := 10

// Test the directEventLoop
directSettings := Settings{
Events: maxEvents,
FlushMinEvents: 1,
FlushTimeout: 0,
}
t.Logf("Testing directEventLoop")
queueTestWithSettings(t, directSettings, eventsToTest, "directEventLoop")

}

func TestQueueMetricsBuffer(t *testing.T) {
eventsToTest := 5
maxEvents := 10
// Test Buffered Event Loop
bufferedSettings := Settings{
Events: maxEvents,
FlushMinEvents: eventsToTest, // The buffered event loop can only return FlushMinEvents per Get()
FlushTimeout: time.Millisecond,
}
t.Logf("Testing bufferedEventLoop")
queueTestWithSettings(t, bufferedSettings, eventsToTest, "bufferedEventLoop")
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
}

// queueTestWithSettings publishes eventsToTest events to a queue built from
// settings and checks the reported metrics at three points: after publishing,
// after a consumer has read the events without ACKing them, and after the ACK.
// testName is used as a prefix in assertion messages.
func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, testName string) {
	testQueue := NewQueue(nil, settings)
	defer testQueue.Close()

	// Send events to queue
	producer := testQueue.Producer(queue.ProducerConfig{})
	for i := 0; i < eventsToTest; i++ {
		producer.Publish(queuetest.MakeEvent(mapstr.M{"count": i}))
	}
	// Expected counts follow eventsToTest rather than a hard-coded value, so the
	// helper stays correct for any caller-supplied event count.
	queueMetricsAreValid(t, testQueue, eventsToTest, settings.Events, 0, fmt.Sprintf("%s - First send of metrics to queue", testName))

	// Read events, don't yet ack them
	batch, err := testQueue.Get(eventsToTest)
	assert.NilError(t, err, "error in Get")
	t.Logf("Got batch of %d events", batch.Count())

	queueMetricsAreValid(t, testQueue, eventsToTest, settings.Events, eventsToTest, fmt.Sprintf("%s - Producer Getting events, no ACK", testName))

	// Test metrics after ack
	batch.ACK()

	queueMetricsAreValid(t, testQueue, 0, settings.Events, 0, fmt.Sprintf("%s - Consumer ACKed events", testName))
}

// queueMetricsAreValid fetches the metrics of q and asserts that the event
// count, event limit and unacked-consumed count equal evtCount, evtLimit and
// occupied. test labels the assertion messages.
func queueMetricsAreValid(t *testing.T, q queue.Queue, evtCount, evtLimit, occupied int, test string) {
	// Give the queue's channels a moment to settle so the snapshot is not racy.
	const settleTime = 100 * time.Millisecond
	time.Sleep(settleTime)

	got, err := q.Metrics()
	assert.NilError(t, err, "error calling metrics for test %s", test)

	wantCount, wantLimit, wantUnacked := uint64(evtCount), uint64(evtLimit), uint64(occupied)
	assert.Equal(t, got.EventCount.ValueOr(0), wantCount, "incorrect EventCount for %s", test)
	assert.Equal(t, got.EventLimit.ValueOr(0), wantLimit, "incorrect EventLimit for %s", test)
	assert.Equal(t, got.UnackedConsumedEvents.ValueOr(0), wantUnacked, "incorrect OccupiedRead for %s", test)
}

func TestProducerCancelRemovesEvents(t *testing.T) {
queuetest.TestProducerCancelRemovesEvents(t, makeTestQueue(1024, 0, 0))
}
Expand Down
5 changes: 5 additions & 0 deletions libbeat/publisher/queue/memqueue/ringbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,8 @@ func (b *ringBuffer) Full() bool {
// Size returns the length of the buffer's entries slice.
func (b *ringBuffer) Size() int {
	total := len(b.entries)
	return total
}

// Items reports how many events the buffer currently holds, i.e. the combined
// size of regions A and B.
func (b *ringBuffer) Items() int {
	inA, inB := b.regA.size, b.regB.size
	return inA + inB
}
3 changes: 3 additions & 0 deletions libbeat/publisher/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type Metrics struct {
//EventLimit is the user-configured event limit of the queue
EventLimit opt.Uint

//UnackedConsumedEvents is the count of events that an output consumer has read, but not yet ack'ed
UnackedConsumedEvents opt.Uint

//OldestActiveTimestamp is the timestamp of the oldest item in the queue.
OldestActiveTimestamp common.Time
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/queuetest/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
)

func makeEvent(fields mapstr.M) publisher.Event {
func MakeEvent(fields mapstr.M) publisher.Event {
return publisher.Event{
Content: beat.Event{
Timestamp: time.Now(),
Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/queue/queuetest/producer_cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) {

for ; i < N1; i++ {
log.Debugf("send event %v to first producer", i)
producer.Publish(makeEvent(mapstr.M{
producer.Publish(MakeEvent(mapstr.M{
"value": i,
}))
}
Expand All @@ -67,7 +67,7 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) {
producer = b.Producer(queue.ProducerConfig{})
for ; i < N2; i++ {
log.Debugf("send event %v to new producer", i)
producer.Publish(makeEvent(mapstr.M{
producer.Publish(MakeEvent(mapstr.M{
"value": i,
}))
}
Expand Down
24 changes: 22 additions & 2 deletions libbeat/publisher/queue/queuetest/queuetest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
package queuetest

import (
"errors"
"io"
"sync"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -209,12 +213,28 @@ func runTestCases(t *testing.T, tests []testCase, queueFactory QueueFactory) {

go test.producers(&wg, nil, log, queue)()
go test.consumers(&wg, nil, log, queue)()

go testFetchMetrics(t, queue)
wg.Wait()
}))
}
}

// testFetchMetrics calls Metrics once on mon and validates the outcome.
//
// Queues that report queue.ErrMetricsNotImplemented are skipped. Any other
// error must be io.EOF. On success, at least one of EventCount or ByteCount
// must be set.
func testFetchMetrics(t *testing.T, mon queue.Queue) {
	metrics, err := mon.Metrics()
	if errors.Is(err, queue.ErrMetricsNotImplemented) {
		return
	}

	// EOF is returned if the queue is closing, so the only "good" error is that.
	// Return either way: on error the metrics value carries no data, so checking
	// its fields would produce a spurious failure.
	if err != nil {
		assert.ErrorIs(t, err, io.EOF)
		return
	}

	assert.True(t, metrics.EventCount.Exists() || metrics.ByteCount.Exists())
}

func multiple(
fns ...workerFactory,
) workerFactory {
Expand Down Expand Up @@ -269,7 +289,7 @@ func makeProducer(
})
for i := 0; i < maxEvents; i++ {
log.Debug("publish event", i)
producer.Publish(makeEvent(makeFields(i)))
producer.Publish(MakeEvent(makeFields(i)))
}

ackWG.Wait()
Expand Down