From 1d3ef3c89fc6f15c83ff9d64a03cbd563e94cdec Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 6 Jun 2023 15:06:35 -0700 Subject: [PATCH] fix Signed-off-by: Ben Ye --- pkg/store/bucket.go | 46 ++++++++++++++++++++++++++----------- pkg/store/cache/cache.go | 3 ++- pkg/store/cache/inmemory.go | 2 +- 3 files changed, 35 insertions(+), 16 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 04bbefbf126..175e73578ae 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -2160,7 +2160,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M sort.Slice(ms, func(i, j int) bool { if ms[i].Type == ms[j].Type { if ms[i].Name == ms[j].Name { - return ms[i].Value == ms[j].Value + return ms[i].Value < ms[j].Value } return ms[i].Name < ms[j].Name } @@ -2185,6 +2185,20 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M if err != nil { return nil, errors.Wrap(err, "expand") } + + if len(ps) > 0 { + // As of version two all series entries are 16 byte padded. All references + // we get have to account for that to get the correct offset. + version, err := r.block.indexHeaderReader.IndexVersion() + if err != nil { + return nil, errors.Wrap(err, "get index version") + } + if version >= 2 { + for i, id := range ps { + ps[i] = id * 16 + } + } + } return ps, nil } // If failed to decode cached postings, try to expand postings again. @@ -2284,18 +2298,8 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M return nil, errors.Wrap(err, "expand") } - // As of version two all series entries are 16 byte padded. All references - // we get have to account for that to get the correct offset. - version, err := r.block.indexHeaderReader.IndexVersion() - if err != nil { - return nil, errors.Wrap(err, "get index version") - } - if version >= 2 { - for i, id := range ps { - ps[i] = id * 16 - } - } - // Encode postings to cache. + // Encode postings to cache. We compress and cache postings before adding + // 16 bytes padding in order to make compressed size smaller. dataToCache, compressionDuration, compressionErrors, compressedSize := r.encodePostingsToCache(index.NewListPostings(ps), len(ps)) r.stats.cachedPostingsCompressions++ r.stats.cachedPostingsCompressionErrors += compressionErrors @@ -2303,6 +2307,20 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize) r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(ps) * 4) // Estimate the posting list size. r.block.indexCache.StoreExpandedPostings(r.block.meta.ULID, ms, dataToCache) + + if len(ps) > 0 { + // As of version two all series entries are 16 byte padded. All references + // we get have to account for that to get the correct offset. + version, err := r.block.indexHeaderReader.IndexVersion() + if err != nil { + return nil, errors.Wrap(err, "get index version") + } + if version >= 2 { + for i, id := range ps { + ps[i] = id * 16 + } + } + } return ps, nil } @@ -2452,7 +2470,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab l, closer, err := r.decodeCachedPostings(b) if err != nil { - + return nil, closeFns, errors.Wrap(err, "decode postings") } output[ix] = l closeFns = append(closeFns, closer...) diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go index a25bdad388f..7014743e9b8 100644 --- a/pkg/store/cache/cache.go +++ b/pkg/store/cache/cache.go @@ -6,11 +6,12 @@ package storecache import ( "context" "encoding/base64" + "strconv" + "github.com/oklog/ulid" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "golang.org/x/crypto/blake2b" - "strconv" ) const ( diff --git a/pkg/store/cache/inmemory.go b/pkg/store/cache/inmemory.go index 137be4214c8..f17e6448416 100644 --- a/pkg/store/cache/inmemory.go +++ b/pkg/store/cache/inmemory.go @@ -326,7 +326,7 @@ func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers [ // FetchExpandedPostings fetches expanded postings. func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) { - if b, ok := c.get(cacheTypePostings, cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(matchers))}); ok { + if b, ok := c.get(cacheTypeExpandedPostings, cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(matchers))}); ok { return b, true } return nil, false