diff --git a/CHANGELOG.md b/CHANGELOG.md
index ee36fb74765..47a09527f26 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
 ## main / unreleased
 * [FEATURE] Add TLS support for Memcached Client [#3585](https://github.com/grafana/tempo/pull/3585) (@sonisr)
+* [FEATURE] TraceQL metrics queries: add quantile_over_time [#3605](https://github.com/grafana/tempo/pull/3605) [#3633](https://github.com/grafana/tempo/pull/3633) (@mdisibio)
 * [ENHANCEMENT] Add querier metrics for requests executed [#3524](https://github.com/grafana/tempo/pull/3524) (@electron0zero)
 * [FEATURE] Added gRPC streaming endpoints for Tempo APIs.
   * Added gRPC streaming endpoints for all tag queries. [#3460](https://github.com/grafana/tempo/pull/3460) (@joe-elliott)
diff --git a/modules/frontend/combiner/metrics_query_range.go b/modules/frontend/combiner/metrics_query_range.go
index a2df8fcdd7a..81cf18649a8 100644
--- a/modules/frontend/combiner/metrics_query_range.go
+++ b/modules/frontend/combiner/metrics_query_range.go
@@ -11,8 +11,11 @@ import (
 var _ GRPCCombiner[*tempopb.QueryRangeResponse] = (*genericCombiner[*tempopb.QueryRangeResponse])(nil)
 
 // NewQueryRange returns a query range combiner.
-func NewQueryRange() Combiner {
-	combiner := traceql.QueryRangeCombiner{}
+func NewQueryRange(req *tempopb.QueryRangeRequest) (Combiner, error) {
+	combiner, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeFinal)
+	if err != nil {
+		return nil, err
+	}
 
 	return &genericCombiner[*tempopb.QueryRangeResponse]{
 		httpStatusCode: 200,
@@ -65,11 +68,16 @@ func NewQueryRange() Combiner {
 			sortResponse(resp)
 			return resp, nil
 		},
-	}
+	}, nil
 }
 
-func NewTypedQueryRange() GRPCCombiner[*tempopb.QueryRangeResponse] {
-	return NewQueryRange().(GRPCCombiner[*tempopb.QueryRangeResponse])
+func NewTypedQueryRange(req *tempopb.QueryRangeRequest) (GRPCCombiner[*tempopb.QueryRangeResponse], error) {
+	c, err := NewQueryRange(req)
+	if err != nil {
+		return nil, err
+	}
+
+	return c.(GRPCCombiner[*tempopb.QueryRangeResponse]), nil
 }
 
 func sortResponse(res *tempopb.QueryRangeResponse) {
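The combiner is now constructed per request, because the final aggregation step depends on the compiled query. A minimal sketch of the new call shape (the wrapper function and request values are hypothetical; `NewTypedQueryRange` and the request fields come from this diff):

```go
// Hypothetical caller: build a per-request query-range combiner.
// Compiling the query can fail (e.g. not a metrics query), so the
// constructor now returns an error instead of failing later at combine time.
func buildCombiner() (combiner.GRPCCombiner[*tempopb.QueryRangeResponse], error) {
	req := &tempopb.QueryRangeRequest{
		Query: "{} | quantile_over_time(duration, 0, 0.9, 1) by (span.http.path)",
		Start: uint64(1100 * time.Second), // unix nanos
		End:   uint64(1200 * time.Second),
		Step:  uint64(100 * time.Second),
	}
	return combiner.NewTypedQueryRange(req)
}
```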
diff --git a/modules/frontend/metrics_query_range_handler.go b/modules/frontend/metrics_query_range_handler.go
index b19482eda3e..69058521e78 100644
--- a/modules/frontend/metrics_query_range_handler.go
+++ b/modules/frontend/metrics_query_range_handler.go
@@ -38,14 +38,18 @@ func newQueryRangeStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripp
 		start := time.Now()
 
 		var finalResponse *tempopb.QueryRangeResponse
-		c := combiner.NewTypedQueryRange()
+		c, err := combiner.NewTypedQueryRange(req)
+		if err != nil {
+			return err
+		}
+
 		collector := pipeline.NewGRPCCollector(next, c, func(qrr *tempopb.QueryRangeResponse) error {
 			finalResponse = qrr // sadly we can't pass srv.Send directly into the collector. we need bytesProcessed for the SLO calculations
 			return srv.Send(qrr)
 		})
 
 		logQueryRangeRequest(logger, tenant, req)
-		err := collector.RoundTrip(httpReq)
+		err = collector.RoundTrip(httpReq)
 
 		duration := time.Since(start)
 		bytesProcessed := uint64(0)
@@ -80,7 +84,15 @@ func newMetricsQueryRangeHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper
 	logQueryRangeRequest(logger, tenant, queryRangeReq)
 
 	// build and use roundtripper
-	combiner := combiner.NewTypedQueryRange()
+	combiner, err := combiner.NewTypedQueryRange(queryRangeReq)
+	if err != nil {
+		level.Error(logger).Log("msg", "query range: query range combiner failed", "err", err)
+		return &http.Response{
+			StatusCode: http.StatusInternalServerError,
+			Status:     http.StatusText(http.StatusInternalServerError),
+			Body:       io.NopCloser(strings.NewReader(err.Error())),
+		}, nil
+	}
 	rt := pipeline.NewHTTPCollector(next, combiner)
 
 	resp, err := rt.RoundTrip(req)
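For reference, requests reach this handler in the shape the tests below construct. A sketch of building one (the helper function is hypothetical; `api.PathMetricsQueryRange`, `api.BuildQueryRangeRequest`, and `user.InjectOrgID` are used the same way in the tests):

```go
// Hypothetical test-style construction of a query-range HTTP request
// exercising the new quantile_over_time function.
func buildQuantileRequest() *http.Request {
	httpReq := httptest.NewRequest("GET", api.PathMetricsQueryRange, nil)
	httpReq = api.BuildQueryRangeRequest(httpReq, &tempopb.QueryRangeRequest{
		Query: "{} | quantile_over_time(duration, 0.5, 0.99) by (span.http.path)",
		Start: uint64(1100 * time.Second), // unix nanos
		End:   uint64(1200 * time.Second),
		Step:  uint64(100 * time.Second),
	})
	// The frontend requires a tenant; tests inject it the same way.
	return httpReq.WithContext(user.InjectOrgID(httpReq.Context(), "foo"))
}
```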
diff --git a/modules/frontend/metrics_query_range_handler_test.go b/modules/frontend/metrics_query_range_handler_test.go
index df4e608a3ad..299ec41f35c 100644
--- a/modules/frontend/metrics_query_range_handler_test.go
+++ b/modules/frontend/metrics_query_range_handler_test.go
@@ -10,6 +10,7 @@ import (
 	"github.com/grafana/dskit/user"
 	"github.com/grafana/tempo/pkg/api"
 	"github.com/grafana/tempo/pkg/tempopb"
+	v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
 	"github.com/stretchr/testify/require"
 )
 
@@ -22,13 +23,16 @@ func TestQueryRangeHandlerSucceeds(t *testing.T) {
 		Series: []*tempopb.TimeSeries{
 			{
 				PromLabels: "foo",
+				Labels: []v1.KeyValue{
+					{Key: "foo", Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: "bar"}}},
+				},
 				Samples: []tempopb.Sample{
 					{
-						TimestampMs: 2,
+						TimestampMs: 1200_000,
 						Value:       2,
 					},
 					{
-						TimestampMs: 1,
+						TimestampMs: 1100_000,
 						Value:       1,
 					},
 				},
@@ -40,15 +44,17 @@ func TestQueryRangeHandlerSucceeds(t *testing.T) {
 		responseFn: func() proto.Message {
 			return resp
 		},
-	}, nil, nil, nil)
+	}, nil, nil, nil, func(c *Config) {
+		c.Metrics.Sharder.Interval = time.Hour
+	})
 
 	tenant := "foo"
 
 	httpReq := httptest.NewRequest("GET", api.PathMetricsQueryRange, nil)
 	httpReq = api.BuildQueryRangeRequest(httpReq, &tempopb.QueryRangeRequest{
 		Query: "{} | rate()",
-		Start: 1,
-		End:   uint64(10000 * time.Second),
-		Step:  uint64(1 * time.Second),
+		Start: uint64(1100 * time.Second),
+		End:   uint64(1200 * time.Second),
+		Step:  uint64(100 * time.Second),
 	})
 
 	ctx := user.InjectOrgID(httpReq.Context(), tenant)
@@ -63,24 +69,27 @@ func TestQueryRangeHandlerSucceeds(t *testing.T) {
 	// for reasons I don't understand, this query turns into 408 jobs.
 	expectedResp := &tempopb.QueryRangeResponse{
 		Metrics: &tempopb.SearchMetrics{
-			CompletedJobs:   408,
-			InspectedTraces: 408,
-			InspectedBytes:  408,
-			TotalJobs:       408,
+			CompletedJobs:   4,
+			InspectedTraces: 4,
+			InspectedBytes:  4,
+			TotalJobs:       4,
 			TotalBlocks:     2,
 			TotalBlockBytes: 419430400,
 		},
 		Series: []*tempopb.TimeSeries{
 			{
 				PromLabels: "foo",
+				Labels: []v1.KeyValue{
+					{Key: "foo", Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: "bar"}}},
+				},
 				Samples: []tempopb.Sample{
 					{
-						TimestampMs: 1,
-						Value:       408,
+						TimestampMs: 1100_000,
+						Value:       4,
 					},
 					{
-						TimestampMs: 2,
-						Value:       816,
+						TimestampMs: 1200_000,
+						Value:       8,
 					},
 				},
 			},
@@ -102,13 +111,16 @@ func TestQueryRangeHandlerRespectsSamplingRate(t *testing.T) {
 		Series: []*tempopb.TimeSeries{
 			{
 				PromLabels: "foo",
+				Labels: []v1.KeyValue{
+					{Key: "foo", Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: "bar"}}},
+				},
 				Samples: []tempopb.Sample{
 					{
-						TimestampMs: 2,
+						TimestampMs: 1200_000,
 						Value:       2,
 					},
 					{
-						TimestampMs: 1,
+						TimestampMs: 1100_000,
 						Value:       1,
 					},
 				},
@@ -120,15 +132,17 @@ func TestQueryRangeHandlerRespectsSamplingRate(t *testing.T) {
 		responseFn: func() proto.Message {
 			return resp
 		},
-	}, nil, nil, nil)
+	}, nil, nil, nil, func(c *Config) {
+		c.Metrics.Sharder.Interval = time.Hour
+	})
 
 	tenant := "foo"
 
 	httpReq := httptest.NewRequest("GET", api.PathMetricsQueryRange, nil)
 	httpReq = api.BuildQueryRangeRequest(httpReq, &tempopb.QueryRangeRequest{
 		Query: "{} | rate() with (sample=.2)",
-		Start: 1,
-		End:   uint64(10000 * time.Second),
-		Step:  uint64(1 * time.Second),
+		Start: uint64(1100 * time.Second),
+		End:   uint64(1200 * time.Second),
+		Step:  uint64(100 * time.Second),
 	})
 
 	ctx := user.InjectOrgID(httpReq.Context(), tenant)
@@ -140,27 +154,29 @@ func TestQueryRangeHandlerRespectsSamplingRate(t *testing.T) {
 
 	require.Equal(t, 200, httpResp.Code)
 
-	// for reasons I don't understand, this query turns into 408 jobs.
 	expectedResp := &tempopb.QueryRangeResponse{
 		Metrics: &tempopb.SearchMetrics{
-			CompletedJobs:   102,
-			InspectedTraces: 102,
-			InspectedBytes:  102,
-			TotalJobs:       102,
+			CompletedJobs:   1,
+			InspectedTraces: 1,
+			InspectedBytes:  1,
+			TotalJobs:       1,
 			TotalBlocks:     2,
 			TotalBlockBytes: 419430400,
 		},
 		Series: []*tempopb.TimeSeries{
 			{
 				PromLabels: "foo",
+				Labels: []v1.KeyValue{
+					{Key: "foo", Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: "bar"}}},
+				},
 				Samples: []tempopb.Sample{
 					{
-						TimestampMs: 1,
-						Value:       510,
+						TimestampMs: 1100_000,
+						Value:       5,
 					},
 					{
-						TimestampMs: 2,
-						Value:       1020,
+						TimestampMs: 1200_000,
+						Value:       10,
 					},
 				},
 			},
diff --git a/modules/frontend/metrics_query_range_sharder.go b/modules/frontend/metrics_query_range_sharder.go
index 13baddab9bd..1551134a43a 100644
--- a/modules/frontend/metrics_query_range_sharder.go
+++ b/modules/frontend/metrics_query_range_sharder.go
@@ -68,7 +68,7 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (pipeline.Responses[combin
 		return pipeline.NewBadRequest(err), nil
 	}
 
-	expr, err := traceql.Parse(req.Query)
+	expr, _, _, _, err := traceql.NewEngine().Compile(req.Query)
 	if err != nil {
 		return pipeline.NewBadRequest(err), nil
 	}
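The sharder switches from `Parse` to `Compile` so the same validation path runs everywhere. A sketch of the multi-value signature as used in this diff (return positions inferred from the call sites here; only the positions shown are grounded):

```go
// Per its use in this diff: the first return is the root expression and the
// third is the metrics pipeline (nil for non-metrics queries).
expr, _, metricsPipeline, _, err := traceql.NewEngine().Compile(req.Query)
if err != nil {
	return pipeline.NewBadRequest(err), nil // invalid queries become 400s
}
_ = expr            // the sharder only needs the expression
_ = metricsPipeline // non-nil only for metrics queries
```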
diff --git a/modules/frontend/search_handlers_test.go b/modules/frontend/search_handlers_test.go
index 5df2b6ece96..6a288b00d37 100644
--- a/modules/frontend/search_handlers_test.go
+++ b/modules/frontend/search_handlers_test.go
@@ -659,7 +659,9 @@ func cacheResponsesEqual(t *testing.T, cacheResponse *tempopb.SearchResponse, pi
 // frontendWithSettings returns a new frontend with the given settings. any nil options
 // are given "happy path" defaults
-func frontendWithSettings(t *testing.T, next http.RoundTripper, rdr tempodb.Reader, cfg *Config, cacheProvider cache.Provider) *QueryFrontend {
+func frontendWithSettings(t *testing.T, next http.RoundTripper, rdr tempodb.Reader, cfg *Config, cacheProvider cache.Provider,
+	opts ...func(*Config),
+) *QueryFrontend {
 	if next == nil {
 		next = &mockRoundTripper{
 			responseFn: func() proto.Message {
@@ -721,6 +723,10 @@ func frontendWithSettings(t *testing.T, next http.RoundTripper, rdr tempodb.Read
 		}
 	}
 
+	for _, o := range opts {
+		o(cfg)
+	}
+
 	o, err := overrides.NewOverrides(overrides.Config{}, nil, prometheus.DefaultRegisterer)
 	require.NoError(t, err)
diff --git a/modules/generator/processor/localblocks/processor.go b/modules/generator/processor/localblocks/processor.go
index d19be0dadfd..0a872253e1c 100644
--- a/modules/generator/processor/localblocks/processor.go
+++ b/modules/generator/processor/localblocks/processor.go
@@ -466,6 +466,7 @@ func (p *Processor) QueryRange(ctx context.Context, req *tempopb.QueryRangeReque
 		concurrency = uint(v)
 	}
 
+	// Compile the sharded version of the query
 	eval, err := traceql.NewEngine().CompileMetricsQueryRange(req, false, timeOverlapCutoff, unsafe)
 	if err != nil {
 		return nil, err
@@ -519,7 +520,7 @@ func (p *Processor) QueryRange(ctx context.Context, req *tempopb.QueryRangeReque
 		return nil, err
 	}
 
-	return eval.Results()
+	return eval.Results(), nil
 }
 
 func (p *Processor) metricsCacheGet(key string) *traceqlmetrics.MetricsResults {
diff --git a/modules/querier/querier_query_range.go b/modules/querier/querier_query_range.go
index f1e7cd4722a..43308c43859 100644
--- a/modules/querier/querier_query_range.go
+++ b/modules/querier/querier_query_range.go
@@ -47,7 +47,11 @@ func (q *Querier) queryRangeRecent(ctx context.Context, req *tempopb.QueryRangeR
 		return nil, fmt.Errorf("error querying generators in Querier.MetricsQueryRange: %w", err)
 	}
 
-	c := traceql.QueryRangeCombiner{}
+	c, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeSum)
+	if err != nil {
+		return nil, err
+	}
+
 	for _, result := range lookupResults {
 		c.Combine(result.response.(*tempopb.QueryRangeResponse))
 	}
@@ -98,6 +102,7 @@ func (q *Querier) queryBackend(ctx context.Context, req *tempopb.QueryRangeReque
 		concurrency = v
 	}
 
+	// Compile the sharded version of the query
 	eval, err := traceql.NewEngine().CompileMetricsQueryRange(req, dedupe, timeOverlapCutoff, unsafe)
 	if err != nil {
 		return nil, err
@@ -139,10 +144,7 @@ func (q *Querier) queryBackend(ctx context.Context, req *tempopb.QueryRangeReque
 		return nil, err
 	}
 
-	res, err := eval.Results()
-	if err != nil {
-		return nil, err
-	}
+	res := eval.Results()
 
 	inspectedBytes, spansTotal, _ := eval.Metrics()
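Queriers now combine with `AggregateModeSum`, while the frontend (earlier in this diff) uses `AggregateModeFinal`. A sketch of how the two stages compose (`partialRespA`/`partialRespB` are hypothetical job results; the combiner API is from this diff):

```go
// Queriers sum partial series from generators and blocks...
sum, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeSum)
if err != nil {
	return nil, err
}
sum.Combine(partialRespA)
sum.Combine(partialRespB)

// ...while the query-frontend runs the final, non-distributable step,
// e.g. converting histogram buckets into quantiles.
final, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeFinal)
if err != nil {
	return nil, err
}
final.Combine(sum.Response())
resp := final.Response() // final series plus accumulated job metrics
```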
diff --git a/pkg/traceql/ast.go b/pkg/traceql/ast.go
index b2543681758..4158cc76dc2 100644
--- a/pkg/traceql/ast.go
+++ b/pkg/traceql/ast.go
@@ -17,8 +17,9 @@ type Element interface {
 type metricsFirstStageElement interface {
 	Element
 	extractConditions(request *FetchSpansRequest)
-	init(*tempopb.QueryRangeRequest)
-	observe(Span) // TODO - batching?
+	init(req *tempopb.QueryRangeRequest, mode AggregateMode)
+	observe(Span)                        // TODO - batching?
+	observeSeries([]*tempopb.TimeSeries) // Re-entrant metrics on the query-frontend. Using proto version for efficiency
 	result() SeriesSet
 }
 
@@ -756,13 +757,15 @@ var (
 // MetricsAggregate is a placeholder in the AST for a metrics aggregation
 // pipeline element. It has a superset of the properties of them all, and
 // builds them later via init() so that appropriate buffers can be allocated
-// for the query time range and step.
+// for the query time range and step, and so that the right implementation can
+// be chosen for shardable and unshardable pipelines.
type MetricsAggregate struct {
-	op     MetricsAggregateOp
-	by     []Attribute
-	attr   Attribute
-	floats []float64
-	agg    SpanAggregator
+	op        MetricsAggregateOp
+	by        []Attribute
+	attr      Attribute
+	floats    []float64
+	agg       SpanAggregator
+	seriesAgg SeriesAggregator
 }
 
 func newMetricsAggregate(agg MetricsAggregateOp, by []Attribute) *MetricsAggregate {
@@ -785,6 +788,12 @@ func (a *MetricsAggregate) extractConditions(request *FetchSpansRequest) {
 	switch a.op {
 	case metricsAggregateRate, metricsAggregateCountOverTime:
 		// No extra conditions, start time is already enough
+	case metricsAggregateQuantileOverTime:
+		if !request.HasAttribute(a.attr) {
+			request.SecondPassConditions = append(request.SecondPassConditions, Condition{
+				Attribute: a.attr,
+			})
+		}
 	}
 
 	for _, b := range a.by {
@@ -796,33 +805,119 @@ func (a *MetricsAggregate) extractConditions(request *FetchSpansRequest) {
 	}
 }
 
-func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest) {
+func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode) {
+	switch mode {
+	case AggregateModeSum:
+		a.initSum(q)
+		return
+
+	case AggregateModeFinal:
+		a.initFinal(q)
+		return
+	}
+
+	// Raw mode:
+
 	var innerAgg func() VectorAggregator
+	var byFunc func(Span) (Static, bool)
+	var byFuncLabel string
 
 	switch a.op {
 	case metricsAggregateCountOverTime:
 		innerAgg = func() VectorAggregator { return NewCountOverTimeAggregator() }
+
 	case metricsAggregateRate:
 		innerAgg = func() VectorAggregator { return NewRateAggregator(1.0 / time.Duration(q.Step).Seconds()) }
+
+	case metricsAggregateQuantileOverTime:
+		// Quantiles are implemented as count_over_time() by(log2(attr)) for now
+		innerAgg = func() VectorAggregator { return NewCountOverTimeAggregator() }
+		byFuncLabel = internalLabelBucket
+		switch a.attr {
+		case IntrinsicDurationAttribute:
+			// Optimal implementation for duration attribute
+			byFunc = func(s Span) (Static, bool) {
+				d := s.DurationNanos()
+				if d < 2 {
+					return Static{}, false
+				}
+				// Bucket is in seconds
+				return NewStaticFloat(Log2Bucketize(d) / float64(time.Second)), true
+			}
+		default:
+			// Basic implementation for all other attributes
+			byFunc = func(s Span) (Static, bool) {
+				v, ok := s.AttributeFor(a.attr)
+				if !ok {
+					return Static{}, false
+				}
+
+				// TODO(mdisibio) - Add quantile support for floats
+				if v.Type != TypeInt {
+					return Static{}, false
+				}
+
+				if v.N < 2 {
+					return Static{}, false
+				}
+				return NewStaticFloat(Log2Bucketize(uint64(v.N))), true
+			}
+		}
 	}
 
 	a.agg = NewGroupingAggregator(a.op.String(), func() RangeAggregator {
 		return NewStepAggregator(q.Start, q.End, q.Step, innerAgg)
-	}, a.by)
+	}, a.by, byFunc, byFuncLabel)
+}
+
+func (a *MetricsAggregate) initSum(q *tempopb.QueryRangeRequest) {
+	// Currently all metrics are summed by job to produce
+	// intermediate results. This will change when adding min/max/topk/etc
+	a.seriesAgg = NewSimpleAdditionCombiner(q)
+}
+
+func (a *MetricsAggregate) initFinal(q *tempopb.QueryRangeRequest) {
+	switch a.op {
+	case metricsAggregateQuantileOverTime:
+		a.seriesAgg = NewHistogramAggregator(q, a.floats)
+	default:
+		// These are simple additions by series
+		a.seriesAgg = NewSimpleAdditionCombiner(q)
+	}
 }
 
 func (a *MetricsAggregate) observe(span Span) {
 	a.agg.Observe(span)
 }
 
+func (a *MetricsAggregate) observeSeries(ss []*tempopb.TimeSeries) {
+	a.seriesAgg.Combine(ss)
+}
+
 func (a *MetricsAggregate) result() SeriesSet {
-	return a.agg.Series()
+	if a.agg != nil {
+		return a.agg.Series()
+	}
+
+	// In the frontend version the results come from
+	// the job-level aggregator
+	return a.seriesAgg.Results()
 }
 
 func (a *MetricsAggregate) validate() error {
 	switch a.op {
 	case metricsAggregateCountOverTime:
 	case metricsAggregateRate:
+	case metricsAggregateQuantileOverTime:
+		if len(a.by) >= maxGroupBys {
+			// We reserve a spot for the bucket so quantile has 1 less group by
+			return newUnsupportedError(fmt.Sprintf("metrics group by %v values", len(a.by)))
+		}
+		for _, q := range a.floats {
+			if q < 0 || q > 1 {
+				return fmt.Errorf("quantile must be between 0 and 1: %v", q)
+			}
+		}
 	default:
 		return newUnsupportedError(fmt.Sprintf("metrics aggregate operation (%v)", a.op))
 	}
diff --git a/pkg/traceql/ast_stringer.go b/pkg/traceql/ast_stringer.go
index 2bf8bafdd1e..d7c846f4aa7 100644
--- a/pkg/traceql/ast_stringer.go
+++ b/pkg/traceql/ast_stringer.go
@@ -85,7 +85,7 @@ func (n Static) EncodeToString(quotes bool) string {
 	case TypeInt:
 		return strconv.Itoa(n.N)
 	case TypeFloat:
-		return strconv.FormatFloat(n.F, 'f', 5, 64)
+		return strconv.FormatFloat(n.F, 'g', -1, 64)
 	case TypeString:
 		if quotes {
 			return "`" + n.S + "`"
diff --git a/pkg/traceql/ast_test.go b/pkg/traceql/ast_test.go
index 2019602aaa8..8c2ef13f172 100644
--- a/pkg/traceql/ast_test.go
+++ b/pkg/traceql/ast_test.go
@@ -359,6 +359,16 @@ func newMockSpan(id []byte) *mockSpan {
 	}
 }
 
+func (m *mockSpan) WithStartTime(nanos uint64) *mockSpan {
+	m.startTimeUnixNanos = nanos
+	return m
+}
+
+func (m *mockSpan) WithDuration(nanos uint64) *mockSpan {
+	m.durationNanos = nanos
+	return m
+}
+
 func (m *mockSpan) WithNestedSetInfo(parentid, left, right int) *mockSpan {
 	m.parentID = parentid
 	m.left = left
@@ -366,6 +376,11 @@ func (m *mockSpan) WithNestedSetInfo(parentid, left, right int) *mockSpan {
 	return m
 }
 
+func (m *mockSpan) WithSpanString(key string, value string) *mockSpan {
+	m.attributes[NewScopedAttribute(AttributeScopeSpan, false, key)] = NewStaticString(value)
+	return m
+}
+
 func (m *mockSpan) WithAttrBool(key string, value bool) *mockSpan {
 	m.attributes[NewAttribute(key)] = NewStaticBool(value)
 	return m
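The float-formatting change in ast_stringer.go above matters for quantile labels like `p="0.5"`. A small standard-library demonstration of the difference (runnable as-is; values illustrative):

```go
package main

import (
	"fmt"
	"strconv"
)

func main() {
	// Old formatting: fixed 5 decimal places, so 0.5 rendered as "0.50000"
	// and would have appeared in prom labels as {p="0.50000"}.
	fmt.Println(strconv.FormatFloat(0.5, 'f', 5, 64)) // 0.50000

	// New formatting: shortest representation that round-trips.
	fmt.Println(strconv.FormatFloat(0.5, 'g', -1, 64)) // 0.5
	fmt.Println(strconv.FormatFloat(1, 'g', -1, 64))   // 1
}
```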
diff --git a/pkg/traceql/combine.go b/pkg/traceql/combine.go
index 4fd3a7f5121..767d8c1fec5 100644
--- a/pkg/traceql/combine.go
+++ b/pkg/traceql/combine.go
@@ -5,7 +5,6 @@ import (
 	"strings"
 
 	"github.com/grafana/tempo/pkg/tempopb"
-	"golang.org/x/exp/maps"
 )
 
 type MetadataCombiner struct {
@@ -127,33 +126,31 @@ func spansetID(ss *tempopb.SpanSet) string {
 }
 
 type QueryRangeCombiner struct {
-	ts      map[string]*tempopb.TimeSeries
+	req     *tempopb.QueryRangeRequest
+	eval    *MetricsFrontendEvaluator
 	metrics *tempopb.SearchMetrics
 }
 
-func (q *QueryRangeCombiner) Combine(resp *tempopb.QueryRangeResponse) {
-	if resp == nil {
-		return
-	}
-
-	if q.ts == nil {
-		q.ts = make(map[string]*tempopb.TimeSeries, len(resp.Series))
+func QueryRangeCombinerFor(req *tempopb.QueryRangeRequest, mode AggregateMode) (*QueryRangeCombiner, error) {
+	eval, err := NewEngine().CompileMetricsQueryRangeNonRaw(req, mode)
+	if err != nil {
+		return nil, err
 	}
 
-	for _, series := range resp.Series {
-
-		existing, ok := q.ts[series.PromLabels]
-		if !ok {
-			q.ts[series.PromLabels] = series
-			continue
-		}
+	return &QueryRangeCombiner{
+		req:     req,
+		eval:    eval,
+		metrics: &tempopb.SearchMetrics{},
+	}, nil
+}
 
-		q.combine(series, existing)
+func (q *QueryRangeCombiner) Combine(resp *tempopb.QueryRangeResponse) {
+	if resp == nil {
+		return
 	}
 
-	if q.metrics == nil {
-		q.metrics = &tempopb.SearchMetrics{}
-	}
+	// Here is where the job results are re-entered into the pipeline
+	q.eval.ObserveSeries(resp.Series)
 
 	if resp.Metrics != nil {
 		q.metrics.TotalJobs += resp.Metrics.TotalJobs
@@ -166,26 +163,9 @@ func (q *QueryRangeCombiner) Combine(resp *tempopb.QueryRangeResponse) {
 	}
 }
 
-func (QueryRangeCombiner) combine(in *tempopb.TimeSeries, out *tempopb.TimeSeries) {
-outer:
-	for _, sample := range in.Samples {
-		for i, existing := range out.Samples {
-			if sample.TimestampMs == existing.TimestampMs {
-				out.Samples[i].Value += sample.Value
-				continue outer
-			}
-		}
-
-		out.Samples = append(out.Samples, sample)
-	}
-}
-
 func (q *QueryRangeCombiner) Response() *tempopb.QueryRangeResponse {
-	if q.metrics == nil {
-		q.metrics = &tempopb.SearchMetrics{}
-	}
 	return &tempopb.QueryRangeResponse{
-		Series:  maps.Values(q.ts),
+		Series:  q.eval.Results().ToProto(q.req),
 		Metrics: q.metrics,
 	}
 }
diff --git a/pkg/traceql/engine.go b/pkg/traceql/engine.go
index f98e915c65c..284a0cfee02 100644
--- a/pkg/traceql/engine.go
+++ b/pkg/traceql/engine.go
@@ -365,3 +365,18 @@ func (s Static) AsAnyValue() *common_v1.AnyValue {
 		},
 	}
 }
+
+func StaticFromAnyValue(a *common_v1.AnyValue) Static {
+	switch v := a.Value.(type) {
+	case *common_v1.AnyValue_StringValue:
+		return NewStaticString(v.StringValue)
+	case *common_v1.AnyValue_IntValue:
+		return NewStaticInt(int(v.IntValue))
+	case *common_v1.AnyValue_BoolValue:
+		return NewStaticBool(v.BoolValue)
+	case *common_v1.AnyValue_DoubleValue:
+		return NewStaticFloat(v.DoubleValue)
+	default:
+		return NewStaticNil()
+	}
+}
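`StaticFromAnyValue` is the inverse of the existing `AsAnyValue`. A sketch of the round-trip it enables (assumes the traceql package context; both functions appear in this diff):

```go
// Series labels cross the wire between jobs and the frontend as OTLP
// AnyValue and are rebuilt on the other side.
orig := NewStaticString("bar")
wire := orig.AsAnyValue()        // Static -> *common_v1.AnyValue
back := StaticFromAnyValue(wire) // *common_v1.AnyValue -> Static
// back is equivalent to orig; unrecognized proto value types (e.g. arrays)
// intentionally fall back to NewStaticNil().
_ = back
```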
diff --git a/pkg/traceql/engine_metrics.go b/pkg/traceql/engine_metrics.go
index a3266eb4c84..6a733a71a7c 100644
--- a/pkg/traceql/engine_metrics.go
+++ b/pkg/traceql/engine_metrics.go
@@ -7,15 +7,20 @@ import (
 	"fmt"
 	"hash"
 	"hash/fnv"
+	"math"
+	"sort"
 	"sync"
 	"time"
 
 	"github.com/prometheus/prometheus/model/labels"
 
 	"github.com/grafana/tempo/pkg/tempopb"
+	commonv1proto "github.com/grafana/tempo/pkg/tempopb/common/v1"
 	"github.com/grafana/tempo/pkg/util"
 )
 
+const internalLabelBucket = "__bucket"
+
 func DefaultQueryRangeStep(start, end uint64) uint64 {
 	delta := time.Duration(end - start)
 
@@ -60,11 +65,39 @@ func IntervalOf(ts, start, end, step uint64) int {
 	return int((ts - start) / step)
 }
 
+// IntervalOfMs is the same as IntervalOf except the input timestamp is in unix milliseconds.
+func IntervalOfMs(tsmills int64, start, end, step uint64) int {
+	ts := uint64(time.Duration(tsmills) * time.Millisecond)
+	return IntervalOf(ts, start, end, step)
+}
+
 type Label struct {
 	Name  string
 	Value Static
 }
 
+type Labels []Label
+
+// String returns the prometheus-formatted version of the labels, which downcasts
+// the typed TraceQL values to strings, with some special casing.
+func (ls Labels) String() string {
+	promLabels := labels.NewBuilder(nil)
+	for _, l := range ls {
+		var promValue string
+		switch {
+		case l.Value.Type == TypeNil:
+			promValue = "<nil>"
+		case l.Value.Type == TypeString && l.Value.S == "":
+			promValue = "<empty>"
+		default:
+			promValue = l.Value.EncodeToString(false)
+		}
+		promLabels.Set(l.Name, promValue)
+	}
+
+	return promLabels.Labels().String()
+}
+
 type TimeSeries struct {
 	Labels []Label
 	Values []float64
@@ -74,6 +107,42 @@ type TimeSeries struct {
 // text description: {x="a",y="b"} for convenience.
 type SeriesSet map[string]TimeSeries
 
+func (set SeriesSet) ToProto(req *tempopb.QueryRangeRequest) []*tempopb.TimeSeries {
+	resp := make([]*tempopb.TimeSeries, 0, len(set))
+
+	for promLabels, s := range set {
+		labels := make([]commonv1proto.KeyValue, 0, len(s.Labels))
+		for _, label := range s.Labels {
+			labels = append(labels,
+				commonv1proto.KeyValue{
+					Key:   label.Name,
+					Value: label.Value.AsAnyValue(),
+				},
+			)
+		}
+
+		intervals := IntervalCount(req.Start, req.End, req.Step)
+		samples := make([]tempopb.Sample, 0, intervals)
+		for i, value := range s.Values {
+			ts := TimestampOf(uint64(i), req.Start, req.Step)
+			samples = append(samples, tempopb.Sample{
+				TimestampMs: time.Unix(0, int64(ts)).UnixMilli(),
+				Value:       value,
+			})
+		}
+
+		ss := &tempopb.TimeSeries{
+			PromLabels: promLabels,
+			Labels:     labels,
+			Samples:    samples,
+		}
+
+		resp = append(resp, ss)
+	}
+
+	return resp
+}
+
 // VectorAggregator turns a vector of spans into a single numeric scalar
 type VectorAggregator interface {
 	Observe(s Span)
@@ -176,9 +245,11 @@ type FastValues [maxGroupBys]Static
 // GroupingAggregator groups spans into series based on attribute values.
 type GroupingAggregator struct {
 	// Config
-	by        []Attribute   // Original attributes: .foo
-	byLookups [][]Attribute // Lookups: span.foo resource.foo
-	innerAgg  func() RangeAggregator
+	by          []Attribute               // Original attributes: .foo
+	byLookups   [][]Attribute             // Lookups: span.foo resource.foo
+	byFunc      func(Span) (Static, bool) // Dynamic label calculated by a callback
+	byFuncLabel string                    // Name of the dynamic label
+	innerAgg    func() RangeAggregator
 
 	// Data
 	series map[FastValues]RangeAggregator
@@ -187,8 +258,8 @@ type GroupingAggregator struct {
 
 var _ SpanAggregator = (*GroupingAggregator)(nil)
 
-func NewGroupingAggregator(aggName string, innerAgg func() RangeAggregator, by []Attribute) SpanAggregator {
-	if len(by) == 0 {
+func NewGroupingAggregator(aggName string, innerAgg func() RangeAggregator, by []Attribute, byFunc func(Span) (Static, bool), byFuncLabel string) SpanAggregator {
+	if len(by) == 0 && byFunc == nil {
 		return &UngroupedAggregator{
 			name:     aggName,
 			innerAgg: innerAgg(),
@@ -210,10 +281,12 @@ func NewGroupingAggregator(aggName string, innerAgg func() RangeAggregator, by [
 	}
 
 	return &GroupingAggregator{
-		series:    map[FastValues]RangeAggregator{},
-		by:        by,
-		byLookups: lookups,
-		innerAgg:  innerAgg,
+		series:      map[FastValues]RangeAggregator{},
+		by:          by,
+		byFunc:      byFunc,
+		byFuncLabel: byFuncLabel,
+		byLookups:   lookups,
+		innerAgg:    innerAgg,
 	}
 }
 
@@ -222,10 +295,22 @@ func NewGroupingAggregator(aggName string, innerAgg func() RangeAggregator, by [
 func (g *GroupingAggregator) Observe(span Span) {
 	// Get grouping values
 	// Reuse same buffer
+	// There is no need to reset, the number of group-by attributes
+	// is fixed after creation.
 	for i, lookups := range g.byLookups {
 		g.buf[i] = lookup(lookups, span)
 	}
 
+	// If a dynamic label exists, calculate and append it
+	if g.byFunc != nil {
+		v, ok := g.byFunc(span)
+		if !ok {
+			// Totally drop this span
+			return
+		}
+		g.buf[len(g.byLookups)] = v
+	}
+
 	agg, ok := g.series[g.buf]
 	if !ok {
 		agg = g.innerAgg()
@@ -264,30 +349,24 @@ func (g *GroupingAggregator) Observe(span Span) {
 //
 // Ex: rate() by (x,y,z) and all nil yields:
 //	{x="<nil>"}
-func (g *GroupingAggregator) labelsFor(vals FastValues) ([]Label, string) {
-	tempoLabels := make([]Label, 0, len(g.by))
-	for i, v := range vals {
-		if v.Type == TypeNil {
+func (g *GroupingAggregator) labelsFor(vals FastValues) (Labels, string) {
+	labels := make(Labels, 0, len(g.by)+1)
+	for i := range g.by {
+		if vals[i].Type == TypeNil {
 			continue
 		}
-		tempoLabels = append(tempoLabels, Label{g.by[i].String(), v})
+		labels = append(labels, Label{g.by[i].String(), vals[i]})
 	}
-
-	// Prometheus-style version for convenience
-	promLabels := labels.NewBuilder(nil)
-	for _, l := range tempoLabels {
-		promValue := l.Value.EncodeToString(false)
-		if promValue == "" {
-			promValue = "<nil>"
-		}
-		promLabels.Set(l.Name, promValue)
+	if g.byFunc != nil {
+		labels = append(labels, Label{g.byFuncLabel, vals[len(g.by)]})
 	}
-	// When all nil then force one.
-	if promLabels.Labels().IsEmpty() {
-		promLabels.Set(g.by[0].String(), "<nil>")
+
+	if len(labels) == 0 {
+		// When all nil then force one
+		labels = append(labels, Label{g.by[0].String(), NewStaticNil()})
 	}
 
-	return tempoLabels, promLabels.Labels().String()
+	return labels, labels.String()
 }
 
 func (g *GroupingAggregator) Series() SeriesSet {
@@ -331,6 +410,36 @@ func (u *UngroupedAggregator) Series() SeriesSet {
 	}
 }
 
+func (e *Engine) CompileMetricsQueryRangeNonRaw(req *tempopb.QueryRangeRequest, mode AggregateMode) (*MetricsFrontendEvaluator, error) {
+	if req.Start <= 0 {
+		return nil, fmt.Errorf("start required")
+	}
+	if req.End <= 0 {
+		return nil, fmt.Errorf("end required")
+	}
+	if req.End <= req.Start {
+		return nil, fmt.Errorf("end must be greater than start")
+	}
+	if req.Step <= 0 {
+		return nil, fmt.Errorf("step required")
+	}
+
+	_, _, metricsPipeline, _, err := e.Compile(req.Query)
+	if err != nil {
+		return nil, fmt.Errorf("compiling query: %w", err)
+	}
+
+	if metricsPipeline == nil {
+		return nil, fmt.Errorf("not a metrics query")
+	}
+
+	metricsPipeline.init(req, mode)
+
+	return &MetricsFrontendEvaluator{
+		metricsPipeline: metricsPipeline,
+	}, nil
+}
+
 // CompileMetricsQueryRange returns an evaluator that can be reused across multiple data sources.
 // Dedupe spans parameter is an indicator of whether to expect duplicates in the datasource. For
 // example if the datasource is replication factor=1 or only a single block then we know there
@@ -363,7 +472,7 @@ func (e *Engine) CompileMetricsQueryRange(req *tempopb.QueryRangeRequest, dedupe
 	}
 
 	// This initializes all step buffers, counters, etc
-	metricsPipeline.init(req)
+	metricsPipeline.init(req, AggregateModeRaw)
 
 	me := &MetricsEvalulator{
 		storageReq:    storageReq,
@@ -415,25 +524,42 @@ func (e *Engine) CompileMetricsQueryRange(req *tempopb.QueryRangeRequest, dedupe
 	me.start = req.Start
 	me.end = req.End
 
-	optimize(storageReq)
-
-	// Setup second pass callback. Only as needed.
-	if len(storageReq.SecondPassConditions) > 0 {
-		storageReq.SecondPass = func(s *Spanset) ([]*Spanset, error) {
-			// The traceql engine isn't thread-safe.
-			// But parallelization is required for good metrics performance.
-			// So we do external locking here.
-			me.mtx.Lock()
-			defer me.mtx.Unlock()
-			return eval([]*Spanset{s})
-		}
+	// Setup second pass callback. It might be optimized away
+	storageReq.SecondPass = func(s *Spanset) ([]*Spanset, error) {
+		// The traceql engine isn't thread-safe.
+		// But parallelization is required for good metrics performance.
+		// So we do external locking here.
+		me.mtx.Lock()
+		defer me.mtx.Unlock()
+		return eval([]*Spanset{s})
 	}
 
+	optimize(storageReq)
+
 	return me, nil
 }
 
 // optimize numerous things within the request that is specific to metrics.
 func optimize(req *FetchSpansRequest) {
+	if !req.AllConditions {
+		return
+	}
+
+	// There is an issue where multiple conditions &&'ed on the same
+	// attribute can look like AllConditions==true, but are implemented
+	// in the storage layer as if ||'ed, and require the second pass callback (engine).
+	// TODO(mdisibio) - This would be a big performance improvement if we can fix the storage layer
+	// Example:
+	//   { span.http.status_code >= 500 && span.http.status_code < 600 } | rate() by (span.http.status_code)
+	exists := make(map[Attribute]struct{}, len(req.Conditions))
+	for _, c := range req.Conditions {
+		if _, ok := exists[c.Attribute]; ok {
+			// Don't optimize
+			return
+		}
+		exists[c.Attribute] = struct{}{}
+	}
+
 	// Special optimization for queries like:
 	//  {} | rate()
 	//  {} | rate() by (rootName)
@@ -442,7 +568,7 @@ func optimize(req *FetchSpansRequest) {
 	// move them to the first pass and increase performance. It avoids the second pass/bridge
 	// layer and doesn't alter the correctness of the query.
 	// This can't be done for plain attributes or in all cases.
-	if req.AllConditions && len(req.SecondPassConditions) > 0 {
+	if len(req.SecondPassConditions) > 0 {
 		secondLayerAlwaysPresent := true
 		for _, cond := range req.SecondPassConditions {
 			if cond.Attribute.Intrinsic != IntrinsicNone {
@@ -461,6 +587,7 @@ func optimize(req *FetchSpansRequest) {
 		if secondLayerAlwaysPresent {
 			// Move all to first pass
 			req.Conditions = append(req.Conditions, req.SecondPassConditions...)
+			req.SecondPass = nil
 			req.SecondPassConditions = nil
 		}
 	}
@@ -588,8 +715,8 @@ func (e *MetricsEvalulator) Metrics() (uint64, uint64, uint64) {
 	return e.bytes, e.spansTotal, e.spansDeduped
 }
 
-func (e *MetricsEvalulator) Results() (SeriesSet, error) {
-	return e.metricsPipeline.result(), nil
+func (e *MetricsEvalulator) Results() SeriesSet {
+	return e.metricsPipeline.result()
 }
 
 // SpanDeduper2 is EXTREMELY LAZY. It attempts to dedupe spans for metrics
@@ -633,3 +760,281 @@ func (d *SpanDeduper2) Skip(tid []byte, startTime uint64) bool {
 	m[v] = struct{}{}
 	return false
 }
+
+// MetricsFrontendEvaluator pipes the sharded job results back into the engine for the rest
+// of the pipeline. i.e. This evaluator is for the query-frontend.
+type MetricsFrontendEvaluator struct {
+	metricsPipeline metricsFirstStageElement
+}
+
+func (m *MetricsFrontendEvaluator) ObserveSeries(in []*tempopb.TimeSeries) {
+	m.metricsPipeline.observeSeries(in)
+}
+
+func (m *MetricsFrontendEvaluator) Results() SeriesSet {
+	return m.metricsPipeline.result()
+}
+
+type SeriesAggregator interface {
+	Combine([]*tempopb.TimeSeries)
+	Results() SeriesSet
+}
+
+type SimpleAdditionAggregator struct {
+	ss               SeriesSet
+	len              int
+	start, end, step uint64
+}
+
+func NewSimpleAdditionCombiner(req *tempopb.QueryRangeRequest) *SimpleAdditionAggregator {
+	return &SimpleAdditionAggregator{
+		ss:    make(SeriesSet),
+		len:   IntervalCount(req.Start, req.End, req.Step),
+		start: req.Start,
+		end:   req.End,
+		step:  req.Step,
+	}
+}
+
+func (b *SimpleAdditionAggregator) Combine(in []*tempopb.TimeSeries) {
+	for _, ts := range in {
+		existing, ok := b.ss[ts.PromLabels]
+		if !ok {
+			// Convert proto labels to traceql labels
+			labels := make(Labels, 0, len(ts.Labels))
+			for _, l := range ts.Labels {
+				labels = append(labels, Label{
+					Name:  l.Key,
+					Value: StaticFromAnyValue(l.Value),
+				})
+			}
+
+			existing = TimeSeries{
+				Labels: labels,
+				Values: make([]float64, b.len),
+			}
+			b.ss[ts.PromLabels] = existing
+		}
+
+		for _, sample := range ts.Samples {
+			j := IntervalOfMs(sample.TimestampMs, b.start, b.end, b.step)
+			if j >= 0 && j < len(existing.Values) {
+				existing.Values[j] += sample.Value
+			}
+		}
+	}
+}
+
+func (b *SimpleAdditionAggregator) Results() SeriesSet {
+	return b.ss
+}
+
+type HistogramBucket struct {
+	Max   float64
+	Count int
+}
+
+type Histogram struct {
+	Buckets []HistogramBucket
+}
+
+func (h *Histogram) Record(bucket float64, count int) {
+	for i := range h.Buckets {
+		if h.Buckets[i].Max == bucket {
+			h.Buckets[i].Count += count
+			return
+		}
+	}
+
+	h.Buckets = append(h.Buckets, HistogramBucket{
+		Max:   bucket,
+		Count: count,
+	})
+}
+
+type histSeries struct {
+	labels Labels
+	hist   []Histogram
+}
+
+type HistogramAggregator struct {
+	ss               map[string]histSeries
+	qs               []float64
+	len              int
+	start, end, step uint64
+}
+
+func NewHistogramAggregator(req *tempopb.QueryRangeRequest, qs []float64) *HistogramAggregator {
+	return &HistogramAggregator{
+		qs:    qs,
+		ss:    make(map[string]histSeries),
+		len:   IntervalCount(req.Start, req.End, req.Step),
+		start: req.Start,
+		end:   req.End,
+		step:  req.Step,
+	}
+}
+
+func (h *HistogramAggregator) Combine(in []*tempopb.TimeSeries) {
+	// var min, max time.Time
+
+	for _, ts := range in {
+		// Convert proto labels to traceql labels
+		// while at the same time stripping the bucket label
+		withoutBucket := make(Labels, 0, len(ts.Labels))
+		var bucket Static
+		for _, l := range ts.Labels {
+			if l.Key == internalLabelBucket {
+				// bucket = int(l.Value.GetIntValue())
+				bucket = StaticFromAnyValue(l.Value)
+				continue
+			}
+			withoutBucket = append(withoutBucket, Label{
+				Name:  l.Key,
+				Value: StaticFromAnyValue(l.Value),
+			})
+		}
+
+		if bucket.Type == TypeNil {
+			// Bad __bucket label?
+			continue
+		}
+
+		withoutBucketStr := withoutBucket.String()
+
+		existing, ok := h.ss[withoutBucketStr]
+		if !ok {
+			existing = histSeries{
+				labels: withoutBucket,
+				hist:   make([]Histogram, h.len),
+			}
+			h.ss[withoutBucketStr] = existing
+		}
+
+		b := bucket.asFloat()
+
+		for _, sample := range ts.Samples {
+			if sample.Value == 0 {
+				continue
+			}
+			j := IntervalOfMs(sample.TimestampMs, h.start, h.end, h.step)
+			if j >= 0 && j < len(existing.hist) {
+				existing.hist[j].Record(b, int(sample.Value))
+			}
+		}
+	}
+}
+
+func (h *HistogramAggregator) Results() SeriesSet {
+	results := make(SeriesSet, len(h.ss)*len(h.qs))
+
+	for _, in := range h.ss {
+		// For each input series, we create a new series for each quantile.
+		for _, q := range h.qs {
+			// Append label for the quantile
+			labels := append((Labels)(nil), in.labels...)
+			labels = append(labels, Label{"p", NewStaticFloat(q)})
+			s := labels.String()
+
+			ts := TimeSeries{
+				Labels: labels,
+				Values: make([]float64, len(in.hist)),
+			}
+			for i := range in.hist {
+				buckets := in.hist[i].Buckets
+				sort.Slice(buckets, func(i, j int) bool {
+					return buckets[i].Max < buckets[j].Max
+				})
+
+				ts.Values[i] = Log2Quantile(q, buckets)
+			}
+			results[s] = ts
+		}
+	}
+	return results
+}
+
+// Log2Bucketize rounds the given value to the next powers-of-two bucket.
+func Log2Bucketize(v uint64) float64 {
+	if v < 2 {
+		return -1
+	}
+
+	return math.Pow(2, math.Ceil(math.Log2(float64(v))))
+}
+
+// Log2Quantile returns the quantile given buckets labeled with float ranges and counts. Uses
+// exponential power-of-two interpolation between buckets as needed.
+func Log2Quantile(p float64, buckets []HistogramBucket) float64 {
+	if math.IsNaN(p) ||
+		p < 0 ||
+		p > 1 ||
+		len(buckets) == 0 {
+		return 0
+	}
+
+	totalCount := 0
+	for _, b := range buckets {
+		totalCount += b.Count
+	}
+
+	if totalCount == 0 {
+		return 0
+	}
+
+	// Maximum number of samples to include. We round up to better handle
+	// percentiles on low sample counts (<100).
+	maxSamples := int(math.Ceil(p * float64(totalCount)))
+
+	if maxSamples == 0 {
+		// We have to read at least one sample.
+		maxSamples = 1
+	}
+
+	// Find the bucket where the percentile falls in.
+	var total, bucket int
+	for i, b := range buckets {
+		// Next bucket
+		bucket = i
+
+		// If we can't fully consume the samples in this bucket
+		// then we are done.
+		if total+b.Count > maxSamples {
+			break
+		}
+
+		// Consume all samples in this bucket
+		total += b.Count
+
+		// p100, or we happened to read the exact number of samples.
+		// Quantile is the max range for the bucket. No reason
+		// to enter interpolation below.
+		if total == maxSamples {
+			return b.Max
+		}
+	}
+
+	// Fraction to interpolate between buckets, sample-count wise.
+	// 0.5 means halfway
+	interp := float64(maxSamples-total) / float64(buckets[bucket].Count)
+
+	// Exponential interpolation between buckets
+	// The current bucket represents the maximum value
+	max := math.Log2(buckets[bucket].Max)
+	var min float64
+	if bucket > 0 {
+		// Prior bucket represents the min
+		min = math.Log2(buckets[bucket-1].Max)
+	} else {
+		// There is no prior bucket, assume powers of 2
+		min = max - 1
+	}
+	mid := math.Pow(2, min+(max-min)*interp)
+	return mid
+}
+
+var (
+	_ SeriesAggregator = (*SimpleAdditionAggregator)(nil)
+	_ SeriesAggregator = (*HistogramAggregator)(nil)
+)
diff --git a/pkg/traceql/engine_metrics_test.go b/pkg/traceql/engine_metrics_test.go
index 8eec72934fe..aa00293dd19 100644
--- a/pkg/traceql/engine_metrics_test.go
+++ b/pkg/traceql/engine_metrics_test.go
@@ -290,3 +290,135 @@ func TestCompileMetricsQueryRangeFetchSpansRequest(t *testing.T) {
 		})
 	}
 }
+
+func TestQuantileOverTime(t *testing.T) {
+	req := &tempopb.QueryRangeRequest{
+		Start: uint64(1 * time.Second),
+		End:   uint64(3 * time.Second),
+		Step:  uint64(1 * time.Second),
+	}
+
+	var (
+		attr   = IntrinsicDurationAttribute
+		qs     = []float64{0, 0.5, 1}
+		by     = []Attribute{NewScopedAttribute(AttributeScopeSpan, false, "foo")}
+		_128ns = 0.000000128
+		_256ns = 0.000000256
+		_512ns = 0.000000512
+	)
+
+	// A variety of spans across times, durations, and series. All durations are powers of 2 for simplicity
+	in := []Span{
+		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(128),
+		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
+		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(512),
+
+		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
+		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
+		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
+		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
+
+		newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
+		newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
+		newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
+	}
+
+	// Output series with quantiles per foo
+	// Prom labels are sorted alphabetically, traceql labels maintain original order.
+	out := SeriesSet{
+		`{p="0", span.foo="bar"}`: TimeSeries{
+			Labels: []Label{
+				{Name: "span.foo", Value: NewStaticString("bar")},
+				{Name: "p", Value: NewStaticFloat(0)},
+			},
+			Values: []float64{
+				_128ns,
+				percentileHelper(0, _256ns, _256ns, _256ns, _256ns),
+				0,
+			},
+		},
+		`{p="0.5", span.foo="bar"}`: TimeSeries{
+			Labels: []Label{
+				{Name: "span.foo", Value: NewStaticString("bar")},
+				{Name: "p", Value: NewStaticFloat(0.5)},
+			},
+			Values: []float64{
+				_256ns,
+				percentileHelper(0.5, _256ns, _256ns, _256ns, _256ns),
+				0,
+			},
+		},
+		`{p="1", span.foo="bar"}`: TimeSeries{
+			Labels: []Label{
+				{Name: "span.foo", Value: NewStaticString("bar")},
+				{Name: "p", Value: NewStaticFloat(1)},
+			},
+			Values: []float64{_512ns, _256ns, 0},
+		},
+		`{p="0", span.foo="baz"}`: TimeSeries{
+			Labels: []Label{
+				{Name: "span.foo", Value: NewStaticString("baz")},
+				{Name: "p", Value: NewStaticFloat(0)},
+			},
+			Values: []float64{
+				0, 0,
+				percentileHelper(0, _512ns, _512ns, _512ns),
+			},
+		},
+		`{p="0.5", span.foo="baz"}`: TimeSeries{
+			Labels: []Label{
+				{Name: "span.foo", Value: NewStaticString("baz")},
+				{Name: "p", Value: NewStaticFloat(0.5)},
+			},
+			Values: []float64{
+				0, 0,
+				percentileHelper(0.5, _512ns, _512ns, _512ns),
+			},
+		},
+		`{p="1", span.foo="baz"}`: TimeSeries{
+			Labels: []Label{
+				{Name: "span.foo", Value: NewStaticString("baz")},
+				{Name: "p", Value: NewStaticFloat(1)},
+			},
+			Values: []float64{0, 0, _512ns},
+		},
+	}
+
+	// 3 layers of processing, matching: query-frontend -> queriers -> generators -> blocks
+	layer1 := newMetricsAggregateQuantileOverTime(attr, qs, by)
+	layer1.init(req, AggregateModeRaw)
+
+	layer2 := newMetricsAggregateQuantileOverTime(attr, qs, by)
+	layer2.init(req, AggregateModeSum)
+
+	layer3 := newMetricsAggregateQuantileOverTime(attr, qs, by)
+	layer3.init(req, AggregateModeFinal)
+
+	// Pass spans to layer 1
+	for _, s := range in {
+		layer1.observe(s)
+	}
+
+	// Pass layer 1 to layer 2
+	// These are partial counts over time by bucket
+	res := layer1.result()
+	layer2.observeSeries(res.ToProto(req))
+
+	// Pass layer 2 to layer 3
+	// These are summed counts over time by bucket
+	res = layer2.result()
+	layer3.observeSeries(res.ToProto(req))
+
+	// Layer 3 final results
+	// The quantiles
+	final := layer3.result()
+	require.Equal(t, out, final)
+}
+
+func percentileHelper(q float64, values ...float64) float64 {
+	h := Histogram{}
+	for _, v := range values {
+		h.Record(v, 1)
+	}
+	return Log2Quantile(q, h.Buckets)
+}
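The layered test hands results between stages via `SeriesSet.ToProto`, which depends on a consistent timestamp round-trip. A sketch of that round-trip (assumes the traceql package context and that `TimestampOf` returns `start + interval*step` in nanos, as its use in `ToProto` implies):

```go
req := &tempopb.QueryRangeRequest{
	Start: uint64(1 * time.Second), // unix nanos
	End:   uint64(3 * time.Second),
	Step:  uint64(1 * time.Second),
}

// A series value at interval i is stamped via TimestampOf when serialized...
ts := TimestampOf(1, req.Start, req.Step)  // nanos of interval 1
ms := time.Unix(0, int64(ts)).UnixMilli()  // proto samples carry millis

// ...and mapped back to the same interval when combined on the other side.
i := IntervalOfMs(ms, req.Start, req.End, req.Step) // == 1
_ = i
```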
diff --git a/pkg/traceql/enum_aggregates.go b/pkg/traceql/enum_aggregates.go
index 76a47aeae62..9a55bcef494 100644
--- a/pkg/traceql/enum_aggregates.go
+++ b/pkg/traceql/enum_aggregates.go
@@ -29,6 +29,26 @@ func (a AggregateOp) String() string {
 	return fmt.Sprintf("aggregate(%d)", a)
 }
 
+// AggregateMode distinguishes the different flavors of metrics queries
+// as executed in different places.
+type AggregateMode int
+
+const (
+	// AggregateModeRaw is the version that runs directly on spans.
+	// It yields the first level of raw time series.
+	AggregateModeRaw = iota
+
+	// AggregateModeSum is the version that performs the next stages
+	// after raw. This is how results from multiple jobs or pods are combined, but it is
+	// still not the final result. For example rate/count are simple addition, min/max compute
+	// another level of min/maxing.
+	AggregateModeSum
+
+	// AggregateModeFinal is the version that must run in a single place and cannot be
+	// subdivided. This includes the computation of quantiles, averages, etc.
+	AggregateModeFinal
+)
+
 type MetricsAggregateOp int
 
 const (
diff --git a/pkg/traceql/test_examples.yaml b/pkg/traceql/test_examples.yaml
index 1b038bd318d..db40823a311 100644
--- a/pkg/traceql/test_examples.yaml
+++ b/pkg/traceql/test_examples.yaml
@@ -120,6 +120,7 @@ valid:
   # metrics
   - '{} | rate()'
   - '{} | count_over_time() by (name) with(sample=0.1)'
+  - '{} | quantile_over_time(duration, 0, 0.9, 1) by (span.http.path)'
   # undocumented - nested set
   - '{ nestedSetLeft > 3 }'
   - '{ } >> { kind = server } | select(nestedSetLeft, nestedSetRight, nestedSetParent)'
@@ -288,8 +289,6 @@ unsupported:
   # other scalar filters. no idea if these should be supported
   - '3 = 2' # naked scalar filter, technically allowed
   - 'avg(.field) > 1 - 3' # scalar expressions in scalar filters are currently not allowed. possible future addition
-  # quantiles
-  - '{} | quantile_over_time(duration, 0.9, 0.95) by (span.http.path)'
 
 # parsed and the ast is dumped to stdout. this is a debugging tool
 dump:
\ No newline at end of file
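Finally, a sketch tying the pieces together: how a span's duration becomes the internal `__bucket` label in raw mode, per `MetricsAggregate.init` above (runnable inside the traceql package; values illustrative):

```go
// A 300ns span duration rounds up to the next power-of-two bucket...
d := uint64(300)
bucket := Log2Bucketize(d) / float64(time.Second) // bucket is kept in seconds
fmt.Println(bucket) // 5.12e-07 (300ns -> the 512ns bucket)

// With the 'g' float formatting from ast_stringer.go, the intermediate series
// carries {..., __bucket="5.12e-07"}. The frontend's HistogramAggregator
// strips that label and emits one series per requested quantile instead,
// e.g. {span.http.path="/api", p="0.9"}.
```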