Skip to content

Commit

Permalink
Local blocks processor honor per-tenant max trace size (#3305)
Browse files Browse the repository at this point in the history
* Keep track of trace sizes for the lifetime of the headblock, add metrics

* lint

* Comment

* Fix invalid assumption that all batches have the same trace id. Undo variable name change, review comments

* changelog
  • Loading branch information
mdisibio authored Jan 17, 2024
1 parent c4b7f8e commit e96c371
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [CHANGE] Set `autocomplete_filtering_enabled` to `true` by default [#3178](https://github.com/grafana/tempo/pull/3178) (@mapno)
* [CHANGE] Update Alpine image version to 3.19 [#3289](https://github.com/grafana/tempo/pull/3289) (@zalegrala)
* [CHANGE] Introduce localblocks process config option to select only server spans [#3303](https://github.com/grafana/tempo/pull/3303) (@zalegrala)
* [CHANGE] Localblocks processor honor tenant max trace size limit [#3305](https://github.com/grafana/tempo/pull/3305) (@mdisibio)
* [CHANGE] Major cache refactor to allow multiple role based caches to be configured [#3166](https://github.com/grafana/tempo/pull/3166).
**BREAKING CHANGE** Deprecate the following fields. These have all been migrated to a top level "cache:" field.
```
Expand Down
1 change: 1 addition & 0 deletions modules/generator/overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type metricsGeneratorOverrides interface {
MetricsGeneratorProcessorServiceGraphsEnableClientServerPrefix(userID string) bool
MetricsGeneratorProcessorSpanMetricsTargetInfoExcludedDimensions(userID string) []string
DedicatedColumns(userID string) backend.DedicatedColumns
MaxBytesPerTrace(userID string) int
}

var _ metricsGeneratorOverrides = (overrides.Interface)(nil)
5 changes: 5 additions & 0 deletions modules/generator/overrides_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type mockOverrides struct {
localBlocksTraceIdlePeriod time.Duration
localBlocksCompleteBlockTimeout time.Duration
dedicatedColumns backend.DedicatedColumns
maxBytesPerTrace int
}

var _ metricsGeneratorOverrides = (*mockOverrides)(nil)
Expand Down Expand Up @@ -129,3 +130,7 @@ func (m *mockOverrides) MetricsGeneratorProcessorSpanMetricsTargetInfoExcludedDi
// DedicatedColumns returns the fixture value configured on the mock,
// letting tests inject an arbitrary dedicated-columns config.
func (m *mockOverrides) DedicatedColumns(string) backend.DedicatedColumns {
	return m.dedicatedColumns
}

// MaxBytesPerTrace returns the fixture per-trace byte limit configured
// on the mock; 0 means the limit is disabled.
func (m *mockOverrides) MaxBytesPerTrace(string) int {
	return m.maxBytesPerTrace
}
16 changes: 4 additions & 12 deletions modules/generator/processor/localblocks/livetraces.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package localblocks

import (
"errors"
"hash"
"hash/fnv"
"time"

v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
)

var errMaxExceeded = errors.New("asdf")

type liveTrace struct {
id []byte
timestamp time.Time
Expand All @@ -25,7 +22,7 @@ type liveTraces struct {
func newLiveTraces() *liveTraces {
return &liveTraces{
hash: fnv.New64(),
traces: map[uint64]*liveTrace{},
traces: make(map[uint64]*liveTrace),
}
}

Expand All @@ -39,12 +36,7 @@ func (l *liveTraces) Len() uint64 {
return uint64(len(l.traces))
}

func (l *liveTraces) Push(batch *v1.ResourceSpans, max uint64) error {
if len(batch.ScopeSpans) == 0 || len(batch.ScopeSpans[0].Spans) == 0 {
return nil
}

traceID := batch.ScopeSpans[0].Spans[0].TraceId
func (l *liveTraces) Push(traceID []byte, batch *v1.ResourceSpans, max uint64) bool {
token := l.token(traceID)

tr := l.traces[token]
Expand All @@ -53,7 +45,7 @@ func (l *liveTraces) Push(batch *v1.ResourceSpans, max uint64) error {
// Before adding this check against max
// Zero means no limit
if max > 0 && uint64(len(l.traces)) >= max {
return errMaxExceeded
return false
}

tr = &liveTrace{
Expand All @@ -64,7 +56,7 @@ func (l *liveTraces) Push(batch *v1.ResourceSpans, max uint64) error {

tr.Batches = append(tr.Batches, batch)
tr.timestamp = time.Now()
return nil
return true
}

func (l *liveTraces) CutIdle(idleSince time.Time) []*liveTrace {
Expand Down
7 changes: 7 additions & 0 deletions modules/generator/processor/localblocks/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
subsystem = "metrics_generator_processor_local_blocks"

reasonLiveTracesExceeded = "live_traces_exceeded"
reasonTraceSizeExceeded = "trace_too_large"
)

var (
Expand All @@ -25,6 +26,12 @@ var (
Name: "spans_total",
Help: "Total number of spans after filtering",
}, []string{"tenant"})
metricDroppedSpans = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "spans_dropped_total",
Help: "Number of spans dropped",
}, []string{"tenant", "reason"})
metricLiveTraces = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Expand Down
105 changes: 63 additions & 42 deletions modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const timeBuffer = 5 * time.Minute
// ProcessorOverrides is just the set of overrides needed here.
type ProcessorOverrides interface {
DedicatedColumns(string) backend.DedicatedColumns
MaxBytesPerTrace(string) int
}

type Processor struct {
Expand All @@ -55,6 +56,7 @@ type Processor struct {

liveTracesMtx sync.Mutex
liveTraces *liveTraces
traceSizes *traceSizes
}

var _ gen.Processor = (*Processor)(nil)
Expand All @@ -81,6 +83,7 @@ func New(cfg Config, tenant string, wal *wal.WAL, overrides ProcessorOverrides)
walBlocks: map[uuid.UUID]common.WALBlock{},
completeBlocks: map[uuid.UUID]common.BackendBlock{},
liveTraces: newLiveTraces(),
traceSizes: newTraceSizes(),
closeCh: make(chan struct{}),
wg: sync.WaitGroup{},
cache: lru.New(100),
Expand Down Expand Up @@ -108,30 +111,43 @@ func (p *Processor) PushSpans(_ context.Context, req *tempopb.PushSpansRequest)
p.liveTracesMtx.Lock()
defer p.liveTracesMtx.Unlock()

var count int
before := p.liveTraces.Len()

// A quick way to reduce the number of spans we have to process
maxSz := p.overrides.MaxBytesPerTrace(p.tenant)

batches := req.Batches
if p.Cfg.FilterServerSpans {
for _, batch := range req.Batches {
if batch, count = filterBatch(batch); batch != nil {
err := p.liveTraces.Push(batch, p.Cfg.MaxLiveTraces)
if errors.Is(err, errMaxExceeded) {
metricDroppedTraces.WithLabelValues(p.tenant, reasonLiveTracesExceeded).Inc()
}
metricTotalSpans.WithLabelValues(p.tenant).Add(float64(count))
}
batches = filterBatches(batches)
}

for _, batch := range batches {

// Spans in the batch are for the same trace.
// We use the first one.
if len(batch.ScopeSpans) == 0 || len(batch.ScopeSpans[0].Spans) == 0 {
return
}
} else {
for _, batch := range req.Batches {
err := p.liveTraces.Push(batch, p.Cfg.MaxLiveTraces)
if errors.Is(err, errMaxExceeded) {
metricDroppedTraces.WithLabelValues(p.tenant, reasonLiveTracesExceeded).Inc()
}
for _, ss := range batch.ScopeSpans {
metricTotalSpans.WithLabelValues(p.tenant).Add(float64(len(ss.Spans)))
}
traceID := batch.ScopeSpans[0].Spans[0].TraceId

// Metric total spans regardless of outcome
numSpans := 0
for _, ss := range batch.ScopeSpans {
numSpans += len(ss.Spans)
}
metricTotalSpans.WithLabelValues(p.tenant).Add(float64(numSpans))

// Check max trace size
if maxSz > 0 && !p.traceSizes.Allow(traceID, batch.Size(), maxSz) {
metricDroppedSpans.WithLabelValues(p.tenant, reasonTraceSizeExceeded).Add(float64(numSpans))
continue
}

// Live traces
if !p.liveTraces.Push(traceID, batch, p.Cfg.MaxLiveTraces) {
metricDroppedTraces.WithLabelValues(p.tenant, reasonLiveTracesExceeded).Inc()
continue
}

}

after := p.liveTraces.Len()
Expand Down Expand Up @@ -615,6 +631,9 @@ func (p *Processor) cutBlocks(immediate bool) error {
return nil
}

// Clear historical trace sizes for traces that weren't seen in this block.
p.traceSizes.ClearIdle(p.lastCutTime)

// Final flush
err := p.headBlock.Flush()
if err != nil {
Expand Down Expand Up @@ -741,36 +760,38 @@ func metricSeriesToProto(series traceqlmetrics.MetricSeries) []*tempopb.KeyValue
return r
}

// filterBatch to only root spans or kind==server. Does not modify the input
// filterBatches to only root spans or kind==server. Does not modify the input
// but returns a new struct referencing the same input pointers. Returns nil
// if there were no matching spans.
func filterBatch(batch *v1.ResourceSpans) (*v1.ResourceSpans, int) {
var keep int
var keepSS []*v1.ScopeSpans
for _, ss := range batch.ScopeSpans {

var keepSpans []*v1.Span
for _, s := range ss.Spans {
if s.Kind == v1.Span_SPAN_KIND_SERVER || len(s.ParentSpanId) == 0 {
keepSpans = append(keepSpans, s)
func filterBatches(batches []*v1.ResourceSpans) []*v1.ResourceSpans {
keep := make([]*v1.ResourceSpans, 0, len(batches))

for _, batch := range batches {
var keepSS []*v1.ScopeSpans
for _, ss := range batch.ScopeSpans {

var keepSpans []*v1.Span
for _, s := range ss.Spans {
if s.Kind == v1.Span_SPAN_KIND_SERVER || len(s.ParentSpanId) == 0 {
keepSpans = append(keepSpans, s)
}
}

if len(keepSpans) > 0 {
keepSS = append(keepSS, &v1.ScopeSpans{
Scope: ss.Scope,
Spans: keepSpans,
})
}
}

if len(keepSpans) > 0 {
keepSS = append(keepSS, &v1.ScopeSpans{
Scope: ss.Scope,
Spans: keepSpans,
if len(keepSS) > 0 {
keep = append(keep, &v1.ResourceSpans{
Resource: batch.Resource,
ScopeSpans: keepSS,
})
keep += len(keepSpans)
}
}

if len(keepSS) > 0 {
return &v1.ResourceSpans{
Resource: batch.Resource,
ScopeSpans: keepSS,
}, keep
}

return nil, 0
return keep
}
4 changes: 4 additions & 0 deletions modules/generator/processor/localblocks/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func (m *mockOverrides) DedicatedColumns(string) backend.DedicatedColumns {
return nil
}

// MaxBytesPerTrace returns 0 (no per-trace size limit), so the
// processor tests exercise the push path without the size check.
func (m *mockOverrides) MaxBytesPerTrace(string) int {
	return 0
}

func TestProcessorDoesNotRace(t *testing.T) {
wal, err := wal.New(&wal.Config{
Filepath: t.TempDir(),
Expand Down
65 changes: 65 additions & 0 deletions modules/generator/processor/localblocks/traceSizes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package localblocks

import (
"hash"
"hash/fnv"
"sync"
"time"
)

// traceSizes tracks the cumulative observed byte size of each live trace
// so the per-tenant max trace size limit can be enforced across pushes.
// Entries persist until ClearIdle removes traces not seen recently.
type traceSizes struct {
	mtx   sync.Mutex
	hash  hash.Hash64 // reused FNV-64 hasher; must only be touched with mtx held
	sizes map[uint64]*traceSize
}

// traceSize is the running total for one trace.
type traceSize struct {
	size      int       // cumulative bytes observed for this trace
	timestamp time.Time // last time the trace was seen
}

// newTraceSizes returns an empty, ready-to-use size tracker.
func newTraceSizes() *traceSizes {
	return &traceSizes{
		hash:  fnv.New64(),
		sizes: make(map[uint64]*traceSize),
	}
}

// token hashes a trace ID down to a map key. The hasher is shared and
// stateful, so callers must hold mtx.
func (s *traceSizes) token(traceID []byte) uint64 {
	s.hash.Reset()
	s.hash.Write(traceID)
	return s.hash.Sum64()
}

// Allow returns true if the historical total plus incoming size is less than
// or equal to the max. The historical total is kept alive and incremented even
// if not allowed, so that long-running traces are cutoff as expected.
func (s *traceSizes) Allow(traceID []byte, sz, max int) bool {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	token := s.token(traceID)
	tr := s.sizes[token]
	if tr == nil {
		tr = &traceSize{}
		s.sizes[token] = tr
	}

	// Accumulate exactly once per push. (The previous version initialized
	// a new entry with sz and then incremented it by sz again, which
	// double-counted the first batch of every trace.)
	tr.size += sz
	tr.timestamp = time.Now()

	return tr.size <= max
}

// ClearIdle drops totals for traces not seen since idleSince, bounding
// memory growth over the lifetime of the head block.
func (s *traceSizes) ClearIdle(idleSince time.Time) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	for token, tr := range s.sizes {
		if tr.timestamp.Before(idleSince) {
			delete(s.sizes, token)
		}
	}
}

0 comments on commit e96c371

Please sign in to comment.