index cache: Cache expanded postings #6420
Changes from all commits: f6158bb, 1fd7b5d, 26ef978, c534bc1, 220a98f, 0049327, f102f7f, 6dd60ae, 4f0d687
@@ -374,6 +374,11 @@ func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []label
 	return map[labels.Label][]byte{}, keys
 }

+func (noopCache) StoreExpandedPostings(_ ulid.ULID, _ []*labels.Matcher, _ []byte) {}
+func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, _ []*labels.Matcher) ([]byte, bool) {
+	return []byte{}, false
+}
+
 func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte) {}
 func (noopCache) FetchMultiSeries(_ context.Context, _ ulid.ULID, ids []storage.SeriesRef) (map[storage.SeriesRef][]byte, []storage.SeriesRef) {
 	return map[storage.SeriesRef][]byte{}, ids
@@ -2151,6 +2156,23 @@ func (r *bucketIndexReader) reset() {
 // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
 // single label name=value.
 func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) {
+	// Sort matchers to make sure we generate the same cache key.
+	sort.Slice(ms, func(i, j int) bool {
+		if ms[i].Type == ms[j].Type {
+			if ms[i].Name == ms[j].Name {
+				return ms[i].Value < ms[j].Value
+			}
+			return ms[i].Name < ms[j].Name
+		}
+		return ms[i].Type < ms[j].Type
+	})
+	hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter)
+	if err != nil {
+		return nil, err
+	}
+	if hit {
+		return postings, nil
+	}
 	var (
 		postingGroups []*postingGroup
 		allRequested  = false
@@ -2246,18 +2268,29 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M
 		return nil, errors.Wrap(err, "expand")
 	}

-	// As of version two all series entries are 16 byte padded. All references
-	// we get have to account for that to get the correct offset.
-	version, err := r.block.indexHeaderReader.IndexVersion()
-	if err != nil {
-		return nil, errors.Wrap(err, "get index version")
-	}
-	if version >= 2 {
-		for i, id := range ps {
-			ps[i] = id * 16
-		}
-	}
+	// Encode postings to cache. We compress and cache postings before adding
+	// 16 bytes padding in order to make compressed size smaller.
+	dataToCache, compressionDuration, compressionErrors, compressedSize := r.encodePostingsToCache(index.NewListPostings(ps), len(ps))
+	r.stats.cachedPostingsCompressions++
+	r.stats.cachedPostingsCompressionErrors += compressionErrors
+	r.stats.CachedPostingsCompressionTimeSum += compressionDuration
+	r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
+	r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(ps) * 4) // Estimate the posting list size.
+	r.block.indexCache.StoreExpandedPostings(r.block.meta.ULID, ms, dataToCache)
+
+	if len(ps) > 0 {
+		// As of version two all series entries are 16 byte padded. All references
+		// we get have to account for that to get the correct offset.
+		version, err := r.block.indexHeaderReader.IndexVersion()
+		if err != nil {
+			return nil, errors.Wrap(err, "get index version")
+		}
+		if version >= 2 {
+			for i, id := range ps {
+				ps[i] = id * 16
+			}
+		}
+	}

 	return ps, nil
 }
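A small standalone illustration (again, not PR code) of why postings are compressed before the 16-byte padding is applied: multiplying each ref by 16 inflates the deltas between consecutive refs, so a diff-varint encoding needs more bytes per entry. The diffVarintSize helper below is a stand-in that only counts uvarint bytes; it is not the real Thanos encoder.

package main

import (
	"encoding/binary"
	"fmt"
)

// diffVarintSize returns how many bytes a delta-then-uvarint encoding of the
// sorted refs would take.
func diffVarintSize(refs []uint64) int {
	buf := make([]byte, binary.MaxVarintLen64)
	size, prev := 0, uint64(0)
	for _, r := range refs {
		size += binary.PutUvarint(buf, r-prev) // delta encoding keeps numbers small
		prev = r
	}
	return size
}

func main() {
	raw := []uint64{100, 103, 110, 150, 400}
	padded := make([]uint64, len(raw))
	for i, r := range raw {
		padded[i] = r * 16 // what an index version >= 2 reader ultimately needs
	}
	fmt.Println("unpadded:", diffVarintSize(raw), "bytes")    // 6 bytes
	fmt.Println("padded:  ", diffVarintSize(padded), "bytes") // 8 bytes
}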
@@ -2374,6 +2407,51 @@ type postingPtr struct {
 	ptr index.Range
 }

+func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) (bool, []storage.SeriesRef, error) {
+	dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms)
+	if !hit {
+		return false, nil, nil
+	}
+	if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil {
+		return false, nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err)
+	}
+	r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(dataFromCache))
+	r.stats.postingsTouched++
+	r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(dataFromCache))
+	p, closeFns, err := r.decodeCachedPostings(dataFromCache)
+	defer func() {
+		for _, closeFn := range closeFns {
+			closeFn()
+		}
+	}()
+	// If failed to decode or expand cached postings, return and expand postings again.
+	if err != nil {
+		level.Error(r.block.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String())
+		return false, nil, nil
+	}
+
+	ps, err := index.ExpandPostings(p)
+	if err != nil {
+		level.Error(r.block.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String())
+		return false, nil, nil
+	}
+
+	if len(ps) > 0 {
+		// As of version two all series entries are 16 byte padded. All references
+		// we get have to account for that to get the correct offset.
+		version, err := r.block.indexHeaderReader.IndexVersion()
+		if err != nil {
+			return false, nil, errors.Wrap(err, "get index version")
+		}
+		if version >= 2 {
+			for i, id := range ps {
+				ps[i] = id * 16
+			}
+		}
+	}
+	return true, ps, nil
+}
+
 // fetchPostings fill postings requested by posting groups.
 // It returns one posting for each key, in the same order.
 // If postings for given key is not fetched, entry at given index will be nil.
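For context, the two Prometheus index helpers used above can be exercised on their own. The round trip below is illustrative only: index.NewListPostings wraps a sorted slice of series refs as a postings iterator (what ExpandedPostings hands to the encoder), and index.ExpandPostings materializes an iterator back into a slice (what fetchExpandedPostingsFromCache does after decoding).

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	refs := []storage.SeriesRef{1, 5, 42}
	p := index.NewListPostings(refs)    // slice -> iterator
	out, err := index.ExpandPostings(p) // iterator -> slice
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // [1 5 42]
}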
@@ -2405,32 +2483,12 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 			r.stats.postingsTouched++
 			r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(b))

-			// Even if this instance is not using compression, there may be compressed
-			// entries in the cache written by other stores.
-			var (
-				l   index.Postings
-				err error
-			)
-			if isDiffVarintSnappyEncodedPostings(b) || isDiffVarintSnappyStreamedEncodedPostings(b) {
-				s := time.Now()
-				clPostings, err := decodePostings(b)
-				r.stats.cachedPostingsDecompressions += 1
-				r.stats.CachedPostingsDecompressionTimeSum += time.Since(s)
-				if err != nil {
-					r.stats.cachedPostingsDecompressionErrors += 1
-				} else {
-					closeFns = append(closeFns, clPostings.close)
-					l = clPostings
-				}
-			} else {
-				_, l, err = r.dec.Postings(b)
-			}
-
+			l, closer, err := r.decodeCachedPostings(b)
 			if err != nil {
 				return nil, closeFns, errors.Wrap(err, "decode postings")
 			}

 			output[ix] = l
+			closeFns = append(closeFns, closer...)
 			continue
 		}
@@ -2502,27 +2560,12 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 				return err
 			}

-			dataToCache := pBytes
-
-			compressionTime := time.Duration(0)
-			compressions, compressionErrors, compressedSize := 0, 0, 0
-
 			// Reencode postings before storing to cache. If that fails, we store original bytes.
 			// This can only fail, if postings data was somehow corrupted,
 			// and there is nothing we can do about it.
 			// Errors from corrupted postings will be reported when postings are used.
-			compressions++
-			s := time.Now()
 			bep := newBigEndianPostings(pBytes[4:])
-			data, err := diffVarintSnappyStreamedEncode(bep, bep.length())
-			compressionTime = time.Since(s)
-			if err == nil {
-				dataToCache = data
-				compressedSize = len(data)
-			} else {
-				compressionErrors = 1
-			}
-
+			dataToCache, compressionTime, compressionErrors, compressedSize := r.encodePostingsToCache(bep, bep.length())
 			r.mtx.Lock()
 			// Return postings and fill LRU cache.
 			// Truncate first 4 bytes which are length of posting.
@@ -2533,7 +2576,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 			// If we just fetched it we still have to update the stats for touched postings.
 			r.stats.postingsTouched++
 			r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(pBytes))
-			r.stats.cachedPostingsCompressions += compressions
+			r.stats.cachedPostingsCompressions += 1
 			r.stats.cachedPostingsCompressionErrors += compressionErrors
 			r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(pBytes))
 			r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
@@ -2547,6 +2590,47 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 	return output, closeFns, g.Wait()
 }

+func (r *bucketIndexReader) decodeCachedPostings(b []byte) (index.Postings, []func(), error) {
+	// Even if this instance is not using compression, there may be compressed
+	// entries in the cache written by other stores.
+	var (
+		l        index.Postings
+		err      error
+		closeFns []func()
+	)
+	if isDiffVarintSnappyEncodedPostings(b) || isDiffVarintSnappyStreamedEncodedPostings(b) {
+		s := time.Now()
+		clPostings, err := decodePostings(b)
+		r.stats.cachedPostingsDecompressions += 1
+		r.stats.CachedPostingsDecompressionTimeSum += time.Since(s)
+		if err != nil {
+			r.stats.cachedPostingsDecompressionErrors += 1
+		} else {
+			closeFns = append(closeFns, clPostings.close)
+			l = clPostings
+		}
+	} else {
+		_, l, err = r.dec.Postings(b)
+	}
+	return l, closeFns, err
+}
+
+func (r *bucketIndexReader) encodePostingsToCache(p index.Postings, length int) ([]byte, time.Duration, int, int) {
+	var dataToCache []byte
+	compressionTime := time.Duration(0)
+	compressionErrors, compressedSize := 0, 0
+	s := time.Now()
+	data, err := diffVarintSnappyStreamedEncode(p, length)
+	compressionTime = time.Since(s)
+	if err == nil {
+		dataToCache = data
+		compressedSize = len(data)
+	} else {
+		compressionErrors = 1
+	}
+	return dataToCache, compressionTime, compressionErrors, compressedSize
+}
+
 func resizePostings(b []byte) ([]byte, error) {
 	d := encoding.Decbuf{B: b}
 	n := d.Be32int()

Review discussion on encodePostingsToCache:
- Small nit, but maybe instead of returning multiple …
- Yeah, that makes sense and I wanted to do the same at the beginning. But not all situations need to hold the lock, so I want to simplify the logic and let the caller decide how to update the stats and take the lock. WDYT?
- Ok, that makes sense. Maybe it is logical to convert those fields to atomic types to avoid holding a lock, in a separate PR, so that it could simplify this function's return values.
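decodeCachedPostings and encodePostingsToCache delegate to Thanos-internal codecs (decodePostings, diffVarintSnappyStreamedEncode). The sketch below shows only the general technique those codecs are based on, namely delta-encoding sorted refs as uvarints and then Snappy-compressing the result; it does not reproduce the real header bytes, streaming format, or codec markers.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"

	"github.com/golang/snappy"
)

// encode delta-encodes sorted refs as uvarints and snappy-compresses them.
func encode(refs []uint64) []byte {
	var buf bytes.Buffer
	tmp := make([]byte, binary.MaxVarintLen64)
	prev := uint64(0)
	for _, r := range refs {
		n := binary.PutUvarint(tmp, r-prev) // store the delta, not the absolute ref
		buf.Write(tmp[:n])
		prev = r
	}
	return snappy.Encode(nil, buf.Bytes())
}

// decode reverses encode: snappy-decompress, then rebuild refs from deltas.
func decode(b []byte) ([]uint64, error) {
	raw, err := snappy.Decode(nil, b)
	if err != nil {
		return nil, err
	}
	var refs []uint64
	prev := uint64(0)
	for len(raw) > 0 {
		d, n := binary.Uvarint(raw)
		if n <= 0 {
			return nil, fmt.Errorf("corrupt varint")
		}
		prev += d
		refs = append(refs, prev)
		raw = raw[n:]
	}
	return refs, nil
}

func main() {
	in := []uint64{100, 103, 110, 150, 400}
	out, err := decode(encode(in))
	fmt.Println(out, err) // [100 103 110 150 400] <nil>
}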
@@ -7,6 +7,7 @@ import (
 	"context"
 	"encoding/base64"
 	"strconv"
+	"strings"

 	"github.com/oklog/ulid"
 	"github.com/prometheus/prometheus/model/labels"
@@ -15,8 +16,9 @@ import (
 )

 const (
-	cacheTypePostings string = "Postings"
-	cacheTypeSeries   string = "Series"
+	cacheTypePostings         string = "Postings"
+	cacheTypeExpandedPostings string = "ExpandedPostings"
+	cacheTypeSeries           string = "Series"

 	sliceHeaderSize = 16
 )
@@ -38,6 +40,12 @@ type IndexCache interface {
 	// and returns a map containing cache hits, along with a list of missing keys.
 	FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label)

+	// StoreExpandedPostings stores expanded postings for a set of label matchers.
+	StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte)
+
+	// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
+	FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool)
+
 	// StoreSeries stores a single series.
 	StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte)
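As a rough sketch of what an implementation of the two new methods has to do, here is a toy map-backed cache. It is not one of the real Thanos backends (in-memory, memcached, Redis), and every name in it is hypothetical: store and look up the encoded postings under a key derived from the block ULID and the matchers.

package main

import (
	"context"
	"fmt"
	"strings"
	"sync"

	"github.com/oklog/ulid"
	"github.com/prometheus/prometheus/model/labels"
)

// toyExpandedPostingsCache is a hypothetical, minimal backing store for the
// two new IndexCache methods.
type toyExpandedPostingsCache struct {
	mtx  sync.Mutex
	data map[string][]byte
}

func newToyCache() *toyExpandedPostingsCache {
	return &toyExpandedPostingsCache{data: map[string][]byte{}}
}

// cacheKey joins the block ULID with the stringified matchers; it mirrors the
// idea, not the exact format, of the key used by the PR.
func cacheKey(blockID ulid.ULID, matchers []*labels.Matcher) string {
	parts := make([]string, 0, len(matchers))
	for _, m := range matchers {
		parts = append(parts, m.String())
	}
	return blockID.String() + ":" + strings.Join(parts, ";")
}

func (c *toyExpandedPostingsCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.data[cacheKey(blockID, matchers)] = v
}

func (c *toyExpandedPostingsCache) FetchExpandedPostings(_ context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	v, ok := c.data[cacheKey(blockID, matchers)]
	return v, ok
}

func main() {
	c := newToyCache()
	var id ulid.ULID // zero-value ULID, good enough for the demo
	ms := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "api")}
	c.StoreExpandedPostings(id, ms, []byte("encoded postings"))
	v, ok := c.FetchExpandedPostings(context.Background(), id, ms)
	fmt.Println(ok, string(v)) // true encoded postings
}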
@@ -59,6 +67,8 @@ func (c cacheKey) keyType() string {
 		return cacheTypePostings
 	case cacheKeySeries:
 		return cacheTypeSeries
+	case cacheKeyExpandedPostings:
+		return cacheTypeExpandedPostings
 	}
 	return "<unknown>"
 }
@@ -68,6 +78,8 @@ func (c cacheKey) size() uint64 {
 	case cacheKeyPostings:
 		// ULID + 2 slice headers + number of chars in value and name.
 		return ulidSize + 2*sliceHeaderSize + uint64(len(k.Value)+len(k.Name))
+	case cacheKeyExpandedPostings:
+		return ulidSize + sliceHeaderSize + uint64(len(k))
 	case cacheKeySeries:
 		return ulidSize + 8 // ULID + uint64.
@@ -86,12 +98,34 @@ func (c cacheKey) string() string {
 			key += ":" + c.compression
 		}
 		return key
+	case cacheKeyExpandedPostings:
+		// Use cryptographically hash functions to avoid hash collisions
+		// which would end up in wrong query results.
+		matchers := c.key.(cacheKeyExpandedPostings)
+		matchersHash := blake2b.Sum256([]byte(matchers))
+		key := "EP:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(matchersHash[0:])
+		if len(c.compression) > 0 {
+			key += ":" + c.compression
+		}
+		return key
 	case cacheKeySeries:
 		return "S:" + c.block + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10)
 	default:
 		return ""
 	}
 }

+func labelMatchersToString(matchers []*labels.Matcher) string {
+	sb := strings.Builder{}
+	for i, lbl := range matchers {
+		sb.WriteString(lbl.String())
+		if i < len(matchers)-1 {
+			sb.WriteRune(';')
+		}
+	}
+	return sb.String()
+}
+
 type cacheKeyPostings labels.Label
+type cacheKeyExpandedPostings string // We don't use []*labels.Matcher because it is not a hashable type so fail at inmemory cache.
 type cacheKeySeries uint64

Review discussion on the expanded postings cache key:
- Should we also encode the index header version (https://github.com/thanos-io/thanos/pull/6420/files#diff-3e2896fafa6ff73509c77df2c4389b68828e02575bb4fb78b6c34bcfb922a7ceR2442), because that leads to different results? 🤔 This is probably a theoretical problem, but it still makes sense to be safer than sorry 😄
- I didn't get it. Blocks are immutable, so since we include the block ID here the version should be the same?
- Right, forgot about that invariant. My idea was that if another index version comes out in the future, it would be enough to only rewrite the …
- Oh I see, you mean if we even changed the format of the postings? Like we changed the posting list into a roaring bitmap or something else?
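Putting the pieces above together, an expanded-postings key is the joined matcher strings hashed with blake2b and base64-encoded under the "EP:" prefix. The standalone helper below is illustrative only; it mirrors, rather than reuses, labelMatchersToString and cacheKey.string().

package main

import (
	"encoding/base64"
	"fmt"
	"strings"

	"github.com/prometheus/prometheus/model/labels"
	"golang.org/x/crypto/blake2b"
)

// expandedPostingsCacheKey mimics the "EP:" key construction shown in the diff.
func expandedPostingsCacheKey(blockID string, matchers []*labels.Matcher) string {
	sb := strings.Builder{}
	for i, m := range matchers {
		sb.WriteString(m.String())
		if i < len(matchers)-1 {
			sb.WriteRune(';')
		}
	}
	// A cryptographic hash avoids collisions between different matcher sets.
	h := blake2b.Sum256([]byte(sb.String()))
	return "EP:" + blockID + ":" + base64.RawURLEncoding.EncodeToString(h[:])
}

func main() {
	ms := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "job", "api"),
		labels.MustNewMatcher(labels.MatchRegexp, "instance", ".*"),
	}
	fmt.Println(expandedPostingsCacheKey("01H0000000000000000000000", ms))
}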
Review discussion on the cache lookup in ExpandedPostings:
- Maybe we can move this new section into a function to reduce some nesting, with early returns for !hit and err != nil.
- Updated with the new commit.