skip fetching and backfilling postings from index cache if a posting group has a lot of keys

Signed-off-by: Ben Ye <[email protected]>
yeya24 committed Jan 12, 2025
1 parent f250d68 commit 495fd86
Showing 4 changed files with 116 additions and 273 deletions.
92 changes: 85 additions & 7 deletions pkg/store/bucket.go
@@ -443,6 +443,7 @@ type BucketStore struct {

enabledLazyExpandedPostings bool
postingGroupMaxKeySeriesRatio float64
maxKeysToSkipCache int

sortingStrategy sortingStrategy

@@ -591,6 +592,13 @@ func WithPostingGroupMaxKeySeriesRatio(postingGroupMaxKeySeriesRatio float64) Bu
}
}

// WithMaxKeysToSkipCache configures a threshold above which a posting group's postings are not fetched from the index cache. 0 disables it.
func WithMaxKeysToSkipCache(maxKeysToSkipCache int) BucketStoreOption {
return func(s *BucketStore) {
s.maxKeysToSkipCache = maxKeysToSkipCache
}
}
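
A quick usage sketch of the new option, following the functional-option pattern this file already uses; the bare struct literal and the 10000 threshold are illustrative assumptions only (in practice the store comes from the usual constructor):

// Sketch only, assumed to live inside package store.
s := &BucketStore{}
WithMaxKeysToSkipCache(10000)(s) // posting groups with more than 10000 keys now bypass the postings cache
WithMaxKeysToSkipCache(0)(s)     // 0, the default, disables the skip entirely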

// WithDontResort disables series resorting in Store Gateway.
func WithDontResort(true bool) BucketStoreOption {
return func(s *BucketStore) {
@@ -1067,6 +1075,7 @@ type blockSeriesClient struct {
lazyExpandedPostingEnabled bool
// Mark posting group as lazy if it adds too many keys. 0 to disable.
postingGroupMaxKeySeriesRatio float64
maxKeysToSkipCache int
lazyExpandedPostingsCount prometheus.Counter
lazyExpandedPostingGroupByReason *prometheus.CounterVec
lazyExpandedPostingSizeBytes prometheus.Counter
@@ -1112,6 +1121,7 @@ func newBlockSeriesClient(
extLsetToRemove map[string]struct{},
lazyExpandedPostingEnabled bool,
postingGroupMaxKeySeriesRatio float64,
maxKeysToSkipCache int,
lazyExpandedPostingsCount prometheus.Counter,
lazyExpandedPostingByReason *prometheus.CounterVec,
lazyExpandedPostingSizeBytes prometheus.Counter,
@@ -1149,6 +1159,7 @@ func newBlockSeriesClient(

lazyExpandedPostingEnabled: lazyExpandedPostingEnabled,
postingGroupMaxKeySeriesRatio: postingGroupMaxKeySeriesRatio,
maxKeysToSkipCache: maxKeysToSkipCache,
lazyExpandedPostingsCount: lazyExpandedPostingsCount,
lazyExpandedPostingGroupByReason: lazyExpandedPostingByReason,
lazyExpandedPostingSizeBytes: lazyExpandedPostingSizeBytes,
@@ -1202,7 +1213,7 @@ func (b *blockSeriesClient) ExpandPostings(
matchers sortedMatchers,
seriesLimiter SeriesLimiter,
) error {
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.postingGroupMaxKeySeriesRatio, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant)
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.postingGroupMaxKeySeriesRatio, b.maxKeysToSkipCache, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant)
if err != nil {
return errors.Wrap(err, "expanded matching posting")
}
@@ -1636,6 +1647,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
extLsetToRemove,
s.enabledLazyExpandedPostings,
s.postingGroupMaxKeySeriesRatio,
s.maxKeysToSkipCache,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
s.metrics.lazyExpandedPostingSizeBytes,
@@ -1952,6 +1964,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
extLsetToRemove,
s.enabledLazyExpandedPostings,
s.postingGroupMaxKeySeriesRatio,
s.maxKeysToSkipCache,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
s.metrics.lazyExpandedPostingSizeBytes,
@@ -2180,6 +2193,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
nil,
s.enabledLazyExpandedPostings,
s.postingGroupMaxKeySeriesRatio,
s.maxKeysToSkipCache,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
s.metrics.lazyExpandedPostingSizeBytes,
@@ -2648,6 +2662,7 @@ func (r *bucketIndexReader) ExpandedPostings(
bytesLimiter BytesLimiter,
lazyExpandedPostingEnabled bool,
postingGroupMaxKeySeriesRatio float64,
maxKeysToSkipCache int,
lazyExpandedPostingSizeBytes prometheus.Counter,
lazyExpandedPostingGroupsByReason *prometheus.CounterVec,
tenant string,
@@ -2703,7 +2718,7 @@ func (r *bucketIndexReader) ExpandedPostings(
postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil))
}

ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant)
ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, postingGroupMaxKeySeriesRatio, maxKeysToSkipCache, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant)
if err != nil {
return nil, errors.Wrap(err, "fetch and expand postings")
}
@@ -3070,22 +3085,80 @@ var bufioReaderPool = sync.Pool{
},
}

func skipCachePostingGroup(group *postingGroup, maxKeys int) bool {
if maxKeys <= 0 {
return false
}
return len(group.addKeys) > maxKeys || len(group.removeKeys) > maxKeys
}
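
For illustration, a self-contained sketch of the threshold check above, using a trimmed-down stand-in for postingGroup (the real struct also carries matchers, lazy flags and more); the 50k-value label and the 10000 limit are assumptions:

package main

import "fmt"

// pg is a stand-in holding only the fields skipCachePostingGroup inspects.
type pg struct {
    addKeys    []string
    removeKeys []string
}

func skipCache(g pg, maxKeys int) bool {
    if maxKeys <= 0 {
        return false // 0 disables the behavior, matching the diff above
    }
    return len(g.addKeys) > maxKeys || len(g.removeKeys) > maxKeys
}

func main() {
    broad := pg{addKeys: make([]string, 50000)}   // e.g. pod=~".+" expanding to 50k values
    narrow := pg{addKeys: []string{"prometheus"}} // e.g. job="prometheus"
    fmt.Println(skipCache(broad, 10000))  // true: this group bypasses the postings cache
    fmt.Println(skipCache(narrow, 10000)) // false: still served from, and written back to, the cache
}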

// fetchPostings fills postings requested by posting groups.
// It returns one posting for each key, in the same order.
// If postings for a given key are not fetched, the entry at that index will be nil.
func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) {
func (r *bucketIndexReader) fetchPostings(ctx context.Context, postingGroups []*postingGroup, maxKeysToSkipCache int, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) {
var closeFns []func()

timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration.WithLabelValues(tenant))
defer timer.ObserveDuration()
cacheKeyCount := 0
skipCacheKeyCount := 0
skipCachePostingGroupSet := make(map[string]struct{})
// Decide upfront which posting group keys will be fetched from the index cache and which will skip it.
for _, pg := range postingGroups {
if pg.lazy {
continue
}
// If a posting group has more than maxKeysToSkipCache keys to fetch, skip fetching them from the cache.
// This helps matchers such as !="" or =~".+" avoid fetching too many keys from the cache.
if skipCachePostingGroup(pg, maxKeysToSkipCache) {
skipCacheKeyCount += len(pg.addKeys) + len(pg.removeKeys)
skipCachePostingGroupSet[pg.name] = struct{}{}
continue
}

// A posting group has either add keys or remove keys, never both at the same time.
cacheKeyCount += len(pg.addKeys) + len(pg.removeKeys)
}

totalKeys := cacheKeyCount + skipCacheKeyCount
cacheKeys := make([]labels.Label, 0, cacheKeyCount)
keys := make([]labels.Label, 0, totalKeys)
output := make([]index.Postings, totalKeys)
var ptrs []postingPtr
if skipCacheKeyCount > 0 {
// We know there are keys to fetch that bypass the cache.
ptrs = make([]postingPtr, 0, skipCacheKeyCount)
}

output := make([]index.Postings, len(keys))
for _, pg := range postingGroups {
if pg.lazy {
continue
}
if !skipCachePostingGroup(pg, maxKeysToSkipCache) {
// Postings returned by fetchPostings will be in the same order as keys
// so it's important that we iterate them in the same order later.
// We don't have any other way of pairing keys and fetched postings.
for _, key := range pg.addKeys {
cacheKeys = append(cacheKeys, labels.Label{Name: pg.name, Value: key})
}
for _, key := range pg.removeKeys {
cacheKeys = append(cacheKeys, labels.Label{Name: pg.name, Value: key})
}
}
// Cache keys are copied twice, but they should be a very small portion since groups
// with a large number of keys are not fetched from the cache.
for _, key := range pg.addKeys {
keys = append(keys, labels.Label{Name: pg.name, Value: key})
}
for _, key := range pg.removeKeys {
keys = append(keys, labels.Label{Name: pg.name, Value: key})
}
}

timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration.WithLabelValues(tenant))
defer timer.ObserveDuration()

var size int64
// Fetch postings from the cache with a single call.
fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys, tenant)
fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, cacheKeys, tenant)
for _, dataFromCache := range fromCache {
size += int64(len(dataFromCache))
}
@@ -3186,6 +3259,11 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
diffVarintPostings, postingsCount, keyID := rdr.AtDiffVarint()

output[keyID] = newDiffVarintPostings(diffVarintPostings, nil)
// If the corresponding posting group is marked to skip the cache, don't encode
// and backfill the data into the cache.
if _, ok := skipCachePostingGroupSet[keys[keyID].Name]; ok {
continue
}

startCompression := time.Now()
dataToCache, err := snappyStreamedEncode(int(postingsCount), diffVarintPostings)
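To make the key partitioning in the new fetchPostings concrete, here is a standalone sketch of how the counts diverge once one group crosses the threshold; the group names, sizes and the 10000 limit are assumptions:

package main

import "fmt"

func main() {
    maxKeysToSkipCache := 10000
    // Assumed posting groups: label name -> number of keys the group wants to fetch.
    groups := map[string]int{"job": 2, "pod": 30000}

    cacheKeyCount, skipCacheKeyCount := 0, 0
    skipCacheGroups := map[string]struct{}{}
    for name, n := range groups {
        if maxKeysToSkipCache > 0 && n > maxKeysToSkipCache {
            skipCacheKeyCount += n // fetched straight from the index, never via FetchMultiPostings
            skipCacheGroups[name] = struct{}{}
            continue
        }
        cacheKeyCount += n // still looked up in the postings cache first
    }

    fmt.Println(cacheKeyCount, skipCacheKeyCount, skipCacheGroups) // 2 30000 map[pod:{}]
    // The output slice is still sized for all 30002 keys so fetched postings stay aligned with keys,
    // and groups recorded in skipCacheGroups are also not written back to the cache after fetching.
}
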
9 changes: 5 additions & 4 deletions pkg/store/bucket_test.go
@@ -1305,7 +1305,7 @@ func benchmarkExpandedPostings(

t.ResetTimer()
for i := 0; i < t.N(); i++ {
p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, 0, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
testutil.Equals(t, c.expectedLen, len(p.postings))
}
@@ -1344,7 +1344,7 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) {
reg := prometheus.NewRegistry()
dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"})
dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, 0, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
testutil.Equals(t, ps, (*lazyExpandedPostings)(nil))
// Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings.
@@ -1384,7 +1384,7 @@ func TestLazyExpandedPostingsEmptyPostings(t *testing.T) {
reg := prometheus.NewRegistry()
dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"})
dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, 0, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
// We expect emptyLazyPostings rather than lazy postings with 0 length but with matchers.
testutil.Equals(t, ps, emptyLazyPostings)
@@ -2932,6 +2932,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
nil,
false,
0,
0,
dummyCounter,
dummyCounterVec,
dummyCounter,
@@ -3591,7 +3592,7 @@ func TestExpandedPostingsRace(t *testing.T) {
wg.Add(1)

go func(i int, bb *bucketBlock) {
refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, 0, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
defer wg.Done()

61 changes: 26 additions & 35 deletions pkg/store/lazy_postings.go
@@ -215,6 +215,7 @@ func fetchLazyExpandedPostings(
addAllPostings bool,
lazyExpandedPostingEnabled bool,
postingGroupMaxKeySeriesRatio float64,
maxKeysToSkipCache int,
lazyExpandedPostingSizeBytes prometheus.Counter,
lazyExpandedPostingGroupsByReason *prometheus.CounterVec,
tenant string,
@@ -250,7 +251,7 @@
}
}

ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter, tenant)
ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter, tenant, lazyExpandedPostingEnabled, maxKeysToSkipCache)
if err != nil {
return nil, err
}
@@ -260,42 +261,20 @@
return &lazyExpandedPostings{postings: ps, matchers: matchers}, nil
}

// keysToFetchFromPostingGroups returns label pairs (postings) to fetch
// and matchers we need to use for lazy posting expansion.
// Input `postingGroups` needs to be ordered by cardinality in case lazy
// expansion is enabled. When we find the first lazy posting group we can exit.
func keysToFetchFromPostingGroups(postingGroups []*postingGroup) ([]labels.Label, []*labels.Matcher) {
func fetchAndExpandPostingGroups(
ctx context.Context,
r *bucketIndexReader,
postingGroups []*postingGroup,
bytesLimiter BytesLimiter,
tenant string,
lazyExpandedPostingEnabled bool,
maxKeysToSkipCache int,
) ([]storage.SeriesRef, []*labels.Matcher, error) {
var lazyMatchers []*labels.Matcher
keys := make([]labels.Label, 0)
i := 0
for i < len(postingGroups) {
pg := postingGroups[i]
if pg.lazy {
if len(lazyMatchers) == 0 {
lazyMatchers = make([]*labels.Matcher, 0)
}
lazyMatchers = append(lazyMatchers, postingGroups[i].matchers...)
} else {
// Postings returned by fetchPostings will be in the same order as keys
// so it's important that we iterate them in the same order later.
// We don't have any other way of pairing keys and fetched postings.
for _, key := range pg.addKeys {
keys = append(keys, labels.Label{Name: pg.name, Value: key})
}
for _, key := range pg.removeKeys {
keys = append(keys, labels.Label{Name: pg.name, Value: key})
}
}

i++
if lazyExpandedPostingEnabled {
lazyMatchers = lazyMatchersFromPostingGroups(postingGroups)
}

return keys, lazyMatchers
}

func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, postingGroups []*postingGroup, bytesLimiter BytesLimiter, tenant string) ([]storage.SeriesRef, []*labels.Matcher, error) {
keys, lazyMatchers := keysToFetchFromPostingGroups(postingGroups)
fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter, tenant)
fetchedPostings, closeFns, err := r.fetchPostings(ctx, postingGroups, maxKeysToSkipCache, bytesLimiter, tenant)
defer func() {
for _, closeFn := range closeFns {
closeFn()
@@ -348,3 +327,15 @@ func mergeFetchedPostings(ctx context.Context, fetchedPostings []index.Postings,
result := index.Without(index.Intersect(groupAdds...), index.Merge(ctx, groupRemovals...))
return result
}
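
For intuition about the combine step above (intersect the add groups, subtract the merged remove groups), a self-contained sketch against the Prometheus index package; the series refs and the matcher annotations in the comments are invented:

package main

import (
    "context"
    "fmt"

    "github.com/prometheus/prometheus/storage"
    "github.com/prometheus/prometheus/tsdb/index"
)

func main() {
    ctx := context.Background()
    groupAdds := []index.Postings{
        index.NewListPostings([]storage.SeriesRef{1, 3, 5, 7}), // e.g. postings for job="api"
        index.NewListPostings([]storage.SeriesRef{3, 5, 9}),    // e.g. postings for cluster="eu"
    }
    groupRemovals := []index.Postings{
        index.NewListPostings([]storage.SeriesRef{5}), // e.g. a remove key contributed by a negated matcher
    }

    result := index.Without(index.Intersect(groupAdds...), index.Merge(ctx, groupRemovals...))
    refs, err := index.ExpandPostings(result)
    fmt.Println(refs, err) // [3] <nil>: present in every add group and not removed
}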

// lazyMatchersFromPostingGroups returns matchers for lazy posting expansion.
func lazyMatchersFromPostingGroups(postingGroups []*postingGroup) []*labels.Matcher {
var lazyMatchers []*labels.Matcher
for i := 0; i < len(postingGroups); i++ {
pg := postingGroups[i]
if pg.lazy {
lazyMatchers = append(lazyMatchers, postingGroups[i].matchers...)
}
}
return lazyMatchers
}