Skip fetching postings from index cache for groups with many keys #8054

Draft: wants to merge 1 commit into base: main
92 changes: 85 additions & 7 deletions pkg/store/bucket.go
@@ -443,6 +443,7 @@ type BucketStore struct {

enabledLazyExpandedPostings bool
postingGroupMaxKeySeriesRatio float64
maxKeysToSkipCache int

sortingStrategy sortingStrategy

@@ -591,6 +592,13 @@ func WithPostingGroupMaxKeySeriesRatio(postingGroupMaxKeySeriesRatio float64) Bu
}
}

// WithMaxKeysToSkipCache configures a threshold above which a posting group's keys are not fetched from the index cache.
func WithMaxKeysToSkipCache(maxKeysToSkipCache int) BucketStoreOption {
return func(s *BucketStore) {
s.maxKeysToSkipCache = maxKeysToSkipCache
}
}
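For reviewers, a minimal sketch of how the new option slots into the existing functional-option pattern. The surrounding wiring (flag parsing, the exact NewBucketStore call) is elided and the threshold values are illustrative assumptions, not part of this diff:

```go
package example

import "github.com/thanos-io/thanos/pkg/store"

// bucketStoreOptions shows the new option next to an existing one from this file.
// The values used here are illustrative, not recommendations.
func bucketStoreOptions() []store.BucketStoreOption {
	return []store.BucketStoreOption{
		// Existing option: mark a posting group lazy when its key/series ratio is too high.
		store.WithPostingGroupMaxKeySeriesRatio(100),
		// New option from this PR: bypass the index cache for posting groups with
		// more than 10000 add or remove keys; 0 disables the behavior.
		store.WithMaxKeysToSkipCache(10000),
	}
}
```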

// WithDontResort disables series resorting in Store Gateway.
func WithDontResort(true bool) BucketStoreOption {
return func(s *BucketStore) {
@@ -1067,6 +1075,7 @@ type blockSeriesClient struct {
lazyExpandedPostingEnabled bool
// Mark posting group as lazy if it adds too many keys. 0 to disable.
postingGroupMaxKeySeriesRatio float64
maxKeysToSkipCache int
lazyExpandedPostingsCount prometheus.Counter
lazyExpandedPostingGroupByReason *prometheus.CounterVec
lazyExpandedPostingSizeBytes prometheus.Counter
@@ -1112,6 +1121,7 @@ func newBlockSeriesClient(
extLsetToRemove map[string]struct{},
lazyExpandedPostingEnabled bool,
postingGroupMaxKeySeriesRatio float64,
maxKeysToSkipCache int,
lazyExpandedPostingsCount prometheus.Counter,
lazyExpandedPostingByReason *prometheus.CounterVec,
lazyExpandedPostingSizeBytes prometheus.Counter,
@@ -1149,6 +1159,7 @@

lazyExpandedPostingEnabled: lazyExpandedPostingEnabled,
postingGroupMaxKeySeriesRatio: postingGroupMaxKeySeriesRatio,
maxKeysToSkipCache: maxKeysToSkipCache,
lazyExpandedPostingsCount: lazyExpandedPostingsCount,
lazyExpandedPostingGroupByReason: lazyExpandedPostingByReason,
lazyExpandedPostingSizeBytes: lazyExpandedPostingSizeBytes,
@@ -1202,7 +1213,7 @@ func (b *blockSeriesClient) ExpandPostings(
matchers sortedMatchers,
seriesLimiter SeriesLimiter,
) error {
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.postingGroupMaxKeySeriesRatio, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant)
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.postingGroupMaxKeySeriesRatio, b.maxKeysToSkipCache, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant)
if err != nil {
return errors.Wrap(err, "expanded matching posting")
}
@@ -1636,6 +1647,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
extLsetToRemove,
s.enabledLazyExpandedPostings,
s.postingGroupMaxKeySeriesRatio,
s.maxKeysToSkipCache,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
s.metrics.lazyExpandedPostingSizeBytes,
@@ -1952,6 +1964,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
extLsetToRemove,
s.enabledLazyExpandedPostings,
s.postingGroupMaxKeySeriesRatio,
s.maxKeysToSkipCache,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
s.metrics.lazyExpandedPostingSizeBytes,
@@ -2180,6 +2193,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
nil,
s.enabledLazyExpandedPostings,
s.postingGroupMaxKeySeriesRatio,
s.maxKeysToSkipCache,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
s.metrics.lazyExpandedPostingSizeBytes,
@@ -2648,6 +2662,7 @@ func (r *bucketIndexReader) ExpandedPostings(
bytesLimiter BytesLimiter,
lazyExpandedPostingEnabled bool,
postingGroupMaxKeySeriesRatio float64,
maxKeysToSkipCache int,
lazyExpandedPostingSizeBytes prometheus.Counter,
lazyExpandedPostingGroupsByReason *prometheus.CounterVec,
tenant string,
@@ -2703,7 +2718,7 @@
postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil))
}

ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant)
ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, postingGroupMaxKeySeriesRatio, maxKeysToSkipCache, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant)
if err != nil {
return nil, errors.Wrap(err, "fetch and expand postings")
}
@@ -3070,22 +3085,80 @@ var bufioReaderPool = sync.Pool{
},
}

func skipCachePostingGroup(group *postingGroup, maxKeys int) bool {
if maxKeys <= 0 {
return false
}
return len(group.addKeys) > maxKeys || len(group.removeKeys) > maxKeys
}
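To make the threshold semantics concrete, here is a small standalone sketch that mirrors the helper above with simplified types; the struct and values are illustrative, not the real postingGroup:

```go
package main

import "fmt"

// group is a simplified stand-in for postingGroup, keeping only the fields the check uses.
type group struct {
	name                string
	addKeys, removeKeys []string
}

// skipCache mirrors skipCachePostingGroup: a zero or negative threshold disables skipping.
func skipCache(g group, maxKeys int) bool {
	if maxKeys <= 0 {
		return false
	}
	return len(g.addKeys) > maxKeys || len(g.removeKeys) > maxKeys
}

func main() {
	// A matcher like pod!="" can expand into one add key per pod name.
	wide := group{name: "pod", addKeys: make([]string, 50000)}
	narrow := group{name: "cluster", addKeys: []string{"us-east-1"}}

	fmt.Println(skipCache(wide, 10000))   // true: bypass the index cache for this group
	fmt.Println(skipCache(narrow, 10000)) // false: fetch from the cache as before
}
```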

// fetchPostings fills postings requested by posting groups.
// It returns one posting for each key, in the same order.
// If postings for a given key are not fetched, the entry at that index will be nil.
func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) {
func (r *bucketIndexReader) fetchPostings(ctx context.Context, postingGroups []*postingGroup, maxKeysToSkipCache int, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) {
var closeFns []func()

timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration.WithLabelValues(tenant))
defer timer.ObserveDuration()
cacheKeyCount := 0
skipCacheKeyCount := 0
skipCachePostingGroupSet := make(map[string]struct{})
// Count keys per posting group and decide which groups should bypass the index cache.
for _, pg := range postingGroups {
if pg.lazy {
continue
}
// If a posting group has more than maxKeysToSkipCache keys to fetch, skip fetching them from the cache.
// This helps matchers such as !="" or =~".+" avoid fetching too many keys from the cache.
if skipCachePostingGroup(pg, maxKeysToSkipCache) {
skipCacheKeyCount += len(pg.addKeys) + len(pg.removeKeys)
skipCachePostingGroupSet[pg.name] = struct{}{}
continue
}

// A posting group has either add keys or remove keys, never both at the same time.
cacheKeyCount += len(pg.addKeys) + len(pg.removeKeys)
}

totalKeys := cacheKeyCount + skipCacheKeyCount
cacheKeys := make([]labels.Label, 0, cacheKeyCount)
keys := make([]labels.Label, 0, totalKeys)
output := make([]index.Postings, totalKeys)
var ptrs []postingPtr
if skipCacheKeyCount > 0 {
// We know there are keys to fetch, but they bypass the cache.
ptrs = make([]postingPtr, 0, skipCacheKeyCount)
}

output := make([]index.Postings, len(keys))
for _, pg := range postingGroups {
if pg.lazy {
continue
}
if !skipCachePostingGroup(pg, maxKeysToSkipCache) {
// Postings returned by fetchPostings will be in the same order as keys
// so it's important that we iterate them in the same order later.
// We don't have any other way of pairing keys and fetched postings.
for _, key := range pg.addKeys {
cacheKeys = append(cacheKeys, labels.Label{Name: pg.name, Value: key})
}
for _, key := range pg.removeKeys {
cacheKeys = append(cacheKeys, labels.Label{Name: pg.name, Value: key})
}
}
// Cache keys are copied twice, but they should be a small portion since groups
// with a large number of keys are already not fetched from the cache.
for _, key := range pg.addKeys {
keys = append(keys, labels.Label{Name: pg.name, Value: key})
}
for _, key := range pg.removeKeys {
keys = append(keys, labels.Label{Name: pg.name, Value: key})
}
}

timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration.WithLabelValues(tenant))
defer timer.ObserveDuration()

var size int64
// Fetch postings from the cache with a single call.
fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys, tenant)
fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, cacheKeys, tenant)
for _, dataFromCache := range fromCache {
size += int64(len(dataFromCache))
}
@@ -3186,6 +3259,11 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
diffVarintPostings, postingsCount, keyID := rdr.AtDiffVarint()

output[keyID] = newDiffVarintPostings(diffVarintPostings, nil)
// If the corresponding posting group is marked to skip the cache, don't encode
// and store the data back to the cache.
if _, ok := skipCachePostingGroupSet[keys[keyID].Name]; ok {
continue
}

startCompression := time.Now()
dataToCache, err := snappyStreamedEncode(int(postingsCount), diffVarintPostings)
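Since the fetched postings slice is positional rather than keyed, the key-building loop in fetchPostings above must append keys in exactly the order a later consumer walks the posting groups. A toy sketch of that pairing contract, with illustrative names only:

```go
package main

import "fmt"

type group struct {
	name       string
	addKeys    []string
	removeKeys []string
	lazy       bool
}

// pair walks groups in the same order used to build the flat keys slice,
// so fetched[i] corresponds to the i-th appended key.
func pair(groups []group, fetched []string) {
	i := 0
	for _, g := range groups {
		if g.lazy {
			continue // lazy groups contribute no keys
		}
		for _, k := range g.addKeys {
			fmt.Printf("add    %s=%q -> %s\n", g.name, k, fetched[i])
			i++
		}
		for _, k := range g.removeKeys {
			fmt.Printf("remove %s=%q -> %s\n", g.name, k, fetched[i])
			i++
		}
	}
}

func main() {
	groups := []group{
		{name: "job", addKeys: []string{"api"}},
		{name: "pod", removeKeys: []string{""}},
	}
	// Stand-ins for the postings fetched for the two keys, in key order.
	pair(groups, []string{"postings(job=api)", `postings(pod="")`})
}
```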
9 changes: 5 additions & 4 deletions pkg/store/bucket_test.go
@@ -1305,7 +1305,7 @@ func benchmarkExpandedPostings(

t.ResetTimer()
for i := 0; i < t.N(); i++ {
p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, 0, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
testutil.Equals(t, c.expectedLen, len(p.postings))
}
@@ -1344,7 +1344,7 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) {
reg := prometheus.NewRegistry()
dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"})
dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, 0, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
testutil.Equals(t, ps, (*lazyExpandedPostings)(nil))
// Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings.
@@ -1384,7 +1384,7 @@ func TestLazyExpandedPostingsEmptyPostings(t *testing.T) {
reg := prometheus.NewRegistry()
dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"})
dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, 0, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
// We expect emptyLazyPostings rather than lazy postings with 0 length but with matchers.
testutil.Equals(t, ps, emptyLazyPostings)
@@ -2932,6 +2932,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
nil,
false,
0,
0,
dummyCounter,
dummyCounterVec,
dummyCounter,
@@ -3591,7 +3592,7 @@ func TestExpandedPostingsRace(t *testing.T) {
wg.Add(1)

go func(i int, bb *bucketBlock) {
refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, 0, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
defer wg.Done()

61 changes: 26 additions & 35 deletions pkg/store/lazy_postings.go
@@ -215,6 +215,7 @@ func fetchLazyExpandedPostings(
addAllPostings bool,
lazyExpandedPostingEnabled bool,
postingGroupMaxKeySeriesRatio float64,
maxKeysToSkipCache int,
lazyExpandedPostingSizeBytes prometheus.Counter,
lazyExpandedPostingGroupsByReason *prometheus.CounterVec,
tenant string,
@@ -250,7 +251,7 @@
}
}

ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter, tenant)
ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter, tenant, lazyExpandedPostingEnabled, maxKeysToSkipCache)
if err != nil {
return nil, err
}
Expand All @@ -260,42 +261,20 @@ func fetchLazyExpandedPostings(
return &lazyExpandedPostings{postings: ps, matchers: matchers}, nil
}

// keysToFetchFromPostingGroups returns label pairs (postings) to fetch
// and matchers we need to use for lazy posting expansion.
// Input `postingGroups` needs to be ordered by cardinality in case lazy
// expansion is enabled. When we find the first lazy posting group we can exit.
func keysToFetchFromPostingGroups(postingGroups []*postingGroup) ([]labels.Label, []*labels.Matcher) {
func fetchAndExpandPostingGroups(
ctx context.Context,
r *bucketIndexReader,
postingGroups []*postingGroup,
bytesLimiter BytesLimiter,
tenant string,
lazyExpandedPostingEnabled bool,
maxKeysToSkipCache int,
) ([]storage.SeriesRef, []*labels.Matcher, error) {
var lazyMatchers []*labels.Matcher
keys := make([]labels.Label, 0)
i := 0
for i < len(postingGroups) {
pg := postingGroups[i]
if pg.lazy {
if len(lazyMatchers) == 0 {
lazyMatchers = make([]*labels.Matcher, 0)
}
lazyMatchers = append(lazyMatchers, postingGroups[i].matchers...)
} else {
// Postings returned by fetchPostings will be in the same order as keys
// so it's important that we iterate them in the same order later.
// We don't have any other way of pairing keys and fetched postings.
for _, key := range pg.addKeys {
keys = append(keys, labels.Label{Name: pg.name, Value: key})
}
for _, key := range pg.removeKeys {
keys = append(keys, labels.Label{Name: pg.name, Value: key})
}
}

i++
if lazyExpandedPostingEnabled {
lazyMatchers = lazyMatchersFromPostingGroups(postingGroups)
}

return keys, lazyMatchers
}

func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, postingGroups []*postingGroup, bytesLimiter BytesLimiter, tenant string) ([]storage.SeriesRef, []*labels.Matcher, error) {
keys, lazyMatchers := keysToFetchFromPostingGroups(postingGroups)
fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter, tenant)
fetchedPostings, closeFns, err := r.fetchPostings(ctx, postingGroups, maxKeysToSkipCache, bytesLimiter, tenant)
defer func() {
for _, closeFn := range closeFns {
closeFn()
@@ -348,3 +327,15 @@ func mergeFetchedPostings(ctx context.Context, fetchedPostings []index.Postings,
result := index.Without(index.Intersect(groupAdds...), index.Merge(ctx, groupRemovals...))
return result
}

// lazyMatchersFromPostingGroups returns the matchers of all lazy posting groups, used for lazy posting expansion.
func lazyMatchersFromPostingGroups(postingGroups []*postingGroup) []*labels.Matcher {
var lazyMatchers []*labels.Matcher
for i := 0; i < len(postingGroups); i++ {
pg := postingGroups[i]
if pg.lazy {
lazyMatchers = append(lazyMatchers, postingGroups[i].matchers...)
}
}
return lazyMatchers
}
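For completeness, a toy illustration of the new helper and the lazyExpandedPostingEnabled guard in fetchAndExpandPostingGroups, using simplified types rather than the real postingGroup and labels.Matcher:

```go
package main

import "fmt"

type pg struct {
	lazy     bool
	matchers []string // stand-in for []*labels.Matcher
}

// lazyMatchersFrom mirrors lazyMatchersFromPostingGroups on the simplified type.
func lazyMatchersFrom(groups []pg) []string {
	var out []string
	for _, g := range groups {
		if g.lazy {
			out = append(out, g.matchers...)
		}
	}
	return out
}

func main() {
	groups := []pg{
		{lazy: false, matchers: []string{`job="api"`}},
		{lazy: true, matchers: []string{`pod!=""`}}, // deemed too expensive, deferred
	}

	lazyEnabled := true
	var deferred []string
	if lazyEnabled { // mirrors the guard added in fetchAndExpandPostingGroups
		deferred = lazyMatchersFrom(groups)
	}
	fmt.Println(deferred) // prints [pod!=""]; applied later, after the cheap postings are expanded
}
```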