Skip to content

Commit

Permalink
Add histogram metrics for index cache item size (#6528)
Browse files Browse the repository at this point in the history
* add histogram metrics for index cache item size

Signed-off-by: Ben Ye <[email protected]>

* update changelog

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Jul 13, 2023
1 parent a395c5d commit 7ff170f
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6441](https://github.com/thanos-io/thanos/pull/6441) Compact: Compactor will set `index_stats` in `meta.json` file with max series and chunk size information.
- [#6466](https://github.com/thanos-io/thanos/pull/6466) Mixin (Receive): add limits alerting for configuration reload and meta-monitoring.
- [#6467](https://github.com/thanos-io/thanos/pull/6467) Mixin (Receive): add alert for tenant reaching head series limit.
- [#6528](https://github.com/thanos-io/thanos/pull/6528) Index Cache: Add histogram metric `thanos_store_index_cache_stored_data_size_bytes` for item size.

### Fixed
- [#6503](https://github.com/thanos-io/thanos/pull/6503) *: Change the engine behind `ContentPathReloader` to be completely independent of any filesystem concept. This effectively fixes this configuration reload when used with Kubernetes ConfigMaps, Secrets, or other volume mounts.
Expand Down
12 changes: 10 additions & 2 deletions pkg/store/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ type IndexCache interface {

// Common metrics that should be used by all cache implementations.
type commonMetrics struct {
requestTotal *prometheus.CounterVec
hitsTotal *prometheus.CounterVec
requestTotal *prometheus.CounterVec
hitsTotal *prometheus.CounterVec
dataSizeBytes *prometheus.HistogramVec
}

func newCommonMetrics(reg prometheus.Registerer) *commonMetrics {
Expand All @@ -72,6 +73,13 @@ func newCommonMetrics(reg prometheus.Registerer) *commonMetrics {
Name: "thanos_store_index_cache_hits_total",
Help: "Total number of items requests to the cache that were a hit.",
}, []string{"item_type"}),
dataSizeBytes: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "thanos_store_index_cache_stored_data_size_bytes",
Help: "Histogram to track item data size stored in index cache",
Buckets: []float64{
32, 256, 512, 1024, 32 * 1024, 256 * 1024, 512 * 1024, 1024 * 1024, 32 * 1024 * 1024, 256 * 1024 * 1024, 512 * 1024 * 1024,
},
}, []string{"item_type"}),
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/store/cache/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ func copyToKey(l labels.Label) cacheKeyPostings {
// StorePostings sets the postings identified by the ulid and label to the value v,
// if the postings already exists in the cache it is not mutated.
func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypePostings).Observe(float64(len(v)))
c.set(cacheTypePostings, cacheKey{block: blockID.String(), key: copyToKey(l)}, v)
}

Expand All @@ -318,6 +319,7 @@ func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.

// StoreExpandedPostings stores expanded postings for a set of label matchers.
func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) {
c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings).Observe(float64(len(v)))
c.set(cacheTypeExpandedPostings, cacheKey{block: blockID.String(), key: cacheKeyExpandedPostings(labelMatchersToString(matchers))}, v)
}

Expand All @@ -332,6 +334,7 @@ func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, blockID ul
// StoreSeries sets the series identified by the ulid and id to the value v,
// if the series already exists in the cache it is not mutated.
func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeSeries).Observe(float64(len(v)))
c.set(cacheTypeSeries, cacheKey{blockID.String(), cacheKeySeries(id), ""}, v)
}

Expand Down
22 changes: 16 additions & 6 deletions pkg/store/cache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,15 @@ type RemoteIndexCache struct {
compressionScheme string

// Metrics.
postingRequests prometheus.Counter
seriesRequests prometheus.Counter
expandedPostingRequests prometheus.Counter
postingHits prometheus.Counter
seriesHits prometheus.Counter
expandedPostingHits prometheus.Counter
postingRequests prometheus.Counter
seriesRequests prometheus.Counter
expandedPostingRequests prometheus.Counter
postingHits prometheus.Counter
seriesHits prometheus.Counter
expandedPostingHits prometheus.Counter
postingDataSizeBytes prometheus.Observer
expandedPostingDataSizeBytes prometheus.Observer
seriesDataSizeBytes prometheus.Observer
}

// NewRemoteIndexCache makes a new RemoteIndexCache.
Expand All @@ -61,6 +64,10 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli
c.seriesHits = commonMetrics.hitsTotal.WithLabelValues(cacheTypeSeries)
c.expandedPostingHits = commonMetrics.hitsTotal.WithLabelValues(cacheTypeExpandedPostings)

c.postingDataSizeBytes = commonMetrics.dataSizeBytes.WithLabelValues(cacheTypePostings)
c.seriesDataSizeBytes = commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeSeries)
c.expandedPostingDataSizeBytes = commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings)

level.Info(logger).Log("msg", "created index cache")

return c, nil
Expand All @@ -70,6 +77,7 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli
// The function enqueues the request and returns immediately: the entry will be
// asynchronously stored in the cache.
func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
c.postingDataSizeBytes.Observe(float64(len(v)))
key := cacheKey{blockID.String(), cacheKeyPostings(l), c.compressionScheme}.string()
if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err)
Expand Down Expand Up @@ -118,6 +126,7 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.
// The function enqueues the request and returns immediately: the entry will be
// asynchronously stored in the cache.
func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labels.Matcher, v []byte) {
c.expandedPostingDataSizeBytes.Observe(float64(len(v)))
key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(keys)), c.compressionScheme}.string()

if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
Expand Down Expand Up @@ -148,6 +157,7 @@ func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ul
// The function enqueues the request and returns immediately: the entry will be
// asynchronously stored in the cache.
func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
c.seriesDataSizeBytes.Observe(float64(len(v)))
key := cacheKey{blockID.String(), cacheKeySeries(id), ""}.string()

if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
Expand Down

0 comments on commit 7ff170f

Please sign in to comment.