From d6eeef0782f0b54a3a526a57789f66330a3f5eb1 Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Fri, 12 Jun 2020 11:51:25 +0200 Subject: [PATCH] Implement async select for Querier Signed-off-by: Kemal Akkoyun --- cmd/thanos/query.go | 7 ++- docs/components/query.md | 3 ++ pkg/query/api/v1_test.go | 2 +- pkg/query/iter.go | 37 ++++++++++++++ pkg/query/querier.go | 103 ++++++++++++++++++++++++++++---------- pkg/query/querier_test.go | 58 ++++++++++++--------- 6 files changed, 157 insertions(+), 53 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 70ea2cac7f2..9f85ae970d5 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -67,6 +67,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node."). Default("20").Int() + maxConcurrentSelects := cmd.Flag("query.max-concurrent-select", "Maximum number of select requests made concurrently per a query."). + Default("4").Int() + queryReplicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter. Data includes time series, recording rules, and alerting rules."). Strings() @@ -159,6 +162,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { *webExternalPrefix, *webPrefixHeaderName, *maxConcurrentQueries, + *maxConcurrentSelects, time.Duration(*queryTimeout), time.Duration(*storeResponseTimeout), *queryReplicaLabels, @@ -202,6 +206,7 @@ func runQuery( webExternalPrefix string, webPrefixHeaderName string, maxConcurrentQueries int, + maxConcurrentSelects int, queryTimeout time.Duration, storeResponseTimeout time.Duration, queryReplicaLabels []string, @@ -280,7 +285,7 @@ func runQuery( ) proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout) rulesProxy = rules.NewProxy(logger, stores.GetRulesClients) - queryableCreator = query.NewQueryableCreator(logger, proxy) + queryableCreator = query.NewQueryableCreator(logger, reg, proxy, maxConcurrentSelects) engine = promql.NewEngine( promql.EngineOpts{ Logger: logger, diff --git a/docs/components/query.md b/docs/components/query.md index 17525a51114..b84d32c10ec 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -333,6 +333,9 @@ Flags: --query.timeout=2m Maximum time to process query by query node. --query.max-concurrent=20 Maximum number of queries processed concurrently by query node. + --query.max-concurrent-select=4 + Maximum number of select requests made + concurrently per a query. --query.replica-label=QUERY.REPLICA-LABEL ... Labels to treat as a replica indicator along which data is deduplicated. Still you will be diff --git a/pkg/query/api/v1_test.go b/pkg/query/api/v1_test.go index 84f514ba91c..2551508021b 100644 --- a/pkg/query/api/v1_test.go +++ b/pkg/query/api/v1_test.go @@ -109,7 +109,7 @@ func TestEndpoints(t *testing.T) { now := time.Now() api := &API{ - queryableCreate: query.NewQueryableCreator(nil, store.NewTSDBStore(nil, nil, db, component.Query, nil)), + queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2), queryEngine: promql.NewEngine(promql.EngineOpts{ Logger: nil, Reg: nil, diff --git a/pkg/query/iter.go b/pkg/query/iter.go index 6c6a10b759d..901aba0f7f3 100644 --- a/pkg/query/iter.go +++ b/pkg/query/iter.go @@ -670,3 +670,40 @@ func (it *dedupSeriesIterator) Err() error { } return it.b.Err() } + +type lazySeriesSet struct { + create func() (s storage.SeriesSet, ok bool) + + set storage.SeriesSet +} + +func (c *lazySeriesSet) Next() bool { + if c.set != nil { + return c.set.Next() + } + + var ok bool + c.set, ok = c.create() + return ok +} + +func (c *lazySeriesSet) Err() error { + if c.set != nil { + return c.set.Err() + } + return nil +} + +func (c *lazySeriesSet) At() storage.Series { + if c.set != nil { + return c.set.At() + } + return nil +} + +func (c *lazySeriesSet) Warnings() storage.Warnings { + if c.set != nil { + return c.set.Warnings() + } + return nil +} diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 9829e327dcf..20feb182c20 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -11,9 +11,12 @@ import ( "github.com/go-kit/kit/log" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -27,38 +30,43 @@ import ( type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable // NewQueryableCreator creates QueryableCreator. -func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer) QueryableCreator { +func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int) QueryableCreator { return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable { return &queryable{ - logger: logger, - replicaLabels: replicaLabels, - proxy: proxy, - deduplicate: deduplicate, - maxResolutionMillis: maxResolutionMillis, - partialResponse: partialResponse, - skipChunks: skipChunks, + logger: logger, + reg: reg, + replicaLabels: replicaLabels, + proxy: proxy, + deduplicate: deduplicate, + maxResolutionMillis: maxResolutionMillis, + partialResponse: partialResponse, + skipChunks: skipChunks, + maxConcurrentSelects: maxConcurrentSelects, } } } type queryable struct { - logger log.Logger - replicaLabels []string - proxy storepb.StoreServer - deduplicate bool - maxResolutionMillis int64 - partialResponse bool - skipChunks bool + logger log.Logger + reg prometheus.Registerer + replicaLabels []string + proxy storepb.StoreServer + deduplicate bool + maxResolutionMillis int64 + partialResponse bool + skipChunks bool + maxConcurrentSelects int } // Querier returns a new storage querier against the underlying proxy store API. func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks), nil + return newQuerier(ctx, q.logger, q.reg, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.maxConcurrentSelects), nil } type querier struct { ctx context.Context logger log.Logger + reg prometheus.Registerer cancel func() mint, maxt int64 replicaLabels map[string]struct{} @@ -67,6 +75,7 @@ type querier struct { maxResolutionMillis int64 partialResponse bool skipChunks bool + gate *gate.Gate } // newQuerier creates implementation of storage.Querier that fetches data from the proxy @@ -74,13 +83,14 @@ type querier struct { func newQuerier( ctx context.Context, logger log.Logger, + reg prometheus.Registerer, mint, maxt int64, replicaLabels []string, proxy storepb.StoreServer, deduplicate bool, maxResolutionMillis int64, - partialResponse bool, - skipChunks bool, + partialResponse, skipChunks bool, + maxConcurrentSelects int, ) *querier { if logger == nil { logger = log.NewNopLogger() @@ -92,9 +102,15 @@ func newQuerier( rl[replicaLabel] = struct{}{} } return &querier{ - ctx: ctx, - logger: logger, - cancel: cancel, + ctx: ctx, + logger: logger, + reg: reg, + cancel: cancel, + gate: gate.NewGate( + maxConcurrentSelects, + extprom.WrapRegistererWithPrefix("thanos_concurrent_select", reg), + ), + mint: mint, maxt: maxt, replicaLabels: rl, @@ -161,7 +177,42 @@ func aggrsFromFunc(f string) []storepb.Aggr { return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM} } -func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { +func (q *querier) Select(sort bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { + promise := make(chan storage.SeriesSet, 1) + + go func() { + defer close(promise) + + var err error + tracing.DoInSpan(q.ctx, "querier_select_ismyturn", func(ctx context.Context) { + err = q.gate.IsMyTurn(ctx) + }) + if err != nil { + promise <- storage.ErrSeriesSet(errors.Wrap(err, "failed to wait for turn")) + return + } + defer q.gate.Done() + + set, err := q.selectFn(sort, hints, ms...) + if err != nil { + promise <- storage.ErrSeriesSet(err) + return + } + + promise <- set + }() + + return &lazySeriesSet{create: func() (storage.SeriesSet, bool) { + // Only gets called once, for the first Next() call of the series set. + set, ok := <-promise + if !ok { + return storage.ErrSeriesSet(errors.New("channel closed before a value received")), false + } + return set, set.Next() + }} +} + +func (q *querier) selectFn(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, error) { if hints == nil { hints = &storage.SelectHints{ Start: q.mint, @@ -182,7 +233,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match sms, err := storepb.TranslatePromMatchers(ms...) if err != nil { - return storage.ErrSeriesSet(errors.Wrap(err, "convert matchers")) + return nil, errors.Wrap(err, "convert matchers") } aggrs := aggrsFromFunc(hints.Func) @@ -197,7 +248,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match PartialResponseDisabled: !q.partialResponse, SkipChunks: q.skipChunks, }, resp); err != nil { - return storage.ErrSeriesSet(errors.Wrap(err, "proxy Series()")) + return nil, errors.Wrap(err, "proxy Series()") } var warns storage.Warnings @@ -213,7 +264,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match set: newStoreSeriesSet(resp.seriesSet), aggrs: aggrs, warns: warns, - } + }, nil } // TODO(fabxc): this could potentially pushed further down into the store API to make true streaming possible. @@ -228,7 +279,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match // The merged series set assembles all potentially-overlapping time ranges of the same series into a single one. // TODO(bwplotka): We could potentially dedup on chunk level, use chunk iterator for that when available. - return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER) + return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER), nil } // sortDedupLabels re-sorts the set so that the same series with different replica diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 8fb05cd3f71..b800aca01ff 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -38,16 +38,16 @@ type sample struct { } func TestQueryableCreator_MaxResolution(t *testing.T) { - defer leaktest.CheckTimeout(t, 10*time.Second)() + t.Cleanup(leaktest.CheckTimeout(t, 10*time.Second)) testProxy := &storeServer{resps: []*storepb.SeriesResponse{}} - queryableCreator := NewQueryableCreator(nil, testProxy) + queryableCreator := NewQueryableCreator(nil, nil, testProxy, 2) oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond) queryable := queryableCreator(false, nil, oneHourMillis, false, false) q, err := queryable.Querier(context.Background(), 0, 42) testutil.Ok(t, err) - defer func() { testutil.Ok(t, q.Close()) }() + t.Cleanup(func() { testutil.Ok(t, q.Close()) }) querierActual, ok := q.(*querier) @@ -58,7 +58,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) { // Tests E2E how PromQL works with downsampled data. func TestQuerier_DownsampledData(t *testing.T) { - defer leaktest.CheckTimeout(t, 10*time.Second)() + t.Cleanup(leaktest.CheckTimeout(t, 10*time.Second)) testProxy := &storeServer{ resps: []*storepb.SeriesResponse{ storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "a", "aaa", "bbb"), []sample{{99, 1}, {199, 5}}), // Downsampled chunk from Store. @@ -70,7 +70,7 @@ func TestQuerier_DownsampledData(t *testing.T) { }, } - q := NewQueryableCreator(nil, testProxy)(false, nil, 9999999, false, false) + q := NewQueryableCreator(nil, nil, testProxy, 2)(false, nil, 9999999, false, false) engine := promql.NewEngine( promql.EngineOpts{ @@ -507,16 +507,14 @@ func TestQuerier_Select(t *testing.T) { {dedup: false, expected: tcase.expected}, {dedup: true, expected: []series{tcase.expectedAfterDedup}}, } { - q := newQuerier(context.Background(), nil, tcase.mint, tcase.maxt, tcase.replicaLabels, tcase.storeAPI, sc.dedup, 0, true, false) - defer testutil.Ok(t, q.Close()) + q := newQuerier(context.Background(), nil, nil, tcase.mint, tcase.maxt, tcase.replicaLabels, tcase.storeAPI, sc.dedup, 0, true, false, 2) + t.Cleanup(func() { testutil.Ok(t, q.Close()) }) t.Run(fmt.Sprintf("dedup=%v", sc.dedup), func(t *testing.T) { t.Run("querier.Select", func(t *testing.T) { - defer leaktest.CheckTimeout(t, 10*time.Second)() - defer testutil.Ok(t, q.Close()) + t.Cleanup(leaktest.CheckTimeout(t, 10*time.Second)) res := q.Select(false, tcase.hints, tcase.matchers...) - testSelectResponse(t, sc.expected, res) if tcase.expectedWarning != "" { @@ -527,12 +525,12 @@ func TestQuerier_Select(t *testing.T) { }) // Integration test: Make sure the PromQL would select exactly the same. t.Run("through PromQL with 100s step", func(t *testing.T) { - defer leaktest.CheckTimeout(t, 10*time.Second)() + t.Cleanup(leaktest.CheckTimeout(t, 10*time.Second)) catcher := &querierResponseCatcher{t: t, Querier: q} q, err := e.NewRangeQuery(&mockedQueryable{catcher}, tcase.equivalentQuery, timestamp.Time(tcase.mint), timestamp.Time(tcase.maxt), 100*time.Second) testutil.Ok(t, err) - defer q.Close() + t.Cleanup(q.Close) r := q.Exec(context.Background()) testutil.Ok(t, r.Err) @@ -676,18 +674,22 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { "gprd", "fqdn", "web-08-sv-gprd.c.gitlab-production.internal", "instance", "web-08-sv-gprd.c.gitlab-production.internal:8083", "job", "gitlab-rails", "monitor", "app", "provider", "gcp", "region", "us-east", "replica", "02", "shard", "default", "stage", "main", "tier", "sv", "type", "web", ) - q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, s, false, 0, true, false) - defer func() { testutil.Ok(t, q.Close()) }() + + q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, s, false, 0, true, false, 2) + t.Cleanup(func() { + testutil.Ok(t, q.Close()) + }) e := promql.NewEngine(promql.EngineOpts{ Logger: logger, - Timeout: 5 * time.Second, + Timeout: 100 * time.Second, MaxSamples: math.MaxInt64, }) t.Run("Rate=5mStep=100s", func(t *testing.T) { + t.Cleanup(leaktest.CheckTimeout(t, 10*time.Second)) + q, err := e.NewRangeQuery(&mockedQueryable{q}, `rate(gitlab_transaction_cache_read_hit_count_total[5m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(5*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 100*time.Second) testutil.Ok(t, err) - defer q.Close() r := q.Exec(context.Background()) testutil.Ok(t, r.Err) @@ -715,9 +717,10 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { }, vec) }) t.Run("Rate=30mStep=500s", func(t *testing.T) { + t.Cleanup(leaktest.CheckTimeout(t, 10*time.Second)) + q, err := e.NewRangeQuery(&mockedQueryable{q}, `rate(gitlab_transaction_cache_read_hit_count_total[30m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(30*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 500*time.Second) testutil.Ok(t, err) - defer q.Close() r := q.Exec(context.Background()) testutil.Ok(t, r.Err) @@ -743,8 +746,11 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { "gprd", "fqdn", "web-08-sv-gprd.c.gitlab-production.internal", "instance", "web-08-sv-gprd.c.gitlab-production.internal:8083", "job", "gitlab-rails", "monitor", "app", "provider", "gcp", "region", "us-east", "shard", "default", "stage", "main", "tier", "sv", "type", "web", ) - q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, s, true, 0, true, false) - defer func() { testutil.Ok(t, q.Close()) }() + + q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, s, true, 0, true, false, 2) + t.Cleanup(func() { + testutil.Ok(t, q.Close()) + }) e := promql.NewEngine(promql.EngineOpts{ Logger: logger, @@ -752,9 +758,10 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { MaxSamples: math.MaxInt64, }) t.Run("Rate=5mStep=100s", func(t *testing.T) { + t.Cleanup(leaktest.CheckTimeout(t, 10*time.Second)) + q, err := e.NewRangeQuery(&mockedQueryable{q}, `rate(gitlab_transaction_cache_read_hit_count_total[5m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(5*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 100*time.Second) testutil.Ok(t, err) - defer q.Close() r := q.Exec(context.Background()) testutil.Ok(t, r.Err) @@ -777,9 +784,10 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { }, vec) }) t.Run("Rate=30mStep=500s", func(t *testing.T) { + t.Cleanup(leaktest.CheckTimeout(t, 10*time.Second)) + q, err := e.NewRangeQuery(&mockedQueryable{q}, `rate(gitlab_transaction_cache_read_hit_count_total[30m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(30*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 500*time.Second) testutil.Ok(t, err) - defer q.Close() r := q.Exec(context.Background()) testutil.Ok(t, r.Err) @@ -800,7 +808,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { } func TestSortReplicaLabel(t *testing.T) { - defer leaktest.CheckTimeout(t, 10*time.Second)() + t.Cleanup(leaktest.CheckTimeout(t, 10*time.Second)) tests := []struct { input []storepb.Series @@ -867,7 +875,7 @@ func expandSeries(t testing.TB, it chunkenc.Iterator) (res []sample) { } func TestDedupSeriesSet(t *testing.T) { - defer leaktest.CheckTimeout(t, 10*time.Second)() + t.Cleanup(leaktest.CheckTimeout(t, 10*time.Second)) tests := []struct { input []series @@ -1198,7 +1206,7 @@ func TestDedupSeriesSet(t *testing.T) { } func TestDedupSeriesIterator(t *testing.T) { - defer leaktest.CheckTimeout(t, 10*time.Second)() + t.Cleanup(leaktest.CheckTimeout(t, 10*time.Second)) // The deltas between timestamps should be at least 10000 to not be affected // by the initial penalty of 5000, that will cause the second iterator to seek @@ -1300,7 +1308,7 @@ type storeServer struct { resps []*storepb.SeriesResponse } -func (s *storeServer) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { +func (s *storeServer) Series(_ *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { for _, resp := range s.resps { err := srv.Send(resp) if err != nil {