Skip to content

Commit

Permalink
support filtered index cache
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Oct 2, 2023
1 parent 531cdb1 commit e5bdd9c
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 0 deletions.
9 changes: 9 additions & 0 deletions pkg/store/cache/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ const (
type IndexCacheConfig struct {
Type IndexCacheProvider `yaml:"type"`
Config interface{} `yaml:"config"`

// Comma separated strings to enable filtered index cache.
// Available item types are Postings, Series and ExpandedPostings.
EnabledItems string `yaml:"enabled_items"`
}

// NewIndexCache initializes and returns new index cache.
Expand Down Expand Up @@ -66,5 +70,10 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("create %s index cache", cacheConfig.Type))
}

items := strings.Split(strings.Replace(cacheConfig.EnabledItems, " ", "", -1), ",")
if len(items) > 0 {
cache = NewFilteredIndexCache(cache, items)
}
return cache, nil
}
75 changes: 75 additions & 0 deletions pkg/store/cache/filter_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package storecache

import (
"context"

"github.com/oklog/ulid"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"golang.org/x/exp/slices"
)

type FilteredIndexCache struct {
cache IndexCache
enabledItems []string
}

// NewFilteredIndexCache creates a filtered index cache based on enabled items.
func NewFilteredIndexCache(cache IndexCache, enabledItems []string) *FilteredIndexCache {
return &FilteredIndexCache{
cache: cache,
enabledItems: enabledItems,
}
}

// StorePostings sets the postings identified by the ulid and label to the value v,
// if the postings already exists in the cache it is not mutated.
func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) {
c.cache.StorePostings(blockID, l, v)
}
}

// FetchMultiPostings fetches multiple postings - each identified by a label -
// and returns a map containing cache hits, along with a list of missing keys.
func (c *FilteredIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) {
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) {
return c.cache.FetchMultiPostings(ctx, blockID, keys)
}
return nil, keys
}

// StoreExpandedPostings stores expanded postings for a set of label matchers.
func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) {
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) {
c.cache.StoreExpandedPostings(blockID, matchers, v)
}
}

// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) {
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) {
return c.cache.FetchExpandedPostings(ctx, blockID, matchers)
}
return nil, false
}

// StoreSeries sets the series identified by the ulid and id to the value v,
// if the series already exists in the cache it is not mutated.
func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) {
c.cache.StoreSeries(blockID, id, v)
}
}

// FetchMultiSeries fetches multiple series - each identified by ID - from the cache
// and returns a map containing cache hits, along with a list of missing IDs.
func (c *FilteredIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) {
return c.cache.FetchMultiSeries(ctx, blockID, ids)
}
return nil, ids
}
145 changes: 145 additions & 0 deletions pkg/store/cache/filter_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package storecache

import (
"context"
"testing"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
)

func TestFilterCache(t *testing.T) {
blockID := ulid.MustNew(ulid.Now(), nil)
postingKeys := []labels.Label{
{Name: "foo", Value: "bar"},
}
expandedPostingsMatchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
}
testPostingData := []byte("postings")
testExpandedPostingsData := []byte("expandedPostings")
testSeriesData := []byte("series")
ctx := context.TODO()
for _, tc := range []struct {
name string
enabledItems []string
verifyFunc func(t *testing.T, c IndexCache)
}{
{
name: "empty enabled items",
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData)
c.StoreSeries(blockID, 1, testSeriesData)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys)
testutil.Equals(t, 0, len(missed))
testutil.Equals(t, testPostingData, hits[postingKeys[0]])

ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers)
testutil.Equals(t, true, hit)
testutil.Equals(t, testExpandedPostingsData, ep)

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1})
testutil.Equals(t, 0, len(misses))
testutil.Equals(t, testSeriesData, seriesHit[1])
},
},
{
name: "all enabled items",
enabledItems: []string{cacheTypeSeries, cacheTypePostings, cacheTypeExpandedPostings},
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData)
c.StoreSeries(blockID, 1, testSeriesData)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys)
testutil.Equals(t, 0, len(missed))
testutil.Equals(t, testPostingData, hits[postingKeys[0]])

ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers)
testutil.Assert(t, true, hit)
testutil.Equals(t, testExpandedPostingsData, ep)

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1})
testutil.Equals(t, 0, len(misses))
testutil.Equals(t, testSeriesData, seriesHit[1])
},
},
{
name: "only enable postings",
enabledItems: []string{cacheTypePostings},
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData)
c.StoreSeries(blockID, 1, testSeriesData)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys)
testutil.Equals(t, 0, len(missed))
testutil.Equals(t, testPostingData, hits[postingKeys[0]])

_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers)
testutil.Equals(t, false, hit)

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1})
testutil.Equals(t, 1, len(misses))
testutil.Equals(t, 0, len(seriesHit))
},
},
{
name: "only enable expanded postings",
enabledItems: []string{cacheTypeExpandedPostings},
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData)
c.StoreSeries(blockID, 1, testSeriesData)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys)
testutil.Equals(t, 1, len(missed))
testutil.Equals(t, 0, len(hits))

ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers)
testutil.Equals(t, true, hit)
testutil.Equals(t, testExpandedPostingsData, ep)

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1})
testutil.Equals(t, 1, len(misses))
testutil.Equals(t, 0, len(seriesHit))
},
},
{
name: "only enable series",
enabledItems: []string{cacheTypeSeries},
verifyFunc: func(t *testing.T, c IndexCache) {
c.StorePostings(blockID, postingKeys[0], testPostingData)
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData)
c.StoreSeries(blockID, 1, testSeriesData)

hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys)
testutil.Equals(t, 1, len(missed))
testutil.Equals(t, 0, len(hits))

_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers)
testutil.Equals(t, false, hit)

seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1})
testutil.Equals(t, 0, len(misses))
testutil.Equals(t, testSeriesData, seriesHit[1])
},
},
} {
t.Run(tc.name, func(t *testing.T) {
inMemoryCache, err := NewInMemoryIndexCacheWithConfig(log.NewNopLogger(), nil, prometheus.NewRegistry(), DefaultInMemoryIndexCacheConfig)
testutil.Ok(t, err)
c := NewFilteredIndexCache(inMemoryCache, tc.enabledItems)
tc.verifyFunc(t, c)
})
}
}

0 comments on commit e5bdd9c

Please sign in to comment.