Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pull] main from thanos-io:main #476

Merged
merged 1 commit into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 11 additions & 36 deletions pkg/store/cache/matchers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,23 @@
package storecache

import (
"fmt"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"golang.org/x/sync/singleflight"

"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

const DefaultCacheSize = 200

type NewItemFunc func(matcher ConversionLabelMatcher) (*labels.Matcher, error)
type NewItemFunc func() (*labels.Matcher, error)

type MatchersCache interface {
// GetOrSet retrieves a matcher from cache or creates and stores it if not present.
// If the matcher is not in cache, it uses the provided newItem function to create it.
GetOrSet(key ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error)
GetOrSet(key string, newItem NewItemFunc) (*labels.Matcher, error)
}

// Ensure implementations satisfy the interface.
Expand All @@ -41,14 +38,14 @@ func NewNoopMatcherCache() MatchersCache {
}

// GetOrSet implements MatchersCache by always creating a new matcher without caching.
func (n *NoopMatcherCache) GetOrSet(key ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
return newItem(key)
func (n *NoopMatcherCache) GetOrSet(_ string, newItem NewItemFunc) (*labels.Matcher, error) {
return newItem()
}

// LruMatchersCache implements MatchersCache with an LRU cache and metrics.
type LruMatchersCache struct {
reg prometheus.Registerer
cache *lru.Cache[ConversionLabelMatcher, *labels.Matcher]
cache *lru.Cache[string, *labels.Matcher]
metrics *matcherCacheMetrics
size int
sf singleflight.Group
Expand Down Expand Up @@ -78,7 +75,7 @@ func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) {
}
cache.metrics = newMatcherCacheMetrics(cache.reg)

lruCache, err := lru.NewWithEvict[ConversionLabelMatcher, *labels.Matcher](cache.size, cache.onEvict)
lruCache, err := lru.NewWithEvict[string, *labels.Matcher](cache.size, cache.onEvict)
if err != nil {
return nil, err
}
Expand All @@ -87,16 +84,15 @@ func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) {
return cache, nil
}

func (c *LruMatchersCache) GetOrSet(key ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) {
func (c *LruMatchersCache) GetOrSet(key string, newItem NewItemFunc) (*labels.Matcher, error) {
c.metrics.requestsTotal.Inc()

v, err, _ := c.sf.Do(key.String(), func() (interface{}, error) {
v, err, _ := c.sf.Do(key, func() (interface{}, error) {
if item, ok := c.cache.Get(key); ok {
c.metrics.hitsTotal.Inc()
return item, nil
}

item, err := newItem(key)
item, err := newItem()
if err != nil {
return nil, err
}
Expand All @@ -111,7 +107,7 @@ func (c *LruMatchersCache) GetOrSet(key ConversionLabelMatcher, newItem NewItemF
return v.(*labels.Matcher), nil
}

func (c *LruMatchersCache) onEvict(_ ConversionLabelMatcher, _ *labels.Matcher) {
func (c *LruMatchersCache) onEvict(_ string, _ *labels.Matcher) {
c.metrics.evicted.Inc()
c.metrics.numItems.Set(float64(c.cache.Len()))
}
Expand Down Expand Up @@ -155,32 +151,11 @@ func newMatcherCacheMetrics(reg prometheus.Registerer) *matcherCacheMetrics {
func MatchersToPromMatchersCached(cache MatchersCache, ms ...storepb.LabelMatcher) ([]*labels.Matcher, error) {
res := make([]*labels.Matcher, 0, len(ms))
for i := range ms {
pm, err := cache.GetOrSet(&ms[i], MatcherToPromMatcher)
pm, err := cache.GetOrSet(ms[i].String(), func() (*labels.Matcher, error) { return storepb.MatcherToPromMatcher(ms[i]) })
if err != nil {
return nil, err
}
res = append(res, pm)
}
return res, nil
}

func MatcherToPromMatcher(m ConversionLabelMatcher) (*labels.Matcher, error) {
mi, ok := m.(*storepb.LabelMatcher)
if !ok {
return nil, fmt.Errorf("invalid matcher type. Got: %T", m)
}

return storepb.MatcherToPromMatcher(*mi)
}

// ConversionLabelMatcher is a common interface for the Prometheus and Thanos label matchers.
type ConversionLabelMatcher interface {
String() string
GetName() string
GetValue() string
}

var (
_ ConversionLabelMatcher = (*storepb.LabelMatcher)(nil)
_ ConversionLabelMatcher = (*prompb.LabelMatcher)(nil)
)
26 changes: 15 additions & 11 deletions pkg/store/cache/matchers_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,51 +36,53 @@ func TestMatchersCache(t *testing.T) {
}

var cacheHit bool
newItem := func(matcher storecache.ConversionLabelMatcher) (*labels.Matcher, error) {
cacheHit = false
return storecache.MatcherToPromMatcher(matcher)
newItem := func(matcher *storepb.LabelMatcher) func() (*labels.Matcher, error) {
return func() (*labels.Matcher, error) {
cacheHit = false
return storepb.MatcherToPromMatcher(*matcher)
}
}
expected := labels.MustNewMatcher(labels.MatchEqual, "key", "val")
expected2 := labels.MustNewMatcher(labels.MatchRegexp, "key2", "val2|val3")
expected3 := labels.MustNewMatcher(labels.MatchEqual, "key3", "val3")

item, err := cache.GetOrSet(matcher, newItem)
item, err := cache.GetOrSet(matcher.String(), newItem(matcher))
testutil.Ok(t, err)
testutil.Equals(t, false, cacheHit)
testutil.Equals(t, expected.String(), item.String())

cacheHit = true
item, err = cache.GetOrSet(matcher, newItem)
item, err = cache.GetOrSet(matcher.String(), newItem(matcher))
testutil.Ok(t, err)
testutil.Equals(t, true, cacheHit)
testutil.Equals(t, expected.String(), item.String())

cacheHit = true
item, err = cache.GetOrSet(matcher2, newItem)
item, err = cache.GetOrSet(matcher2.String(), newItem(matcher2))
testutil.Ok(t, err)
testutil.Equals(t, false, cacheHit)
testutil.Equals(t, expected2.String(), item.String())

cacheHit = true
item, err = cache.GetOrSet(matcher2, newItem)
item, err = cache.GetOrSet(matcher2.String(), newItem(matcher2))
testutil.Ok(t, err)
testutil.Equals(t, true, cacheHit)
testutil.Equals(t, expected2.String(), item.String())

cacheHit = true
item, err = cache.GetOrSet(matcher, newItem)
item, err = cache.GetOrSet(matcher.String(), newItem(matcher))
testutil.Ok(t, err)
testutil.Equals(t, true, cacheHit)
testutil.Equals(t, expected, item)

cacheHit = true
item, err = cache.GetOrSet(matcher3, newItem)
item, err = cache.GetOrSet(matcher3.String(), newItem(matcher3))
testutil.Ok(t, err)
testutil.Equals(t, false, cacheHit)
testutil.Equals(t, expected3, item)

cacheHit = true
item, err = cache.GetOrSet(matcher2, newItem)
item, err = cache.GetOrSet(matcher2.String(), newItem(matcher2))
testutil.Ok(t, err)
testutil.Equals(t, false, cacheHit)
testutil.Equals(t, expected2.String(), item.String())
Expand All @@ -104,7 +106,9 @@ func BenchmarkMatchersCache(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
matcher := matchers[i%len(matchers)]
_, err := cache.GetOrSet(matcher, storecache.MatcherToPromMatcher)
_, err := cache.GetOrSet(matcher.String(), func() (*labels.Matcher, error) {
return storepb.MatcherToPromMatcher(*matcher)
})
if err != nil {
b.Fatalf("failed to get or set cache item: %v", err)
}
Expand Down
Loading