diff --git a/CHANGELOG.md b/CHANGELOG.md
index b4e712269d2..44f67816624 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@
 * [FEATURE] Add new compare() metrics function [#3695](https://github.com/grafana/tempo/pull/3695) (@mdisibio)
 * [ENHANCEMENT] Tag value lookup use protobuf internally for improved latency [#3731](https://github.com/grafana/tempo/pull/3731) (@mdisibio)
 * [ENHANCEMENT] TraceQL metrics queries use protobuf internally for improved latency [#3745](https://github.com/grafana/tempo/pull/3745) (@mdisibio)
+* [ENHANCEMENT] Add local disk caching of metrics queries in local-blocks processor [#3799](https://github.com/grafana/tempo/pull/3799) (@mdisibio)
 * [ENHANCEMENT] Improve use of OTEL semantic conventions on the service graph [#3711](https://github.com/grafana/tempo/pull/3711) (@zalegrala)
 * [ENHANCEMENT] Performance improvement for `rate() by ()` queries [#3719](https://github.com/grafana/tempo/pull/3719) (@mapno)
 * [ENHANCEMENT] Use multiple goroutines to unmarshal responses in parallel in the query frontend. [#3713](https://github.com/grafana/tempo/pull/3713) (@joe-elliott)
diff --git a/modules/frontend/metrics_query_range_sharder.go b/modules/frontend/metrics_query_range_sharder.go
index acc1b99e045..9d4fb09f837 100644
--- a/modules/frontend/metrics_query_range_sharder.go
+++ b/modules/frontend/metrics_query_range_sharder.go
@@ -112,7 +112,7 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (pipeline.Responses[combin
         return pipeline.NewBadRequest(errors.New("invalid interval specified: 0")), nil
     }
 
-    generatorReq := s.generatorRequest(*req, r, tenantID, now, samplingRate)
+    generatorReq := s.generatorRequest(*req, r, tenantID, now)
 
     reqCh := make(chan *http.Request, 2) // buffer of 2 allows us to insert generatorReq and metrics
     if generatorReq != nil {
@@ -370,10 +370,12 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
             continue
         }
 
+        start, end := traceql.TrimToOverlap(searchReq.Start, searchReq.End, searchReq.Step, uint64(m.StartTime.UnixNano()), uint64(m.EndTime.UnixNano()))
+
         queryRangeReq := &tempopb.QueryRangeRequest{
             Query: searchReq.Query,
-            Start: max(searchReq.Start, uint64(m.StartTime.UnixNano())),
-            End:   min(searchReq.End, uint64(m.EndTime.UnixNano())),
+            Start: start,
+            End:   end,
             Step:  searchReq.Step,
             // ShardID:    uint32, // No sharding with RF=1
             // ShardCount: uint32, // No sharding with RF=1
@@ -388,8 +390,6 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
             FooterSize:       m.FooterSize,
             DedicatedColumns: dc,
         }
-        alignTimeRange(queryRangeReq)
-        queryRangeReq.End += queryRangeReq.Step
 
         subR = api.BuildQueryRangeRequest(subR, queryRangeReq)
         subR.Header.Set(api.HeaderAccept, api.HeaderAcceptProtobuf)
@@ -425,7 +425,7 @@ func (s *queryRangeSharder) backendRange(now time.Time, start, end uint64, query
     return start, end
 }
 
-func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest, parent *http.Request, tenantID string, now time.Time, samplingRate float64) *http.Request {
+func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest, parent *http.Request, tenantID string, now time.Time) *http.Request {
     cutoff := uint64(now.Add(-s.cfg.QueryBackendAfter).UnixNano())
 
     // if there's no overlap between the query and ingester range just return nil
diff --git a/modules/generator/config.go b/modules/generator/config.go
index 30b5353af6b..9cce671848a 100644
--- a/modules/generator/config.go
+++ b/modules/generator/config.go
@@ -43,6 +43,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
     cfg.Registry.RegisterFlagsAndApplyDefaults(prefix, f)
     cfg.Storage.RegisterFlagsAndApplyDefaults(prefix, f)
     cfg.TracesWAL.Version = encoding.DefaultEncoding().Version()
+    cfg.TracesWAL.IngestionSlack = 2 * time.Minute
 
     // setting default for max span age before discarding to 30s
     cfg.MetricsIngestionSlack = 30 * time.Second
diff --git a/modules/generator/processor/localblocks/processor.go b/modules/generator/processor/localblocks/processor.go
index 1dafcbe4dbc..21acf6476bf 100644
--- a/modules/generator/processor/localblocks/processor.go
+++ b/modules/generator/processor/localblocks/processor.go
@@ -17,11 +17,8 @@ import (
     "github.com/grafana/tempo/modules/ingester"
     "github.com/grafana/tempo/pkg/flushqueues"
     "github.com/grafana/tempo/tempodb"
-    "github.com/opentracing/opentracing-go"
-    "go.uber.org/atomic"
 
     gen "github.com/grafana/tempo/modules/generator/processor"
-    "github.com/grafana/tempo/pkg/boundedwaitgroup"
     "github.com/grafana/tempo/pkg/model"
     "github.com/grafana/tempo/pkg/tempopb"
     v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
@@ -48,6 +45,8 @@ type Processor struct {
     logger    kitlog.Logger
     Cfg       Config
     wal       *wal.WAL
+    walR      backend.Reader
+    walW      backend.Writer
     closeCh   chan struct{}
     wg        sync.WaitGroup
     cacheMtx  sync.RWMutex
@@ -90,6 +89,8 @@ func New(cfg Config, tenant string, wal *wal.WAL, writer tempodb.Writer, overrid
         logger:         log.WithUserID(tenant, log.Logger),
         Cfg:            cfg,
         wal:            wal,
+        walR:           backend.NewReader(wal.LocalBackend()),
+        walW:           backend.NewWriter(wal.LocalBackend()),
         overrides:      overrides,
         enc:            enc,
         walBlocks:      map[uuid.UUID]common.WALBlock{},
@@ -494,108 +495,6 @@ func (p *Processor) GetMetrics(ctx context.Context, req *tempopb.SpanMetricsRequ
     return resp, nil
 }
 
-// QueryRange returns metrics.
-func (p *Processor) QueryRange(ctx context.Context, req *tempopb.QueryRangeRequest) (traceql.SeriesSet, error) {
-    ctx, cancel := context.WithCancel(ctx)
-    defer cancel()
-
-    p.blocksMtx.RLock()
-    defer p.blocksMtx.RUnlock()
-
-    cutoff := time.Now().Add(-p.Cfg.CompleteBlockTimeout).Add(-timeBuffer)
-    if req.Start < uint64(cutoff.UnixNano()) {
-        return nil, fmt.Errorf("time range must be within last %v", p.Cfg.CompleteBlockTimeout)
-    }
-
-    // Blocks to check
-    blocks := make([]common.BackendBlock, 0, 1+len(p.walBlocks)+len(p.completeBlocks))
-    if p.headBlock != nil {
-        blocks = append(blocks, p.headBlock)
-    }
-    for _, b := range p.walBlocks {
-        blocks = append(blocks, b)
-    }
-    for _, b := range p.completeBlocks {
-        blocks = append(blocks, b)
-    }
-    if len(blocks) == 0 {
-        return nil, nil
-    }
-
-    expr, err := traceql.Parse(req.Query)
-    if err != nil {
-        return nil, fmt.Errorf("compiling query: %w", err)
-    }
-
-    unsafe := p.overrides.UnsafeQueryHints(p.tenant)
-
-    timeOverlapCutoff := p.Cfg.Metrics.TimeOverlapCutoff
-    if v, ok := expr.Hints.GetFloat(traceql.HintTimeOverlapCutoff, unsafe); ok && v >= 0 && v <= 1.0 {
-        timeOverlapCutoff = v
-    }
-
-    concurrency := p.Cfg.Metrics.ConcurrentBlocks
-    if v, ok := expr.Hints.GetInt(traceql.HintConcurrentBlocks, unsafe); ok && v > 0 && v < 100 {
-        concurrency = uint(v)
-    }
-
-    // Compile the sharded version of the query
-    eval, err := traceql.NewEngine().CompileMetricsQueryRange(req, false, timeOverlapCutoff, unsafe)
-    if err != nil {
-        return nil, err
-    }
-
-    var (
-        wg     = boundedwaitgroup.New(concurrency)
-        jobErr = atomic.Error{}
-    )
-
-    for _, b := range blocks {
-        // If a job errored then quit immediately.
-        if err := jobErr.Load(); err != nil {
-            return nil, err
-        }
-
-        start := uint64(b.BlockMeta().StartTime.UnixNano())
-        end := uint64(b.BlockMeta().EndTime.UnixNano())
-        if start > req.End || end < req.Start {
-            // Out of time range
-            continue
-        }
-
-        wg.Add(1)
-        go func(b common.BackendBlock) {
-            defer wg.Done()
-
-            m := b.BlockMeta()
-
-            span, ctx := opentracing.StartSpanFromContext(ctx, "Processor.QueryRange.Block", opentracing.Tags{
-                "block":     m.BlockID,
-                "blockSize": m.Size,
-            })
-            defer span.Finish()
-
-            // TODO - caching
-            f := traceql.NewSpansetFetcherWrapper(func(ctx context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) {
-                return b.Fetch(ctx, req, common.DefaultSearchOptions())
-            })
-
-            err := eval.Do(ctx, f, uint64(m.StartTime.UnixNano()), uint64(m.EndTime.UnixNano()))
-            if err != nil {
-                jobErr.Store(err)
-            }
-        }(b)
-    }
-
-    wg.Wait()
-
-    if err := jobErr.Load(); err != nil {
-        return nil, err
-    }
-
-    return eval.Results(), nil
-}
-
 func (p *Processor) metricsCacheGet(key string) *traceqlmetrics.MetricsResults {
     p.cacheMtx.RLock()
     defer p.cacheMtx.RUnlock()
@@ -713,9 +612,26 @@ func (p *Processor) writeHeadBlock(id common.ID, tr *tempopb.Trace) error {
         }
     }
 
-    now := uint32(time.Now().Unix())
+    // Get trace timestamp bounds
+    var start, end uint64
+    for _, b := range tr.Batches {
+        for _, ss := range b.ScopeSpans {
+            for _, s := range ss.Spans {
+                if start == 0 || s.StartTimeUnixNano < start {
+                    start = s.StartTimeUnixNano
+                }
+                if s.EndTimeUnixNano > end {
+                    end = s.EndTimeUnixNano
+                }
+            }
+        }
+    }
+
+    // Convert from unix nanos to unix seconds
+    startSeconds := uint32(start / uint64(time.Second))
+    endSeconds := uint32(end / uint64(time.Second))
 
-    err := p.headBlock.AppendTrace(id, tr, now, now)
+    err := p.headBlock.AppendTrace(id, tr, startSeconds, endSeconds)
     if err != nil {
         return err
     }
diff --git a/modules/generator/processor/localblocks/query_range.go b/modules/generator/processor/localblocks/query_range.go
new file mode 100644
index 00000000000..0be8c437544
--- /dev/null
+++ b/modules/generator/processor/localblocks/query_range.go
@@ -0,0 +1,265 @@
+package localblocks
+
+import (
+    "context"
+    "errors"
+    "fmt"
+    "sync"
+    "time"
+
+    "github.com/segmentio/fasthash/fnv1a"
+
+    "github.com/gogo/protobuf/proto"
+    "github.com/grafana/tempo/modules/ingester"
+    "github.com/grafana/tempo/pkg/boundedwaitgroup"
+    "github.com/grafana/tempo/pkg/tempopb"
+    "github.com/grafana/tempo/pkg/traceql"
+    "github.com/grafana/tempo/tempodb/backend"
+    "github.com/grafana/tempo/tempodb/encoding/common"
+    "github.com/opentracing/opentracing-go"
+    "go.uber.org/atomic"
+)
+
+// QueryRange returns metrics.
+func (p *Processor) QueryRange(ctx context.Context, req *tempopb.QueryRangeRequest) (traceql.SeriesSet, error) {
+    ctx, cancel := context.WithCancel(ctx)
+    defer cancel()
+
+    p.blocksMtx.RLock()
+    defer p.blocksMtx.RUnlock()
+
+    cutoff := time.Now().Add(-p.Cfg.CompleteBlockTimeout).Add(-timeBuffer)
+    if req.Start < uint64(cutoff.UnixNano()) {
+        return nil, fmt.Errorf("time range must be within last %v", p.Cfg.CompleteBlockTimeout)
+    }
+
+    expr, err := traceql.Parse(req.Query)
+    if err != nil {
+        return nil, fmt.Errorf("compiling query: %w", err)
+    }
+
+    unsafe := p.overrides.UnsafeQueryHints(p.tenant)
+
+    timeOverlapCutoff := p.Cfg.Metrics.TimeOverlapCutoff
+    if v, ok := expr.Hints.GetFloat(traceql.HintTimeOverlapCutoff, unsafe); ok && v >= 0 && v <= 1.0 {
+        timeOverlapCutoff = v
+    }
+
+    concurrency := p.Cfg.Metrics.ConcurrentBlocks
+    if v, ok := expr.Hints.GetInt(traceql.HintConcurrentBlocks, unsafe); ok && v > 0 && v < 100 {
+        concurrency = uint(v)
+    }
+
+    e := traceql.NewEngine()
+
+    // Compile the raw version of the query for wal blocks
+    // These aren't cached and we put them all into the same evaluator
+    // for efficiency.
+    eval, err := e.CompileMetricsQueryRange(req, false, timeOverlapCutoff, unsafe)
+    if err != nil {
+        return nil, err
+    }
+
+    // This is a summation version of the query for complete blocks
+    // which can be cached. But we need their results separately so they are
+    // computed separately.
+    overallEvalMtx := sync.Mutex{}
+    overallEval, err := traceql.NewEngine().CompileMetricsQueryRangeNonRaw(req, traceql.AggregateModeSum)
+    if err != nil {
+        return nil, err
+    }
+
+    withinRange := func(m *backend.BlockMeta) bool {
+        start := uint64(m.StartTime.UnixNano())
+        end := uint64(m.EndTime.UnixNano())
+        return req.Start < end && req.End > start
+    }
+
+    var (
+        wg     = boundedwaitgroup.New(concurrency)
+        jobErr = atomic.Error{}
+    )
+
+    if p.headBlock != nil && withinRange(p.headBlock.BlockMeta()) {
+        wg.Add(1)
+        go func(w common.WALBlock) {
+            defer wg.Done()
+            err := p.queryRangeWALBlock(ctx, w, eval)
+            if err != nil {
+                jobErr.Store(err)
+            }
+        }(p.headBlock)
+    }
+
+    for _, w := range p.walBlocks {
+        if jobErr.Load() != nil {
+            break
+        }
+
+        if !withinRange(w.BlockMeta()) {
+            continue
+        }
+
+        wg.Add(1)
+        go func(w common.WALBlock) {
+            defer wg.Done()
+            err := p.queryRangeWALBlock(ctx, w, eval)
+            if err != nil {
+                jobErr.Store(err)
+            }
+        }(w)
+    }
+
+    for _, b := range p.completeBlocks {
+        if jobErr.Load() != nil {
+            break
+        }
+
+        if !withinRange(b.BlockMeta()) {
+            continue
+        }
+
+        wg.Add(1)
+        go func(b *ingester.LocalBlock) {
+            defer wg.Done()
+            resp, err := p.queryRangeCompleteBlock(ctx, b, *req, timeOverlapCutoff, unsafe)
+            if err != nil {
+                jobErr.Store(err)
+                return
+            }
+
+            overallEvalMtx.Lock()
+            defer overallEvalMtx.Unlock()
+            overallEval.ObserveSeries(resp)
+        }(b)
+    }
+
+    wg.Wait()
+
+    if err := jobErr.Load(); err != nil {
+        return nil, err
+    }
+
+    // Combine the uncacheable results into the overall results
+    walResults := eval.Results().ToProto(req)
+    overallEval.ObserveSeries(walResults)
+
+    return overallEval.Results(), nil
+}
+
+func (p *Processor) queryRangeWALBlock(ctx context.Context, b common.WALBlock, eval *traceql.MetricsEvalulator) error {
+    m := b.BlockMeta()
+    span, ctx := opentracing.StartSpanFromContext(ctx, "Processor.QueryRange.WALBlock", opentracing.Tags{
+        "block":     m.BlockID,
+        "blockSize": m.Size,
+    })
+    defer span.Finish()
+
+    fetcher := traceql.NewSpansetFetcherWrapper(func(ctx context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) {
+        return b.Fetch(ctx, req, common.DefaultSearchOptions())
+    })
+
+    return eval.Do(ctx, fetcher, uint64(m.StartTime.UnixNano()), uint64(m.EndTime.UnixNano()))
+}
+
+func (p *Processor) queryRangeCompleteBlock(ctx context.Context, b *ingester.LocalBlock, req tempopb.QueryRangeRequest, timeOverlapCutoff float64, unsafe bool) ([]*tempopb.TimeSeries, error) {
+    m := b.BlockMeta()
+    span, ctx := opentracing.StartSpanFromContext(ctx, "Processor.QueryRange.CompleteBlock", opentracing.Tags{
+        "block":     m.BlockID,
+        "blockSize": m.Size,
+    })
+    defer span.Finish()
+
+    // Trim and align the request for this block. I.e. if the request is "Last Hour" we don't want to
+    // cache the response for that, we want only the few minutes time range for this block. This has
+    // size savings but the main thing is that the response is reuseable for any overlapping query.
+    req.Start, req.End = traceql.TrimToOverlap(req.Start, req.End, req.Step, uint64(m.StartTime.UnixNano()), uint64(m.EndTime.UnixNano()))
+
+    if req.Start >= req.End {
+        // After alignment there is no overlap or something else isn't right
+        return nil, nil
+    }
+
+    cached, name, err := p.queryRangeCacheGet(ctx, m, req)
+    if err != nil {
+        return nil, err
+    }
+
+    span.SetTag("cached", cached != nil)
+
+    if cached != nil {
+        return cached.Series, nil
+    }
+
+    // Not in cache or not cacheable, so execute
+    eval, err := traceql.NewEngine().CompileMetricsQueryRange(&req, false, timeOverlapCutoff, unsafe)
+    if err != nil {
+        return nil, err
+    }
+    f := traceql.NewSpansetFetcherWrapper(func(ctx context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) {
+        return b.Fetch(ctx, req, common.DefaultSearchOptions())
+    })
+    err = eval.Do(ctx, f, uint64(m.StartTime.UnixNano()), uint64(m.EndTime.UnixNano()))
+    if err != nil {
+        return nil, err
+    }
+
+    results := eval.Results().ToProto(&req)
+
+    if name != "" {
+        err = p.queryRangeCacheSet(ctx, m, name, &tempopb.QueryRangeResponse{
+            Series: results,
+        })
+        if err != nil {
+            return nil, fmt.Errorf("writing local query cache: %w", err)
+        }
+    }
+
+    return results, nil
+}
+
+func (p *Processor) queryRangeCacheGet(ctx context.Context, m *backend.BlockMeta, req tempopb.QueryRangeRequest) (*tempopb.QueryRangeResponse, string, error) {
+    hash := queryRangeHashForBlock(req)
+
+    name := fmt.Sprintf("cache_query_range_%v.buf", hash)
+
+    data, err := p.walR.Read(ctx, name, m.BlockID, m.TenantID, nil)
+    if err != nil {
+        if errors.Is(err, backend.ErrDoesNotExist) {
+            // Not cached, but return the name/keypath so it can be set after
+            return nil, name, nil
+        }
+        return nil, "", err
+    }
+
+    resp := &tempopb.QueryRangeResponse{}
+    err = proto.Unmarshal(data, resp)
+    if err != nil {
+        return nil, "", err
+    }
+
+    return resp, name, nil
+}
+
+func (p *Processor) queryRangeCacheSet(ctx context.Context, m *backend.BlockMeta, name string, resp *tempopb.QueryRangeResponse) error {
+    data, err := proto.Marshal(resp)
+    if err != nil {
+        return err
+    }
+
+    return p.walW.Write(ctx, name, m.BlockID, m.TenantID, data, nil)
+}
+
+func queryRangeHashForBlock(req tempopb.QueryRangeRequest) uint64 {
+    h := fnv1a.HashString64(req.Query)
+    h = fnv1a.AddUint64(h, req.Start)
+    h = fnv1a.AddUint64(h, req.End)
+    h = fnv1a.AddUint64(h, req.Step)
+
+    // TODO - caching for WAL blocks
+    // Including trace count means we can safely cache results
+    // for wal blocks which might receive new data
+    // h = fnv1a.AddUint64(h, m.TotalObjects)
+
+    return h
+}
diff --git a/pkg/traceql/engine_metrics.go b/pkg/traceql/engine_metrics.go
index 690a7954628..f5d9306cd8e 100644
--- a/pkg/traceql/engine_metrics.go
+++ b/pkg/traceql/engine_metrics.go
@@ -71,6 +71,15 @@ func IntervalOfMs(tsmills int64, start, end, step uint64) int {
     return IntervalOf(ts, start, end, step)
 }
 
+// TrimToOverlap returns the aligned overlap between the two given time ranges.
+func TrimToOverlap(start1, end1, step, start2, end2 uint64) (uint64, uint64) {
+    start1 = max(start1, start2)
+    end1 = min(end1, end2)
+    start1 = (start1 / step) * step
+    end1 = (end1/step)*step + step
+    return start1, end1
+}
+
 type Label struct {
     Name  string
     Value Static
diff --git a/pkg/traceql/engine_metrics_test.go b/pkg/traceql/engine_metrics_test.go
index 936fcba4d5b..ce71eec7396 100644
--- a/pkg/traceql/engine_metrics_test.go
+++ b/pkg/traceql/engine_metrics_test.go
@@ -93,6 +93,51 @@ func TestIntervalOf(t *testing.T) {
     }
 }
 
+func TestTrimToOverlap(t *testing.T) {
+    tc := []struct {
+        start1, end1, start2, end2 string
+        step                       time.Duration
+        expectedStart, expectedEnd string
+    }{
+        {
+            // Inner range of 33 to 38
+            // gets rounded at 5m intervals to 30 to 40
+            "2024-01-01 01:00:00", "2024-01-01 02:00:00",
+            "2024-01-01 01:33:00", "2024-01-01 01:38:00",
+            5 * time.Minute,
+            "2024-01-01 01:30:00", "2024-01-01 01:40:00",
+        },
+        {
+            // Partially Overlapping
+            // Overlap between 1:01-2:01 and 1:31-2:31
+            // in 5m intervals is only 1:30-2:05
+            // Start is pushed back
+            // and end is pushed out
+            "2024-01-01 01:01:00", "2024-01-01 02:01:00",
+            "2024-01-01 01:31:00", "2024-01-01 02:31:00",
+            5 * time.Minute,
+            "2024-01-01 01:30:00", "2024-01-01 02:05:00",
+        },
+    }
+
+    for _, c := range tc {
+        start1, _ := time.Parse(time.DateTime, c.start1)
+        end1, _ := time.Parse(time.DateTime, c.end1)
+        start2, _ := time.Parse(time.DateTime, c.start2)
+        end2, _ := time.Parse(time.DateTime, c.end2)
+
+        actualStart, actualEnd := TrimToOverlap(
+            uint64(start1.UnixNano()),
+            uint64(end1.UnixNano()),
+            uint64(c.step.Nanoseconds()),
+            uint64(start2.UnixNano()),
+            uint64(end2.UnixNano()))
+
+        require.Equal(t, c.expectedStart, time.Unix(0, int64(actualStart)).UTC().Format(time.DateTime))
+        require.Equal(t, c.expectedEnd, time.Unix(0, int64(actualEnd)).UTC().Format(time.DateTime))
+    }
+}
+
 func TestTimeRangeOverlap(t *testing.T) {
     tc := []struct {
         reqStart, reqEnd, dataStart, dataEnd uint64
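
For reference, the alignment behavior introduced by `TrimToOverlap` (used by both the query-frontend sharder and the per-block cache in `query_range.go`) can be reproduced standalone. The sketch below copies the arithmetic from the diff into a self-contained program; the `trimToOverlap` helper and `main` driver are illustrative only, and it assumes Go 1.21+ for the built-in `min`/`max`.

```go
package main

import (
	"fmt"
	"time"
)

// trimToOverlap copies the arithmetic of traceql.TrimToOverlap from this diff:
// clamp the request range (start1, end1) to the block range (start2, end2),
// then align both edges to the step so the result is reusable by any
// overlapping query.
func trimToOverlap(start1, end1, step, start2, end2 uint64) (uint64, uint64) {
	start1 = max(start1, start2)
	end1 = min(end1, end2)
	start1 = (start1 / step) * step // floor start to a step boundary
	end1 = (end1/step)*step + step  // floor end, then extend one step past it
	return start1, end1
}

func main() {
	// Same scenario as the first TestTrimToOverlap case: a "last hour" request
	// trimmed to a block covering 01:33-01:38 with a 5m step.
	ts := func(s string) uint64 {
		t, _ := time.Parse(time.DateTime, s)
		return uint64(t.UnixNano())
	}
	step := uint64(5 * time.Minute)

	start, end := trimToOverlap(
		ts("2024-01-01 01:00:00"), ts("2024-01-01 02:00:00"), step,
		ts("2024-01-01 01:33:00"), ts("2024-01-01 01:38:00"))

	fmt.Println(time.Unix(0, int64(start)).UTC().Format(time.DateTime)) // 2024-01-01 01:30:00
	fmt.Println(time.Unix(0, int64(end)).UTC().Format(time.DateTime))   // 2024-01-01 01:40:00
}
```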
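The per-block disk cache keys each entry by an FNV-1a hash of the query plus the trimmed, step-aligned time range, which is why the trim above matters: overlapping requests that reduce to the same block window hit the same cached file. Below is a minimal sketch of that key derivation, assuming the canonical `github.com/segmentio/fasthash/fnv1a` package; the `cacheKeyForBlock` helper name and the sample values are hypothetical.

```go
package main

import (
	"fmt"

	"github.com/segmentio/fasthash/fnv1a"
)

// cacheKeyForBlock mirrors queryRangeHashForBlock plus the name format used by
// queryRangeCacheGet in this diff: hash the query text and the block-aligned
// start, end, and step, then embed the hash in the cache object name.
func cacheKeyForBlock(query string, start, end, step uint64) string {
	h := fnv1a.HashString64(query)
	h = fnv1a.AddUint64(h, start)
	h = fnv1a.AddUint64(h, end)
	h = fnv1a.AddUint64(h, step)
	return fmt.Sprintf("cache_query_range_%v.buf", h)
}

func main() {
	// Illustrative values only: a rate() query over a 01:30-01:40 window at a 5m step.
	fmt.Println(cacheKeyForBlock(
		`{ } | rate()`,
		1704072600000000000, // 2024-01-01 01:30:00 UTC in unix nanos
		1704073200000000000, // 2024-01-01 01:40:00 UTC in unix nanos
		300000000000,        // 5m in nanos
	))
}
```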
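Conceptually, the rewritten `QueryRange` evaluates all WAL blocks with one shared raw evaluator, while each complete block's (possibly cached) partial series is folded into a summing evaluator under a mutex and the WAL results are merged in at the end. The sketch below is a much-simplified stand-in for that combine step, using plain maps in place of the real evaluator and series types; it is not Tempo code, only an illustration of the pattern.

```go
package main

import (
	"fmt"
	"sync"
)

// series stands in for a single metrics series: timestamp -> value.
type series = map[uint64]float64

// sumInto plays the role of overallEval.ObserveSeries: fold one block's
// partial results into the overall accumulator.
func sumInto(total, partial series) {
	for ts, v := range partial {
		total[ts] += v
	}
}

func main() {
	// Pretend per-block partial results: two complete blocks (possibly read
	// from the local disk cache) and one combined result for all WAL blocks.
	completeBlockResults := []series{
		{100: 2, 200: 1},
		{200: 3, 300: 5},
	}
	walResults := series{100: 1, 300: 2}

	total := series{}
	var (
		mtx sync.Mutex
		wg  sync.WaitGroup
	)

	// Complete blocks are evaluated (or served from cache) concurrently and
	// summed under a mutex, echoing the goroutines in Processor.QueryRange.
	for _, r := range completeBlockResults {
		wg.Add(1)
		go func(r series) {
			defer wg.Done()
			mtx.Lock()
			defer mtx.Unlock()
			sumInto(total, r)
		}(r)
	}
	wg.Wait()

	// WAL blocks share one raw evaluator; its results are folded in last.
	sumInto(total, walResults)

	fmt.Println(total) // map[100:3 200:4 300:7]
}
```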