From e5bdd9c01f25d4c34df8a0150e49ccbf0fe342b4 Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Mon, 2 Oct 2023 00:13:43 -0700
Subject: [PATCH] support filtered index cache

Signed-off-by: Ben Ye
---
 pkg/store/cache/factory.go           |   9 ++
 pkg/store/cache/filter_cache.go      |  75 ++++++++++++++
 pkg/store/cache/filter_cache_test.go | 145 +++++++++++++++++++++++++++
 3 files changed, 229 insertions(+)
 create mode 100644 pkg/store/cache/filter_cache.go
 create mode 100644 pkg/store/cache/filter_cache_test.go

diff --git a/pkg/store/cache/factory.go b/pkg/store/cache/factory.go
index d4e9f0c5cd5..d1b77d4c1df 100644
--- a/pkg/store/cache/factory.go
+++ b/pkg/store/cache/factory.go
@@ -28,6 +28,10 @@ const (
 type IndexCacheConfig struct {
 	Type   IndexCacheProvider `yaml:"type"`
 	Config interface{}        `yaml:"config"`
+
+	// EnabledItems is a comma-separated list of item types to cache; if empty, all item types are cached.
+	// Available item types are Postings, Series and ExpandedPostings.
+	EnabledItems string `yaml:"enabled_items"`
 }
 
 // NewIndexCache initializes and returns new index cache.
@@ -66,5 +70,10 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg
 	if err != nil {
 		return nil, errors.Wrap(err, fmt.Sprintf("create %s index cache", cacheConfig.Type))
 	}
+
+	if cacheConfig.EnabledItems != "" {
+		items := strings.Split(strings.ReplaceAll(cacheConfig.EnabledItems, " ", ""), ",")
+		cache = NewFilteredIndexCache(cache, items)
+	}
 	return cache, nil
 }
diff --git a/pkg/store/cache/filter_cache.go b/pkg/store/cache/filter_cache.go
new file mode 100644
index 00000000000..96bac8bb531
--- /dev/null
+++ b/pkg/store/cache/filter_cache.go
@@ -0,0 +1,75 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package storecache
+
+import (
+	"context"
+
+	"github.com/oklog/ulid"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
+	"golang.org/x/exp/slices"
+)
+
+type FilteredIndexCache struct {
+	cache        IndexCache
+	enabledItems []string
+}
+
+// NewFilteredIndexCache creates a filtered index cache based on enabled items.
+func NewFilteredIndexCache(cache IndexCache, enabledItems []string) *FilteredIndexCache {
+	return &FilteredIndexCache{
+		cache:        cache,
+		enabledItems: enabledItems,
+	}
+}
+
+// StorePostings sets the postings identified by the ulid and label to the value v,
+// if the postings already exists in the cache it is not mutated.
+func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
+	if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) {
+		c.cache.StorePostings(blockID, l, v)
+	}
+}
+
+// FetchMultiPostings fetches multiple postings - each identified by a label -
+// and returns a map containing cache hits, along with a list of missing keys.
+func (c *FilteredIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) {
+	if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) {
+		return c.cache.FetchMultiPostings(ctx, blockID, keys)
+	}
+	return nil, keys
+}
+
+// StoreExpandedPostings stores expanded postings for a set of label matchers.
+func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) {
+	if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) {
+		c.cache.StoreExpandedPostings(blockID, matchers, v)
+	}
+}
+
+// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
+func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) {
+	if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) {
+		return c.cache.FetchExpandedPostings(ctx, blockID, matchers)
+	}
+	return nil, false
+}
+
+// StoreSeries sets the series identified by the ulid and id to the value v,
+// if the series already exists in the cache it is not mutated.
+func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
+	if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) {
+		c.cache.StoreSeries(blockID, id, v)
+	}
+}
+
+// FetchMultiSeries fetches multiple series - each identified by ID - from the cache
+// and returns a map containing cache hits, along with a list of missing IDs.
+func (c *FilteredIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
+	if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) {
+		return c.cache.FetchMultiSeries(ctx, blockID, ids)
+	}
+	return nil, ids
+}
diff --git a/pkg/store/cache/filter_cache_test.go b/pkg/store/cache/filter_cache_test.go
new file mode 100644
index 00000000000..ee95740fc60
--- /dev/null
+++ b/pkg/store/cache/filter_cache_test.go
@@ -0,0 +1,145 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package storecache
+
+import (
+	"context"
+	"testing"
+
+	"github.com/efficientgo/core/testutil"
+	"github.com/go-kit/log"
+	"github.com/oklog/ulid"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
+)
+
+func TestFilterCache(t *testing.T) {
+	blockID := ulid.MustNew(ulid.Now(), nil)
+	postingKeys := []labels.Label{
+		{Name: "foo", Value: "bar"},
+	}
+	expandedPostingsMatchers := []*labels.Matcher{
+		labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
+	}
+	testPostingData := []byte("postings")
+	testExpandedPostingsData := []byte("expandedPostings")
+	testSeriesData := []byte("series")
+	ctx := context.TODO()
+	for _, tc := range []struct {
+		name         string
+		enabledItems []string
+		verifyFunc   func(t *testing.T, c IndexCache)
+	}{
+		{
+			name: "empty enabled items",
+			verifyFunc: func(t *testing.T, c IndexCache) {
+				c.StorePostings(blockID, postingKeys[0], testPostingData)
+				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData)
+				c.StoreSeries(blockID, 1, testSeriesData)
+
+				hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys)
+				testutil.Equals(t, 0, len(missed))
+				testutil.Equals(t, testPostingData, hits[postingKeys[0]])
+
+				ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers)
+				testutil.Equals(t, true, hit)
+				testutil.Equals(t, testExpandedPostingsData, ep)
+
+				seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1})
+				testutil.Equals(t, 0, len(misses))
+				testutil.Equals(t, testSeriesData, seriesHit[1])
+			},
+		},
+		{
+			name:         "all enabled items",
+			enabledItems: []string{cacheTypeSeries, cacheTypePostings, cacheTypeExpandedPostings},
+			verifyFunc: func(t *testing.T, c IndexCache) {
+				c.StorePostings(blockID, postingKeys[0], testPostingData)
+				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData)
+				c.StoreSeries(blockID, 1, testSeriesData)
+
+				hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys)
+				testutil.Equals(t, 0, len(missed))
+				testutil.Equals(t, testPostingData, hits[postingKeys[0]])
+
+				ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers)
+				testutil.Equals(t, true, hit)
+				testutil.Equals(t, testExpandedPostingsData, ep)
+
+				seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1})
+				testutil.Equals(t, 0, len(misses))
+				testutil.Equals(t, testSeriesData, seriesHit[1])
+			},
+		},
+		{
+			name:         "only enable postings",
+			enabledItems: []string{cacheTypePostings},
+			verifyFunc: func(t *testing.T, c IndexCache) {
+				c.StorePostings(blockID, postingKeys[0], testPostingData)
+				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData)
+				c.StoreSeries(blockID, 1, testSeriesData)
+
+				hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys)
+				testutil.Equals(t, 0, len(missed))
+				testutil.Equals(t, testPostingData, hits[postingKeys[0]])
+
+				_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers)
+				testutil.Equals(t, false, hit)
+
+				seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1})
+				testutil.Equals(t, 1, len(misses))
+				testutil.Equals(t, 0, len(seriesHit))
+			},
+		},
+		{
+			name:         "only enable expanded postings",
+			enabledItems: []string{cacheTypeExpandedPostings},
+			verifyFunc: func(t *testing.T, c IndexCache) {
+				c.StorePostings(blockID, postingKeys[0], testPostingData)
+				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData)
+				c.StoreSeries(blockID, 1, testSeriesData)
+
+				hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys)
+				testutil.Equals(t, 1, len(missed))
+				testutil.Equals(t, 0, len(hits))
+
+				ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers)
+				testutil.Equals(t, true, hit)
+				testutil.Equals(t, testExpandedPostingsData, ep)
+
+				seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1})
+				testutil.Equals(t, 1, len(misses))
+				testutil.Equals(t, 0, len(seriesHit))
+			},
+		},
+		{
+			name:         "only enable series",
+			enabledItems: []string{cacheTypeSeries},
+			verifyFunc: func(t *testing.T, c IndexCache) {
+				c.StorePostings(blockID, postingKeys[0], testPostingData)
+				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData)
+				c.StoreSeries(blockID, 1, testSeriesData)
+
+				hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys)
+				testutil.Equals(t, 1, len(missed))
+				testutil.Equals(t, 0, len(hits))
+
+				_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers)
+				testutil.Equals(t, false, hit)
+
+				seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1})
+				testutil.Equals(t, 0, len(misses))
+				testutil.Equals(t, testSeriesData, seriesHit[1])
+			},
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			inMemoryCache, err := NewInMemoryIndexCacheWithConfig(log.NewNopLogger(), nil, prometheus.NewRegistry(), DefaultInMemoryIndexCacheConfig)
+			testutil.Ok(t, err)
+			c := NewFilteredIndexCache(inMemoryCache, tc.enabledItems)
+			tc.verifyFunc(t, c)
+		})
+	}
+}
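
For reference, the program below is a minimal sketch (not part of the patch) of how the new enabled_items option could be exercised end to end through NewIndexCache. The IN-MEMORY provider name and the max_size field come from the existing in-memory index cache configuration rather than from this change, and the concrete values are illustrative assumptions.

package main

import (
	"fmt"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	storecache "github.com/thanos-io/thanos/pkg/store/cache"
)

func main() {
	// Hypothetical configuration: cache only Postings and Series items.
	// With a non-empty enabled_items, the factory wraps the in-memory cache
	// in a FilteredIndexCache, so ExpandedPostings entries are neither
	// stored nor fetched.
	confContentYaml := []byte(`
type: IN-MEMORY
config:
  max_size: 250MB
enabled_items: Postings,Series
`)

	cache, err := storecache.NewIndexCache(log.NewNopLogger(), confContentYaml, prometheus.NewRegistry())
	if err != nil {
		panic(err)
	}
	fmt.Printf("index cache type: %T\n", cache) // *storecache.FilteredIndexCache
}

Leaving enabled_items out (or empty) keeps today's behaviour: the underlying cache is used directly and every item type is cached.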