From 3ead951f3e0b41286b077d220fe59b974deef86e Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 22 Jun 2023 00:45:11 -0700 Subject: [PATCH] optimize postings fetching by checking postings and series size Signed-off-by: Ben Ye --- cmd/thanos/store.go | 5 + pkg/block/indexheader/binary_reader.go | 19 +- pkg/block/indexheader/header.go | 6 +- pkg/block/indexheader/header_test.go | 32 +++ pkg/block/indexheader/lazy_binary_reader.go | 13 ++ pkg/store/bucket.go | 238 ++++++++++++-------- pkg/store/bucket_test.go | 127 ++++++----- pkg/store/lazy_postings.go | 235 +++++++++++++++++++ pkg/store/lazy_postings_test.go | 197 ++++++++++++++++ 9 files changed, 720 insertions(+), 152 deletions(-) create mode 100644 pkg/store/lazy_postings.go create mode 100644 pkg/store/lazy_postings_test.go diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 3bc0082da3e..39f97918329 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -88,6 +88,7 @@ type storeConfig struct { reqLogConfig *extflag.PathOrContent lazyIndexReaderEnabled bool lazyIndexReaderIdleTimeout time.Duration + lazyExpandedPostingsEnabled bool } func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -182,6 +183,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("store.index-header-lazy-reader-idle-timeout", "If index-header lazy reader is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity."). Hidden().Default("5m").DurationVar(&sc.lazyIndexReaderIdleTimeout) + cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings."). + Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled) + cmd.Flag("web.disable", "Disable Block Viewer UI.").Default("false").BoolVar(&sc.disableWeb) cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path."). @@ -382,6 +386,7 @@ func runStore( } return conf.estimatedMaxChunkSize }), + store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled), } if conf.debugLogging { diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go index 1befe63a7f2..fb558717f7c 100644 --- a/pkg/block/indexheader/binary_reader.go +++ b/pkg/block/indexheader/binary_reader.go @@ -47,6 +47,8 @@ const ( postingLengthFieldSize = 4 ) +var notFoundRange = index.Range{Start: -1, End: -1} + // The table gets initialized with sync.Once but may still cause a race // with any other use of the crc32 package anywhere. Thus we initialize it // before. @@ -747,13 +749,18 @@ func (r *BinaryReader) IndexVersion() (int, error) { return r.indexVersion, nil } +// PostingsOffsets implements Reader. +func (r *BinaryReader) PostingsOffsets(name string, values ...string) ([]index.Range, error) { + return r.postingsOffset(name, values...) +} + // TODO(bwplotka): Get advantage of multi value offset fetch. 
 func (r *BinaryReader) PostingsOffset(name, value string) (index.Range, error) {
 	rngs, err := r.postingsOffset(name, value)
 	if err != nil {
 		return index.Range{}, err
 	}
-	if len(rngs) != 1 {
+	if len(rngs) != 1 || rngs[0] == notFoundRange {
 		return index.Range{}, NotFoundRangeErr
 	}
 	return rngs[0], nil
@@ -801,6 +808,7 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
 	valueIndex := 0
 	for valueIndex < len(values) && values[valueIndex] < e.offsets[0].value {
 		// Discard values before the start.
+		rngs = append(rngs, notFoundRange)
 		valueIndex++
 	}
 
@@ -811,6 +819,9 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
 		i := sort.Search(len(e.offsets), func(i int) bool { return e.offsets[i].value >= wantedValue })
 		if i == len(e.offsets) {
 			// We're past the end.
+			for len(rngs) < len(values) {
+				rngs = append(rngs, notFoundRange)
+			}
 			break
 		}
 		if i > 0 && e.offsets[i].value != wantedValue {
@@ -858,6 +869,8 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
 				// Record on the way if wanted value is equal to the current value.
 				if string(value) == wantedValue {
 					newSameRngs = append(newSameRngs, index.Range{Start: postingOffset + postingLengthFieldSize})
+				} else {
+					rngs = append(rngs, notFoundRange)
 				}
 				valueIndex++
 				if valueIndex == len(values) {
@@ -877,6 +890,10 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
 			}
 
 			if valueIndex != len(values) && wantedValue <= e.offsets[i+1].value {
+				// Increment i when the wanted value is the same as the next offset.
+				if wantedValue == e.offsets[i+1].value {
+					i++
+				}
 				// wantedValue is smaller or same as the next offset we know about, let's iterate further to add those.
 				continue
 			}
diff --git a/pkg/block/indexheader/header.go b/pkg/block/indexheader/header.go
index 8ecef33564d..efd369e7cdd 100644
--- a/pkg/block/indexheader/header.go
+++ b/pkg/block/indexheader/header.go
@@ -20,10 +20,14 @@ type Reader interface {
 	// IndexVersion returns version of index.
 	IndexVersion() (int, error)
 
+	// PostingsOffsets returns the start and end offsets of postings for the given name and values.
+	// Input values need to be sorted. If a posting doesn't exist, a range with both start and end
+	// set to -1 is returned for it.
+	PostingsOffsets(name string, value ...string) ([]index.Range, error)
+
 	// PostingsOffset returns start and end offsets of postings for given name and value.
 	// The end offset might be bigger than the actual posting ending, but not larger than the whole index file.
 	// NotFoundRangeErr is returned when no index can be found for given name and value.
-	// TODO(bwplotka): Move to PostingsOffsets(name string, value ...string) []index.Range and benchmark.
 	PostingsOffset(name string, value string) (index.Range, error)
 
 	// LookupSymbol returns string based on given reference.
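A usage note on the new batch method: the contract above is that input values are sorted and that absent values come back as the {-1, -1} sentinel range rather than as an error. A minimal sketch of a caller built only on that contract (illustrative; postingsBytes is not part of this patch, and it assumes the pkg/block/indexheader import):

	// postingsBytes sums the bytes occupied by the postings lists of the given
	// label values, skipping values that don't exist in the block.
	func postingsBytes(br indexheader.Reader, name string, sortedValues ...string) (int64, error) {
		rngs, err := br.PostingsOffsets(name, sortedValues...)
		if err != nil {
			return 0, err
		}
		var total int64
		for _, rng := range rngs {
			if rng.Start < 0 { // {-1, -1} sentinel: no postings for this value
				continue
			}
			total += rng.End - rng.Start
		}
		return total, nil
	}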
diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index d0d7eb5f7dd..2d3a1722233 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -141,6 +141,38 @@ func TestReaders(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, []string(nil), vals) + // single value + rngs, err := br.PostingsOffsets("a", "9") + testutil.Ok(t, err) + for _, rng := range rngs { + testutil.Assert(t, rng.End > rng.Start) + } + + rngs, err = br.PostingsOffsets("a", "2", "3", "4", "5", "6", "7", "8", "9") + testutil.Ok(t, err) + for _, rng := range rngs { + testutil.Assert(t, rng.End > rng.Start) + } + + rngs, err = br.PostingsOffsets("a", "0") + testutil.Ok(t, err) + testutil.Assert(t, len(rngs) == 1) + testutil.Equals(t, notFoundRange, rngs[0]) + + rngs, err = br.PostingsOffsets("a", "0", "10", "99") + testutil.Ok(t, err) + testutil.Assert(t, len(rngs) == 3) + for _, rng := range rngs { + testutil.Equals(t, notFoundRange, rng) + } + + rngs, err = br.PostingsOffsets("a", "1", "10", "9") + testutil.Ok(t, err) + testutil.Assert(t, len(rngs) == 3) + testutil.Assert(t, rngs[0].End > rngs[0].Start) + testutil.Assert(t, rngs[2].End > rngs[2].Start) + testutil.Equals(t, notFoundRange, rngs[1]) + // Regression tests for https://github.com/thanos-io/thanos/issues/2213. // Most of not existing value was working despite bug, except in certain unlucky cases // it was causing "invalid size" errors. diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index c3bee382c2f..451a79b6ee5 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -154,6 +154,19 @@ func (r *LazyBinaryReader) IndexVersion() (int, error) { return r.reader.IndexVersion() } +// PostingsOffsets implements Reader. +func (r *LazyBinaryReader) PostingsOffsets(name string, values ...string) ([]index.Range, error) { + r.readerMx.RLock() + defer r.readerMx.RUnlock() + + if err := r.load(); err != nil { + return nil, err + } + + r.usedAt.Store(time.Now().UnixNano()) + return r.reader.PostingsOffsets(name, values...) +} + // PostingsOffset implements Reader. func (r *LazyBinaryReader) PostingsOffset(name, value string) (index.Range, error) { r.readerMx.RLock() diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index cbec28a7c7b..8fc7a582cda 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -110,6 +110,7 @@ const ( ) var ( + allPostingsMatcher = labels.MustNewMatcher(labels.MatchEqual, "", "") errBlockSyncConcurrencyNotValid = errors.New("the block sync concurrency must be equal or greater than 1.") hashPool = sync.Pool{New: func() interface{} { return xxhash.New() }} ) @@ -359,6 +360,8 @@ type BucketStore struct { enableChunkHashCalculation bool + enabledLazyExpandedPostings bool + blockEstimatedMaxSeriesFunc BlockEstimator blockEstimatedMaxChunkFunc BlockEstimator } @@ -463,6 +466,13 @@ func WithBlockEstimatedMaxChunkFunc(f BlockEstimator) BucketStoreOption { } } +// WithLazyExpandedPostings enables lazy expanded postings. +func WithLazyExpandedPostings(enabled bool) BucketStoreOption { + return func(s *BucketStore) { + s.enabledLazyExpandedPostings = enabled + } +} + // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. 
 func NewBucketStore(
@@ -888,18 +898,20 @@ type blockSeriesClient struct {
 	skipChunks     bool
 	shardMatcher   *storepb.ShardMatcher
+	blockMatchers  []*labels.Matcher
 	calculateChunkHash bool
 	chunkFetchDuration prometheus.Histogram
 
 	// Internal state.
-	i               uint64
-	postings        []storage.SeriesRef
-	chkMetas        []chunks.Meta
-	lset            labels.Labels
-	symbolizedLset  []symbolizedLabel
-	entries         []seriesEntry
-	hasMorePostings bool
-	batchSize       int
+	i                uint64
+	lazyPostings     *lazyExpandedPostings
+	expandedPostings []storage.SeriesRef
+	chkMetas         []chunks.Meta
+	lset             labels.Labels
+	symbolizedLset   []symbolizedLabel
+	entries          []seriesEntry
+	hasMorePostings  bool
+	batchSize        int
 }
 
 func newBlockSeriesClient(
@@ -909,6 +921,7 @@ func newBlockSeriesClient(
 	req *storepb.SeriesRequest,
 	limiter ChunksLimiter,
 	bytesLimiter BytesLimiter,
+	blockMatchers []*labels.Matcher,
 	shardMatcher *storepb.ShardMatcher,
 	calculateChunkHash bool,
 	batchSize int,
@@ -940,6 +953,7 @@ func newBlockSeriesClient(
 		loadAggregates: req.Aggregates,
 
 		shardMatcher:       shardMatcher,
+		blockMatchers:      blockMatchers,
 		calculateChunkHash: calculateChunkHash,
 		hasMorePostings:    true,
 		batchSize:          batchSize,
@@ -965,23 +979,31 @@ func (b *blockSeriesClient) MergeStats(stats *queryStats) *queryStats {
 func (b *blockSeriesClient) ExpandPostings(
 	matchers []*labels.Matcher,
 	seriesLimiter SeriesLimiter,
+	lazyExpandedPostingEnabled bool,
 ) error {
-	ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter)
+	ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, lazyExpandedPostingEnabled)
 	if err != nil {
 		return errors.Wrap(err, "expanded matching posting")
 	}
-	if len(ps) == 0 {
+	// ExpandedPostings may return a nil pointer when nothing matches; check it
+	// before dereferencing and keep lazyPostings non-nil for nextBatch.
+	if ps == nil || len(ps.postings) == 0 {
+		b.lazyPostings = emptyLazyPostings
 		return nil
 	}
+	b.lazyPostings = ps
 
-	if err := seriesLimiter.Reserve(uint64(len(ps))); err != nil {
+	if err := seriesLimiter.Reserve(uint64(len(ps.postings))); err != nil {
 		return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err)
 	}
 
-	b.postings = ps
-	if b.batchSize > len(ps) {
-		b.batchSize = len(ps)
+	if b.batchSize > len(ps.postings) {
+		b.batchSize = len(ps.postings)
+	}
+	if b.lazyPostings.lazyExpanded() {
+		// Assume lazy expansion could cut actual expanded postings length to 50%.
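+		// This is only a capacity hint for the append in nextBatch; if more
+		// than half of the postings match, the slice simply grows as needed.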
+		b.expandedPostings = make([]storage.SeriesRef, 0, len(b.lazyPostings.postings)/2)
 	}
 	b.entries = make([]seriesEntry, 0, b.batchSize)
 	return nil
@@ -1013,14 +1032,26 @@ func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) {
 func (b *blockSeriesClient) nextBatch() error {
 	start := b.i
 	end := start + SeriesBatchSize
-	if end > uint64(len(b.postings)) {
-		end = uint64(len(b.postings))
+	if end > uint64(len(b.lazyPostings.postings)) {
+		end = uint64(len(b.lazyPostings.postings))
 	}
 	b.i = end
 
-	postingsBatch := b.postings[start:end]
+	postingsBatch := b.lazyPostings.postings[start:end]
 	if len(postingsBatch) == 0 {
 		b.hasMorePostings = false
+		if b.lazyPostings.lazyExpanded() {
+			v, err := b.indexr.IndexVersion()
+			if err != nil {
+				return errors.Wrap(err, "get index version")
+			}
+			if v >= 2 {
+				for i := range b.expandedPostings {
+					b.expandedPostings[i] = b.expandedPostings[i] / 16
+				}
+			}
+			b.indexr.storeExpandedPostingsToCache(b.blockMatchers, index.NewListPostings(b.expandedPostings), len(b.expandedPostings))
+		}
 		return nil
 	}
@@ -1050,6 +1081,23 @@ func (b *blockSeriesClient) nextBatch() error {
 		return errors.Wrap(err, "Lookup labels symbols")
 	}
 
+	// Apply lazily-expanded matchers against the decoded series labels and
+	// skip the series as soon as one of them doesn't match.
+	skipSeries := false
+	for _, matcher := range b.lazyPostings.matchers {
+		val := b.lset.Get(matcher.Name)
+		if !matcher.Matches(val) {
+			skipSeries = true
+			break
+		}
+	}
+	if skipSeries {
+		continue
+	}
+	if b.lazyPostings.lazyExpanded() {
+		b.expandedPostings = append(b.expandedPostings, postingsBatch[i])
+	}
+
 	completeLabelset := labelpb.ExtendSortedLabels(b.lset, b.extLset)
 	if !b.shardMatcher.MatchesLabels(completeLabelset) {
 		continue
 	}
@@ -1286,6 +1327,17 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 		if !ok {
 			continue
 		}
+		// Sort matchers to make sure we generate the same cache key
+		// when fetching expanded postings.
+		sort.Slice(blockMatchers, func(i, j int) bool {
+			if blockMatchers[i].Type == blockMatchers[j].Type {
+				if blockMatchers[i].Name == blockMatchers[j].Name {
+					return blockMatchers[i].Value < blockMatchers[j].Value
+				}
+				return blockMatchers[i].Name < blockMatchers[j].Name
+			}
+			return blockMatchers[i].Type < blockMatchers[j].Type
+		})
 
 		blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow, reqBlockMatchers)
@@ -1311,6 +1363,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 				req,
 				chunksLimiter,
 				bytesLimiter,
+				blockMatchers,
 				shardMatcher,
 				s.enableChunkHashCalculation,
 				s.seriesBatchSize,
@@ -1329,7 +1382,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 				"block.resolution": blk.meta.Thanos.Downsample.Resolution,
 			})
 
-			if err := blockClient.ExpandPostings(blockMatchers, seriesLimiter); err != nil {
+			if err := blockClient.ExpandPostings(blockMatchers, seriesLimiter, s.enabledLazyExpandedPostings); err != nil {
 				span.Finish()
 				return errors.Wrapf(err, "fetch series for block %s", blk.meta.ULID)
 			}
@@ -1578,6 +1631,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
 				seriesReq,
 				nil,
 				bytesLimiter,
+				reqSeriesMatchersNoExtLabels,
 				nil,
 				true,
 				SeriesBatchSize,
@@ -1589,6 +1643,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
 			if err := blockClient.ExpandPostings(
 				reqSeriesMatchersNoExtLabels,
 				seriesLimiter,
+				s.enabledLazyExpandedPostings,
 			); err != nil {
 				return err
 			}
@@ -1775,6 +1830,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
 				seriesReq,
 				nil,
 				bytesLimiter,
+				reqSeriesMatchersNoExtLabels,
 				nil,
 				true,
 				SeriesBatchSize,
@@ -1786,6 +1842,7 @@ func (s *BucketStore)
LabelValues(ctx context.Context, req *storepb.LabelValuesR if err := blockClient.ExpandPostings( reqSeriesMatchersNoExtLabels, seriesLimiter, + s.enabledLazyExpandedPostings, ); err != nil { return err } @@ -2179,6 +2236,8 @@ type bucketIndexReader struct { mtx sync.Mutex loadedSeries map[storage.SeriesRef][]byte + + indexVersion int } func newBucketIndexReader(block *bucketBlock) *bucketIndexReader { @@ -2192,6 +2251,20 @@ func newBucketIndexReader(block *bucketBlock) *bucketIndexReader { } return r } + +// IndexVersion caches the index header version. +func (r *bucketIndexReader) IndexVersion() (int, error) { + if r.indexVersion != 0 { + return r.indexVersion, nil + } + v, err := r.block.indexHeaderReader.IndexVersion() + if err != nil { + return 0, err + } + r.indexVersion = v + return v, nil +} + func (r *bucketIndexReader) reset() { r.loadedSeries = map[storage.SeriesRef][]byte{} } @@ -2205,34 +2278,23 @@ func (r *bucketIndexReader) reset() { // Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by // single label name=value. -func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) { +func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool) (*lazyExpandedPostings, error) { // Shortcut the case of `len(postingGroups) == 0`. It will only happen when no // matchers specified, and we don't need to fetch expanded postings from cache. if len(ms) == 0 { - return nil, nil + return emptyLazyPostings, nil } - // Sort matchers to make sure we generate the same cache key. - sort.Slice(ms, func(i, j int) bool { - if ms[i].Type == ms[j].Type { - if ms[i].Name == ms[j].Name { - return ms[i].Value < ms[j].Value - } - return ms[i].Name < ms[j].Name - } - return ms[i].Type < ms[j].Type - }) hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter) if err != nil { return nil, err } if hit { - return postings, nil + return newLazyExpandedPostings(postings), nil } var ( allRequested = false hasAdds = false - keys []labels.Label ) postingGroups, err := matchersToPostingGroups(ctx, r.block.indexHeaderReader.LabelValues, ms) @@ -2243,83 +2305,47 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M r.storeExpandedPostingsToCache(ms, index.EmptyPostings(), 0) return nil, nil } + i := 0 for _, pg := range postingGroups { allRequested = allRequested || pg.addAll hasAdds = hasAdds || len(pg.addKeys) > 0 - // Postings returned by fetchPostings will be in the same order as keys - // so it's important that we iterate them in the same order later. - // We don't have any other way of pairing keys and fetched postings. - for _, key := range pg.addKeys { - keys = append(keys, labels.Label{Name: pg.name, Value: key}) - } - for _, key := range pg.removeKeys { - keys = append(keys, labels.Label{Name: pg.name, Value: key}) + // If a posting group doesn't have any keys, like posting group created + // from `=~".*"`, we don't have to keep the posting group as long as we + // keep track of whether we need to add all postings or not. 
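+		// For example, a matcher like `=~".*"` produces a group with addAll set
+		// and no keys; allRequested already records it, so the group is dropped.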
+ if len(pg.addKeys) == 0 && len(pg.removeKeys) == 0 { + continue } + postingGroups[i] = pg + i++ } + postingGroups = postingGroups[:i] + addAllPostings := allRequested && !hasAdds // We only need special All postings if there are no other adds. If there are, we can skip fetching // special All postings completely. - if allRequested && !hasAdds { + if addAllPostings { // add group with label to fetch "special All postings". name, value := index.AllPostingsKey() - allPostingsLabel := labels.Label{Name: name, Value: value} - postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil)) - keys = append(keys, allPostingsLabel) - } - - fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter) - defer func() { - for _, closeFn := range closeFns { - closeFn() - } - }() - if err != nil { - return nil, errors.Wrap(err, "get postings") - } - - // Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys - // again, and this is exactly the same order as before (when building the groups), so we can simply - // use one incrementing index to fetch postings from returned slice. - postingIndex := 0 - - var groupAdds, groupRemovals []index.Postings - for _, g := range postingGroups { - // We cannot add empty set to groupAdds, since they are intersected. - if len(g.addKeys) > 0 { - toMerge := make([]index.Postings, 0, len(g.addKeys)) - for _, l := range g.addKeys { - toMerge = append(toMerge, checkNilPosting(g.name, l, fetchedPostings[postingIndex])) - postingIndex++ - } - - groupAdds = append(groupAdds, index.Merge(toMerge...)) - } - - for _, l := range g.removeKeys { - groupRemovals = append(groupRemovals, checkNilPosting(g.name, l, fetchedPostings[postingIndex])) - postingIndex++ - } } - result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...)) - ps, err := ExpandPostingsWithContext(ctx, result) + ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled) if err != nil { - return nil, errors.Wrap(err, "expand") + return nil, errors.Wrap(err, "fetch and expand postings") } - r.storeExpandedPostingsToCache(ms, index.NewListPostings(ps), len(ps)) + r.storeExpandedPostingsToCache(ms, index.NewListPostings(ps.postings), len(ps.postings)) - if len(ps) > 0 { + if len(ps.postings) > 0 { // As of version two all series entries are 16 byte padded. All references // we get have to account for that to get the correct offset. - version, err := r.block.indexHeaderReader.IndexVersion() + version, err := r.IndexVersion() if err != nil { return nil, errors.Wrap(err, "get index version") } if version >= 2 { - for i, id := range ps { - ps[i] = id * 16 + for i, id := range ps.postings { + ps.postings[i] = id * 16 } } } @@ -2342,16 +2368,19 @@ func ExpandPostingsWithContext(ctx context.Context, p index.Postings) (res []sto // If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels // This computation happens in ExpandedPostings. 
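+// cardinality is an estimate of the total number of postings in the group's
+// lists, derived from index-header offset ranges, and lazy marks a group whose
+// matchers are applied at series filtering time instead of fetching postings.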
type postingGroup struct { - addAll bool - name string - addKeys []string - removeKeys []string + addAll bool + name string + matchers []*labels.Matcher + addKeys []string + removeKeys []string + cardinality int64 + lazy bool } func newPostingGroup(addAll bool, name string, addKeys, removeKeys []string) *postingGroup { return &postingGroup{ - addAll: addAll, name: name, + addAll: addAll, addKeys: addKeys, removeKeys: removeKeys, } @@ -2453,12 +2482,16 @@ func checkNilPosting(name, value string, p index.Postings) index.Postings { } func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]string, error), ms []*labels.Matcher) ([]*postingGroup, error) { - matchersMap := make(map[string][]*labels.Matcher) + matchersMap := make(map[string]map[string]*labels.Matcher) for _, m := range ms { - matchersMap[m.Name] = append(matchersMap[m.Name], m) + m := m + if _, ok := matchersMap[m.Name]; !ok { + matchersMap[m.Name] = make(map[string]*labels.Matcher) + } + matchersMap[m.Name][m.String()] = m } - pgs := make([]*postingGroup, 0) + pgs := make([]*postingGroup, 0, len(matchersMap)) // NOTE: Derived from tsdb.PostingsForMatchers. for _, values := range matchersMap { var ( @@ -2469,8 +2502,9 @@ func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]s valuesCached bool ) lvalsFunc := lvalsFn + matchers := make([]*labels.Matcher, 0, len(vals)) // Merge PostingGroups with the same matcher into 1 to - // avoid fetching duplicate postings. + // avoid fetching duplicate postings. for _, val := range values { pg, vals, err = toPostingGroup(ctx, lvalsFunc, val) if err != nil { @@ -2502,7 +2536,19 @@ func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]s if !mergedPG.addAll && len(mergedPG.addKeys) == 0 { return nil, nil } - } + matchers = append(matchers, val) + } + // Set and sort matchers to be used when picking up posting fetch strategy. + mergedPG.matchers = matchers + slices.SortFunc(mergedPG.matchers, func(a, b *labels.Matcher) bool { + if a.Type == b.Type { + if a.Name == b.Name { + return a.Value < b.Value + } + return a.Name < b.Name + } + return a.Type < b.Type + }) pgs = append(pgs, mergedPG) } slices.SortFunc(pgs, func(a, b *postingGroup) bool { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e4c1039e8ea..5de62d20f91 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1228,9 +1228,9 @@ func benchmarkExpandedPostings( t.ResetTimer() for i := 0; i < t.N(); i++ { - p, err := indexr.ExpandedPostings(context.Background(), c.matchers, NewBytesLimiterFactory(0)(nil)) + p, err := indexr.ExpandedPostings(context.Background(), c.matchers, NewBytesLimiterFactory(0)(nil), false) testutil.Ok(t, err) - testutil.Equals(t, c.expectedLen, len(p)) + testutil.Equals(t, c.expectedLen, len(p.postings)) } }) } @@ -1261,9 +1261,9 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) { matcher1 := labels.MustNewMatcher(labels.MatchEqual, "j", "foo") // Match nothing. matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*") - ps, err := indexr.ExpandedPostings(context.Background(), []*labels.Matcher{matcher1, matcher2}, NewBytesLimiterFactory(0)(nil)) + ps, err := indexr.ExpandedPostings(context.Background(), []*labels.Matcher{matcher1, matcher2}, NewBytesLimiterFactory(0)(nil), false) testutil.Ok(t, err) - testutil.Equals(t, len(ps), 0) + testutil.Equals(t, ps, (*lazyExpandedPostings)(nil)) // Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings. 
testutil.Equals(t, 1, indexr.stats.cachedPostingsCompressions) } @@ -2571,13 +2571,14 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet req, chunksLimiter, NewBytesLimiterFactory(0)(nil), + matchers, nil, false, SeriesBatchSize, dummyHistogram, nil, ) - testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter)) + testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter, false)) defer blockClient.Close() // Ensure at least 1 series has been returned (as expected). @@ -2632,9 +2633,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, }, }, @@ -2649,9 +2651,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, }, }, @@ -2669,9 +2672,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, }, }, @@ -2687,14 +2691,16 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "bar", - addAll: false, - addKeys: []string{"baz"}, + name: "bar", + addAll: false, + addKeys: []string{"baz"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "bar", "baz")}, }, { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, }, }, @@ -2731,9 +2737,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchNotEqual, "foo", "baz")}, }, }, }, @@ -2758,9 +2765,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchRegexp, "foo", "b.*")}, }, }, }, @@ -2775,9 +2783,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchNotEqual, "foo", "")}, }, }, }, @@ -2792,9 +2801,10 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+")}, }, }, }, @@ -2809,9 +2819,10 @@ func 
TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: "foo", - addAll: false, - addKeys: []string{"bar"}, + name: "foo", + addAll: false, + addKeys: []string{"bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar|baz"), labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar|buzz")}, }, }, }, @@ -2829,6 +2840,7 @@ func TestMatchersToPostingGroup(t *testing.T) { name: "foo", addAll: true, removeKeys: []string{"bar", "baz"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "foo", "bar"), labels.MustNewMatcher(labels.MatchNotEqual, "foo", "baz")}, }, }, }, @@ -2846,8 +2858,9 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: labels.MetricName, - addAll: true, + name: labels.MetricName, + addAll: true, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")}, }, }, }, @@ -2865,18 +2878,21 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: labels.MetricName, - addAll: false, - addKeys: []string{"up"}, + name: labels.MetricName, + addAll: false, + addKeys: []string{"up"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "up")}, }, { - name: "cluster", - addAll: false, - addKeys: []string{"us-east-1", "us-west-2"}, + name: "cluster", + addAll: false, + addKeys: []string{"us-east-1", "us-west-2"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "cluster", "")}, }, { - name: "job", - addAll: true, + name: "job", + addAll: true, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", ".*")}, }, }, }, @@ -2897,19 +2913,22 @@ func TestMatchersToPostingGroup(t *testing.T) { }, expected: []*postingGroup{ { - name: labels.MetricName, - addAll: false, - addKeys: []string{"go_info", "up"}, + name: labels.MetricName, + addAll: false, + addKeys: []string{"go_info", "up"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "__name__", "")}, }, { - name: "cluster", - addAll: false, - addKeys: []string{"us-east-1", "us-west-2"}, + name: "cluster", + addAll: false, + addKeys: []string{"us-east-1", "us-west-2"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "cluster", "")}, }, { - name: "job", - addAll: false, - addKeys: []string{"prometheus", "thanos"}, + name: "job", + addAll: false, + addKeys: []string{"prometheus", "thanos"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "job", "")}, }, }, }, diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go new file mode 100644 index 00000000000..5ced62b281b --- /dev/null +++ b/pkg/store/lazy_postings.go @@ -0,0 +1,235 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package store + +import ( + "context" + "math" + + "github.com/pkg/errors" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/index" + "golang.org/x/exp/slices" +) + +var emptyLazyPostings = &lazyExpandedPostings{postings: nil, matchers: nil} + +// lazyExpandedPostings contains expanded postings (series IDs). If lazy posting expansion is +// enabled, it might contain matchers that can be lazily applied during series filtering time. 
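+// Matchers kept here have not been applied to the postings yet: every series
+// read from those postings must still be checked against them, which happens
+// in blockSeriesClient.nextBatch.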
+type lazyExpandedPostings struct {
+	postings []storage.SeriesRef
+	matchers []*labels.Matcher
+}
+
+func newLazyExpandedPostings(ps []storage.SeriesRef, matchers ...*labels.Matcher) *lazyExpandedPostings {
+	return &lazyExpandedPostings{
+		postings: ps,
+		matchers: matchers,
+	}
+}
+
+func (p *lazyExpandedPostings) lazyExpanded() bool {
+	return len(p.matchers) > 0
+}
+
+func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups []*postingGroup, seriesMaxSize int64, seriesMatchRatio float64) ([]*postingGroup, error) {
+	// Collect the posting cardinality of each posting group.
+	for _, pg := range postingGroups {
+		// A posting group can have either add keys or remove keys, but not both at the same time.
+		vals := pg.addKeys
+		if len(pg.removeKeys) > 0 {
+			vals = pg.removeKeys
+		}
+		rngs, err := r.block.indexHeaderReader.PostingsOffsets(pg.name, vals...)
+		if err != nil {
+			return nil, errors.Wrap(err, "postings offset")
+		}
+		for _, r := range rngs {
+			// A value that doesn't exist comes back as the {-1, -1} sentinel
+			// range; skip it so it doesn't skew the cardinality estimate.
+			if r.Start < 0 {
+				continue
+			}
+			pg.cardinality += (r.End - r.Start - 4) / 4
+		}
+	}
+	slices.SortFunc(postingGroups, func(a, b *postingGroup) bool {
+		return a.cardinality < b.cardinality
+	})
+
+	/*
+		Algorithm for choosing which postings to fetch right now and which to expand lazily.
+		Sort posting groups by cardinality, so we can iterate from the posting group with the smallest postings size.
+		The algorithm focuses on fetching less data, for both postings and series.
+
+		We need to fetch at least 1 posting group in order to fetch series. So if we only fetch the first posting group,
+		the data bytes we need to download are given by formula F1: P1 * 4 + P1 * S, where P1 is the number of postings
+		in group 1 and S is the size per series. 4 is the byte size per posting.
+
+		If we fetch 2 posting groups, we can intersect the two postings lists to (hopefully) reduce the series we need to download.
+		Assume that for each intersection the series matching ratio is R (0 < R < 1). Then the data bytes we need to download
+		are given by formula F2: P1 * 4 + P2 * 4 + P1 * S * R.
+		We get formula F3 if we fetch 3 posting groups:
+		F3: P1 * 4 + P2 * 4 + P3 * 4 + P1 * S * R^2.
+
+		Let's compare formula F2 and F1 first.
+		P1 * 4 + P2 * 4 + P1 * S * R < P1 * 4 + P1 * S
+		=> P2 * 4 < P1 * S * (1 - R)
+		The left-hand side is the size of the additional posting group and the right-hand side is the size of the series
+		we no longer need to fetch thanks to the additional intersection. For F2 to fetch less data than F1, the additional
+		postings just need to be smaller than the series they save.
+
+		Let's compare formula F3 and F2.
+		P1 * 4 + P2 * 4 + P3 * 4 + P1 * S * R^2 < P1 * 4 + P2 * 4 + P1 * S * R
+		=> P3 * 4 < P1 * S * R * (1 - R)
+		Same shape as the previous comparison.
+
+		Compare formula F4 (cost to fetch up to 4 posting groups) and F3:
+		P4 * 4 < P1 * S * R^2 * (1 - R)
+
+		We can generalize this to: Pn * 4 < P1 * S * R^(n - 2) * (1 - R).
+
+		The idea of the algorithm: iterate posting groups in ascending order of cardinality and keep fetching a group as
+		long as downloading its postings costs less than the series bytes it is expected to save. As soon as that stops
+		being true, stop and expand the remaining groups lazily.
+
+		This works because, in Pn * 4 < P1 * S * R^(n - 2) * (1 - R), the left-hand side only grows while iterating to
+		larger posting groups, and the right-hand side only shrinks since R < 1. Once the check fails for one group, it
+		fails for every group after it.
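+
+		A worked example with illustrative numbers (not taken from any real block): let S = 4096 bytes, R = 0.5, and
+		sorted cardinalities P1 = 1000, P2 = 10000, P3 = 1000000.
+		Group 2: P2 * 4 = 40KB < P1 * S * (1 - R) ≈ 2MB, so its postings are fetched.
+		Group 3: P3 * 4 = 4MB > P1 * S * R * (1 - R) ≈ 1MB, so group 3 and every group after it is marked lazy.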
+	*/
+	seriesBytesToFetch := postingGroups[0].cardinality * seriesMaxSize
+	p := float64(1)
+	i := 1 // Start from index 1 as we always need to fetch the smallest posting group.
+	for i < len(postingGroups) {
+		pg := postingGroups[i]
+		// If fetching this group's postings would cost more than the series bytes it
+		// lets us avoid, stop here and lazily expand the remaining matchers.
+		if pg.cardinality*4 > int64(p*math.Ceil((1-seriesMatchRatio)*float64(seriesBytesToFetch))) {
+			break
+		}
+		p = p * seriesMatchRatio
+		i++
+	}
+	for i < len(postingGroups) {
+		postingGroups[i].lazy = true
+		i++
+	}
+	return postingGroups, nil
+}
+
+func fetchLazyExpandedPostings(
+	ctx context.Context,
+	postingGroups []*postingGroup,
+	r *bucketIndexReader,
+	bytesLimiter BytesLimiter,
+	addAllPostings bool,
+	lazyExpandedPostingEnabled bool,
+) (*lazyExpandedPostings, error) {
+	var err error
+	/*
+		There are several cases where we skip the postings fetch optimization:
+		- Lazy expanded postings are disabled.
+		- Add all postings. This means we don't have a posting group with any add keys.
+		- `SeriesMaxSize` is not set for this block, so we have no way to estimate series size.
+		- There is only one effective posting group; we always need to download postings from at least one group.
+	*/
+	if lazyExpandedPostingEnabled && !addAllPostings &&
+		r.block.meta.Thanos.IndexStats.SeriesMaxSize > 0 && len(postingGroups) > 1 {
+		postingGroups, err = optimizePostingsFetchByDownloadedBytes(
+			r,
+			postingGroups,
+			r.block.meta.Thanos.IndexStats.SeriesMaxSize,
+			0.5, // TODO(yeya24): Expose this as a flag.
+		)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter)
+	if err != nil {
+		return nil, err
+	}
+	return &lazyExpandedPostings{postings: ps, matchers: matchers}, nil
+}
+
+// keysToFetchFromPostingGroups returns the label pairs (postings) to fetch
+// and the matchers we need to keep for lazy posting expansion.
+// Input `postingGroups` needs to be ordered by cardinality if lazy expansion
+// is enabled, so we can stop at the first lazy posting group.
+func keysToFetchFromPostingGroups(postingGroups []*postingGroup) ([]labels.Label, []*labels.Matcher) {
+	var lazyMatchers []*labels.Matcher
+	keys := make([]labels.Label, 0)
+	i := 0
+	for i < len(postingGroups) {
+		pg := postingGroups[i]
+		if pg.lazy {
+			break
+		}
+
+		// Postings returned by fetchPostings will be in the same order as keys
+		// so it's important that we iterate them in the same order later.
+		// We don't have any other way of pairing keys and fetched postings.
+		for _, key := range pg.addKeys {
+			keys = append(keys, labels.Label{Name: pg.name, Value: key})
+		}
+		for _, key := range pg.removeKeys {
+			keys = append(keys, labels.Label{Name: pg.name, Value: key})
+		}
+		i++
+	}
+	if i < len(postingGroups) {
+		lazyMatchers = make([]*labels.Matcher, 0)
+		for i < len(postingGroups) {
+			lazyMatchers = append(lazyMatchers, postingGroups[i].matchers...)
+			i++
+		}
+	}
+	return keys, lazyMatchers
+}
+
+func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, postingGroups []*postingGroup, bytesLimiter BytesLimiter) ([]storage.SeriesRef, []*labels.Matcher, error) {
+	keys, lazyMatchers := keysToFetchFromPostingGroups(postingGroups)
+	fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter)
+	defer func() {
+		for _, closeFn := range closeFns {
+			closeFn()
+		}
+	}()
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "get postings")
+	}
+
+	// Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys
+	// again, and this is exactly the same order as before (when building the groups), so we can simply
+	// use one incrementing index to fetch postings from the returned slice.
+	postingIndex := 0
+
+	var groupAdds, groupRemovals []index.Postings
+	for _, g := range postingGroups {
+		// Postings of lazy groups were deliberately not fetched, so they have no
+		// entries in fetchedPostings; skip them here. Their matchers are applied
+		// later, during series filtering.
+		if g.lazy {
+			continue
+		}
+		// We cannot add empty set to groupAdds, since they are intersected.
+		if len(g.addKeys) > 0 {
+			toMerge := make([]index.Postings, 0, len(g.addKeys))
+			for _, l := range g.addKeys {
+				toMerge = append(toMerge, checkNilPosting(g.name, l, fetchedPostings[postingIndex]))
+				postingIndex++
+			}
+
+			groupAdds = append(groupAdds, index.Merge(toMerge...))
+		}
+
+		for _, l := range g.removeKeys {
+			groupRemovals = append(groupRemovals, checkNilPosting(g.name, l, fetchedPostings[postingIndex]))
+			postingIndex++
+		}
+	}
+
+	result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...))
+
+	if ctx.Err() != nil {
+		return nil, nil, ctx.Err()
+	}
+	ps, err := ExpandPostingsWithContext(ctx, result)
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "expand")
+	}
+	return ps, lazyMatchers, nil
+}
diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go
new file mode 100644
index 00000000000..9b0cfbd3a32
--- /dev/null
+++ b/pkg/store/lazy_postings_test.go
@@ -0,0 +1,200 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package store
+
+import (
+	"testing"
+
+	"github.com/efficientgo/core/testutil"
+	"github.com/prometheus/prometheus/model/labels"
+)
+
+func TestKeysToFetchFromPostingGroups(t *testing.T) {
+	for _, tc := range []struct {
+		name             string
+		pgs              []*postingGroup
+		expectedLabels   []labels.Label
+		expectedMatchers []*labels.Matcher
+	}{
+		{
+			name: "empty group",
+			pgs: []*postingGroup{
+				{
+					addKeys:    []string{},
+					removeKeys: []string{},
+				},
+			},
+			expectedLabels: []labels.Label{},
+		},
+		{
+			name: "empty groups",
+			pgs: []*postingGroup{
+				{
+					addKeys:    []string{},
+					removeKeys: []string{},
+				},
+				{
+					addKeys:    []string{},
+					removeKeys: []string{},
+				},
+				{
+					addKeys:    []string{},
+					removeKeys: []string{},
+				},
+			},
+			expectedLabels: []labels.Label{},
+		},
+		{
+			name: "group with add keys",
+			pgs: []*postingGroup{
+				{
+					name:       "test",
+					addKeys:    []string{"foo", "bar"},
+					removeKeys: []string{},
+				},
+			},
+			expectedLabels: []labels.Label{{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}},
+		},
+		{
+			name: "group with remove keys",
+			pgs: []*postingGroup{
+				{
+					name:       "test",
+					addKeys:    []string{},
+					removeKeys: []string{"foo", "bar"},
+				},
+			},
+			expectedLabels: []labels.Label{{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}},
+		},
+		{
+			name: "group with both add and remove keys",
+			pgs: []*postingGroup{
+				{
+					name:       "test",
+					addKeys:    []string{"foo", "bar"},
+					removeKeys: []string{"a", "b"},
+				},
+			},
+			expectedLabels: []labels.Label{
+				{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"},
+				{Name: "test", Value: "a"}, {Name: "test", Value: "b"},
+			},
+		},
+		{
+			name: "groups with both add keys",
+			pgs: []*postingGroup{
+				{
+					name:    "test",
+					addKeys: []string{"foo", "bar"},
+				},
+				{
+					name:    "foo",
+					addKeys: []string{"bar"},
+				},
+			},
+			expectedLabels: []labels.Label{
+				{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"},
+				{Name: "foo", Value: "bar"},
+			},
+		},
+		{
+			name: "groups with add and remove keys",
+			pgs: []*postingGroup{
+				{
+					name:    "test",
+					addKeys: []string{"foo", "bar"},
+				},
+				{
+					name:       "foo",
+					removeKeys: []string{"bar"},
+				},
+			},
+			expectedLabels: []labels.Label{
+				{Name: "test", Value: "foo"}, {Name: "test", Value:
"bar"}, + {Name: "foo", Value: "bar"}, + }, + }, + { + name: "lazy posting group with empty matchers", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{}, + lazy: true, + }, + }, + expectedLabels: []labels.Label{}, + expectedMatchers: []*labels.Matcher{}, + }, + { + name: "lazy posting group", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + lazy: true, + }, + }, + expectedLabels: []labels.Label{}, + expectedMatchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + { + name: "multiple lazy posting groups", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + lazy: true, + }, + { + name: "job", + addKeys: []string{"prometheus"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*")}, + lazy: true, + }, + }, + expectedLabels: []labels.Label{}, + expectedMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*"), + }, + }, + { + name: "multiple non lazy and lazy posting groups", + pgs: []*postingGroup{ + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + { + name: "test", + addKeys: []string{"foo", "bar"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, + lazy: true, + }, + { + name: "job", + addKeys: []string{"prometheus"}, + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*")}, + lazy: true, + }, + }, + expectedLabels: []labels.Label{{Name: "test", Value: "foo"}, {Name: "test", Value: "bar"}}, + expectedMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + labels.MustNewMatcher(labels.MatchRegexp, "job", "prometheus.*"), + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + keys, matchers := keysToFetchFromPostingGroups(tc.pgs) + testutil.Equals(t, tc.expectedLabels, keys) + testutil.Equals(t, tc.expectedMatchers, matchers) + }) + } +}