From 802d1eaebced68add4d266965ee8146d66bee8bc Mon Sep 17 00:00:00 2001
From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com>
Date: Thu, 27 Aug 2020 14:10:19 +0300
Subject: [PATCH] make the lookback delta configurable

Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com>
---
 CHANGELOG.md              |   2 +-
 cmd/thanos/query.go       |   5 +
 docs/components/query.md  |   3 +
 pkg/query/iter.go         |   6 +-
 pkg/query/querier_test.go | 229 ++------------------------------------
 5 files changed, 18 insertions(+), 227 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index ceba1f6c7cd..5b9bc186f45 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -27,7 +27,6 @@ sse_config:
 
 ### Fixed
 
-- [#3010](https://github.com/thanos-io/thanos/pull/3010) Querier: Prevent rare data gaps while switching replicas in deduplication algorithm.
 - [#2937](https://github.com/thanos-io/thanos/pull/2937) Receive: Fixing auto-configuration of --receive.local-endpoint
 - [#2665](https://github.com/thanos-io/thanos/pull/2665) Swift: fix issue with missing Content-Type HTTP headers.
 - [#2800](https://github.com/thanos-io/thanos/pull/2800) Query: Fix handling of `--web.external-prefix` and `--web.route-prefix`
@@ -53,6 +52,7 @@ sse_config:
 - [#2892](https://github.com/thanos-io/thanos/pull/2892) Receive: Receiver fails when the initial upload fails.
 - [#2865](https://github.com/thanos-io/thanos/pull/2865) ui: Migrate Thanos Ruler UI to React
 - [#2964](https://github.com/thanos-io/thanos/pull/2964) Query: Add time range parameters to label APIs. Add `start` and `end` fields to Store API `LabelNamesRequest` and `LabelValuesRequest`.
+- [#3010](https://github.com/thanos-io/thanos/pull/3010) Querier: Add `--query.lookback-delta` flag to set the default lookback delta.
 - [#2996](https://github.com/thanos-io/thanos/pull/2996) Sidecar: Add `reloader_config_apply_errors_total` metric. Add new flags `--reloader.watch-interval`, and `--reloader.retry-interval`.
 - [#2973](https://github.com/thanos-io/thanos/pull/2973) Add Thanos Query Frontend component.
 - [#2980](https://github.com/thanos-io/thanos/pull/2980) Bucket Viewer: Migrate block viewer to React.
diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go
index f967d8538ca..409c856b186 100644
--- a/cmd/thanos/query.go
+++ b/cmd/thanos/query.go
@@ -70,6 +70,8 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
 	maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node.").
 		Default("20").Int()
 
+	lookbackDelta := cmd.Flag("query.lookback-delta", "The maximum lookback duration for retrieving metrics during expression evaluations.").Duration()
+
 	maxConcurrentSelects := cmd.Flag("query.max-concurrent-select", "Maximum number of select requests made concurrently per a query").
 		Default("4").Int()
 
@@ -175,6 +177,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
 			*maxConcurrentQueries,
 			*maxConcurrentSelects,
 			time.Duration(*queryTimeout),
+			*lookbackDelta,
 			time.Duration(*defaultEvaluationInterval),
 			time.Duration(*storeResponseTimeout),
 			*queryReplicaLabels,
@@ -222,6 +225,7 @@ func runQuery(
 	maxConcurrentQueries int,
 	maxConcurrentSelects int,
 	queryTimeout time.Duration,
+	lookbackDelta time.Duration,
 	defaultEvaluationInterval time.Duration,
 	storeResponseTimeout time.Duration,
 	queryReplicaLabels []string,
@@ -312,6 +316,7 @@ func runQuery(
 				NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 {
 					return defaultEvaluationInterval.Milliseconds()
 				},
+				LookbackDelta: lookbackDelta,
 			},
 		)
 	)
diff --git a/docs/components/query.md b/docs/components/query.md
index 1ef59c1f67f..11a1021bc33 100644
--- a/docs/components/query.md
+++ b/docs/components/query.md
@@ -367,6 +367,9 @@ Flags:
       --query.timeout=2m         Maximum time to process query by query node.
       --query.max-concurrent=20  Maximum number of queries processed
                                  concurrently by query node.
+      --query.lookback-delta=QUERY.LOOKBACK-DELTA
+                                 The maximum lookback duration for retrieving
+                                 metrics during expression evaluations.
       --query.max-concurrent-select=4
                                  Maximum number of select requests made
                                  concurrently per a query.
diff --git a/pkg/query/iter.go b/pkg/query/iter.go
index 8c7ccc6c762..86a06af1b39 100644
--- a/pkg/query/iter.go
+++ b/pkg/query/iter.go
@@ -577,10 +577,6 @@ func (it *dedupSeriesIterator) Next() bool {
 
 	if it.aok {
 		it.aok = it.a.Seek(it.lastT + 1 + it.penA)
-		// When A is exhausted the seek for B below should be without any penalty.
-		if !it.aok {
-			it.penB = 0
-		}
 	}
 	if it.bok {
 		it.bok = it.b.Seek(it.lastT + 1 + it.penB)
@@ -614,7 +610,7 @@ func (it *dedupSeriesIterator) Next() bool {
 	// samples to the next seek against it.
 	// This ensures that we don't pick a sample too close, which would increase the overall
 	// sample frequency. It also guards against clock drift and inaccuracies during
-
+	// timestamp assignment.
 	// If we don't know a delta yet, we pick 5000 as a constant, which is based on the knowledge
 	// that timestamps are in milliseconds and sampling frequencies typically multiple seconds long.
 	const initialPenalty = 5000
diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go
index 642a0e601b6..18d24f6efa6 100644
--- a/pkg/query/querier_test.go
+++ b/pkg/query/querier_test.go
@@ -5,7 +5,6 @@ package query
 
 import (
 	"context"
-	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"math"
@@ -19,7 +18,6 @@ import (
 
 	"github.com/go-kit/kit/log"
 	"github.com/pkg/errors"
-	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/gate"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/pkg/timestamp"
@@ -294,136 +292,6 @@ func (s series) Iterator() chunkenc.Iterator {
 	return newMockedSeriesIterator(s.samples)
 }
 
-// TestQuerier_Select_After_promql tests expected results with and without deduplication after passing all data to promql.
-// To test with real data:
-// Collect the expected results from Prometheus or Thanos through "/api/v1/query_range" and save to a file.
-// Collect raw data to be used for local storage:
-// scripts/insecure_grpcurl_series.sh queriesGrpcIP:port '[{"name": "__name__", "value":"cluster_version"},{"name":"_id","value":"xxx"}]' 1597823000000 1597824600000 > localStorage.json
-// Remove all white space from the file and put each series in a new line.
-// When collecting the raw data mint should be Prometheus query time minus the default look back delta(default is 5min or 300000ms)
-// For example if the Prometheus query mint is 1597823700000 the grpccurl query mint should be 1597823400000.
-// This is because when promql displays data for a given range it looks back 5min before the requested time window.
-func TestQuerier_Select_After_promql(t *testing.T) {
-	logger := log.NewLogfmtLogger(os.Stderr)
-
-	for _, tcase := range []struct {
-		name string
-		storeAPI storepb.StoreServer
-		replicaLabels []string // Replica label groups chunks by the label value and strips it from the final result.
-		hints *storage.SelectHints
-		equivalentQuery string
-
-		expected []series
-		expectedAfterDedup series
-		expectedWarning string
-	}{
-
-		{
-			// Simulate Prom with 1m scrape interval scraping 30s apart.
-			// This should start with replica-1 until a brief outage,
-			// then switch to replica-2 after not seeing a value for 2 * interval = 120s.
-			name: "switch to replica 2 after an outage",
-			storeAPI: &storeServer{
-				resps: []*storepb.SeriesResponse{
-					storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 1}, {60000, 3}, {120000, 5} /* outage for 3 minutes */, {300000, 11}, {360000, 13}}),
-					storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{30000, 2}, {90000, 5}, {150000, 6}, {210000, 8}, {270000, 10}, {330000, 12}}),
-				},
-			},
-			hints: &storage.SelectHints{
-				Start: 0,
-				End: 360000,
-				Step: 60000,
-			},
-			replicaLabels: []string{"a"},
-			equivalentQuery: `{a=~"a|b"}`,
-			expected: []series{
-				{
-					lset: labels.FromStrings("a", "a"),
-					samples: []sample{{0, 1}, {60000, 3}, {120000, 5}, {t: 180000, v: 5}, {t: 240000, v: 5}, {t: 300000, v: 11}, {t: 360000, v: 13}},
-				},
-				{
-					lset: labels.FromStrings("a", "b"),
-					samples: []sample{{t: 60000, v: 2}, {t: 120000, v: 5}, {t: 180000, v: 6}, {t: 240000, v: 8}, {t: 300000, v: 10}, {t: 360000, v: 12}},
-				},
-			},
-			expectedAfterDedup: series{
-				lset: labels.Labels{},
-				samples: []sample{{0, 1}, {60000, 2}, {120000, 5}, {t: 180000, v: 6}, {t: 240000, v: 8}, {t: 300000, v: 10}, {t: 360000, v: 12}},
-			},
-		},
-
-		{
-			// // Regression test against https://github.com/thanos-io/thanos/issues/2890.
-			name: "when switching replicas make sure the time window between samples is never bigger then the lookback delta",
-			storeAPI: func() storepb.StoreServer {
-				s, err := store.NewLocalStoreFromJSONMmappableFile(logger, component.Debug, nil, "./testdata/issue2890-seriesresponses.json", store.ScanGRPCCurlProtoStreamMessages)
-				testutil.Ok(t, err)
-				return s
-			}(),
-			equivalentQuery: `cluster_version{}`,
-			replicaLabels: []string{"replica"},
-			hints: &storage.SelectHints{
-				Start: 1597823700000,
-				End: 1597824600000,
-				Step: 3000,
-			},
-			expected: jsonToSeries(t, "testdata/issue2890-expected.json"),
-			expectedAfterDedup: jsonToSeries(t, "testdata/issue2890-expected-dedup.json")[0],
-		},
-	} {
-		timeout := 5 * time.Minute
-		e := promql.NewEngine(promql.EngineOpts{
-			Logger: logger,
-			Timeout: timeout,
-			MaxSamples: math.MaxInt64,
-		})
-
-		t.Run(tcase.name, func(t *testing.T) {
-			for _, sc := range []struct {
-				dedup bool
-				expected []series
-			}{
-				{dedup: false, expected: tcase.expected},
-				{dedup: true, expected: []series{tcase.expectedAfterDedup}},
-			} {
-
-				resolution := time.Duration(tcase.hints.Step) * time.Millisecond
-				t.Run(fmt.Sprintf("dedup=%v, resolution=%v", sc.dedup, resolution.String()), func(t *testing.T) {
-					var actual []series
-					// Boostrap a local store and pass the data through promql.
-					{
-						g := gate.New(2)
-						mq := &mockedQueryable{
-							Creator: func(mint, maxt int64) storage.Querier {
-								return newQuerier(context.Background(), nil, nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout)
-							},
-						}
-						t.Cleanup(func() {
-							testutil.Ok(t, mq.Close())
-						})
-						q, err := e.NewRangeQuery(mq, tcase.equivalentQuery, timestamp.Time(tcase.hints.Start), timestamp.Time(tcase.hints.End), resolution)
-						testutil.Ok(t, err)
-						t.Cleanup(q.Close)
-						res := q.Exec(context.Background())
-						testutil.Ok(t, res.Err)
-						actual = promqlResToSeries(res)
-						if tcase.expectedWarning != "" {
-							warns := res.Warnings
-							testutil.Assert(t, len(warns) == 1, "expected only single warnings")
-							testutil.Equals(t, tcase.expectedWarning, warns[0].Error())
-						}
-					}
-
-					testutil.Equals(t, sc.expected, actual, "promql result doesn't match the expected output")
-					if sc.dedup {
-						testutil.Assert(t, len(actual) == 1, "expected only single response, subqueries?")
-					}
-				})
-			}
-		})
-	}
-}
-
 func TestQuerier_Select(t *testing.T) {
 	logger := log.NewLogfmtLogger(os.Stderr)
 
@@ -656,7 +524,7 @@ func TestQuerier_Select(t *testing.T) {
 		// Integration test: Make sure the PromQL would select exactly the same.
 		t.Run("through PromQL with 100s step", func(t *testing.T) {
 			catcher := &querierResponseCatcher{t: t, Querier: q}
-			q, err := e.NewRangeQuery(&mockedQueryable{querier: catcher}, tcase.equivalentQuery, timestamp.Time(tcase.mint), timestamp.Time(tcase.maxt), 100*time.Second)
+			q, err := e.NewRangeQuery(&mockedQueryable{catcher}, tcase.equivalentQuery, timestamp.Time(tcase.mint), timestamp.Time(tcase.maxt), 100*time.Second)
 			testutil.Ok(t, err)
 			t.Cleanup(q.Close)
 
@@ -708,93 +576,12 @@ func testSelectResponse(t *testing.T, expected []series, res storage.SeriesSet)
 	}
 }
 
-func jsonToSeries(t *testing.T, filename string) []series {
-	file, err := ioutil.ReadFile(filename)
-	testutil.Ok(t, err)
-
-	data := Response{}
-	testutil.Ok(t, json.Unmarshal(file, &data))
-
-	var ss []series
-	for _, ser := range data.Data.Results {
-		var lbls labels.Labels
-		for n, v := range ser.Metric {
-			lbls = append(lbls, labels.Label{
-				Name: string(n),
-				Value: string(v),
-			})
-		}
-		// Label names need to be sorted.
-		sort.Sort(lbls)
-
-		var smpls []sample
-		for _, smp := range ser.Values {
-			smpls = append(smpls, sample{
-				t: int64(smp.Timestamp),
-				v: float64(smp.Value),
-			})
-		}
-
-		ss = append(ss, series{
-			lset: lbls,
-			samples: smpls,
-		})
-	}
-
-	// Sort the series by their labels.
-	sort.Slice(ss, func(i, j int) bool {
-		return labels.Compare(ss[i].lset, ss[j].lset) <= 0
-	})
-
-	return ss
-}
-
-type Response struct {
-	Status string `json:"status"`
-	Data struct {
-		ResultType string `json:"resultType"`
-		Results model.Matrix `json:"result"`
-	} `json:"data"`
-}
-
-func promqlResToSeries(res *promql.Result) []series {
-	matrix := res.Value.(promql.Matrix)
-	series := make([]series, len(matrix))
-
-	for i, ser := range matrix {
-		series[i].lset = ser.Metric
-		for _, point := range ser.Points {
-			series[i].samples = append(series[i].samples, sample{t: point.T, v: point.V})
-		}
-	}
-	return series
-}
-
 type mockedQueryable struct {
-	Creator func(int64, int64) storage.Querier
-	querier storage.Querier
+	q storage.Querier
 }
 
-// Querier creates a queirier with the provided min and maxt.
-// The promq engine sets mint and it is calculated based on the default lookback delta.
-func (q *mockedQueryable) Querier(_ context.Context, mint int64, maxt int64) (storage.Querier, error) {
-	if q.Creator == nil {
-		return q.querier, nil
-	}
-	qq := q.Creator(mint, maxt)
-	q.querier = qq
-	return q.querier, nil
-}
-
-func (q *mockedQueryable) Close() error {
-	defer func() {
-		q.querier = nil
-	}()
-
-	if q.querier != nil {
-		return q.querier.Close()
-	}
-	return nil
+func (q *mockedQueryable) Querier(context.Context, int64, int64) (storage.Querier, error) {
+	return q.q, nil
 }
 
 type querierResponseCatcher struct {
@@ -897,7 +684,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {
 		MaxSamples: math.MaxInt64,
 	})
 	t.Run("Rate=5mStep=100s", func(t *testing.T) {
-		q, err := e.NewRangeQuery(&mockedQueryable{querier: q}, `rate(gitlab_transaction_cache_read_hit_count_total[5m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(5*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 100*time.Second)
+		q, err := e.NewRangeQuery(&mockedQueryable{q}, `rate(gitlab_transaction_cache_read_hit_count_total[5m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(5*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 100*time.Second)
 		testutil.Ok(t, err)
 
 		r := q.Exec(context.Background())
@@ -926,7 +713,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {
 		}, vec)
 	})
 	t.Run("Rate=30mStep=500s", func(t *testing.T) {
-		q, err := e.NewRangeQuery(&mockedQueryable{querier: q}, `rate(gitlab_transaction_cache_read_hit_count_total[30m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(30*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 500*time.Second)
+		q, err := e.NewRangeQuery(&mockedQueryable{q}, `rate(gitlab_transaction_cache_read_hit_count_total[30m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(30*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 500*time.Second)
 		testutil.Ok(t, err)
 
 		r := q.Exec(context.Background())
@@ -967,7 +754,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {
 		MaxSamples: math.MaxInt64,
 	})
 	t.Run("Rate=5mStep=100s", func(t *testing.T) {
-		q, err := e.NewRangeQuery(&mockedQueryable{querier: q}, `rate(gitlab_transaction_cache_read_hit_count_total[5m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(5*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 100*time.Second)
+		q, err := e.NewRangeQuery(&mockedQueryable{q}, `rate(gitlab_transaction_cache_read_hit_count_total[5m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(5*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 100*time.Second)
 		testutil.Ok(t, err)
 
 		r := q.Exec(context.Background())
@@ -991,7 +778,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {
 		}, vec)
 	})
 	t.Run("Rate=30mStep=500s", func(t *testing.T) {
-		q, err := e.NewRangeQuery(&mockedQueryable{querier: q}, `rate(gitlab_transaction_cache_read_hit_count_total[30m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(30*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 500*time.Second)
+		q, err := e.NewRangeQuery(&mockedQueryable{q}, `rate(gitlab_transaction_cache_read_hit_count_total[30m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(30*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 500*time.Second)
 		testutil.Ok(t, err)
 
 		r := q.Exec(context.Background())