From 590cec06adba6fd46a8258c653251f889b772849 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 2 Oct 2023 00:13:43 -0700 Subject: [PATCH 01/12] support filtered index cache Signed-off-by: Ben Ye --- pkg/store/cache/factory.go | 9 ++ pkg/store/cache/filter_cache.go | 75 ++++++++++++++ pkg/store/cache/filter_cache_test.go | 145 +++++++++++++++++++++++++++ 3 files changed, 229 insertions(+) create mode 100644 pkg/store/cache/filter_cache.go create mode 100644 pkg/store/cache/filter_cache_test.go diff --git a/pkg/store/cache/factory.go b/pkg/store/cache/factory.go index d4e9f0c5cd..d1b77d4c1d 100644 --- a/pkg/store/cache/factory.go +++ b/pkg/store/cache/factory.go @@ -28,6 +28,10 @@ const ( type IndexCacheConfig struct { Type IndexCacheProvider `yaml:"type"` Config interface{} `yaml:"config"` + + // Comma separated strings to enable filtered index cache. + // Available item types are Postings, Series and ExpandedPostings. + EnabledItems string `yaml:"enabled_items"` } // NewIndexCache initializes and returns new index cache. @@ -66,5 +70,10 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("create %s index cache", cacheConfig.Type)) } + + items := strings.Split(strings.Replace(cacheConfig.EnabledItems, " ", "", -1), ",") + if len(items) > 0 { + cache = NewFilteredIndexCache(cache, items) + } return cache, nil } diff --git a/pkg/store/cache/filter_cache.go b/pkg/store/cache/filter_cache.go new file mode 100644 index 0000000000..96bac8bb53 --- /dev/null +++ b/pkg/store/cache/filter_cache.go @@ -0,0 +1,75 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storecache + +import ( + "context" + + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "golang.org/x/exp/slices" +) + +type FilteredIndexCache struct { + cache IndexCache + enabledItems []string +} + +// NewFilteredIndexCache creates a filtered index cache based on enabled items. +func NewFilteredIndexCache(cache IndexCache, enabledItems []string) *FilteredIndexCache { + return &FilteredIndexCache{ + cache: cache, + enabledItems: enabledItems, + } +} + +// StorePostings sets the postings identified by the ulid and label to the value v, +// if the postings already exists in the cache it is not mutated. +func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) { + c.cache.StorePostings(blockID, l, v) + } +} + +// FetchMultiPostings fetches multiple postings - each identified by a label - +// and returns a map containing cache hits, along with a list of missing keys. +func (c *FilteredIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) { + return c.cache.FetchMultiPostings(ctx, blockID, keys) + } + return nil, keys +} + +// StoreExpandedPostings stores expanded postings for a set of label matchers. +func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) { + c.cache.StoreExpandedPostings(blockID, matchers, v) + } +} + +// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. +func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) { + return c.cache.FetchExpandedPostings(ctx, blockID, matchers) + } + return nil, false +} + +// StoreSeries sets the series identified by the ulid and id to the value v, +// if the series already exists in the cache it is not mutated. +func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) { + c.cache.StoreSeries(blockID, id, v) + } +} + +// FetchMultiSeries fetches multiple series - each identified by ID - from the cache +// and returns a map containing cache hits, along with a list of missing IDs. +func (c *FilteredIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) { + return c.cache.FetchMultiSeries(ctx, blockID, ids) + } + return nil, ids +} diff --git a/pkg/store/cache/filter_cache_test.go b/pkg/store/cache/filter_cache_test.go new file mode 100644 index 0000000000..ee95740fc6 --- /dev/null +++ b/pkg/store/cache/filter_cache_test.go @@ -0,0 +1,145 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storecache + +import ( + "context" + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" +) + +func TestFilterCache(t *testing.T) { + blockID := ulid.MustNew(ulid.Now(), nil) + postingKeys := []labels.Label{ + {Name: "foo", Value: "bar"}, + } + expandedPostingsMatchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + } + testPostingData := []byte("postings") + testExpandedPostingsData := []byte("expandedPostings") + testSeriesData := []byte("series") + ctx := context.TODO() + for _, tc := range []struct { + name string + enabledItems []string + verifyFunc func(t *testing.T, c IndexCache) + }{ + { + name: "empty enabled items", + verifyFunc: func(t *testing.T, c IndexCache) { + c.StorePostings(blockID, postingKeys[0], testPostingData) + c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData) + c.StoreSeries(blockID, 1, testSeriesData) + + hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys) + testutil.Equals(t, 0, len(missed)) + testutil.Equals(t, testPostingData, hits[postingKeys[0]]) + + ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers) + testutil.Equals(t, true, hit) + testutil.Equals(t, testExpandedPostingsData, ep) + + seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}) + testutil.Equals(t, 0, len(misses)) + testutil.Equals(t, testSeriesData, seriesHit[1]) + }, + }, + { + name: "all enabled items", + enabledItems: []string{cacheTypeSeries, cacheTypePostings, cacheTypeExpandedPostings}, + verifyFunc: func(t *testing.T, c IndexCache) { + c.StorePostings(blockID, postingKeys[0], testPostingData) + c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData) + c.StoreSeries(blockID, 1, testSeriesData) + + hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys) + testutil.Equals(t, 0, len(missed)) + testutil.Equals(t, testPostingData, hits[postingKeys[0]]) + + ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers) + testutil.Assert(t, true, hit) + testutil.Equals(t, testExpandedPostingsData, ep) + + seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}) + testutil.Equals(t, 0, len(misses)) + testutil.Equals(t, testSeriesData, seriesHit[1]) + }, + }, + { + name: "only enable postings", + enabledItems: []string{cacheTypePostings}, + verifyFunc: func(t *testing.T, c IndexCache) { + c.StorePostings(blockID, postingKeys[0], testPostingData) + c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData) + c.StoreSeries(blockID, 1, testSeriesData) + + hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys) + testutil.Equals(t, 0, len(missed)) + testutil.Equals(t, testPostingData, hits[postingKeys[0]]) + + _, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers) + testutil.Equals(t, false, hit) + + seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}) + testutil.Equals(t, 1, len(misses)) + testutil.Equals(t, 0, len(seriesHit)) + }, + }, + { + name: "only enable expanded postings", + enabledItems: []string{cacheTypeExpandedPostings}, + verifyFunc: func(t *testing.T, c IndexCache) { + c.StorePostings(blockID, postingKeys[0], testPostingData) + c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData) + c.StoreSeries(blockID, 1, testSeriesData) + + hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys) + testutil.Equals(t, 1, len(missed)) + testutil.Equals(t, 0, len(hits)) + + ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers) + testutil.Equals(t, true, hit) + testutil.Equals(t, testExpandedPostingsData, ep) + + seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}) + testutil.Equals(t, 1, len(misses)) + testutil.Equals(t, 0, len(seriesHit)) + }, + }, + { + name: "only enable series", + enabledItems: []string{cacheTypeSeries}, + verifyFunc: func(t *testing.T, c IndexCache) { + c.StorePostings(blockID, postingKeys[0], testPostingData) + c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData) + c.StoreSeries(blockID, 1, testSeriesData) + + hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys) + testutil.Equals(t, 1, len(missed)) + testutil.Equals(t, 0, len(hits)) + + _, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers) + testutil.Equals(t, false, hit) + + seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}) + testutil.Equals(t, 0, len(misses)) + testutil.Equals(t, testSeriesData, seriesHit[1]) + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + inMemoryCache, err := NewInMemoryIndexCacheWithConfig(log.NewNopLogger(), nil, prometheus.NewRegistry(), DefaultInMemoryIndexCacheConfig) + testutil.Ok(t, err) + c := NewFilteredIndexCache(inMemoryCache, tc.enabledItems) + tc.verifyFunc(t, c) + }) + } +} From 03da8227363c8ed4ad19350ba3f8691acf8c6c97 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 2 Oct 2023 00:26:55 -0700 Subject: [PATCH 02/12] changelog Signed-off-by: Ben Ye --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e1be2cc3bb..36590c239a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,9 +17,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6605](https://github.com/thanos-io/thanos/pull/6605) Query Frontend: Support vertical sharding binary expression with metric name when no matching labels specified. - [#6308](https://github.com/thanos-io/thanos/pull/6308) Ruler: Support configuration flag that allows customizing template for alert message. - [#6760](https://github.com/thanos-io/thanos/pull/6760) Query Frontend: Added TLS support in `--query-frontend.downstream-tripper-config` and `--query-frontend.downstream-tripper-config-file` -- [#6749](https://github.com/thanos-io/thanos/pull/6308) Store Gateway: Added `thanos_store_index_cache_fetch_duration_seconds` histogram for tracking latency of fetching data from index cache. +- [#6749](https://github.com/thanos-io/thanos/pull/6749) Store Gateway: Added `thanos_store_index_cache_fetch_duration_seconds` histogram for tracking latency of fetching data from index cache. - [#6690](https://github.com/thanos-io/thanos/pull/6690) Store: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing dashboard queries to be incorrect due to the added label. - [#6530](https://github.com/thanos-io/thanos/pull/6530) / [#6690](https://github.com/thanos-io/thanos/pull/6690) Query: Add command line arguments for configuring tenants and forward tenant information to Store Gateway. +- [#6765](https://github.com/thanos-io/thanos/pull/6765) Index Cache: Add `enabled_items` to index cache config to selectively cache configured items. Available item types are `Postings`, `Series` and `ExpandedPostings`. ### Changed From 35b4cc4ac17ab33de6df7fff9af64451c6bc7559 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 2 Oct 2023 09:27:46 -0700 Subject: [PATCH 03/12] fix doc Signed-off-by: Ben Ye --- docs/components/store.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/components/store.md b/docs/components/store.md index 26b359f326..4f475a3d5f 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -291,6 +291,7 @@ type: IN-MEMORY config: max_size: 0 max_item_size: 0 +enabled_items: "" ``` All the settings are **optional**: @@ -315,6 +316,7 @@ config: max_get_multi_batch_size: 0 dns_provider_update_interval: 0s auto_discovery: false +enabled_items: "" ``` The **required** settings are: @@ -362,6 +364,7 @@ config: master_name: "" max_async_buffer_size: 10000 max_async_concurrency: 20 +enabled_items: "" ``` The **required** settings are: From 9360f3240c09135de10f3edac552dee9d5728d0c Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 2 Oct 2023 14:36:39 -0700 Subject: [PATCH 04/12] fix unit test failure Signed-off-by: Ben Ye --- pkg/store/cache/factory.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/store/cache/factory.go b/pkg/store/cache/factory.go index d1b77d4c1d..2adbc45253 100644 --- a/pkg/store/cache/factory.go +++ b/pkg/store/cache/factory.go @@ -71,9 +71,11 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg return nil, errors.Wrap(err, fmt.Sprintf("create %s index cache", cacheConfig.Type)) } - items := strings.Split(strings.Replace(cacheConfig.EnabledItems, " ", "", -1), ",") - if len(items) > 0 { - cache = NewFilteredIndexCache(cache, items) + if len(cacheConfig.EnabledItems) > 0 { + if items := strings.Split(cacheConfig.EnabledItems, ","); len(items) > 0 { + cache = NewFilteredIndexCache(cache, items) + } } + return cache, nil } From c2b6e585b7fe77f5b7742d5bc119e4702ceaf6a7 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 3 Oct 2023 19:53:24 -0700 Subject: [PATCH 05/12] add item type validation Signed-off-by: Ben Ye --- pkg/store/cache/factory.go | 5 ++++- pkg/store/cache/filter_cache.go | 14 +++++++++++--- pkg/store/cache/filter_cache_test.go | 26 +++++++++++++++++++++----- 3 files changed, 36 insertions(+), 9 deletions(-) diff --git a/pkg/store/cache/factory.go b/pkg/store/cache/factory.go index 2adbc45253..7824ccb0f8 100644 --- a/pkg/store/cache/factory.go +++ b/pkg/store/cache/factory.go @@ -73,7 +73,10 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg if len(cacheConfig.EnabledItems) > 0 { if items := strings.Split(cacheConfig.EnabledItems, ","); len(items) > 0 { - cache = NewFilteredIndexCache(cache, items) + cache, err = NewFilteredIndexCache(cache, items) + if err != nil { + return nil, err + } } } diff --git a/pkg/store/cache/filter_cache.go b/pkg/store/cache/filter_cache.go index 96bac8bb53..6ca02ec1d1 100644 --- a/pkg/store/cache/filter_cache.go +++ b/pkg/store/cache/filter_cache.go @@ -5,7 +5,7 @@ package storecache import ( "context" - + "fmt" "github.com/oklog/ulid" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -18,11 +18,19 @@ type FilteredIndexCache struct { } // NewFilteredIndexCache creates a filtered index cache based on enabled items. -func NewFilteredIndexCache(cache IndexCache, enabledItems []string) *FilteredIndexCache { +func NewFilteredIndexCache(cache IndexCache, enabledItems []string) (*FilteredIndexCache, error) { + for _, item := range enabledItems { + switch item { + // valid + case cacheTypePostings, cacheTypeExpandedPostings, cacheTypeSeries: + default: + return nil, fmt.Errorf("unsupported item type %s", item) + } + } return &FilteredIndexCache{ cache: cache, enabledItems: enabledItems, - } + }, nil } // StorePostings sets the postings identified by the ulid and label to the value v, diff --git a/pkg/store/cache/filter_cache_test.go b/pkg/store/cache/filter_cache_test.go index ee95740fc6..a29ecfd088 100644 --- a/pkg/store/cache/filter_cache_test.go +++ b/pkg/store/cache/filter_cache_test.go @@ -28,10 +28,21 @@ func TestFilterCache(t *testing.T) { testSeriesData := []byte("series") ctx := context.TODO() for _, tc := range []struct { - name string - enabledItems []string - verifyFunc func(t *testing.T, c IndexCache) + name string + enabledItems []string + expectedError string + verifyFunc func(t *testing.T, c IndexCache) }{ + { + name: "invalid item type", + expectedError: "unsupported item type foo", + enabledItems: []string{"foo"}, + }, + { + name: "invalid item type with 1 valid cache type", + expectedError: "unsupported item type foo", + enabledItems: []string{cacheTypeExpandedPostings, "foo"}, + }, { name: "empty enabled items", verifyFunc: func(t *testing.T, c IndexCache) { @@ -138,8 +149,13 @@ func TestFilterCache(t *testing.T) { t.Run(tc.name, func(t *testing.T) { inMemoryCache, err := NewInMemoryIndexCacheWithConfig(log.NewNopLogger(), nil, prometheus.NewRegistry(), DefaultInMemoryIndexCacheConfig) testutil.Ok(t, err) - c := NewFilteredIndexCache(inMemoryCache, tc.enabledItems) - tc.verifyFunc(t, c) + c, err := NewFilteredIndexCache(inMemoryCache, tc.enabledItems) + if tc.expectedError != "" { + testutil.Equals(t, tc.expectedError, err.Error()) + } else { + testutil.Ok(t, err) + tc.verifyFunc(t, c) + } }) } } From 654eab2ca7242e0b36a6c6717b331f1e7d35676b Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 3 Oct 2023 19:54:11 -0700 Subject: [PATCH 06/12] lint Signed-off-by: Ben Ye --- pkg/store/cache/filter_cache.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/store/cache/filter_cache.go b/pkg/store/cache/filter_cache.go index 6ca02ec1d1..d5bd05179f 100644 --- a/pkg/store/cache/filter_cache.go +++ b/pkg/store/cache/filter_cache.go @@ -6,6 +6,7 @@ package storecache import ( "context" "fmt" + "github.com/oklog/ulid" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" From 722b68ba2cfafdd62c06bbea9915190d9cebadb2 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 3 Oct 2023 22:37:15 -0700 Subject: [PATCH 07/12] change enabled_items to []string type Signed-off-by: Ben Ye --- pkg/store/cache/factory.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/store/cache/factory.go b/pkg/store/cache/factory.go index 7824ccb0f8..1eb9324acf 100644 --- a/pkg/store/cache/factory.go +++ b/pkg/store/cache/factory.go @@ -29,9 +29,8 @@ type IndexCacheConfig struct { Type IndexCacheProvider `yaml:"type"` Config interface{} `yaml:"config"` - // Comma separated strings to enable filtered index cache. // Available item types are Postings, Series and ExpandedPostings. - EnabledItems string `yaml:"enabled_items"` + EnabledItems []string `yaml:"enabled_items,omitempty"` } // NewIndexCache initializes and returns new index cache. @@ -72,11 +71,9 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg } if len(cacheConfig.EnabledItems) > 0 { - if items := strings.Split(cacheConfig.EnabledItems, ","); len(items) > 0 { - cache, err = NewFilteredIndexCache(cache, items) - if err != nil { - return nil, err - } + cache, err = NewFilteredIndexCache(cache, cacheConfig.EnabledItems) + if err != nil { + return nil, err } } From f0c198222596a0d74e0b36ae7a30460a60da2f52 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 4 Oct 2023 00:06:21 -0700 Subject: [PATCH 08/12] generate docs Signed-off-by: Ben Ye --- docs/components/store.md | 6 +++--- pkg/store/cache/factory.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/components/store.md b/docs/components/store.md index 4f475a3d5f..2758ab668c 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -291,7 +291,7 @@ type: IN-MEMORY config: max_size: 0 max_item_size: 0 -enabled_items: "" +enabled_items: [] ``` All the settings are **optional**: @@ -316,7 +316,7 @@ config: max_get_multi_batch_size: 0 dns_provider_update_interval: 0s auto_discovery: false -enabled_items: "" +enabled_items: [] ``` The **required** settings are: @@ -364,7 +364,7 @@ config: master_name: "" max_async_buffer_size: 10000 max_async_concurrency: 20 -enabled_items: "" +enabled_items: [] ``` The **required** settings are: diff --git a/pkg/store/cache/factory.go b/pkg/store/cache/factory.go index 1eb9324acf..650a1b6aee 100644 --- a/pkg/store/cache/factory.go +++ b/pkg/store/cache/factory.go @@ -30,7 +30,7 @@ type IndexCacheConfig struct { Config interface{} `yaml:"config"` // Available item types are Postings, Series and ExpandedPostings. - EnabledItems []string `yaml:"enabled_items,omitempty"` + EnabledItems []string `yaml:"enabled_items"` } // NewIndexCache initializes and returns new index cache. From 31db8417c82b3371d9c2133ad70c4aea6dc5a208 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 4 Oct 2023 14:24:06 -0700 Subject: [PATCH 09/12] separate validation code Signed-off-by: Ben Ye --- pkg/store/cache/factory.go | 4 ++-- pkg/store/cache/filter_cache.go | 25 ++++++++++++++----------- pkg/store/cache/filter_cache_test.go | 3 ++- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/pkg/store/cache/factory.go b/pkg/store/cache/factory.go index 650a1b6aee..4204c92f22 100644 --- a/pkg/store/cache/factory.go +++ b/pkg/store/cache/factory.go @@ -71,10 +71,10 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg } if len(cacheConfig.EnabledItems) > 0 { - cache, err = NewFilteredIndexCache(cache, cacheConfig.EnabledItems) - if err != nil { + if err = ValidateEnabledItems(cacheConfig.EnabledItems); err != nil { return nil, err } + cache = NewFilteredIndexCache(cache, cacheConfig.EnabledItems) } return cache, nil diff --git a/pkg/store/cache/filter_cache.go b/pkg/store/cache/filter_cache.go index d5bd05179f..659de5c2f1 100644 --- a/pkg/store/cache/filter_cache.go +++ b/pkg/store/cache/filter_cache.go @@ -6,7 +6,6 @@ package storecache import ( "context" "fmt" - "github.com/oklog/ulid" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -19,19 +18,11 @@ type FilteredIndexCache struct { } // NewFilteredIndexCache creates a filtered index cache based on enabled items. -func NewFilteredIndexCache(cache IndexCache, enabledItems []string) (*FilteredIndexCache, error) { - for _, item := range enabledItems { - switch item { - // valid - case cacheTypePostings, cacheTypeExpandedPostings, cacheTypeSeries: - default: - return nil, fmt.Errorf("unsupported item type %s", item) - } - } +func NewFilteredIndexCache(cache IndexCache, enabledItems []string) *FilteredIndexCache { return &FilteredIndexCache{ cache: cache, enabledItems: enabledItems, - }, nil + } } // StorePostings sets the postings identified by the ulid and label to the value v, @@ -82,3 +73,15 @@ func (c *FilteredIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid. } return nil, ids } + +func ValidateEnabledItems(enabledItems []string) error { + for _, item := range enabledItems { + switch item { + // valid + case cacheTypePostings, cacheTypeExpandedPostings, cacheTypeSeries: + default: + return fmt.Errorf("unsupported item type %s", item) + } + } + return nil +} diff --git a/pkg/store/cache/filter_cache_test.go b/pkg/store/cache/filter_cache_test.go index a29ecfd088..4c136dfbb2 100644 --- a/pkg/store/cache/filter_cache_test.go +++ b/pkg/store/cache/filter_cache_test.go @@ -149,11 +149,12 @@ func TestFilterCache(t *testing.T) { t.Run(tc.name, func(t *testing.T) { inMemoryCache, err := NewInMemoryIndexCacheWithConfig(log.NewNopLogger(), nil, prometheus.NewRegistry(), DefaultInMemoryIndexCacheConfig) testutil.Ok(t, err) - c, err := NewFilteredIndexCache(inMemoryCache, tc.enabledItems) + err = ValidateEnabledItems(tc.enabledItems) if tc.expectedError != "" { testutil.Equals(t, tc.expectedError, err.Error()) } else { testutil.Ok(t, err) + c := NewFilteredIndexCache(inMemoryCache, tc.enabledItems) tc.verifyFunc(t, c) } }) From bf860d578bf14564f9512b541763ac31d212037d Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 4 Oct 2023 16:13:09 -0700 Subject: [PATCH 10/12] fix lint Signed-off-by: Ben Ye --- pkg/store/cache/filter_cache.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/store/cache/filter_cache.go b/pkg/store/cache/filter_cache.go index 659de5c2f1..fdf998d585 100644 --- a/pkg/store/cache/filter_cache.go +++ b/pkg/store/cache/filter_cache.go @@ -6,6 +6,7 @@ package storecache import ( "context" "fmt" + "github.com/oklog/ulid" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" From a6b537609f36988f644b7769ebf250c06994334b Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 5 Oct 2023 09:15:57 -0700 Subject: [PATCH 11/12] update doc Signed-off-by: Ben Ye --- docs/components/store.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/components/store.md b/docs/components/store.md index 2758ab668c..6c0f713411 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -298,6 +298,7 @@ All the settings are **optional**: - `max_size`: overall maximum number of bytes cache can contain. The value should be specified with a bytes unit (ie. `250MB`). - `max_item_size`: maximum size of single item, in bytes. The value should be specified with a bytes unit (ie. `125MB`). +- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached. ### Memcached index cache @@ -334,6 +335,7 @@ While the remaining settings are **optional**: - `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value of memcached `-I` flag (defaults to 1MB) in order to avoid wasting network round trips to store items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited. - `dns_provider_update_interval`: the DNS discovery update interval. - `auto_discovery`: whether to use the auto-discovery mechanism for memcached. +- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached. ### Redis index cache @@ -380,6 +382,7 @@ While the remaining settings are **optional**: - `read_timeout`: the redis read timeout. - `write_timeout`: the redis write timeout. - `cache_size` size of the in-memory cache used for client-side caching. Client-side caching is enabled when this value is not zero. See [official documentation](https://redis.io/docs/manual/client-side-caching/) for more. It is highly recommended to enable this so that Thanos Store would not need to continuously retrieve data from Redis for repeated requests of the same key(-s). +- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached. Here is an example of what effect client-side caching could have: From 7501e882d345681cefb23c3203c4e51705bb7234 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 5 Oct 2023 09:35:46 -0700 Subject: [PATCH 12/12] fix interface Signed-off-by: Ben Ye --- pkg/store/cache/filter_cache.go | 24 +++++------ pkg/store/cache/filter_cache_test.go | 62 ++++++++++++++-------------- 2 files changed, 44 insertions(+), 42 deletions(-) diff --git a/pkg/store/cache/filter_cache.go b/pkg/store/cache/filter_cache.go index fdf998d585..193f7363a2 100644 --- a/pkg/store/cache/filter_cache.go +++ b/pkg/store/cache/filter_cache.go @@ -28,49 +28,49 @@ func NewFilteredIndexCache(cache IndexCache, enabledItems []string) *FilteredInd // StorePostings sets the postings identified by the ulid and label to the value v, // if the postings already exists in the cache it is not mutated. -func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { +func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) { if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) { - c.cache.StorePostings(blockID, l, v) + c.cache.StorePostings(blockID, l, v, tenant) } } // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. -func (c *FilteredIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { +func (c *FilteredIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) { if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) { - return c.cache.FetchMultiPostings(ctx, blockID, keys) + return c.cache.FetchMultiPostings(ctx, blockID, keys, tenant) } return nil, keys } // StoreExpandedPostings stores expanded postings for a set of label matchers. -func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) { +func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) { if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) { - c.cache.StoreExpandedPostings(blockID, matchers, v) + c.cache.StoreExpandedPostings(blockID, matchers, v, tenant) } } // FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. -func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) { +func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) { if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) { - return c.cache.FetchExpandedPostings(ctx, blockID, matchers) + return c.cache.FetchExpandedPostings(ctx, blockID, matchers, tenant) } return nil, false } // StoreSeries sets the series identified by the ulid and id to the value v, // if the series already exists in the cache it is not mutated. -func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { +func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) { if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) { - c.cache.StoreSeries(blockID, id, v) + c.cache.StoreSeries(blockID, id, v, tenant) } } // FetchMultiSeries fetches multiple series - each identified by ID - from the cache // and returns a map containing cache hits, along with a list of missing IDs. -func (c *FilteredIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { +func (c *FilteredIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) { - return c.cache.FetchMultiSeries(ctx, blockID, ids) + return c.cache.FetchMultiSeries(ctx, blockID, ids, tenant) } return nil, ids } diff --git a/pkg/store/cache/filter_cache_test.go b/pkg/store/cache/filter_cache_test.go index 4c136dfbb2..0616bc679e 100644 --- a/pkg/store/cache/filter_cache_test.go +++ b/pkg/store/cache/filter_cache_test.go @@ -13,6 +13,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" + + "github.com/thanos-io/thanos/pkg/tenancy" ) func TestFilterCache(t *testing.T) { @@ -46,19 +48,19 @@ func TestFilterCache(t *testing.T) { { name: "empty enabled items", verifyFunc: func(t *testing.T, c IndexCache) { - c.StorePostings(blockID, postingKeys[0], testPostingData) - c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData) - c.StoreSeries(blockID, 1, testSeriesData) + c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant) + c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant) + c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant) - hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys) + hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant) testutil.Equals(t, 0, len(missed)) testutil.Equals(t, testPostingData, hits[postingKeys[0]]) - ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers) + ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) testutil.Equals(t, true, hit) testutil.Equals(t, testExpandedPostingsData, ep) - seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}) + seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) testutil.Equals(t, 0, len(misses)) testutil.Equals(t, testSeriesData, seriesHit[1]) }, @@ -67,19 +69,19 @@ func TestFilterCache(t *testing.T) { name: "all enabled items", enabledItems: []string{cacheTypeSeries, cacheTypePostings, cacheTypeExpandedPostings}, verifyFunc: func(t *testing.T, c IndexCache) { - c.StorePostings(blockID, postingKeys[0], testPostingData) - c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData) - c.StoreSeries(blockID, 1, testSeriesData) + c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant) + c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant) + c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant) - hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys) + hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant) testutil.Equals(t, 0, len(missed)) testutil.Equals(t, testPostingData, hits[postingKeys[0]]) - ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers) + ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) testutil.Assert(t, true, hit) testutil.Equals(t, testExpandedPostingsData, ep) - seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}) + seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) testutil.Equals(t, 0, len(misses)) testutil.Equals(t, testSeriesData, seriesHit[1]) }, @@ -88,18 +90,18 @@ func TestFilterCache(t *testing.T) { name: "only enable postings", enabledItems: []string{cacheTypePostings}, verifyFunc: func(t *testing.T, c IndexCache) { - c.StorePostings(blockID, postingKeys[0], testPostingData) - c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData) - c.StoreSeries(blockID, 1, testSeriesData) + c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant) + c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant) + c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant) - hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys) + hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant) testutil.Equals(t, 0, len(missed)) testutil.Equals(t, testPostingData, hits[postingKeys[0]]) - _, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers) + _, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) testutil.Equals(t, false, hit) - seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}) + seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) testutil.Equals(t, 1, len(misses)) testutil.Equals(t, 0, len(seriesHit)) }, @@ -108,19 +110,19 @@ func TestFilterCache(t *testing.T) { name: "only enable expanded postings", enabledItems: []string{cacheTypeExpandedPostings}, verifyFunc: func(t *testing.T, c IndexCache) { - c.StorePostings(blockID, postingKeys[0], testPostingData) - c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData) - c.StoreSeries(blockID, 1, testSeriesData) + c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant) + c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant) + c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant) - hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys) + hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant) testutil.Equals(t, 1, len(missed)) testutil.Equals(t, 0, len(hits)) - ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers) + ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) testutil.Equals(t, true, hit) testutil.Equals(t, testExpandedPostingsData, ep) - seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}) + seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) testutil.Equals(t, 1, len(misses)) testutil.Equals(t, 0, len(seriesHit)) }, @@ -129,18 +131,18 @@ func TestFilterCache(t *testing.T) { name: "only enable series", enabledItems: []string{cacheTypeSeries}, verifyFunc: func(t *testing.T, c IndexCache) { - c.StorePostings(blockID, postingKeys[0], testPostingData) - c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData) - c.StoreSeries(blockID, 1, testSeriesData) + c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant) + c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant) + c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant) - hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys) + hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant) testutil.Equals(t, 1, len(missed)) testutil.Equals(t, 0, len(hits)) - _, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers) + _, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) testutil.Equals(t, false, hit) - seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}) + seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) testutil.Equals(t, 0, len(misses)) testutil.Equals(t, testSeriesData, seriesHit[1]) },