Skip to content

Commit

Permalink
Implement async select for Querier
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <[email protected]>
  • Loading branch information
kakkoyun committed Jun 12, 2020
1 parent 48ee1f8 commit d6eeef0
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 53 deletions.
7 changes: 6 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node.").
Default("20").Int()

maxConcurrentSelects := cmd.Flag("query.max-concurrent-select", "Maximum number of select requests made concurrently per a query.").
Default("4").Int()

queryReplicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter. Data includes time series, recording rules, and alerting rules.").
Strings()

Expand Down Expand Up @@ -159,6 +162,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
*webExternalPrefix,
*webPrefixHeaderName,
*maxConcurrentQueries,
*maxConcurrentSelects,
time.Duration(*queryTimeout),
time.Duration(*storeResponseTimeout),
*queryReplicaLabels,
Expand Down Expand Up @@ -202,6 +206,7 @@ func runQuery(
webExternalPrefix string,
webPrefixHeaderName string,
maxConcurrentQueries int,
maxConcurrentSelects int,
queryTimeout time.Duration,
storeResponseTimeout time.Duration,
queryReplicaLabels []string,
Expand Down Expand Up @@ -280,7 +285,7 @@ func runQuery(
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
queryableCreator = query.NewQueryableCreator(logger, proxy)
queryableCreator = query.NewQueryableCreator(logger, reg, proxy, maxConcurrentSelects)
engine = promql.NewEngine(
promql.EngineOpts{
Logger: logger,
Expand Down
3 changes: 3 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,9 @@ Flags:
--query.timeout=2m Maximum time to process query by query node.
--query.max-concurrent=20 Maximum number of queries processed
concurrently by query node.
--query.max-concurrent-select=4
Maximum number of select requests made
concurrently per a query.
--query.replica-label=QUERY.REPLICA-LABEL ...
Labels to treat as a replica indicator along
which data is deduplicated. Still you will be
Expand Down
2 changes: 1 addition & 1 deletion pkg/query/api/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestEndpoints(t *testing.T) {

now := time.Now()
api := &API{
queryableCreate: query.NewQueryableCreator(nil, store.NewTSDBStore(nil, nil, db, component.Query, nil)),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2),
queryEngine: promql.NewEngine(promql.EngineOpts{
Logger: nil,
Reg: nil,
Expand Down
37 changes: 37 additions & 0 deletions pkg/query/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,3 +670,40 @@ func (it *dedupSeriesIterator) Err() error {
}
return it.b.Err()
}

// lazySeriesSet is a storage.SeriesSet that defers obtaining the underlying
// set until the first Next call. The create callback may block (e.g. waiting
// on an async select) — wrapping it this way keeps Select itself non-blocking.
type lazySeriesSet struct {
	// create returns the wrapped set together with the result of that set's
	// first Next call. It is invoked at most once, on the first call to Next.
	create func() (s storage.SeriesSet, ok bool)

	// set is the materialized inner series set; nil until create has run.
	set storage.SeriesSet
}

// Next advances the series set, materializing the inner set on first use.
func (c *lazySeriesSet) Next() bool {
	if c.set == nil {
		// First call: build the inner set. create also reports the outcome
		// of the inner set's own first Next, so we return it directly.
		var ok bool
		c.set, ok = c.create()
		return ok
	}
	return c.set.Next()
}

// Err returns the inner set's error, or nil if the set has not been created yet.
func (c *lazySeriesSet) Err() error {
	if c.set == nil {
		return nil
	}
	return c.set.Err()
}

// At returns the current series of the inner set, or nil before it exists.
func (c *lazySeriesSet) At() storage.Series {
	if c.set == nil {
		return nil
	}
	return c.set.At()
}

// Warnings returns warnings gathered by the inner set; nil before creation.
func (c *lazySeriesSet) Warnings() storage.Warnings {
	if c.set == nil {
		return nil
	}
	return c.set.Warnings()
}
103 changes: 77 additions & 26 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ import (
"github.com/go-kit/kit/log"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"

"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
)
Expand All @@ -27,38 +30,43 @@ import (
type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable

// NewQueryableCreator creates QueryableCreator.
func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer) QueryableCreator {
func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int) QueryableCreator {
return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable {
return &queryable{
logger: logger,
replicaLabels: replicaLabels,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
skipChunks: skipChunks,
logger: logger,
reg: reg,
replicaLabels: replicaLabels,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
skipChunks: skipChunks,
maxConcurrentSelects: maxConcurrentSelects,
}
}
}

type queryable struct {
logger log.Logger
replicaLabels []string
proxy storepb.StoreServer
deduplicate bool
maxResolutionMillis int64
partialResponse bool
skipChunks bool
logger log.Logger
reg prometheus.Registerer
replicaLabels []string
proxy storepb.StoreServer
deduplicate bool
maxResolutionMillis int64
partialResponse bool
skipChunks bool
maxConcurrentSelects int
}

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks), nil
return newQuerier(ctx, q.logger, q.reg, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.maxConcurrentSelects), nil
}

type querier struct {
ctx context.Context
logger log.Logger
reg prometheus.Registerer
cancel func()
mint, maxt int64
replicaLabels map[string]struct{}
Expand All @@ -67,20 +75,22 @@ type querier struct {
maxResolutionMillis int64
partialResponse bool
skipChunks bool
gate *gate.Gate
}

// newQuerier creates implementation of storage.Querier that fetches data from the proxy
// store API endpoints.
func newQuerier(
ctx context.Context,
logger log.Logger,
reg prometheus.Registerer,
mint, maxt int64,
replicaLabels []string,
proxy storepb.StoreServer,
deduplicate bool,
maxResolutionMillis int64,
partialResponse bool,
skipChunks bool,
partialResponse, skipChunks bool,
maxConcurrentSelects int,
) *querier {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -92,9 +102,15 @@ func newQuerier(
rl[replicaLabel] = struct{}{}
}
return &querier{
ctx: ctx,
logger: logger,
cancel: cancel,
ctx: ctx,
logger: logger,
reg: reg,
cancel: cancel,
gate: gate.NewGate(
maxConcurrentSelects,
extprom.WrapRegistererWithPrefix("thanos_concurrent_select", reg),
),

mint: mint,
maxt: maxt,
replicaLabels: rl,
Expand Down Expand Up @@ -161,7 +177,42 @@ func aggrsFromFunc(f string) []storepb.Aggr {
return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}
}

func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
// Select implements storage.Querier. It kicks off the actual select work
// (selectFn) in a goroutine, gated by q.gate so that at most
// maxConcurrentSelects selects run at once, and returns immediately with a
// lazySeriesSet that blocks for the result only on its first Next call.
// Errors (including failure to acquire the gate) are delivered to the caller
// as a storage.ErrSeriesSet through the same channel.
func (q *querier) Select(sort bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
	// Buffer of 1 so the goroutine can deliver its single result and exit
	// even if the caller never calls Next on the returned set.
	promise := make(chan storage.SeriesSet, 1)

	go func() {
		// Closing signals "no value was produced" to the lazy consumer below.
		defer close(promise)

		var err error
		// Waiting for our gate slot is traced so slow selects caused by
		// concurrency limiting are visible in spans.
		tracing.DoInSpan(q.ctx, "querier_select_ismyturn", func(ctx context.Context) {
			err = q.gate.IsMyTurn(ctx)
		})
		if err != nil {
			promise <- storage.ErrSeriesSet(errors.Wrap(err, "failed to wait for turn"))
			return
		}
		// Release the gate slot once the select work has finished.
		defer q.gate.Done()

		set, err := q.selectFn(sort, hints, ms...)
		if err != nil {
			promise <- storage.ErrSeriesSet(err)
			return
		}

		promise <- set
	}()

	return &lazySeriesSet{create: func() (storage.SeriesSet, bool) {
		// Only gets called once, for the first Next() call of the series set.
		set, ok := <-promise
		if !ok {
			// Channel was closed without a value — should not happen given
			// the goroutine above always sends before returning.
			return storage.ErrSeriesSet(errors.New("channel closed before a value received")), false
		}
		return set, set.Next()
	}}
}

func (q *querier) selectFn(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, error) {
if hints == nil {
hints = &storage.SelectHints{
Start: q.mint,
Expand All @@ -182,7 +233,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match

sms, err := storepb.TranslatePromMatchers(ms...)
if err != nil {
return storage.ErrSeriesSet(errors.Wrap(err, "convert matchers"))
return nil, errors.Wrap(err, "convert matchers")
}

aggrs := aggrsFromFunc(hints.Func)
Expand All @@ -197,7 +248,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match
PartialResponseDisabled: !q.partialResponse,
SkipChunks: q.skipChunks,
}, resp); err != nil {
return storage.ErrSeriesSet(errors.Wrap(err, "proxy Series()"))
return nil, errors.Wrap(err, "proxy Series()")
}

var warns storage.Warnings
Expand All @@ -213,7 +264,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match
set: newStoreSeriesSet(resp.seriesSet),
aggrs: aggrs,
warns: warns,
}
}, nil
}

// TODO(fabxc): this could potentially pushed further down into the store API to make true streaming possible.
Expand All @@ -228,7 +279,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match

// The merged series set assembles all potentially-overlapping time ranges of the same series into a single one.
// TODO(bwplotka): We could potentially dedup on chunk level, use chunk iterator for that when available.
return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER)
return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER), nil
}

// sortDedupLabels re-sorts the set so that the same series with different replica
Expand Down
Loading

0 comments on commit d6eeef0

Please sign in to comment.