Skip to content

Commit

Permalink
[chore] Refactor/move LogEmitter to make it reusable by operators (#3…
Browse files Browse the repository at this point in the history
…2629)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
This PR moves `LogEmitter` to a common location,
`pkg/stanza/operator/helper`, so that operators can reuse it without
hitting circular imports.
The rationale is explained at
#31959 (comment).

**Link to tracking Issue:** <Issue number if applicable>

**Testing:** <Describe what testing was performed and which tests were
added.>

**Documentation:** <Describe the documentation added.>

Signed-off-by: ChrsMark <[email protected]>
  • Loading branch information
ChrsMark authored Apr 23, 2024
1 parent 17ebc7b commit 5ec85d5
Show file tree
Hide file tree
Showing 8 changed files with 313 additions and 181 deletions.
169 changes: 5 additions & 164 deletions pkg/stanza/adapter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,174 +4,15 @@
package adapter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"

import (
"context"
"sync"
"time"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

// LogEmitter is a stanza operator that emits log entries to a channel.
// Entries handed to Process are accumulated into a batch; a batch is sent on
// the channel when it reaches maxBatchSize entries or when the flusher
// goroutine's ticker fires, whichever happens first.
type LogEmitter struct {
helper.OutputOperator
// logChan carries flushed batches; it is closed by Stop.
logChan chan []*entry.Entry
// stopOnce makes the shutdown sequence in Stop run at most once.
stopOnce sync.Once
// cancel cancels the context handed to the flusher goroutine by Start.
cancel context.CancelFunc
// batchMux guards batch.
batchMux sync.Mutex
// batch holds the entries accumulated since the last flush.
batch []*entry.Entry
// wg tracks the flusher goroutine so Stop can wait for it to exit.
wg sync.WaitGroup
// maxBatchSize is the batch length at which appendEntry hands the batch off.
maxBatchSize uint
// flushInterval is the period of the flusher goroutine's ticker.
flushInterval time.Duration
}

// Defaults used by NewLogEmitter when no emitterOption overrides them.
var (
// defaultFlushInterval is the default period between timed flushes.
defaultFlushInterval = 100 * time.Millisecond
// defaultMaxBatchSize is the default number of entries per batch.
defaultMaxBatchSize uint = 100
)

// emitterOption customizes a LogEmitter. NewLogEmitter calls apply on every
// option it is given, in order, after the defaults have been set.
type emitterOption interface {
apply(*LogEmitter)
}

// withMaxBatchSize returns an emitterOption that sets the LogEmitter's
// maxBatchSize to the given value.
func withMaxBatchSize(maxBatchSize uint) emitterOption {
	return maxBatchSizeOption{maxBatchSize: maxBatchSize}
}

// maxBatchSizeOption is the emitterOption produced by withMaxBatchSize; it
// carries the batch size to install on the LogEmitter.
type maxBatchSizeOption struct {
maxBatchSize uint
}

// apply overwrites the emitter's maxBatchSize with the option's value.
func (o maxBatchSizeOption) apply(e *LogEmitter) {
e.maxBatchSize = o.maxBatchSize
}

// withFlushInterval returns an emitterOption that sets the LogEmitter's
// flushInterval to the given duration.
func withFlushInterval(flushInterval time.Duration) emitterOption {
	return flushIntervalOption{flushInterval: flushInterval}
}

// flushIntervalOption is the emitterOption produced by withFlushInterval; it
// carries the flush interval to install on the LogEmitter.
type flushIntervalOption struct {
flushInterval time.Duration
}

// apply overwrites the emitter's flushInterval with the option's value.
func (o flushIntervalOption) apply(e *LogEmitter) {
e.flushInterval = o.flushInterval
}

// NewLogEmitter creates a LogEmitter that acts as the receiver's output
// operator.
//
// The emitter starts with defaultMaxBatchSize and defaultFlushInterval; each
// option in opts is then applied in order and may override those values. The
// returned emitter is not running: call Start to launch the periodic flusher
// and Stop to shut it down. Until Start is called, cancel is a no-op so that
// Stop is always safe to call.
func NewLogEmitter(logger *zap.SugaredLogger, opts ...emitterOption) *LogEmitter {
	e := &LogEmitter{
		OutputOperator: helper.OutputOperator{
			BasicOperator: helper.BasicOperator{
				OperatorID:    "log_emitter",
				OperatorType:  "log_emitter",
				SugaredLogger: logger,
			},
		},
		logChan:       make(chan []*entry.Entry),
		maxBatchSize:  defaultMaxBatchSize,
		flushInterval: defaultFlushInterval,
		cancel:        func() {},
	}
	for _, opt := range opts {
		opt.apply(e)
	}
	// Size the first batch only after the options have run, so its capacity
	// matches the configured maxBatchSize, the same way appendEntry and
	// makeNewBatch size every subsequent batch.
	e.batch = make([]*entry.Entry, 0, e.maxBatchSize)
	return e
}

// Start launches the background flusher goroutine. The goroutine runs under a
// cancellable context whose cancel function is stored on the emitter so that
// Stop can terminate it. The persister argument is unused. Start always
// returns nil.
func (e *LogEmitter) Start(_ operator.Persister) error {
	var ctx context.Context
	ctx, e.cancel = context.WithCancel(context.Background())

	e.wg.Add(1)
	go e.flusher(ctx)
	return nil
}

// Stop cancels the flusher's context, waits for the flusher goroutine to
// exit, and then closes the log channel. The sequence runs at most once;
// later calls do nothing. Stop always returns nil.
func (e *LogEmitter) Stop() error {
	shutdown := func() {
		e.cancel()
		e.wg.Wait()

		close(e.logChan)
	}
	e.stopOnce.Do(shutdown)

	return nil
}

// OutChannel returns a receive-only view of the channel on which batches of
// entries are sent. The channel is closed by Stop.
func (e *LogEmitter) OutChannel() <-chan []*entry.Entry {
return e.logChan
}

// Process adds the entry to the current batch. If that fills the batch, the
// full batch is sent to the output channel before Process returns. Process
// always returns nil.
func (e *LogEmitter) Process(ctx context.Context, ent *entry.Entry) error {
	full := e.appendEntry(ent)
	if len(full) == 0 {
		// Batch not full yet; the flusher will pick it up on its next tick.
		return nil
	}

	e.flush(ctx, full)
	return nil
}

// appendEntry adds the entry to the current batch while holding batchMux.
// When the batch reaches maxBatchSize it is swapped for a fresh, empty batch
// and the filled one is returned for the caller to flush; otherwise nil is
// returned.
func (e *LogEmitter) appendEntry(ent *entry.Entry) []*entry.Entry {
	e.batchMux.Lock()
	defer e.batchMux.Unlock()

	e.batch = append(e.batch, ent)
	if uint(len(e.batch)) < e.maxBatchSize {
		return nil
	}

	full := e.batch
	e.batch = make([]*entry.Entry, 0, e.maxBatchSize)
	return full
}

// flusher sends the current batch to the output channel once per
// flushInterval, skipping ticks on which the batch is empty. It returns when
// ctx is cancelled and marks the wait group done on exit. It is meant to be
// run as a goroutine.
func (e *LogEmitter) flusher(ctx context.Context) {
	defer e.wg.Done()

	ticker := time.NewTicker(e.flushInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			pending := e.makeNewBatch()
			if len(pending) == 0 {
				continue
			}
			e.flush(ctx, pending)
		}
	}
}

// flush sends the provided batch on the log channel, blocking until the send
// completes or ctx is done. If ctx is done first, flush returns without
// sending the batch.
func (e *LogEmitter) flush(ctx context.Context, batch []*entry.Entry) {
select {
case e.logChan <- batch:
case <-ctx.Done():
}
}

// makeNewBatch replaces the current batch on the log emitter with a new batch, returning the old one
func (e *LogEmitter) makeNewBatch() []*entry.Entry {
e.batchMux.Lock()
defer e.batchMux.Unlock()

if len(e.batch) == 0 {
return nil
}
// LogEmitter is an alias of helper.LogEmitter.
//
// Deprecated: [v0.100.0] Use helper.LogEmitter directly instead.
type LogEmitter = helper.LogEmitter

var oldBatch []*entry.Entry
oldBatch, e.batch = e.batch, make([]*entry.Entry, 0, e.maxBatchSize)
return oldBatch
// NewLogEmitter forwards its arguments to helper.NewLogEmitter and returns
// the result.
//
// Deprecated: [v0.100.0] Use helper.NewLogEmitter directly instead.
func NewLogEmitter(logger *zap.SugaredLogger, opts ...helper.EmitterOption) *LogEmitter {
return helper.NewLogEmitter(logger, opts...)
}
9 changes: 5 additions & 4 deletions pkg/stanza/adapter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/consumerretry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
)

Expand Down Expand Up @@ -45,14 +46,14 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {

operators := append([]operator.Config{inputCfg}, baseCfg.Operators...)

emitterOpts := []emitterOption{}
emitterOpts := []helper.EmitterOption{}
if baseCfg.maxBatchSize > 0 {
emitterOpts = append(emitterOpts, withMaxBatchSize(baseCfg.maxBatchSize))
emitterOpts = append(emitterOpts, helper.WithMaxBatchSize(baseCfg.maxBatchSize))
}
if baseCfg.flushInterval > 0 {
emitterOpts = append(emitterOpts, withFlushInterval(baseCfg.flushInterval))
emitterOpts = append(emitterOpts, helper.WithFlushInterval(baseCfg.flushInterval))
}
emitter := NewLogEmitter(params.Logger.Sugar(), emitterOpts...)
emitter := helper.NewLogEmitter(params.Logger.Sugar(), emitterOpts...)
pipe, err := pipeline.Config{
Operators: operators,
DefaultOutput: emitter,
Expand Down
3 changes: 2 additions & 1 deletion pkg/stanza/adapter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/noop"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
)

func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
emitter := NewLogEmitter(zap.NewNop().Sugar())
emitter := helper.NewLogEmitter(zap.NewNop().Sugar())

pipe, err := pipeline.Config{
Operators: []operator.Config{
Expand Down
5 changes: 3 additions & 2 deletions pkg/stanza/adapter/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
)

Expand All @@ -25,7 +26,7 @@ type receiver struct {
cancel context.CancelFunc

pipe pipeline.Pipeline
emitter *LogEmitter
emitter *helper.LogEmitter
consumer consumer.Logs
converter *Converter
logger *zap.Logger
Expand Down Expand Up @@ -90,7 +91,7 @@ func (r *receiver) emitterLoop(ctx context.Context) {
r.logger.Debug("Receive loop stopped")
return

case e, ok := <-r.emitter.logChan:
case e, ok := <-r.emitter.OutChannel():
if !ok {
continue
}
Expand Down
20 changes: 13 additions & 7 deletions pkg/stanza/adapter/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/consumerretry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
)

Expand All @@ -43,7 +44,8 @@ func TestStart(t *testing.T) {
require.NoError(t, err, "receiver start failed")

stanzaReceiver := logsReceiver.(*receiver)
stanzaReceiver.emitter.logChan <- []*entry.Entry{entry.New()}
logChan := stanzaReceiver.emitter.OutChannelForWrite()
logChan <- []*entry.Entry{entry.New()}

// Eventually because of the asynchronous nature of the receiver.
require.Eventually(t,
Expand Down Expand Up @@ -81,7 +83,8 @@ func TestHandleConsume(t *testing.T) {
require.NoError(t, err, "receiver start failed")

stanzaReceiver := logsReceiver.(*receiver)
stanzaReceiver.emitter.logChan <- []*entry.Entry{entry.New()}
logChan := stanzaReceiver.emitter.OutChannelForWrite()
logChan <- []*entry.Entry{entry.New()}

// Eventually because of the asynchronous nature of the receiver.
require.Eventually(t,
Expand All @@ -106,7 +109,8 @@ func TestHandleConsumeRetry(t *testing.T) {
require.NoError(t, logsReceiver.Start(context.Background(), componenttest.NewNopHost()))

stanzaReceiver := logsReceiver.(*receiver)
stanzaReceiver.emitter.logChan <- []*entry.Entry{entry.New()}
logChan := stanzaReceiver.emitter.OutChannelForWrite()
logChan <- []*entry.Entry{entry.New()}

require.Eventually(t,
func() bool {
Expand All @@ -130,7 +134,7 @@ func BenchmarkReadLine(b *testing.B) {
var operatorCfgs []operator.Config
require.NoError(b, yaml.Unmarshal([]byte(pipelineYaml), &operatorCfgs))

emitter := NewLogEmitter(zap.NewNop().Sugar())
emitter := helper.NewLogEmitter(zap.NewNop().Sugar())
defer func() {
require.NoError(b, emitter.Stop())
}()
Expand Down Expand Up @@ -158,8 +162,9 @@ func BenchmarkReadLine(b *testing.B) {
// Run the actual benchmark
b.ResetTimer()
require.NoError(b, pipe.Start(storageClient))
logChan := emitter.OutChannel()
for i := 0; i < b.N; i++ {
entries := <-emitter.logChan
entries := <-logChan
for _, e := range entries {
convert(e)
}
Expand Down Expand Up @@ -195,7 +200,7 @@ func BenchmarkParseAndMap(b *testing.B) {
var operatorCfgs []operator.Config
require.NoError(b, yaml.Unmarshal([]byte(pipelineYaml), &operatorCfgs))

emitter := NewLogEmitter(zap.NewNop().Sugar())
emitter := helper.NewLogEmitter(zap.NewNop().Sugar())
defer func() {
require.NoError(b, emitter.Stop())
}()
Expand Down Expand Up @@ -223,8 +228,9 @@ func BenchmarkParseAndMap(b *testing.B) {
// Run the actual benchmark
b.ResetTimer()
require.NoError(b, pipe.Start(storageClient))
logChan := emitter.OutChannel()
for i := 0; i < b.N; i++ {
entries := <-emitter.logChan
entries := <-logChan
for _, e := range entries {
convert(e)
}
Expand Down
Loading

0 comments on commit 5ec85d5

Please sign in to comment.