From 602ad6a18fb74d5f67a863b3375609c489e668b4 Mon Sep 17 00:00:00 2001
From: Filip Petkovski
Date: Fri, 28 Oct 2022 17:06:34 +0200
Subject: [PATCH] Add batching

Fetch series from a block in batches of up to 1000 postings instead of
one series per Recv call. blockSeriesClient now resolves labels and
schedules chunk loads for a whole batch up front, loads the chunks for
the batch in a single call, and serves the buffered responses from Recv.

Signed-off-by: Filip Petkovski
---
 pkg/store/bucket.go | 155 +++++++++++++++++++++++++++++---------------
 1 file changed, 101 insertions(+), 54 deletions(-)

diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index e0fc7104821..4731ff9d26c 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -790,6 +790,8 @@ type seriesEntry struct {
 	chks []storepb.AggrChunk
 }
 
+const batchSize = 1000
+
 // blockSeriesClient is a storepb.Store_SeriesClient for a
 // single TSDB block in object storage.
 type blockSeriesClient struct {
@@ -803,17 +805,20 @@ type blockSeriesClient struct {
 	maxt   int64
 	indexr *bucketIndexReader
 	chunkr *bucketChunkReader
-	chkLimiter   ChunksLimiter
+	limiter      ChunksLimiter
 	bytesLimiter BytesLimiter
 	loadAggregates []storepb.Aggr
 
-	skipChunks   bool
-	shardMatcher *storepb.ShardMatcher
+	skipChunks         bool
+	shardMatcher       *storepb.ShardMatcher
+	calculateChunkHash bool
 
 	// Transform all series into the response types and mark their relevant chunks
 	// for preloading.
 	symbolizedLset []symbolizedLabel
+	entries        []seriesEntry
+	batch          []*storepb.SeriesResponse
 }
 
 func emptyBlockSeriesClient() *blockSeriesClient {
@@ -841,82 +846,127 @@ func newBlockSeriesClient(
 		ctx:     ctx,
 		extLset: extLset,
 		ps:      ps,
-		i:       -1,
 		mint:    minTime,
 		maxt:    maxTime,
 		indexr:  indexr,
 		chunkr:  chunkr,
-		chkLimiter:   limiter,
+		limiter:      limiter,
 		bytesLimiter: bytesLimiter,
 		skipChunks:   skipChunks,
 		loadAggregates:     loadAggregates,
 		shardMatcher:       shardMatcher,
 		calculateChunkHash: calculateChunkHash,
+		entries:            make([]seriesEntry, 0, batchSize),
 	}
 }
 
 func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) {
-	b.i++
-	if b.i >= len(b.ps) {
-		return nil, io.EOF
-	}
-
-	var chks []chunks.Meta
-	ok, err := b.indexr.LoadSeriesForTime(b.ps[b.i], &b.symbolizedLset, &chks, b.skipChunks, b.mint, b.maxt)
-	if err != nil {
-		return storepb.NewWarnSeriesResponse(errors.Wrap(err, "read series")), nil
-	}
-	if !ok {
-		return b.Recv()
-	}
-
-	var lset labels.Labels
-	if err := b.indexr.LookupLabelsSymbols(b.symbolizedLset, &lset); err != nil {
-		return storepb.NewWarnSeriesResponse(errors.Wrap(err, "Lookup labels symbols")), nil
-	}
-
-	completeLabelset := labelpb.ExtendSortedLabels(lset, b.extLset)
-	if !b.shardMatcher.MatchesLabels(completeLabelset) {
-		return b.Recv()
-	}
-
-	if b.skipChunks {
-		return storepb.NewSeriesResponse(&storepb.Series{
-			Labels: labelpb.ZLabelsFromPromLabels(completeLabelset),
-		}), nil
-	}
-
-	s := seriesEntry{lset: completeLabelset}
-	//entries := []seriesEntry{s}
-
-	// Schedule loading chunks.
-	s.refs = make([]chunks.ChunkRef, 0, len(chks))
-	s.chks = make([]storepb.AggrChunk, 0, len(chks))
-	b.chunkr.reset()
-	for j, meta := range chks {
-		if err := b.chunkr.addLoad(meta.Ref, 0, j); err != nil {
-			return storepb.NewWarnSeriesResponse(errors.Wrap(err, "add chunk load")), nil
-		}
-		s.chks = append(s.chks, storepb.AggrChunk{
-			MinTime: meta.MinTime,
-			MaxTime: meta.MaxTime,
-		})
-		s.refs = append(s.refs, meta.Ref)
-	}
-
-	// Ensure sample limit through chunksLimiter if we return chunks.
-	if err := b.chkLimiter.Reserve(uint64(len(chks))); err != nil {
-		return storepb.NewWarnSeriesResponse(errors.Wrap(err, "exceeded chunks limit")), nil
-	}
-
-	if err := b.chunkr.load(b.ctx, []seriesEntry{s}, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter); err != nil {
-		return storepb.NewWarnSeriesResponse(errors.Wrap(err, "load chunks")), nil
-	}
-
-	return storepb.NewSeriesResponse(&storepb.Series{
-		Labels: labelpb.ZLabelsFromPromLabels(s.lset),
-		Chunks: s.chks,
-	}), nil
-}
+	for len(b.batch) == 0 {
+		// nextBatch can queue warning responses even when it reports that
+		// it is done, so only stop once the batch is fully drained.
+		if done := b.nextBatch(); done && len(b.batch) == 0 {
+			return nil, io.EOF
+		}
+	}
+
+	next := b.batch[0]
+	b.batch = b.batch[1:]
+
+	return next, nil
+}
+
+// nextBatch loads up to batchSize series into b.batch and reports whether
+// all postings have been processed.
+func (b *blockSeriesClient) nextBatch() bool {
+	start := b.i
+	end := start + batchSize
+	if end > len(b.ps) {
+		end = len(b.ps)
+	}
+	b.i = end
+
+	b.batch = b.batch[:0]
+	ps := b.ps[start:end]
+	if len(ps) == 0 {
+		return true
+	}
+
+	if !b.skipChunks {
+		b.chunkr.reset()
+	}
+	b.entries = b.entries[:0]
+	for i := 0; i < len(ps); i++ {
+		var chks []chunks.Meta
+		ok, err := b.indexr.LoadSeriesForTime(ps[i], &b.symbolizedLset, &chks, b.skipChunks, b.mint, b.maxt)
+		if err != nil {
+			b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "read series")))
+			continue
+		}
+		if !ok {
+			continue
+		}
+
+		var lset labels.Labels
+		if err := b.indexr.LookupLabelsSymbols(b.symbolizedLset, &lset); err != nil {
+			b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "lookup labels symbols")))
+			continue
+		}
+
+		completeLabelset := labelpb.ExtendSortedLabels(lset, b.extLset)
+		if !b.shardMatcher.MatchesLabels(completeLabelset) {
+			continue
+		}
+
+		if b.skipChunks {
+			b.batch = append(b.batch, storepb.NewSeriesResponse(&storepb.Series{
+				Labels: labelpb.ZLabelsFromPromLabels(completeLabelset),
+			}))
+			continue
+		}
+
+		// Ensure sample limit through chunksLimiter if we return chunks.
+		// Reserve before scheduling loads so a rejected series does not
+		// leave scheduled load indexes behind in the chunk reader.
+		if err := b.limiter.Reserve(uint64(len(chks))); err != nil {
+			b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "exceeded chunks limit")))
+			continue
+		}
+
+		// Schedule loading chunks. The series index passed to addLoad must
+		// match this entry's position in b.entries.
+		s := seriesEntry{lset: completeLabelset}
+		s.refs = make([]chunks.ChunkRef, 0, len(chks))
+		s.chks = make([]storepb.AggrChunk, 0, len(chks))
+		for j, meta := range chks {
+			if err := b.chunkr.addLoad(meta.Ref, len(b.entries), j); err != nil {
+				b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "add chunk load")))
+				return true
+			}
+			s.chks = append(s.chks, storepb.AggrChunk{
+				MinTime: meta.MinTime,
+				MaxTime: meta.MaxTime,
+			})
+			s.refs = append(s.refs, meta.Ref)
+		}
+
+		b.entries = append(b.entries, s)
+	}
+
+	if !b.skipChunks {
+		if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter); err != nil {
+			b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "load chunks")))
+			return true
+		}
+	}
+
+	for _, entry := range b.entries {
+		b.batch = append(b.batch, storepb.NewSeriesResponse(&storepb.Series{
+			Labels: labelpb.ZLabelsFromPromLabels(entry.lset),
+			Chunks: entry.chks,
+		}))
+	}
+
+	return false
+}
 
 // blockSeries returns series matching given matchers, that have some data in given time range.
@@ -2683,7 +2728,14 @@ func newBucketChunkReader(block *bucketBlock) *bucketChunkReader {
 }
 
 func (r *bucketChunkReader) reset() {
-	r.toLoad = make([][]loadIdx, len(r.block.chunkObjs))
+	// Lazily allocate on first use; afterwards only truncate, so the
+	// per-segment slices are reused from batch to batch.
+	if r.toLoad == nil {
+		r.toLoad = make([][]loadIdx, len(r.block.chunkObjs))
+	}
+	for i := range r.toLoad {
+		r.toLoad[i] = r.toLoad[i][:0]
+	}
 }
 
 func (r *bucketChunkReader) Close() error {
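For context when reviewing: below is a self-contained sketch of the Recv/nextBatch hand-off the patch introduces. It is illustrative only; none of the identifiers (item counts, batchClient, the int postings) are Thanos APIs, and the even/odd filter merely mimics series being dropped by the shard matcher. It shows why Recv must loop: a refill can legitimately come back empty when every posting in the batch is filtered out, and draining must continue until the source reports it is done.

package main

import (
	"fmt"
	"io"
)

const batchSize = 3 // small for demonstration; the patch uses 1000

// batchClient yields items one at a time from an internal buffer that is
// refilled batchSize postings at a time, mirroring blockSeriesClient.
type batchClient struct {
	ps    []int // stand-in for the resolved postings list
	i     int   // offset of the next unprocessed posting
	batch []string
}

// nextBatch refills c.batch from the next batchSize postings and reports
// whether the input is exhausted.
func (c *batchClient) nextBatch() bool {
	start := c.i
	end := start + batchSize
	if end > len(c.ps) {
		end = len(c.ps)
	}
	c.i = end

	c.batch = c.batch[:0]
	ps := c.ps[start:end]
	if len(ps) == 0 {
		return true
	}
	// In the real client this is where labels are resolved and chunk loads
	// are scheduled for the whole batch, then loaded in a single call.
	for _, p := range ps {
		if p%2 == 0 { // mimic a series filtered out (e.g. shard mismatch)
			continue
		}
		c.batch = append(c.batch, fmt.Sprintf("series-%d", p))
	}
	return false
}

// Recv drains the buffer, refilling it as needed. The loop tolerates empty
// refills and only returns io.EOF once nextBatch reports completion and no
// buffered responses remain.
func (c *batchClient) Recv() (string, error) {
	for len(c.batch) == 0 {
		if done := c.nextBatch(); done && len(c.batch) == 0 {
			return "", io.EOF
		}
	}
	next := c.batch[0]
	c.batch = c.batch[1:]
	return next, nil
}

func main() {
	c := &batchClient{ps: []int{1, 2, 3, 4, 5, 6, 7}}
	for {
		s, err := c.Recv()
		if err == io.EOF {
			break
		}
		fmt.Println(s)
	}
}

Running the sketch prints series-1, series-3, series-5 and series-7, served across three non-empty refills; the caller never observes batch boundaries, only a stream that ends with io.EOF.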