[TraceQL Metrics] Add local disk caching for generator completed blocks (#3799)

* working version

* Fix start/end meta of generator-flushed blocks, and config default. Cleanup/dedupe timerange logic.

* Add tests

* lint

* changelog

* review feedback
mdisibio authored Jun 21, 2024
1 parent 047f7d5 commit b29d56c
Showing 7 changed files with 350 additions and 113 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@
* [FEATURE] Add new compare() metrics function [#3695](https://github.com/grafana/tempo/pull/3695) (@mdisibio)
* [ENHANCEMENT] Tag value lookup use protobuf internally for improved latency [#3731](https://github.com/grafana/tempo/pull/3731) (@mdisibio)
* [ENHANCEMENT] TraceQL metrics queries use protobuf internally for improved latency [#3745](https://github.com/grafana/tempo/pull/3745) (@mdisibio)
* [ENHANCEMENT] Add local disk caching of metrics queries in local-blocks processor [#3799](https://github.com/grafana/tempo/pull/3799) (@mdisibio)
* [ENHANCEMENT] Improve use of OTEL semantic conventions on the service graph [#3711](https://github.com/grafana/tempo/pull/3711) (@zalegrala)
* [ENHANCEMENT] Performance improvement for `rate() by ()` queries [#3719](https://github.com/grafana/tempo/pull/3719) (@mapno)
* [ENHANCEMENT] Use multiple goroutines to unmarshal responses in parallel in the query frontend. [#3713](https://github.com/grafana/tempo/pull/3713) (@joe-elliott)
12 changes: 6 additions & 6 deletions modules/frontend/metrics_query_range_sharder.go
@@ -112,7 +112,7 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (pipeline.Responses[combin
return pipeline.NewBadRequest(errors.New("invalid interval specified: 0")), nil
}

generatorReq := s.generatorRequest(*req, r, tenantID, now, samplingRate)
generatorReq := s.generatorRequest(*req, r, tenantID, now)
reqCh := make(chan *http.Request, 2) // buffer of 2 allows us to insert generatorReq and metrics

if generatorReq != nil {
@@ -370,10 +370,12 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
continue
}

start, end := traceql.TrimToOverlap(searchReq.Start, searchReq.End, searchReq.Step, uint64(m.StartTime.UnixNano()), uint64(m.EndTime.UnixNano()))

queryRangeReq := &tempopb.QueryRangeRequest{
Query: searchReq.Query,
Start: max(searchReq.Start, uint64(m.StartTime.UnixNano())),
End: min(searchReq.End, uint64(m.EndTime.UnixNano())),
Start: start,
End: end,
Step: searchReq.Step,
// ShardID: uint32, // No sharding with RF=1
// ShardCount: uint32, // No sharding with RF=1
@@ -388,8 +390,6 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
FooterSize: m.FooterSize,
DedicatedColumns: dc,
}
alignTimeRange(queryRangeReq)
queryRangeReq.End += queryRangeReq.Step

subR = api.BuildQueryRangeRequest(subR, queryRangeReq)
subR.Header.Set(api.HeaderAccept, api.HeaderAcceptProtobuf)
@@ -425,7 +425,7 @@ func (s *queryRangeSharder) backendRange(now time.Time, start, end uint64, query
return start, end
}

func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest, parent *http.Request, tenantID string, now time.Time, samplingRate float64) *http.Request {
func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest, parent *http.Request, tenantID string, now time.Time) *http.Request {
cutoff := uint64(now.Add(-s.cfg.QueryBackendAfter).UnixNano())

// if there's no overlap between the query and ingester range just return nil
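Note on the sharder change above: each per-block sub-request previously clamped Start/End with max/min against the block meta and then step-aligned the result inline (alignTimeRange plus extending End by one step). That logic is now deduplicated behind traceql.TrimToOverlap, which is presumably what the "Cleanup/dedupe timerange logic" part of the commit message refers to. A minimal sketch of what such a trim-and-align helper can look like, assuming nanosecond timestamps and a non-zero step; this is illustrative only, not the actual Tempo implementation:

// trimToOverlap is a hypothetical stand-in for traceql.TrimToOverlap: it
// clamps the request range to the block's range and snaps both bounds onto
// the query's step grid so every sub-query samples the same timestamps.
func trimToOverlap(reqStart, reqEnd, step, blockStart, blockEnd uint64) (uint64, uint64) {
	start := max(reqStart, blockStart)
	end := min(reqEnd, blockEnd)
	if step > 0 {
		start = (start / step) * step          // align start down to the grid
		end = ((end + step - 1) / step) * step // align end up to the grid
	}
	return start, end
}

Centralizing this keeps the frontend sharder and the generator's local-blocks path on the same sample grid, which matters once per-block results are cached and reused.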
1 change: 1 addition & 0 deletions modules/generator/config.go
@@ -43,6 +43,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.Registry.RegisterFlagsAndApplyDefaults(prefix, f)
cfg.Storage.RegisterFlagsAndApplyDefaults(prefix, f)
cfg.TracesWAL.Version = encoding.DefaultEncoding().Version()
cfg.TracesWAL.IngestionSlack = 2 * time.Minute

// setting default for max span age before discarding to 30s
cfg.MetricsIngestionSlack = 30 * time.Second
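The new 2-minute TracesWAL.IngestionSlack default pairs with the writeHeadBlock change further down: the local-blocks head block is now stamped with the spans' own start/end times rather than the receive time, so a bad span timestamp could otherwise stretch a block's meta arbitrarily. A rough sketch of how an ingestion-slack clamp typically works, assuming the slack is applied symmetrically around the receive time (illustrative, not the exact Tempo logic; uses the standard "time" package):

// clampToSlack returns ts unless it falls outside now ± slack, in which
// case the receive time is used instead.
func clampToSlack(ts, now time.Time, slack time.Duration) time.Time {
	if ts.Before(now.Add(-slack)) || ts.After(now.Add(slack)) {
		return now
	}
	return ts
}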
130 changes: 23 additions & 107 deletions modules/generator/processor/localblocks/processor.go
@@ -17,11 +17,8 @@ import (
"github.com/grafana/tempo/modules/ingester"
"github.com/grafana/tempo/pkg/flushqueues"
"github.com/grafana/tempo/tempodb"
"github.com/opentracing/opentracing-go"
"go.uber.org/atomic"

gen "github.com/grafana/tempo/modules/generator/processor"
"github.com/grafana/tempo/pkg/boundedwaitgroup"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
@@ -48,6 +45,8 @@
logger kitlog.Logger
Cfg Config
wal *wal.WAL
walR backend.Reader
walW backend.Writer
closeCh chan struct{}
wg sync.WaitGroup
cacheMtx sync.RWMutex
@@ -90,6 +89,8 @@ func New(cfg Config, tenant string, wal *wal.WAL, writer tempodb.Writer, overrid
logger: log.WithUserID(tenant, log.Logger),
Cfg: cfg,
wal: wal,
walR: backend.NewReader(wal.LocalBackend()),
walW: backend.NewWriter(wal.LocalBackend()),
overrides: overrides,
enc: enc,
walBlocks: map[uuid.UUID]common.WALBlock{},
@@ -494,108 +495,6 @@ func (p *Processor) GetMetrics(ctx context.Context, req *tempopb.SpanMetricsRequ
return resp, nil
}

// QueryRange returns metrics.
func (p *Processor) QueryRange(ctx context.Context, req *tempopb.QueryRangeRequest) (traceql.SeriesSet, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

p.blocksMtx.RLock()
defer p.blocksMtx.RUnlock()

cutoff := time.Now().Add(-p.Cfg.CompleteBlockTimeout).Add(-timeBuffer)
if req.Start < uint64(cutoff.UnixNano()) {
return nil, fmt.Errorf("time range must be within last %v", p.Cfg.CompleteBlockTimeout)
}

// Blocks to check
blocks := make([]common.BackendBlock, 0, 1+len(p.walBlocks)+len(p.completeBlocks))
if p.headBlock != nil {
blocks = append(blocks, p.headBlock)
}
for _, b := range p.walBlocks {
blocks = append(blocks, b)
}
for _, b := range p.completeBlocks {
blocks = append(blocks, b)
}
if len(blocks) == 0 {
return nil, nil
}

expr, err := traceql.Parse(req.Query)
if err != nil {
return nil, fmt.Errorf("compiling query: %w", err)
}

unsafe := p.overrides.UnsafeQueryHints(p.tenant)

timeOverlapCutoff := p.Cfg.Metrics.TimeOverlapCutoff
if v, ok := expr.Hints.GetFloat(traceql.HintTimeOverlapCutoff, unsafe); ok && v >= 0 && v <= 1.0 {
timeOverlapCutoff = v
}

concurrency := p.Cfg.Metrics.ConcurrentBlocks
if v, ok := expr.Hints.GetInt(traceql.HintConcurrentBlocks, unsafe); ok && v > 0 && v < 100 {
concurrency = uint(v)
}

// Compile the sharded version of the query
eval, err := traceql.NewEngine().CompileMetricsQueryRange(req, false, timeOverlapCutoff, unsafe)
if err != nil {
return nil, err
}

var (
wg = boundedwaitgroup.New(concurrency)
jobErr = atomic.Error{}
)

for _, b := range blocks {
// If a job errored then quit immediately.
if err := jobErr.Load(); err != nil {
return nil, err
}

start := uint64(b.BlockMeta().StartTime.UnixNano())
end := uint64(b.BlockMeta().EndTime.UnixNano())
if start > req.End || end < req.Start {
// Out of time range
continue
}

wg.Add(1)
go func(b common.BackendBlock) {
defer wg.Done()

m := b.BlockMeta()

span, ctx := opentracing.StartSpanFromContext(ctx, "Processor.QueryRange.Block", opentracing.Tags{
"block": m.BlockID,
"blockSize": m.Size,
})
defer span.Finish()

// TODO - caching
f := traceql.NewSpansetFetcherWrapper(func(ctx context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) {
return b.Fetch(ctx, req, common.DefaultSearchOptions())
})

err := eval.Do(ctx, f, uint64(m.StartTime.UnixNano()), uint64(m.EndTime.UnixNano()))
if err != nil {
jobErr.Store(err)
}
}(b)
}

wg.Wait()

if err := jobErr.Load(); err != nil {
return nil, err
}

return eval.Results(), nil
}

func (p *Processor) metricsCacheGet(key string) *traceqlmetrics.MetricsResults {
p.cacheMtx.RLock()
defer p.cacheMtx.RUnlock()
@@ -713,9 +612,26 @@ func (p *Processor) writeHeadBlock(id common.ID, tr *tempopb.Trace) error {
}
}

now := uint32(time.Now().Unix())
// Get trace timestamp bounds
var start, end uint64
for _, b := range tr.Batches {
for _, ss := range b.ScopeSpans {
for _, s := range ss.Spans {
if start == 0 || s.StartTimeUnixNano < start {
start = s.StartTimeUnixNano
}
if s.EndTimeUnixNano > end {
end = s.EndTimeUnixNano
}
}
}
}

// Convert from unix nanos to unix seconds
startSeconds := uint32(start / uint64(time.Second))
endSeconds := uint32(end / uint64(time.Second))

err := p.headBlock.AppendTrace(id, tr, now, now)
err := p.headBlock.AppendTrace(id, tr, startSeconds, endSeconds)
if err != nil {
return err
}
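The new walR/walW fields wrap the WAL's local backend with a backend.Reader and backend.Writer, which gives the local-blocks processor a place to persist and re-read cached metrics results per completed block; the cache read/write code itself presumably lives in the parts of this change not shown in the excerpt above. A cache-aside sketch of the intended flow — cacheKeyFor, readCache, writeCache, and queryBlock are hypothetical names, not the actual helpers:

// Hypothetical cache-aside flow for one completed block (sketch only).
func (p *Processor) queryBlockCached(ctx context.Context, m *backend.BlockMeta, req *tempopb.QueryRangeRequest) (*tempopb.QueryRangeResponse, error) {
	// Key the cached result on the query and the portion of the time range
	// that overlaps this block, so the entry stays valid as the block ages.
	key := cacheKeyFor(m, req)

	// Fast path: reuse a result previously written to local disk via walR.
	if cached, err := readCache(ctx, p.walR, m, p.tenant, key); err == nil && cached != nil {
		return cached, nil
	}

	// Slow path: evaluate the block, then persist the result via walW
	// (best effort; a cache write failure should not fail the query).
	resp, err := queryBlock(ctx, m, req)
	if err != nil {
		return nil, err
	}
	_ = writeCache(ctx, p.walW, m, p.tenant, key, resp)

	return resp, nil
}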