diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 940bec6307..580f868603 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -15,6 +15,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/pkg/relabel" "github.com/thanos-io/thanos/pkg/block" @@ -22,6 +23,7 @@ import ( "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" + "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" @@ -276,6 +278,17 @@ func runStore( return errors.Wrap(err, "meta fetcher") } + // Limit the concurrency on queries against the Thanos store. + if maxConcurrency < 0 { + return errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrency) + } + + queriesGate := gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg)).NewGate(maxConcurrency) + promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "thanos_bucket_store_queries_concurrent_max", + Help: "Number of maximum concurrent queries.", + }).Set(float64(maxConcurrency)) + bs, err := store.NewBucketStore( logger, reg, @@ -283,9 +296,9 @@ func runStore( metaFetcher, dataDir, indexCache, + queriesGate, chunkPoolSizeBytes, maxSampleCount, - maxConcurrency, verbose, blockSyncConcurrency, filterConf, diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index e5793d78c0..82513889f0 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -40,7 +40,6 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" - "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore" @@ -101,7 +100,6 @@ type bucketStoreMetrics struct { resultSeriesCount prometheus.Summary chunkSizeBytes prometheus.Histogram queriesDropped prometheus.Counter - queriesLimit prometheus.Gauge seriesRefetches prometheus.Counter cachedPostingsCompressions *prometheus.CounterVec @@ -184,10 +182,6 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Name: "thanos_bucket_store_queries_dropped_total", Help: "Number of queries that were dropped due to the sample limit.", }) - m.queriesLimit = promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "thanos_bucket_store_queries_concurrent_max", - Help: "Number of maximum concurrent queries.", - }) m.seriesRefetches = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_series_refetches_total", Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize), @@ -273,9 +267,9 @@ func NewBucketStore( fetcher block.MetadataFetcher, dir string, indexCache storecache.IndexCache, + queryGate gate.Gate, maxChunkPoolBytes uint64, maxSampleCount uint64, - maxConcurrent int, debugLogging bool, blockSyncConcurrency int, filterConfig *FilterConfig, @@ -288,10 +282,6 @@ func NewBucketStore( logger = log.NewNopLogger() } - if maxConcurrent < 0 { - return nil, errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrent) - } - chunkPool, err := pool.NewBucketedBytesPool(maxChunkSize, 50e6, 2, maxChunkPoolBytes) if err != nil { return nil, errors.Wrap(err, "create chunk pool") @@ -310,7 +300,7 @@ func NewBucketStore( debugLogging: debugLogging, blockSyncConcurrency: blockSyncConcurrency, filterConfig: filterConfig, - queryGate: gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg)).NewGate(maxConcurrent), + queryGate: queryGate, samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped), partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, enableCompatibilityLabel: enableCompatibilityLabel, @@ -324,8 +314,6 @@ func NewBucketStore( return nil, errors.Wrap(err, "create dir") } - s.metrics.queriesLimit.Set(float64(maxConcurrent)) - return s, nil } @@ -844,14 +832,16 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill // Series implements the storepb.StoreServer interface. func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) { - tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { - err = s.queryGate.Start(srv.Context()) - }) - if err != nil { - return errors.Wrapf(err, "failed to wait for turn") - } + if s.queryGate != nil { + tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { + err = s.queryGate.Start(srv.Context()) + }) + if err != nil { + return errors.Wrapf(err, "failed to wait for turn") + } - defer s.queryGate.Done() + defer s.queryGate.Done() + } matchers, err := promclient.TranslateMatchers(req.Matchers) if err != nil { diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 6664c30825..1b25d6b9d8 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -159,9 +159,9 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m metaFetcher, dir, s.cache, + nil, 0, maxSampleCount, - 20, false, 20, filterConf, diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index c21dfbf29d..8b0f71ec64 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -575,9 +575,9 @@ func TestBucketStore_Info(t *testing.T) { nil, dir, noopCache{}, + nil, 2e5, 0, - 0, false, 20, allowAllFilterConf, @@ -825,9 +825,9 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul metaFetcher, dir, noopCache{}, + nil, 0, 0, - 99, false, 20, allowAllFilterConf, @@ -1776,9 +1776,9 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { fetcher, tmpDir, indexCache, + nil, 1000000, 10000, - 10, false, 10, nil, @@ -1885,9 +1885,9 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { fetcher, tmpDir, indexCache, + nil, 1000000, 10000, - 10, false, 10, nil,