Skip to content

Commit

Permalink
add pool for expanded posting slice (#8035)
Browse files Browse the repository at this point in the history
* add pool for expanded posting slice

Signed-off-by: Ben Ye <[email protected]>

* check nil postings

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Jan 5, 2025
1 parent 803556c commit 07734b9
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 3 deletions.
28 changes: 26 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/util/zeropool"
"github.com/weaveworks/common/httpgrpc"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
Expand Down Expand Up @@ -125,8 +126,22 @@ const (
var (
	// errBlockSyncConcurrencyNotValid is returned when the configured block
	// sync concurrency is below the minimum of 1.
	// (Go error strings carry no trailing punctuation; see staticcheck ST1005.)
	errBlockSyncConcurrencyNotValid = errors.New("the block sync concurrency must be equal or greater than 1")

	// hashPool recycles xxhash digest instances to avoid allocating one per use.
	hashPool = sync.Pool{New: func() interface{} { return xxhash.New() }}

	// postingsPool recycles expanded-postings slices across requests;
	// Get/Put go through getPostingsSlice/putPostingsSlice.
	postingsPool zeropool.Pool[[]storage.SeriesRef]
)

// getPostingsSlice returns a reusable postings slice from the pool,
// falling back to a fresh allocation when the pool is empty.
func getPostingsSlice() []storage.SeriesRef {
	s := postingsPool.Get()
	if s == nil {
		// Pool was empty: allocate with a reasonable starting capacity
		// so early appends don't trigger repeated growth.
		s = make([]storage.SeriesRef, 0, 1024)
	}
	return s
}

// putPostingsSlice returns a postings slice to the pool for reuse.
// The slice is truncated to zero length but keeps its capacity.
func putPostingsSlice(p []storage.SeriesRef) {
	// Pooling a nil or zero-capacity slice is pointless: a later Get would
	// hand back a slice that immediately needs reallocation.
	if cap(p) == 0 {
		return
	}
	// Drop outliers instead of pooling them, so one huge expansion does not
	// pin a large backing array in the pool indefinitely.
	const maxPooledCap = 1 << 20 // 1M refs (~8 MiB); TODO(review): tune if profiles disagree
	if cap(p) > maxPooledCap {
		return
	}
	postingsPool.Put(p[:0])
}

type bucketStoreMetrics struct {
blocksLoaded prometheus.Gauge
blockLoads prometheus.Counter
Expand Down Expand Up @@ -2549,6 +2564,10 @@ type bucketIndexReader struct {

indexVersion int
logger log.Logger

// Posting slice to return to the postings pool on close.
// A single bucketIndexReader should have at most 1 postings slice to return.
postings []storage.SeriesRef
}

func newBucketIndexReader(block *bucketBlock, logger log.Logger) *bucketIndexReader {
Expand Down Expand Up @@ -2678,13 +2697,13 @@ func (r *bucketIndexReader) ExpandedPostings(

// ExpandPostingsWithContext returns the postings expanded as a slice and considers context.
func ExpandPostingsWithContext(ctx context.Context, p index.Postings) ([]storage.SeriesRef, error) {
res := make([]storage.SeriesRef, 0, 1024) // Pre-allocate slice with initial capacity
res := getPostingsSlice()
i := 0
for p.Next() {
i++
if i%checkContextEveryNIterations == 0 {
if err := ctx.Err(); err != nil {
return nil, err
return res, err
}
}
res = append(res, p.At())
Expand Down Expand Up @@ -2978,6 +2997,7 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
}

ps, err := ExpandPostingsWithContext(ctx, p)
r.postings = ps
if err != nil {
level.Error(r.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
return false, nil, nil
Expand Down Expand Up @@ -3414,6 +3434,10 @@ func (r *bucketIndexReader) LoadSeriesForTime(ref storage.SeriesRef, lset *[]sym
// Close releases the underlying resources of the reader: it marks the
// pending read on the block as done and returns the reader's expanded
// postings slice (if any) to the shared postings pool. Always returns nil.
func (r *bucketIndexReader) Close() error {
	r.block.pendingReaders.Done()

	// A single bucketIndexReader holds at most one pooled postings slice;
	// hand it back so later readers can reuse its backing array.
	if r.postings != nil {
		putPostingsSlice(r.postings)
	}
	return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2979,7 +2979,7 @@ func TestExpandPostingsWithContextCancel(t *testing.T) {
res, err := ExpandPostingsWithContext(ctx, p)
testutil.NotOk(t, err)
testutil.Equals(t, context.Canceled, err)
testutil.Equals(t, []storage.SeriesRef(nil), res)
testutil.Equals(t, true, cap(res) == 1024)
}

func samePostingGroup(a, b *postingGroup) bool {
Expand Down
1 change: 1 addition & 0 deletions pkg/store/lazy_postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post
return nil, nil, err
}
ps, err := ExpandPostingsWithContext(ctx, result)
r.postings = ps
if err != nil {
return nil, nil, errors.Wrap(err, "expand")
}
Expand Down

0 comments on commit 07734b9

Please sign in to comment.