Skip to content

Commit

Permalink
Apply @ modifier start and end in QF split interval middleware (thano…
Browse files Browse the repository at this point in the history
…s-io#5844)

* apply @ modifier start and end in QF split interval middleware

Signed-off-by: Ben Ye <[email protected]>

* fix lint

Signed-off-by: Ben Ye <[email protected]>

* fix unit tests

Signed-off-by: Ben Ye <[email protected]>

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored and Nathaniel Graham committed May 18, 2023
1 parent 04dad70 commit 84183c0
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Fixed

- [#5844](https://github.com/thanos-io/thanos/pull/5844) Query Frontend: Fixes @ modifier time range when splitting queries by interval.

### Added

- [#5814](https://github.com/thanos-io/thanos/pull/5814) Store: Add metric `thanos_bucket_store_postings_size_bytes` that shows the distribution of how many postings (in bytes) were needed for each Series() call in Thanos Store. Useful for determining limits.
Expand Down
6 changes: 3 additions & 3 deletions internal/cortex/querier/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func splitQuery(r Request, interval time.Duration) ([]Request, error) {

// Replace @ modifier function to their respective constant values in the query.
// This way subqueries will be evaluated at the same time as the parent query.
query, err := evaluateAtModifierFunction(r.GetQuery(), r.GetStart(), r.GetEnd())
query, err := EvaluateAtModifierFunction(r.GetQuery(), r.GetStart(), r.GetEnd())
if err != nil {
return nil, err
}
Expand All @@ -93,10 +93,10 @@ func splitQuery(r Request, interval time.Duration) ([]Request, error) {
return reqs, nil
}

// evaluateAtModifierFunction parse the query and evaluates the `start()` and `end()` at modifier functions into actual constant timestamps.
// EvaluateAtModifierFunction parse the query and evaluates the `start()` and `end()` at modifier functions into actual constant timestamps.
// For example given the start of the query is 10.00, `http_requests_total[1h] @ start()` query will be replaced with `http_requests_total[1h] @ 10.00`
// If the modifier is already a constant, it will be returned as is.
func evaluateAtModifierFunction(query string, start, end int64) (string, error) {
func EvaluateAtModifierFunction(query string, start, end int64) (string, error) {
expr, err := parser.ParseExpr(query)
if err != nil {
return "", httpgrpc.Errorf(http.StatusBadRequest, "%s", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func Test_evaluateAtModifier(t *testing.T) {
tt := tt
t.Run(tt.in, func(t *testing.T) {
t.Parallel()
out, err := evaluateAtModifierFunction(tt.in, start, end)
out, err := EvaluateAtModifierFunction(tt.in, start, end)
if tt.expectedErrorCode != 0 {
require.Error(t, err)
httpResp, ok := httpgrpc.HTTPResponseFromError(err)
Expand Down
9 changes: 9 additions & 0 deletions pkg/queryfrontend/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func TestRoundTripRetryMiddleware(t *testing.T) {
Start: 0,
End: 2 * hour,
Step: 10 * seconds,
Query: "foo",
}

testLabelsRequest := &ThanosLabelsRequest{Path: "/api/v1/labels", Start: 0, End: 2 * hour}
Expand Down Expand Up @@ -222,6 +223,7 @@ func TestRoundTripSplitIntervalMiddleware(t *testing.T) {
Start: 0,
End: 2 * hour,
Step: 10 * seconds,
Query: "foo",
}

testLabelsRequest := &ThanosLabelsRequest{
Expand Down Expand Up @@ -395,6 +397,7 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: 1 * seconds,
Dedup: true, // Deduplication is enabled by default.
Query: "foo",
}

testRequestWithoutDedup := &ThanosQueryRangeRequest{
Expand All @@ -404,6 +407,7 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: 1 * seconds,
Dedup: false,
Query: "foo",
}

// Same query params as testRequest, different maxSourceResolution
Expand All @@ -415,6 +419,7 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: 10 * seconds,
Dedup: true,
Query: "foo",
}

// Same query params as testRequest, different maxSourceResolution
Expand All @@ -426,6 +431,7 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: 1 * hour,
Dedup: true,
Query: "foo",
}

// Same query params as testRequest, but with storeMatchers
Expand All @@ -437,6 +443,7 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
MaxSourceResolution: 1 * seconds,
StoreMatchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}},
Dedup: true,
Query: "foo",
}

cacheConf := &queryrange.ResultsCacheConfig{
Expand Down Expand Up @@ -487,6 +494,7 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
End: 25 * hour,
Step: 10 * seconds,
Dedup: true,
Query: "foo",
},
expected: 6,
},
Expand All @@ -498,6 +506,7 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) {
End: 25 * hour,
Step: 10 * seconds,
Dedup: true,
Query: "foo",
},
expected: 6,
},
Expand Down
17 changes: 13 additions & 4 deletions pkg/queryfrontend/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ type splitByInterval struct {
func (s splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) {
// First we're going to build new requests, one for each day, taking care
// to line up the boundaries with step.
reqs := splitQuery(r, s.interval(r))
reqs, err := splitQuery(r, s.interval(r))
if err != nil {
return nil, err
}
s.splitByCounter.Add(float64(len(reqs)))

reqResps, err := queryrange.DoRequests(ctx, s.next, reqs, s.limits)
Expand All @@ -66,9 +69,15 @@ func (s splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryran
return response, nil
}

func splitQuery(r queryrange.Request, interval time.Duration) []queryrange.Request {
func splitQuery(r queryrange.Request, interval time.Duration) ([]queryrange.Request, error) {
var reqs []queryrange.Request
if _, ok := r.(*ThanosQueryRangeRequest); ok {
// Replace @ modifier function to their respective constant values in the query.
// This way subqueries will be evaluated at the same time as the parent query.
query, err := queryrange.EvaluateAtModifierFunction(r.GetQuery(), r.GetStart(), r.GetEnd())
if err != nil {
return nil, err
}
if start := r.GetStart(); start == r.GetEnd() {
reqs = append(reqs, r.WithStartEnd(start, start))
} else {
Expand All @@ -78,7 +87,7 @@ func splitQuery(r queryrange.Request, interval time.Duration) []queryrange.Reque
end = r.GetEnd()
}

reqs = append(reqs, r.WithStartEnd(start, end))
reqs = append(reqs, r.WithQuery(query).WithStartEnd(start, end))
}
}
} else {
Expand All @@ -93,7 +102,7 @@ func splitQuery(r queryrange.Request, interval time.Duration) []queryrange.Reque
}
}

return reqs
return reqs, nil
}

// Round up to the step before the next interval boundary.
Expand Down
240 changes: 240 additions & 0 deletions pkg/queryfrontend/split_by_interval_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package queryfrontend

import (
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/internal/cortex/querier/queryrange"
)

func TestSplitQuery(t *testing.T) {
for i, tc := range []struct {
input queryrange.Request
expected []queryrange.Request
interval time.Duration
}{
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 60 * 60 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: 60 * 60 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: day,
},
{
input: &ThanosQueryRangeRequest{
Start: 60 * 60 * seconds,
End: 60 * 60 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 60 * 60 * seconds,
End: 60 * 60 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: day,
},
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 60 * 60 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: 60 * 60 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: 3 * time.Hour,
},
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: day,
},
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 3 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: 3 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: 3 * time.Hour,
},
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 2 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo @ start()",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: (24 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo @ 0.000",
},
&ThanosQueryRangeRequest{
Start: 24 * 3600 * seconds,
End: 2 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo @ 0.000",
},
},
interval: day,
},
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 2 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo @ end()",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: (24 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo @ 172800.000",
},
&ThanosQueryRangeRequest{
Start: 24 * 3600 * seconds,
End: 2 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo @ 172800.000",
},
},
interval: day,
},
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 2 * 3 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: (3 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo",
},
&ThanosQueryRangeRequest{
Start: 3 * 3600 * seconds,
End: 2 * 3 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: 3 * time.Hour,
},
{
input: &ThanosQueryRangeRequest{
Start: 3 * 3600 * seconds,
End: 3 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 3 * 3600 * seconds,
End: (24 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo",
},
&ThanosQueryRangeRequest{
Start: 24 * 3600 * seconds,
End: (2 * 24 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo",
},
&ThanosQueryRangeRequest{
Start: 2 * 24 * 3600 * seconds,
End: 3 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: day,
},
{
input: &ThanosQueryRangeRequest{
Start: 2 * 3600 * seconds,
End: 3 * 3 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 2 * 3600 * seconds,
End: (3 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo",
},
&ThanosQueryRangeRequest{
Start: 3 * 3600 * seconds,
End: (2 * 3 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo",
},
&ThanosQueryRangeRequest{
Start: 2 * 3 * 3600 * seconds,
End: 3 * 3 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: 3 * time.Hour,
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
queries, err := splitQuery(tc.input, tc.interval)
require.NoError(t, err)
require.Equal(t, tc.expected, queries)
})
}
}

0 comments on commit 84183c0

Please sign in to comment.