Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into async-op-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Sabsay committed Jan 8, 2025
2 parents 7b83f24 + 0d42636 commit 8963c1b
Show file tree
Hide file tree
Showing 16 changed files with 222 additions and 108 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7907](https://github.com/thanos-io/thanos/pull/7907) Receive: Add `--receive.grpc-service-config` flag to configure gRPC service config for the receivers.
- [#7961](https://github.com/thanos-io/thanos/pull/7961) Store Gateway: Add `--store.posting-group-max-keys` flag to mark posting group as lazy if it exceeds number of keys limit. Added `thanos_bucket_store_lazy_expanded_posting_groups_total` for total number of lazy posting groups and corresponding reasons.
- [#8000](https://github.com/thanos-io/thanos/pull/8000) Query: Bump promql-engine, pass partial response through options
- [#7353](https://github.com/thanos-io/thanos/pull/7353) Receiver: introduce optional cache for matchers in series calls.
- [#7353](https://github.com/thanos-io/thanos/pull/7353) [#8045](https://github.com/thanos-io/thanos/pull/8045) Receiver/StoreGateway: Add `--matcher-cache-size` option to enable caching for regex matchers in series calls.
- [#8017](https://github.com/thanos-io/thanos/pull/8017) Store Gateway: Use native histogram for binary reader load and download duration and fixed download duration metric. #8017

### Changed
Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func runReceive(
return errors.Wrap(err, "parse relabel configuration")
}

var cache = storecache.NewNoopMatcherCache()
var cache = storecache.NoopMatchersCache
if conf.matcherCacheSize > 0 {
cache, err = storecache.NewMatchersCache(storecache.WithSize(conf.matcherCacheSize), storecache.WithPromRegistry(reg))
if err != nil {
Expand Down Expand Up @@ -1058,7 +1058,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
"about order.").
Default("false").Hidden().BoolVar(&rc.allowOutOfOrderUpload)

cmd.Flag("matcher-cache-size", "The size of the cache used for matching against external labels. Using 0 disables caching.").Default("0").IntVar(&rc.matcherCacheSize)
cmd.Flag("matcher-cache-size", "Max number of cached matchers items. Using 0 disables caching.").Default("0").IntVar(&rc.matcherCacheSize)

rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)

Expand Down
13 changes: 13 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ type storeConfig struct {
postingGroupMaxKeySeriesRatio float64

indexHeaderLazyDownloadStrategy string

matcherCacheSize int
}

func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -225,6 +227,8 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("bucket-web-label", "External block label to use as group title in the bucket web UI").StringVar(&sc.label)

cmd.Flag("matcher-cache-size", "Max number of cached matchers items. Using 0 disables caching.").Default("0").IntVar(&sc.matcherCacheSize)

sc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)
}

Expand Down Expand Up @@ -368,6 +372,14 @@ func runStore(
return errors.Wrap(err, "create index cache")
}

var matchersCache = storecache.NoopMatchersCache
if conf.matcherCacheSize > 0 {
matchersCache, err = storecache.NewMatchersCache(storecache.WithSize(conf.matcherCacheSize), storecache.WithPromRegistry(reg))
if err != nil {
return errors.Wrap(err, "failed to create matchers cache")
}
}

var blockLister block.Lister
switch syncStrategy(conf.blockListStrategy) {
case concurrentDiscovery:
Expand Down Expand Up @@ -413,6 +425,7 @@ func runStore(
}),
store.WithRegistry(reg),
store.WithIndexCache(indexCache),
store.WithMatchersCache(matchersCache),
store.WithQueryGate(queriesGate),
store.WithChunkPool(chunkPool),
store.WithFilterConfig(conf.filterConf),
Expand Down
4 changes: 2 additions & 2 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ Flags:
--log.format=logfmt Log format to use. Possible options: logfmt or
json.
--log.level=info Log filtering level.
--matcher-cache-size=0 The size of the cache used for matching against
external labels. Using 0 disables caching.
--matcher-cache-size=0 Max number of cached matchers items. Using 0
disables caching.
--objstore.config=<content>
Alternative to 'objstore.config-file'
flag (mutually exclusive). Content of
Expand Down
2 changes: 2 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ Flags:
--log.format=logfmt Log format to use. Possible options: logfmt or
json.
--log.level=info Log filtering level.
--matcher-cache-size=0 Max number of cached matchers items. Using 0
disables caching.
--max-time=9999-12-31T23:59:59Z
End of time range limit to serve. Thanos Store
will serve only blocks, which happened earlier
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func NewMultiTSDB(
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
matcherCache: storecache.NewNoopMatcherCache(),
matcherCache: storecache.NoopMatchersCache,
}

for _, option := range options {
Expand Down
21 changes: 15 additions & 6 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ type BucketStore struct {
fetcher block.MetadataFetcher
dir string
indexCache storecache.IndexCache
matcherCache storecache.MatchersCache
indexReaderPool *indexheader.ReaderPool
buffers sync.Pool
chunkPool pool.Pool[byte]
Expand Down Expand Up @@ -517,6 +518,13 @@ func WithIndexCache(cache storecache.IndexCache) BucketStoreOption {
}
}

// WithMatchersCache sets a matchers cache to use instead of a noopCache.
func WithMatchersCache(cache storecache.MatchersCache) BucketStoreOption {
return func(s *BucketStore) {
s.matcherCache = cache
}
}

// WithQueryGate sets a queryGate to use instead of a noopGate.
func WithQueryGate(queryGate gate.Gate) BucketStoreOption {
return func(s *BucketStore) {
Expand Down Expand Up @@ -637,11 +645,12 @@ func NewBucketStore(
options ...BucketStoreOption,
) (*BucketStore, error) {
s := &BucketStore{
logger: log.NewNopLogger(),
bkt: bkt,
fetcher: fetcher,
dir: dir,
indexCache: noopCache{},
logger: log.NewNopLogger(),
bkt: bkt,
fetcher: fetcher,
dir: dir,
indexCache: noopCache{},
matcherCache: storecache.NoopMatchersCache,
buffers: sync.Pool{New: func() interface{} {
b := make([]byte, 0, initialBufSize)
return &b
Expand Down Expand Up @@ -1536,7 +1545,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store

tenant, _ := tenancy.GetTenantFromGRPCMetadata(srv.Context())

matchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
matchers, err := storecache.MatchersToPromMatchersCached(s.matcherCache, req.Matchers...)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1682,6 +1682,8 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
MaxSize: 8889,
})
testutil.Ok(t, err)
matcherCache, err := storecache.NewMatchersCache(storecache.WithSize(10000))
testutil.Ok(t, err)

var b1 *bucketBlock

Expand Down Expand Up @@ -1775,6 +1777,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
store := &BucketStore{
bkt: objstore.WithNoopInstr(bkt),
logger: logger,
matcherCache: matcherCache,
indexCache: indexCache,
indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, indexheader.NewReaderPoolMetrics(nil), indexheader.AlwaysEagerDownloadIndexHeader),
metrics: newBucketStoreMetrics(nil),
Expand Down Expand Up @@ -2080,6 +2083,9 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, nil, storecache.InMemoryIndexCacheConfig{})
testutil.Ok(tb, err)

matcherCache, err := storecache.NewMatchersCache(storecache.WithSize(1024))
testutil.Ok(tb, err)

store, err := NewBucketStore(
instrBkt,
fetcher,
Expand All @@ -2096,6 +2102,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
0,
WithLogger(logger),
WithIndexCache(indexCache),
WithMatchersCache(matcherCache),
)
testutil.Ok(tb, err)
testutil.Ok(tb, store.SyncBlocks(context.Background()))
Expand Down
74 changes: 60 additions & 14 deletions pkg/store/cache/matchers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package storecache

import (
"strings"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -17,28 +19,37 @@ const DefaultCacheSize = 200

type NewItemFunc func() (*labels.Matcher, error)

type ConversionLabelMatcher interface {
GetValue() string
GetName() string
MatcherType() (labels.MatchType, error)
}

type MatchersCache interface {
// GetOrSet retrieves a matcher from cache or creates and stores it if not present.
// If the matcher is not in cache, it uses the provided newItem function to create it.
GetOrSet(key string, newItem NewItemFunc) (*labels.Matcher, error)
GetOrSet(m ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error)
}

// Ensure implementations satisfy the interface.
var (
_ MatchersCache = (*LruMatchersCache)(nil)
_ MatchersCache = (*NoopMatcherCache)(nil)
)
_ MatchersCache = (*LruMatchersCache)(nil)
NoopMatchersCache MatchersCache = &noopMatcherCache{}

// NoopMatcherCache is a no-op implementation of MatchersCache that doesn't cache anything.
type NoopMatcherCache struct{}
defaultIsCacheableFunc = func(m ConversionLabelMatcher) bool {
t, err := m.MatcherType()
if err != nil {
return false
}

// NewNoopMatcherCache creates a new no-op matcher cache.
func NewNoopMatcherCache() MatchersCache {
return &NoopMatcherCache{}
}
return t == labels.MatchRegexp || t == labels.MatchNotRegexp
}
)

type noopMatcherCache struct{}

// GetOrSet implements MatchersCache by always creating a new matcher without caching.
func (n *NoopMatcherCache) GetOrSet(_ string, newItem NewItemFunc) (*labels.Matcher, error) {
func (n *noopMatcherCache) GetOrSet(_ ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
return newItem()
}

Expand All @@ -49,6 +60,8 @@ type LruMatchersCache struct {
metrics *matcherCacheMetrics
size int
sf singleflight.Group

isCacheable func(matcher ConversionLabelMatcher) bool
}

type MatcherCacheOption func(*LruMatchersCache)
Expand All @@ -65,15 +78,24 @@ func WithSize(size int) MatcherCacheOption {
}
}

// WithIsCacheableFunc sets the function that determines if the item should be cached or not.
func WithIsCacheableFunc(f func(matcher ConversionLabelMatcher) bool) MatcherCacheOption {
return func(c *LruMatchersCache) {
c.isCacheable = f
}
}

func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) {
cache := &LruMatchersCache{
size: DefaultCacheSize,
size: DefaultCacheSize,
isCacheable: defaultIsCacheableFunc,
}

for _, opt := range opts {
opt(cache)
}
cache.metrics = newMatcherCacheMetrics(cache.reg)
cache.metrics.maxItems.Set(float64(cache.size))

lruCache, err := lru.NewWithEvict[string, *labels.Matcher](cache.size, cache.onEvict)
if err != nil {
Expand All @@ -84,8 +106,18 @@ func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) {
return cache, nil
}

func (c *LruMatchersCache) GetOrSet(key string, newItem NewItemFunc) (*labels.Matcher, error) {
func (c *LruMatchersCache) GetOrSet(m ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
if !c.isCacheable(m) {
return newItem()
}

c.metrics.requestsTotal.Inc()
key, err := cacheKey(m)

if err != nil {
return nil, err
}

v, err, _ := c.sf.Do(key, func() (interface{}, error) {
if item, ok := c.cache.Get(key); ok {
c.metrics.hitsTotal.Inc()
Expand Down Expand Up @@ -151,11 +183,25 @@ func newMatcherCacheMetrics(reg prometheus.Registerer) *matcherCacheMetrics {
func MatchersToPromMatchersCached(cache MatchersCache, ms ...storepb.LabelMatcher) ([]*labels.Matcher, error) {
res := make([]*labels.Matcher, 0, len(ms))
for i := range ms {
pm, err := cache.GetOrSet(ms[i].String(), func() (*labels.Matcher, error) { return storepb.MatcherToPromMatcher(ms[i]) })
pm, err := cache.GetOrSet(&ms[i], func() (*labels.Matcher, error) { return storepb.MatcherToPromMatcher(ms[i]) })
if err != nil {
return nil, err
}
res = append(res, pm)
}
return res, nil
}

func cacheKey(m ConversionLabelMatcher) (string, error) {
sb := strings.Builder{}
t, err := m.MatcherType()
if err != nil {
return "", err
}
typeStr := t.String()
sb.Grow(len(m.GetValue()) + len(m.GetName()) + len(typeStr))
sb.WriteString(m.GetName())
sb.WriteString(typeStr)
sb.WriteString(m.GetValue())
return sb.String(), nil
}
Loading

0 comments on commit 8963c1b

Please sign in to comment.