diff --git a/CHANGELOG.md b/CHANGELOG.md
index 20b1ac5c5d..795bff7683 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -27,6 +27,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
   Since there are no consistency guarantees provided by some Object Storage providers, this PR adds a consistent lock-free way of dealing with Object Storage irrespective of the choice of object storage. In order to achieve this co-ordination, blocks are not deleted directly. Instead, blocks are marked for deletion by uploading `deletion-mark.json` file for the block that was chosen to be deleted. This file contains unix time of when the block was marked for deletion.
 - [#2090](https://github.com/thanos-io/thanos/issues/2090) *breaking* Downsample command: the `downsample` command has moved as the `thanos bucket` sub-command, and cannot be called via `thanos downsample` any more.
+- [#2294](https://github.com/thanos-io/thanos/pull/2294) store: optimizations for fetching postings. Queries using `=~".*"` matchers or negation matchers (`!=...` or `!~...`) benefit the most.

 ## [v0.11.0](https://github.com/thanos-io/thanos/releases/tag/v0.11.0) - 2020.03.02

diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 15f818d956..b9689f4f6b 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -1310,7 +1310,12 @@ func newBucketIndexReader(ctx context.Context, block *bucketBlock) *bucketIndexR
 // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
 // single label name=value.
 func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, error) {
-	var postingGroups []*postingGroup
+	var (
+		postingGroups []*postingGroup
+		allRequested  = false
+		hasAdds       = false
+		keys          []labels.Label
+	)

 	// NOTE: Derived from tsdb.PostingsForMatchers.
 	for _, m := range ms {
@@ -1320,23 +1325,71 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er
 			return nil, errors.Wrap(err, "toPostingGroup")
 		}

+		// If this group adds nothing, it's an empty group. We can shortcut this, since intersection with empty
+		// postings would return no postings anyway.
+		// E.g. label="non-existing-value" returns empty group.
+		if !pg.addAll && len(pg.addKeys) == 0 {
+			return nil, nil
+		}
+
 		postingGroups = append(postingGroups, pg)
+		allRequested = allRequested || pg.addAll
+		hasAdds = hasAdds || len(pg.addKeys) > 0
+
+		// Postings returned by fetchPostings will be in the same order as keys
+		// so it's important that we iterate them in the same order later.
+		// We don't have any other way of pairing keys and fetched postings.
+		keys = append(keys, pg.addKeys...)
+		keys = append(keys, pg.removeKeys...)
 	}

 	if len(postingGroups) == 0 {
 		return nil, nil
 	}

-	if err := r.fetchPostings(postingGroups); err != nil {
+	// We only need special All postings if there are no other adds. If there are, we can skip fetching
+	// special All postings completely.
+	if allRequested && !hasAdds {
+		// add group with label to fetch "special All postings".
+		name, value := index.AllPostingsKey()
+		allPostingsLabel := labels.Label{Name: name, Value: value}
+
+		postingGroups = append(postingGroups, newPostingGroup(true, []labels.Label{allPostingsLabel}, nil))
+		keys = append(keys, allPostingsLabel)
+	}
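+
+	// For example, a query with only negative matchers, such as {job!="foo"}, yields a single
+	// group with addAll=true and only removeKeys, so allRequested && !hasAdds holds and the
+	// special All postings have to be fetched. As soon as any matcher contributes addKeys
+	// (e.g. an additional region="eu" matcher), the intersection is already bounded and the
+	// expensive special All postings fetch is skipped.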
+
+	fetchedPostings, err := r.fetchPostings(keys)
+	if err != nil {
 		return nil, errors.Wrap(err, "get postings")
 	}

-	var postings []index.Postings
+	// Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys
+	// again, and this is exactly the same order as before (when building the groups), so we can simply
+	// use one incrementing index to fetch postings from returned slice.
+	postingIndex := 0
+
+	var groupAdds, groupRemovals []index.Postings
 	for _, g := range postingGroups {
-		postings = append(postings, g.Postings())
+		// We cannot add empty set to groupAdds, since they are intersected.
+		if len(g.addKeys) > 0 {
+			toMerge := make([]index.Postings, 0, len(g.addKeys))
+			for _, l := range g.addKeys {
+				toMerge = append(toMerge, checkNilPosting(l, fetchedPostings[postingIndex]))
+				postingIndex++
+			}
+
+			groupAdds = append(groupAdds, index.Merge(toMerge...))
+		}
+
+		for _, l := range g.removeKeys {
+			groupRemovals = append(groupRemovals, checkNilPosting(l, fetchedPostings[postingIndex]))
+			postingIndex++
+		}
 	}

-	ps, err := index.ExpandPostings(index.Intersect(postings...))
+	result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...))
+
+	ps, err := index.ExpandPostings(result)
 	if err != nil {
 		return nil, errors.Wrap(err, "expand")
 	}
@@ -1352,82 +1405,71 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er
 	return ps, nil
 }

+// postingGroup keeps posting keys for a single matcher. The logical result of the group is:
+// If addAll is set: special All postings minus postings for removeKeys labels. No need to merge postings for addKeys in this case.
+// If addAll is not set: merge of postings for addKeys labels minus postings for removeKeys labels.
+// This computation happens in ExpandedPostings.
 type postingGroup struct {
-	keys     labels.Labels
-	postings []index.Postings
-
-	aggregate func(postings []index.Postings) index.Postings
+	addAll     bool
+	addKeys    []labels.Label
+	removeKeys []labels.Label
 }

-func newPostingGroup(keys labels.Labels, aggr func(postings []index.Postings) index.Postings) *postingGroup {
+func newPostingGroup(addAll bool, addKeys, removeKeys []labels.Label) *postingGroup {
 	return &postingGroup{
-		keys:      keys,
-		postings:  make([]index.Postings, len(keys)),
-		aggregate: aggr,
+		addAll:     addAll,
+		addKeys:    addKeys,
+		removeKeys: removeKeys,
 	}
 }

-func (p *postingGroup) Fill(i int, posting index.Postings) {
-	p.postings[i] = posting
-}
-
-func (p *postingGroup) Postings() index.Postings {
-	if len(p.keys) == 0 {
-		return index.EmptyPostings()
+func checkNilPosting(l labels.Label, p index.Postings) index.Postings {
+	if p == nil {
+		// This should not happen. Debug for https://github.com/thanos-io/thanos/issues/874.
+		return index.ErrPostings(errors.Errorf("postings is nil for %s. It was never fetched.", l))
 	}
-
-	for i, posting := range p.postings {
-		if posting == nil {
-			// This should not happen. Debug for https://github.com/thanos-io/thanos/issues/874.
-			return index.ErrPostings(errors.Errorf("at least one of %d postings is nil for %s. It was never fetched.", i, p.keys[i]))
-		}
-	}
-
-	return p.aggregate(p.postings)
-}
-
-func merge(p []index.Postings) index.Postings {
-	return index.Merge(p...)
+	return p
 }

-func allWithout(p []index.Postings) index.Postings {
-	return index.Without(p[0], index.Merge(p[1:]...))
-}
+var (
+	allPostingsGroup   = newPostingGroup(true, nil, nil)
+	emptyPostingsGroup = newPostingGroup(false, nil, nil)
+)
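+
+// For example, the matchers {job!="foo", region="eu"} produce two groups:
+//   job!="foo"  -> postingGroup{addAll: true, removeKeys: [{job="foo"}]}
+//   region="eu" -> postingGroup{addKeys: [{region="eu"}]}
+// ExpandedPostings then evaluates (postings of region="eu") minus (postings of job="foo"),
+// without fetching the special All postings at all.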

 // NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication.
 func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, error) {
-	var matchingLabels labels.Labels
+	// This matches any label value, and also series that don't have this label at all.
+	if m.Type == labels.MatchRegexp && (m.Value == ".*" || m.Value == "^.*$") {
+		return allPostingsGroup, nil
+	}
+
+	// NOT matching any value = match nothing. We can shortcut this easily.
+	if m.Type == labels.MatchNotRegexp && (m.Value == ".*" || m.Value == "^.*$") {
+		return emptyPostingsGroup, nil
+	}

 	// If the matcher selects an empty value, it selects all the series which don't
 	// have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575
 	// and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555.
 	if m.Matches("") {
-		allName, allValue := index.AllPostingsKey()
-
-		matchingLabels = append(matchingLabels, labels.Label{Name: allName, Value: allValue})
 		vals, err := lvalsFn(m.Name)
 		if err != nil {
 			return nil, err
 		}
+
+		var toRemove []labels.Label
 		for _, val := range vals {
 			if !m.Matches(val) {
-				matchingLabels = append(matchingLabels, labels.Label{Name: m.Name, Value: val})
+				toRemove = append(toRemove, labels.Label{Name: m.Name, Value: val})
 			}
 		}

-		if len(matchingLabels) == 1 {
-			// This is known hack to return all series.
-			// Ask for x != <not existing value>. Allow for that as Prometheus does,
-			// even though it is expensive.
-			return newPostingGroup(matchingLabels, merge), nil
-		}
-
-		return newPostingGroup(matchingLabels, allWithout), nil
+		return newPostingGroup(true, nil, toRemove), nil
 	}

 	// Fast-path for equal matching.
 	if m.Type == labels.MatchEqual {
-		return newPostingGroup(labels.Labels{{Name: m.Name, Value: m.Value}}, merge), nil
+		return newPostingGroup(false, []labels.Label{{Name: m.Name, Value: m.Value}}, nil), nil
 	}

 	vals, err := lvalsFn(m.Name)
@@ -1435,67 +1477,64 @@ func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Match
 		return nil, err
 	}

+	var toAdd []labels.Label
 	for _, val := range vals {
 		if m.Matches(val) {
-			matchingLabels = append(matchingLabels, labels.Label{Name: m.Name, Value: val})
+			toAdd = append(toAdd, labels.Label{Name: m.Name, Value: val})
 		}
 	}

-	return newPostingGroup(matchingLabels, merge), nil
+	return newPostingGroup(false, toAdd, nil), nil
 }

 type postingPtr struct {
-	groupID int
-	keyID   int
-	ptr     index.Range
+	keyID int
+	ptr   index.Range
 }

 // fetchPostings fill postings requested by posting groups.
-func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
+// It returns one postings for each key, in the same order.
+// If postings for a given key are not fetched, the entry at that index will be nil.
+func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings, error) {
 	var ptrs []postingPtr

-	// Fetch postings from the cache with a single call.
-	keys := make([]labels.Label, 0)
-	for _, g := range groups {
-		keys = append(keys, g.keys...)
-	}
+	output := make([]index.Postings, len(keys))

+	// Fetch postings from the cache with a single call.
 	fromCache, _ := r.block.indexCache.FetchMultiPostings(r.ctx, r.block.meta.ULID, keys)

 	// Iterate over all groups and fetch posting from cache.
 	// If we have a miss, mark key to be fetched in `ptrs` slice.
 	// Overlaps are well handled by partitioner, so we don't need to deduplicate keys.
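+	// Each key is resolved in one of three ways:
+	//   1. cache hit: the cached bytes are decoded into postings directly;
+	//   2. key absent from this block (indexheader.NotFoundRangeErr): empty postings;
+	//   3. cache miss: the index range is recorded in ptrs and fetched from object storage below.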
-	for i, g := range groups {
-		for j, key := range g.keys {
-			// Get postings for the given key from cache first.
-			if b, ok := fromCache[key]; ok {
-				r.stats.postingsTouched++
-				r.stats.postingsTouchedSizeSum += len(b)
-
-				_, l, err := r.dec.Postings(b)
-				if err != nil {
-					return errors.Wrap(err, "decode postings")
-				}
+	for ix, key := range keys {
+		// Get postings for the given key from cache first.
+		if b, ok := fromCache[key]; ok {
+			r.stats.postingsTouched++
+			r.stats.postingsTouchedSizeSum += len(b)

-				g.Fill(j, l)
-				continue
+			_, l, err := r.dec.Postings(b)
+			if err != nil {
+				return nil, errors.Wrap(err, "decode postings")
 			}

-			// Cache miss; save pointer for actual posting in index stored in object store.
-			ptr, err := r.block.indexHeaderReader.PostingsOffset(key.Name, key.Value)
-			if err == indexheader.NotFoundRangeErr {
-				// This block does not have any posting for given key.
-				g.Fill(j, index.EmptyPostings())
-				continue
-			}
+			output[ix] = l
+			continue
+		}

-			if err != nil {
-				return errors.Wrap(err, "index header PostingsOffset")
-			}
+		// Cache miss; save pointer for actual posting in index stored in object store.
+		ptr, err := r.block.indexHeaderReader.PostingsOffset(key.Name, key.Value)
+		if err == indexheader.NotFoundRangeErr {
+			// This block does not have any posting for given key.
+			output[ix] = index.EmptyPostings()
+			continue
+		}

-			r.stats.postingsToFetch++
-			ptrs = append(ptrs, postingPtr{ptr: ptr, groupID: i, keyID: j})
+		if err != nil {
+			return nil, errors.Wrap(err, "index header PostingsOffset")
 		}
+
+		r.stats.postingsToFetch++
+		ptrs = append(ptrs, postingPtr{ptr: ptr, keyID: ix})
 	}

 	sort.Slice(ptrs, func(i, j int) bool {
@@ -1543,8 +1582,8 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
 		r.mtx.Lock()
 		// Return postings and fill LRU cache.
 		// Truncate first 4 bytes which are length of posting.
-		groups[p.groupID].Fill(p.keyID, newBigEndianPostings(pBytes[4:]))
-		r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, groups[p.groupID].keys[p.keyID], pBytes)
+		output[p.keyID] = newBigEndianPostings(pBytes[4:])
+		r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, keys[p.keyID], pBytes)

 		// If we just fetched it we still have to update the stats for touched postings.
 		r.stats.postingsTouched++
@@ -1555,7 +1594,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
 		})
 	}

-	return g.Wait()
+	return output, g.Wait()
 }

 func resizePostings(b []byte) ([]byte, error) {