Skip to content

Commit

Permalink
make the lookBack configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Krasi Georgiev <[email protected]>
  • Loading branch information
krasi-georgiev committed Aug 27, 2020
1 parent 08084a8 commit 802d1ea
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 227 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ sse_config:
### Fixed
- [#3010](https://github.com/thanos-io/thanos/pull/3010) Querier: Prevent rare data gaps while switching replicas in deduplication algorithm.
- [#2937](https://github.com/thanos-io/thanos/pull/2937) Receive: Fixing auto-configuration of --receive.local-endpoint
- [#2665](https://github.com/thanos-io/thanos/pull/2665) Swift: fix issue with missing Content-Type HTTP headers.
- [#2800](https://github.com/thanos-io/thanos/pull/2800) Query: Fix handling of `--web.external-prefix` and `--web.route-prefix`
Expand All @@ -53,6 +52,7 @@ sse_config:
- [#2892](https://github.com/thanos-io/thanos/pull/2892) Receive: Receiver fails when the initial upload fails.
- [#2865](https://github.com/thanos-io/thanos/pull/2865) ui: Migrate Thanos Ruler UI to React
- [#2964](https://github.com/thanos-io/thanos/pull/2964) Query: Add time range parameters to label APIs. Add `start` and `end` fields to Store API `LabelNamesRequest` and `LabelValuesRequest`.
- [#3010](https://github.com/thanos-io/thanos/pull/3010) Querier: Added the `--query.lookback-delta` flag to set the default lookback delta.
- [#2996](https://github.com/thanos-io/thanos/pull/2996) Sidecar: Add `reloader_config_apply_errors_total` metric. Add new flags `--reloader.watch-interval`, and `--reloader.retry-interval`.
- [#2973](https://github.com/thanos-io/thanos/pull/2973) Add Thanos Query Frontend component.
- [#2980](https://github.com/thanos-io/thanos/pull/2980) Bucket Viewer: Migrate block viewer to React.
Expand Down
5 changes: 5 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node.").
Default("20").Int()

lookbackDelta := cmd.Flag("query.lookback-delta", "The maximum lookback duration for retrieving metrics during expression evaluations.").Duration()

maxConcurrentSelects := cmd.Flag("query.max-concurrent-select", "Maximum number of select requests made concurrently per a query.").
Default("4").Int()

Expand Down Expand Up @@ -175,6 +177,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
*maxConcurrentQueries,
*maxConcurrentSelects,
time.Duration(*queryTimeout),
*lookbackDelta,
time.Duration(*defaultEvaluationInterval),
time.Duration(*storeResponseTimeout),
*queryReplicaLabels,
Expand Down Expand Up @@ -222,6 +225,7 @@ func runQuery(
maxConcurrentQueries int,
maxConcurrentSelects int,
queryTimeout time.Duration,
lookbackDelta time.Duration,
defaultEvaluationInterval time.Duration,
storeResponseTimeout time.Duration,
queryReplicaLabels []string,
Expand Down Expand Up @@ -312,6 +316,7 @@ func runQuery(
NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 {
return defaultEvaluationInterval.Milliseconds()
},
LookbackDelta: lookbackDelta,
},
)
)
Expand Down
3 changes: 3 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,9 @@ Flags:
--query.timeout=2m Maximum time to process query by query node.
--query.max-concurrent=20 Maximum number of queries processed
concurrently by query node.
--query.lookback-delta=QUERY.LOOKBACK-DELTA
The maximum lookback duration for retrieving
metrics during expression evaluations.
--query.max-concurrent-select=4
Maximum number of select requests made
concurrently per a query.
Expand Down
6 changes: 1 addition & 5 deletions pkg/query/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,10 +577,6 @@ func (it *dedupSeriesIterator) Next() bool {
if it.aok {

it.aok = it.a.Seek(it.lastT + 1 + it.penA)
// When A is exhausted the seek for B below should be without any penalty.
if !it.aok {
it.penB = 0
}
}
if it.bok {
it.bok = it.b.Seek(it.lastT + 1 + it.penB)
Expand Down Expand Up @@ -614,7 +610,7 @@ func (it *dedupSeriesIterator) Next() bool {
// samples to the next seek against it.
// This ensures that we don't pick a sample too close, which would increase the overall
// sample frequency. It also guards against clock drift and inaccuracies during

// timestamp assignment.
// If we don't know a delta yet, we pick 5000 as a constant, which is based on the knowledge
// that timestamps are in milliseconds and sampling frequencies typically multiple seconds long.
const initialPenalty = 5000
Expand Down
229 changes: 8 additions & 221 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package query

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"math"
Expand All @@ -19,7 +18,6 @@ import (

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/gate"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
Expand Down Expand Up @@ -294,136 +292,6 @@ func (s series) Iterator() chunkenc.Iterator {
return newMockedSeriesIterator(s.samples)
}

// TestQuerier_Select_After_promql tests expected results with and without deduplication after passing all data to promql.
//
// To test with real data:
// 1. Collect the expected results from Prometheus or Thanos through "/api/v1/query_range" and save them to a file.
// 2. Collect the raw data to be used for local storage:
// scripts/insecure_grpcurl_series.sh queriesGrpcIP:port '[{"name": "__name__", "value":"cluster_version"},{"name":"_id","value":"xxx"}]' 1597823000000 1597824600000 > localStorage.json
// 3. Remove all white space from the file and put each series on a new line.
//
// When collecting the raw data, mint should be the Prometheus query time minus the lookback delta (default is 5min or 300000ms).
// For example, if the Prometheus query mint is 1597823700000, the grpcurl query mint should be 1597823400000.
// This is because when promql displays data for a given range it looks back 5min before the requested time window.
func TestQuerier_Select_After_promql(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)

for _, tcase := range []struct {
name string
storeAPI storepb.StoreServer
replicaLabels []string // Replica label groups chunks by the label value and strips it from the final result.
// hints supplies Start, End and Step (all in milliseconds) for the range query below.
hints *storage.SelectHints
// equivalentQuery is the PromQL expression executed through the engine.
equivalentQuery string

// expected is the result without deduplication; expectedAfterDedup is the single series expected with it.
expected []series
expectedAfterDedup series
// expectedWarning, when non-empty, is the one and only warning the query must return.
expectedWarning string
}{

{
// Simulate Prom with 1m scrape interval scraping 30s apart.
// This should start with replica-1 until a brief outage,
// then switch to replica-2 after not seeing a value for 2 * interval = 120s.
name: "switch to replica 2 after an outage",
storeAPI: &storeServer{
resps: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 1}, {60000, 3}, {120000, 5} /* outage for 3 minutes */, {300000, 11}, {360000, 13}}),
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{30000, 2}, {90000, 5}, {150000, 6}, {210000, 8}, {270000, 10}, {330000, 12}}),
},
},
hints: &storage.SelectHints{
Start: 0,
End: 360000,
Step: 60000,
},
replicaLabels: []string{"a"},
equivalentQuery: `{a=~"a|b"}`,
expected: []series{
{
lset: labels.FromStrings("a", "a"),
samples: []sample{{0, 1}, {60000, 3}, {120000, 5}, {t: 180000, v: 5}, {t: 240000, v: 5}, {t: 300000, v: 11}, {t: 360000, v: 13}},
},
{
lset: labels.FromStrings("a", "b"),
samples: []sample{{t: 60000, v: 2}, {t: 120000, v: 5}, {t: 180000, v: 6}, {t: 240000, v: 8}, {t: 300000, v: 10}, {t: 360000, v: 12}},
},
},
expectedAfterDedup: series{
lset: labels.Labels{},
samples: []sample{{0, 1}, {60000, 2}, {120000, 5}, {t: 180000, v: 6}, {t: 240000, v: 8}, {t: 300000, v: 10}, {t: 360000, v: 12}},
},
},

{
// Regression test against https://github.com/thanos-io/thanos/issues/2890.
name: "when switching replicas make sure the time window between samples is never bigger then the lookback delta",
storeAPI: func() storepb.StoreServer {
s, err := store.NewLocalStoreFromJSONMmappableFile(logger, component.Debug, nil, "./testdata/issue2890-seriesresponses.json", store.ScanGRPCCurlProtoStreamMessages)
testutil.Ok(t, err)
return s
}(),
equivalentQuery: `cluster_version{}`,
replicaLabels: []string{"replica"},
hints: &storage.SelectHints{
Start: 1597823700000,
End: 1597824600000,
Step: 3000,
},
expected: jsonToSeries(t, "testdata/issue2890-expected.json"),
expectedAfterDedup: jsonToSeries(t, "testdata/issue2890-expected-dedup.json")[0],
},
} {
// A fresh engine is built for every test case; LookbackDelta is left unset in EngineOpts.
timeout := 5 * time.Minute
e := promql.NewEngine(promql.EngineOpts{
Logger: logger,
Timeout: timeout,
MaxSamples: math.MaxInt64,
})

t.Run(tcase.name, func(t *testing.T) {
// Run every case twice: once without and once with deduplication.
for _, sc := range []struct {
dedup bool
expected []series
}{
{dedup: false, expected: tcase.expected},
{dedup: true, expected: []series{tcase.expectedAfterDedup}},
} {

// hints.Step is in milliseconds; convert it to the time.Duration the engine expects.
resolution := time.Duration(tcase.hints.Step) * time.Millisecond
t.Run(fmt.Sprintf("dedup=%v, resolution=%v", sc.dedup, resolution.String()), func(t *testing.T) {
var actual []series
// Bootstrap a local store and pass the data through promql.
{
g := gate.New(2)
mq := &mockedQueryable{
// The querier is built on demand with the mint/maxt that the engine requests.
Creator: func(mint, maxt int64) storage.Querier {
return newQuerier(context.Background(), nil, nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout)
},
}
t.Cleanup(func() {
testutil.Ok(t, mq.Close())
})
q, err := e.NewRangeQuery(mq, tcase.equivalentQuery, timestamp.Time(tcase.hints.Start), timestamp.Time(tcase.hints.End), resolution)
testutil.Ok(t, err)
t.Cleanup(q.Close)
res := q.Exec(context.Background())
testutil.Ok(t, res.Err)
actual = promqlResToSeries(res)
if tcase.expectedWarning != "" {
warns := res.Warnings
testutil.Assert(t, len(warns) == 1, "expected only single warnings")
testutil.Equals(t, tcase.expectedWarning, warns[0].Error())
}
}

testutil.Equals(t, sc.expected, actual, "promql result doesn't match the expected output")
if sc.dedup {
testutil.Assert(t, len(actual) == 1, "expected only single response, subqueries?")
}
})
}
})
}
}

func TestQuerier_Select(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)

Expand Down Expand Up @@ -656,7 +524,7 @@ func TestQuerier_Select(t *testing.T) {
// Integration test: Make sure the PromQL would select exactly the same.
t.Run("through PromQL with 100s step", func(t *testing.T) {
catcher := &querierResponseCatcher{t: t, Querier: q}
q, err := e.NewRangeQuery(&mockedQueryable{querier: catcher}, tcase.equivalentQuery, timestamp.Time(tcase.mint), timestamp.Time(tcase.maxt), 100*time.Second)
q, err := e.NewRangeQuery(&mockedQueryable{catcher}, tcase.equivalentQuery, timestamp.Time(tcase.mint), timestamp.Time(tcase.maxt), 100*time.Second)
testutil.Ok(t, err)
t.Cleanup(q.Close)

Expand Down Expand Up @@ -708,93 +576,12 @@ func testSelectResponse(t *testing.T, expected []series, res storage.SeriesSet)
}
}

// jsonToSeries reads filename, decodes it into a Response and converts every
// entry of the result matrix into a series (label set plus samples).
// The returned slice is sorted by label set. Any read or decode error fails
// the test through testutil.Ok.
func jsonToSeries(t *testing.T, filename string) []series {
	file, err := ioutil.ReadFile(filename)
	testutil.Ok(t, err)

	data := Response{}
	testutil.Ok(t, json.Unmarshal(file, &data))

	var ss []series
	for _, ser := range data.Data.Results {
		var lbls labels.Labels
		for n, v := range ser.Metric {
			lbls = append(lbls, labels.Label{
				Name:  string(n),
				Value: string(v),
			})
		}
		// Label names need to be sorted: ser.Metric is ranged over as a map,
		// so the iteration order above is not deterministic.
		sort.Sort(lbls)

		var smpls []sample
		for _, smp := range ser.Values {
			smpls = append(smpls, sample{
				t: int64(smp.Timestamp),
				v: float64(smp.Value),
			})
		}

		ss = append(ss, series{
			lset:    lbls,
			samples: smpls,
		})
	}

	// Sort the series by their labels. The comparator has to be a strict
	// "less than": reporting true for two equal label sets would violate the
	// ordering contract that sort.Slice requires of its less function.
	sort.Slice(ss, func(i, j int) bool {
		return labels.Compare(ss[i].lset, ss[j].lset) < 0
	})

	return ss
}

// Response is the JSON document decoded by jsonToSeries: a status string plus
// a data object carrying the result type and the result matrix.
// NOTE(review): presumably this matches the "/api/v1/query_range" response
// format that the test fixtures are collected from — confirm.
type Response struct {
Status string `json:"status"`
Data struct {
ResultType string `json:"resultType"`
// Results is decoded from the "result" key as a model.Matrix.
Results model.Matrix `json:"result"`
} `json:"data"`
}

// promqlResToSeries converts a PromQL matrix result into the test's series
// representation, keeping the order of the matrix. It panics if res.Value is
// not a promql.Matrix.
func promqlResToSeries(res *promql.Result) []series {
	matrix := res.Value.(promql.Matrix)

	// One output entry per matrix row; samples are appended point by point so
	// a row without points keeps a nil samples slice.
	out := make([]series, len(matrix))
	for idx := range matrix {
		out[idx].lset = matrix[idx].Metric
		for _, p := range matrix[idx].Points {
			out[idx].samples = append(out[idx].samples, sample{t: p.T, v: p.V})
		}
	}
	return out
}

// mockedQueryable is a test double whose Querier method hands out a storage.Querier.
type mockedQueryable struct {
// Creator, when non-nil, is called by Querier with the requested mint and maxt to build the querier.
Creator func(int64, int64) storage.Querier
// querier is what Querier returns when Creator is nil; otherwise it holds the querier most recently built by Creator.
querier storage.Querier
// q is the querier returned unchanged by the Querier variant that ignores its arguments.
q storage.Querier
}

// Querier returns the querier to use for the given time range.
// When a Creator is configured, a new querier is built with the provided mint
// and maxt and remembered on the receiver; otherwise the stored querier is
// returned as is.
// The promql engine sets mint, and it is calculated based on the default lookback delta.
func (q *mockedQueryable) Querier(_ context.Context, mint int64, maxt int64) (storage.Querier, error) {
	if q.Creator != nil {
		q.querier = q.Creator(mint, maxt)
	}
	return q.querier, nil
}

func (q *mockedQueryable) Close() error {
defer func() {
q.querier = nil
}()

if q.querier != nil {
return q.querier.Close()
}
return nil
func (q *mockedQueryable) Querier(context.Context, int64, int64) (storage.Querier, error) {
return q.q, nil
}

type querierResponseCatcher struct {
Expand Down Expand Up @@ -897,7 +684,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {
MaxSamples: math.MaxInt64,
})
t.Run("Rate=5mStep=100s", func(t *testing.T) {
q, err := e.NewRangeQuery(&mockedQueryable{querier: q}, `rate(gitlab_transaction_cache_read_hit_count_total[5m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(5*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 100*time.Second)
q, err := e.NewRangeQuery(&mockedQueryable{q}, `rate(gitlab_transaction_cache_read_hit_count_total[5m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(5*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 100*time.Second)
testutil.Ok(t, err)

r := q.Exec(context.Background())
Expand Down Expand Up @@ -926,7 +713,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {
}, vec)
})
t.Run("Rate=30mStep=500s", func(t *testing.T) {
q, err := e.NewRangeQuery(&mockedQueryable{querier: q}, `rate(gitlab_transaction_cache_read_hit_count_total[30m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(30*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 500*time.Second)
q, err := e.NewRangeQuery(&mockedQueryable{q}, `rate(gitlab_transaction_cache_read_hit_count_total[30m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(30*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 500*time.Second)
testutil.Ok(t, err)

r := q.Exec(context.Background())
Expand Down Expand Up @@ -967,7 +754,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {
MaxSamples: math.MaxInt64,
})
t.Run("Rate=5mStep=100s", func(t *testing.T) {
q, err := e.NewRangeQuery(&mockedQueryable{querier: q}, `rate(gitlab_transaction_cache_read_hit_count_total[5m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(5*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 100*time.Second)
q, err := e.NewRangeQuery(&mockedQueryable{q}, `rate(gitlab_transaction_cache_read_hit_count_total[5m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(5*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 100*time.Second)
testutil.Ok(t, err)

r := q.Exec(context.Background())
Expand All @@ -991,7 +778,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {
}, vec)
})
t.Run("Rate=30mStep=500s", func(t *testing.T) {
q, err := e.NewRangeQuery(&mockedQueryable{querier: q}, `rate(gitlab_transaction_cache_read_hit_count_total[30m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(30*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 500*time.Second)
q, err := e.NewRangeQuery(&mockedQueryable{q}, `rate(gitlab_transaction_cache_read_hit_count_total[30m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(30*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 500*time.Second)
testutil.Ok(t, err)

r := q.Exec(context.Background())
Expand Down

0 comments on commit 802d1ea

Please sign in to comment.