Add batching
Signed-off-by: Filip Petkovski <[email protected]>
fpetkovski committed Oct 29, 2022
1 parent 2257389 commit 602ad6a
Showing 1 changed file with 101 additions and 54 deletions.
155 changes: 101 additions & 54 deletions pkg/store/bucket.go
@@ -790,6 +790,8 @@ type seriesEntry struct {
chks []storepb.AggrChunk
}

const batchSize = 1000

// blockSeriesClient is a storepb.Store_SeriesClient for a
// single TSDB block in object storage.
type blockSeriesClient struct {
@@ -803,17 +805,20 @@ type blockSeriesClient struct {
maxt int64
indexr *bucketIndexReader
chunkr *bucketChunkReader
chkLimiter ChunksLimiter
limiter ChunksLimiter
bytesLimiter BytesLimiter
loadAggregates []storepb.Aggr
skipChunks bool

shardMatcher *storepb.ShardMatcher
skipChunks bool
shardMatcher *storepb.ShardMatcher

calculateChunkHash bool

// Transform all series into the response types and mark their relevant chunks
// for preloading.
symbolizedLset []symbolizedLabel
entries []seriesEntry
batch []*storepb.SeriesResponse
}

func emptyBlockSeriesClient() *blockSeriesClient {
@@ -841,82 +846,122 @@ func newBlockSeriesClient(
ctx: ctx,
extLset: extLset,
ps: ps,
i: -1,
mint: minTime,
maxt: maxTime,
indexr: indexr,
chunkr: chunkr,
chkLimiter: limiter,
limiter: limiter,
bytesLimiter: bytesLimiter,
skipChunks: skipChunks,

loadAggregates: loadAggregates,
shardMatcher: shardMatcher,
calculateChunkHash: calculateChunkHash,
entries: make([]seriesEntry, 0, batchSize),
}
}

func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) {
b.i++
if b.i >= len(b.ps) {
return nil, io.EOF
for len(b.batch) == 0 {
if done := b.nextBatch(); done {
return nil, io.EOF
}
}

var chks []chunks.Meta
ok, err := b.indexr.LoadSeriesForTime(b.ps[b.i], &b.symbolizedLset, &chks, b.skipChunks, b.mint, b.maxt)
if err != nil {
return storepb.NewWarnSeriesResponse(errors.Wrap(err, "read series")), nil
}
if !ok {
return b.Recv()
}
next := b.batch[0]
b.batch = b.batch[1:]

var lset labels.Labels
if err := b.indexr.LookupLabelsSymbols(b.symbolizedLset, &lset); err != nil {
return storepb.NewWarnSeriesResponse(errors.Wrap(err, "Lookup labels symbols")), nil
}
return next, nil
}

completeLabelset := labelpb.ExtendSortedLabels(lset, b.extLset)
if !b.shardMatcher.MatchesLabels(completeLabelset) {
return b.Recv()
func (b *blockSeriesClient) nextBatch() bool {
start := b.i
end := start + batchSize
if end > len(b.ps) {
end = len(b.ps)
}
b.i = end

if b.skipChunks {
return storepb.NewSeriesResponse(&storepb.Series{
Labels: labelpb.ZLabelsFromPromLabels(completeLabelset),
}), nil
b.batch = b.batch[:0]
ps := b.ps[start:end]
if len(ps) == 0 {
return true
}
if !b.skipChunks {
b.chunkr.reset()
}
b.entries = b.entries[:0]
for i := 0; i < len(ps); i++ {
var chks []chunks.Meta
ok, err := b.indexr.LoadSeriesForTime(ps[i], &b.symbolizedLset, &chks, b.skipChunks, b.mint, b.maxt)
if err != nil {
b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "read series")))
continue
}
if !ok {
continue
}

s := seriesEntry{lset: completeLabelset}
//entries := []seriesEntry{s}
var lset labels.Labels
if err := b.indexr.LookupLabelsSymbols(b.symbolizedLset, &lset); err != nil {
b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "Lookup labels symbols")))
continue
}

// Schedule loading chunks.
s.refs = make([]chunks.ChunkRef, 0, len(chks))
s.chks = make([]storepb.AggrChunk, 0, len(chks))
b.chunkr.reset()
for j, meta := range chks {
if err := b.chunkr.addLoad(meta.Ref, 0, j); err != nil {
return storepb.NewWarnSeriesResponse(errors.Wrap(err, "add chunk load")), nil
completeLabelset := labelpb.ExtendSortedLabels(lset, b.extLset)
if !b.shardMatcher.MatchesLabels(completeLabelset) {
continue
}
s.chks = append(s.chks, storepb.AggrChunk{
MinTime: meta.MinTime,
MaxTime: meta.MaxTime,
})
s.refs = append(s.refs, meta.Ref)

if b.skipChunks {
b.batch = append(b.batch, storepb.NewSeriesResponse(&storepb.Series{
Labels: labelpb.ZLabelsFromPromLabels(completeLabelset),
}))
continue
}

s := seriesEntry{}
// Schedule loading chunks.
s.lset = completeLabelset
s.refs = make([]chunks.ChunkRef, 0, len(chks))
s.chks = make([]storepb.AggrChunk, 0, len(chks))

for j, meta := range chks {
if err := b.chunkr.addLoad(meta.Ref, len(b.entries), j); err != nil {
b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "add chunk load")))
return true
}
s.chks = append(s.chks, storepb.AggrChunk{
MinTime: meta.MinTime,
MaxTime: meta.MaxTime,
})
s.refs = append(s.refs, meta.Ref)
}

// Ensure sample limit through chunksLimiter if we return chunks.
if err := b.limiter.Reserve(uint64(len(chks))); err != nil {
b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "exceeded chunks limit")))
return true
}

b.entries = append(b.entries, s)
}

// Ensure sample limit through chunksLimiter if we return chunks.
if err := b.chkLimiter.Reserve(uint64(len(chks))); err != nil {
return storepb.NewWarnSeriesResponse(errors.Wrap(err, "exceeded chunks limit")), nil
if !b.skipChunks {
if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter); err != nil {
b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "load chunks")))
return true
}
}
if err := b.chunkr.load(b.ctx, []seriesEntry{s}, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter); err != nil {
return storepb.NewWarnSeriesResponse(errors.Wrap(err, "load chunks")), nil

for _, entry := range b.entries {
b.batch = append(b.batch, storepb.NewSeriesResponse(&storepb.Series{
Labels: labelpb.ZLabelsFromPromLabels(entry.lset),
Chunks: entry.chks,
}))
}

return storepb.NewSeriesResponse(&storepb.Series{
Labels: labelpb.ZLabelsFromPromLabels(s.lset),
Chunks: s.chks,
}), nil
return false
}

// blockSeries returns series matching given matchers, that have some data in given time range.
@@ -1200,8 +1245,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
[]labels.Labels{b.extLset},
cancel,
blockClient,
shardMatcher,
true,
nil,
false,
b.metrics.emptyStreamResponses,
)

@@ -1272,7 +1317,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
// Merge the sub-results from each selected block.
tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) {
begin := time.Now()
set := NewDedupResponseHeap(NewProxyResponseHeap(res...))
set := NewProxyResponseHeap(res...)
for set.Next() {
var series storepb.Series

@@ -2683,7 +2728,9 @@ func newBucketChunkReader(block *bucketBlock) *bucketChunkReader {
}

func (r *bucketChunkReader) reset() {
r.toLoad = make([][]loadIdx, len(r.block.chunkObjs))
for i := range r.toLoad {
r.toLoad[i] = r.toLoad[i][:0]
}
}

func (r *bucketChunkReader) Close() error {
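For context on the diff above, here is a minimal, self-contained Go sketch of the batching pattern this commit introduces: `Recv()` drains a pre-built slice of responses and refills it through `nextBatch()`, which walks the postings list in `batchSize`-sized windows. The `postingBatcher` type, its string responses, and the tiny batch size are illustrative stand-ins only, not types or values from the Thanos codebase.

```go
package main

import (
	"fmt"
	"io"
)

// The commit sets batchSize to 1000; a small value here makes the batching visible.
const batchSize = 2

// postingBatcher is an illustrative stand-in for blockSeriesClient: it turns a
// flat postings slice into responses, materialized one batch at a time.
type postingBatcher struct {
	postings []uint64 // all postings selected for the block
	i        int      // index of the next unprocessed posting
	batch    []string // responses ready to be returned by Recv
}

// Recv returns the next buffered response, refilling the buffer in
// batchSize-sized chunks until the postings are exhausted.
func (b *postingBatcher) Recv() (string, error) {
	for len(b.batch) == 0 {
		if done := b.nextBatch(); done {
			return "", io.EOF
		}
	}
	next := b.batch[0]
	b.batch = b.batch[1:]
	return next, nil
}

// nextBatch materializes responses for the next batchSize postings.
// It reports true when there is nothing left to process.
func (b *postingBatcher) nextBatch() bool {
	start := b.i
	end := start + batchSize
	if end > len(b.postings) {
		end = len(b.postings)
	}
	b.i = end

	b.batch = b.batch[:0]
	if start == end {
		return true
	}
	for _, p := range b.postings[start:end] {
		// In bucket.go this is where series labels are resolved and chunk
		// loads are scheduled; here we simply format the posting.
		b.batch = append(b.batch, fmt.Sprintf("series-%d", p))
	}
	return false
}

func main() {
	b := &postingBatcher{postings: []uint64{1, 2, 3, 4, 5}}
	for {
		resp, err := b.Recv()
		if err == io.EOF {
			return
		}
		fmt.Println(resp)
	}
}
```

The key property mirrored from the commit is that callers keep the one-response-at-a-time `Recv()` contract, while the expensive work (index lookups and chunk loading in the real code) happens once per batch rather than once per series.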
