From c566595b6197b72d23e46e4096db02054c5b7aa8 Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Fri, 5 Jun 2020 17:55:57 +0200 Subject: [PATCH] Convert asyncSeriesSet to lazySeriesSet Signed-off-by: Kemal Akkoyun --- cmd/thanos/query.go | 9 +- docs/components/query.md | 3 + pkg/query/api/v1_test.go | 2 +- pkg/query/iter.go | 85 ++++++----------- pkg/query/iter_test.go | 140 ---------------------------- pkg/query/querier.go | 189 +++++++++++++++++++++++--------------- pkg/query/querier_test.go | 10 +- 7 files changed, 156 insertions(+), 282 deletions(-) delete mode 100644 pkg/query/iter_test.go diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 10b3c0c62bd..2f7f22e2594 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -66,6 +66,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node."). Default("20").Int() + maxConcurrentSelects := cmd.Flag("query.max-concurrent-select", "Maximum number of select requests made concurrently per a query."). + Default("4").Int() + queryReplicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter. Data includes time series, recording rules, and alerting rules."). Strings() @@ -158,6 +161,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { *webExternalPrefix, *webPrefixHeaderName, *maxConcurrentQueries, + *maxConcurrentSelects, time.Duration(*queryTimeout), time.Duration(*storeResponseTimeout), *queryReplicaLabels, @@ -200,7 +204,8 @@ func runQuery( webRoutePrefix string, webExternalPrefix string, webPrefixHeaderName string, - maxConcurrentQueries int, + _ int, // maxConcurrentQueries. 
+ maxConcurrentSelects int, queryTimeout time.Duration, storeResponseTimeout time.Duration, queryReplicaLabels []string, @@ -279,7 +284,7 @@ func runQuery( ) proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout) rulesProxy = rules.NewProxy(logger, stores.GetRulesClients) - queryableCreator = query.NewQueryableCreator(logger, reg, proxy) + queryableCreator = query.NewQueryableCreator(logger, reg, proxy, maxConcurrentSelects) engine = promql.NewEngine( promql.EngineOpts{ Logger: logger, diff --git a/docs/components/query.md b/docs/components/query.md index fb334164868..da5f4b06f29 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -314,6 +314,9 @@ Flags: --query.timeout=2m Maximum time to process query by query node. --query.max-concurrent=20 Maximum number of queries processed concurrently by query node. + --query.max-concurrent-select=4 + Maximum number of select requests made + concurrently per a query. --query.replica-label=QUERY.REPLICA-LABEL ... Labels to treat as a replica indicator along which data is deduplicated. 
Still you will be diff --git a/pkg/query/api/v1_test.go b/pkg/query/api/v1_test.go index e8374b016ee..2904d8c552e 100644 --- a/pkg/query/api/v1_test.go +++ b/pkg/query/api/v1_test.go @@ -107,7 +107,7 @@ func TestEndpoints(t *testing.T) { now := time.Now() api := &API{ - queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil)), + queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2), queryEngine: promql.NewEngine(promql.EngineOpts{ Logger: nil, Reg: nil, diff --git a/pkg/query/iter.go b/pkg/query/iter.go index b401a59a41e..c51f81fb22e 100644 --- a/pkg/query/iter.go +++ b/pkg/query/iter.go @@ -4,7 +4,6 @@ package query import ( - "context" "math" "sort" @@ -12,8 +11,8 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/thanos-io/thanos/pkg/compact/downsample" - "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -29,6 +28,8 @@ type promSeriesSet struct { currLset []storepb.Label currChunks []storepb.AggrChunk + + warns storage.Warnings } func (s *promSeriesSet) Next() bool { @@ -101,8 +102,7 @@ func (s *promSeriesSet) Err() error { } func (s *promSeriesSet) Warnings() storage.Warnings { - // TODO(kakkoyun): Implement me! - return nil + return s.warns } // storeSeriesSet implements a storepb SeriesSet against a list of storepb.Series. @@ -431,8 +431,7 @@ func (s *dedupSeriesSet) Err() error { } func (s *dedupSeriesSet) Warnings() storage.Warnings { - // TODO(kakkoyun): Implement me! 
- return nil + return s.set.Warnings() } type seriesWithLabels struct { @@ -668,71 +667,39 @@ func (it *dedupSeriesIterator) Err() error { return it.b.Err() } -var errPrematurelyClosedPromise = errors.New("promise channel closed before result received") - -type asyncSeriesSet struct { - ctx context.Context - promise chan storage.SeriesSet - result storage.SeriesSet -} - -func newAsyncSeriesSet(ctx context.Context, gate *gate.Gate, f func(ctx context.Context) (storage.SeriesSet, storage.Warnings, error)) storage.SeriesSet { - promise := make(chan storage.SeriesSet, 1) - go func() { - defer close(promise) - - if err := gate.IsMyTurn(ctx); err != nil { - promise <- storage.ErrSeriesSet(errors.Wrap(err, "failed to wait for turn")) - } - defer gate.Done() - - set, _, err := f(ctx) - // TODO(kakkoyun): Handle warnings after Prometheus changes. - if err != nil { - promise <- storage.ErrSeriesSet(err) - } - promise <- set - }() +type lazySeriesSet struct { + create func() (s storage.SeriesSet, ok bool) - return &asyncSeriesSet{ctx: ctx, promise: promise} + set storage.SeriesSet } -func (s *asyncSeriesSet) Next() bool { - if s.result == nil { - select { - case <-s.ctx.Done(): - return false - case res, ok := <-s.promise: - if !ok { - return false - } - s.result = res - return res.Next() - } +func (c *lazySeriesSet) Next() bool { + if c.set != nil { + return c.set.Next() } - return s.result.Next() + var ok bool + c.set, ok = c.create() + return ok } -func (s *asyncSeriesSet) At() storage.Series { - return s.result.At() -} - -func (s *asyncSeriesSet) Err() error { - if err := s.ctx.Err(); err != nil { - return err +func (c *lazySeriesSet) Err() error { + if c.set != nil { + return c.set.Err() } + return nil +} - if s.result == nil { - return errPrematurelyClosedPromise +func (c *lazySeriesSet) At() storage.Series { + if c.set != nil { + return c.set.At() } - - return s.result.Err() + return nil } -func (s *asyncSeriesSet) Warnings() storage.Warnings { - if s.result != nil { - 
return s.result.Warnings() +func (c *lazySeriesSet) Warnings() storage.Warnings { + if c.set != nil { + return c.set.Warnings() } return nil } diff --git a/pkg/query/iter_test.go b/pkg/query/iter_test.go deleted file mode 100644 index 9f76b4950ef..00000000000 --- a/pkg/query/iter_test.go +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package query - -import ( - "context" - "testing" - - "github.com/pkg/errors" - "github.com/prometheus/prometheus/storage" - "github.com/thanos-io/thanos/pkg/testutil" -) - -type errSeriesSet struct { - ws storage.Warnings - err error -} - -func (s errSeriesSet) Next() bool { return false } -func (s errSeriesSet) At() storage.Series { return nil } -func (s errSeriesSet) Err() error { return s.err } -func (s errSeriesSet) Warnings() storage.Warnings { return s.ws } - -func TestAsyncSeriesSet_Next(t *testing.T) { - type fields struct { - ctx context.Context - promise chan storage.SeriesSet - } - - cancelledContext, cancel := context.WithCancel(context.Background()) - cancel() - - closedChannel := make(chan storage.SeriesSet) - close(closedChannel) - - channel := make(chan storage.SeriesSet, 1) - channel <- storage.EmptySeriesSet() - close(channel) - - tests := []struct { - name string - fields fields - want bool - }{ - { - name: "returns false when context cancelled", - fields: fields{ - ctx: cancelledContext, - }, - want: false, - }, - { - name: "returns false when channel closed", - fields: fields{ - ctx: context.TODO(), - promise: closedChannel, - }, - want: false, - }, - { - name: "proxies call to underlying set", - fields: fields{ - ctx: context.TODO(), - promise: channel, - }, - want: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := &asyncSeriesSet{ - ctx: tt.fields.ctx, - promise: tt.fields.promise, - } - testutil.Equals(t, tt.want, s.Next()) - }) - } -} - -func TestAsyncSeriesSet_Err(t *testing.T) { - type fields struct { 
- ctx context.Context - promise chan storage.SeriesSet - } - - cancelledContext, cancel := context.WithCancel(context.Background()) - cancel() - - closedChannel := make(chan storage.SeriesSet) - close(closedChannel) - - errRemote := errors.New("remote error") - channel := make(chan storage.SeriesSet, 1) - channel <- errSeriesSet{err: errRemote} - close(channel) - - tests := []struct { - name string - fields fields - want error - }{ - { - name: "returns error from context when context cancelled", - fields: fields{ - ctx: cancelledContext, - }, - want: context.Canceled, - }, - { - name: "returns sentinel error when channel closed", - fields: fields{ - ctx: context.TODO(), - promise: closedChannel, - }, - want: errPrematurelyClosedPromise, - }, - { - name: "proxies call to underlying set", - fields: fields{ - ctx: context.TODO(), - promise: channel, - }, - want: errRemote, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := &asyncSeriesSet{ - ctx: tt.fields.ctx, - promise: tt.fields.promise, - } - for s.Next() { - } - if err := s.Err(); !errors.Is(err, tt.want) { - t.Errorf("asyncSeriesSet.Err() error = %v, want %v", err, tt.want) - } - }) - } -} diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 37f3ec31fd2..06d0ffd954d 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -28,35 +28,37 @@ import ( type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable // NewQueryableCreator creates QueryableCreator. 
-func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer) QueryableCreator { +func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int) QueryableCreator { return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable { return &queryable{ - logger: logger, - reg: reg, - replicaLabels: replicaLabels, - proxy: proxy, - deduplicate: deduplicate, - maxResolutionMillis: maxResolutionMillis, - partialResponse: partialResponse, - skipChunks: skipChunks, + logger: logger, + reg: reg, + replicaLabels: replicaLabels, + proxy: proxy, + deduplicate: deduplicate, + maxResolutionMillis: maxResolutionMillis, + partialResponse: partialResponse, + skipChunks: skipChunks, + maxConcurrentSelects: maxConcurrentSelects, } } } type queryable struct { - logger log.Logger - reg prometheus.Registerer - replicaLabels []string - proxy storepb.StoreServer - deduplicate bool - maxResolutionMillis int64 - partialResponse bool - skipChunks bool + logger log.Logger + reg prometheus.Registerer + replicaLabels []string + proxy storepb.StoreServer + deduplicate bool + maxResolutionMillis int64 + partialResponse bool + skipChunks bool + maxConcurrentSelects int } // Querier returns a new storage querier against the underlying proxy store API. 
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return newQuerier(ctx, q.logger, q.reg, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks), nil + return newQuerier(ctx, q.logger, q.reg, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.maxConcurrentSelects), nil } type querier struct { @@ -71,6 +73,7 @@ type querier struct { maxResolutionMillis int64 partialResponse bool skipChunks bool + gate *gate.Gate } // newQuerier creates implementation of storage.Querier that fetches data from the proxy @@ -84,8 +87,8 @@ func newQuerier( proxy storepb.StoreServer, deduplicate bool, maxResolutionMillis int64, - partialResponse bool, - skipChunks bool, + partialResponse, skipChunks bool, + maxConcurrentSelects int, ) *querier { if logger == nil { logger = log.NewNopLogger() @@ -101,6 +104,7 @@ func newQuerier( logger: logger, reg: reg, cancel: cancel, + gate: gate.NewGate(maxConcurrentSelects, reg), mint: mint, maxt: maxt, replicaLabels: rl, @@ -167,7 +171,42 @@ func aggrsFromFunc(f string) []storepb.Aggr { return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM} } -func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { +func (q *querier) Select(sort bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { + promise := make(chan storage.SeriesSet, 1) + + go func() { + defer close(promise) + + var err error + tracing.DoInSpan(q.ctx, "querier_select_ismyturn", func(ctx context.Context) { + err = q.gate.IsMyTurn(ctx) + }) + if err != nil { + promise <- storage.ErrSeriesSet(errors.Wrap(err, "failed to wait for turn")) + } + defer q.gate.Done() + + set, err := q.selectFn(sort, hints, ms...) 
+ if err != nil { + promise <- storage.ErrSeriesSet(err) + } + promise <- set + }() + + return &lazySeriesSet{create: func() (storage.SeriesSet, bool) { + select { + case <-q.ctx.Done(): + return storage.ErrSeriesSet(q.ctx.Err()), false + case val, ok := <-promise: + if !ok { + return storage.ErrSeriesSet(errors.New("TODO")), false + } + return val, true + } + }} +} + +func (q *querier) selectFn(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, error) { if hints == nil { hints = &storage.SelectHints{ Start: q.mint, @@ -175,70 +214,70 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match } } - // TODO(kakkoyun): Introduce a flag for maxConcurrentSelects? - return newAsyncSeriesSet(q.ctx, gate.NewGate(4, q.reg), func(ctx context.Context) (storage.SeriesSet, storage.Warnings, error) { - matchers := make([]string, len(ms)) - for i, m := range ms { - matchers[i] = m.String() - } - span, ctx := tracing.StartSpan(ctx, "querier_select", opentracing.Tags{ - "minTime": hints.Start, - "maxTime": hints.End, - "matchers": "{" + strings.Join(matchers, ",") + "}", - }) - defer span.Finish() - - sms, err := storepb.TranslatePromMatchers(ms...) 
- if err != nil { - return nil, nil, errors.Wrap(err, "convert matchers") - } + matchers := make([]string, len(ms)) + for i, m := range ms { + matchers[i] = m.String() + } + span, ctx := tracing.StartSpan(q.ctx, "querier_select", opentracing.Tags{ + "minTime": hints.Start, + "maxTime": hints.End, + "matchers": "{" + strings.Join(matchers, ",") + "}", + }) + defer span.Finish() - aggrs := aggrsFromFunc(hints.Func) - - resp := &seriesServer{ctx: ctx} - if err := q.proxy.Series(&storepb.SeriesRequest{ - MinTime: hints.Start, - MaxTime: hints.End, - Matchers: sms, - MaxResolutionWindow: q.maxResolutionMillis, - Aggregates: aggrs, - PartialResponseDisabled: !q.partialResponse, - SkipChunks: q.skipChunks, - }, resp); err != nil { - return nil, nil, errors.Wrap(err, "proxy Series()") - } + sms, err := storepb.TranslatePromMatchers(ms...) + if err != nil { + return nil, errors.Wrap(err, "convert matchers") + } - var warns storage.Warnings - for _, w := range resp.warnings { - warns = append(warns, errors.New(w)) - } + aggrs := aggrsFromFunc(hints.Func) + + resp := &seriesServer{ctx: ctx} + if err := q.proxy.Series(&storepb.SeriesRequest{ + MinTime: hints.Start, + MaxTime: hints.End, + Matchers: sms, + MaxResolutionWindow: q.maxResolutionMillis, + Aggregates: aggrs, + PartialResponseDisabled: !q.partialResponse, + SkipChunks: q.skipChunks, + }, resp); err != nil { + return nil, errors.Wrap(err, "proxy Series()") + } - if !q.isDedupEnabled() { - // Return data without any deduplication. - return &promSeriesSet{ - mint: q.mint, - maxt: q.maxt, - set: newStoreSeriesSet(resp.seriesSet), - aggrs: aggrs, - }, warns, nil - } + var warns storage.Warnings + for _, w := range resp.warnings { + warns = append(warns, errors.New(w)) + } - // TODO(fabxc): this could potentially pushed further down into the store API - // to make true streaming possible. - sortDedupLabels(resp.seriesSet, q.replicaLabels) + if !q.isDedupEnabled() { + // Return data without any deduplication. 
- set := &promSeriesSet{ + return &promSeriesSet{ mint: q.mint, maxt: q.maxt, set: newStoreSeriesSet(resp.seriesSet), aggrs: aggrs, - } + warns: warns, + }, nil + } - // The merged series set assembles all potentially-overlapping time ranges - // of the same series into a single one. The series are ordered so that equal series - // from different replicas are sequential. We can now deduplicate those. - return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER), warns, nil - }) + // TODO(fabxc): this could potentially be pushed further down into the store API + // to make true streaming possible. + sortDedupLabels(resp.seriesSet, q.replicaLabels) + + set := &promSeriesSet{ + mint: q.mint, + maxt: q.maxt, + set: newStoreSeriesSet(resp.seriesSet), + aggrs: aggrs, + warns: warns, + } + + // The merged series set assembles all potentially-overlapping time ranges + // of the same series into a single one. The series are ordered so that equal series + // from different replicas are sequential. We can now deduplicate those. 
+ return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER), nil } // sortDedupLabels re-sorts the set so that the same series with different replica diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index b4fefb519b9..140c1b35fe2 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -39,7 +39,7 @@ type sample struct { func TestQueryableCreator_MaxResolution(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() testProxy := &storeServer{resps: []*storepb.SeriesResponse{}} - queryableCreator := NewQueryableCreator(nil, nil, testProxy) + queryableCreator := NewQueryableCreator(nil, nil, testProxy, 2) oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond) queryable := queryableCreator(false, nil, oneHourMillis, false, false) @@ -69,7 +69,7 @@ func TestQuerier_DownsampledData(t *testing.T) { }, } - q := NewQueryableCreator(nil, nil, testProxy)(false, nil, 9999999, false, false) + q := NewQueryableCreator(nil, nil, testProxy, 2)(false, nil, 9999999, false, false) engine := promql.NewEngine( promql.EngineOpts{ @@ -510,7 +510,7 @@ func TestQuerier_Select(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), timeout) t.Cleanup(cancel) - q := newQuerier(ctx, nil, nil, tcase.mint, tcase.maxt, tcase.replicaLabels, tcase.storeAPI, sc.dedup, 0, true, false) + q := newQuerier(ctx, nil, nil, tcase.mint, tcase.maxt, tcase.replicaLabels, tcase.storeAPI, sc.dedup, 0, true, false, 2) defer testutil.Ok(t, q.Close()) t.Run(fmt.Sprintf("dedup=%v", sc.dedup), func(t *testing.T) { @@ -676,7 +676,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { "gprd", "fqdn", "web-08-sv-gprd.c.gitlab-production.internal", "instance", "web-08-sv-gprd.c.gitlab-production.internal:8083", "job", "gitlab-rails", "monitor", "app", "provider", "gcp", "region", "us-east", "replica", "02", "shard", "default", "stage", "main", "tier", "sv", "type", "web", ) - q := 
newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, s, false, 0, true, false) + q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, s, false, 0, true, false, 2) defer func() { testutil.Ok(t, q.Close()) }() e := promql.NewEngine(promql.EngineOpts{ @@ -743,7 +743,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { "gprd", "fqdn", "web-08-sv-gprd.c.gitlab-production.internal", "instance", "web-08-sv-gprd.c.gitlab-production.internal:8083", "job", "gitlab-rails", "monitor", "app", "provider", "gcp", "region", "us-east", "shard", "default", "stage", "main", "tier", "sv", "type", "web", ) - q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, s, true, 0, true, false) + q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, s, true, 0, true, false, 2) defer func() { testutil.Ok(t, q.Close()) }() e := promql.NewEngine(promql.EngineOpts{