Skip to content

Commit

Permalink
WIP test fix flush
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanLovesCoffee committed Aug 14, 2024
1 parent d5d7830 commit 015c5bd
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 13 deletions.
8 changes: 1 addition & 7 deletions comp/logs/agent/agentimpl/agent_serverless_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,5 @@ func (a *logAgent) SetupPipeline(

// buildEndpoints builds endpoints for the logs agent
// (diff rendering: the lines below interleave the removed and the added
// implementation of this function; the post-commit body is the single
// delegating return statement at the end).
func buildEndpoints(coreConfig pkgConfig.Reader) (*config.Endpoints, error) {
// --- removed by this commit: explicit error check plus a one-year BatchWait
// override whose stated purpose (comment below) was to keep the batch
// strategy from ever flushing on a tick in serverless mode ---
config, err := config.BuildServerlessEndpoints(coreConfig, intakeTrackType, config.DefaultIntakeProtocol)
if err != nil {
return nil, err
}
// in serverless mode, we never want the batch strategy to flush with a tick
config.BatchWait = 365 * 24 * time.Hour
return config, nil
// --- added by this commit: delegate directly to BuildServerlessEndpoints.
// NOTE(review): with the BatchWait override gone, endpoints keep their
// default BatchWait, so tick-driven flushes become possible again; the
// commit appears to rely on the new flushStart/flushWg handshake (see the
// batch_strategy.go hunk in this same commit) instead — confirm a ticker
// flush firing mid-invocation is acceptable or otherwise suppressed.
return config.BuildServerlessEndpoints(coreConfig, intakeTrackType, config.DefaultIntakeProtocol)
}
2 changes: 0 additions & 2 deletions comp/logs/agent/agentimpl/agent_serverless_init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package agentimpl

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/fx"
Expand All @@ -27,5 +26,4 @@ func TestBuildServerlessEndpoints(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, "http-intake.logs.datadoghq.com", endpoints.Main.Host)
assert.Equal(t, "lambda-extension", string(endpoints.Main.Origin))
assert.True(t, endpoints.Main.BatchWait > config.BatchWait*time.Second)
}
11 changes: 8 additions & 3 deletions pkg/logs/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Pipeline struct {
sender *sender.Sender
serverless bool
flushWg *sync.WaitGroup
flushStart chan struct{}
}

// NewPipeline returns a new Pipeline
Expand All @@ -49,9 +50,11 @@ func NewPipeline(outputChan chan *message.Payload,

var senderDoneChan chan *sync.WaitGroup
var flushWg *sync.WaitGroup
var flushStart chan struct{}
if serverless {
senderDoneChan = make(chan *sync.WaitGroup)
flushWg = &sync.WaitGroup{}
flushStart = make(chan struct{})
}

mainDestinations := getDestinations(endpoints, destinationsContext, pipelineID, serverless, senderDoneChan, status, cfg)
Expand All @@ -73,7 +76,7 @@ func NewPipeline(outputChan chan *message.Payload,
encoder = processor.RawEncoder
}

strategy := getStrategy(strategyInput, senderInput, flushChan, endpoints, serverless, flushWg, pipelineID)
strategy := getStrategy(strategyInput, senderInput, flushChan, endpoints, serverless, flushWg, flushStart, pipelineID)
logsSender = sender.NewSender(cfg, senderInput, outputChan, mainDestinations, config.DestinationPayloadChanSize, senderDoneChan, flushWg)

inputChan := make(chan *message.Message, config.ChanSize)
Expand All @@ -87,6 +90,7 @@ func NewPipeline(outputChan chan *message.Payload,
sender: logsSender,
serverless: serverless,
flushWg: flushWg,
flushStart: flushStart,
}
}

Expand All @@ -110,6 +114,7 @@ func (p *Pipeline) Flush(ctx context.Context) {
p.processor.Flush(ctx) // flush messages in the processor into the sender

if p.serverless {
<-p.flushStart
// Wait for the logs sender to finish sending payloads to all destinations before allowing the flush to finish
p.flushWg.Wait()
}
Expand Down Expand Up @@ -149,13 +154,13 @@ func getDestinations(endpoints *config.Endpoints, destinationsContext *client.De
}

//nolint:revive // TODO(AML) Fix revive linter
// getStrategy selects the sender strategy for a pipeline: a batch strategy
// when the endpoints use HTTP or the agent runs serverless, a stream strategy
// otherwise. (diff rendering: both the removed and the added signature, and
// both NewBatchStrategy calls, appear below; the post-commit version is the
// one that threads the new flushStart channel through.)
// --- removed signature: no flushStart parameter ---
func getStrategy(inputChan chan *message.Message, outputChan chan *message.Payload, flushChan chan struct{}, endpoints *config.Endpoints, serverless bool, flushWg *sync.WaitGroup, _ int) sender.Strategy {
// --- added signature: flushStart chan struct{} inserted before the ignored
// trailing int (pipelineID per the call site in NewPipeline). NOTE(review):
// flushStart is nil when serverless is false (only made in NewPipeline's
// serverless branch) — confirm the batch strategy never sends on it in that
// case, since a send on a nil channel blocks forever.
func getStrategy(inputChan chan *message.Message, outputChan chan *message.Payload, flushChan chan struct{}, endpoints *config.Endpoints, serverless bool, flushWg *sync.WaitGroup, flushStart chan struct{}, _ int) sender.Strategy {
if endpoints.UseHTTP || serverless {
// identity encoding unless the main endpoint asks for gzip compression
encoder := sender.IdentityContentType
if endpoints.Main.UseCompression {
encoder = sender.NewGzipContentEncoding(endpoints.Main.CompressionLevel)
}
// --- removed call: without flushStart ---
return sender.NewBatchStrategy(inputChan, outputChan, flushChan, serverless, flushWg, sender.ArraySerializer, endpoints.BatchWait, endpoints.BatchMaxSize, endpoints.BatchMaxContentSize, "logs", encoder)
// --- added call: forwards flushStart so the strategy can signal Flush ---
return sender.NewBatchStrategy(inputChan, outputChan, flushChan, serverless, flushWg, flushStart, sender.ArraySerializer, endpoints.BatchWait, endpoints.BatchMaxSize, endpoints.BatchMaxContentSize, "logs", encoder)
}
return sender.NewStreamStrategy(inputChan, outputChan, sender.IdentityContentType)
}
7 changes: 6 additions & 1 deletion pkg/logs/sender/batch_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type batchStrategy struct {
flushChan chan struct{}
serverless bool
flushWg *sync.WaitGroup
flushStart chan struct{}
buffer *MessageBuffer
// pipelineName provides a name for the strategy to differentiate it from other instances in other internal pipelines
pipelineName string
Expand All @@ -44,20 +45,22 @@ func NewBatchStrategy(inputChan chan *message.Message,
flushChan chan struct{},
serverless bool,
flushWg *sync.WaitGroup,
flushStart chan struct{},
serializer Serializer,
batchWait time.Duration,
maxBatchSize int,
maxContentSize int,
pipelineName string,
contentEncoding ContentEncoding) Strategy {
return newBatchStrategyWithClock(inputChan, outputChan, flushChan, serverless, flushWg, serializer, batchWait, maxBatchSize, maxContentSize, pipelineName, clock.New(), contentEncoding)
return newBatchStrategyWithClock(inputChan, outputChan, flushChan, serverless, flushWg, flushStart, serializer, batchWait, maxBatchSize, maxContentSize, pipelineName, clock.New(), contentEncoding)
}

func newBatchStrategyWithClock(inputChan chan *message.Message,
outputChan chan *message.Payload,
flushChan chan struct{},
serverless bool,
flushWg *sync.WaitGroup,
flushStart chan struct{},
serializer Serializer,
batchWait time.Duration,
maxBatchSize int,
Expand All @@ -72,6 +75,7 @@ func newBatchStrategyWithClock(inputChan chan *message.Message,
flushChan: flushChan,
serverless: serverless,
flushWg: flushWg,
flushStart: flushStart,
buffer: NewMessageBuffer(maxBatchSize, maxContentSize),
serializer: serializer,
batchWait: batchWait,
Expand Down Expand Up @@ -167,6 +171,7 @@ func (s *batchStrategy) sendMessages(messages []*message.Message, outputChan cha
if s.serverless {
// Increment the wait group so the flush doesn't finish until all payloads are sent to all destinations
s.flushWg.Add(1)
s.flushStart <- struct{}{}
}

outputChan <- &message.Payload{
Expand Down

0 comments on commit 015c5bd

Please sign in to comment.