Count the number of blocks loaded by a store-gateway by duration
Emit metrics about the number of blocks a store-gateway has loaded based on
the duration of the blocks (bucketed into the block durations the compactor
is configured to produce).

A common symptom of the compactor failing is store-gateways loading more 2h
blocks than usual, degrading performance of the read path. Counting the
number of blocks by duration allows us to detect when this happens.
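As an illustration of the detection this enables, with block ranges of 2h, 12h, and 24h (the values exercised by the test below), the store-gateway exposes one gauge series per duration bucket; label values are rendered with Go's time.Duration.String(), so 2h appears as "2h0m0s". The counts here are hypothetical:

cortex_bucket_store_blocks_loaded_by_duration{duration="2h0m0s"} 42
cortex_bucket_store_blocks_loaded_by_duration{duration="12h0m0s"} 12
cortex_bucket_store_blocks_loaded_by_duration{duration="24h0m0s"} 6

A sustained rise in the 2h0m0s series relative to its baseline suggests the compactor is falling behind.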

Signed-off-by: Nick Pillitteri <[email protected]>
56quarters committed Sep 20, 2023
1 parent c13a9ae commit 67de993
Showing 4 changed files with 153 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@
* [FEATURE] Query-frontend: add experimental support for query blocking. Queries are blocked on a per-tenant basis, configured via the limit `blocked_queries`. #5609
* [ENHANCEMENT] Query-scheduler: add `cortex_query_scheduler_enqueue_duration_seconds` metric that records the time taken to enqueue or reject a query request. #5879
* [ENHANCEMENT] Query-frontend: add `cortex_query_frontend_enqueue_duration_seconds` metric that records the time taken to enqueue or reject a query request when not using the query-scheduler. #5879
* [ENHANCEMENT] Store-gateway: add metric `cortex_bucket_store_blocks_loaded_by_duration` that counts the number of loaded blocks, bucketed by their duration. #6074
* [ENHANCEMENT] Expose `/sync/mutex/wait/total:seconds` Go runtime metric as `go_sync_mutex_wait_total_seconds_total` from all components. #5879
* [ENHANCEMENT] Query-scheduler: improve latency with many concurrent queriers. #5880
* [ENHANCEMENT] Implement support for `limit`, `limit_per_metric` and `metric` parameters for `<Prometheus HTTP prefix>/api/v1/metadata` endpoint. #5890
40 changes: 34 additions & 6 deletions pkg/storegateway/bucket.go
@@ -65,8 +65,9 @@ const (
)

type BucketStoreStats struct {
// BlocksLoaded is the number of blocks currently loaded in the bucket store.
BlocksLoaded int
// BlocksLoaded is the number of blocks currently loaded in the bucket store
// indexed by the duration of the block.
BlocksLoaded map[time.Duration]int
}

// BucketStore implements the store API backed by a bucket. It loads all index
@@ -258,16 +259,38 @@ func (s *BucketStore) RemoveBlocksAndClose() error {
}

// Stats returns statistics about the BucketStore instance.
func (s *BucketStore) Stats() BucketStoreStats {
func (s *BucketStore) Stats(durations tsdb.DurationList) BucketStoreStats {
s.blocksMx.RLock()
defer s.blocksMx.RUnlock()

return buildStoreStats(durations, s.blocks)
}

func buildStoreStats(durations tsdb.DurationList, blocks map[ulid.ULID]*bucketBlock) BucketStoreStats {
stats := BucketStoreStats{}
stats.BlocksLoaded = make(map[time.Duration]int)

s.blocksMx.RLock()
stats.BlocksLoaded = len(s.blocks)
s.blocksMx.RUnlock()
if len(durations) != 0 {
for _, b := range blocks {
// Bucket each block into one of the block durations the compactor is configured to produce.
bucketed := bucketBlockDuration(durations, b.blockDuration())
stats.BlocksLoaded[bucketed]++
}
}

return stats
}

// bucketBlockDuration returns the smallest bucket the given block duration
// fits within, or the largest bucket when the duration exceeds all of them.
func bucketBlockDuration(buckets tsdb.DurationList, duration time.Duration) time.Duration {
for _, d := range buckets {
if duration <= d {
return d
}
}

return buckets[len(buckets)-1]
}

// SyncBlocks synchronizes the store's state with the bucket.
// It will reuse disk space as persistent cache based on s.dir param.
func (s *BucketStore) SyncBlocks(ctx context.Context) error {
@@ -1933,6 +1956,11 @@ func (b *bucketBlock) overlapsClosedInterval(mint, maxt int64) bool {
return b.meta.MinTime <= maxt && mint < b.meta.MaxTime
}

// blockDuration returns the difference between the max and min time for this block.
func (b *bucketBlock) blockDuration() time.Duration {
return time.Duration(b.meta.MaxTime-b.meta.MinTime) * time.Millisecond
}

// Close waits for all pending readers to finish and then closes all underlying resources.
func (b *bucketBlock) Close() error {
b.pendingReaders.Wait()
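To make the rounding behaviour of bucketBlockDuration concrete, here is a minimal, self-contained sketch of the same logic; a plain []time.Duration stands in for tsdb.DurationList, and bucketFor is a hypothetical name used only for this illustration:

package main

import (
	"fmt"
	"time"
)

// bucketFor mirrors bucketBlockDuration above: it returns the smallest
// configured duration the block duration fits within, or the largest
// configured duration when the block exceeds all of them.
func bucketFor(buckets []time.Duration, d time.Duration) time.Duration {
	for _, b := range buckets {
		if d <= b {
			return b
		}
	}
	return buckets[len(buckets)-1]
}

func main() {
	buckets := []time.Duration{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}
	for _, d := range []time.Duration{90 * time.Minute, 4 * time.Hour, 25 * time.Hour} {
		// Prints: 1h30m0s -> 2h0m0s, 4h0m0s -> 12h0m0s, 25h0m0s -> 24h0m0s.
		fmt.Printf("block spanning %v is counted in the %v bucket\n", d, bucketFor(buckets, d))
	}
}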
50 changes: 36 additions & 14 deletions pkg/storegateway/bucket_stores.go
@@ -71,11 +71,12 @@ type BucketStores struct {
stores map[string]*BucketStore

// Metrics.
syncTimes prometheus.Histogram
syncLastSuccess prometheus.Gauge
tenantsDiscovered prometheus.Gauge
tenantsSynced prometheus.Gauge
blocksLoaded prometheus.GaugeFunc
syncTimes prometheus.Histogram
syncLastSuccess prometheus.Gauge
tenantsDiscovered prometheus.Gauge
tenantsSynced prometheus.Gauge
blocksLoaded *prometheus.Desc
blocksLoadedByDuration *prometheus.Desc
}

// NewBucketStores makes a new BucketStores.
@@ -144,10 +145,16 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
Name: "cortex_bucket_stores_tenants_synced",
Help: "Number of tenants synced.",
})
u.blocksLoaded = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_bucket_store_blocks_loaded",
Help: "Number of currently loaded blocks.",
}, u.getBlocksLoadedMetric)
u.blocksLoaded = prometheus.NewDesc(
"cortex_bucket_store_blocks_loaded",
"Number of currently loaded blocks.",
nil, nil,
)
u.blocksLoadedByDuration = prometheus.NewDesc(
"cortex_bucket_store_blocks_loaded_by_duration",
"Number of currently loaded blocks, bucketed by block duration.",
[]string{"duration"}, nil,
)

// Init the index cache.
if u.indexCache, err = tsdb.NewIndexCache(cfg.BucketStore.IndexCache, logger, reg); err != nil {
@@ -156,6 +163,7 @@

if reg != nil {
reg.MustRegister(u.metaFetcherMetrics)
reg.MustRegister(u)
}

return u, nil
@@ -551,17 +559,31 @@ func (u *BucketStores) closeBucketStoreAndDeleteLocalFilesForExcludedTenants(inc
}
}

// getBlocksLoadedMetric returns the number of blocks currently loaded across all bucket stores.
func (u *BucketStores) getBlocksLoadedMetric() float64 {
count := 0
func (u *BucketStores) Describe(descs chan<- *prometheus.Desc) {
descs <- u.blocksLoaded
descs <- u.blocksLoadedByDuration
}

func (u *BucketStores) Collect(metrics chan<- prometheus.Metric) {
loaded := make(map[time.Duration]int)
total := 0

u.storesMu.RLock()

for _, store := range u.stores {
count += store.Stats().BlocksLoaded
stats := store.Stats(u.cfg.TSDB.BlockRanges)
for d, n := range stats.BlocksLoaded {
loaded[d] += n
total += n
}
}

u.storesMu.RUnlock()

return float64(count)
metrics <- prometheus.MustNewConstMetric(u.blocksLoaded, prometheus.GaugeValue, float64(total))
for d, n := range loaded {
metrics <- prometheus.MustNewConstMetric(u.blocksLoadedByDuration, prometheus.GaugeValue, float64(n), d.String())
}
}

func getUserIDFromGRPCContext(ctx context.Context) string {
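Replacing the GaugeFunc with prometheus.Desc values and having BucketStores implement the prometheus.Collector interface is the standard client_golang pattern when a metric's label values (here, the duration buckets) are only known at scrape time. A minimal, self-contained sketch of that pattern, with a hypothetical collector name and hypothetical counts:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// blocksCollector is a toy stand-in for BucketStores: like the Collect
// method above, it emits one gauge series per duration bucket at scrape
// time via MustNewConstMetric instead of keeping long-lived gauge objects.
type blocksCollector struct {
	desc *prometheus.Desc
	// loaded returns a duration-label -> block-count mapping; the real code
	// derives this from each store's Stats() under a read lock.
	loaded func() map[string]int
}

func (c *blocksCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.desc }

func (c *blocksCollector) Collect(ch chan<- prometheus.Metric) {
	for d, n := range c.loaded() {
		ch <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, float64(n), d)
	}
}

func main() {
	c := &blocksCollector{
		desc: prometheus.NewDesc(
			"example_blocks_loaded_by_duration",
			"Number of currently loaded blocks, bucketed by block duration.",
			[]string{"duration"}, nil,
		),
		// Hypothetical counts, for illustration only.
		loaded: func() map[string]int { return map[string]int{"2h0m0s": 42, "24h0m0s": 6} },
	}

	reg := prometheus.NewRegistry()
	reg.MustRegister(c)

	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		fmt.Println(mf.String())
	}
}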
82 changes: 82 additions & 0 deletions pkg/storegateway/bucket_test.go
@@ -11,6 +11,7 @@ package storegateway
import (
"bytes"
"context"
cryptorand "crypto/rand"
"fmt"
"math"
"math/rand"
@@ -2397,6 +2398,87 @@ func TestBucketStore_Series_Limits(t *testing.T) {
}
}

func TestBucketStore_buildStoreStats(t *testing.T) {
durations := []time.Duration{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}
now := time.Now().Round(time.Hour)

type buildStoreStatsCase struct {
name string
minTime time.Time
maxTime time.Time
expectedBucket time.Duration
}

testCases := []buildStoreStatsCase{
{
name: "under 2h duration",
minTime: now,
maxTime: now.Add(90 * time.Minute),
expectedBucket: 2 * time.Hour,
},
{
name: "exactly 2h duration",
minTime: now,
maxTime: now.Add(120 * time.Minute),
expectedBucket: 2 * time.Hour,
},
{
name: "over 2h duration",
minTime: now,
maxTime: now.Add(125 * time.Minute),
expectedBucket: 12 * time.Hour,
},
{
name: "double 2h duration",
minTime: now,
maxTime: now.Add(240 * time.Minute),
expectedBucket: 12 * time.Hour,
},
{
name: "exactly 12h duration",
minTime: now,
maxTime: now.Add(12 * time.Hour),
expectedBucket: 12 * time.Hour,
},
{
name: "over 12h duration",
minTime: now,
maxTime: now.Add(13 * time.Hour),
expectedBucket: 24 * time.Hour,
},
{
name: "exactly 24h duration",
minTime: now,
maxTime: now.Add(24 * time.Hour),
expectedBucket: 24 * time.Hour,
},
{
name: "over 24h duration",
minTime: now,
maxTime: now.Add(25 * time.Hour),
expectedBucket: 24 * time.Hour,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
uid := ulid.MustNew(uint64(now.UnixMilli()), cryptorand.Reader)
blk := &bucketBlock{
meta: &block.Meta{
BlockMeta: tsdb.BlockMeta{
MinTime: tc.minTime.UnixMilli(),
MaxTime: tc.maxTime.UnixMilli(),
},
},
}

stats := buildStoreStats(durations, map[ulid.ULID]*bucketBlock{uid: blk})
require.Contains(t, stats.BlocksLoaded, tc.expectedBucket)
require.Equal(t, 1, stats.BlocksLoaded[tc.expectedBucket])
})
}
}

func mustMarshalAny(pb proto.Message) *types.Any {
out, err := types.MarshalAny(pb)
if err != nil {
