Skip to content

Commit

Permalink
optimize store gateway bytes limiter reserve with type request
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Dec 29, 2024
1 parent 6f03fcb commit 8c1c804
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -3030,12 +3030,14 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab

output := make([]index.Postings, len(keys))

var size int64
// Fetch postings from the cache with a single call.
fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys, tenant)
for _, dataFromCache := range fromCache {
if err := bytesLimiter.ReserveWithType(uint64(len(dataFromCache)), PostingsTouched); err != nil {
return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading postings from index cache: %s", err)
}
size += int64(len(dataFromCache))
}
if err := bytesLimiter.ReserveWithType(uint64(size), PostingsTouched); err != nil {
return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading postings from index cache: %s", err)
}

// Iterate over all groups and fetch posting from cache.
Expand Down Expand Up @@ -3086,13 +3088,14 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
return uint64(ptrs[i].ptr.Start), uint64(ptrs[i].ptr.End)
})

size = 0
for _, part := range parts {
start := int64(part.Start)
length := int64(part.End) - start

if err := bytesLimiter.ReserveWithType(uint64(length), PostingsFetched); err != nil {
return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching postings: %s", err)
}
size += length
}
if err := bytesLimiter.ReserveWithType(uint64(size), PostingsFetched); err != nil {
return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching postings: %s", err)
}

g, ctx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -3263,11 +3266,13 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser
// Load series from cache, overwriting the list of ids to preload
// with the missing ones.
fromCache, ids := r.block.indexCache.FetchMultiSeries(ctx, r.block.meta.ULID, ids, tenant)
var size uint64
for id, b := range fromCache {
r.loadedSeries[id] = b
if err := bytesLimiter.ReserveWithType(uint64(len(b)), SeriesTouched); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading series from index cache: %s", err)
}
size += uint64(len(b))
}
if err := bytesLimiter.ReserveWithType(size, SeriesTouched); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading series from index cache: %s", err)
}

parts := r.block.partitioner.Partition(len(ids), func(i int) (start, end uint64) {
Expand Down Expand Up @@ -3598,10 +3603,12 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [
return uint64(pIdxs[i].offset), uint64(pIdxs[i].offset) + uint64(r.block.estimatedMaxChunkSize)
})

var size uint64
for _, p := range parts {
if err := bytesLimiter.ReserveWithType(uint64(p.End-p.Start), ChunksFetched); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching chunks: %s", err)
}
size += p.End - p.Start
}
if err := bytesLimiter.ReserveWithType(size, ChunksFetched); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching chunks: %s", err)
}

for _, p := range parts {
Expand Down

0 comments on commit 8c1c804

Please sign in to comment.