Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to customize BucketStore query gate #2798

Merged
merged 1 commit into from
Jun 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
Expand Down Expand Up @@ -276,16 +278,27 @@ func runStore(
return errors.Wrap(err, "meta fetcher")
}

// Limit the concurrency on queries against the Thanos store.
if maxConcurrency < 0 {
return errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrency)
}

queriesGate := gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg)).NewGate(maxConcurrency)
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_bucket_store_queries_concurrent_max",
Help: "Number of maximum concurrent queries.",
}).Set(float64(maxConcurrency))

bs, err := store.NewBucketStore(
logger,
reg,
bkt,
metaFetcher,
dataDir,
indexCache,
queriesGate,
chunkPoolSizeBytes,
maxSampleCount,
maxConcurrency,
verbose,
blockSyncConcurrency,
filterConf,
Expand Down
32 changes: 11 additions & 21 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
Expand Down Expand Up @@ -101,7 +100,6 @@ type bucketStoreMetrics struct {
resultSeriesCount prometheus.Summary
chunkSizeBytes prometheus.Histogram
queriesDropped prometheus.Counter
queriesLimit prometheus.Gauge
seriesRefetches prometheus.Counter

cachedPostingsCompressions *prometheus.CounterVec
Expand Down Expand Up @@ -184,10 +182,6 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
Name: "thanos_bucket_store_queries_dropped_total",
Help: "Number of queries that were dropped due to the sample limit.",
})
m.queriesLimit = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_bucket_store_queries_concurrent_max",
Help: "Number of maximum concurrent queries.",
})
m.seriesRefetches = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_bucket_store_series_refetches_total",
Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize),
Expand Down Expand Up @@ -273,9 +267,9 @@ func NewBucketStore(
fetcher block.MetadataFetcher,
dir string,
indexCache storecache.IndexCache,
queryGate gate.Gate,
maxChunkPoolBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
debugLogging bool,
blockSyncConcurrency int,
filterConfig *FilterConfig,
Expand All @@ -288,10 +282,6 @@ func NewBucketStore(
logger = log.NewNopLogger()
}

if maxConcurrent < 0 {
return nil, errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrent)
}

chunkPool, err := pool.NewBucketedBytesPool(maxChunkSize, 50e6, 2, maxChunkPoolBytes)
if err != nil {
return nil, errors.Wrap(err, "create chunk pool")
Expand All @@ -310,7 +300,7 @@ func NewBucketStore(
debugLogging: debugLogging,
blockSyncConcurrency: blockSyncConcurrency,
filterConfig: filterConfig,
queryGate: gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg)).NewGate(maxConcurrent),
queryGate: queryGate,
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
enableCompatibilityLabel: enableCompatibilityLabel,
Expand All @@ -324,8 +314,6 @@ func NewBucketStore(
return nil, errors.Wrap(err, "create dir")
}

s.metrics.queriesLimit.Set(float64(maxConcurrent))

return s, nil
}

Expand Down Expand Up @@ -844,14 +832,16 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill

// Series implements the storepb.StoreServer interface.
func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) {
tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) {
err = s.queryGate.Start(srv.Context())
})
if err != nil {
return errors.Wrapf(err, "failed to wait for turn")
}
if s.queryGate != nil {
tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) {
err = s.queryGate.Start(srv.Context())
})
if err != nil {
return errors.Wrapf(err, "failed to wait for turn")
}

defer s.queryGate.Done()
defer s.queryGate.Done()
}

matchers, err := promclient.TranslateMatchers(req.Matchers)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
metaFetcher,
dir,
s.cache,
nil,
0,
maxSampleCount,
20,
false,
20,
filterConf,
Expand Down
8 changes: 4 additions & 4 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,9 @@ func TestBucketStore_Info(t *testing.T) {
nil,
dir,
noopCache{},
nil,
2e5,
0,
0,
false,
20,
allowAllFilterConf,
Expand Down Expand Up @@ -825,9 +825,9 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
metaFetcher,
dir,
noopCache{},
nil,
0,
0,
99,
false,
20,
allowAllFilterConf,
Expand Down Expand Up @@ -1776,9 +1776,9 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
fetcher,
tmpDir,
indexCache,
nil,
1000000,
10000,
10,
false,
10,
nil,
Expand Down Expand Up @@ -1885,9 +1885,9 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
fetcher,
tmpDir,
indexCache,
nil,
1000000,
10000,
10,
false,
10,
nil,
Expand Down