Skip to content

Commit

Permalink
querier: don't cache context.Canceled errors for bucket index (#7620)
Browse files Browse the repository at this point in the history
* querier: don't cache context.Canceled errors for bucket index

Without a cached bucket index the querier will synchronously fetch the bucket index while the query is on hold. If the query is cancelled, then the error will be cached and future queries would return the error until the next periodic bucket index update (after 1 minute).

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Update comment

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Update CHANGELOG.md

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Also assert on successful load

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
  • Loading branch information
dimitarvdimitrov authored Mar 15, 2024
1 parent 9d6f6d5 commit 0fedf8c
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
* [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
* [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624
* [BUGFIX] querier: Don't cache context.Canceled errors for bucket index. #7620

### Mixin
* [ENHANCEMENT] Alerts: allow configuring alerts range interval via `_config.base_alerts_range_interval_minutes`. #7591
Expand Down
5 changes: 1 addition & 4 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,10 +578,7 @@ func TestRulerMetricsForInvalidQueriesAndNoFetchedSeries(t *testing.T) {
// Very low limit so that ruler hits it.
"-querier.max-fetched-chunks-per-query": "5",

// Do not involve the block storage as we don't upload blocks and
// it can happen that a query runs faster than trying to load the
// index, which results in context canceled, which would be cached
// in the index loader, failing all subsequent queries.
// Do not involve the block storage as we don't upload blocks.
"-querier.query-store-after": "12h",

// Enable query stats for ruler to test metric for no fetched series
Expand Down
10 changes: 7 additions & 3 deletions pkg/storage/tsdb/bucketindex/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,13 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) {
l.loadAttempts.Inc()
idx, err := ReadIndex(ctx, l.bkt, userID, l.cfgProvider, l.logger)
if err != nil {
// Cache the error, to avoid hammering the object store in case of persistent issues
// (eg. corrupted bucket index or not existing).
l.cacheIndex(userID, nil, err)
if !errors.Is(err, context.Canceled) {
// Cache the error, to avoid hammering the object store in case of persistent issues
// (eg. corrupted bucket index or not existing).
// Don't cache context.Canceled errors, as they are caused by the individual query, and we don't want to
// continue failing queries on them.
l.cacheIndex(userID, nil, err)
}

if errors.Is(err, ErrIndexNotFound) {
level.Warn(l.logger).Log("msg", "bucket index not found", "user", userID)
Expand Down
71 changes: 71 additions & 0 deletions pkg/storage/tsdb/bucketindex/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,77 @@ func TestLoader_GetIndex_ShouldCacheIndexNotFoundError(t *testing.T) {
))
}

func TestLoader_GetIndex_ShouldNotCacheContextCanceled(t *testing.T) {
ctx := context.Background()
reg := prometheus.NewPedanticRegistry()
bkt, _ := mimir_testutil.PrepareFilesystemBucket(t)

// Create a bucket index.
idx := &Index{
Version: IndexVersion1,
Blocks: Blocks{
{ID: ulid.MustNew(1, nil), MinTime: 10, MaxTime: 20},
},
BlockDeletionMarks: nil,
UpdatedAt: time.Now().Unix(),
}
require.NoError(t, WriteIndex(ctx, bkt, "user-1", nil, idx))

// Create the loader.
loader := NewLoader(prepareLoaderConfig(), bkt, nil, log.NewNopLogger(), reg)
require.NoError(t, services.StartAndAwaitRunning(ctx, loader))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, loader))
})

// Request the index multiple times.
canceledCtx, cancel := context.WithCancel(ctx)
cancel()
for i := 0; i < 10; i++ {
_, err := loader.GetIndex(canceledCtx, "user-1")
assert.ErrorIs(t, err, context.Canceled)
}

// Ensure metrics have been updated accordingly.
assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_bucket_index_load_failures_total Total number of bucket index loading failures.
# TYPE cortex_bucket_index_load_failures_total counter
cortex_bucket_index_load_failures_total 10
# HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory.
# TYPE cortex_bucket_index_loaded gauge
cortex_bucket_index_loaded 0
# HELP cortex_bucket_index_loads_total Total number of bucket index loading attempts.
# TYPE cortex_bucket_index_loads_total counter
cortex_bucket_index_loads_total 10
`),
"cortex_bucket_index_loads_total",
"cortex_bucket_index_load_failures_total",
"cortex_bucket_index_loaded",
))

// Getting it once again with a working context should trigger a load.
_, err := loader.GetIndex(ctx, "user-1")
assert.NoError(t, err)

// Ensure metrics have been updated accordingly.
assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_bucket_index_load_failures_total Total number of bucket index loading failures.
# TYPE cortex_bucket_index_load_failures_total counter
cortex_bucket_index_load_failures_total 10
# HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory.
# TYPE cortex_bucket_index_loaded gauge
cortex_bucket_index_loaded 1
# HELP cortex_bucket_index_loads_total Total number of bucket index loading attempts.
# TYPE cortex_bucket_index_loads_total counter
cortex_bucket_index_loads_total 11
`),
"cortex_bucket_index_loads_total",
"cortex_bucket_index_load_failures_total",
"cortex_bucket_index_loaded",
))

}

func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadSuccess(t *testing.T) {
ctx := context.Background()
reg := prometheus.NewPedanticRegistry()
Expand Down

0 comments on commit 0fedf8c

Please sign in to comment.