From 3e055448b57e00cf523519beb78f73b7ebf96d70 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 5 Oct 2023 21:36:59 -0700 Subject: [PATCH] Support filtered index cache (#6765) * support filtered index cache Signed-off-by: Ben Ye * changelog Signed-off-by: Ben Ye * fix doc Signed-off-by: Ben Ye * fix unit test failure Signed-off-by: Ben Ye * add item type validation Signed-off-by: Ben Ye * lint Signed-off-by: Ben Ye * change enabled_items to []string type Signed-off-by: Ben Ye * generate docs Signed-off-by: Ben Ye * separate validation code Signed-off-by: Ben Ye * fix lint Signed-off-by: Ben Ye * update doc Signed-off-by: Ben Ye * fix interface Signed-off-by: Ben Ye --------- Signed-off-by: Ben Ye --- CHANGELOG.md | 3 +- docs/components/store.md | 6 + pkg/store/cache/factory.go | 11 ++ pkg/store/cache/filter_cache.go | 88 ++++++++++++++ pkg/store/cache/filter_cache_test.go | 164 +++++++++++++++++++++++++++ 5 files changed, 271 insertions(+), 1 deletion(-) create mode 100644 pkg/store/cache/filter_cache.go create mode 100644 pkg/store/cache/filter_cache_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index e8da580bac3..829356c9a15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,9 +17,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6605](https://github.com/thanos-io/thanos/pull/6605) Query Frontend: Support vertical sharding binary expression with metric name when no matching labels specified. - [#6308](https://github.com/thanos-io/thanos/pull/6308) Ruler: Support configuration flag that allows customizing template for alert message. - [#6760](https://github.com/thanos-io/thanos/pull/6760) Query Frontend: Added TLS support in `--query-frontend.downstream-tripper-config` and `--query-frontend.downstream-tripper-config-file` -- [#6749](https://github.com/thanos-io/thanos/pull/6308) Store Gateway: Added `thanos_store_index_cache_fetch_duration_seconds` histogram for tracking latency of fetching data from index cache. +- [#6749](https://github.com/thanos-io/thanos/pull/6749) Store Gateway: Added `thanos_store_index_cache_fetch_duration_seconds` histogram for tracking latency of fetching data from index cache. - [#6690](https://github.com/thanos-io/thanos/pull/6690) Store: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing dashboard queries to be incorrect due to the added label. - [#6530](https://github.com/thanos-io/thanos/pull/6530) / [#6690](https://github.com/thanos-io/thanos/pull/6690) Query: Add command line arguments for configuring tenants and forward tenant information to Store Gateway. +- [#6765](https://github.com/thanos-io/thanos/pull/6765) Index Cache: Add `enabled_items` to index cache config to selectively cache configured items. Available item types are `Postings`, `Series` and `ExpandedPostings`. ### Changed diff --git a/docs/components/store.md b/docs/components/store.md index 26b359f3267..6c0f713411c 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -291,12 +291,14 @@ type: IN-MEMORY config: max_size: 0 max_item_size: 0 +enabled_items: [] ``` All the settings are **optional**: - `max_size`: overall maximum number of bytes cache can contain. The value should be specified with a bytes unit (ie. `250MB`). - `max_item_size`: maximum size of single item, in bytes. The value should be specified with a bytes unit (ie. `125MB`). +- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached. ### Memcached index cache @@ -315,6 +317,7 @@ config: max_get_multi_batch_size: 0 dns_provider_update_interval: 0s auto_discovery: false +enabled_items: [] ``` The **required** settings are: @@ -332,6 +335,7 @@ While the remaining settings are **optional**: - `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value of memcached `-I` flag (defaults to 1MB) in order to avoid wasting network round trips to store items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited. - `dns_provider_update_interval`: the DNS discovery update interval. - `auto_discovery`: whether to use the auto-discovery mechanism for memcached. +- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached. ### Redis index cache @@ -362,6 +366,7 @@ config: master_name: "" max_async_buffer_size: 10000 max_async_concurrency: 20 +enabled_items: [] ``` The **required** settings are: @@ -377,6 +382,7 @@ While the remaining settings are **optional**: - `read_timeout`: the redis read timeout. - `write_timeout`: the redis write timeout. - `cache_size` size of the in-memory cache used for client-side caching. Client-side caching is enabled when this value is not zero. See [official documentation](https://redis.io/docs/manual/client-side-caching/) for more. It is highly recommended to enable this so that Thanos Store would not need to continuously retrieve data from Redis for repeated requests of the same key(-s). +- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached. Here is an example of what effect client-side caching could have: diff --git a/pkg/store/cache/factory.go b/pkg/store/cache/factory.go index d4e9f0c5cd5..4204c92f22b 100644 --- a/pkg/store/cache/factory.go +++ b/pkg/store/cache/factory.go @@ -28,6 +28,9 @@ const ( type IndexCacheConfig struct { Type IndexCacheProvider `yaml:"type"` Config interface{} `yaml:"config"` + + // Available item types are Postings, Series and ExpandedPostings. + EnabledItems []string `yaml:"enabled_items"` } // NewIndexCache initializes and returns new index cache. @@ -66,5 +69,13 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("create %s index cache", cacheConfig.Type)) } + + if len(cacheConfig.EnabledItems) > 0 { + if err = ValidateEnabledItems(cacheConfig.EnabledItems); err != nil { + return nil, err + } + cache = NewFilteredIndexCache(cache, cacheConfig.EnabledItems) + } + return cache, nil } diff --git a/pkg/store/cache/filter_cache.go b/pkg/store/cache/filter_cache.go new file mode 100644 index 00000000000..193f7363a26 --- /dev/null +++ b/pkg/store/cache/filter_cache.go @@ -0,0 +1,88 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storecache + +import ( + "context" + "fmt" + + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "golang.org/x/exp/slices" +) + +type FilteredIndexCache struct { + cache IndexCache + enabledItems []string +} + +// NewFilteredIndexCache creates a filtered index cache based on enabled items. +func NewFilteredIndexCache(cache IndexCache, enabledItems []string) *FilteredIndexCache { + return &FilteredIndexCache{ + cache: cache, + enabledItems: enabledItems, + } +} + +// StorePostings sets the postings identified by the ulid and label to the value v, +// if the postings already exists in the cache it is not mutated. +func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) { + c.cache.StorePostings(blockID, l, v, tenant) + } +} + +// FetchMultiPostings fetches multiple postings - each identified by a label - +// and returns a map containing cache hits, along with a list of missing keys. +func (c *FilteredIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) { + return c.cache.FetchMultiPostings(ctx, blockID, keys, tenant) + } + return nil, keys +} + +// StoreExpandedPostings stores expanded postings for a set of label matchers. +func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) { + c.cache.StoreExpandedPostings(blockID, matchers, v, tenant) + } +} + +// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. +func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) { + return c.cache.FetchExpandedPostings(ctx, blockID, matchers, tenant) + } + return nil, false +} + +// StoreSeries sets the series identified by the ulid and id to the value v, +// if the series already exists in the cache it is not mutated. +func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) { + c.cache.StoreSeries(blockID, id, v, tenant) + } +} + +// FetchMultiSeries fetches multiple series - each identified by ID - from the cache +// and returns a map containing cache hits, along with a list of missing IDs. +func (c *FilteredIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { + if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) { + return c.cache.FetchMultiSeries(ctx, blockID, ids, tenant) + } + return nil, ids +} + +func ValidateEnabledItems(enabledItems []string) error { + for _, item := range enabledItems { + switch item { + // valid + case cacheTypePostings, cacheTypeExpandedPostings, cacheTypeSeries: + default: + return fmt.Errorf("unsupported item type %s", item) + } + } + return nil +} diff --git a/pkg/store/cache/filter_cache_test.go b/pkg/store/cache/filter_cache_test.go new file mode 100644 index 00000000000..0616bc679e3 --- /dev/null +++ b/pkg/store/cache/filter_cache_test.go @@ -0,0 +1,164 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storecache + +import ( + "context" + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + + "github.com/thanos-io/thanos/pkg/tenancy" +) + +func TestFilterCache(t *testing.T) { + blockID := ulid.MustNew(ulid.Now(), nil) + postingKeys := []labels.Label{ + {Name: "foo", Value: "bar"}, + } + expandedPostingsMatchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + } + testPostingData := []byte("postings") + testExpandedPostingsData := []byte("expandedPostings") + testSeriesData := []byte("series") + ctx := context.TODO() + for _, tc := range []struct { + name string + enabledItems []string + expectedError string + verifyFunc func(t *testing.T, c IndexCache) + }{ + { + name: "invalid item type", + expectedError: "unsupported item type foo", + enabledItems: []string{"foo"}, + }, + { + name: "invalid item type with 1 valid cache type", + expectedError: "unsupported item type foo", + enabledItems: []string{cacheTypeExpandedPostings, "foo"}, + }, + { + name: "empty enabled items", + verifyFunc: func(t *testing.T, c IndexCache) { + c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant) + c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant) + c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant) + + hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant) + testutil.Equals(t, 0, len(missed)) + testutil.Equals(t, testPostingData, hits[postingKeys[0]]) + + ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) + testutil.Equals(t, true, hit) + testutil.Equals(t, testExpandedPostingsData, ep) + + seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) + testutil.Equals(t, 0, len(misses)) + testutil.Equals(t, testSeriesData, seriesHit[1]) + }, + }, + { + name: "all enabled items", + enabledItems: []string{cacheTypeSeries, cacheTypePostings, cacheTypeExpandedPostings}, + verifyFunc: func(t *testing.T, c IndexCache) { + c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant) + c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant) + c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant) + + hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant) + testutil.Equals(t, 0, len(missed)) + testutil.Equals(t, testPostingData, hits[postingKeys[0]]) + + ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) + testutil.Assert(t, true, hit) + testutil.Equals(t, testExpandedPostingsData, ep) + + seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) + testutil.Equals(t, 0, len(misses)) + testutil.Equals(t, testSeriesData, seriesHit[1]) + }, + }, + { + name: "only enable postings", + enabledItems: []string{cacheTypePostings}, + verifyFunc: func(t *testing.T, c IndexCache) { + c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant) + c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant) + c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant) + + hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant) + testutil.Equals(t, 0, len(missed)) + testutil.Equals(t, testPostingData, hits[postingKeys[0]]) + + _, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) + testutil.Equals(t, false, hit) + + seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) + testutil.Equals(t, 1, len(misses)) + testutil.Equals(t, 0, len(seriesHit)) + }, + }, + { + name: "only enable expanded postings", + enabledItems: []string{cacheTypeExpandedPostings}, + verifyFunc: func(t *testing.T, c IndexCache) { + c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant) + c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant) + c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant) + + hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant) + testutil.Equals(t, 1, len(missed)) + testutil.Equals(t, 0, len(hits)) + + ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) + testutil.Equals(t, true, hit) + testutil.Equals(t, testExpandedPostingsData, ep) + + seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) + testutil.Equals(t, 1, len(misses)) + testutil.Equals(t, 0, len(seriesHit)) + }, + }, + { + name: "only enable series", + enabledItems: []string{cacheTypeSeries}, + verifyFunc: func(t *testing.T, c IndexCache) { + c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant) + c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant) + c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant) + + hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant) + testutil.Equals(t, 1, len(missed)) + testutil.Equals(t, 0, len(hits)) + + _, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) + testutil.Equals(t, false, hit) + + seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) + testutil.Equals(t, 0, len(misses)) + testutil.Equals(t, testSeriesData, seriesHit[1]) + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + inMemoryCache, err := NewInMemoryIndexCacheWithConfig(log.NewNopLogger(), nil, prometheus.NewRegistry(), DefaultInMemoryIndexCacheConfig) + testutil.Ok(t, err) + err = ValidateEnabledItems(tc.enabledItems) + if tc.expectedError != "" { + testutil.Equals(t, tc.expectedError, err.Error()) + } else { + testutil.Ok(t, err) + c := NewFilteredIndexCache(inMemoryCache, tc.enabledItems) + tc.verifyFunc(t, c) + } + }) + } +}