Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support elastic-agent-shipper in diskqueue #32258

Merged
merged 12 commits into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 114 additions & 35 deletions libbeat/publisher/queue/diskqueue/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

// Usage:
//
// go test -bench=1M -benchtime 1x -count 10 -timeout 600m -benchmem > results.txt
// go test -bench=100k -benchtime 1x -count 10 -timeout 10m -benchmem > results.txt
//
// then
//
Expand All @@ -35,11 +35,14 @@ import (
"testing"
"time"

timestamppb "google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages"
)

var (
Expand All @@ -58,8 +61,8 @@ var (
}
)

//makeEvent creates a sample event, using a random message from msg above
func makeEvent() publisher.Event {
//makePublisherEvent creates a sample publisher.Event, using a random message from msgs list
func makePublisherEvent() publisher.Event {
return publisher.Event{
Content: beat.Event{
Timestamp: eventTime,
Expand All @@ -70,11 +73,27 @@ func makeEvent() publisher.Event {
}
}

// makeMessagesEvent creates a sample *messages.Event for the protobuf
// serialization path, using a random message from the msgs list.
func makeMessagesEvent() *messages.Event {
	return &messages.Event{
		Timestamp: timestamppb.New(eventTime),
		Fields: &messages.Struct{
			Data: map[string]*messages.Value{
				// The &messages.Value is implied by the map's value type,
				// so the composite literal is elided (gofmt -s / vet composites).
				"message": {
					Kind: &messages.Value_StringValue{
						StringValue: msgs[rand.Intn(len(msgs))],
					},
				},
			},
		},
	}
}

//setup creates the disk queue, including a temporary directory to
// hold the queue. Location of the temporary directory is stored in
// the queue settings. Call `cleanup` when done with the queue to
// close the queue and remove the temp dir.
func setup(encrypt bool, compress bool) (*diskQueue, queue.Producer) {
func setup(encrypt bool, compress bool, protobuf bool) (*diskQueue, queue.Producer) {
dir, err := os.MkdirTemp("", "benchmark")
if err != nil {
panic(err)
Expand All @@ -85,6 +104,7 @@ func setup(encrypt bool, compress bool) (*diskQueue, queue.Producer) {
s.EncryptionKey = []byte("testtesttesttest")
}
s.UseCompression = compress
s.UseProtobuf = protobuf
q, err := NewQueue(logp.L(), s)
if err != nil {
os.RemoveAll(dir)
Expand All @@ -104,9 +124,15 @@ func cleanup(q *diskQueue) {
}
}

func publishEvents(p queue.Producer, num int) {
func publishEvents(p queue.Producer, num int, protobuf bool) {
for i := 0; i < num; i++ {
ok := p.Publish(makeEvent())
var e interface{}
if protobuf {
e = makeMessagesEvent()
} else {
e = makePublisherEvent()
}
ok := p.Publish(e)
if !ok {
panic("didn't publish")
}
Expand All @@ -131,36 +157,36 @@ func getAndAckEvents(q *diskQueue, num_events int, batch_size int) error {
//produceAndConsume generates and publishes events in a go routine, in
// the main go routine it consumes and acks them. This interleaves
// publish and consume.
func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int) error {
go publishEvents(p, num_events)
func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int, protobuf bool) error {
	// Publish from a separate goroutine so queue writes and reads interleave.
	go publishEvents(p, num_events, protobuf)
	// Consume and ack in this goroutine; returns the first error, if any.
	return getAndAckEvents(q, num_events, batch_size)
}

//produceThenConsume generates and publishes events, when all events
// are published it consumes and acks them.
func produceThenConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int) error {
publishEvents(p, num_events)
func produceThenConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int, protobuf bool) error {
	// Publish every event first (synchronously, in this goroutine)...
	publishEvents(p, num_events, protobuf)
	// ...then drain and ack them all; returns the first error, if any.
	return getAndAckEvents(q, num_events, batch_size)
}

//benchmarkQueue is a wrapper for produceAndConsume, it tries to limit
// timers to just produceAndConsume
func benchmarkQueue(num_events int, batch_size int, encrypt bool, compress bool, async bool, b *testing.B) {
func benchmarkQueue(num_events int, batch_size int, encrypt bool, compress bool, async bool, protobuf bool, b *testing.B) {
b.ResetTimer()
var err error

for n := 0; n < b.N; n++ {
b.StopTimer()
rand.Seed(1)
q, p := setup(encrypt, compress)
q, p := setup(encrypt, compress, protobuf)
b.StartTimer()
if async {
if err = produceAndConsume(p, q, num_events, batch_size); err != nil {
if err = produceAndConsume(p, q, num_events, batch_size, protobuf); err != nil {
cleanup(q)
break
}
} else {
if err = produceThenConsume(p, q, num_events, batch_size); err != nil {
if err = produceThenConsume(p, q, num_events, batch_size, protobuf); err != nil {
cleanup(q)
break
}
Expand All @@ -172,25 +198,78 @@ func benchmarkQueue(num_events int, batch_size int, encrypt bool, compress bool,
}
}

// Actual benchmark calls follow
func BenchmarkAsync1k(b *testing.B) { benchmarkQueue(1000, 10, false, false, true, b) }
func BenchmarkAsync1M(b *testing.B) { benchmarkQueue(1000000, 1000, false, false, true, b) }
func BenchmarkEncryptAsync1k(b *testing.B) { benchmarkQueue(1000, 10, true, false, true, b) }
func BenchmarkEncryptAsync1M(b *testing.B) { benchmarkQueue(1000000, 1000, true, false, true, b) }
func BenchmarkCompressAsync1k(b *testing.B) { benchmarkQueue(1000, 10, false, true, true, b) }
func BenchmarkCompressAsync1M(b *testing.B) { benchmarkQueue(1000000, 1000, false, true, true, b) }
func BenchmarkEncryptCompressAsync1k(b *testing.B) { benchmarkQueue(1000, 10, true, true, true, b) }
func BenchmarkEncryptCompressAsync1M(b *testing.B) {
benchmarkQueue(1000000, 1000, true, true, true, b)
}

func BenchmarkSync1k(b *testing.B) { benchmarkQueue(1000, 10, false, false, false, b) }
func BenchmarkSync1M(b *testing.B) { benchmarkQueue(1000000, 1000, false, false, false, b) }
func BenchmarkEncryptSync1k(b *testing.B) { benchmarkQueue(1000, 10, true, false, false, b) }
func BenchmarkEncryptSync1M(b *testing.B) { benchmarkQueue(1000000, 1000, true, false, false, b) }
func BenchmarkCompressSync1k(b *testing.B) { benchmarkQueue(1000, 10, false, true, false, b) }
func BenchmarkCompressSync1M(b *testing.B) { benchmarkQueue(1000000, 1000, false, true, false, b) }
func BenchmarkEncryptCompressSync1k(b *testing.B) { benchmarkQueue(1000, 10, true, true, false, b) }
func BenchmarkEncryptCompressSync1M(b *testing.B) {
benchmarkQueue(1000000, 1000, true, true, false, b)
// Async benchmarks (producer and consumer run concurrently).
//
// benchmarkQueue argument order is:
//   num_events, batch_size, encrypt, compress, async, protobuf, b
func BenchmarkAsync1k(b *testing.B) {
	benchmarkQueue(1000, 10, false, false, true, false, b)
}
func BenchmarkAsync100k(b *testing.B) {
	benchmarkQueue(100000, 1000, false, false, true, false, b)
}
func BenchmarkEncryptAsync1k(b *testing.B) {
	benchmarkQueue(1000, 10, true, false, true, false, b)
}
func BenchmarkEncryptAsync100k(b *testing.B) {
	benchmarkQueue(100000, 1000, true, false, true, false, b)
}
func BenchmarkCompressAsync1k(b *testing.B) {
	benchmarkQueue(1000, 10, false, true, true, false, b)
}
func BenchmarkCompressAsync100k(b *testing.B) {
	benchmarkQueue(100000, 1000, false, true, true, false, b)
}
func BenchmarkEncryptCompressAsync1k(b *testing.B) {
	benchmarkQueue(1000, 10, true, true, true, false, b)
}
func BenchmarkEncryptCompressAsync100k(b *testing.B) {
	benchmarkQueue(100000, 1000, true, true, true, false, b)
}
func BenchmarkProtoAsync1k(b *testing.B) {
	benchmarkQueue(1000, 10, false, false, true, true, b)
}
func BenchmarkProtoAsync100k(b *testing.B) {
	benchmarkQueue(100000, 1000, false, false, true, true, b)
}
func BenchmarkEncCompProtoAsync1k(b *testing.B) {
	benchmarkQueue(1000, 10, true, true, true, true, b)
}
func BenchmarkEncCompProtoAsync100k(b *testing.B) {
	benchmarkQueue(100000, 1000, true, true, true, true, b)
}

// Sync Benchmarks (all events are published before any are consumed).
//
// benchmarkQueue argument order is:
//   num_events, batch_size, encrypt, compress, async, protobuf, b
func BenchmarkSync1k(b *testing.B) {
	benchmarkQueue(1000, 10, false, false, false, false, b)
}
func BenchmarkSync100k(b *testing.B) {
	benchmarkQueue(100000, 1000, false, false, false, false, b)
}
func BenchmarkEncryptSync1k(b *testing.B) {
	benchmarkQueue(1000, 10, true, false, false, false, b)
}
func BenchmarkEncryptSync100k(b *testing.B) {
	benchmarkQueue(100000, 1000, true, false, false, false, b)
}
func BenchmarkCompressSync1k(b *testing.B) {
	benchmarkQueue(1000, 10, false, true, false, false, b)
}
func BenchmarkCompressSync100k(b *testing.B) {
	benchmarkQueue(100000, 1000, false, true, false, false, b)
}
func BenchmarkEncryptCompressSync1k(b *testing.B) {
	benchmarkQueue(1000, 10, true, true, false, false, b)
}
func BenchmarkEncryptCompressSync100k(b *testing.B) {
	benchmarkQueue(100000, 1000, true, true, false, false, b)
}
func BenchmarkProtoSync1k(b *testing.B) {
	benchmarkQueue(1000, 10, false, false, false, true, b)
}
func BenchmarkProtoSync100k(b *testing.B) {
	benchmarkQueue(100000, 1000, false, false, false, true, b)
}
func BenchmarkEncCompProtoSync1k(b *testing.B) {
	benchmarkQueue(1000, 10, true, true, false, true, b)
}
func BenchmarkEncCompProtoSync100k(b *testing.B) {
	benchmarkQueue(100000, 1000, true, true, false, true, b)
}
3 changes: 3 additions & 0 deletions libbeat/publisher/queue/diskqueue/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ type Settings struct {

// UseCompression enables or disables LZ4 compression
UseCompression bool

// UseProtobuf enables protobuf serialization instead of CBOR
UseProtobuf bool
}

// userConfig holds the parameters for a disk queue that are configurable
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/docs/frameV2.pic
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
boxht = 0.25
SIZE1: box "size (uint32)" wid 4;
DATA: box "CBOR serialized data" dashed wid 4 ht 2 with .nw at SIZE1.sw;
DATA: box "CBOR or Protobuf serialized data" dashed wid 4 ht 2 with .nw at SIZE1.sw;
CHECKSUM: box "checksum (uint32)" wid 4 with .nw at DATA.sw;
SIZE2: box "size (uint32)" wid 4 with .nw at CHECKSUM.sw;
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/docs/frameV2.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
13 changes: 9 additions & 4 deletions libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,19 @@ both compression and encryption are enabled. The next 128-bits are
the initialization vector and the rest of the file is LZ4 compressed
frames.

If the options field has the third bit set, then Google Protobuf is
used to serialize the data in the frame instead of CBOR.

![Segment Schema Version 2](./schemaV2.svg)

The frames for version 2, consist of a header, followed by the
serialized event and a footer. The header contains one field which is
the size of the frame, which is an unsigned 32-bit integer in
little-endian format. The serialization format is CBOR. The footer
contains 2 fields, the first of which is a checksum which is an
unsigned 32-bit integer in little-endian format, followed by a repeat
of the size from the header. This is the same as version 1.
little-endian format. The serialization format is CBOR or Google
Protobuf. The footer contains 2 fields, the first of which is a
checksum which is an unsigned 32-bit integer in little-endian format,
followed by a repeat of the size from the header. The only difference
from Version 1 is the option for the serialization format to be CBOR
or Google Protobuf.

![Frame Version 2](./frameV2.svg)
4 changes: 1 addition & 3 deletions libbeat/publisher/queue/diskqueue/frames.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package diskqueue

import "github.com/elastic/beats/v7/libbeat/publisher"

// Every data frame read from the queue is assigned a unique sequential
// integer, which is used to keep track of which frames have been
// acknowledged.
Expand Down Expand Up @@ -54,7 +52,7 @@ type readFrame struct {
id frameID

// The event decoded from the data frame.
event publisher.Event
event interface{}

// How much space this frame occupied on disk (before deserialization),
// including the frame header / footer.
Expand Down
8 changes: 7 additions & 1 deletion libbeat/publisher/queue/diskqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,16 @@ func (dq *diskQueue) BufferConfig() queue.BufferConfig {
}

// Producer creates a new producer for this queue. The producer's
// encoder uses protobuf serialization when the queue settings request
// it, and CBOR otherwise.
func (dq *diskQueue) Producer(cfg queue.ProducerConfig) queue.Producer {
	// Default to CBOR; protobuf is opt-in via settings.
	serializationFormat := SerializationCBOR
	if dq.settings.UseProtobuf {
		serializationFormat = SerializationProtobuf
	}
	return &diskQueueProducer{
		queue:   dq,
		config:  cfg,
		encoder: newEventEncoder(serializationFormat),
		done:    make(chan struct{}),
	}
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/reader_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon

// Open the file and seek to the starting position.
handle, err := request.segment.getReader(rl.settings)
rl.decoder.useJSON = request.segment.shouldUseJSON()
rl.decoder.serializationFormat = handle.serializationFormat
if err != nil {
return readerLoopResponse{err: err}
}
Expand Down
Loading