support elastic-agent-shipper in diskqueue #32258

Merged · 12 commits · Jul 20, 2022
139 changes: 109 additions & 30 deletions libbeat/publisher/queue/diskqueue/benchmark_test.go
@@ -35,11 +35,14 @@ import (
"testing"
"time"

timestamppb "google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages"
)

var (
@@ -58,8 +61,8 @@ var (
}
)

//makeEvent creates a sample event, using a random message from msg above
func makeEvent() publisher.Event {
//makePublisherEvent creates a sample publisher.Event, using a random message from msg above
func makePublisherEvent() publisher.Event {
return publisher.Event{
Content: beat.Event{
Timestamp: eventTime,
@@ -70,11 +73,27 @@ func makeEvent() publisher.Event {
}
}

//makeMessagesEvent creates a sample *messages.Event, using a random message from msg above
func makeMessagesEvent() *messages.Event {
return &messages.Event{
Timestamp: timestamppb.New(eventTime),
Fields: &messages.Struct{
Data: map[string]*messages.Value{
"message": &messages.Value{
Kind: &messages.Value_StringValue{
StringValue: msgs[rand.Intn(len(msgs))],
},
},
},
},
}
}

//setup creates the disk queue, including a temporary directory to
// hold the queue. Location of the temporary directory is stored in
// the queue settings. Call `cleanup` when done with the queue to
// close the queue and remove the temp dir.
func setup(encrypt bool, compress bool) (*diskQueue, queue.Producer) {
func setup(encrypt bool, compress bool, protobuf bool) (*diskQueue, queue.Producer) {
dir, err := os.MkdirTemp("", "benchmark")
if err != nil {
panic(err)
@@ -85,6 +104,7 @@ func setup(encrypt bool, compress bool) (*diskQueue, queue.Producer) {
s.EncryptionKey = []byte("testtesttesttest")
}
s.UseCompression = compress
s.UseProtobuf = protobuf
q, err := NewQueue(logp.L(), s)
if err != nil {
os.RemoveAll(dir)
@@ -104,9 +124,15 @@ func cleanup(q *diskQueue) {
}
}

func publishEvents(p queue.Producer, num int) {
func publishEvents(p queue.Producer, num int, protobuf bool) {
for i := 0; i < num; i++ {
ok := p.Publish(makeEvent())
var e interface{}
if protobuf {
e = makeMessagesEvent()
} else {
e = makePublisherEvent()
}
ok := p.Publish(e)
if !ok {
panic("didn't publish")
}
@@ -131,36 +157,36 @@ func getAndAckEvents(q *diskQueue, num_events int, batch_size int) error {
//produceAndConsume generates and publishes events in a go routine, in
// the main go routine it consumes and acks them. This interleaves
// publish and consume.
func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int) error {
go publishEvents(p, num_events)
func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int, protobuf bool) error {
go publishEvents(p, num_events, protobuf)
return getAndAckEvents(q, num_events, batch_size)
}

//produceThenConsume generates and publishes events, when all events
// are published it consumes and acks them.
func produceThenConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int) error {
publishEvents(p, num_events)
func produceThenConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int, protobuf bool) error {
publishEvents(p, num_events, protobuf)
return getAndAckEvents(q, num_events, batch_size)
}

//benchmarkQueue is a wrapper for produceAndConsume, it tries to limit
// timers to just produceAndConsume
func benchmarkQueue(num_events int, batch_size int, encrypt bool, compress bool, async bool, b *testing.B) {
func benchmarkQueue(num_events int, batch_size int, encrypt bool, compress bool, async bool, protobuf bool, b *testing.B) {
b.ResetTimer()
var err error

for n := 0; n < b.N; n++ {
b.StopTimer()
rand.Seed(1)
q, p := setup(encrypt, compress)
q, p := setup(encrypt, compress, protobuf)
b.StartTimer()
if async {
if err = produceAndConsume(p, q, num_events, batch_size); err != nil {
if err = produceAndConsume(p, q, num_events, batch_size, protobuf); err != nil {
cleanup(q)
break
}
} else {
if err = produceThenConsume(p, q, num_events, batch_size); err != nil {
if err = produceThenConsume(p, q, num_events, batch_size, protobuf); err != nil {
cleanup(q)
break
}
@@ -172,25 +198,78 @@ func benchmarkQueue(num_events int, batch_size int, encrypt bool, compress bool,
}
}

// Actual benchmark calls follow
func BenchmarkAsync1k(b *testing.B) { benchmarkQueue(1000, 10, false, false, true, b) }
func BenchmarkAsync1M(b *testing.B) { benchmarkQueue(1000000, 1000, false, false, true, b) }
func BenchmarkEncryptAsync1k(b *testing.B) { benchmarkQueue(1000, 10, true, false, true, b) }
func BenchmarkEncryptAsync1M(b *testing.B) { benchmarkQueue(1000000, 1000, true, false, true, b) }
func BenchmarkCompressAsync1k(b *testing.B) { benchmarkQueue(1000, 10, false, true, true, b) }
func BenchmarkCompressAsync1M(b *testing.B) { benchmarkQueue(1000000, 1000, false, true, true, b) }
func BenchmarkEncryptCompressAsync1k(b *testing.B) { benchmarkQueue(1000, 10, true, true, true, b) }
// Async benchmarks
func BenchmarkAsync1k(b *testing.B) {
benchmarkQueue(1000, 10, false, false, true, false, b)
}
func BenchmarkAsync1M(b *testing.B) {
benchmarkQueue(1000000, 1000, false, false, true, false, b)
}
func BenchmarkEncryptAsync1k(b *testing.B) {
benchmarkQueue(1000, 10, true, false, true, false, b)
}
func BenchmarkEncryptAsync1M(b *testing.B) {
benchmarkQueue(1000000, 1000, true, false, true, false, b)
}
func BenchmarkCompressAsync1k(b *testing.B) {
benchmarkQueue(1000, 10, false, true, true, false, b)
}
func BenchmarkCompressAsync1M(b *testing.B) {
benchmarkQueue(1000000, 1000, false, true, true, false, b)
}
func BenchmarkEncryptCompressAsync1k(b *testing.B) {
benchmarkQueue(1000, 10, true, true, true, false, b)
}
func BenchmarkEncryptCompressAsync1M(b *testing.B) {
benchmarkQueue(1000000, 1000, true, true, true, b)
benchmarkQueue(1000000, 1000, true, true, true, false, b)
}
func BenchmarkProtoAsync1k(b *testing.B) {
benchmarkQueue(1000, 10, false, false, true, true, b)
}
func BenchmarkProtoAsync1M(b *testing.B) {
benchmarkQueue(1000000, 1000, false, false, true, true, b)
}
func BenchmarkEncCompProtoAsync1k(b *testing.B) {
benchmarkQueue(1000, 10, true, true, true, true, b)
}
func BenchmarkEncCompProtoAsync1M(b *testing.B) {
benchmarkQueue(1000000, 1000, true, true, true, true, b)
}

func BenchmarkSync1k(b *testing.B) { benchmarkQueue(1000, 10, false, false, false, b) }
func BenchmarkSync1M(b *testing.B) { benchmarkQueue(1000000, 1000, false, false, false, b) }
func BenchmarkEncryptSync1k(b *testing.B) { benchmarkQueue(1000, 10, true, false, false, b) }
func BenchmarkEncryptSync1M(b *testing.B) { benchmarkQueue(1000000, 1000, true, false, false, b) }
func BenchmarkCompressSync1k(b *testing.B) { benchmarkQueue(1000, 10, false, true, false, b) }
func BenchmarkCompressSync1M(b *testing.B) { benchmarkQueue(1000000, 1000, false, true, false, b) }
func BenchmarkEncryptCompressSync1k(b *testing.B) { benchmarkQueue(1000, 10, true, true, false, b) }
// Sync Benchmarks
func BenchmarkSync1k(b *testing.B) {
benchmarkQueue(1000, 10, false, false, false, false, b)
}
func BenchmarkSync1M(b *testing.B) {
benchmarkQueue(1000000, 1000, false, false, false, false, b)
}
func BenchmarkEncryptSync1k(b *testing.B) {
benchmarkQueue(1000, 10, true, false, false, false, b)
}
func BenchmarkEncryptSync1M(b *testing.B) {
benchmarkQueue(1000000, 1000, true, false, false, false, b)
}
func BenchmarkCompressSync1k(b *testing.B) {
benchmarkQueue(1000, 10, false, true, false, false, b)
}
func BenchmarkCompressSync1M(b *testing.B) {
benchmarkQueue(1000000, 1000, false, true, false, false, b)
}
func BenchmarkEncryptCompressSync1k(b *testing.B) {
benchmarkQueue(1000, 10, true, true, false, false, b)
}
func BenchmarkEncryptCompressSync1M(b *testing.B) {
benchmarkQueue(1000000, 1000, true, true, false, b)
benchmarkQueue(1000000, 1000, true, true, false, false, b)
}
func BenchmarkProtoSync1k(b *testing.B) {
benchmarkQueue(1000, 10, false, false, false, true, b)
}
func BenchmarkProtoSync1M(b *testing.B) {
benchmarkQueue(1000000, 1000, false, false, false, true, b)
}
func BenchmarkEncCompProtoSync1k(b *testing.B) {
benchmarkQueue(1000, 10, true, true, false, true, b)
}
func BenchmarkEncCompProtoSync1M(b *testing.B) {
benchmarkQueue(1000000, 1000, true, true, false, true, b)
}
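(Not part of the diff: the new Protobuf benchmarks can be run in isolation with the standard Go tooling, e.g. `go test -run XXX -bench Proto ./libbeat/publisher/queue/diskqueue/`; the `-run XXX` argument skips the regular tests so only the matching benchmarks execute.)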
3 changes: 3 additions & 0 deletions libbeat/publisher/queue/diskqueue/config.go
@@ -74,6 +74,9 @@ type Settings struct {

// UseCompression enables or disables LZ4 compression
UseCompression bool

// UseProtobuf enables protobuf serialization instead of CBOR
UseProtobuf bool
}

// userConfig holds the parameters for a disk queue that are configurable
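For context, a minimal usage sketch of the new option (hypothetical, not part of this diff; it assumes `DefaultSettings`, `Settings.Path`, and `NewQueue` behave as they are used in `benchmark_test.go` above):

	// Build a disk queue whose frames are serialized with Protobuf instead of CBOR.
	settings := DefaultSettings()
	settings.Path = "/var/lib/example/diskqueue" // illustrative directory
	settings.UseProtobuf = true
	q, err := NewQueue(logp.L(), settings)
	if err != nil {
		// handle error
	}
	defer q.Close()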
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/docs/frameV2.pic
@@ -1,5 +1,5 @@
boxht = 0.25
SIZE1: box "size (uint32)" wid 4;
DATA: box "CBOR serialized data" dashed wid 4 ht 2 with .nw at SIZE1.sw;
DATA: box "CBOR or Protobuf serialized data" dashed wid 4 ht 2 with .nw at SIZE1.sw;
CHECKSUM: box "checksum (uint32)" wid 4 with .nw at DATA.sw;
SIZE2: box "size (uint32)" wid 4 with nw at CHECKSUM.sw;
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/docs/frameV2.svg
13 changes: 9 additions & 4 deletions libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md
@@ -73,14 +73,19 @@ both compression and encryption are enabled. The next 128-bits are
the initialization vector and the rest of the file is LZ4 compressed
frames.

If the options field has the third bit set, then Google Protobuf is
used to serialize the data in the frame instead of CBOR.

![Segment Schema Version 2](./schemaV2.svg)

The frames for version 2, consist of a header, followed by the
serialized event and a footer. The header contains one field which is
the size of the frame, which is an unsigned 32-bit integer in
little-endian format. The serialization format is CBOR. The footer
contains 2 fields, the first of which is a checksum which is an
unsigned 32-bit integer in little-endian format, followed by a repeat
of the size from the header. This is the same as version 1.
little-endian format. The serialization format is CBOR or Google
Protobuf. The footer contains 2 fields, the first of which is a
checksum which is an unsigned 32-bit integer in little-endian format,
followed by a repeat of the size from the header. The only difference
from Version 1 is the option for the serialization format to be CBOR
or Google Protobuf.

![Frame Version 2](./frameV2.svg)
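(Sketch, not part of the documentation file: to make the frame layout concrete, here is a minimal example of assembling a version-2 frame around an already-serialized event. The CRC-32 (IEEE) checksum and the assumption that the size field counts the whole frame, header and footer included, are this sketch's own; the real implementation may differ in those details.)

	package sketch

	import (
		"encoding/binary"
		"hash/crc32"
	)

	// buildFrameV2 wraps serialized event bytes (CBOR or Protobuf) in the
	// version-2 frame layout: size, data, checksum, repeated size.
	func buildFrameV2(serialized []byte) []byte {
		const headerSize, footerSize = 4, 8
		frameSize := uint32(len(serialized) + headerSize + footerSize)

		buf := make([]byte, 0, frameSize)
		var scratch [4]byte

		binary.LittleEndian.PutUint32(scratch[:], frameSize)
		buf = append(buf, scratch[:]...) // header: frame size, little-endian

		buf = append(buf, serialized...) // body: serialized event

		binary.LittleEndian.PutUint32(scratch[:], crc32.ChecksumIEEE(serialized))
		buf = append(buf, scratch[:]...) // footer: checksum, little-endian

		binary.LittleEndian.PutUint32(scratch[:], frameSize)
		buf = append(buf, scratch[:]...) // footer: repeated frame size

		return buf
	}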
4 changes: 1 addition & 3 deletions libbeat/publisher/queue/diskqueue/frames.go
@@ -17,8 +17,6 @@

package diskqueue

import "github.com/elastic/beats/v7/libbeat/publisher"

// Every data frame read from the queue is assigned a unique sequential
// integer, which is used to keep track of which frames have been
// acknowledged.
@@ -54,7 +52,7 @@ type readFrame struct {
id frameID

// The event decoded from the data frame.
event publisher.Event
event interface{}

// How much space this frame occupied on disk (before deserialization),
// including the frame header / footer.
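Since `readFrame.event` is now an `interface{}`, downstream code has to distinguish the two possible event types when it unpacks a decoded frame. A hypothetical helper (illustrative only, not part of this change) could type-switch on it:

	package sketch

	import (
		"time"

		"github.com/elastic/beats/v7/libbeat/publisher"
		"github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages"
	)

	// eventTimestamp extracts the timestamp from either a publisher.Event
	// (JSON/CBOR frames) or a *messages.Event (Protobuf frames).
	func eventTimestamp(e interface{}) (time.Time, bool) {
		switch ev := e.(type) {
		case publisher.Event:
			return ev.Content.Timestamp, true
		case *messages.Event:
			return ev.GetTimestamp().AsTime(), true
		default:
			return time.Time{}, false
		}
	}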
8 changes: 7 additions & 1 deletion libbeat/publisher/queue/diskqueue/queue.go
@@ -267,10 +267,16 @@ func (dq *diskQueue) BufferConfig() queue.BufferConfig {
}

func (dq *diskQueue) Producer(cfg queue.ProducerConfig) queue.Producer {
var serializationFormat SerializationFormat
if dq.settings.UseProtobuf {
serializationFormat = SerializationProtobuf
} else {
serializationFormat = SerializationCBOR
}
return &diskQueueProducer{
queue: dq,
config: cfg,
encoder: newEventEncoder(),
encoder: newEventEncoder(serializationFormat),
done: make(chan struct{}),
}
}
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/reader_loop.go
@@ -100,7 +100,7 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon

// Open the file and seek to the starting position.
handle, err := request.segment.getReader(rl.settings)
rl.decoder.useJSON = request.segment.shouldUseJSON()
rl.decoder.serializationFormat = handle.serializationFormat
if err != nil {
return readerLoopResponse{err: err}
}
38 changes: 25 additions & 13 deletions libbeat/publisher/queue/diskqueue/segments.go
@@ -155,8 +155,9 @@ const currentSegmentVersion = 2
const segmentHeaderSize = 12

const (
ENABLE_ENCRYPTION uint32 = 0x1
ENABLE_COMPRESSION uint32 = 0x2
ENABLE_ENCRYPTION uint32 = 1 << iota // 0x1
ENABLE_COMPRESSION // 0x2
ENABLE_PROTOBUF // 0x4
)

// Sort order: we store loaded segments in ascending order by their id.
@@ -219,14 +220,6 @@ func (segment *queueSegment) headerSize() uint64 {
return segmentHeaderSize
}

// The initial release of the disk queue used JSON to encode events
// on disk. Since then, we have switched to CBOR to address issues
// with encoding multi-byte characters, and for lower encoding
// overhead.
func (segment *queueSegment) shouldUseJSON() bool {
return segment.schemaVersion != nil && *segment.schemaVersion == 0
}

// getReader sets up the segmentReader. The order of encryption and
// compression is important. If both options are enabled we want
// encrypted compressed data not compressed encrypted data. This is
@@ -251,6 +244,20 @@ func (segment *queueSegment) getReader(queueSettings Settings) (*segmentReader,
sr := &segmentReader{}
sr.src = file

if header.version == 0 {
sr.serializationFormat = SerializationJSON
}

// Version 1 is CBOR, Version 2 could be CBOR or ProtoBuf, the
// options control which
if header.version > 0 {
if (header.options & ENABLE_PROTOBUF) == ENABLE_PROTOBUF {
sr.serializationFormat = SerializationProtobuf
} else {
sr.serializationFormat = SerializationCBOR
}
}

if (header.options & ENABLE_ENCRYPTION) == ENABLE_ENCRYPTION {
sr.er, err = NewEncryptionReader(sr.src, queueSettings.EncryptionKey)
if err != nil {
@@ -290,6 +297,10 @@ func (segment *queueSegment) getWriter(queueSettings Settings) (*segmentWriter,
options = options | ENABLE_COMPRESSION
}

if queueSettings.UseProtobuf {
options = options | ENABLE_PROTOBUF
}

sw := &segmentWriter{}
sw.dst = file

@@ -468,9 +479,10 @@ func (segments *diskQueueSegments) sizeOnDisk() uint64 {
// the purpose of compression since encryption will make the data
// less compressable.
type segmentReader struct {
src io.ReadSeekCloser
er *EncryptionReader
cr *CompressionReader
src io.ReadSeekCloser
er *EncryptionReader
cr *CompressionReader
serializationFormat SerializationFormat
}

func (r *segmentReader) Read(p []byte) (int, error) {