Skip to content

Commit

Permalink
Fixes split interval for metrics queries. (#3459)
Browse files Browse the repository at this point in the history
The step needs to be taken into account:
- to be removed on each split end.
- to not split queries that have a step lower than the split interval.

The code is taken from Cortex but adjusted to keep the nanoseconds precision.

Signed-off-by: Cyril Tovena <[email protected]>
  • Loading branch information
cyriltovena authored Mar 10, 2021
1 parent e383472 commit 93f2b2d
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 61 deletions.
98 changes: 64 additions & 34 deletions pkg/querier/queryrange/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ func Test_codec_DecodeRequest(t *testing.T) {
StartTs: start,
EndTs: end,
}, false},
{"ok", func() (*http.Request, error) {
return http.NewRequest(http.MethodGet,
fmt.Sprintf(`/query_range?start=%d&end=%d&query={foo="bar"}&step=86400&limit=200&direction=FORWARD`, start.UnixNano(), end.UnixNano()), nil)
}, &LokiRequest{
Query: `{foo="bar"}`,
Limit: 200,
Step: 86400000, // step is expected in ms.
Direction: logproto.FORWARD,
Path: "/query_range",
StartTs: start,
EndTs: end,
}, false},
{"series", func() (*http.Request, error) {
return http.NewRequest(http.MethodGet,
fmt.Sprintf(`/series?start=%d&end=%d&match={foo="bar"}`, start.UnixNano(), end.UnixNano()), nil)
Expand Down Expand Up @@ -95,7 +107,8 @@ func Test_codec_DecodeResponse(t *testing.T) {
{"bad json", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(""))}, nil, nil, true},
{"not success", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(`{"status":"fail"}`))}, nil, nil, true},
{"unknown", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(`{"status":"success"}`))}, nil, nil, true},
{"matrix", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(matrixString))}, nil,
{
"matrix", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(matrixString))}, nil,
&LokiPromResponse{
Response: &queryrange.PrometheusResponse{
Status: loghttp.QueryStatusSuccess,
Expand All @@ -105,8 +118,10 @@ func Test_codec_DecodeResponse(t *testing.T) {
},
},
Statistics: statsResult,
}, false},
{"streams v1", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(streamsString))},
}, false,
},
{
"streams v1", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(streamsString))},
&LokiRequest{Direction: logproto.FORWARD, Limit: 100, Path: "/loki/api/v1/query_range"},
&LokiResponse{
Status: loghttp.QueryStatusSuccess,
Expand All @@ -118,8 +133,10 @@ func Test_codec_DecodeResponse(t *testing.T) {
Result: logStreams,
},
Statistics: statsResult,
}, false},
{"streams legacy", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(streamsString))},
}, false,
},
{
"streams legacy", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(streamsString))},
&LokiRequest{Direction: logproto.FORWARD, Limit: 100, Path: "/api/prom/query_range"},
&LokiResponse{
Status: loghttp.QueryStatusSuccess,
Expand All @@ -131,21 +148,26 @@ func Test_codec_DecodeResponse(t *testing.T) {
Result: logStreams,
},
Statistics: statsResult,
}, false},
{"series", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(seriesString))},
}, false,
},
{
"series", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(seriesString))},
&LokiSeriesRequest{Path: "/loki/api/v1/series"},
&LokiSeriesResponse{
Status: "success",
Version: uint32(loghttp.VersionV1),
Data: seriesData,
}, false},
{"labels legacy", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(labelsString))},
}, false,
},
{
"labels legacy", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(labelsString))},
&LokiLabelNamesRequest{Path: "/api/prom/label"},
&LokiLabelNamesResponse{
Status: "success",
Version: uint32(loghttp.VersionLegacy),
Data: labelsData,
}, false},
}, false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -160,7 +182,6 @@ func Test_codec_DecodeResponse(t *testing.T) {
}

func Test_codec_EncodeRequest(t *testing.T) {

// we only accept LokiRequest.
got, err := lokiCodec.EncodeRequest(context.TODO(), &queryrange.PrometheusRequest{})
require.Error(t, err)
Expand All @@ -170,7 +191,7 @@ func Test_codec_EncodeRequest(t *testing.T) {
toEncode := &LokiRequest{
Query: `{foo="bar"}`,
Limit: 200,
Step: 1010,
Step: 86400000,
Direction: logproto.FORWARD,
Path: "/query_range",
StartTs: start,
Expand All @@ -185,7 +206,7 @@ func Test_codec_EncodeRequest(t *testing.T) {
require.Equal(t, `{foo="bar"}`, got.URL.Query().Get("query"))
require.Equal(t, fmt.Sprintf("%d", 200), got.URL.Query().Get("limit"))
require.Equal(t, `FORWARD`, got.URL.Query().Get("direction"))
require.Equal(t, "1.010000", got.URL.Query().Get("step"))
require.Equal(t, "86400.000000", got.URL.Query().Get("step"))

// testing a full roundtrip
req, err := lokiCodec.DecodeRequest(context.TODO(), got)
Expand Down Expand Up @@ -229,7 +250,6 @@ func Test_codec_series_EncodeRequest(t *testing.T) {
}

func Test_codec_labels_EncodeRequest(t *testing.T) {

ctx := context.Background()
toEncode := &LokiLabelNamesRequest{
Path: "/loki/api/v1/labels",
Expand All @@ -252,7 +272,6 @@ func Test_codec_labels_EncodeRequest(t *testing.T) {
}

func Test_codec_EncodeResponse(t *testing.T) {

tests := []struct {
name string
res queryrange.Response
Expand All @@ -270,7 +289,8 @@ func Test_codec_EncodeResponse(t *testing.T) {
},
Statistics: statsResult,
}, matrixString, false},
{"loki v1",
{
"loki v1",
&LokiResponse{
Status: loghttp.QueryStatusSuccess,
Direction: logproto.FORWARD,
Expand All @@ -281,8 +301,10 @@ func Test_codec_EncodeResponse(t *testing.T) {
Result: logStreams,
},
Statistics: statsResult,
}, streamsString, false},
{"loki legacy",
}, streamsString, false,
},
{
"loki legacy",
&LokiResponse{
Status: loghttp.QueryStatusSuccess,
Direction: logproto.FORWARD,
Expand All @@ -293,25 +315,32 @@ func Test_codec_EncodeResponse(t *testing.T) {
Result: logStreams,
},
Statistics: statsResult,
}, streamsStringLegacy, false},
{"loki series",
}, streamsStringLegacy, false,
},
{
"loki series",
&LokiSeriesResponse{
Status: "success",
Version: uint32(loghttp.VersionV1),
Data: seriesData,
}, seriesString, false},
{"loki labels",
}, seriesString, false,
},
{
"loki labels",
&LokiLabelNamesResponse{
Status: "success",
Version: uint32(loghttp.VersionV1),
Data: labelsData,
}, labelsString, false},
{"loki labels legacy",
}, labelsString, false,
},
{
"loki labels legacy",
&LokiLabelNamesResponse{
Status: "success",
Version: uint32(loghttp.VersionLegacy),
Data: labelsData,
}, labelsLegacyString, false},
}, labelsLegacyString, false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -340,17 +369,19 @@ func Test_codec_MergeResponse(t *testing.T) {
}{
{"empty", []queryrange.Response{}, nil, true},
{"unknown response", []queryrange.Response{&badResponse{}}, nil, true},
{"prom", []queryrange.Response{
&LokiPromResponse{
Response: &queryrange.PrometheusResponse{
Status: loghttp.QueryStatusSuccess,
Data: queryrange.PrometheusData{
ResultType: loghttp.ResultTypeMatrix,
Result: sampleStreams,
{
"prom",
[]queryrange.Response{
&LokiPromResponse{
Response: &queryrange.PrometheusResponse{
Status: loghttp.QueryStatusSuccess,
Data: queryrange.PrometheusData{
ResultType: loghttp.ResultTypeMatrix,
Result: sampleStreams,
},
},
},
},
},
&LokiPromResponse{
Response: &queryrange.PrometheusResponse{
Status: loghttp.QueryStatusSuccess,
Expand Down Expand Up @@ -1032,7 +1063,6 @@ func BenchmarkResponseMerge(b *testing.B) {
}
})
}

}

func mkResps(nResps, nStreams, nLogs int, direction logproto.Direction) (resps []*LokiResponse) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/downstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQue
req := ParamsToLokiRequest(qry.Params).WithShards(qry.Shards).WithQuery(qry.Expr.String()).(*LokiRequest)
logger, ctx := spanlogger.New(ctx, "DownstreamHandler.instance")
defer logger.Finish()
level.Debug(logger).Log("shards", fmt.Sprintf("%+v", req.Shards), "query", req.Query)
level.Debug(logger).Log("shards", fmt.Sprintf("%+v", req.Shards), "query", req.Query, "step", req.GetStep())

res, err := in.handler.Do(ctx, req)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/queryrange/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func Test_seriesLimiter(t *testing.T) {
cfg := testConfig
cfg.SplitQueriesByInterval = time.Hour
cfg.CacheResults = false
// split in 6 with 4 in // max.
// split in 7 with 2 in // max.
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, chunk.SchemaConfig{}, 0, nil)
if stopper != nil {
defer stopper.Stop()
Expand Down Expand Up @@ -96,7 +96,7 @@ func Test_seriesLimiter(t *testing.T) {

_, err = tpw(rt).RoundTrip(req)
require.NoError(t, err)
require.Equal(t, 6, *count)
require.Equal(t, 7, *count)

// 2 series should not be allowed.
c := new(int)
Expand Down
8 changes: 4 additions & 4 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func NewLogFilterTripperware(
) (queryrange.Tripperware, error) {
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), queryrange.NewLimitsMiddleware(limits)}
if cfg.SplitQueriesByInterval != 0 {
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), SplitByIntervalMiddleware(limits, codec, splitByMetrics))
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), SplitByIntervalMiddleware(limits, codec, splitByTime, splitByMetrics))
}

if cfg.ShardedQueries {
Expand Down Expand Up @@ -270,7 +270,7 @@ func NewSeriesTripperware(
queryRangeMiddleware = append(queryRangeMiddleware,
queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics),
// The Series API needs to pull one chunk per series to extract the label set, which is much cheaper than iterating through all matching chunks.
SplitByIntervalMiddleware(WithSplitByLimits(limits, cfg.SplitQueriesByInterval), codec, splitByMetrics),
SplitByIntervalMiddleware(WithSplitByLimits(limits, cfg.SplitQueriesByInterval), codec, splitByTime, splitByMetrics),
)
}
if cfg.MaxRetries > 0 {
Expand Down Expand Up @@ -301,7 +301,7 @@ func NewLabelsTripperware(
queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics),
// Force a 24 hours split by for labels API, this will be more efficient with our static daily bucket storage.
// This is because the labels API is an index-only operation.
SplitByIntervalMiddleware(WithSplitByLimits(limits, 24*time.Hour), codec, splitByMetrics),
SplitByIntervalMiddleware(WithSplitByLimits(limits, 24*time.Hour), codec, splitByTime, splitByMetrics),
)
}
if cfg.MaxRetries > 0 {
Expand Down Expand Up @@ -348,7 +348,7 @@ func NewMetricTripperware(
queryRangeMiddleware = append(
queryRangeMiddleware,
queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics),
SplitByIntervalMiddleware(limits, codec, splitByMetrics),
SplitByIntervalMiddleware(limits, codec, splitMetricByTime, splitByMetrics),
)

var c cache.Cache
Expand Down
Loading

0 comments on commit 93f2b2d

Please sign in to comment.