Skip to content

Commit

Permalink
fix(logqlmetric): make range aggregation Loki-compatiable
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Apr 18, 2024
1 parent fbb5ade commit 87ce232
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 30 deletions.
194 changes: 166 additions & 28 deletions internal/logql/logqlengine/logqlmetric/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"golang.org/x/exp/maps"

"github.com/go-faster/oteldb/internal/iterators"
Expand Down Expand Up @@ -65,9 +66,15 @@ func TestInstantAggregation(t *testing.T) {
Step: 0,
}
testSamples = []SampledEntry{
{Sample: 1, Timestamp: 1700000002_000000000, Set: &emptyLabels{}},
// Would not be used.
{Sample: 10000, Timestamp: 1700000002_000000000, Set: &emptyLabels{}},

// Step 1.
// 2s Window.
{Sample: 2, Timestamp: 1700000003_000000000, Set: &emptyLabels{}},
{Sample: 3, Timestamp: 1700000004_000000000, Set: &emptyLabels{}},
// Window ends.

// Would not be used.
{Sample: 10000, Timestamp: 1700000005_000000000, Set: &emptyLabels{}},
}
Expand All @@ -78,24 +85,39 @@ func TestInstantAggregation(t *testing.T) {
expected string
}{
// Range aggregation.
{`count_over_time({} [2s])`, "3"},
{`rate({} [2s])`, "1.5"}, // count per log range interval
{`rate({} | unwrap foo [2s])`, "3"}, // sum per log range interval
{`bytes_over_time({} [2s])`, "6"}, // same as sum
{`bytes_rate({} [2s])`, "3"}, // sum per log range interval
{`avg_over_time({} | unwrap foo [2s])`, "2"},
{`sum_over_time({} | unwrap foo [2s])`, "6"},
{`min_over_time({} | unwrap foo [2s])`, "1"},
{`count_over_time({} [2s])`, "2"},
{`rate({} [2s])`, "1"}, // count per log range interval
{`rate({} | unwrap foo [2s])`, "2.5"}, // sum per log range interval
{`bytes_over_time({} [2s])`, "5"}, // same as sum
{`bytes_rate({} [2s])`, "2.5"}, // sum per log range interval
{`avg_over_time({} | unwrap foo [2s])`, "2.5"},
{`sum_over_time({} | unwrap foo [2s])`, "5"},
{`min_over_time({} | unwrap foo [2s])`, "2"},
{`max_over_time({} | unwrap foo [2s])`, "3"},
{`stdvar_over_time({} | unwrap foo [2s])`, "0.6666666666666666"},
{`stddev_over_time({} | unwrap foo [2s])`, "0.816496580927726"},
{`quantile_over_time(0.99, {} | unwrap foo [2s])`, "2.98"},
{`first_over_time({} | unwrap foo [2s])`, "1"},
//
// Mean is 2.5.
// Count is 2.
//
// stdvar =
// ( (2-mean)^2 + (3-mean)^2 ) / count =>
// ( (2-2.5)^2 + (3-2.5)^2 ) / 2 =>
// (0.25 + 0.25) / 2 =>
// stdvar = 0.25
//
{`stdvar_over_time({} | unwrap foo [2s])`, "0.25"},
//
// stddev = sqrt(stdvar) =>
// sqrt(0.25) =>
// stddev = 0.5
//
{`stddev_over_time({} | unwrap foo [2s])`, "0.5"},
{`quantile_over_time(0.99, {} | unwrap foo [2s])`, "2.9899999999999998"},
{`first_over_time({} | unwrap foo [2s])`, "2"},
{`last_over_time({} | unwrap foo [2s])`, "3"},
// Vector aggregation.
{`count(count_over_time({} [2s]))`, "1"},
{`sum(count_over_time({} [2s]))`, "3"},
{`avg(count_over_time({} [2s]))`, "3"},
{`sum(count_over_time({} [2s]))`, "2"},
{`avg(count_over_time({} [2s]))`, "2"},

// Vector function.
{`vector(1)`, "1"},
Expand Down Expand Up @@ -142,8 +164,8 @@ func TestInstantAggregation(t *testing.T) {
{`vector(2) <= 1`, "0"},

// Operations with range.
{`count_over_time({} [2s]) * 2`, "6"},
{`2 * count_over_time({} [2s])`, "6"},
{`count_over_time({} [2s]) * 2`, "4"},
{`2 * count_over_time({} [2s])`, "4"},
// Between vectors.
{`vector(2) * vector(2)`, "4"},
// Between ranges.
Expand All @@ -163,6 +185,113 @@ func TestInstantAggregation(t *testing.T) {
}
}

func TestRangeAggregationStep(t *testing.T) {
var (
ts = func(s, ns int64) pcommon.Timestamp {
return pcommon.NewTimestampFromTime(time.Unix(1700000000+s, ns))
}
testSamples = []SampledEntry{
{Timestamp: ts(2, 0), Set: &emptyLabels{}, Sample: 1.},
{Timestamp: ts(5, 0), Set: &emptyLabels{}, Sample: 1.},
{Timestamp: ts(6, 0), Set: &emptyLabels{}, Sample: 1.},
{Timestamp: ts(10, 0), Set: &emptyLabels{}, Sample: 1.},
{Timestamp: ts(10, 1), Set: &emptyLabels{}, Sample: 1.},
{Timestamp: ts(11, 0), Set: &emptyLabels{}, Sample: 1.},
{Timestamp: ts(35, 0), Set: &emptyLabels{}, Sample: 1.},
{Timestamp: ts(35, 1), Set: &emptyLabels{}, Sample: 1.},
{Timestamp: ts(40, 0), Set: &emptyLabels{}, Sample: 1.},
{Timestamp: ts(100, 0), Set: &emptyLabels{}, Sample: 1.},
{Timestamp: ts(100, 1), Set: &emptyLabels{}, Sample: 1.},
}
)

tests := []struct {
interval time.Duration
step time.Duration
expected []lokiapi.FPoint
start, end pcommon.Timestamp
}{
{
5 * time.Second,
30 * time.Second,
[]lokiapi.FPoint{
{
T: getPrometheusTimestamp(ts(10, 0).AsTime()),
V: "2",
},
{
T: getPrometheusTimestamp(ts(40, 0).AsTime()),
V: "2",
},
{
T: getPrometheusTimestamp(ts(100, 0).AsTime()),
V: "1",
},
},
ts(10, 0), ts(110, 0),
},
{
35 * time.Second, // will overlap by 5 sec
30 * time.Second,
[]lokiapi.FPoint{
{
T: getPrometheusTimestamp(ts(10, 0).AsTime()),
V: "4",
},
{
T: getPrometheusTimestamp(ts(40, 0).AsTime()),
V: "7",
},
{
T: getPrometheusTimestamp(ts(70, 0).AsTime()),
V: "2",
},
{
T: getPrometheusTimestamp(ts(100, 0).AsTime()),
V: "1",
},
},
ts(10, 0), ts(110, 0),
},
{
50 * time.Second,
10 * time.Second,
[]lokiapi.FPoint{
{
T: getPrometheusTimestamp(ts(110, 0).AsTime()),
V: "2",
},
{
T: getPrometheusTimestamp(ts(120, 0).AsTime()),
V: "2",
},
},
ts(110, 0), ts(120, 0),
},
}
for i, tt := range tests {
tt := tt
t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) {
var (
query = fmt.Sprintf(`count_over_time({} [%s])`, tt.interval)
testParams = EvalParams{
Start: tt.start.AsTime(),
End: tt.end.AsTime(),
Step: tt.step,
}
)
data := evaluateQuery(t, testSamples, query, testParams, false)

v, ok := data.GetMatrixResult()
require.True(t, ok)
require.Len(t, v.Result, 1)

group := v.Result[0].Values
require.Equal(t, tt.expected, group)
})
}
}

func TestRangeAggregation(t *testing.T) {
var (
testParams = EvalParams{
Expand All @@ -172,20 +301,29 @@ func TestRangeAggregation(t *testing.T) {
}
testSamples = []SampledEntry{
// Step 1.
// No samples.

// Step 2.
{Sample: 1, Timestamp: 1700000001_000000000, Set: &emptyLabels{}},
// 2s window.
{Sample: 2, Timestamp: 1700000002_000000000, Set: &emptyLabels{}},
{Sample: 3, Timestamp: 1700000003_000000000, Set: &emptyLabels{}},
// Window ends.

// Step 2.
// Step 3.
{Sample: 4, Timestamp: 1700000004_000000000, Set: &emptyLabels{}},
// 2s window.
{Sample: 5, Timestamp: 1700000005_000000000, Set: &emptyLabels{}},
{Sample: 6, Timestamp: 1700000006_000000000, Set: &emptyLabels{}},
// Window ends.

// Step 3.
// Step 4.
{Sample: 1, Timestamp: 1700000007_000000000, Set: &emptyLabels{}},
// 2s window.
{Sample: 2, Timestamp: 1700000008_000000000, Set: &emptyLabels{}},
{Sample: 3, Timestamp: 1700000008_100000000, Set: &emptyLabels{}},
{Sample: 4, Timestamp: 1700000009_000000000, Set: &emptyLabels{}},
// Window ends.
}
)

Expand All @@ -194,16 +332,16 @@ func TestRangeAggregation(t *testing.T) {
expected []string
}{
// Range aggregation.
{`count_over_time({} [2s])`, []string{"3", "3", "4"}},
{`rate({} [2s])`, []string{"1.5", "1.5", "2"}}, // count per log range interval
{`rate({} | unwrap foo [2s])`, []string{"3", "7.5", "5"}}, // sum per log range interval
{`bytes_over_time({} [2s])`, []string{"6", "15", "10"}}, // same as sum
{`bytes_rate({} [2s])`, []string{"3", "7.5", "5"}}, // sum per log range interval
{`avg_over_time({} | unwrap foo [2s])`, []string{"2", "5", "2.5"}},
{`sum_over_time({} | unwrap foo [2s])`, []string{"6", "15", "10"}},
{`min_over_time({} | unwrap foo [2s])`, []string{"1", "4", "1"}},
{`count_over_time({} [2s])`, []string{"2", "2", "3"}},
{`rate({} [2s])`, []string{"1", "1", "1.5"}}, // count per log range interval
{`rate({} | unwrap foo [2s])`, []string{"2.5", "5.5", "4.5"}}, // sum per log range interval
{`bytes_over_time({} [2s])`, []string{"5", "11", "9"}}, // same as sum
{`bytes_rate({} [2s])`, []string{"2.5", "5.5", "4.5"}}, // sum per log range interval
{`avg_over_time({} | unwrap foo [2s])`, []string{"2.5", "5.5", "3"}},
{`sum_over_time({} | unwrap foo [2s])`, []string{"5", "11", "9"}},
{`min_over_time({} | unwrap foo [2s])`, []string{"2", "5", "2"}},
{`max_over_time({} | unwrap foo [2s])`, []string{"3", "6", "4"}},
{`first_over_time({} | unwrap foo [2s])`, []string{"1", "4", "1"}},
{`first_over_time({} | unwrap foo [2s])`, []string{"2", "5", "2"}},
{`last_over_time({} | unwrap foo [2s])`, []string{"3", "6", "4"}},
}
for i, tt := range tests {
Expand Down
2 changes: 1 addition & 1 deletion internal/logql/logqlengine/logqlmetric/range_agg.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (i *rangeAggIterator) fillWindow(windowStart, windowEnd time.Time) {
// Entry is after the end of current window: buffer for the next window.
i.buffered = true
return
case ts.Before(windowStart):
case ts.Before(windowStart) || ts.Equal(windowStart):
// Entry is before the start of current window: just skip it.
continue
}
Expand Down
2 changes: 1 addition & 1 deletion internal/logql/logqlengine/logqlmetric/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func newStepper(
) stepper {
return stepper{
current: start.Add(-step),
end: end.Add(step),
end: end,
step: step,
}
}
Expand Down

0 comments on commit 87ce232

Please sign in to comment.