Skip to content

Commit

Permalink
Convert asyncSeriesSet to lazySeriesSet
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <[email protected]>
  • Loading branch information
kakkoyun committed Jun 5, 2020
1 parent 0399f81 commit c566595
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 282 deletions.
9 changes: 7 additions & 2 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node.").
Default("20").Int()

maxConcurrentSelects := cmd.Flag("query.max-concurrent-select", "Maximum number of select requests made concurrently per a query.").
Default("4").Int()

queryReplicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter. Data includes time series, recording rules, and alerting rules.").
Strings()

Expand Down Expand Up @@ -158,6 +161,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
*webExternalPrefix,
*webPrefixHeaderName,
*maxConcurrentQueries,
*maxConcurrentSelects,
time.Duration(*queryTimeout),
time.Duration(*storeResponseTimeout),
*queryReplicaLabels,
Expand Down Expand Up @@ -200,7 +204,8 @@ func runQuery(
webRoutePrefix string,
webExternalPrefix string,
webPrefixHeaderName string,
maxConcurrentQueries int,
_ int, // maxConcurrentQueries.
maxConcurrentSelects int,
queryTimeout time.Duration,
storeResponseTimeout time.Duration,
queryReplicaLabels []string,
Expand Down Expand Up @@ -279,7 +284,7 @@ func runQuery(
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
queryableCreator = query.NewQueryableCreator(logger, reg, proxy)
queryableCreator = query.NewQueryableCreator(logger, reg, proxy, maxConcurrentSelects)
engine = promql.NewEngine(
promql.EngineOpts{
Logger: logger,
Expand Down
3 changes: 3 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,9 @@ Flags:
--query.timeout=2m Maximum time to process query by query node.
--query.max-concurrent=20 Maximum number of queries processed
concurrently by query node.
--query.max-concurrent-select=4
Maximum number of select requests made
concurrently per a query.
--query.replica-label=QUERY.REPLICA-LABEL ...
Labels to treat as a replica indicator along
which data is deduplicated. Still you will be
Expand Down
2 changes: 1 addition & 1 deletion pkg/query/api/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestEndpoints(t *testing.T) {

now := time.Now()
api := &API{
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil)),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2),
queryEngine: promql.NewEngine(promql.EngineOpts{
Logger: nil,
Reg: nil,
Expand Down
85 changes: 26 additions & 59 deletions pkg/query/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@
package query

import (
"context"
"math"
"sort"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/store/storepb"
)

Expand All @@ -29,6 +28,8 @@ type promSeriesSet struct {

currLset []storepb.Label
currChunks []storepb.AggrChunk

warns storage.Warnings
}

func (s *promSeriesSet) Next() bool {
Expand Down Expand Up @@ -101,8 +102,7 @@ func (s *promSeriesSet) Err() error {
}

func (s *promSeriesSet) Warnings() storage.Warnings {
// TODO(kakkoyun): Implement me!
return nil
return s.warns
}

// storeSeriesSet implements a storepb SeriesSet against a list of storepb.Series.
Expand Down Expand Up @@ -431,8 +431,7 @@ func (s *dedupSeriesSet) Err() error {
}

func (s *dedupSeriesSet) Warnings() storage.Warnings {
// TODO(kakkoyun): Implement me!
return nil
return s.set.Warnings()
}

type seriesWithLabels struct {
Expand Down Expand Up @@ -668,71 +667,39 @@ func (it *dedupSeriesIterator) Err() error {
return it.b.Err()
}

var errPrematurelyClosedPromise = errors.New("promise channel closed before result received")

type asyncSeriesSet struct {
ctx context.Context
promise chan storage.SeriesSet
result storage.SeriesSet
}

func newAsyncSeriesSet(ctx context.Context, gate *gate.Gate, f func(ctx context.Context) (storage.SeriesSet, storage.Warnings, error)) storage.SeriesSet {
promise := make(chan storage.SeriesSet, 1)
go func() {
defer close(promise)

if err := gate.IsMyTurn(ctx); err != nil {
promise <- storage.ErrSeriesSet(errors.Wrap(err, "failed to wait for turn"))
}
defer gate.Done()

set, _, err := f(ctx)
// TODO(kakkoyun): Handle warnings after Prometheus changes.
if err != nil {
promise <- storage.ErrSeriesSet(err)
}
promise <- set
}()
type lazySeriesSet struct {
create func() (s storage.SeriesSet, ok bool)

return &asyncSeriesSet{ctx: ctx, promise: promise}
set storage.SeriesSet
}

func (s *asyncSeriesSet) Next() bool {
if s.result == nil {
select {
case <-s.ctx.Done():
return false
case res, ok := <-s.promise:
if !ok {
return false
}
s.result = res
return res.Next()
}
func (c *lazySeriesSet) Next() bool {
if c.set != nil {
return c.set.Next()
}

return s.result.Next()
var ok bool
c.set, ok = c.create()
return ok
}

func (s *asyncSeriesSet) At() storage.Series {
return s.result.At()
}

func (s *asyncSeriesSet) Err() error {
if err := s.ctx.Err(); err != nil {
return err
func (c *lazySeriesSet) Err() error {
if c.set != nil {
return c.set.Err()
}
return nil
}

if s.result == nil {
return errPrematurelyClosedPromise
func (c *lazySeriesSet) At() storage.Series {
if c.set != nil {
return c.set.At()
}

return s.result.Err()
return nil
}

func (s *asyncSeriesSet) Warnings() storage.Warnings {
if s.result != nil {
return s.result.Warnings()
func (c *lazySeriesSet) Warnings() storage.Warnings {
if c.set != nil {
return c.set.Warnings()
}
return nil
}
140 changes: 0 additions & 140 deletions pkg/query/iter_test.go

This file was deleted.

Loading

0 comments on commit c566595

Please sign in to comment.