From 9ed9a91d4b52ddd16d92a61702d3de9387917c22 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Fri, 28 Oct 2022 09:39:52 +0200 Subject: [PATCH 01/13] Implement lazy retrieval of series from object store. The bucket store fetches series in a single blocking operation from object storage. This is likely not an ideal strategy when it comes to latency and resource usage. In addition, it causes the store to buffer everything in memory before starting to send results to queriers. This commit modifies the series retrieval to use the proxy response heap and take advantage of the k-way merge used in the proxy store. Signed-off-by: Filip Petkovski --- pkg/store/bucket.go | 314 ++++++++++++++++++++++++--------------- pkg/store/bucket_test.go | 3 +- pkg/store/proxy_heap.go | 36 +++-- 3 files changed, 217 insertions(+), 136 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index a7a5d3abd9..e0fc710482 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -37,6 +37,8 @@ import ( "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/index" "golang.org/x/sync/errgroup" + + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -128,6 +130,7 @@ type bucketStoreMetrics struct { queriesDropped *prometheus.CounterVec seriesRefetches prometheus.Counter emptyPostingCount prometheus.Counter + emptyStreamResponses prometheus.Counter cachedPostingsCompressions *prometheus.CounterVec cachedPostingsCompressionErrors *prometheus.CounterVec @@ -276,6 +279,11 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Help: "Total number of empty postings when fetching block series.", }) + m.emptyStreamResponses = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_empty_stream_responses_total", + Help: "Total number of empty responses received.", + }) + return &m } @@ -782,33 +790,133 @@ type seriesEntry struct { chks []storepb.AggrChunk } -type bucketSeriesSet struct { - set []seriesEntry - i int - err error +// blockSeriesClient is a storepb.Store_SeriesClient for a +// single TSDB block in object storage. +type blockSeriesClient struct { + grpc.ClientStream + ctx context.Context + extLset labels.Labels + + ps []storage.SeriesRef + i int + mint int64 + maxt int64 + indexr *bucketIndexReader + chunkr *bucketChunkReader + chkLimiter ChunksLimiter + bytesLimiter BytesLimiter + loadAggregates []storepb.Aggr + skipChunks bool + + shardMatcher *storepb.ShardMatcher + calculateChunkHash bool + + // Transform all series into the response types and mark their relevant chunks + // for preloading. 
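+	// symbolizedLset is scratch space reused across Recv calls: it holds the
+	// raw symbol references of the current series until they are resolved
+	// via LookupLabelsSymbols below.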
+ symbolizedLset []symbolizedLabel } -func newBucketSeriesSet(set []seriesEntry) *bucketSeriesSet { - return &bucketSeriesSet{ - set: set, - i: -1, +func emptyBlockSeriesClient() *blockSeriesClient { + return &blockSeriesClient{ + ps: nil, } } -func (s *bucketSeriesSet) Next() bool { - if s.i >= len(s.set)-1 { - return false +func newBlockSeriesClient( + ctx context.Context, + extLset labels.Labels, + ps []storage.SeriesRef, + minTime int64, + maxTime int64, + indexr *bucketIndexReader, + chunkr *bucketChunkReader, + limiter ChunksLimiter, + bytesLimiter BytesLimiter, + skipChunks bool, + loadAggregates []storepb.Aggr, + shardMatcher *storepb.ShardMatcher, + calculateChunkHash bool, +) *blockSeriesClient { + return &blockSeriesClient{ + ctx: ctx, + extLset: extLset, + ps: ps, + i: -1, + mint: minTime, + maxt: maxTime, + indexr: indexr, + chunkr: chunkr, + chkLimiter: limiter, + bytesLimiter: bytesLimiter, + skipChunks: skipChunks, + + loadAggregates: loadAggregates, + shardMatcher: shardMatcher, + calculateChunkHash: calculateChunkHash, + } +} + +func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { + b.i++ + if b.i >= len(b.ps) { + return nil, io.EOF + } + + var chks []chunks.Meta + ok, err := b.indexr.LoadSeriesForTime(b.ps[b.i], &b.symbolizedLset, &chks, b.skipChunks, b.mint, b.maxt) + if err != nil { + return storepb.NewWarnSeriesResponse(errors.Wrap(err, "read series")), nil + } + if !ok { + return b.Recv() } - s.i++ - return true -} -func (s *bucketSeriesSet) At() (labels.Labels, []storepb.AggrChunk) { - return s.set[s.i].lset, s.set[s.i].chks -} + var lset labels.Labels + if err := b.indexr.LookupLabelsSymbols(b.symbolizedLset, &lset); err != nil { + return storepb.NewWarnSeriesResponse(errors.Wrap(err, "Lookup labels symbols")), nil + } -func (s *bucketSeriesSet) Err() error { - return s.err + completeLabelset := labelpb.ExtendSortedLabels(lset, b.extLset) + if !b.shardMatcher.MatchesLabels(completeLabelset) { + return b.Recv() + } + + if b.skipChunks { + return storepb.NewSeriesResponse(&storepb.Series{ + Labels: labelpb.ZLabelsFromPromLabels(completeLabelset), + }), nil + } + + s := seriesEntry{lset: completeLabelset} + //entries := []seriesEntry{s} + + // Schedule loading chunks. + s.refs = make([]chunks.ChunkRef, 0, len(chks)) + s.chks = make([]storepb.AggrChunk, 0, len(chks)) + b.chunkr.reset() + for j, meta := range chks { + if err := b.chunkr.addLoad(meta.Ref, 0, j); err != nil { + return storepb.NewWarnSeriesResponse(errors.Wrap(err, "add chunk load")), nil + } + s.chks = append(s.chks, storepb.AggrChunk{ + MinTime: meta.MinTime, + MaxTime: meta.MaxTime, + }) + s.refs = append(s.refs, meta.Ref) + } + + // Ensure sample limit through chunksLimiter if we return chunks. + if err := b.chkLimiter.Reserve(uint64(len(chks))); err != nil { + return storepb.NewWarnSeriesResponse(errors.Wrap(err, "exceeded chunks limit")), nil + } + if err := b.chunkr.load(b.ctx, []seriesEntry{s}, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter); err != nil { + return storepb.NewWarnSeriesResponse(errors.Wrap(err, "load chunks")), nil + } + + return storepb.NewSeriesResponse(&storepb.Series{ + Labels: labelpb.ZLabelsFromPromLabels(s.lset), + Chunks: s.chks, + }), nil } // blockSeries returns series matching given matchers, that have some data in given time range. 
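
Since blockSeriesClient satisfies storepb.Store_SeriesClient, each block's results can be drained with the standard gRPC streaming loop. For reference, a minimal consumer could look as follows (drainSeries is an illustrative helper, not part of this patch; it relies only on the io and storepb packages already imported in this file):

	// drainSeries reads a Store_SeriesClient until io.EOF, separating series
	// payloads from warning frames.
	func drainSeries(cl storepb.Store_SeriesClient) ([]*storepb.Series, []string, error) {
		var (
			series   []*storepb.Series
			warnings []string
		)
		for {
			resp, err := cl.Recv()
			if err == io.EOF {
				return series, warnings, nil
			}
			if err != nil {
				return nil, nil, err
			}
			if w := resp.GetWarning(); w != "" {
				warnings = append(warnings, w)
				continue
			}
			if s := resp.GetSeries(); s != nil {
				series = append(series, s)
			}
		}
	}
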
@@ -827,7 +935,7 @@ func blockSeries( shardMatcher *storepb.ShardMatcher, emptyPostingsCount prometheus.Counter, calculateChunkHash bool, -) (storepb.SeriesSet, *queryStats, error) { +) (*blockSeriesClient, *queryStats, error) { ps, err := indexr.ExpandedPostings(ctx, matchers, bytesLimiter) if err != nil { return nil, nil, errors.Wrap(err, "expanded matching posting") @@ -835,7 +943,7 @@ func blockSeries( if len(ps) == 0 { emptyPostingsCount.Inc() - return storepb.EmptySeriesSet(), indexr.stats, nil + return emptyBlockSeriesClient(), indexr.stats, nil } // Reserve series seriesLimiter @@ -850,72 +958,7 @@ func blockSeries( return nil, nil, errors.Wrap(err, "preload series") } - // Transform all series into the response types and mark their relevant chunks - // for preloading. - var ( - res []seriesEntry - symbolizedLset []symbolizedLabel - lset labels.Labels - chks []chunks.Meta - ) - - for _, id := range ps { - ok, err := indexr.LoadSeriesForTime(id, &symbolizedLset, &chks, skipChunks, minTime, maxTime) - if err != nil { - return nil, nil, errors.Wrap(err, "read series") - } - if !ok { - // No matching chunks for this time duration, skip series. - continue - } - - if err := indexr.LookupLabelsSymbols(symbolizedLset, &lset); err != nil { - return nil, nil, errors.Wrap(err, "Lookup labels symbols") - } - - completeLabelset := labelpb.ExtendSortedLabels(lset, extLset) - if !shardMatcher.MatchesLabels(completeLabelset) { - continue - } - - s := seriesEntry{} - s.lset = completeLabelset - - if !skipChunks { - // Schedule loading chunks. - s.refs = make([]chunks.ChunkRef, 0, len(chks)) - s.chks = make([]storepb.AggrChunk, 0, len(chks)) - for j, meta := range chks { - // seriesEntry s is appended to res, but not at every outer loop iteration, - // therefore len(res) is the index we need here, not outer loop iteration number. - if err := chunkr.addLoad(meta.Ref, len(res), j); err != nil { - return nil, nil, errors.Wrap(err, "add chunk load") - } - s.chks = append(s.chks, storepb.AggrChunk{ - MinTime: meta.MinTime, - MaxTime: meta.MaxTime, - }) - s.refs = append(s.refs, meta.Ref) - } - - // Ensure sample limit through chunksLimiter if we return chunks. 
- if err := chunksLimiter.Reserve(uint64(len(s.chks))); err != nil { - return nil, nil, errors.Wrap(err, "exceeded chunks limit") - } - } - - res = append(res, s) - } - - if skipChunks { - return newBucketSeriesSet(res), indexr.stats, nil - } - - if err := chunkr.load(ctx, res, loadAggregates, calculateChunkHash, bytesLimiter); err != nil { - return nil, nil, errors.Wrap(err, "load chunks") - } - - return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil + return newBlockSeriesClient(ctx, extLset, ps, minTime, maxTime, indexr, chunkr, chunksLimiter, bytesLimiter, skipChunks, loadAggregates, shardMatcher, calculateChunkHash), indexr.stats, nil } func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, save func([]byte) ([]byte, error), calculateChecksum bool) error { @@ -1062,7 +1105,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes")) ctx = srv.Context() stats = &queryStats{} - res []storepb.SeriesSet + res []respSet mtx sync.Mutex g, gctx = errgroup.WithContext(ctx) resHints = &hintspb.SeriesResponseHints{} @@ -1125,9 +1168,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie }) defer span.Finish() + ctx, cancel := context.WithCancel(ctx) + shardMatcher := req.ShardInfo.Matcher(&s.buffers) defer shardMatcher.Close() - part, pstats, err := blockSeries( + blockClient, pstats, err := blockSeries( newCtx, b.extLset, indexr, @@ -1144,8 +1189,21 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie s.enableChunkHashCalculation, ) if err != nil { + defer cancel() return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) } + part := newLazyRespSet( + ctx, + span, + 10*time.Minute, + "object-store-block", + []labels.Labels{b.extLset}, + cancel, + blockClient, + shardMatcher, + true, + b.metrics.emptyStreamResponses, + ) mtx.Lock() res = append(res, part) @@ -1155,6 +1213,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie // No info about samples exactly, so pass at least chunks. span.SetTag("processed.series", len(indexr.loadedSeries)) span.SetTag("processed.chunks", pstats.chunksFetched) + return nil }) } @@ -1209,37 +1268,29 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie s.metrics.seriesGetAllDuration.Observe(stats.GetAllDuration.Seconds()) s.metrics.seriesBlocksQueried.Observe(float64(stats.blocksQueried)) } + // Merge the sub-results from each selected block. tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) { begin := time.Now() - - // NOTE: We "carefully" assume series and chunks are sorted within each SeriesSet. This should be guaranteed by - // blockSeries method. In worst case deduplication logic won't deduplicate correctly, which will be accounted later. - set := storepb.MergeSeriesSets(res...) 
+ set := NewDedupResponseHeap(NewProxyResponseHeap(res...)) for set.Next() { var series storepb.Series stats.mergedSeriesCount++ - var lset labels.Labels if req.SkipChunks { - lset, _ = set.At() - } else { - lset, series.Chunks = set.At() - stats.mergedChunksCount += len(series.Chunks) s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks))) } - series.Labels = labelpb.ZLabelsFromPromLabels(lset) - if err = srv.Send(storepb.NewSeriesResponse(&series)); err != nil { + at := set.At() + if at == nil { + continue + } + if err = srv.Send(at); err != nil { err = status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error()) return } } - if set.Err() != nil { - err = status.Error(codes.Unknown, errors.Wrap(set.Err(), "expand series set").Error()) - return - } stats.MergeDuration = time.Since(begin) s.metrics.seriesMergeDuration.Observe(stats.MergeDuration.Seconds()) @@ -1342,7 +1393,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq } // Add a set for the external labels as well. - // We're not adding them directly to res because there could be duplicates. + // We're not adding them directly to refs because there could be duplicates. // b.extLset is already sorted by label name, no need to sort it again. extRes := make([]string, 0, len(b.extLset)) for _, l := range b.extLset { @@ -1376,15 +1427,25 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq // Note that label names will already include external labels (passed to blockSeries), so we don't need // to add them again. labelNames := map[string]struct{}{} - for seriesSet.Next() { - ls, _ := seriesSet.At() - for _, l := range ls { + for { + ls, err := seriesSet.Recv() + if err == io.EOF { + break + } + if err != nil { + return errors.Wrapf(err, "iterate series for block %s", b.meta.ULID) + } + + if ls.GetWarning() != "" { + return errors.Wrapf(errors.New(ls.GetWarning()), "iterate series for block %s", b.meta.ULID) + } + if ls.GetSeries() == nil { + continue + } + for _, l := range ls.GetSeries().Labels { labelNames[l.Name] = struct{}{} } } - if seriesSet.Err() != nil { - return errors.Wrapf(seriesSet.Err(), "iterate series for block %s", b.meta.ULID) - } result = make([]string, 0, len(labelNames)) for n := range labelNames { @@ -1546,16 +1607,27 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR // Extract given label's value from all series and deduplicate them. // We don't need to deal with external labels, since they are already added by blockSeries. values := map[string]struct{}{} - for seriesSet.Next() { - ls, _ := seriesSet.At() - val := ls.Get(req.Label) + for { + ls, err := seriesSet.Recv() + if err == io.EOF { + break + } + if err != nil { + return errors.Wrapf(err, "iterate series for block %s", b.meta.ULID) + } + + if ls.GetWarning() != "" { + return errors.Wrapf(errors.New(ls.GetWarning()), "iterate series for block %s", b.meta.ULID) + } + if ls.GetSeries() == nil { + continue + } + + val := labelpb.ZLabelsToPromLabels(ls.GetSeries().Labels).Get(req.Label) if val != "" { // Should never be empty since we added labelName!="" matcher to the list of matchers. 
values[val] = struct{}{} } } - if seriesSet.Err() != nil { - return errors.Wrapf(seriesSet.Err(), "iterate series for block %s", b.meta.ULID) - } result = make([]string, 0, len(values)) for n := range values { @@ -2610,6 +2682,10 @@ func newBucketChunkReader(block *bucketBlock) *bucketChunkReader { } } +func (r *bucketChunkReader) reset() { + r.toLoad = make([][]loadIdx, len(r.block.chunkObjs)) +} + func (r *bucketChunkReader) Close() error { r.block.pendingReaders.Done() @@ -2620,7 +2696,7 @@ func (r *bucketChunkReader) Close() error { } // addLoad adds the chunk with id to the data set to be fetched. -// Chunk will be fetched and saved to res[seriesEntry][chunk] upon r.load(res, <...>) call. +// Chunk will be fetched and saved to refs[seriesEntry][chunk] upon r.load(refs, <...>) call. func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int) error { var ( seq = int(id >> 32) @@ -2633,7 +2709,7 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int) return nil } -// load loads all added chunks and saves resulting aggrs to res. +// load loads all added chunks and saves resulting aggrs to refs. func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error { g, ctx := errgroup.WithContext(ctx) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index a186b32376..bff4c4b6bb 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -2451,7 +2451,8 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet testutil.Ok(b, err) // Ensure at least 1 series has been returned (as expected). - testutil.Equals(b, true, seriesSet.Next()) + _, err = seriesSet.Recv() + testutil.Ok(b, err) testutil.Ok(b, indexReader.Close()) testutil.Ok(b, chunkReader.Close()) diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index d354d4db06..a63e54dd24 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -260,11 +260,11 @@ func (h *ProxyResponseHeap) At() *storepb.SeriesResponse { } func (l *lazyRespSet) StoreID() string { - return l.st.String() + return l.storeName } func (l *lazyRespSet) Labelset() string { - return labelpb.PromLabelSetsToString(l.st.LabelSets()) + return labelpb.PromLabelSetsToString(l.storeLabelSets) } // lazyRespSet is a lazy storepb.SeriesSet that buffers @@ -273,12 +273,13 @@ func (l *lazyRespSet) Labelset() string { // in Next(). type lazyRespSet struct { // Generic parameters. - span opentracing.Span - cl storepb.Store_SeriesClient - closeSeries context.CancelFunc - st Client - frameTimeout time.Duration - ctx context.Context + span opentracing.Span + cl storepb.Store_SeriesClient + closeSeries context.CancelFunc + storeName string + storeLabelSets []labels.Labels + frameTimeout time.Duration + ctx context.Context // Internal bookkeeping. 
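	// dataOrFinishEvent is signalled by the background receive goroutine both
	// when a new response has been buffered and when the stream terminates,
	// waking a consumer blocked in Next.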
dataOrFinishEvent *sync.Cond @@ -358,7 +359,8 @@ func newLazyRespSet( ctx context.Context, span opentracing.Span, frameTimeout time.Duration, - st Client, + storeName string, + storeLabelSets []labels.Labels, closeSeries context.CancelFunc, cl storepb.Store_SeriesClient, shardMatcher *storepb.ShardMatcher, @@ -373,7 +375,8 @@ func newLazyRespSet( respSet := &lazyRespSet{ frameTimeout: frameTimeout, cl: cl, - st: st, + storeName: storeName, + storeLabelSets: storeLabelSets, closeSeries: closeSeries, span: span, ctx: ctx, @@ -383,7 +386,7 @@ func newLazyRespSet( shardMatcher: shardMatcher, } - go func(st Client, l *lazyRespSet) { + go func(st string, l *lazyRespSet) { bytesProcessed := 0 seriesStats := &storepb.SeriesStatsCounter{} @@ -409,7 +412,7 @@ func newLazyRespSet( select { case <-l.ctx.Done(): - err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st.String()) + err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st) l.span.SetTag("err", err.Error()) l.bufferedResponsesMtx.Lock() @@ -434,9 +437,9 @@ func newLazyRespSet( // Most likely the per-Recv timeout has been reached. // There's a small race between canceling and the Recv() // but this is most likely true. - rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st.String()) + rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st) } else { - rerr = errors.Wrapf(err, "receive series from %s", st.String()) + rerr = errors.Wrapf(err, "receive series from %s", st) } l.span.SetTag("err", rerr.Error()) @@ -478,7 +481,7 @@ func newLazyRespSet( return } } - }(st, respSet) + }(storeName, respSet) return respSet } @@ -552,7 +555,8 @@ func newAsyncRespSet(ctx context.Context, seriesCtx, span, frameTimeout, - st, + st.String(), + st.LabelSets(), closeSeries, cl, shardMatcher, From ca126445037696caeae0523a90b5d8835f530ec0 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Fri, 28 Oct 2022 17:06:34 +0200 Subject: [PATCH 02/13] Add batching Signed-off-by: Filip Petkovski --- pkg/store/bucket.go | 155 +++++++++++++++++++++++++++++--------------- 1 file changed, 101 insertions(+), 54 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index e0fc710482..4731ff9d26 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -790,6 +790,8 @@ type seriesEntry struct { chks []storepb.AggrChunk } +const batchSize = 1000 + // blockSeriesClient is a storepb.Store_SeriesClient for a // single TSDB block in object storage. type blockSeriesClient struct { @@ -803,17 +805,20 @@ type blockSeriesClient struct { maxt int64 indexr *bucketIndexReader chunkr *bucketChunkReader - chkLimiter ChunksLimiter + limiter ChunksLimiter bytesLimiter BytesLimiter loadAggregates []storepb.Aggr - skipChunks bool - shardMatcher *storepb.ShardMatcher + skipChunks bool + shardMatcher *storepb.ShardMatcher + calculateChunkHash bool // Transform all series into the response types and mark their relevant chunks // for preloading. 
symbolizedLset []symbolizedLabel + entries []seriesEntry + batch []*storepb.SeriesResponse } func emptyBlockSeriesClient() *blockSeriesClient { @@ -841,82 +846,122 @@ func newBlockSeriesClient( ctx: ctx, extLset: extLset, ps: ps, - i: -1, mint: minTime, maxt: maxTime, indexr: indexr, chunkr: chunkr, - chkLimiter: limiter, + limiter: limiter, bytesLimiter: bytesLimiter, skipChunks: skipChunks, loadAggregates: loadAggregates, shardMatcher: shardMatcher, calculateChunkHash: calculateChunkHash, + entries: make([]seriesEntry, 0, batchSize), } } func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { - b.i++ - if b.i >= len(b.ps) { - return nil, io.EOF + for len(b.batch) == 0 { + if done := b.nextBatch(); done { + return nil, io.EOF + } } - var chks []chunks.Meta - ok, err := b.indexr.LoadSeriesForTime(b.ps[b.i], &b.symbolizedLset, &chks, b.skipChunks, b.mint, b.maxt) - if err != nil { - return storepb.NewWarnSeriesResponse(errors.Wrap(err, "read series")), nil - } - if !ok { - return b.Recv() - } + next := b.batch[0] + b.batch = b.batch[1:] - var lset labels.Labels - if err := b.indexr.LookupLabelsSymbols(b.symbolizedLset, &lset); err != nil { - return storepb.NewWarnSeriesResponse(errors.Wrap(err, "Lookup labels symbols")), nil - } + return next, nil +} - completeLabelset := labelpb.ExtendSortedLabels(lset, b.extLset) - if !b.shardMatcher.MatchesLabels(completeLabelset) { - return b.Recv() +func (b *blockSeriesClient) nextBatch() bool { + start := b.i + end := start + batchSize + if end > len(b.ps) { + end = len(b.ps) } + b.i = end - if b.skipChunks { - return storepb.NewSeriesResponse(&storepb.Series{ - Labels: labelpb.ZLabelsFromPromLabels(completeLabelset), - }), nil + b.batch = b.batch[:0] + ps := b.ps[start:end] + if len(ps) == 0 { + return true } + if !b.skipChunks { + b.chunkr.reset() + } + b.entries = b.entries[:0] + for i := 0; i < len(ps); i++ { + var chks []chunks.Meta + ok, err := b.indexr.LoadSeriesForTime(ps[i], &b.symbolizedLset, &chks, b.skipChunks, b.mint, b.maxt) + if err != nil { + b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "read series"))) + continue + } + if !ok { + continue + } - s := seriesEntry{lset: completeLabelset} - //entries := []seriesEntry{s} + var lset labels.Labels + if err := b.indexr.LookupLabelsSymbols(b.symbolizedLset, &lset); err != nil { + b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "Lookup labels symbols"))) + continue + } - // Schedule loading chunks. - s.refs = make([]chunks.ChunkRef, 0, len(chks)) - s.chks = make([]storepb.AggrChunk, 0, len(chks)) - b.chunkr.reset() - for j, meta := range chks { - if err := b.chunkr.addLoad(meta.Ref, 0, j); err != nil { - return storepb.NewWarnSeriesResponse(errors.Wrap(err, "add chunk load")), nil + completeLabelset := labelpb.ExtendSortedLabels(lset, b.extLset) + if !b.shardMatcher.MatchesLabels(completeLabelset) { + continue } - s.chks = append(s.chks, storepb.AggrChunk{ - MinTime: meta.MinTime, - MaxTime: meta.MaxTime, - }) - s.refs = append(s.refs, meta.Ref) + + if b.skipChunks { + b.batch = append(b.batch, storepb.NewSeriesResponse(&storepb.Series{ + Labels: labelpb.ZLabelsFromPromLabels(completeLabelset), + })) + continue + } + + s := seriesEntry{} + // Schedule loading chunks. 
+ s.lset = completeLabelset + s.refs = make([]chunks.ChunkRef, 0, len(chks)) + s.chks = make([]storepb.AggrChunk, 0, len(chks)) + + for j, meta := range chks { + if err := b.chunkr.addLoad(meta.Ref, len(b.entries), j); err != nil { + b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "add chunk load"))) + return true + } + s.chks = append(s.chks, storepb.AggrChunk{ + MinTime: meta.MinTime, + MaxTime: meta.MaxTime, + }) + s.refs = append(s.refs, meta.Ref) + } + + // Ensure sample limit through chunksLimiter if we return chunks. + if err := b.limiter.Reserve(uint64(len(chks))); err != nil { + b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "exceeded chunks limit"))) + return true + } + + b.entries = append(b.entries, s) } - // Ensure sample limit through chunksLimiter if we return chunks. - if err := b.chkLimiter.Reserve(uint64(len(chks))); err != nil { - return storepb.NewWarnSeriesResponse(errors.Wrap(err, "exceeded chunks limit")), nil + if !b.skipChunks { + if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter); err != nil { + b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "load chunks"))) + return true + } } - if err := b.chunkr.load(b.ctx, []seriesEntry{s}, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter); err != nil { - return storepb.NewWarnSeriesResponse(errors.Wrap(err, "load chunks")), nil + + for _, entry := range b.entries { + b.batch = append(b.batch, storepb.NewSeriesResponse(&storepb.Series{ + Labels: labelpb.ZLabelsFromPromLabels(entry.lset), + Chunks: entry.chks, + })) } - return storepb.NewSeriesResponse(&storepb.Series{ - Labels: labelpb.ZLabelsFromPromLabels(s.lset), - Chunks: s.chks, - }), nil + return false } // blockSeries returns series matching given matchers, that have some data in given time range. @@ -1200,8 +1245,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie []labels.Labels{b.extLset}, cancel, blockClient, - shardMatcher, - true, + nil, + false, b.metrics.emptyStreamResponses, ) @@ -1272,7 +1317,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie // Merge the sub-results from each selected block. tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) { begin := time.Now() - set := NewDedupResponseHeap(NewProxyResponseHeap(res...)) + set := NewProxyResponseHeap(res...) for set.Next() { var series storepb.Series @@ -2683,7 +2728,9 @@ func newBucketChunkReader(block *bucketBlock) *bucketChunkReader { } func (r *bucketChunkReader) reset() { - r.toLoad = make([][]loadIdx, len(r.block.chunkObjs)) + for i := range r.toLoad { + r.toLoad[i] = r.toLoad[i][:0] + } } func (r *bucketChunkReader) Close() error { From 1c5886be59001fabc8708c382e83535d67dea273 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Sun, 30 Oct 2022 06:40:52 +0100 Subject: [PATCH 03/13] Preload series in batches Signed-off-by: Filip Petkovski --- .gitignore | 3 ++ pkg/store/bucket.go | 75 +++++++++++++++++++++++++-------------------- 2 files changed, 44 insertions(+), 34 deletions(-) diff --git a/.gitignore b/.gitignore index e5068f1580..2035dfd2cb 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,9 @@ kube/.minikube data/ test/e2e/e2e_* +# Ignore benchmarks dir +benchmarks/ + # Ignore promu artifacts. 
/.build /.release diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 4731ff9d26..d143bb4f67 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -790,7 +790,7 @@ type seriesEntry struct { chks []storepb.AggrChunk } -const batchSize = 1000 +const batchSize = 10000 // blockSeriesClient is a storepb.Store_SeriesClient for a // single TSDB block in object storage. @@ -816,19 +816,19 @@ type blockSeriesClient struct { // Transform all series into the response types and mark their relevant chunks // for preloading. - symbolizedLset []symbolizedLabel - entries []seriesEntry - batch []*storepb.SeriesResponse + symbolizedLset []symbolizedLabel + entries []seriesEntry + batch []*storepb.SeriesResponse + hasMorePostings bool } func emptyBlockSeriesClient() *blockSeriesClient { return &blockSeriesClient{ - ps: nil, + hasMorePostings: false, } } func newBlockSeriesClient( - ctx context.Context, extLset labels.Labels, ps []storage.SeriesRef, minTime int64, @@ -843,7 +843,7 @@ func newBlockSeriesClient( calculateChunkHash bool, ) *blockSeriesClient { return &blockSeriesClient{ - ctx: ctx, + ctx: context.Background(), extLset: extLset, ps: ps, mint: minTime, @@ -858,23 +858,28 @@ func newBlockSeriesClient( shardMatcher: shardMatcher, calculateChunkHash: calculateChunkHash, entries: make([]seriesEntry, 0, batchSize), + hasMorePostings: true, } } func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { - for len(b.batch) == 0 { - if done := b.nextBatch(); done { - return nil, io.EOF + for len(b.batch) == 0 && b.hasMorePostings { + if err := b.nextBatch(); err != nil { + return nil, err } } + if len(b.batch) == 0 { + return nil, io.EOF + } + next := b.batch[0] b.batch = b.batch[1:] return next, nil } -func (b *blockSeriesClient) nextBatch() bool { +func (b *blockSeriesClient) nextBatch() error { start := b.i end := start + batchSize if end > len(b.ps) { @@ -885,12 +890,23 @@ func (b *blockSeriesClient) nextBatch() bool { b.batch = b.batch[:0] ps := b.ps[start:end] if len(ps) == 0 { - return true + b.hasMorePostings = false + return nil } + b.entries = b.entries[:0] + + b.indexr.reset() if !b.skipChunks { b.chunkr.reset() } - b.entries = b.entries[:0] + + // Preload all series index data. + // TODO(bwplotka): Consider not keeping all series in memory all the time. + // TODO(bwplotka): Do lazy loading in one step as `ExpandingPostings` method. 
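+	// Only the current batch stays resident: indexr.reset() above drops the
+	// series buffered for previous batches before this preload runs.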
+ if err := b.indexr.PreloadSeries(b.ctx, ps, b.bytesLimiter); err != nil { + return errors.Wrap(err, "preload series") + } + for i := 0; i < len(ps); i++ { var chks []chunks.Meta ok, err := b.indexr.LoadSeriesForTime(ps[i], &b.symbolizedLset, &chks, b.skipChunks, b.mint, b.maxt) @@ -904,8 +920,7 @@ func (b *blockSeriesClient) nextBatch() bool { var lset labels.Labels if err := b.indexr.LookupLabelsSymbols(b.symbolizedLset, &lset); err != nil { - b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "Lookup labels symbols"))) - continue + return errors.Wrap(err, "Lookup labels symbols") } completeLabelset := labelpb.ExtendSortedLabels(lset, b.extLset) @@ -928,8 +943,7 @@ func (b *blockSeriesClient) nextBatch() bool { for j, meta := range chks { if err := b.chunkr.addLoad(meta.Ref, len(b.entries), j); err != nil { - b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "add chunk load"))) - return true + return errors.Wrap(err, "add chunk load") } s.chks = append(s.chks, storepb.AggrChunk{ MinTime: meta.MinTime, @@ -940,8 +954,7 @@ func (b *blockSeriesClient) nextBatch() bool { // Ensure sample limit through chunksLimiter if we return chunks. if err := b.limiter.Reserve(uint64(len(chks))); err != nil { - b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "exceeded chunks limit"))) - return true + return errors.Wrap(err, "exceeded chunks limit") } b.entries = append(b.entries, s) @@ -949,8 +962,7 @@ func (b *blockSeriesClient) nextBatch() bool { if !b.skipChunks { if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter); err != nil { - b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "load chunks"))) - return true + return errors.Wrap(err, "load chunks") } } @@ -961,7 +973,7 @@ func (b *blockSeriesClient) nextBatch() bool { })) } - return false + return nil } // blockSeries returns series matching given matchers, that have some data in given time range. @@ -996,14 +1008,7 @@ func blockSeries( return nil, nil, errors.Wrap(err, "exceeded series limit") } - // Preload all series index data. - // TODO(bwplotka): Consider not keeping all series in memory all the time. - // TODO(bwplotka): Do lazy loading in one step as `ExpandingPostings` method. 
- if err := indexr.PreloadSeries(ctx, ps, bytesLimiter); err != nil { - return nil, nil, errors.Wrap(err, "preload series") - } - - return newBlockSeriesClient(ctx, extLset, ps, minTime, maxTime, indexr, chunkr, chunksLimiter, bytesLimiter, skipChunks, loadAggregates, shardMatcher, calculateChunkHash), indexr.stats, nil + return newBlockSeriesClient(extLset, ps, minTime, maxTime, indexr, chunkr, chunksLimiter, bytesLimiter, skipChunks, loadAggregates, shardMatcher, calculateChunkHash), indexr.stats, nil } func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, save func([]byte) ([]byte, error), calculateChecksum bool) error { @@ -1205,7 +1210,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie defer runutil.CloseWithLogOnErr(s.logger, indexr, "series block") g.Go(func() error { - span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{ + span, _ := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{ "block.id": b.meta.ULID, "block.mint": b.meta.MinTime, "block.maxt": b.meta.MaxTime, @@ -1216,9 +1221,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie ctx, cancel := context.WithCancel(ctx) shardMatcher := req.ShardInfo.Matcher(&s.buffers) - defer shardMatcher.Close() blockClient, pstats, err := blockSeries( - newCtx, + srv.Context(), b.extLset, indexr, chunkr, @@ -1245,7 +1249,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie []labels.Labels{b.extLset}, cancel, blockClient, - nil, + shardMatcher, false, b.metrics.emptyStreamResponses, ) @@ -2032,6 +2036,9 @@ func newBucketIndexReader(block *bucketBlock) *bucketIndexReader { } return r } +func (r *bucketIndexReader) reset() { + r.loadedSeries = map[storage.SeriesRef][]byte{} +} // ExpandedPostings returns postings in expanded list instead of index.Postings. 
// This is because we need to have them buffered anyway to perform efficient lookup From f1e9538c56ae0a173f30ede022924a8f4ab26ccc Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Sun, 30 Oct 2022 13:38:54 +0100 Subject: [PATCH 04/13] Emit proper stats Signed-off-by: Filip Petkovski --- pkg/store/bucket.go | 51 +++++++++++++++++----------------------- pkg/store/bucket_test.go | 2 +- 2 files changed, 23 insertions(+), 30 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index d143bb4f67..76d1379492 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -992,23 +992,23 @@ func blockSeries( shardMatcher *storepb.ShardMatcher, emptyPostingsCount prometheus.Counter, calculateChunkHash bool, -) (*blockSeriesClient, *queryStats, error) { +) (*blockSeriesClient, error) { ps, err := indexr.ExpandedPostings(ctx, matchers, bytesLimiter) if err != nil { - return nil, nil, errors.Wrap(err, "expanded matching posting") + return nil, errors.Wrap(err, "expanded matching posting") } if len(ps) == 0 { emptyPostingsCount.Inc() - return emptyBlockSeriesClient(), indexr.stats, nil + return emptyBlockSeriesClient(), nil } // Reserve series seriesLimiter if err := seriesLimiter.Reserve(uint64(len(ps))); err != nil { - return nil, nil, errors.Wrap(err, "exceeded series limit") + return nil, errors.Wrap(err, "exceeded series limit") } - return newBlockSeriesClient(extLset, ps, minTime, maxTime, indexr, chunkr, chunksLimiter, bytesLimiter, skipChunks, loadAggregates, shardMatcher, calculateChunkHash), indexr.stats, nil + return newBlockSeriesClient(extLset, ps, minTime, maxTime, indexr, chunkr, chunksLimiter, bytesLimiter, skipChunks, loadAggregates, shardMatcher, calculateChunkHash), nil } func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, save func([]byte) ([]byte, error), calculateChecksum bool) error { @@ -1218,10 +1218,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie }) defer span.Finish() - ctx, cancel := context.WithCancel(ctx) - shardMatcher := req.ShardInfo.Matcher(&s.buffers) - blockClient, pstats, err := blockSeries( + blockClient, err := blockSeries( srv.Context(), b.extLset, indexr, @@ -1238,16 +1236,19 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie s.enableChunkHashCalculation, ) if err != nil { - defer cancel() return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) } part := newLazyRespSet( - ctx, + srv.Context(), span, 10*time.Minute, - "object-store-block", + b.meta.ULID.String(), []labels.Labels{b.extLset}, - cancel, + func() { + mtx.Lock() + stats = stats.merge(indexr.stats) + mtx.Unlock() + }, blockClient, shardMatcher, false, @@ -1256,13 +1257,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie mtx.Lock() res = append(res, part) - stats = stats.merge(pstats) mtx.Unlock() - // No info about samples exactly, so pass at least chunks. - span.SetTag("processed.series", len(indexr.loadedSeries)) - span.SetTag("processed.chunks", pstats.chunksFetched) - return nil }) } @@ -1323,17 +1319,14 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie begin := time.Now() set := NewProxyResponseHeap(res...) 
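	// NewProxyResponseHeap k-way merges the per-block response sets by series
	// labels, so this loop consumes a single, globally sorted stream.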
for set.Next() { - var series storepb.Series - - stats.mergedSeriesCount++ - - if req.SkipChunks { - stats.mergedChunksCount += len(series.Chunks) - s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks))) - } at := set.At() - if at == nil { - continue + series := at.GetSeries() + if series != nil { + stats.mergedSeriesCount++ + if !req.SkipChunks { + stats.mergedChunksCount += len(series.Chunks) + s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks))) + } } if err = srv.Send(at); err != nil { err = status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error()) @@ -1451,7 +1444,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq result = strutil.MergeSlices(res, extRes) } else { - seriesSet, _, err := blockSeries( + seriesSet, err := blockSeries( newCtx, b.extLset, indexr, @@ -1632,7 +1625,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR } result = res } else { - seriesSet, _, err := blockSeries( + seriesSet, err := blockSeries( newCtx, b.extLset, indexr, diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index bff4c4b6bb..c03c15705a 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -2447,7 +2447,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet indexReader := blk.indexReader() chunkReader := blk.chunkReader() - seriesSet, _, err := blockSeries(ctx, nil, indexReader, chunkReader, matchers, chunksLimiter, seriesLimiter, NewBytesLimiterFactory(0)(nil), req.SkipChunks, req.MinTime, req.MaxTime, req.Aggregates, nil, dummyCounter, false) + seriesSet, err := blockSeries(ctx, nil, indexReader, chunkReader, matchers, chunksLimiter, seriesLimiter, NewBytesLimiterFactory(0)(nil), req.SkipChunks, req.MinTime, req.MaxTime, req.Aggregates, nil, dummyCounter, false) testutil.Ok(b, err) // Ensure at least 1 series has been returned (as expected). From e1327dc108fa99f2a0ec91ebe1e0ba33132a0dd5 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Mon, 31 Oct 2022 07:51:34 +0100 Subject: [PATCH 05/13] Extract block series client Signed-off-by: Filip Petkovski --- pkg/store/bucket.go | 278 +++++++++++++++++---------------------- pkg/store/bucket_test.go | 13 +- 2 files changed, 127 insertions(+), 164 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 76d1379492..5bc798ca7f 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -796,65 +796,60 @@ const batchSize = 10000 // single TSDB block in object storage. type blockSeriesClient struct { grpc.ClientStream - ctx context.Context - extLset labels.Labels + ctx context.Context + logger log.Logger + extLset labels.Labels + blockMatchers []*labels.Matcher - ps []storage.SeriesRef + postings []storage.SeriesRef i int mint int64 maxt int64 indexr *bucketIndexReader chunkr *bucketChunkReader + loadAggregates []storepb.Aggr limiter ChunksLimiter bytesLimiter BytesLimiter - loadAggregates []storepb.Aggr - - skipChunks bool - shardMatcher *storepb.ShardMatcher + skipChunks bool + shardMatcher *storepb.ShardMatcher calculateChunkHash bool - // Transform all series into the response types and mark their relevant chunks - // for preloading. 
+ // Internal state symbolizedLset []symbolizedLabel entries []seriesEntry batch []*storepb.SeriesResponse hasMorePostings bool } -func emptyBlockSeriesClient() *blockSeriesClient { - return &blockSeriesClient{ - hasMorePostings: false, - } -} - func newBlockSeriesClient( - extLset labels.Labels, - ps []storage.SeriesRef, - minTime int64, - maxTime int64, - indexr *bucketIndexReader, - chunkr *bucketChunkReader, + ctx context.Context, + logger log.Logger, + b *bucketBlock, + req *storepb.SeriesRequest, limiter ChunksLimiter, bytesLimiter BytesLimiter, - skipChunks bool, - loadAggregates []storepb.Aggr, shardMatcher *storepb.ShardMatcher, calculateChunkHash bool, ) *blockSeriesClient { + var chunkr *bucketChunkReader + if !req.SkipChunks { + chunkr = b.chunkReader() + } + return &blockSeriesClient{ - ctx: context.Background(), - extLset: extLset, - ps: ps, - mint: minTime, - maxt: maxTime, - indexr: indexr, + ctx: ctx, + logger: logger, + extLset: b.extLset, + mint: req.MinTime, + maxt: req.MaxTime, + indexr: b.indexReader(), chunkr: chunkr, limiter: limiter, bytesLimiter: bytesLimiter, - skipChunks: skipChunks, + skipChunks: req.SkipChunks, - loadAggregates: loadAggregates, + loadAggregates: req.Aggregates, shardMatcher: shardMatcher, calculateChunkHash: calculateChunkHash, entries: make([]seriesEntry, 0, batchSize), @@ -862,6 +857,42 @@ func newBlockSeriesClient( } } +func (b *blockSeriesClient) Close() { + if !b.skipChunks { + runutil.CloseWithLogOnErr(b.logger, b.chunkr, "series block") + } + + runutil.CloseWithLogOnErr(b.logger, b.indexr, "series block") +} + +func (b *blockSeriesClient) MergeStats(stats *queryStats) *queryStats { + return stats.merge(b.indexr.stats) +} + +func (b *blockSeriesClient) ExpandPostings( + matchers []*labels.Matcher, + limiter SeriesLimiter, + emptyPostingsCount prometheus.Counter, +) error { + ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter) + if err != nil { + return errors.Wrap(err, "expanded matching posting") + } + + if len(ps) == 0 { + emptyPostingsCount.Inc() + return nil + } + + // Reserve series seriesLimiter + if err := limiter.Reserve(uint64(len(ps))); err != nil { + return errors.Wrap(err, "exceeded series limit") + } + + b.postings = ps + return nil +} + func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { for len(b.batch) == 0 && b.hasMorePostings { if err := b.nextBatch(); err != nil { @@ -882,34 +913,32 @@ func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { func (b *blockSeriesClient) nextBatch() error { start := b.i end := start + batchSize - if end > len(b.ps) { - end = len(b.ps) + if end > len(b.postings) { + end = len(b.postings) } b.i = end - b.batch = b.batch[:0] - ps := b.ps[start:end] - if len(ps) == 0 { + postingsBatch := b.postings[start:end] + if len(postingsBatch) == 0 { b.hasMorePostings = false return nil } + b.entries = b.entries[:0] + b.batch = b.batch[:0] b.indexr.reset() if !b.skipChunks { b.chunkr.reset() } - // Preload all series index data. - // TODO(bwplotka): Consider not keeping all series in memory all the time. - // TODO(bwplotka): Do lazy loading in one step as `ExpandingPostings` method. 
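+	// Preload the index data for this batch of postings in a single step.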
- if err := b.indexr.PreloadSeries(b.ctx, ps, b.bytesLimiter); err != nil { + if err := b.indexr.PreloadSeries(b.ctx, postingsBatch, b.bytesLimiter); err != nil { return errors.Wrap(err, "preload series") } - for i := 0; i < len(ps); i++ { + for i := 0; i < len(postingsBatch); i++ { var chks []chunks.Meta - ok, err := b.indexr.LoadSeriesForTime(ps[i], &b.symbolizedLset, &chks, b.skipChunks, b.mint, b.maxt) + ok, err := b.indexr.LoadSeriesForTime(postingsBatch[i], &b.symbolizedLset, &chks, b.skipChunks, b.mint, b.maxt) if err != nil { b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "read series"))) continue @@ -976,41 +1005,6 @@ func (b *blockSeriesClient) nextBatch() error { return nil } -// blockSeries returns series matching given matchers, that have some data in given time range. -func blockSeries( - ctx context.Context, - extLset labels.Labels, - indexr *bucketIndexReader, - chunkr *bucketChunkReader, - matchers []*labels.Matcher, - chunksLimiter ChunksLimiter, - seriesLimiter SeriesLimiter, - bytesLimiter BytesLimiter, // Rate limiter for used bytes. - skipChunks bool, - minTime, maxTime int64, - loadAggregates []storepb.Aggr, - shardMatcher *storepb.ShardMatcher, - emptyPostingsCount prometheus.Counter, - calculateChunkHash bool, -) (*blockSeriesClient, error) { - ps, err := indexr.ExpandedPostings(ctx, matchers, bytesLimiter) - if err != nil { - return nil, errors.Wrap(err, "expanded matching posting") - } - - if len(ps) == 0 { - emptyPostingsCount.Inc() - return emptyBlockSeriesClient(), nil - } - - // Reserve series seriesLimiter - if err := seriesLimiter.Reserve(uint64(len(ps))); err != nil { - return nil, errors.Wrap(err, "exceeded series limit") - } - - return newBlockSeriesClient(extLset, ps, minTime, maxTime, indexr, chunkr, chunksLimiter, bytesLimiter, skipChunks, loadAggregates, shardMatcher, calculateChunkHash), nil -} - func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, save func([]byte) ([]byte, error), calculateChecksum bool) error { hasher := hashPool.Get().(hash.Hash64) defer hashPool.Put(hasher) @@ -1190,69 +1184,55 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie } for _, b := range blocks { - b := b + blk := b gctx := gctx if s.enableSeriesResponseHints { // Keep track of queried blocks. - resHints.AddQueriedBlock(b.meta.ULID) - } - - var chunkr *bucketChunkReader - // We must keep the readers open until all their data has been sent. - indexr := b.indexReader() - if !req.SkipChunks { - chunkr = b.chunkReader() - defer runutil.CloseWithLogOnErr(s.logger, chunkr, "series block") + resHints.AddQueriedBlock(blk.meta.ULID) } - // Defer all closes to the end of Series method. 
- defer runutil.CloseWithLogOnErr(s.logger, indexr, "series block") + shardMatcher := req.ShardInfo.Matcher(&s.buffers) + blockClient := newBlockSeriesClient( + srv.Context(), + s.logger, + blk, + req, + chunksLimiter, + bytesLimiter, + shardMatcher, + s.enableChunkHashCalculation, + ) + defer blockClient.Close() g.Go(func() error { span, _ := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{ - "block.id": b.meta.ULID, - "block.mint": b.meta.MinTime, - "block.maxt": b.meta.MaxTime, - "block.resolution": b.meta.Thanos.Downsample.Resolution, + "block.id": blk.meta.ULID, + "block.mint": blk.meta.MinTime, + "block.maxt": blk.meta.MaxTime, + "block.resolution": blk.meta.Thanos.Downsample.Resolution, }) - defer span.Finish() - shardMatcher := req.ShardInfo.Matcher(&s.buffers) - blockClient, err := blockSeries( - srv.Context(), - b.extLset, - indexr, - chunkr, - blockMatchers, - chunksLimiter, - seriesLimiter, - bytesLimiter, - req.SkipChunks, - req.MinTime, req.MaxTime, - req.Aggregates, - shardMatcher, - s.metrics.emptyPostingCount, - s.enableChunkHashCalculation, - ) - if err != nil { - return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) + if err := blockClient.ExpandPostings(blockMatchers, seriesLimiter, s.metrics.emptyPostingCount); err != nil { + span.Finish() + return errors.Wrapf(err, "fetch series for block %s", blk.meta.ULID) + } + onClose := func() { + mtx.Lock() + stats = blockClient.MergeStats(stats) + mtx.Unlock() } part := newLazyRespSet( srv.Context(), span, 10*time.Minute, - b.meta.ULID.String(), - []labels.Labels{b.extLset}, - func() { - mtx.Lock() - stats = stats.merge(indexr.stats) - mtx.Unlock() - }, + blk.meta.ULID.String(), + []labels.Labels{blk.extLset}, + onClose, blockClient, shardMatcher, false, - b.metrics.emptyStreamResponses, + blk.metrics.emptyStreamResponses, ) mtx.Lock() @@ -1444,25 +1424,19 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq result = strutil.MergeSlices(res, extRes) } else { - seriesSet, err := blockSeries( - newCtx, - b.extLset, - indexr, - nil, + seriesReq := &storepb.SeriesRequest{ + MinTime: req.Start, + MaxTime: req.End, + SkipChunks: true, + } + blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true) + + if err := blockClient.ExpandPostings( reqSeriesMatchersNoExtLabels, - nil, seriesLimiter, - bytesLimiter, - true, - req.Start, - req.End, - nil, - nil, s.metrics.emptyPostingCount, - false, - ) - if err != nil { - return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) + ); err != nil { + return err } // Extract label names from all series. Many label names will be the same, so we need to deduplicate them. @@ -1470,7 +1444,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq // to add them again. 
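	// Drain the block client manually; a warning response from a block is
	// escalated to a hard error for the whole LabelNames call.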
labelNames := map[string]struct{}{} for { - ls, err := seriesSet.Recv() + ls, err := blockClient.Recv() if err == io.EOF { break } @@ -1625,32 +1599,26 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR } result = res } else { - seriesSet, err := blockSeries( - newCtx, - b.extLset, - indexr, - nil, + seriesReq := &storepb.SeriesRequest{ + MinTime: req.Start, + MaxTime: req.End, + SkipChunks: true, + } + blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true) + + if err := blockClient.ExpandPostings( reqSeriesMatchersNoExtLabels, - nil, seriesLimiter, - bytesLimiter, - true, - req.Start, - req.End, - nil, - nil, s.metrics.emptyPostingCount, - false, - ) - if err != nil { - return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) + ); err != nil { + return err } // Extract given label's value from all series and deduplicate them. // We don't need to deal with external labels, since they are already added by blockSeries. values := map[string]struct{}{} for { - ls, err := seriesSet.Recv() + ls, err := blockClient.Recv() if err == io.EOF { break } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index c03c15705a..4efebf6e14 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -2444,18 +2444,13 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet // must be called only from the goroutine running the Benchmark function. testutil.Ok(b, err) - indexReader := blk.indexReader() - chunkReader := blk.chunkReader() - - seriesSet, err := blockSeries(ctx, nil, indexReader, chunkReader, matchers, chunksLimiter, seriesLimiter, NewBytesLimiterFactory(0)(nil), req.SkipChunks, req.MinTime, req.MaxTime, req.Aggregates, nil, dummyCounter, false) - testutil.Ok(b, err) + blockClient := newBlockSeriesClient(ctx, nil, blk, req, chunksLimiter, NewBytesLimiterFactory(0)(nil), nil, false) + testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter, dummyCounter)) + defer blockClient.Close() // Ensure at least 1 series has been returned (as expected). - _, err = seriesSet.Recv() + _, err = blockClient.Recv() testutil.Ok(b, err) - - testutil.Ok(b, indexReader.Close()) - testutil.Ok(b, chunkReader.Close()) } }() } From 6a8b06b15336f06733cac0d5f7d2776f33592b41 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Mon, 31 Oct 2022 08:39:36 +0100 Subject: [PATCH 06/13] Fix CI Signed-off-by: Filip Petkovski --- cmd/thanos/store.go | 5 +++ pkg/store/bucket.go | 90 ++++++++++++++++++++++++------------- pkg/store/bucket_test.go | 2 +- pkg/store/storepb/custom.go | 11 +++++ 4 files changed, 77 insertions(+), 31 deletions(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 1f5d638aea..d0ddd4faf8 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -56,6 +56,7 @@ type storeConfig struct { httpConfig httpConfig indexCacheSizeBytes units.Base2Bytes chunkPoolSize units.Base2Bytes + seriesBatchSize int maxSampleCount uint64 maxTouchedSeriesCount uint64 maxDownloadedBytes units.Base2Bytes @@ -129,6 +130,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage."). Default("32").IntVar(&sc.blockMetaFetchConcurrency) + cmd.Flag("debug.series-batch-size", "The batch size when fetching series from object storage."). 
+ Hidden().Default("10000").IntVar(&sc.seriesBatchSize) + sc.filterConf = &store.FilterConfig{} cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). @@ -340,6 +344,7 @@ func runStore( store.WithChunkPool(chunkPool), store.WithFilterConfig(conf.filterConf), store.WithChunkHashCalculation(true), + store.WithSeriesBatchSize(conf.seriesBatchSize), } if conf.debugLogging { diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 5bc798ca7f..3b42ea1d06 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -103,6 +103,9 @@ const ( minBlockSyncConcurrency = 1 enableChunkHashCalculation = true + + // The default batch size when fetching series from object storage. + defaultSeriesBatchSize = 10000 ) var ( @@ -309,6 +312,7 @@ type BucketStore struct { indexReaderPool *indexheader.ReaderPool buffers sync.Pool chunkPool pool.Bytes + seriesBatchSize int // Sets of blocks that have the same labels. They are indexed by a hash over their label set. mtx sync.RWMutex @@ -423,6 +427,12 @@ func WithChunkHashCalculation(enableChunkHashCalculation bool) BucketStoreOption } } +func WithSeriesBatchSize(seriesBatchSize int) BucketStoreOption { + return func(s *BucketStore) { + s.seriesBatchSize = seriesBatchSize + } +} + // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. func NewBucketStore( @@ -464,6 +474,7 @@ func NewBucketStore( postingOffsetsInMemSampling: postingOffsetsInMemSampling, enableSeriesResponseHints: enableSeriesResponseHints, enableChunkHashCalculation: enableChunkHashCalculation, + seriesBatchSize: defaultSeriesBatchSize, } for _, option := range options { @@ -790,25 +801,20 @@ type seriesEntry struct { chks []storepb.AggrChunk } -const batchSize = 10000 - // blockSeriesClient is a storepb.Store_SeriesClient for a // single TSDB block in object storage. 
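// A client is driven in three phases: ExpandPostings resolves the request
// matchers into a posting list, Recv returns batched responses until io.EOF,
// and Close releases the underlying index and chunk readers.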
type blockSeriesClient struct { grpc.ClientStream - ctx context.Context - logger log.Logger - extLset labels.Labels - blockMatchers []*labels.Matcher + ctx context.Context + logger log.Logger + extLset labels.Labels - postings []storage.SeriesRef - i int mint int64 maxt int64 indexr *bucketIndexReader chunkr *bucketChunkReader loadAggregates []storepb.Aggr - limiter ChunksLimiter + chunksLimiter ChunksLimiter bytesLimiter BytesLimiter skipChunks bool @@ -816,6 +822,8 @@ type blockSeriesClient struct { calculateChunkHash bool // Internal state + i int + postings []storage.SeriesRef symbolizedLset []symbolizedLabel entries []seriesEntry batch []*storepb.SeriesResponse @@ -831,6 +839,7 @@ func newBlockSeriesClient( bytesLimiter BytesLimiter, shardMatcher *storepb.ShardMatcher, calculateChunkHash bool, + batchSize int, ) *blockSeriesClient { var chunkr *bucketChunkReader if !req.SkipChunks { @@ -838,16 +847,16 @@ func newBlockSeriesClient( } return &blockSeriesClient{ - ctx: ctx, - logger: logger, - extLset: b.extLset, - mint: req.MinTime, - maxt: req.MaxTime, - indexr: b.indexReader(), - chunkr: chunkr, - limiter: limiter, - bytesLimiter: bytesLimiter, - skipChunks: req.SkipChunks, + ctx: ctx, + logger: logger, + extLset: b.extLset, + mint: req.MinTime, + maxt: req.MaxTime, + indexr: b.indexReader(), + chunkr: chunkr, + chunksLimiter: limiter, + bytesLimiter: bytesLimiter, + skipChunks: req.SkipChunks, loadAggregates: req.Aggregates, shardMatcher: shardMatcher, @@ -866,7 +875,11 @@ func (b *blockSeriesClient) Close() { } func (b *blockSeriesClient) MergeStats(stats *queryStats) *queryStats { - return stats.merge(b.indexr.stats) + stats = stats.merge(b.indexr.stats) + if !b.skipChunks { + stats = stats.merge(b.chunkr.stats) + } + return stats } func (b *blockSeriesClient) ExpandPostings( @@ -912,7 +925,7 @@ func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { func (b *blockSeriesClient) nextBatch() error { start := b.i - end := start + batchSize + end := start + defaultSeriesBatchSize if end > len(b.postings) { end = len(b.postings) } @@ -940,8 +953,7 @@ func (b *blockSeriesClient) nextBatch() error { var chks []chunks.Meta ok, err := b.indexr.LoadSeriesForTime(postingsBatch[i], &b.symbolizedLset, &chks, b.skipChunks, b.mint, b.maxt) if err != nil { - b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "read series"))) - continue + return errors.Wrap(err, "read series") } if !ok { continue @@ -982,7 +994,7 @@ func (b *blockSeriesClient) nextBatch() error { } // Ensure sample limit through chunksLimiter if we return chunks. 
- if err := b.limiter.Reserve(uint64(len(chks))); err != nil { + if err := b.chunksLimiter.Reserve(uint64(len(chks))); err != nil { return errors.Wrap(err, "exceeded chunks limit") } @@ -1149,7 +1161,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes")) ctx = srv.Context() stats = &queryStats{} - res []respSet + respSets []respSet mtx sync.Mutex g, gctx = errgroup.WithContext(ctx) resHints = &hintspb.SeriesResponseHints{} @@ -1202,6 +1214,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie bytesLimiter, shardMatcher, s.enableChunkHashCalculation, + s.seriesBatchSize, ) defer blockClient.Close() @@ -1236,7 +1249,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie ) mtx.Lock() - res = append(res, part) + respSets = append(respSets, part) mtx.Unlock() return nil @@ -1288,7 +1301,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie } return status.Error(code, err.Error()) } - stats.blocksQueried = len(res) + stats.blocksQueried = len(respSets) stats.GetAllDuration = time.Since(begin) s.metrics.seriesGetAllDuration.Observe(stats.GetAllDuration.Seconds()) s.metrics.seriesBlocksQueried.Observe(float64(stats.blocksQueried)) @@ -1296,10 +1309,24 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie // Merge the sub-results from each selected block. tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) { + defer func() { + for _, resp := range respSets { + resp.Close() + } + }() begin := time.Now() - set := NewProxyResponseHeap(res...) + set := NewDedupResponseHeap(NewProxyResponseHeap(respSets...)) for set.Next() { at := set.At() + warn := at.GetWarning() + if warn != "" { + // TODO(fpetkovski): Consider deprecating string based warnings in favor of a + // separate protobuf message containing the grpc code and + // a human readable error message. 
+ err = status.Error(storepb.GRPCCodeFromWarn(warn), at.GetWarning()) + return + } + series := at.GetSeries() if series != nil { stats.mergedSeriesCount++ @@ -1318,6 +1345,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie err = nil }) + if err != nil { + return err + } if s.enableSeriesResponseHints { var anyHints *types.Any @@ -1429,7 +1459,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq MaxTime: req.End, SkipChunks: true, } - blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true) + blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, defaultSeriesBatchSize) if err := blockClient.ExpandPostings( reqSeriesMatchersNoExtLabels, @@ -1604,7 +1634,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR MaxTime: req.End, SkipChunks: true, } - blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true) + blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, defaultSeriesBatchSize) if err := blockClient.ExpandPostings( reqSeriesMatchersNoExtLabels, diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 4efebf6e14..3e42539cc6 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -2444,7 +2444,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet // must be called only from the goroutine running the Benchmark function. testutil.Ok(b, err) - blockClient := newBlockSeriesClient(ctx, nil, blk, req, chunksLimiter, NewBytesLimiterFactory(0)(nil), nil, false) + blockClient := newBlockSeriesClient(ctx, nil, blk, req, chunksLimiter, NewBytesLimiterFactory(0)(nil), nil, false, defaultSeriesBatchSize) testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter, dummyCounter)) defer blockClient.Close() diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index eaa96f1ede..c1f4b9b8bf 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -14,6 +14,7 @@ import ( "github.com/gogo/protobuf/types" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" + "google.golang.org/grpc/codes" "github.com/thanos-io/thanos/pkg/store/labelpb" ) @@ -51,6 +52,16 @@ func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse { } } +func GRPCCodeFromWarn(warn string) codes.Code { + if strings.Contains(warn, "rpc error: code = ResourceExhausted") { + return codes.ResourceExhausted + } + if strings.Contains(warn, "rpc error: code = Code(422)") { + return 422 + } + return codes.Unknown +} + type emptySeriesSet struct{} func (emptySeriesSet) Next() bool { return false } From 89d237fd5c5c5683b7a8fa80602f0a6d83266039 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Tue, 1 Nov 2022 06:53:15 +0100 Subject: [PATCH 07/13] Address review comments Signed-off-by: Filip Petkovski --- .gitignore | 2 +- cmd/thanos/store.go | 5 +++-- pkg/store/bucket.go | 23 +++++++++++------------ pkg/store/bucket_test.go | 2 +- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/.gitignore b/.gitignore index 2035dfd2cb..b11d066506 100644 --- a/.gitignore +++ b/.gitignore @@ -17,7 +17,7 @@ kube/.minikube data/ test/e2e/e2e_* -# Ignore benchmarks dir +# Ignore benchmarks dir. benchmarks/ # Ignore promu artifacts. 
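A note on the warning handling introduced earlier in this series: GRPCCodeFromWarn only inspects the warning text, so its mapping can be exercised in isolation. The sketch below is illustrative and not part of the patches; the warning strings are invented examples, and only GRPCCodeFromWarn itself comes from pkg/store/storepb:

package main

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/store/storepb"
)

func main() {
	// Warnings emitted by limiters embed a gRPC status string; GRPCCodeFromWarn
	// recovers the code so Series can fail the request instead of degrading it.
	for _, warn := range []string{
		"rpc error: code = ResourceExhausted desc = exceeded series limit", // hypothetical message
		"rpc error: code = Code(422) desc = expanding postings",            // hypothetical message
		"some other warning",
	} {
		fmt.Printf("%q -> %v\n", warn, storepb.GRPCCodeFromWarn(warn))
	}
	// Prints ResourceExhausted, Code(422), and Unknown respectively.
}

Any warning that does not embed a recognized status string falls through to codes.Unknown, which the Series handler then surfaces as the response error.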
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index d0ddd4faf8..30df09ba5e 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -6,6 +6,7 @@ package main import ( "context" "fmt" + "strconv" "time" "github.com/alecthomas/units" @@ -130,8 +131,8 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage."). Default("32").IntVar(&sc.blockMetaFetchConcurrency) - cmd.Flag("debug.series-batch-size", "The batch size when fetching series from object storage."). - Hidden().Default("10000").IntVar(&sc.seriesBatchSize) + cmd.Flag("debug.series-batch-size", "The batch size when fetching series from TSDB blocks. Setting the number too high can lead to slower retrieval, while setting it too low can lead to throttling caused by too many calls made to object storage."). + Hidden().Default(strconv.Itoa(store.SeriesBatchSize)).IntVar(&sc.seriesBatchSize) sc.filterConf = &store.FilterConfig{} diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 3b42ea1d06..ad75d54a09 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -104,8 +104,8 @@ const ( enableChunkHashCalculation = true - // The default batch size when fetching series from object storage. - defaultSeriesBatchSize = 10000 + // SeriesBatchSize is the default batch size when fetching series from object storage. + SeriesBatchSize = 10000 ) var ( @@ -283,8 +283,8 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { }) m.emptyStreamResponses = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_bucket_store_empty_stream_responses_total", - Help: "Total number of empty responses received.", + Name: "thanos_bucket_store_empty_blocks_total", + Help: "Total number of queried blocks that have no matching series.", }) return &m @@ -474,7 +474,7 @@ func NewBucketStore( postingOffsetsInMemSampling: postingOffsetsInMemSampling, enableSeriesResponseHints: enableSeriesResponseHints, enableChunkHashCalculation: enableChunkHashCalculation, - seriesBatchSize: defaultSeriesBatchSize, + seriesBatchSize: SeriesBatchSize, } for _, option := range options { @@ -821,7 +821,7 @@ type blockSeriesClient struct { shardMatcher *storepb.ShardMatcher calculateChunkHash bool - // Internal state + // Internal state. 
i int postings []storage.SeriesRef symbolizedLset []symbolizedLabel @@ -884,7 +884,7 @@ func (b *blockSeriesClient) MergeStats(stats *queryStats) *queryStats { func (b *blockSeriesClient) ExpandPostings( matchers []*labels.Matcher, - limiter SeriesLimiter, + seriesLimiter SeriesLimiter, emptyPostingsCount prometheus.Counter, ) error { ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter) @@ -897,8 +897,7 @@ func (b *blockSeriesClient) ExpandPostings( return nil } - // Reserve series seriesLimiter - if err := limiter.Reserve(uint64(len(ps))); err != nil { + if err := seriesLimiter.Reserve(uint64(len(ps))); err != nil { return errors.Wrap(err, "exceeded series limit") } @@ -925,7 +924,7 @@ func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { func (b *blockSeriesClient) nextBatch() error { start := b.i - end := start + defaultSeriesBatchSize + end := start + SeriesBatchSize if end > len(b.postings) { end = len(b.postings) } @@ -1459,7 +1458,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq MaxTime: req.End, SkipChunks: true, } - blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, defaultSeriesBatchSize) + blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, SeriesBatchSize) if err := blockClient.ExpandPostings( reqSeriesMatchersNoExtLabels, @@ -1634,7 +1633,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR MaxTime: req.End, SkipChunks: true, } - blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, defaultSeriesBatchSize) + blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, SeriesBatchSize) if err := blockClient.ExpandPostings( reqSeriesMatchersNoExtLabels, diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 3e42539cc6..144eeb31e9 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -2444,7 +2444,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet // must be called only from the goroutine running the Benchmark function. 
testutil.Ok(b, err) - blockClient := newBlockSeriesClient(ctx, nil, blk, req, chunksLimiter, NewBytesLimiterFactory(0)(nil), nil, false, defaultSeriesBatchSize) + blockClient := newBlockSeriesClient(ctx, nil, blk, req, chunksLimiter, NewBytesLimiterFactory(0)(nil), nil, false, SeriesBatchSize) testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter, dummyCounter)) defer blockClient.Close() From 774016a0f5db7e80ae63c9ece5a2b2ca9e92a0d4 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Fri, 4 Nov 2022 09:13:02 +0100 Subject: [PATCH 08/13] Use emptyPostingsCount in lazyRespSet Signed-off-by: Filip Petkovski --- cmd/thanos/query.go | 2 ++ pkg/store/bucket.go | 14 ++------------ pkg/store/bucket_test.go | 8 +------- pkg/store/proxy_heap.go | 1 - 4 files changed, 5 insertions(+), 20 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 4862440af1..d6dc2535e5 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -29,6 +29,7 @@ import ( v1 "github.com/prometheus/prometheus/web/api/v1" "github.com/thanos-community/promql-engine/engine" + apiv1 "github.com/thanos-io/thanos/pkg/api/query" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" @@ -97,6 +98,7 @@ func registerQuery(app *extkingpin.App) { queryTimeout := extkingpin.ModelDuration(cmd.Flag("query.timeout", "Maximum time to process query by query node."). Default("2m")) + promqlEngine := cmd.Flag("query.promql-engine", "PromQL engine to use.").Default(string(promqlEnginePrometheus)).Hidden(). Enum(string(promqlEnginePrometheus), string(promqlEngineThanos)) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index ad75d54a09..8f2788a350 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -133,7 +133,6 @@ type bucketStoreMetrics struct { queriesDropped *prometheus.CounterVec seriesRefetches prometheus.Counter emptyPostingCount prometheus.Counter - emptyStreamResponses prometheus.Counter cachedPostingsCompressions *prometheus.CounterVec cachedPostingsCompressionErrors *prometheus.CounterVec @@ -282,11 +281,6 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Help: "Total number of empty postings when fetching block series.", }) - m.emptyStreamResponses = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_bucket_store_empty_blocks_total", - Help: "Total number of queried blocks that have no matching series.", - }) - return &m } @@ -885,7 +879,6 @@ func (b *blockSeriesClient) MergeStats(stats *queryStats) *queryStats { func (b *blockSeriesClient) ExpandPostings( matchers []*labels.Matcher, seriesLimiter SeriesLimiter, - emptyPostingsCount prometheus.Counter, ) error { ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter) if err != nil { @@ -893,7 +886,6 @@ func (b *blockSeriesClient) ExpandPostings( } if len(ps) == 0 { - emptyPostingsCount.Inc() return nil } @@ -1225,7 +1217,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie "block.resolution": blk.meta.Thanos.Downsample.Resolution, }) - if err := blockClient.ExpandPostings(blockMatchers, seriesLimiter, s.metrics.emptyPostingCount); err != nil { + if err := blockClient.ExpandPostings(blockMatchers, seriesLimiter); err != nil { span.Finish() return errors.Wrapf(err, "fetch series for block %s", blk.meta.ULID) } @@ -1244,7 +1236,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie blockClient, shardMatcher, false, - blk.metrics.emptyStreamResponses, + 
s.metrics.emptyPostingCount, ) mtx.Lock() @@ -1463,7 +1455,6 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq if err := blockClient.ExpandPostings( reqSeriesMatchersNoExtLabels, seriesLimiter, - s.metrics.emptyPostingCount, ); err != nil { return err } @@ -1638,7 +1629,6 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR if err := blockClient.ExpandPostings( reqSeriesMatchersNoExtLabels, seriesLimiter, - s.metrics.emptyPostingCount, ); err != nil { return err } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 144eeb31e9..72c0c01a89 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -31,8 +31,6 @@ import ( "github.com/leanovate/gopter/gen" "github.com/leanovate/gopter/prop" "github.com/oklog/ulid" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" @@ -2409,10 +2407,6 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet // No limits. chunksLimiter := NewChunksLimiterFactory(0)(nil) seriesLimiter := NewSeriesLimiterFactory(0)(nil) - dummyCounter := promauto.NewCounter(prometheus.CounterOpts{ - Name: "dummy", - Help: "dummy help", - }) ctx := context.Background() // Run multiple workers to execute the queries. @@ -2445,7 +2439,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet testutil.Ok(b, err) blockClient := newBlockSeriesClient(ctx, nil, blk, req, chunksLimiter, NewBytesLimiterFactory(0)(nil), nil, false, SeriesBatchSize) - testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter, dummyCounter)) + testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter)) defer blockClient.Close() // Ensure at least 1 series has been returned (as expected). diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index a63e54dd24..5cdb5a0b78 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -366,7 +366,6 @@ func newLazyRespSet( shardMatcher *storepb.ShardMatcher, applySharding bool, emptyStreamResponses prometheus.Counter, - ) respSet { bufferedResponses := []*storepb.SeriesResponse{} bufferedResponsesMtx := &sync.Mutex{} From 30143bcb85874d3ab525fa3db8f5e728dfaca7d9 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Sat, 5 Nov 2022 17:43:04 +0100 Subject: [PATCH 09/13] Reuse chunk metas Signed-off-by: Filip Petkovski --- pkg/store/bucket.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 8f2788a350..14feb35231 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -818,6 +818,7 @@ type blockSeriesClient struct { // Internal state. 
i int postings []storage.SeriesRef + chkMetas []chunks.Meta symbolizedLset []symbolizedLabel entries []seriesEntry batch []*storepb.SeriesResponse @@ -941,8 +942,7 @@ func (b *blockSeriesClient) nextBatch() error { } for i := 0; i < len(postingsBatch); i++ { - var chks []chunks.Meta - ok, err := b.indexr.LoadSeriesForTime(postingsBatch[i], &b.symbolizedLset, &chks, b.skipChunks, b.mint, b.maxt) + ok, err := b.indexr.LoadSeriesForTime(postingsBatch[i], &b.symbolizedLset, &b.chkMetas, b.skipChunks, b.mint, b.maxt) if err != nil { return errors.Wrap(err, "read series") } @@ -970,10 +970,10 @@ func (b *blockSeriesClient) nextBatch() error { s := seriesEntry{} // Schedule loading chunks. s.lset = completeLabelset - s.refs = make([]chunks.ChunkRef, 0, len(chks)) - s.chks = make([]storepb.AggrChunk, 0, len(chks)) + s.refs = make([]chunks.ChunkRef, 0, len(b.chkMetas)) + s.chks = make([]storepb.AggrChunk, 0, len(b.chkMetas)) - for j, meta := range chks { + for j, meta := range b.chkMetas { if err := b.chunkr.addLoad(meta.Ref, len(b.entries), j); err != nil { return errors.Wrap(err, "add chunk load") } @@ -985,7 +985,7 @@ func (b *blockSeriesClient) nextBatch() error { } // Ensure sample limit through chunksLimiter if we return chunks. - if err := b.chunksLimiter.Reserve(uint64(len(chks))); err != nil { + if err := b.chunksLimiter.Reserve(uint64(len(b.chkMetas))); err != nil { return errors.Wrap(err, "exceeded chunks limit") } From f2c4eccf1fe129d7ae74be3b4e91aec8dc82cc8a Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Sun, 6 Nov 2022 08:38:19 +0100 Subject: [PATCH 10/13] Avoid overallocating for small responses Signed-off-by: Filip Petkovski --- pkg/store/bucket.go | 45 ++++++++++++++++++++------------------------- 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 14feb35231..907dd56a8c 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -819,10 +819,11 @@ type blockSeriesClient struct { i int postings []storage.SeriesRef chkMetas []chunks.Meta + lset labels.Labels symbolizedLset []symbolizedLabel entries []seriesEntry - batch []*storepb.SeriesResponse hasMorePostings bool + batchSize int } func newBlockSeriesClient( @@ -856,8 +857,8 @@ func newBlockSeriesClient( loadAggregates: req.Aggregates, shardMatcher: shardMatcher, calculateChunkHash: calculateChunkHash, - entries: make([]seriesEntry, 0, batchSize), hasMorePostings: true, + batchSize: batchSize, } } @@ -895,24 +896,31 @@ func (b *blockSeriesClient) ExpandPostings( } b.postings = ps + if b.batchSize > len(ps) { + b.batchSize = len(ps) + } + b.entries = make([]seriesEntry, 0, b.batchSize) return nil } func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { - for len(b.batch) == 0 && b.hasMorePostings { + for len(b.entries) == 0 && b.hasMorePostings { if err := b.nextBatch(); err != nil { return nil, err } } - if len(b.batch) == 0 { + if len(b.entries) == 0 { return nil, io.EOF } - next := b.batch[0] - b.batch = b.batch[1:] + next := b.entries[0] + b.entries = b.entries[1:] - return next, nil + return storepb.NewSeriesResponse(&storepb.Series{ + Labels: labelpb.ZLabelsFromPromLabels(next.lset), + Chunks: next.chks, + }), nil } func (b *blockSeriesClient) nextBatch() error { @@ -929,9 +937,6 @@ func (b *blockSeriesClient) nextBatch() error { return nil } - b.entries = b.entries[:0] - b.batch = b.batch[:0] - b.indexr.reset() if !b.skipChunks { b.chunkr.reset() @@ -941,6 +946,7 @@ func (b *blockSeriesClient) nextBatch() error { return errors.Wrap(err, 
"preload series") } + b.entries = b.entries[:0] for i := 0; i < len(postingsBatch); i++ { ok, err := b.indexr.LoadSeriesForTime(postingsBatch[i], &b.symbolizedLset, &b.chkMetas, b.skipChunks, b.mint, b.maxt) if err != nil { @@ -950,26 +956,22 @@ func (b *blockSeriesClient) nextBatch() error { continue } - var lset labels.Labels - if err := b.indexr.LookupLabelsSymbols(b.symbolizedLset, &lset); err != nil { + if err := b.indexr.LookupLabelsSymbols(b.symbolizedLset, &b.lset); err != nil { return errors.Wrap(err, "Lookup labels symbols") } - completeLabelset := labelpb.ExtendSortedLabels(lset, b.extLset) + completeLabelset := labelpb.ExtendSortedLabels(b.lset, b.extLset) if !b.shardMatcher.MatchesLabels(completeLabelset) { continue } + s := seriesEntry{lset: completeLabelset} if b.skipChunks { - b.batch = append(b.batch, storepb.NewSeriesResponse(&storepb.Series{ - Labels: labelpb.ZLabelsFromPromLabels(completeLabelset), - })) + b.entries = append(b.entries, s) continue } - s := seriesEntry{} // Schedule loading chunks. - s.lset = completeLabelset s.refs = make([]chunks.ChunkRef, 0, len(b.chkMetas)) s.chks = make([]storepb.AggrChunk, 0, len(b.chkMetas)) @@ -998,13 +1000,6 @@ func (b *blockSeriesClient) nextBatch() error { } } - for _, entry := range b.entries { - b.batch = append(b.batch, storepb.NewSeriesResponse(&storepb.Series{ - Labels: labelpb.ZLabelsFromPromLabels(entry.lset), - Chunks: entry.chks, - })) - } - return nil } From 8946647b1f4928cc588d0dd25cb2082c0eee7269 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Sat, 12 Nov 2022 08:31:35 +0100 Subject: [PATCH 11/13] Add metric for chunk fetch time Signed-off-by: Filip Petkovski --- pkg/store/bucket.go | 38 ++++++++++++++++++++++++++------------ pkg/store/bucket_test.go | 4 +++- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 907dd56a8c..e2c68576ce 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -142,6 +142,7 @@ type bucketStoreMetrics struct { seriesFetchDuration prometheus.Histogram postingsFetchDuration prometheus.Histogram + chunkFetchDuration prometheus.Histogram } func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { @@ -276,6 +277,12 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, }) + m.chunkFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "thanos_bucket_store_chunks_fetch_duration_seconds", + Help: "The total time spent fetching chunks within a single request a store gateway.", + Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, + }) + m.emptyPostingCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_empty_postings_total", Help: "Total number of empty postings when fetching block series.", @@ -814,6 +821,7 @@ type blockSeriesClient struct { skipChunks bool shardMatcher *storepb.ShardMatcher calculateChunkHash bool + chunkFetchDuration prometheus.Histogram // Internal state. 
i int @@ -836,6 +844,7 @@ func newBlockSeriesClient( shardMatcher *storepb.ShardMatcher, calculateChunkHash bool, batchSize int, + chunkFetchDuration prometheus.Histogram, ) *blockSeriesClient { var chunkr *bucketChunkReader if !req.SkipChunks { @@ -843,16 +852,17 @@ func newBlockSeriesClient( } return &blockSeriesClient{ - ctx: ctx, - logger: logger, - extLset: b.extLset, - mint: req.MinTime, - maxt: req.MaxTime, - indexr: b.indexReader(), - chunkr: chunkr, - chunksLimiter: limiter, - bytesLimiter: bytesLimiter, - skipChunks: req.SkipChunks, + ctx: ctx, + logger: logger, + extLset: b.extLset, + mint: req.MinTime, + maxt: req.MaxTime, + indexr: b.indexReader(), + chunkr: chunkr, + chunksLimiter: limiter, + bytesLimiter: bytesLimiter, + skipChunks: req.SkipChunks, + chunkFetchDuration: chunkFetchDuration, loadAggregates: req.Aggregates, shardMatcher: shardMatcher, @@ -911,6 +921,9 @@ func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { } if len(b.entries) == 0 { + if b.chunkr != nil { + b.chunkFetchDuration.Observe(float64(b.chunkr.stats.ChunksFetchDurationSum)) + } return nil, io.EOF } @@ -1201,6 +1214,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie shardMatcher, s.enableChunkHashCalculation, s.seriesBatchSize, + s.metrics.chunkFetchDuration, ) defer blockClient.Close() @@ -1445,7 +1459,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq MaxTime: req.End, SkipChunks: true, } - blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, SeriesBatchSize) + blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, SeriesBatchSize, s.metrics.chunkFetchDuration) if err := blockClient.ExpandPostings( reqSeriesMatchersNoExtLabels, @@ -1619,7 +1633,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR MaxTime: req.End, SkipChunks: true, } - blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, SeriesBatchSize) + blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, SeriesBatchSize, s.metrics.chunkFetchDuration) if err := blockClient.ExpandPostings( reqSeriesMatchersNoExtLabels, diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 72c0c01a89..aa05bd07ea 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/cespare/xxhash" + "github.com/prometheus/client_golang/prometheus" "github.com/go-kit/log" "github.com/gogo/protobuf/proto" @@ -2438,7 +2439,8 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet // must be called only from the goroutine running the Benchmark function. 
testutil.Ok(b, err) - blockClient := newBlockSeriesClient(ctx, nil, blk, req, chunksLimiter, NewBytesLimiterFactory(0)(nil), nil, false, SeriesBatchSize) + dummyHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{}) + blockClient := newBlockSeriesClient(ctx, nil, blk, req, chunksLimiter, NewBytesLimiterFactory(0)(nil), nil, false, SeriesBatchSize, dummyHistogram) testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter)) defer blockClient.Close() From cbc3d2370d842a63bb206b0ebd8030c427e3ed36 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Sun, 13 Nov 2022 07:42:46 +0100 Subject: [PATCH 12/13] Regroup imports Signed-off-by: Filip Petkovski --- pkg/store/bucket_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index aa05bd07ea..f643222c57 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -23,8 +23,6 @@ import ( "time" "github.com/cespare/xxhash" - "github.com/prometheus/client_golang/prometheus" - "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" @@ -32,6 +30,8 @@ import ( "github.com/leanovate/gopter/gen" "github.com/leanovate/gopter/prop" "github.com/oklog/ulid" + + "github.com/prometheus/client_golang/prometheus" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" From 25ed544c552fa032cea2aa7843d2ea6586070a00 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Mon, 14 Nov 2022 16:22:43 +0100 Subject: [PATCH 13/13] Change counter to uint64 Signed-off-by: Filip Petkovski --- pkg/store/bucket.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index e2c68576ce..561d915edd 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -824,7 +824,7 @@ type blockSeriesClient struct { chunkFetchDuration prometheus.Histogram // Internal state. - i int + i uint64 postings []storage.SeriesRef chkMetas []chunks.Meta lset labels.Labels @@ -939,8 +939,8 @@ func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { func (b *blockSeriesClient) nextBatch() error { start := b.i end := start + SeriesBatchSize - if end > len(b.postings) { - end = len(b.postings) + if end > uint64(len(b.postings)) { + end = uint64(len(b.postings)) } b.i = end
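
To close out the series, here is a standalone sketch of the batch-windowing arithmetic that this last patch moves to uint64. It is illustrative only: nextWindow and the synthetic postings count are invented for the example, and the real nextBatch additionally resets the index and chunk readers, preloads series, and tracks hasMorePostings:

package main

import "fmt"

// SeriesBatchSize mirrors the store gateway's default batch size.
const SeriesBatchSize = 10000

// nextWindow clamps the window end to the postings length and reports
// whether more batches remain, mimicking the cursor arithmetic of
// blockSeriesClient.nextBatch after the switch from int to uint64.
func nextWindow(i, numPostings uint64) (start, end uint64, more bool) {
	start = i
	end = start + SeriesBatchSize
	if end > numPostings {
		end = numPostings
	}
	return start, end, end < numPostings
}

func main() {
	const numPostings = 25000
	var i uint64
	for {
		start, end, more := nextWindow(i, numPostings)
		fmt.Printf("batch [%d, %d)\n", start, end) // [0, 10000), [10000, 20000), [20000, 25000)
		i = end
		if !more {
			break
		}
	}
}

The uint64(len(b.postings)) conversions are the substantive change in the final hunk; the windowing logic itself is unchanged.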