Skip to content

Commit

Permalink
Add filtering support to Generic Forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
Blinkuu committed Aug 1, 2023
1 parent ebaa79e commit a3d63c8
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 1 deletion.
10 changes: 10 additions & 0 deletions modules/distributor/forwarder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ type Config struct {
Name string `yaml:"name"`
Backend string `yaml:"backend"`
OTLPGRPC otlpgrpc.Config `yaml:"otlpgrpc"`
Filter FilterConfig `yaml:"filter"`
}

type FilterConfig struct {
Traces TraceFiltersConfig `yaml:"traces"`
}

type TraceFiltersConfig struct {
SpanConditions []string `yaml:"span"`
SpanEventConditions []string `yaml:"spanevent"`
}

func (cfg *Config) Validate() error {
Expand Down
178 changes: 177 additions & 1 deletion modules/distributor/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,26 @@ package forwarder
import (
"context"
"fmt"
"os"
"sync"
"time"

"github.com/go-kit/log"
zaplogfmt "github.com/jsternberg/zap-logfmt"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor"
"github.com/sirupsen/logrus"
"github.com/weaveworks/common/logging"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/grafana/tempo/modules/distributor/forwarder/otlpgrpc"
)
Expand Down Expand Up @@ -36,6 +51,7 @@ func New(cfg Config, logger log.Logger) (Forwarder, error) {
return nil, fmt.Errorf("failed to validate config: %w", err)
}

var forwarder Forwarder
switch cfg.Backend {
case OTLPGRPCBackend:
f, err := otlpgrpc.NewForwarder(cfg.OTLPGRPC, logger)
Expand All @@ -49,8 +65,168 @@ func New(cfg Config, logger log.Logger) (Forwarder, error) {
return nil, fmt.Errorf("failed to dial: %w", err)
}

return f, nil
forwarder = f
default:
return nil, fmt.Errorf("%s backend is not supported", cfg.Backend)
}

if len(cfg.Filter.Traces.SpanConditions) > 0 || len(cfg.Filter.Traces.SpanEventConditions) > 0 {
return NewFilterForwarder(cfg.Filter, forwarder)
}

return forwarder, nil
}

type FilterForwarder struct {
filterProcessor processor.Traces
next Forwarder
fatalError error
fatalErrorMu sync.RWMutex
}

func NewFilterForwarder(cfg FilterConfig, next Forwarder) (*FilterForwarder, error) {
factory := filterprocessor.NewFactory()

logLevel := logging.Level{}
if err := logLevel.Set("warn"); err != nil {
panic(err)
}

set := processor.CreateSettings{
ID: component.ID{},
TelemetrySettings: component.TelemetrySettings{
Logger: newLogger(logLevel),
TracerProvider: trace.NewNoopTracerProvider(),
MeterProvider: metric.NewNoopMeterProvider(),
},
BuildInfo: component.BuildInfo{},
}
fpCfg := &filterprocessor.Config{
ErrorMode: ottl.IgnoreError,
Traces: filterprocessor.TraceFilters{
SpanConditions: cfg.Traces.SpanConditions,
SpanEventConditions: cfg.Traces.SpanEventConditions,
},
}
fp, err := factory.CreateTracesProcessor(context.TODO(), set, fpCfg, consumerToForwarderAdapter{forwarder: next})
if err != nil {
return nil, fmt.Errorf("failed to create filter processor: %w", err)
}

f := &FilterForwarder{
filterProcessor: fp,
next: next,
}

if err := f.filterProcessor.Start(context.TODO(), f); err != nil {
return nil, fmt.Errorf("failed to start filter processor: %w", err)
}

return f, nil
}

func (f *FilterForwarder) ForwardTraces(ctx context.Context, traces ptrace.Traces) error {
f.fatalErrorMu.RLock()
fatalErr := f.fatalError
f.fatalErrorMu.RUnlock()

if fatalErr != nil {
return fmt.Errorf("fatal error occurred in filter forwarder: %w", fatalErr)
}

// Copying the traces to avoid mutating the original.
tracesCopy := ptrace.NewTraces()
traces.CopyTo(tracesCopy)

fmt.Printf("traces before filter: span_count=%+v\n", tracesCopy.SpanCount())

err := f.filterProcessor.ConsumeTraces(ctx, tracesCopy)
if err != nil {
return fmt.Errorf("failed to filter traces: %w", err)
}

return nil
}

func (f *FilterForwarder) Shutdown(ctx context.Context) error {
var errs []error

if err := f.filterProcessor.Shutdown(ctx); err != nil {
errs = append(errs, fmt.Errorf("failed to shutdown filter processor: %w", err))
}

if err := f.next.Shutdown(ctx); err != nil {
errs = append(errs, fmt.Errorf("failed to shutdown next forwarder: %w", err))
}

return multierr.Combine(errs...)
}

// ReportFatalError implements component.Host
func (f *FilterForwarder) ReportFatalError(err error) {
f.fatalErrorMu.Lock()
f.fatalError = err
f.fatalErrorMu.Unlock()
}

// GetFactory implements component.Host
func (f *FilterForwarder) GetFactory(component.Kind, component.Type) component.Factory {
return nil
}

// GetExtensions implements component.Host
func (f *FilterForwarder) GetExtensions() map[component.ID]extension.Extension {
return nil
}

// GetExporters implements component.Host
func (f *FilterForwarder) GetExporters() map[component.DataType]map[component.ID]component.Component {
return nil
}

type consumerToForwarderAdapter struct {
forwarder Forwarder
}

func (c consumerToForwarderAdapter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
fmt.Printf("traces after filter: span_count=%+v\n", td.SpanCount())
return c.forwarder.ForwardTraces(ctx, td)
}

func (c consumerToForwarderAdapter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func newLogger(level logging.Level) *zap.Logger {
zapLevel := zapcore.InfoLevel

switch level.Logrus {
case logrus.PanicLevel:
zapLevel = zapcore.PanicLevel
case logrus.FatalLevel:
zapLevel = zapcore.FatalLevel
case logrus.ErrorLevel:
zapLevel = zapcore.ErrorLevel
case logrus.WarnLevel:
zapLevel = zapcore.WarnLevel
case logrus.InfoLevel:
zapLevel = zapcore.InfoLevel
case logrus.DebugLevel:
case logrus.TraceLevel:
zapLevel = zapcore.DebugLevel
}

config := zap.NewProductionEncoderConfig()
config.EncodeTime = func(ts time.Time, encoder zapcore.PrimitiveArrayEncoder) {
encoder.AppendString(ts.UTC().Format(time.RFC3339))
}
logger := zap.New(zapcore.NewCore(
zaplogfmt.NewEncoder(config),
os.Stdout,
zapLevel,
))
logger = logger.With(zap.String("component", "tempo"))
logger.Info("filter forwarder logger initialized")

return logger
}

0 comments on commit a3d63c8

Please sign in to comment.