From 62ef6662e89a5163425a1adc58cbeb1d71521201 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 19 Mar 2020 10:27:37 +0100 Subject: [PATCH 01/19] Avoid fetching duplicate keys. Simplified groups with add/remove keys. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 150 ++++++++++++++++++++++---------------------- 1 file changed, 75 insertions(+), 75 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 15f818d956..bd5e9a6a35 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1311,6 +1311,7 @@ func newBucketIndexReader(ctx context.Context, block *bucketBlock) *bucketIndexR // single label name=value. func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, error) { var postingGroups []*postingGroup + fetchMap := map[labels.Label]index.Postings{} // NOTE: Derived from tsdb.PostingsForMatchers. for _, m := range ms { @@ -1321,19 +1322,20 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er } postingGroups = append(postingGroups, pg) + pg.contributeKeysToFetchMap(fetchMap) } if len(postingGroups) == 0 { return nil, nil } - if err := r.fetchPostings(postingGroups); err != nil { + if err := r.fetchPostings(fetchMap); err != nil { return nil, errors.Wrap(err, "get postings") } var postings []index.Postings for _, g := range postingGroups { - postings = append(postings, g.Postings()) + postings = append(postings, g.Postings(fetchMap)) } ps, err := index.ExpandPostings(index.Intersect(postings...)) @@ -1353,81 +1355,80 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er } type postingGroup struct { - keys labels.Labels - postings []index.Postings - - aggregate func(postings []index.Postings) index.Postings + addKeys, removeKeys []labels.Label } -func newPostingGroup(keys labels.Labels, aggr func(postings []index.Postings) 
index.Postings) *postingGroup { +func newPostingGroup(addKeys, removeKeys []labels.Label) *postingGroup { return &postingGroup{ - keys: keys, - postings: make([]index.Postings, len(keys)), - aggregate: aggr, + addKeys: addKeys, + removeKeys: removeKeys, } } -func (p *postingGroup) Fill(i int, posting index.Postings) { - p.postings[i] = posting +func (p *postingGroup) Postings(fetched map[labels.Label]index.Postings) index.Postings { + toAdd := collectPostings(p.addKeys, fetched) + toRemove := collectPostings(p.removeKeys, fetched) + + return index.Without(index.Merge(toAdd...), index.Merge(toRemove...)) } -func (p *postingGroup) Postings() index.Postings { - if len(p.keys) == 0 { - return index.EmptyPostings() +func (p *postingGroup) contributeKeysToFetchMap(fetchMap map[labels.Label]index.Postings) { + for _, k := range p.addKeys { + fetchMap[k] = nil } + for _, k := range p.removeKeys { + fetchMap[k] = nil + } +} - for i, posting := range p.postings { - if posting == nil { +func collectPostings(lbls []labels.Label, fetched map[labels.Label]index.Postings) []index.Postings { + var result []index.Postings + for _, l := range lbls { + p := fetched[l] + if p == nil { // This should not happen. Debug for https://github.com/thanos-io/thanos/issues/874. - return index.ErrPostings(errors.Errorf("at least one of %d postings is nil for %s. It was never fetched.", i, p.keys[i])) + p = index.ErrPostings(errors.Errorf("postings is nil for %s. It was never fetched.", l)) } - } - return p.aggregate(p.postings) + result = append(result, p) + } + return result } -func merge(p []index.Postings) index.Postings { - return index.Merge(p...) -} +var allPostingsKeyLabel labels.Label -func allWithout(p []index.Postings) index.Postings { - return index.Without(p[0], index.Merge(p[1:]...)) +// init allPostingsKeyLabel +func init() { + name, value := index.AllPostingsKey() + allPostingsKeyLabel = labels.Label{Name: name, Value: value} } // NOTE: Derived from tsdb.postingsForMatcher. 
index.Merge is equivalent to map duplication. func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, error) { - var matchingLabels labels.Labels - // If the matcher selects an empty value, it selects all the series which don't // have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575 // and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555. if m.Matches("") { - allName, allValue := index.AllPostingsKey() + var toAdd, toRemove []labels.Label + + toAdd = append(toAdd, allPostingsKeyLabel) - matchingLabels = append(matchingLabels, labels.Label{Name: allName, Value: allValue}) vals, err := lvalsFn(m.Name) if err != nil { return nil, err } for _, val := range vals { if !m.Matches(val) { - matchingLabels = append(matchingLabels, labels.Label{Name: m.Name, Value: val}) + toRemove = append(toRemove, labels.Label{Name: m.Name, Value: val}) } } - if len(matchingLabels) == 1 { - // This is known hack to return all series. - // Ask for x != . Allow for that as Prometheus does, - // even though it is expensive. - return newPostingGroup(matchingLabels, merge), nil - } - - return newPostingGroup(matchingLabels, allWithout), nil + return newPostingGroup(toAdd, toRemove), nil } // Fast-path for equal matching. 
if m.Type == labels.MatchEqual { - return newPostingGroup(labels.Labels{{Name: m.Name, Value: m.Value}}, merge), nil + return newPostingGroup([]labels.Label{{Name: m.Name, Value: m.Value}}, nil), nil } vals, err := lvalsFn(m.Name) @@ -1435,29 +1436,30 @@ func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Match return nil, err } + var toAdd []labels.Label for _, val := range vals { if m.Matches(val) { - matchingLabels = append(matchingLabels, labels.Label{Name: m.Name, Value: val}) + toAdd = append(toAdd, labels.Label{Name: m.Name, Value: val}) } } - return newPostingGroup(matchingLabels, merge), nil + return newPostingGroup(toAdd, nil), nil } type postingPtr struct { - groupID int - keyID int - ptr index.Range + keyID int + ptr index.Range } // fetchPostings fill postings requested by posting groups. -func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error { +// By forcing client to supply output map, client will deduplicate keys first. +func (r *bucketIndexReader) fetchPostings(output map[labels.Label]index.Postings) error { var ptrs []postingPtr // Fetch postings from the cache with a single call. - keys := make([]labels.Label, 0) - for _, g := range groups { - keys = append(keys, g.keys...) + keys := make([]labels.Label, 0, len(output)) + for k, _ := range output { + keys = append(keys, k) } fromCache, _ := r.block.indexCache.FetchMultiPostings(r.ctx, r.block.meta.ULID, keys) @@ -1465,37 +1467,35 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error { // Iterate over all groups and fetch posting from cache. // If we have a miss, mark key to be fetched in `ptrs` slice. // Overlaps are well handled by partitioner, so we don't need to deduplicate keys. - for i, g := range groups { - for j, key := range g.keys { - // Get postings for the given key from cache first. 
- if b, ok := fromCache[key]; ok { - r.stats.postingsTouched++ - r.stats.postingsTouchedSizeSum += len(b) + for j, key := range keys { + // Get postings for the given key from cache first. + if b, ok := fromCache[key]; ok { + r.stats.postingsTouched++ + r.stats.postingsTouchedSizeSum += len(b) - _, l, err := r.dec.Postings(b) - if err != nil { - return errors.Wrap(err, "decode postings") - } - - g.Fill(j, l) - continue + _, l, err := r.dec.Postings(b) + if err != nil { + return errors.Wrap(err, "decode postings") } - // Cache miss; save pointer for actual posting in index stored in object store. - ptr, err := r.block.indexHeaderReader.PostingsOffset(key.Name, key.Value) - if err == indexheader.NotFoundRangeErr { - // This block does not have any posting for given key. - g.Fill(j, index.EmptyPostings()) - continue - } + output[key] = l + continue + } - if err != nil { - return errors.Wrap(err, "index header PostingsOffset") - } + // Cache miss; save pointer for actual posting in index stored in object store. + ptr, err := r.block.indexHeaderReader.PostingsOffset(key.Name, key.Value) + if err == indexheader.NotFoundRangeErr { + // This block does not have any posting for given key. + output[key] = index.EmptyPostings() + continue + } - r.stats.postingsToFetch++ - ptrs = append(ptrs, postingPtr{ptr: ptr, groupID: i, keyID: j}) + if err != nil { + return errors.Wrap(err, "index header PostingsOffset") } + + r.stats.postingsToFetch++ + ptrs = append(ptrs, postingPtr{ptr: ptr, keyID: j}) } sort.Slice(ptrs, func(i, j int) bool { @@ -1543,8 +1543,8 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error { r.mtx.Lock() // Return postings and fill LRU cache. // Truncate first 4 bytes which are length of posting. 
- groups[p.groupID].Fill(p.keyID, newBigEndianPostings(pBytes[4:])) - r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, groups[p.groupID].keys[p.keyID], pBytes) + output[keys[p.keyID]] = newBigEndianPostings(pBytes[4:]) + r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, keys[p.keyID], pBytes) // If we just fetched it we still have to update the stats for touched postings. r.stats.postingsTouched++ From b80fcc461605cdca900d53d737a3559b87107f51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 19 Mar 2020 11:01:52 +0100 Subject: [PATCH 02/19] Added shortcuts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 46 +++++++++++++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index bd5e9a6a35..037bada1b2 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1321,6 +1321,11 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er return nil, errors.Wrap(err, "toPostingGroup") } + // interesction would return no postings anyway + if pg.AlwaysEmptyPostings() { + return nil, nil + } + postingGroups = append(postingGroups, pg) pg.contributeKeysToFetchMap(fetchMap) } @@ -1365,6 +1370,11 @@ func newPostingGroup(addKeys, removeKeys []labels.Label) *postingGroup { } } +// returns true, if this postingGroup will always return empty postings. 
+func (p *postingGroup) AlwaysEmptyPostings() bool { + return len(p.addKeys) == 0 +} + func (p *postingGroup) Postings(fetched map[labels.Label]index.Postings) index.Postings { toAdd := collectPostings(p.addKeys, fetched) toRemove := collectPostings(p.removeKeys, fetched) @@ -1372,15 +1382,6 @@ func (p *postingGroup) Postings(fetched map[labels.Label]index.Postings) index.P return index.Without(index.Merge(toAdd...), index.Merge(toRemove...)) } -func (p *postingGroup) contributeKeysToFetchMap(fetchMap map[labels.Label]index.Postings) { - for _, k := range p.addKeys { - fetchMap[k] = nil - } - for _, k := range p.removeKeys { - fetchMap[k] = nil - } -} - func collectPostings(lbls []labels.Label, fetched map[labels.Label]index.Postings) []index.Postings { var result []index.Postings for _, l := range lbls { @@ -1395,6 +1396,15 @@ func collectPostings(lbls []labels.Label, fetched map[labels.Label]index.Posting return result } +func (p *postingGroup) contributeKeysToFetchMap(fetchMap map[labels.Label]index.Postings) { + for _, k := range p.addKeys { + fetchMap[k] = nil + } + for _, k := range p.removeKeys { + fetchMap[k] = nil + } +} + var allPostingsKeyLabel labels.Label // init allPostingsKeyLabel @@ -1405,25 +1415,33 @@ func init() { // NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication. func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, error) { + if m.Type == labels.MatchRegexp && (m.Value == ".*" || m.Value == "^.*$") { + // This matches all values, including no value. If it is the only matcher, it will return all postings. + return newPostingGroup([]labels.Label{allPostingsKeyLabel}, nil), nil + } + + // NOT matching any value = match nothing. We can shortcut this easily. 
+ if m.Type == labels.MatchNotRegexp && (m.Value == ".*" || m.Value == "^.*$") { + return newPostingGroup(nil, []labels.Label{allPostingsKeyLabel}), nil + } + // If the matcher selects an empty value, it selects all the series which don't // have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575 // and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555. if m.Matches("") { - var toAdd, toRemove []labels.Label - - toAdd = append(toAdd, allPostingsKeyLabel) - vals, err := lvalsFn(m.Name) if err != nil { return nil, err } + + var toRemove []labels.Label for _, val := range vals { if !m.Matches(val) { toRemove = append(toRemove, labels.Label{Name: m.Name, Value: val}) } } - return newPostingGroup(toAdd, toRemove), nil + return newPostingGroup([]labels.Label{allPostingsKeyLabel}, toRemove), nil } // Fast-path for equal matching. From 280db8ccd791195e7bfe1edc61f1efaa6c4e0d04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 19 Mar 2020 12:17:41 +0100 Subject: [PATCH 03/19] Optimize away fetching of ALL postings, if possible. Only remove postings for each key once. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 93 ++++++++++++++++++++++++++++++--------------- 1 file changed, 62 insertions(+), 31 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 037bada1b2..c614207eb9 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1313,6 +1313,9 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er var postingGroups []*postingGroup fetchMap := map[labels.Label]index.Postings{} + all := false + hasAdds := false + // NOTE: Derived from tsdb.PostingsForMatchers. for _, m := range ms { // Each group is separate to tell later what postings are intersecting with what. 
@@ -1322,28 +1325,60 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er } // interesction would return no postings anyway - if pg.AlwaysEmptyPostings() { + if pg.alwaysEmptyPostings() { return nil, nil } postingGroups = append(postingGroups, pg) pg.contributeKeysToFetchMap(fetchMap) + + all = all || pg.addAll + hasAdds = hasAdds || len(pg.addKeys) > 0 } if len(postingGroups) == 0 { return nil, nil } + // we only need All postings if there are no other adds + if all && !hasAdds { + // ask fetchPostings to fetch 'all' postings too + fetchMap[allPostingsKeyLabel] = nil + } + if err := r.fetchPostings(fetchMap); err != nil { return nil, errors.Wrap(err, "get postings") } - var postings []index.Postings + // get add/remove postings from groups + var groupAdds []index.Postings + removals := map[labels.Label]index.Postings{} // use map to skip duplicates for _, g := range postingGroups { - postings = append(postings, g.Postings(fetchMap)) + if len(g.addKeys) > 0 { + var toMerge []index.Postings + for _, l := range g.addKeys { + toMerge = append(toMerge, getFetchedPosting(l, fetchMap)) + } + + groupAdds = append(groupAdds, index.Merge(toMerge...)) + } + + for _, l := range g.removeKeys { + removals[l] = getFetchedPosting(l, fetchMap) + } } - ps, err := index.ExpandPostings(index.Intersect(postings...)) + if all && !hasAdds { + // ask fetchPostings to fetch 'all' postings too + groupAdds = append(groupAdds, getFetchedPosting(allPostingsKeyLabel, fetchMap)) + } + + result := index.Intersect(groupAdds...) + for _, p := range removals { + result = index.Without(result, p) + } + + ps, err := index.ExpandPostings(result) if err != nil { return nil, errors.Wrap(err, "expand") } @@ -1359,41 +1394,36 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er return ps, nil } +// Logical result of each individual group is: +// If addAll is set: ALL postings minus postings for removeKeys labels. 
(No need to merge postings for addKeys in this case) +// If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels +// This happens in ExpandedPostings. type postingGroup struct { - addKeys, removeKeys []labels.Label + addAll bool + addKeys []labels.Label + removeKeys []labels.Label } -func newPostingGroup(addKeys, removeKeys []labels.Label) *postingGroup { +func newPostingGroup(addAll bool, addKeys, removeKeys []labels.Label) *postingGroup { return &postingGroup{ + addAll: addAll, addKeys: addKeys, removeKeys: removeKeys, } } // returns true, if this postingGroup will always return empty postings. -func (p *postingGroup) AlwaysEmptyPostings() bool { - return len(p.addKeys) == 0 +func (p *postingGroup) alwaysEmptyPostings() bool { + return !p.addAll && len(p.addKeys) == 0 } -func (p *postingGroup) Postings(fetched map[labels.Label]index.Postings) index.Postings { - toAdd := collectPostings(p.addKeys, fetched) - toRemove := collectPostings(p.removeKeys, fetched) - - return index.Without(index.Merge(toAdd...), index.Merge(toRemove...)) -} - -func collectPostings(lbls []labels.Label, fetched map[labels.Label]index.Postings) []index.Postings { - var result []index.Postings - for _, l := range lbls { - p := fetched[l] - if p == nil { - // This should not happen. Debug for https://github.com/thanos-io/thanos/issues/874. - p = index.ErrPostings(errors.Errorf("postings is nil for %s. It was never fetched.", l)) - } - - result = append(result, p) +func getFetchedPosting(l labels.Label, fetched map[labels.Label]index.Postings) index.Postings { + p := fetched[l] + if p == nil { + // This should not happen. Debug for https://github.com/thanos-io/thanos/issues/874. + p = index.ErrPostings(errors.Errorf("postings is nil for %s. 
It was never fetched.", l)) } - return result + return p } func (p *postingGroup) contributeKeysToFetchMap(fetchMap map[labels.Label]index.Postings) { @@ -1417,12 +1447,13 @@ func init() { func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, error) { if m.Type == labels.MatchRegexp && (m.Value == ".*" || m.Value == "^.*$") { // This matches all values, including no value. If it is the only matcher, it will return all postings. - return newPostingGroup([]labels.Label{allPostingsKeyLabel}, nil), nil + return newPostingGroup(true, nil, nil), nil } // NOT matching any value = match nothing. We can shortcut this easily. if m.Type == labels.MatchNotRegexp && (m.Value == ".*" || m.Value == "^.*$") { - return newPostingGroup(nil, []labels.Label{allPostingsKeyLabel}), nil + // empty result + return newPostingGroup(false, nil, nil), nil } // If the matcher selects an empty value, it selects all the series which don't @@ -1441,12 +1472,12 @@ func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Match } } - return newPostingGroup([]labels.Label{allPostingsKeyLabel}, toRemove), nil + return newPostingGroup(true, nil, toRemove), nil } // Fast-path for equal matching. if m.Type == labels.MatchEqual { - return newPostingGroup([]labels.Label{{Name: m.Name, Value: m.Value}}, nil), nil + return newPostingGroup(false, []labels.Label{{Name: m.Name, Value: m.Value}}, nil), nil } vals, err := lvalsFn(m.Name) @@ -1461,7 +1492,7 @@ func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Match } } - return newPostingGroup(toAdd, nil), nil + return newPostingGroup(false, toAdd, nil), nil } type postingPtr struct { From 701d88eb2cb53f3ab5822b517d59dc7f7e895114 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 19 Mar 2020 13:13:34 +0100 Subject: [PATCH 04/19] Don't do individual index.Without, but merge them first. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index c614207eb9..05cdb08c5d 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1351,9 +1351,9 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er } // get add/remove postings from groups - var groupAdds []index.Postings - removals := map[labels.Label]index.Postings{} // use map to skip duplicates + var groupAdds, removals []index.Postings for _, g := range postingGroups { + // we cannot add empty set to groupAdds, since they are intersected if len(g.addKeys) > 0 { var toMerge []index.Postings for _, l := range g.addKeys { @@ -1364,7 +1364,7 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er } for _, l := range g.removeKeys { - removals[l] = getFetchedPosting(l, fetchMap) + removals = append(removals, getFetchedPosting(l, fetchMap)) } } @@ -1373,10 +1373,7 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er groupAdds = append(groupAdds, getFetchedPosting(allPostingsKeyLabel, fetchMap)) } - result := index.Intersect(groupAdds...) - for _, p := range removals { - result = index.Without(result, p) - } + result := index.Without(index.Intersect(groupAdds...), index.Merge(removals...)) ps, err := index.ExpandPostings(result) if err != nil { From 02d87c9ef9772b4bc6a73ae5f7bec4c1558dc4bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 19 Mar 2020 15:11:12 +0100 Subject: [PATCH 05/19] Don't use map for fetching postings, but return slice instead. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is in line with original code. Using a map was nicer, but more expensive in terms of allocations and hashing labels. 
Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 93 ++++++++++++++++++++++----------------------- 1 file changed, 45 insertions(+), 48 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 05cdb08c5d..ca17b0b99b 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1311,10 +1311,10 @@ func newBucketIndexReader(ctx context.Context, block *bucketBlock) *bucketIndexR // single label name=value. func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, error) { var postingGroups []*postingGroup - fetchMap := map[labels.Label]index.Postings{} all := false hasAdds := false + keys := []labels.Label(nil) // NOTE: Derived from tsdb.PostingsForMatchers. for _, m := range ms { @@ -1330,47 +1330,61 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er } postingGroups = append(postingGroups, pg) - pg.contributeKeysToFetchMap(fetchMap) - all = all || pg.addAll hasAdds = hasAdds || len(pg.addKeys) > 0 + + // postings returned by fetchPostings will be in the same order as keys + // so it's important that we iterate them in the same order later, + // since we don't build any label -> keys index map + keys = append(keys, pg.addKeys...) + keys = append(keys, pg.removeKeys...) } if len(postingGroups) == 0 { return nil, nil } - // we only need All postings if there are no other adds + allKeyIndex := -1 + // we only need All postings if there are no other adds. If there are, we can skip fetching ALL postings completely. 
if all && !hasAdds { - // ask fetchPostings to fetch 'all' postings too - fetchMap[allPostingsKeyLabel] = nil + // remember the index (will be used later as a flag, and also to access postings), + // and ask fetchPostings to fetch ALL postings too + allKeyIndex = len(keys) + keys = append(keys, getAllPostingsKeyLabel()) } - if err := r.fetchPostings(fetchMap); err != nil { + fetchedPostings, err := r.fetchPostings(keys) + if err != nil { return nil, errors.Wrap(err, "get postings") } - // get add/remove postings from groups + // get add/remove postings from groups. While we iterate over postingGroups and their keys + // again, this is exactly the same order as before, when building the groups, so we can simply + // use one incrementing index to fetch postings from returned slice. + postingIndex := 0 + var groupAdds, removals []index.Postings for _, g := range postingGroups { // we cannot add empty set to groupAdds, since they are intersected if len(g.addKeys) > 0 { var toMerge []index.Postings for _, l := range g.addKeys { - toMerge = append(toMerge, getFetchedPosting(l, fetchMap)) + toMerge = append(toMerge, checkNilPosting(l, fetchedPostings[postingIndex])) + postingIndex++ } groupAdds = append(groupAdds, index.Merge(toMerge...)) } for _, l := range g.removeKeys { - removals = append(removals, getFetchedPosting(l, fetchMap)) + removals = append(removals, checkNilPosting(l, fetchedPostings[postingIndex])) + postingIndex++ } } - if all && !hasAdds { - // ask fetchPostings to fetch 'all' postings too - groupAdds = append(groupAdds, getFetchedPosting(allPostingsKeyLabel, fetchMap)) + if allKeyIndex >= 0 { + // if we have fetched "ALL" postings, add it + groupAdds = append(groupAdds, checkNilPosting(getAllPostingsKeyLabel(), fetchedPostings[allKeyIndex])) } result := index.Without(index.Intersect(groupAdds...), index.Merge(removals...)) @@ -1391,6 +1405,11 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er return ps, nil } +func 
getAllPostingsKeyLabel() labels.Label { + name, value := index.AllPostingsKey() + return labels.Label{Name: name, Value: value} +} + // Logical result of each individual group is: // If addAll is set: ALL postings minus postings for removeKeys labels. (No need to merge postings for addKeys in this case) // If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels @@ -1414,32 +1433,14 @@ func (p *postingGroup) alwaysEmptyPostings() bool { return !p.addAll && len(p.addKeys) == 0 } -func getFetchedPosting(l labels.Label, fetched map[labels.Label]index.Postings) index.Postings { - p := fetched[l] +func checkNilPosting(l labels.Label, p index.Postings) index.Postings { if p == nil { // This should not happen. Debug for https://github.com/thanos-io/thanos/issues/874. - p = index.ErrPostings(errors.Errorf("postings is nil for %s. It was never fetched.", l)) + return index.ErrPostings(errors.Errorf("postings is nil for %s. It was never fetched.", l)) } return p } -func (p *postingGroup) contributeKeysToFetchMap(fetchMap map[labels.Label]index.Postings) { - for _, k := range p.addKeys { - fetchMap[k] = nil - } - for _, k := range p.removeKeys { - fetchMap[k] = nil - } -} - -var allPostingsKeyLabel labels.Label - -// init allPostingsKeyLabel -func init() { - name, value := index.AllPostingsKey() - allPostingsKeyLabel = labels.Label{Name: name, Value: value} -} - // NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication. func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, error) { if m.Type == labels.MatchRegexp && (m.Value == ".*" || m.Value == "^.*$") { @@ -1498,22 +1499,18 @@ type postingPtr struct { } // fetchPostings fill postings requested by posting groups. -// By forcing client to supply output map, client will deduplicate keys first. 
-func (r *bucketIndexReader) fetchPostings(output map[labels.Label]index.Postings) error { +func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings, error) { var ptrs []postingPtr - // Fetch postings from the cache with a single call. - keys := make([]labels.Label, 0, len(output)) - for k, _ := range output { - keys = append(keys, k) - } + output := make([]index.Postings, len(keys)) + // Fetch postings from the cache with a single call. fromCache, _ := r.block.indexCache.FetchMultiPostings(r.ctx, r.block.meta.ULID, keys) // Iterate over all groups and fetch posting from cache. // If we have a miss, mark key to be fetched in `ptrs` slice. // Overlaps are well handled by partitioner, so we don't need to deduplicate keys. - for j, key := range keys { + for ix, key := range keys { // Get postings for the given key from cache first. if b, ok := fromCache[key]; ok { r.stats.postingsTouched++ @@ -1521,10 +1518,10 @@ func (r *bucketIndexReader) fetchPostings(output map[labels.Label]index.Postings _, l, err := r.dec.Postings(b) if err != nil { - return errors.Wrap(err, "decode postings") + return nil, errors.Wrap(err, "decode postings") } - output[key] = l + output[ix] = l continue } @@ -1532,16 +1529,16 @@ func (r *bucketIndexReader) fetchPostings(output map[labels.Label]index.Postings ptr, err := r.block.indexHeaderReader.PostingsOffset(key.Name, key.Value) if err == indexheader.NotFoundRangeErr { // This block does not have any posting for given key. 
- output[key] = index.EmptyPostings() + output[ix] = index.EmptyPostings() continue } if err != nil { - return errors.Wrap(err, "index header PostingsOffset") + return nil, errors.Wrap(err, "index header PostingsOffset") } r.stats.postingsToFetch++ - ptrs = append(ptrs, postingPtr{ptr: ptr, keyID: j}) + ptrs = append(ptrs, postingPtr{ptr: ptr, keyID: ix}) } sort.Slice(ptrs, func(i, j int) bool { @@ -1589,7 +1586,7 @@ func (r *bucketIndexReader) fetchPostings(output map[labels.Label]index.Postings r.mtx.Lock() // Return postings and fill LRU cache. // Truncate first 4 bytes which are length of posting. - output[keys[p.keyID]] = newBigEndianPostings(pBytes[4:]) + output[p.keyID] = newBigEndianPostings(pBytes[4:]) r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, keys[p.keyID], pBytes) // If we just fetched it we still have to update the stats for touched postings. @@ -1601,7 +1598,7 @@ func (r *bucketIndexReader) fetchPostings(output map[labels.Label]index.Postings }) } - return g.Wait() + return output, g.Wait() } func resizePostings(b []byte) ([]byte, error) { From f3e16562d6a3e484a70af3166f944dabc0919ad6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 19 Mar 2020 15:36:35 +0100 Subject: [PATCH 06/19] Renamed 'all' to 'allRequested'. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index ca17b0b99b..f8c27f6d60 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1312,7 +1312,7 @@ func newBucketIndexReader(ctx context.Context, block *bucketBlock) *bucketIndexR func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, error) { var postingGroups []*postingGroup - all := false + allRequested := false hasAdds := false keys := []labels.Label(nil) @@ -1330,7 +1330,7 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er } postingGroups = append(postingGroups, pg) - all = all || pg.addAll + allRequested = allRequested || pg.addAll hasAdds = hasAdds || len(pg.addKeys) > 0 // postings returned by fetchPostings will be in the same order as keys @@ -1346,7 +1346,7 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er allKeyIndex := -1 // we only need All postings if there are no other adds. If there are, we can skip fetching ALL postings completely. 
- if all && !hasAdds { + if allRequested && !hasAdds { // remember the index (will be used later as a flag, and also to access postings), // and ask fetchPostings to fetch ALL postings too allKeyIndex = len(keys) From ae3f5af32d5aa99c8309187f7c5ae1f233925c7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 19 Mar 2020 15:37:23 +0100 Subject: [PATCH 07/19] Typo MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index f8c27f6d60..217e7dc828 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1324,7 +1324,7 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er return nil, errors.Wrap(err, "toPostingGroup") } - // interesction would return no postings anyway + // intersection would return no postings anyway if pg.alwaysEmptyPostings() { return nil, nil } From 00b84f6e572d41b59a6bee3142a2606192040592 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 19 Mar 2020 15:50:56 +0100 Subject: [PATCH 08/19] Make linter happy. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 217e7dc828..4a99f5666b 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1324,7 +1324,7 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er return nil, errors.Wrap(err, "toPostingGroup") } - // intersection would return no postings anyway + // Intersection with empty postings would return no postings anyway. 
if pg.alwaysEmptyPostings() { return nil, nil } @@ -1333,9 +1333,9 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er allRequested = allRequested || pg.addAll hasAdds = hasAdds || len(pg.addKeys) > 0 - // postings returned by fetchPostings will be in the same order as keys + // Postings returned by fetchPostings will be in the same order as keys // so it's important that we iterate them in the same order later, - // since we don't build any label -> keys index map + // since we don't build any label -> keys index map. keys = append(keys, pg.addKeys...) keys = append(keys, pg.removeKeys...) } @@ -1347,8 +1347,8 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er allKeyIndex := -1 // we only need All postings if there are no other adds. If there are, we can skip fetching ALL postings completely. if allRequested && !hasAdds { - // remember the index (will be used later as a flag, and also to access postings), - // and ask fetchPostings to fetch ALL postings too + // Remember the index (will be used later as a flag, and also to access postings), + // and ask fetchPostings to fetch ALL postings too. allKeyIndex = len(keys) keys = append(keys, getAllPostingsKeyLabel()) } @@ -1365,7 +1365,7 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er var groupAdds, removals []index.Postings for _, g := range postingGroups { - // we cannot add empty set to groupAdds, since they are intersected + // We cannot add empty set to groupAdds, since they are intersected. if len(g.addKeys) > 0 { var toMerge []index.Postings for _, l := range g.addKeys { @@ -1383,7 +1383,7 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er } if allKeyIndex >= 0 { - // if we have fetched "ALL" postings, add it + // If we have fetched "ALL" postings, add it. 
groupAdds = append(groupAdds, checkNilPosting(getAllPostingsKeyLabel(), fetchedPostings[allKeyIndex])) } @@ -1443,14 +1443,13 @@ func checkNilPosting(l labels.Label, p index.Postings) index.Postings { // NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication. func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, error) { + // This matches all values, including no value. If it is the only matcher, it will return all postings. if m.Type == labels.MatchRegexp && (m.Value == ".*" || m.Value == "^.*$") { - // This matches all values, including no value. If it is the only matcher, it will return all postings. return newPostingGroup(true, nil, nil), nil } // NOT matching any value = match nothing. We can shortcut this easily. if m.Type == labels.MatchNotRegexp && (m.Value == ".*" || m.Value == "^.*$") { - // empty result return newPostingGroup(false, nil, nil), nil } From b6f44ee54e3bd58294b741515da94103644936f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 20 Mar 2020 16:38:49 +0100 Subject: [PATCH 09/19] Added comment to fetchPostings. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 4a99f5666b..49de9a1f53 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1498,6 +1498,8 @@ type postingPtr struct { } // fetchPostings fill postings requested by posting groups. +// It returns one postings for each key, in the same order. +// If postings for given key is not fetched, entry at given index will be nil. 
func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings, error) { var ptrs []postingPtr From 68ba009de7c2b9384d824abe4626991e3d3aced8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Sat, 21 Mar 2020 12:55:25 +0100 Subject: [PATCH 10/19] Group vars MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 49de9a1f53..f5900a2f2f 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1310,11 +1310,12 @@ func newBucketIndexReader(ctx context.Context, block *bucketBlock) *bucketIndexR // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by // single label name=value. func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, error) { - var postingGroups []*postingGroup - - allRequested := false - hasAdds := false - keys := []labels.Label(nil) + var ( + postingGroups []*postingGroup + allRequested = false + hasAdds = false + keys []labels.Label + ) // NOTE: Derived from tsdb.PostingsForMatchers. 
for _, m := range ms { From 024858344b5b6e3646485de1223cc5e6107ccab9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Sat, 21 Mar 2020 13:00:11 +0100 Subject: [PATCH 11/19] Comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index f5900a2f2f..8c027f829e 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1359,8 +1359,8 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er return nil, errors.Wrap(err, "get postings") } - // get add/remove postings from groups. While we iterate over postingGroups and their keys - // again, this is exactly the same order as before, when building the groups, so we can simply + // Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys + // again, and this is exactly the same order as before (when building the groups), so we can simply // use one incrementing index to fetch postings from returned slice. postingIndex := 0 From 6d8c48cad1d860dd56b0ccdc5ef7a2c6e4e648f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Sat, 21 Mar 2020 13:05:20 +0100 Subject: [PATCH 12/19] Use allPostings and emptyPostings variables for special cases. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 8c027f829e..7c4646e522 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1326,7 +1326,7 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er } // Intersection with empty postings would return no postings anyway. 
- if pg.alwaysEmptyPostings() { + if pg == emptyPostings { return nil, nil } @@ -1429,11 +1429,6 @@ func newPostingGroup(addAll bool, addKeys, removeKeys []labels.Label) *postingGr } } -// returns true, if this postingGroup will always return empty postings. -func (p *postingGroup) alwaysEmptyPostings() bool { - return !p.addAll && len(p.addKeys) == 0 -} - func checkNilPosting(l labels.Label, p index.Postings) index.Postings { if p == nil { // This should not happen. Debug for https://github.com/thanos-io/thanos/issues/874. @@ -1442,16 +1437,21 @@ func checkNilPosting(l labels.Label, p index.Postings) index.Postings { return p } +var ( + allPostings = newPostingGroup(true, nil, nil) + emptyPostings = newPostingGroup(false, nil, nil) +) + // NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication. func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, error) { // This matches all values, including no value. If it is the only matcher, it will return all postings. if m.Type == labels.MatchRegexp && (m.Value == ".*" || m.Value == "^.*$") { - return newPostingGroup(true, nil, nil), nil + return allPostings, nil } // NOT matching any value = match nothing. We can shortcut this easily. 
if m.Type == labels.MatchNotRegexp && (m.Value == ".*" || m.Value == "^.*$") { - return newPostingGroup(false, nil, nil), nil + return emptyPostings, nil } // If the matcher selects an empty value, it selects all the series which don't From f23eb712271a865deab52a09792c112f642c24ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Sat, 21 Mar 2020 13:08:26 +0100 Subject: [PATCH 13/19] Unify terminology to "special All postings" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 7c4646e522..1d76f70c73 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1346,10 +1346,11 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er } allKeyIndex := -1 - // we only need All postings if there are no other adds. If there are, we can skip fetching ALL postings completely. + // we only need special All postings if there are no other adds. If there are, we can skip fetching + // special All postings completely. if allRequested && !hasAdds { // Remember the index (will be used later as a flag, and also to access postings), - // and ask fetchPostings to fetch ALL postings too. + // and ask fetchPostings to fetch special All postings too. allKeyIndex = len(keys) keys = append(keys, getAllPostingsKeyLabel()) } @@ -1384,7 +1385,7 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er } if allKeyIndex >= 0 { - // If we have fetched "ALL" postings, add it. + // If we have fetched special All postings, add it. 
groupAdds = append(groupAdds, checkNilPosting(getAllPostingsKeyLabel(), fetchedPostings[allKeyIndex])) } @@ -1412,7 +1413,7 @@ func getAllPostingsKeyLabel() labels.Label { } // Logical result of each individual group is: -// If addAll is set: ALL postings minus postings for removeKeys labels. (No need to merge postings for addKeys in this case) +// If addAll is set: special All postings minus postings for removeKeys labels. No need to merge postings for addKeys in this case. // If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels // This happens in ExpandedPostings. type postingGroup struct { @@ -1444,7 +1445,7 @@ var ( // NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication. func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, error) { - // This matches all values, including no value. If it is the only matcher, it will return all postings. + // This matches any label value, and also series that don't have this label at all. if m.Type == labels.MatchRegexp && (m.Value == ".*" || m.Value == "^.*$") { return allPostings, nil } From c784dda5a00c86c6cb336a6f8778f779e9d37a13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Sat, 21 Mar 2020 13:20:41 +0100 Subject: [PATCH 14/19] Address feedback. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 1d76f70c73..599c1dd3bb 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1326,7 +1326,7 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er } // Intersection with empty postings would return no postings anyway. 
- if pg == emptyPostings { + if pg == emptyPostingsGroup { return nil, nil } @@ -1335,8 +1335,8 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er hasAdds = hasAdds || len(pg.addKeys) > 0 // Postings returned by fetchPostings will be in the same order as keys - // so it's important that we iterate them in the same order later, - // since we don't build any label -> keys index map. + // so it's important that we iterate them in the same order later. + // We don't have any other way of pairing keys and fetched postings. keys = append(keys, pg.addKeys...) keys = append(keys, pg.removeKeys...) } @@ -1365,11 +1365,11 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er // use one incrementing index to fetch postings from returned slice. postingIndex := 0 - var groupAdds, removals []index.Postings + var groupAdds, groupRemovals []index.Postings for _, g := range postingGroups { // We cannot add empty set to groupAdds, since they are intersected. 
if len(g.addKeys) > 0 { - var toMerge []index.Postings + toMerge := make([]index.Postings, 0, len(g.addKeys)) for _, l := range g.addKeys { toMerge = append(toMerge, checkNilPosting(l, fetchedPostings[postingIndex])) postingIndex++ @@ -1379,7 +1379,7 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er } for _, l := range g.removeKeys { - removals = append(removals, checkNilPosting(l, fetchedPostings[postingIndex])) + groupRemovals = append(groupRemovals, checkNilPosting(l, fetchedPostings[postingIndex])) postingIndex++ } } @@ -1389,7 +1389,7 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er groupAdds = append(groupAdds, checkNilPosting(getAllPostingsKeyLabel(), fetchedPostings[allKeyIndex])) } - result := index.Without(index.Intersect(groupAdds...), index.Merge(removals...)) + result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...)) ps, err := index.ExpandPostings(result) if err != nil { @@ -1439,20 +1439,20 @@ func checkNilPosting(l labels.Label, p index.Postings) index.Postings { } var ( - allPostings = newPostingGroup(true, nil, nil) - emptyPostings = newPostingGroup(false, nil, nil) + allPostingsGroup = newPostingGroup(true, nil, nil) + emptyPostingsGroup = newPostingGroup(false, nil, nil) ) // NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication. func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, error) { // This matches any label value, and also series that don't have this label at all. if m.Type == labels.MatchRegexp && (m.Value == ".*" || m.Value == "^.*$") { - return allPostings, nil + return allPostingsGroup, nil } // NOT matching any value = match nothing. We can shortcut this easily. 
if m.Type == labels.MatchNotRegexp && (m.Value == ".*" || m.Value == "^.*$") { - return emptyPostings, nil + return emptyPostingsGroup, nil } // If the matcher selects an empty value, it selects all the series which don't From 5a13b8b1e97495bae3962fa964bc8c735f3f27c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Sat, 21 Mar 2020 13:52:44 +0100 Subject: [PATCH 15/19] Added CHANGELOG.md entry. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 20b1ac5c5d..795bff7683 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel Since there are no consistency guarantees provided by some Object Storage providers, this PR adds a consistent lock-free way of dealing with Object Storage irrespective of the choice of object storage. In order to achieve this co-ordination, blocks are not deleted directly. Instead, blocks are marked for deletion by uploading `deletion-mark.json` file for the block that was chosen to be deleted. This file contains unix time of when the block was marked for deletion. - [#2090](https://github.com/thanos-io/thanos/issues/2090) *breaking* Downsample command: the `downsample` command has moved as the `thanos bucket` sub-command, and cannot be called via `thanos downsample` any more. +- [#2294](https://github.com/thanos-io/thanos/pull/2294) store: optimizations for fetching postings. Queries using `=~".*"` matchers or negation matchers (`!=...` or `!~...`) benefit the most. ## [v0.11.0](https://github.com/thanos-io/thanos/releases/tag/v0.11.0) - 2020.03.02 From 7122161db407fb9ff4209bc2af5dd7b037754eab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Sat, 21 Mar 2020 16:16:31 +0100 Subject: [PATCH 16/19] Fix check for empty group. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 599c1dd3bb..682b64247f 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1325,8 +1325,9 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er return nil, errors.Wrap(err, "toPostingGroup") } - // Intersection with empty postings would return no postings anyway. - if pg == emptyPostingsGroup { + // If this groups adds nothing, it's an empty group. Intersection with empty postings would return no postings anyway. + // E.g. label="non-existing-value" also returns empty group. + if !pg.addAll && len(pg.addKeys) == 0 { return nil, nil } From 658091d4ddfa8fdfe4337382ac69ed94a9180893 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Sat, 21 Mar 2020 16:18:05 +0100 Subject: [PATCH 17/19] Comment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 682b64247f..57741fda1b 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1325,8 +1325,9 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er return nil, errors.Wrap(err, "toPostingGroup") } - // If this groups adds nothing, it's an empty group. Intersection with empty postings would return no postings anyway. - // E.g. label="non-existing-value" also returns empty group. + // If this group adds nothing, it's an empty group. We can shortcut this, since intersection with empty + // postings would return no postings anyway. + // E.g. label="non-existing-value" returns empty group.
if !pg.addAll && len(pg.addKeys) == 0 { return nil, nil } From 029d3013cd525dfe0609c3ac7c3dba1e54caab0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 23 Mar 2020 13:49:55 +0100 Subject: [PATCH 18/19] Special All postings is now added as a new group MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit No special handling required anymore. Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 57741fda1b..e2492b79f1 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1347,14 +1347,15 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er return nil, nil } - allKeyIndex := -1 - // we only need special All postings if there are no other adds. If there are, we can skip fetching + // We only need special All postings if there are no other adds. If there are, we can skip fetching // special All postings completely. if allRequested && !hasAdds { - // Remember the index (will be used later as a flag, and also to access postings), - // and ask fetchPostings to fetch special All postings too. - allKeyIndex = len(keys) - keys = append(keys, getAllPostingsKeyLabel()) + // add group with label to fetch "special All postings". + name, value := index.AllPostingsKey() + allPostingsLabel := labels.Label{Name: name, Value: value} + + postingGroups = append(postingGroups, newPostingGroup(true, []labels.Label{allPostingsLabel}, nil)) + keys = append(keys, allPostingsLabel) } fetchedPostings, err := r.fetchPostings(keys) @@ -1386,11 +1387,6 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er } } - if allKeyIndex >= 0 { - // If we have fetched special All postings, add it. 
- groupAdds = append(groupAdds, checkNilPosting(getAllPostingsKeyLabel(), fetchedPostings[allKeyIndex])) - } - result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...)) ps, err := index.ExpandPostings(result) @@ -1409,11 +1405,6 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er return ps, nil } -func getAllPostingsKeyLabel() labels.Label { - name, value := index.AllPostingsKey() - return labels.Label{Name: name, Value: value} -} - // Logical result of each individual group is: // If addAll is set: special All postings minus postings for removeKeys labels. No need to merge postings for addKeys in this case. // If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels From d9396a9a2897269455834bdf3be903f2420d9602 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 23 Mar 2020 14:10:31 +0100 Subject: [PATCH 19/19] Updated comment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index e2492b79f1..b9689f4f6b 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1405,10 +1405,10 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er return ps, nil } -// Logical result of each individual group is: +// postingGroup keeps posting keys for a single matcher. Logical result of the group is: // If addAll is set: special All postings minus postings for removeKeys labels. No need to merge postings for addKeys in this case. // If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels -// This computation happens in ExpandedPostings. type postingGroup struct { addAll bool addKeys []labels.Label