diff --git a/CHANGELOG.md b/CHANGELOG.md
index eae8477f48..0f573b42ab 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,23 @@ We use *breaking* word for marking changes that are not backward compatible (rel
 ### Added
 
 - [#811](https://github.com/improbable-eng/thanos/pull/811) Remote write receiver
+- [#798](https://github.com/improbable-eng/thanos/pull/798) Ability to limit the maximum number of concurrent Series() calls in Thanos Store and the maximum amount of samples per call.
+
+New options:
+
+* `--store.grpc.series-sample-limit` limits the amount of samples that might be retrieved on a single Series() call. By default it is 0. Consider enabling it by setting it to more than 0 if you are running on limited resources.
+* `--store.grpc.series-max-concurrency` limits the number of concurrent Series() calls in Thanos Store. By default it is 20. Consider making it lower or higher depending on the scale of your deployment.
+
+New metrics:
+* `thanos_bucket_store_queries_dropped_total` shows how many queries were dropped due to the samples limit;
+* `thanos_bucket_store_queries_concurrent_max` is a constant metric which shows how many Series() calls can concurrently be executed by Thanos Store;
+* `thanos_bucket_store_queries_in_flight` shows how many queries are currently "in flight", i.e. being executed;
+* `thanos_bucket_store_gate_duration_seconds` shows how many seconds it took for queries to pass through the gate, both when they succeed and when they fail.
+
+New tracing span:
+* `store_query_gate_ismyturn` shows how long it took for a query to pass (or not) through the gate.
+
+:warning: **WARNING** :warning: #798 adds a new default limit to Thanos Store: `--store.grpc.series-max-concurrency`. Most likely you will want to make it the same as `--query.max-concurrent` on Thanos Query.
 
 ### Fixed
 - [#921](https://github.com/improbable-eng/thanos/pull/921) `thanos_objstore_bucket_last_successful_upload_time` now does not appear when no blocks have been uploaded so far
@@ -23,7 +40,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
 - [#851](https://github.com/improbable-eng/thanos/pull/851) New read API endpoint for api/v1/rules and api/v1/alerts.
 - [#873](https://github.com/improbable-eng/thanos/pull/873) Store: fix set index cache LRU
 
-:warning: **WARING** :warning: #873 fix fixes actual handling of `index-cache-size`. Handling of limit for this cache was
+:warning: **WARNING** :warning: #873 fix fixes actual handling of `index-cache-size`. Handling of limit for this cache was
 broken so it was unbounded all the time. From this release actual value matters and is extremely low by default. To
 "revert" the old behaviour (no boundary), use a large enough value.
 
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index ddf9893061..7f77e15412 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -36,6 +36,12 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
 	chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes for chunks.").
 		Default("2GB").Bytes()
 
+	maxSampleCount := cmd.Flag("store.grpc.series-sample-limit",
+		"Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: for efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit.").
+ Default("0").Uint() + + maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int() + objStoreConfig := regCommonObjStoreFlags(cmd, "", true) syncInterval := cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view."). @@ -63,6 +69,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string peer, uint64(*indexCacheSize), uint64(*chunkPoolSize), + uint64(*maxSampleCount), + int(*maxConcurrent), name, debugLogging, *syncInterval, @@ -87,6 +95,8 @@ func runStore( peer cluster.Peer, indexCacheSizeBytes uint64, chunkPoolSizeBytes uint64, + maxSampleCount uint64, + maxConcurrent int, component string, verbose bool, syncInterval time.Duration, @@ -117,6 +127,8 @@ func runStore( dataDir, indexCacheSizeBytes, chunkPoolSizeBytes, + maxSampleCount, + maxConcurrent, verbose, blockSyncConcurrency, ) diff --git a/docs/components/store.md b/docs/components/store.md index 8b9d5fc79c..7d26d79627 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -104,6 +104,15 @@ Flags: --index-cache-size=250MB Maximum size of items held in the index cache. --chunk-pool-size=2GB Maximum size of concurrently allocatable bytes for chunks. + --store.grpc.series-sample-limit=0 + Maximum amount of samples returned via a single + Series call. 0 means no limit. NOTE: for + efficiency we take 120 as the number of samples + in chunk (it cannot be bigger than that), so + the actual number of samples might be lower, + even though the maximum could be hit. + --store.grpc.series-max-concurrency=20 + Maximum number of concurrent Series calls. --objstore.config-file= Path to YAML file that contains object store configuration. diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 6c38ee81a7..d03167499e 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -22,6 +22,7 @@ import ( "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/compact/downsample" "github.com/improbable-eng/thanos/pkg/component" + "github.com/improbable-eng/thanos/pkg/extprom" "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/pool" "github.com/improbable-eng/thanos/pkg/runutil" @@ -42,6 +43,14 @@ import ( "google.golang.org/grpc/status" ) +// maxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed +// for precalculating the number of samples that we may have to retrieve and decode for any given query +// without downloading them. Please take a look at https://github.com/prometheus/tsdb/pull/397 to know +// where this number comes from. Long story short: TSDB is made in such a way, and it is made in such a way +// because you barely get any improvements in compression when the number of samples is beyond this. +// Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf. 
+const maxSamplesPerChunk = 120
+
 type bucketStoreMetrics struct {
 	blocksLoaded prometheus.Gauge
 	blockLoads   prometheus.Counter
@@ -57,6 +66,8 @@ type bucketStoreMetrics struct {
 	seriesMergeDuration prometheus.Histogram
 	resultSeriesCount   prometheus.Summary
 	chunkSizeBytes      prometheus.Histogram
+	queriesDropped      prometheus.Counter
+	queriesLimit        prometheus.Gauge
 }
 
 func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
@@ -132,6 +143,15 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
 		},
 	})
 
+	m.queriesDropped = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "thanos_bucket_store_queries_dropped_total",
+		Help: "Number of queries that were dropped due to the sample limit.",
+	})
+	m.queriesLimit = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "thanos_bucket_store_queries_concurrent_max",
+		Help: "Maximum number of concurrent queries.",
+	})
+
 	if reg != nil {
 		reg.MustRegister(
 			m.blockLoads,
@@ -148,6 +168,8 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
 			m.seriesMergeDuration,
 			m.resultSeriesCount,
 			m.chunkSizeBytes,
+			m.queriesDropped,
+			m.queriesLimit,
 		)
 	}
 	return &m
@@ -173,7 +195,12 @@ type BucketStore struct {
 	// Number of goroutines to use when syncing blocks from object storage.
 	blockSyncConcurrency int
 
-	partitioner partitioner
+	// Query gate which limits the maximum number of concurrent queries.
+	queryGate *Gate
+
+	// samplesLimiter limits the number of samples per each Series() call.
+	samplesLimiter *Limiter
+	partitioner    partitioner
 }
 
 // NewBucketStore creates a new bucket backed store that implements the store API against
@@ -185,12 +212,19 @@ func NewBucketStore(
 	dir string,
 	indexCacheSizeBytes uint64,
 	maxChunkPoolBytes uint64,
+	maxSampleCount uint64,
+	maxConcurrent int,
 	debugLogging bool,
 	blockSyncConcurrency int,
 ) (*BucketStore, error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
+
+	if maxConcurrent < 0 {
+		return nil, errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrent)
+	}
+
 	indexCache, err := newIndexCache(reg, indexCacheSizeBytes)
 	if err != nil {
 		return nil, errors.Wrap(err, "create index cache")
@@ -202,6 +236,7 @@ func NewBucketStore(
 
 	const maxGapSize = 512 * 1024
 
+	metrics := newBucketStoreMetrics(reg)
 	s := &BucketStore{
 		logger:               logger,
 		bucket:               bucket,
@@ -212,14 +247,18 @@ func NewBucketStore(
 		blockSets:            map[uint64]*bucketBlockSet{},
 		debugLogging:         debugLogging,
 		blockSyncConcurrency: blockSyncConcurrency,
+		queryGate:            NewGate(maxConcurrent, extprom.NewSubsystem(reg, "thanos_bucket_store")),
+		samplesLimiter:       NewLimiter(maxSampleCount, metrics.queriesDropped),
 		partitioner:          gapBasedPartitioner{maxGapSize: maxGapSize},
 	}
-	s.metrics = newBucketStoreMetrics(reg)
+	s.metrics = metrics
 
 	if err := os.MkdirAll(dir, 0777); err != nil {
 		return nil, errors.Wrap(err, "create dir")
 	}
 
+	s.metrics.queriesLimit.Set(float64(maxConcurrent))
+
 	return s, nil
 }
 
@@ -472,7 +511,7 @@ func (s *bucketSeriesSet) Err() error {
 	return s.err
 }
 
-func (s *BucketStore) blockSeries(
+func blockSeries(
 	ctx context.Context,
 	ulid ulid.ULID,
 	extLset map[string]string,
@@ -480,6 +519,7 @@ func (s *BucketStore) blockSeries(
 	chunkr *bucketChunkReader,
 	matchers []labels.Matcher,
 	req *storepb.SeriesRequest,
+	samplesLimiter *Limiter,
 ) (storepb.SeriesSet, *queryStats, error) {
 	ps, err := indexr.ExpandedPostings(matchers)
 	if err != nil {
@@ -557,7 +597,7 @@ func (s *BucketStore) blockSeries(
 	}
 
 	// Preload all chunks that were marked in the previous stage.
-	if err := chunkr.preload(); err != nil {
+	if err := chunkr.preload(samplesLimiter); err != nil {
 		return nil, nil, errors.Wrap(err, "preload chunks")
 	}
 
@@ -661,10 +701,17 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels
 }
 
 // Series implements the storepb.StoreServer interface.
-// TODO(bwplotka): It buffers all chunks in memory and only then streams to client.
-// 1. Either count chunk sizes and error out too big query.
-// 2. Stream posting -> series -> chunk all together.
 func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) {
+	{
+		span, _ := tracing.StartSpan(srv.Context(), "store_query_gate_ismyturn")
+		err := s.queryGate.IsMyTurn(srv.Context())
+		span.Finish()
+		if err != nil {
+			return errors.Wrapf(err, "failed to wait for turn")
+		}
+	}
+	defer s.queryGate.Done()
+
 	matchers, err := translateMatchers(req.Matchers)
 	if err != nil {
 		return status.Error(codes.InvalidArgument, err.Error())
@@ -703,13 +750,14 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 			defer runutil.CloseWithLogOnErr(s.logger, chunkr, "series block")
 
 			g.Add(func() error {
-				part, pstats, err := s.blockSeries(ctx,
+				part, pstats, err := blockSeries(ctx,
 					b.meta.ULID,
 					b.meta.Thanos.Labels,
 					indexr,
 					chunkr,
 					blockMatchers,
 					req,
+					s.samplesLimiter,
 				)
 				if err != nil {
 					return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
@@ -1589,11 +1637,21 @@ func (r *bucketChunkReader) addPreload(id uint64) error {
 }
 
 // preload all added chunk IDs. Must be called before the first call to Chunk is made.
-func (r *bucketChunkReader) preload() error {
+func (r *bucketChunkReader) preload(samplesLimiter *Limiter) error {
 	const maxChunkSize = 16000
 
 	var g run.Group
 
+	numChunks := uint64(0)
+	for _, offsets := range r.preloads {
+		for range offsets {
+			numChunks++
+		}
+	}
+	if err := samplesLimiter.Check(numChunks * maxSamplesPerChunk); err != nil {
+		return errors.Wrap(err, "exceeded samples limit")
+	}
+
 	for seq, offsets := range r.preloads {
 		sort.Slice(offsets, func(i, j int) bool {
 			return offsets[i] < offsets[j]
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index 8133e9bc86..997767d055 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -35,7 +35,7 @@ func (s *storeSuite) Close() {
 	s.wg.Wait()
 }
 
-func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool) *storeSuite {
+func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxSampleCount uint64) *storeSuite {
 	series := []labels.Labels{
 		labels.FromStrings("a", "1", "b", "1"),
 		labels.FromStrings("a", "1", "b", "2"),
@@ -87,7 +87,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
 		testutil.Ok(t, os.RemoveAll(dir2))
 	}
 
-	store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, false, 20)
+	store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, maxSampleCount, 20, false, 20)
 	testutil.Ok(t, err)
 	s.store = store
 
@@ -334,7 +334,7 @@ func TestBucketStore_e2e(t *testing.T) {
 	testutil.Ok(t, err)
 	defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
 
-	s := prepareStoreWithTestBlocks(t, dir, bkt, false)
+	s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0)
 	defer s.Close()
 
 	testBucketStore_e2e(t, ctx, s)
@@ -363,7 +363,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
 	testutil.Ok(t, err)
 	defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
 
-	s := prepareStoreWithTestBlocks(t, dir, bkt, true)
+	s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0)
 	defer s.Close()
 
 	testBucketStore_e2e(t, ctx, s)
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index b0d43f23ce..18f953c298 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -283,7 +283,7 @@ func TestBucketStore_Info(t *testing.T) {
 	dir, err := ioutil.TempDir("", "prometheus-test")
 	testutil.Ok(t, err)
 
-	bucketStore, err := NewBucketStore(nil, nil, nil, dir, 2e5, 2e5, false, 20)
+	bucketStore, err := NewBucketStore(nil, nil, nil, dir, 2e5, 2e5, 0, 0, false, 20)
 	testutil.Ok(t, err)
 
 	resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{})
diff --git a/pkg/store/gate.go b/pkg/store/gate.go
new file mode 100644
index 0000000000..dbbcb7d72a
--- /dev/null
+++ b/pkg/store/gate.go
@@ -0,0 +1,64 @@
+package store
+
+import (
+	"context"
+	"time"
+
+	"github.com/improbable-eng/thanos/pkg/extprom"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/prometheus/pkg/gate"
+)
+
+// Gate wraps the Prometheus gate with extra metrics.
+type Gate struct {
+	g               *gate.Gate
+	inflightQueries prometheus.Gauge
+	gateTiming      prometheus.Histogram
+}
+
+// NewGate returns a new query gate.
+func NewGate(maxConcurrent int, reg *extprom.SubsystemRegisterer) *Gate {
+	g := &Gate{
+		g: gate.New(maxConcurrent),
+	}
+	g.inflightQueries = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name:      "queries_in_flight",
+		Help:      "Number of queries that are currently in flight.",
+		Subsystem: reg.Subsystem(),
+	})
+	g.gateTiming = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Name: "gate_duration_seconds",
+		Help: "How many seconds it took for queries to wait at the gate.",
+		Buckets: []float64{
+			0.01, 0.05, 0.1, 0.25, 0.6, 1, 2, 3.5, 5, 10,
+		},
+		Subsystem: reg.Subsystem(),
+	})
+
+	if r := reg.Registerer(); r != nil {
+		r.MustRegister(g.inflightQueries, g.gateTiming)
+	}
+
+	return g
+}
+
+// IsMyTurn initiates a new query and waits until it's our turn to fulfill a query request.
+func (g *Gate) IsMyTurn(ctx context.Context) error {
+	start := time.Now()
+	defer func() {
+		g.gateTiming.Observe(time.Since(start).Seconds())
+	}()
+
+	if err := g.g.Start(ctx); err != nil {
+		return err
+	}
+
+	g.inflightQueries.Inc()
+	return nil
+}
+
+// Done finishes a query.
+func (g *Gate) Done() {
+	g.inflightQueries.Dec()
+	g.g.Done()
+}
diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go
new file mode 100644
index 0000000000..2c332a2c6b
--- /dev/null
+++ b/pkg/store/limiter.go
@@ -0,0 +1,31 @@
+package store
+
+import (
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// Limiter is a simple mechanism for checking if something has passed a certain threshold.
+type Limiter struct {
+	limit uint64
+
+	// Counter metric which we will increase if Check() fails.
+	failedCounter prometheus.Counter
+}
+
+// NewLimiter returns a new limiter with a specified limit. 0 disables the limit.
+func NewLimiter(limit uint64, ctr prometheus.Counter) *Limiter {
+	return &Limiter{limit: limit, failedCounter: ctr}
+}
+
+// Check checks if the passed number exceeds the limit or not.
+func (l *Limiter) Check(num uint64) error {
+	if l.limit == 0 {
+		return nil
+	}
+	if num > l.limit {
+		l.failedCounter.Inc()
+		return errors.Errorf("limit %v violated (got %v)", l.limit, num)
+	}
+	return nil
+}
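Below is a minimal, illustrative sketch (not part of the patch) of how the two new pieces compose: a caller waits at the Gate before doing any work, then checks the Limiter with the pessimistic chunks-times-120-samples estimate, mirroring what Series() and preload() do above. It only uses the constructors and methods introduced in this change (store.NewGate, store.NewLimiter, extprom.NewSubsystem); the registry, the 1,000,000 sample budget and the 10-chunk count are made-up example values, while 20 and 120 come from this change's defaults.

package main

import (
	"context"
	"fmt"

	"github.com/improbable-eng/thanos/pkg/extprom"
	"github.com/improbable-eng/thanos/pkg/store"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	// Gate sized like the new --store.grpc.series-max-concurrency default (20),
	// registered under the same subsystem prefix the bucket store uses.
	gate := store.NewGate(20, extprom.NewSubsystem(reg, "thanos_bucket_store"))

	// Limiter with an illustrative budget of 1e6 samples; 0 would disable the
	// check, mirroring --store.grpc.series-sample-limit=0.
	dropped := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "thanos_bucket_store_queries_dropped_total",
		Help: "Number of queries that were dropped due to the sample limit.",
	})
	limiter := store.NewLimiter(1000000, dropped)

	// Like Series(): wait for a free slot before doing any work, release it when done.
	if err := gate.IsMyTurn(context.Background()); err != nil {
		fmt.Println("could not enter the gate:", err)
		return
	}
	defer gate.Done()

	// Like preload(): bound the worst case as number of chunks * 120 samples per chunk.
	numChunks := uint64(10)
	if err := limiter.Check(numChunks * 120); err != nil {
		fmt.Println("query rejected:", err)
		return
	}
	fmt.Println("query admitted")
}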