Skip to content

Commit

Permalink
Index Cache: Change cache key for postings (#6405)
Browse files Browse the repository at this point in the history
* extend postings cache key with codec

Signed-off-by: Ben Ye <[email protected]>

* add changelog

Signed-off-by: Ben Ye <[email protected]>

* update code back

Signed-off-by: Ben Ye <[email protected]>

* add colon

Signed-off-by: Ben Ye <[email protected]>

* update changelog

Signed-off-by: Ben Ye <[email protected]>

* fix another test

Signed-off-by: Ben Ye <[email protected]>

* add compression scheme const to remote index cache

Signed-off-by: Ben Ye <[email protected]>

* address required comments

Signed-off-by: Ben Ye <[email protected]>

* fix compression scheme name

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Jun 7, 2023
1 parent 1788584 commit 6622110
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6322](https://github.com/thanos-io/thanos/pull/6322) Logging: Avoid expensive log.Valuer evaluation for disallowed levels.
- [#6358](https://github.com/thanos-io/thanos/pull/6358) Query: Add +Inf bucket to query duration metrics
- [#6363](https://github.com/thanos-io/thanos/pull/6363) Store: Check context error when expanding postings.
- [#6405](https://github.com/thanos-io/thanos/pull/6405) Index Cache: Change postings cache key to include the encoding format used so that older Thanos versions would not try to decode it during the deployment of a new version.

### Removed

Expand Down
8 changes: 7 additions & 1 deletion pkg/store/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type IndexCache interface {
type cacheKey struct {
block string
key interface{}

compression string
}

func (c cacheKey) keyType() string {
Expand Down Expand Up @@ -79,7 +81,11 @@ func (c cacheKey) string() string {
// which would end up in wrong query results.
lbl := c.key.(cacheKeyPostings)
lblHash := blake2b.Sum256([]byte(lbl.Name + ":" + lbl.Value))
return "P:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:])
key := "P:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:])
if len(c.compression) > 0 {
key += ":" + c.compression
}
return key
case cacheKeySeries:
return "S:" + c.block + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10)
default:
Expand Down
14 changes: 7 additions & 7 deletions pkg/store/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestCacheKey_string(t *testing.T) {
expected string
}{
"should stringify postings cache key": {
key: cacheKey{ulidString, cacheKeyPostings(labels.Label{Name: "foo", Value: "bar"})},
key: cacheKey{ulidString, cacheKeyPostings(labels.Label{Name: "foo", Value: "bar"}), ""},
expected: func() string {
hash := blake2b.Sum256([]byte("foo:bar"))
encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:])
Expand All @@ -42,7 +42,7 @@ func TestCacheKey_string(t *testing.T) {
}(),
},
"should stringify series cache key": {
key: cacheKey{ulidString, cacheKeySeries(12345)},
key: cacheKey{ulidString, cacheKeySeries(12345), ""},
expected: fmt.Sprintf("S:%s:12345", uid.String()),
},
}
Expand All @@ -68,14 +68,14 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) {
"should guarantee reasonably short key length for postings": {
expectedLen: 72,
keys: []cacheKey{
{ulidString, cacheKeyPostings(labels.Label{Name: "a", Value: "b"})},
{ulidString, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})},
{ulidString, cacheKeyPostings(labels.Label{Name: "a", Value: "b"}), ""},
{ulidString, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)}), ""},
},
},
"should guarantee reasonably short key length for series": {
expectedLen: 49,
keys: []cacheKey{
{ulidString, cacheKeySeries(math.MaxUint64)},
{ulidString, cacheKeySeries(math.MaxUint64), ""},
},
},
}
Expand All @@ -91,7 +91,7 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) {

func BenchmarkCacheKey_string_Postings(b *testing.B) {
uid := ulid.MustNew(1, nil)
key := cacheKey{uid.String(), cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})}
key := cacheKey{uid.String(), cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)}), ""}

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -101,7 +101,7 @@ func BenchmarkCacheKey_string_Postings(b *testing.B) {

func BenchmarkCacheKey_string_Series(b *testing.B) {
uid := ulid.MustNew(1, nil)
key := cacheKey{uid.String(), cacheKeySeries(math.MaxUint64)}
key := cacheKey{uid.String(), cacheKeySeries(math.MaxUint64), ""}

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down
6 changes: 3 additions & 3 deletions pkg/store/cache/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.

blockIDKey := blockID.String()
for _, key := range keys {
if b, ok := c.get(cacheTypePostings, cacheKey{blockIDKey, cacheKeyPostings(key)}); ok {
if b, ok := c.get(cacheTypePostings, cacheKey{blockIDKey, cacheKeyPostings(key), ""}); ok {
hits[key] = b
continue
}
Expand All @@ -314,7 +314,7 @@ func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.
// StoreSeries sets the series identified by the ulid and id to the value v,
// if the series already exists in the cache it is not mutated.
func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
c.set(cacheTypeSeries, cacheKey{blockID.String(), cacheKeySeries(id)}, v)
c.set(cacheTypeSeries, cacheKey{blockID.String(), cacheKeySeries(id), ""}, v)
}

// FetchMultiSeries fetches multiple series - each identified by ID - from the cache
Expand All @@ -324,7 +324,7 @@ func (c *InMemoryIndexCache) FetchMultiSeries(_ context.Context, blockID ulid.UL

blockIDKey := blockID.String()
for _, id := range ids {
if b, ok := c.get(cacheTypeSeries, cacheKey{blockIDKey, cacheKeySeries(id)}); ok {
if b, ok := c.get(cacheTypeSeries, cacheKey{blockIDKey, cacheKeySeries(id), ""}); ok {
hits[id] = b
continue
}
Expand Down
20 changes: 13 additions & 7 deletions pkg/store/cache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,17 @@ const (
memcachedDefaultTTL = 24 * time.Hour
)

const (
compressionSchemeStreamedSnappy = "dss"
)

// RemoteIndexCache is a memcached-based index cache.
type RemoteIndexCache struct {
logger log.Logger
memcached cacheutil.RemoteCacheClient

compressionScheme string

// Metrics.
postingRequests prometheus.Counter
seriesRequests prometheus.Counter
Expand All @@ -37,8 +43,9 @@ type RemoteIndexCache struct {
// NewRemoteIndexCache makes a new RemoteIndexCache.
func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheClient, reg prometheus.Registerer) (*RemoteIndexCache, error) {
c := &RemoteIndexCache{
logger: logger,
memcached: cacheClient,
logger: logger,
memcached: cacheClient,
compressionScheme: compressionSchemeStreamedSnappy, // Hardcode it for now. Expose it once we support different types of compressions.
}

requests := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Expand All @@ -64,8 +71,7 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli
// The function enqueues the request and returns immediately: the entry will be
// asynchronously stored in the cache.
func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
key := cacheKey{blockID.String(), cacheKeyPostings(l)}.string()

key := cacheKey{blockID.String(), cacheKeyPostings(l), c.compressionScheme}.string()
if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err)
}
Expand All @@ -79,7 +85,7 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.

blockIDKey := blockID.String()
for _, lbl := range lbls {
key := cacheKey{blockIDKey, cacheKeyPostings(lbl)}.string()
key := cacheKey{blockIDKey, cacheKeyPostings(lbl), c.compressionScheme}.string()
keys = append(keys, key)
}

Expand Down Expand Up @@ -113,7 +119,7 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.
// The function enqueues the request and returns immediately: the entry will be
// asynchronously stored in the cache.
func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
key := cacheKey{blockID.String(), cacheKeySeries(id)}.string()
key := cacheKey{blockID.String(), cacheKeySeries(id), ""}.string()

if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
level.Error(c.logger).Log("msg", "failed to cache series in memcached", "err", err)
Expand All @@ -128,7 +134,7 @@ func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.UL

blockIDKey := blockID.String()
for _, id := range ids {
key := cacheKey{blockIDKey, cacheKeySeries(id)}.string()
key := cacheKey{blockIDKey, cacheKeySeries(id), ""}.string()
keys = append(keys, key)
}

Expand Down

0 comments on commit 6622110

Please sign in to comment.