Skip to content

Commit

Permalink
feat: TraceQL metrics: max over time (#4065)
Browse files Browse the repository at this point in the history
* max over time

* changelog

* fix typo

* fix another typo and examples

* Update docs/sources/tempo/traceql/metrics-queries.md

Co-authored-by: Martin Disibio <[email protected]>

---------

Co-authored-by: Martin Disibio <[email protected]>
  • Loading branch information
javiermolinar and mdisibio authored Sep 10, 2024
1 parent 7421936 commit 4013355
Show file tree
Hide file tree
Showing 10 changed files with 623 additions and 392 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* [ENHANCEMENT] Replace Grafana Agent example by Grafana Alloy[#4030](https://github.com/grafana/tempo/pull/4030) (@javiermolinar)
* [ENHANCEMENT] Support exporting internal Tempo traces via OTLP exporter when `use_otel_tracer` is enabled. Use the OpenTelemetry SDK environment variables to configure the span exporter. [#4028](https://github.com/grafana/tempo/pull/4028) (@andreasgerstmayr)
* [ENHANCEMENT] TraceQL metrics queries: add min_over_time [#3975](https://github.com/grafana/tempo/pull/3975) (@javiermolinar)
* [ENHANCEMENT] TraceQL metrics queries: add max_over_time [#3975](https://github.com/grafana/tempo/pull/4065) (@javiermolinar)

# v2.6.0

Expand Down
17 changes: 16 additions & 1 deletion docs/sources/tempo/traceql/metrics-queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ These functions can be added as an operator at the end of any TraceQL query.
`min_over_time`
: Returns the minimum value of matching spans values per time interval (see the `step` API parameter)

`max_over_time`
: Returns the maximum value of matching spans values per time interval (see the `step` API parameter)

`quantile_over_time`
: The quantile of the values in the specified interval

Expand Down Expand Up @@ -94,14 +97,15 @@ down by HTTP route.
This might let you determine that `/api/sad` had a higher rate of erroring
spans than `/api/happy`, for example.

### The `count_over_time` and `min_over_time` functions
### The `count_over_time`, `min_over_time` and `max_over_time` functions

The `count_over_time()` let you counts the number of matching spans per time interval.

```
{ name = "GET /:endpoint" } | count_over_time() by (span.http.status_code)
```

The `min_over_time()` let you aggregate numerical values by computing the minimum value of them, such as the all important span duration.

```
Expand All @@ -113,6 +117,17 @@ Any numerical attribute on the span is fair game.
```
{ name = "GET /:endpoint" } | min_over_time(span.http.status_code)
```

The `max_over_time()` let you aggregate numerical values by computing the maximum value of them, such as the all important span duration.

```
{ name = "GET /:endpoint" } | max_over_time(duration) by (span.http.target)
```

```
{ name = "GET /:endpoint" } | max_over_time(span.http.status_code)
```

### The `quantile_over_time` and `histogram_over_time` functions

The `quantile_over_time()` and `histogram_over_time()` functions let you aggregate numerical values, such as the all important span duration.
Expand Down
10 changes: 8 additions & 2 deletions pkg/traceql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,12 +1110,17 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
return math.NaN(), a.spanStartTimeMs(s)
}
case metricsAggregateMinOverTime:
innerAgg = func() VectorAggregator { return NewMinOverTimeAggregator(a.attr) }
innerAgg = func() VectorAggregator { return NewOverTimeAggregator(a.attr, minAggregation) }
a.simpleAggregationOp = minAggregation
exemplarFn = func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}

case metricsAggregateMaxOverTime:
innerAgg = func() VectorAggregator { return NewOverTimeAggregator(a.attr, maxAggregation) }
a.simpleAggregationOp = maxAggregation
exemplarFn = func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}
case metricsAggregateRate:
innerAgg = func() VectorAggregator { return NewRateAggregator(1.0 / time.Duration(q.Step).Seconds()) }
a.simpleAggregationOp = sumAggregation
Expand Down Expand Up @@ -1241,6 +1246,7 @@ func (a *MetricsAggregate) validate() error {
switch a.op {
case metricsAggregateCountOverTime:
case metricsAggregateMinOverTime:
case metricsAggregateMaxOverTime:
case metricsAggregateRate:
case metricsAggregateHistogramOverTime:
if len(a.by) >= maxGroupBys {
Expand Down
59 changes: 43 additions & 16 deletions pkg/traceql/engine_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,15 +343,35 @@ func (c *CountOverTimeAggregator) Sample() float64 {

// MinOverTimeAggregator it calculates the mininum value over time. It can also
// calculate the rate when given a multiplier.
type MinOverTimeAggregator struct {
type OverTimeAggregator struct {
getSpanAttValue func(s Span) float64
min float64
agg func(current, new float64) float64
val float64
}

var _ VectorAggregator = (*MinOverTimeAggregator)(nil)
var _ VectorAggregator = (*OverTimeAggregator)(nil)

func NewMinOverTimeAggregator(attr Attribute) *MinOverTimeAggregator {
func NewOverTimeAggregator(attr Attribute, op SimpleAggregationOp) *OverTimeAggregator {
var fn func(s Span) float64
var agg func(current, new float64) float64

switch op {
case maxAggregation:
agg = func(current, new float64) float64 {
if math.IsNaN(current) || new > current {
return new
}
return current
}
case minAggregation:
agg = func(current, new float64) float64 {
if math.IsNaN(current) || new < current {
return new
}
return current
}
}

switch attr {
case IntrinsicDurationAttribute:
fn = func(s Span) float64 {
Expand All @@ -366,23 +386,20 @@ func NewMinOverTimeAggregator(attr Attribute) *MinOverTimeAggregator {
return f
}
}
return &MinOverTimeAggregator{

return &OverTimeAggregator{
getSpanAttValue: fn,
min: math.Float64frombits(normalNaN),
agg: agg,
val: math.Float64frombits(normalNaN),
}
}

func (c *MinOverTimeAggregator) Observe(s Span) {
val := c.getSpanAttValue(s)
if math.IsNaN(c.min) {
c.min = val
} else if val < c.min {
c.min = val
}
func (c *OverTimeAggregator) Observe(s Span) {
c.val = c.agg(c.val, c.getSpanAttValue(s))
}

func (c *MinOverTimeAggregator) Sample() float64 {
return c.min
func (c *OverTimeAggregator) Sample() float64 {
return c.val
}

// StepAggregator sorts spans into time slots using a step interval like 30s or 1m
Expand Down Expand Up @@ -1078,6 +1095,7 @@ type SimpleAggregationOp int
const (
sumAggregation SimpleAggregationOp = iota
minAggregation
maxAggregation
)

type SimpleAggregator struct {
Expand All @@ -1095,14 +1113,23 @@ func NewSimpleCombiner(req *tempopb.QueryRangeRequest, op SimpleAggregationOp) *
var f func(existingValue float64, newValue float64) float64
switch op {
case minAggregation:
// Simple min aggregator. It calculates the minumun between existing values and a new sample
// Simple min aggregator. It calculates the minimum between existing values and a new sample
f = func(existingValue float64, newValue float64) float64 {
if math.IsNaN(existingValue) || newValue < existingValue {
return newValue
}
return existingValue
}
initWithNaN = true
case maxAggregation:
// Simple max aggregator. It calculates the maximum between existing values and a new sample
f = func(existingValue float64, newValue float64) float64 {
if math.IsNaN(existingValue) || newValue > existingValue {
return newValue
}
return existingValue
}
initWithNaN = true
default:
// Simple addition aggregator. It adds existing values with the new sample.
f = func(existingValue float64, newValue float64) float64 { return existingValue + newValue }
Expand Down
143 changes: 143 additions & 0 deletions pkg/traceql/engine_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,149 @@ func TestMinOverTimeForSpanAttribute(t *testing.T) {
}
}

func TestMaxOverTimeForDuration(t *testing.T) {
req := &tempopb.QueryRangeRequest{
Start: uint64(1 * time.Second),
End: uint64(3 * time.Second),
Step: uint64(1 * time.Second),
Query: "{ } | max_over_time(duration) by (span.foo)",
}

// A variety of spans across times, durations, and series. All durations are powers of 2 for simplicity
in := []Span{
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(128),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(512),

newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(64),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(8),

newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(1024),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
}

result := runTraceQLMetric(t, req, in)

fooBaz := result[`{span.foo="baz"}`]
fooBar := result[`{span.foo="bar"}`]

// We cannot compare with require.Equal because NaN != NaN
// foo.baz = (NaN, NaN, 0.000000512)
assert.True(t, math.IsNaN(fooBaz.Values[0]))
assert.True(t, math.IsNaN(fooBaz.Values[1]))
assert.Equal(t, 1024/float64(time.Second), fooBaz.Values[2])

// foo.bar = (0.000000128, 0.000000128, NaN)
assert.Equal(t, 512/float64(time.Second), fooBar.Values[0])
assert.Equal(t, 256/float64(time.Second), fooBar.Values[1])
assert.True(t, math.IsNaN(fooBar.Values[2]))
}

func TestMaxOverTimeWithNoMatch(t *testing.T) {
req := &tempopb.QueryRangeRequest{
Start: uint64(1 * time.Second),
End: uint64(3 * time.Second),
Step: uint64(1 * time.Second),
Query: "{ } | max_over_time(span.buu)",
}

// A variety of spans across times, durations, and series. All durations are powers of 2 for simplicity
in := []Span{
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(128),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 404).WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(512),

newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(64),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(8),

newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 201).WithDuration(512),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 401).WithDuration(1024),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 500).WithDuration(512),
}

result := runTraceQLMetric(t, req, in)

// Test that empty timeseries are not included
ts := result.ToProto(req)

assert.True(t, len(ts) == 0)
}

func TestMaxOverTimeForSpanAttribute(t *testing.T) {
req := &tempopb.QueryRangeRequest{
Start: uint64(1 * time.Second),
End: uint64(3 * time.Second),
Step: uint64(1 * time.Second),
Query: "{ } | max_over_time(span.http.status_code) by (span.foo)",
}

// A variety of spans across times, durations, and series. All durations are powers of 2 for simplicity
in := []Span{
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(128),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 404).WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(512),

newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(64),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(8),

newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 201).WithDuration(512),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 401).WithDuration(1024),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 500).WithDuration(512),
}

in2 := []Span{
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 100).WithDuration(128),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 300).WithDuration(512),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 204).WithDuration(512),

newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 400).WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 401).WithDuration(64),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 402).WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 403).WithDuration(8),

newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 200).WithDuration(512),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 300).WithDuration(1024),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 400).WithDuration(512),
}

result := runTraceQLMetric(t, req, in, in2)

fooBaz := result[`{span.foo="baz"}`]
fooBar := result[`{span.foo="bar"}`]

// Alas,we cannot compare with require.Equal because NaN != NaN
// foo.baz = (204, NaN, 500)
assert.Equal(t, 204.0, fooBaz.Values[0])
assert.True(t, math.IsNaN(fooBaz.Values[1]))
assert.Equal(t, 500.0, fooBaz.Values[2])

// foo.bar = (404,403, NaN)
assert.Equal(t, 404.0, fooBar.Values[0])
assert.Equal(t, 403.0, fooBar.Values[1])
assert.True(t, math.IsNaN(fooBar.Values[2]))

// Test that NaN values are not included in the samples after casting to proto
ts := result.ToProto(req)
fooBarSamples := []tempopb.Sample{{TimestampMs: 1000, Value: 404}, {TimestampMs: 2000, Value: 403}}
fooBazSamples := []tempopb.Sample{{TimestampMs: 1000, Value: 204}, {TimestampMs: 3000, Value: 500}}

for _, s := range ts {
if s.PromLabels == "{span.foo=\"bar\"}" {
assert.Equal(t, fooBarSamples, s.Samples)
} else {
assert.Equal(t, fooBazSamples, s.Samples)
}
}
}

func TestHistogramOverTime(t *testing.T) {
req := &tempopb.QueryRangeRequest{
Start: uint64(1 * time.Second),
Expand Down
3 changes: 3 additions & 0 deletions pkg/traceql/enum_aggregates.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
metricsAggregateRate MetricsAggregateOp = iota
metricsAggregateCountOverTime
metricsAggregateMinOverTime
metricsAggregateMaxOverTime
metricsAggregateQuantileOverTime
metricsAggregateHistogramOverTime
)
Expand All @@ -67,6 +68,8 @@ func (a MetricsAggregateOp) String() string {
return "count_over_time"
case metricsAggregateMinOverTime:
return "min_over_time"
case metricsAggregateMaxOverTime:
return "max_over_time"
case metricsAggregateQuantileOverTime:
return "quantile_over_time"
case metricsAggregateHistogramOverTime:
Expand Down
4 changes: 3 additions & 1 deletion pkg/traceql/expr.y
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ import (
COUNT AVG MAX MIN SUM
BY COALESCE SELECT
END_ATTRIBUTE
RATE COUNT_OVER_TIME MIN_OVER_TIME QUANTILE_OVER_TIME HISTOGRAM_OVER_TIME COMPARE
RATE COUNT_OVER_TIME MIN_OVER_TIME MAX_OVER_TIME QUANTILE_OVER_TIME HISTOGRAM_OVER_TIME COMPARE
WITH

// Operators are listed with increasing precedence.
Expand Down Expand Up @@ -300,6 +300,8 @@ metricsAggregation:
| COUNT_OVER_TIME OPEN_PARENS CLOSE_PARENS BY OPEN_PARENS attributeList CLOSE_PARENS { $$ = newMetricsAggregate(metricsAggregateCountOverTime, $6) }
| MIN_OVER_TIME OPEN_PARENS attribute CLOSE_PARENS { $$ = newMetricsAggregateWithAttr(metricsAggregateMinOverTime, $3, nil) }
| MIN_OVER_TIME OPEN_PARENS attribute CLOSE_PARENS BY OPEN_PARENS attributeList CLOSE_PARENS { $$ = newMetricsAggregateWithAttr(metricsAggregateMinOverTime, $3, $7) }
| MAX_OVER_TIME OPEN_PARENS attribute CLOSE_PARENS { $$ = newMetricsAggregateWithAttr(metricsAggregateMaxOverTime, $3, nil) }
| MAX_OVER_TIME OPEN_PARENS attribute CLOSE_PARENS BY OPEN_PARENS attributeList CLOSE_PARENS { $$ = newMetricsAggregateWithAttr(metricsAggregateMaxOverTime, $3, $7) }
| QUANTILE_OVER_TIME OPEN_PARENS attribute COMMA numericList CLOSE_PARENS { $$ = newMetricsAggregateQuantileOverTime($3, $5, nil) }
| QUANTILE_OVER_TIME OPEN_PARENS attribute COMMA numericList CLOSE_PARENS BY OPEN_PARENS attributeList CLOSE_PARENS { $$ = newMetricsAggregateQuantileOverTime($3, $5, $9) }
| HISTOGRAM_OVER_TIME OPEN_PARENS attribute CLOSE_PARENS { $$ = newMetricsAggregateWithAttr(metricsAggregateHistogramOverTime, $3, nil) }
Expand Down
Loading

0 comments on commit 4013355

Please sign in to comment.