diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index b31caa0e422..d2c1eea2df1 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -56,6 +56,7 @@ type storeConfig struct {
 	httpConfig            httpConfig
 	indexCacheSizeBytes   units.Base2Bytes
 	chunkPoolSize         units.Base2Bytes
+	seriesBatchSize       int
 	maxSampleCount        uint64
 	maxTouchedSeriesCount uint64
 	maxDownloadedBytes    units.Base2Bytes
@@ -129,6 +130,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
 	cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
 		Default("32").IntVar(&sc.blockMetaFetchConcurrency)
 
+	cmd.Flag("debug.series-batch-size", "The batch size when fetching series from object storage.").
+		Hidden().Default("10000").IntVar(&sc.seriesBatchSize)
+
 	sc.filterConf = &store.FilterConfig{}
 
 	cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
@@ -338,6 +342,7 @@ func runStore(
 		store.WithChunkPool(chunkPool),
 		store.WithFilterConfig(conf.filterConf),
 		store.WithChunkHashCalculation(true),
+		store.WithSeriesBatchSize(conf.seriesBatchSize),
 	}
 
 	if conf.debugLogging {
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 5bc798ca7fc..3b42ea1d062 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -103,6 +103,9 @@ const (
 	minBlockSyncConcurrency = 1
 
 	enableChunkHashCalculation = true
+
+	// The default batch size when fetching series from object storage.
+	defaultSeriesBatchSize = 10000
 )
 
 var (
@@ -309,6 +312,7 @@ type BucketStore struct {
 	indexReaderPool *indexheader.ReaderPool
 	buffers         sync.Pool
 	chunkPool       pool.Bytes
+	seriesBatchSize int
 
 	// Sets of blocks that have the same labels. They are indexed by a hash over their label set.
 	mtx      sync.RWMutex
@@ -423,6 +427,12 @@ func WithChunkHashCalculation(enableChunkHashCalculation bool) BucketStoreOption
 	}
 }
 
+func WithSeriesBatchSize(seriesBatchSize int) BucketStoreOption {
+	return func(s *BucketStore) {
+		s.seriesBatchSize = seriesBatchSize
+	}
+}
+
 // NewBucketStore creates a new bucket backed store that implements the store API against
 // an object store bucket. It is optimized to work against high latency backends.
 func NewBucketStore(
@@ -464,6 +474,7 @@ func NewBucketStore(
 		postingOffsetsInMemSampling: postingOffsetsInMemSampling,
 		enableSeriesResponseHints:   enableSeriesResponseHints,
 		enableChunkHashCalculation:  enableChunkHashCalculation,
+		seriesBatchSize:             defaultSeriesBatchSize,
 	}
 
 	for _, option := range options {
@@ -790,25 +801,20 @@ type seriesEntry struct {
 	chks []storepb.AggrChunk
 }
 
-const batchSize = 10000
-
 // blockSeriesClient is a storepb.Store_SeriesClient for a
 // single TSDB block in object storage.
 type blockSeriesClient struct {
 	grpc.ClientStream
-	ctx           context.Context
-	logger        log.Logger
-	extLset       labels.Labels
-	blockMatchers []*labels.Matcher
+	ctx     context.Context
+	logger  log.Logger
+	extLset labels.Labels
 
-	postings       []storage.SeriesRef
-	i              int
 	mint           int64
 	maxt           int64
 	indexr         *bucketIndexReader
 	chunkr         *bucketChunkReader
 	loadAggregates []storepb.Aggr
-	limiter        ChunksLimiter
+	chunksLimiter  ChunksLimiter
 	bytesLimiter   BytesLimiter
 	skipChunks     bool
 
@@ -816,6 +822,8 @@ type blockSeriesClient struct {
 	calculateChunkHash bool
 
 	// Internal state
+	i              int
+	postings       []storage.SeriesRef
 	symbolizedLset []symbolizedLabel
 	entries        []seriesEntry
 	batch          []*storepb.SeriesResponse
@@ -831,6 +839,7 @@ func newBlockSeriesClient(
 	bytesLimiter BytesLimiter,
 	shardMatcher *storepb.ShardMatcher,
 	calculateChunkHash bool,
+	batchSize int,
 ) *blockSeriesClient {
 	var chunkr *bucketChunkReader
 	if !req.SkipChunks {
@@ -838,16 +847,16 @@
 	}
 
 	return &blockSeriesClient{
-		ctx:            ctx,
-		logger:         logger,
-		extLset:        b.extLset,
-		mint:           req.MinTime,
-		maxt:           req.MaxTime,
-		indexr:         b.indexReader(),
-		chunkr:         chunkr,
-		limiter:        limiter,
-		bytesLimiter:   bytesLimiter,
-		skipChunks:     req.SkipChunks,
+		ctx:           ctx,
+		logger:        logger,
+		extLset:       b.extLset,
+		mint:          req.MinTime,
+		maxt:          req.MaxTime,
+		indexr:        b.indexReader(),
+		chunkr:        chunkr,
+		chunksLimiter: limiter,
+		bytesLimiter:  bytesLimiter,
+		skipChunks:    req.SkipChunks,
 
 		loadAggregates: req.Aggregates,
 		shardMatcher:   shardMatcher,
@@ -866,7 +875,11 @@ func (b *blockSeriesClient) Close() {
 }
 
 func (b *blockSeriesClient) MergeStats(stats *queryStats) *queryStats {
-	return stats.merge(b.indexr.stats)
+	stats = stats.merge(b.indexr.stats)
+	if !b.skipChunks {
+		stats = stats.merge(b.chunkr.stats)
+	}
+	return stats
 }
 
 func (b *blockSeriesClient) ExpandPostings(
@@ -912,7 +925,7 @@ func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) {
 
 func (b *blockSeriesClient) nextBatch() error {
 	start := b.i
-	end := start + batchSize
+	end := start + defaultSeriesBatchSize
 	if end > len(b.postings) {
 		end = len(b.postings)
 	}
@@ -940,8 +953,7 @@ func (b *blockSeriesClient) nextBatch() error {
 		var chks []chunks.Meta
 		ok, err := b.indexr.LoadSeriesForTime(postingsBatch[i], &b.symbolizedLset, &chks, b.skipChunks, b.mint, b.maxt)
 		if err != nil {
-			b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "read series")))
-			continue
+			return errors.Wrap(err, "read series")
 		}
 		if !ok {
 			continue
 		}
@@ -982,7 +994,7 @@
 		}
 
 		// Ensure sample limit through chunksLimiter if we return chunks.
-		if err := b.limiter.Reserve(uint64(len(chks))); err != nil {
+		if err := b.chunksLimiter.Reserve(uint64(len(chks))); err != nil {
 			return errors.Wrap(err, "exceeded chunks limit")
 		}
 
@@ -1149,7 +1161,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 		bytesLimiter     = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes"))
 		ctx              = srv.Context()
 		stats            = &queryStats{}
-		res              []respSet
+		respSets         []respSet
 		mtx              sync.Mutex
 		g, gctx          = errgroup.WithContext(ctx)
 		resHints         = &hintspb.SeriesResponseHints{}
@@ -1202,6 +1214,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 			bytesLimiter,
 			shardMatcher,
 			s.enableChunkHashCalculation,
+			s.seriesBatchSize,
 		)
 		defer blockClient.Close()
 
@@ -1236,7 +1249,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 			)
 
 			mtx.Lock()
-			res = append(res, part)
+			respSets = append(respSets, part)
 			mtx.Unlock()
 
 			return nil
@@ -1288,7 +1301,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 			}
 			return status.Error(code, err.Error())
 		}
-		stats.blocksQueried = len(res)
+		stats.blocksQueried = len(respSets)
 		stats.GetAllDuration = time.Since(begin)
 		s.metrics.seriesGetAllDuration.Observe(stats.GetAllDuration.Seconds())
 		s.metrics.seriesBlocksQueried.Observe(float64(stats.blocksQueried))
@@ -1296,10 +1309,24 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 
 	// Merge the sub-results from each selected block.
 	tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) {
+		defer func() {
+			for _, resp := range respSets {
+				resp.Close()
+			}
+		}()
 		begin := time.Now()
-		set := NewProxyResponseHeap(res...)
+		set := NewDedupResponseHeap(NewProxyResponseHeap(respSets...))
 		for set.Next() {
 			at := set.At()
+			warn := at.GetWarning()
+			if warn != "" {
+				// TODO(fpetkovski): Consider deprecating string based warnings in favor of a
+				// separate protobuf message containing the grpc code and
+				// a human readable error message.
+				err = status.Error(storepb.GRPCCodeFromWarn(warn), at.GetWarning())
+				return
+			}
+
 			series := at.GetSeries()
 			if series != nil {
 				stats.mergedSeriesCount++
@@ -1318,6 +1345,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 
 		err = nil
 	})
+	if err != nil {
+		return err
+	}
 
 	if s.enableSeriesResponseHints {
 		var anyHints *types.Any
@@ -1429,7 +1459,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
 				MaxTime:    req.End,
 				SkipChunks: true,
 			}
-			blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true)
+			blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, defaultSeriesBatchSize)
 
 			if err := blockClient.ExpandPostings(
 				reqSeriesMatchersNoExtLabels,
@@ -1604,7 +1634,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
 				MaxTime:    req.End,
 				SkipChunks: true,
 			}
-			blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true)
+			blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, defaultSeriesBatchSize)
 
 			if err := blockClient.ExpandPostings(
 				reqSeriesMatchersNoExtLabels,
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 4efebf6e145..3e42539cc63 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -2444,7 +2444,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
 			// must be called only from the goroutine running the Benchmark function.
 			testutil.Ok(b, err)
 
-			blockClient := newBlockSeriesClient(ctx, nil, blk, req, chunksLimiter, NewBytesLimiterFactory(0)(nil), nil, false)
+			blockClient := newBlockSeriesClient(ctx, nil, blk, req, chunksLimiter, NewBytesLimiterFactory(0)(nil), nil, false, defaultSeriesBatchSize)
 			testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter, dummyCounter))
 			defer blockClient.Close()
 
diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go
index eaa96f1ede6..c1f4b9b8bfe 100644
--- a/pkg/store/storepb/custom.go
+++ b/pkg/store/storepb/custom.go
@@ -14,6 +14,7 @@ import (
 	"github.com/gogo/protobuf/types"
 	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/model/labels"
+	"google.golang.org/grpc/codes"
 
 	"github.com/thanos-io/thanos/pkg/store/labelpb"
 )
@@ -51,6 +52,16 @@ func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse {
 	}
 }
 
+func GRPCCodeFromWarn(warn string) codes.Code {
+	if strings.Contains(warn, "rpc error: code = ResourceExhausted") {
+		return codes.ResourceExhausted
+	}
+	if strings.Contains(warn, "rpc error: code = Code(422)") {
+		return 422
+	}
+	return codes.Unknown
+}
+
 type emptySeriesSet struct{}
 
 func (emptySeriesSet) Next() bool { return false }
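For illustration only, outside the patch itself: a minimal, self-contained sketch of the warning-to-error mapping that the new storepb.GRPCCodeFromWarn helper performs, so a string warning produced during the block merge can be surfaced as a proper gRPC error from BucketStore.Series instead of being returned as a warning. The sample warning strings and the local helper name below are hypothetical and exist only for demonstration; the mapping logic mirrors the helper added in pkg/store/storepb/custom.go.

```go
package main

import (
	"fmt"
	"strings"

	"google.golang.org/grpc/codes"
)

// grpcCodeFromWarn mirrors the GRPCCodeFromWarn helper introduced in this
// patch: it inspects a string-based warning and maps it to a gRPC status
// code so the caller can fail the request with a meaningful error.
func grpcCodeFromWarn(warn string) codes.Code {
	if strings.Contains(warn, "rpc error: code = ResourceExhausted") {
		return codes.ResourceExhausted
	}
	if strings.Contains(warn, "rpc error: code = Code(422)") {
		return 422
	}
	return codes.Unknown
}

func main() {
	// Hypothetical warning strings, for demonstration only.
	warns := []string{
		"rpc error: code = ResourceExhausted desc = exceeded bytes limit",
		"rpc error: code = Code(422) desc = exceeded chunks limit",
		"read series: corrupted chunk",
	}
	for _, w := range warns {
		fmt.Printf("%q -> %v\n", w, grpcCodeFromWarn(w))
	}
}
```

String matching is needed here because warnings travel through the Store API as plain strings; the TODO in the patch already notes the intent to replace this with a structured message carrying the gRPC code directly.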