diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 4c41a90a99..572238adeb 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -99,6 +99,7 @@ const ( var ( errExemplarRef = errors.New("exemplars not ingested because series not already present") errIngesterStopping = errors.New("ingester stopping") + errNoUserDb = errors.New("no user db") tsChunksPool zeropool.Pool[[]client.TimeSeriesChunk] ) @@ -988,8 +989,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error { func (i *Ingester) updateUserTSDBConfigs() { for _, userID := range i.getTSDBUsers() { - userDB := i.getTSDB(userID) - if userDB == nil { + userDB, err := i.getTSDB(userID) + if err != nil || userDB == nil { continue } @@ -1005,7 +1006,7 @@ func (i *Ingester) updateUserTSDBConfigs() { } // This method currently updates the MaxExemplars and OutOfOrderTimeWindow. - err := userDB.db.ApplyConfig(cfg) + err = userDB.db.ApplyConfig(cfg) if err != nil { level.Error(logutil.WithUserID(userID, i.logger)).Log("msg", "failed to update user tsdb configuration.") } @@ -1029,8 +1030,8 @@ func (i *Ingester) updateActiveSeries(ctx context.Context) { purgeTime := time.Now().Add(-i.cfg.ActiveSeriesMetricsIdleTimeout) for _, userID := range i.getTSDBUsers() { - userDB := i.getTSDB(userID) - if userDB == nil { + userDB, err := i.getTSDB(userID) + if err != nil || userDB == nil { continue } @@ -1045,8 +1046,8 @@ func (i *Ingester) updateActiveSeries(ctx context.Context) { func (i *Ingester) updateLabelSetMetrics() { activeUserSet := make(map[string]map[uint64]struct{}) for _, userID := range i.getTSDBUsers() { - userDB := i.getTSDB(userID) - if userDB == nil { + userDB, err := i.getTSDB(userID) + if err != nil || userDB == nil { continue } @@ -1549,8 +1550,8 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery i.metrics.queries.Inc() - db := i.getTSDB(userID) - if db == nil { + db, err := i.getTSDB(userID) + if err != nil || db == nil { return &client.ExemplarQueryResponse{}, nil } @@ -1642,8 +1643,8 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu return nil, cleanup, err } - db := i.getTSDB(userID) - if db == nil { + db, err := i.getTSDB(userID) + if err != nil || db == nil { return &client.LabelValuesResponse{}, cleanup, nil } @@ -1732,8 +1733,8 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR return nil, cleanup, err } - db := i.getTSDB(userID) - if db == nil { + db, err := i.getTSDB(userID) + if err != nil || db == nil { return &client.LabelNamesResponse{}, cleanup, nil } @@ -1830,8 +1831,8 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien return cleanup, err } - db := i.getTSDB(userID) - if db == nil { + db, err := i.getTSDB(userID) + if err != nil || db == nil { return cleanup, nil } @@ -1944,8 +1945,8 @@ func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) return nil, err } - db := i.getTSDB(userID) - if db == nil { + db, err := i.getTSDB(userID) + if err != nil || db == nil { return &client.UserStatsResponse{}, nil } @@ -2064,8 +2065,8 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ i.metrics.queries.Inc() - db := i.getTSDB(userID) - if db == nil { + db, err := i.getTSDB(userID) + if err != nil || db == nil { return nil } @@ -2216,11 +2217,14 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th return numSeries, numSamples, totalBatchSizeBytes, numChunks, nil } -func (i *Ingester) getTSDB(userID string) *userTSDB { +func (i *Ingester) getTSDB(userID string) (*userTSDB, error) { i.stoppedMtx.RLock() defer i.stoppedMtx.RUnlock() db := i.TSDBState.dbs[userID] - return db + if db == nil { + return nil, errNoUserDb + } + return db, nil } // List all users for which we have a TSDB. We do it here in order @@ -2238,8 +2242,11 @@ func (i *Ingester) getTSDBUsers() []string { } func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) { - db := i.getTSDB(userID) + db, err := i.getTSDB(userID) if db != nil { + if err != nil { + level.Warn(i.logger).Log("msg", "error getting user DB but userDB is not null", "err", err, "userID", userID) + } return db, nil } @@ -2271,7 +2278,7 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) } // Create the database and a shipper for a user - db, err := i.createTSDB(userID) + db, err = i.createTSDB(userID) if err != nil { return nil, err } @@ -2285,10 +2292,10 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) func (i *Ingester) blockChunkQuerierFunc(userId string) tsdb.BlockChunkQuerierFunc { return func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) { - db := i.getTSDB(userId) + db, err := i.getTSDB(userId) var postingCache cortex_tsdb.ExpandedPostingsCache - if db != nil { + if err == nil && db != nil { postingCache = db.postingCache } @@ -2650,8 +2657,8 @@ func (i *Ingester) shipBlocks(ctx context.Context, allowed *util.AllowedTenants) } // Get the user's DB. If the user doesn't exist, we skip it. - userDB := i.getTSDB(userID) - if userDB == nil || userDB.shipper == nil { + userDB, err := i.getTSDB(userID) + if err != nil || userDB == nil || userDB.shipper == nil { return nil } @@ -2762,8 +2769,8 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowed *util. return nil } - userDB := i.getTSDB(userID) - if userDB == nil { + userDB, err := i.getTSDB(userID) + if err != nil || userDB == nil { return nil } @@ -2773,8 +2780,6 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowed *util. return nil } - var err error - i.TSDBState.compactionsTriggered.Inc() reason := "" @@ -2823,8 +2828,8 @@ func (i *Ingester) expirePostingsCache(ctx context.Context) error { if ctx.Err() != nil { return nil } - userDB := i.getTSDB(userID) - if userDB == nil || userDB.postingCache == nil { + userDB, err := i.getTSDB(userID) + if err != nil || userDB == nil || userDB.postingCache == nil { continue } userDB.postingCache.PurgeExpiredItems() @@ -2834,8 +2839,8 @@ func (i *Ingester) expirePostingsCache(ctx context.Context) error { } func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckResult { - userDB := i.getTSDB(userID) - if userDB == nil || userDB.shipper == nil { + userDB, err := i.getTSDB(userID) + if err != nil || userDB == nil || userDB.shipper == nil { // We will not delete local data when not using shipping to storage. return tsdbShippingDisabled } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index f3da262fe6..c7069ddb3f 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -222,8 +222,9 @@ func TestIngesterDeletionRace(t *testing.T) { samples := []cortexpb.Sample{{Value: 2, TimestampMs: 10}} _, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "name")}, samples, nil, nil, cortexpb.API)) require.NoError(t, err) - ing.getTSDB(u).postingCache = &wrappedExpandedPostingsCache{ExpandedPostingsCache: ing.getTSDB(u).postingCache, purgeDelay: 10 * time.Millisecond} - ing.getTSDB(u).deletionMarkFound.Store(true) // lets force close the tenant + db, _ := ing.getTSDB(u) + db.postingCache = &wrappedExpandedPostingsCache{ExpandedPostingsCache: db.postingCache, purgeDelay: 10 * time.Millisecond} + db.deletionMarkFound.Store(true) // lets force close the tenant }() } @@ -644,7 +645,8 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) { `), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset")) // Force set tenant to be deleted. - ing.getTSDB(userID).deletionMarkFound.Store(true) + db, _ := ing.getTSDB(userID) + db.deletionMarkFound.Store(true) require.Equal(t, tsdbTenantMarkedForDeletion, ing.closeAndDeleteUserTSDBIfIdle(userID)) // LabelSet metrics cleaned up. require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(``), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset")) @@ -716,7 +718,7 @@ func TestPushRace(t *testing.T) { wg.Wait() - db := ing.getTSDB(userID) + db, _ := ing.getTSDB(userID) ir, err := db.db.Head().Index() require.NoError(t, err) @@ -3833,7 +3835,9 @@ func TestIngester_OpenExistingTSDBOnStartup(t *testing.T) { require.NoError(t, os.Mkdir(filepath.Join(dir, "user0"), 0700)) }, check: func(t *testing.T, i *Ingester) { - require.Nil(t, i.getTSDB("user0")) + db, err := i.getTSDB("user0") + require.Nil(t, db) + require.ErrorIs(t, err, errNoUserDb) }, }, "should not load any TSDB if the root directory is empty": { @@ -3861,9 +3865,15 @@ func TestIngester_OpenExistingTSDBOnStartup(t *testing.T) { }, check: func(t *testing.T, i *Ingester) { require.Equal(t, 2, len(i.TSDBState.dbs)) - require.NotNil(t, i.getTSDB("user0")) - require.NotNil(t, i.getTSDB("user1")) - require.Nil(t, i.getTSDB("user2")) + db0, err := i.getTSDB("user0") + require.NotNil(t, db0) + require.Nil(t, err) + db1, err := i.getTSDB("user1") + require.NotNil(t, db1) + require.Nil(t, err) + db2, err := i.getTSDB("user2") + require.Nil(t, db2) + require.ErrorIs(t, err, errNoUserDb) }, }, "should load all TSDBs on concurrency < number of TSDBs": { @@ -3877,11 +3887,11 @@ func TestIngester_OpenExistingTSDBOnStartup(t *testing.T) { }, check: func(t *testing.T, i *Ingester) { require.Equal(t, 5, len(i.TSDBState.dbs)) - require.NotNil(t, i.getTSDB("user0")) - require.NotNil(t, i.getTSDB("user1")) - require.NotNil(t, i.getTSDB("user2")) - require.NotNil(t, i.getTSDB("user3")) - require.NotNil(t, i.getTSDB("user4")) + require.NotNil(t, getTSDB(t, i, "user0")) + require.NotNil(t, getTSDB(t, i, "user1")) + require.NotNil(t, getTSDB(t, i, "user2")) + require.NotNil(t, getTSDB(t, i, "user3")) + require.NotNil(t, getTSDB(t, i, "user4")) }, }, "should fail and rollback if an error occur while loading a TSDB on concurrency > number of TSDBs": { @@ -3898,8 +3908,12 @@ func TestIngester_OpenExistingTSDBOnStartup(t *testing.T) { }, check: func(t *testing.T, i *Ingester) { require.Equal(t, 0, len(i.TSDBState.dbs)) - require.Nil(t, i.getTSDB("user0")) - require.Nil(t, i.getTSDB("user1")) + db0, err := i.getTSDB("user0") + require.ErrorIs(t, err, errNoUserDb) + require.Nil(t, db0) + db1, err := i.getTSDB("user1") + require.ErrorIs(t, err, errNoUserDb) + require.Nil(t, db1) }, expectedErr: "unable to open TSDB for user user0", }, @@ -3920,11 +3934,21 @@ func TestIngester_OpenExistingTSDBOnStartup(t *testing.T) { }, check: func(t *testing.T, i *Ingester) { require.Equal(t, 0, len(i.TSDBState.dbs)) - require.Nil(t, i.getTSDB("user0")) - require.Nil(t, i.getTSDB("user1")) - require.Nil(t, i.getTSDB("user2")) - require.Nil(t, i.getTSDB("user3")) - require.Nil(t, i.getTSDB("user4")) + db, err := i.getTSDB("user0") + require.ErrorIs(t, err, errNoUserDb) + require.Nil(t, db) + db, err = i.getTSDB("user1") + require.ErrorIs(t, err, errNoUserDb) + require.Nil(t, db) + db, err = i.getTSDB("user2") + require.ErrorIs(t, err, errNoUserDb) + require.Nil(t, db) + db, err = i.getTSDB("user3") + require.ErrorIs(t, err, errNoUserDb) + require.Nil(t, db) + db, err = i.getTSDB("user4") + require.ErrorIs(t, err, errNoUserDb) + require.Nil(t, db) }, expectedErr: "unable to open TSDB for user user2", }, @@ -3968,6 +3992,12 @@ func TestIngester_OpenExistingTSDBOnStartup(t *testing.T) { } } +func getTSDB(t *testing.T, i *Ingester, uId string) *userTSDB { + db, err := i.getTSDB(uId) + require.NoError(t, err) + return db +} + func TestIngester_shipBlocks(t *testing.T) { testCases := map[string]struct { ss bucketindex.Status @@ -4059,8 +4089,9 @@ func TestIngester_dontShipBlocksWhenTenantDeletionMarkerIsPresent(t *testing.T) require.NoError(t, cortex_tsdb.WriteTenantDeletionMark(context.Background(), objstore.WithNoopInstr(bucket), userID, cortex_tsdb.NewTenantDeletionMark(time.Now()))) numObjects++ // For deletion marker - db := i.getTSDB(userID) + db, err := i.getTSDB(userID) require.NotNil(t, db) + require.NoError(t, err) db.lastDeletionMarkCheck.Store(0) // After writing tenant deletion mark, @@ -4106,8 +4137,9 @@ func TestIngester_seriesCountIsCorrectAfterClosingTSDBForDeletedTenant(t *testin i.shipBlocks(context.Background(), nil) // Verify that tenant deletion mark was found. - db := i.getTSDB(userID) + db, err := i.getTSDB(userID) require.NotNil(t, db) + require.NoError(t, err) require.True(t, db.deletionMarkFound.Load()) // If we try to close TSDB now, it should succeed, even though TSDB is not idle and empty. @@ -4139,7 +4171,8 @@ func TestIngester_sholdUpdateCacheShippedBlocks(t *testing.T) { mockUserShipper(t, i) // Mock the shipper meta (no blocks). - db := i.getTSDB(userID) + db, err := i.getTSDB(userID) + require.NoError(t, err) err = db.updateCachedShippedBlocks() require.NoError(t, err) @@ -4182,7 +4215,8 @@ func TestIngester_closeAndDeleteUserTSDBIfIdle_shouldNotCloseTSDBIfShippingIsInP }).Return(0, nil) // Mock the shipper meta (no blocks). - db := i.getTSDB(userID) + db, err := i.getTSDB(userID) + require.NoError(t, err) require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.shipperMetadataFilePath, &shipper.Meta{ Version: shipper.MetaVersion1, })) @@ -4281,7 +4315,8 @@ func TestIngester_idleCloseEmptyTSDB(t *testing.T) { require.Equal(t, tsdbIdleClosed, i.closeAndDeleteUserTSDBIfIdle(userID)) // Verify that it was closed. - db = i.getTSDB(userID) + db, err = i.getTSDB(userID) + require.ErrorIs(t, err, errNoUserDb) require.Nil(t, db) // And we can recreate it again, if needed. @@ -4325,7 +4360,8 @@ func TestIngester_invalidSamplesDontChangeLastUpdateTime(t *testing.T) { require.NoError(t, err) } - db := i.getTSDB(userID) + db, err := i.getTSDB(userID) + require.NoError(t, err) lastUpdate := db.lastUpdate.Load() // Wait until 1 second passes. @@ -4513,7 +4549,8 @@ func TestIngester_flushing(t *testing.T) { cortex_ingester_shipper_uploads_total 3 `), "cortex_ingester_shipper_uploads_total")) - userDB := i.getTSDB(userID) + userDB, err := i.getTSDB(userID) + require.NoError(t, err) require.NotNil(t, userDB) blocks := userDB.Blocks() @@ -5015,7 +5052,8 @@ func TestIngesterCompactAndCloseIdleTSDB(t *testing.T) { } func verifyCompactedHead(t *testing.T, i *Ingester, expected bool) { - db := i.getTSDB(userID) + db, err := i.getTSDB(userID) + require.NoError(t, err) require.NotNil(t, db) h := db.Head() @@ -5095,7 +5133,8 @@ func TestHeadCompactionOnStartup(t *testing.T) { defer services.StopAndAwaitTerminated(context.Background(), ingester) //nolint:errcheck - db := ingester.getTSDB(userID) + db, err := ingester.getTSDB(userID) + require.NoError(t, err) require.NotNil(t, db) h := db.Head() @@ -5126,14 +5165,16 @@ func TestIngester_CloseTSDBsOnShutdown(t *testing.T) { // Push some data. pushSingleSampleWithMetadata(t, i) - db := i.getTSDB(userID) + db, err := i.getTSDB(userID) + require.NoError(t, err) require.NotNil(t, db) // Stop ingester. require.NoError(t, services.StopAndAwaitTerminated(context.Background(), i)) // Verify that DB is no longer in memory, but was closed - db = i.getTSDB(userID) + db, err = i.getTSDB(userID) + require.ErrorIs(t, err, errNoUserDb) require.Nil(t, db) } @@ -5174,7 +5215,8 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { require.NoError(t, err) } - db := i.getTSDB(userID) + db, err := i.getTSDB(userID) + require.NoError(t, err) require.NotNil(t, db) require.Nil(t, db.Compact(ctx)) @@ -5265,7 +5307,8 @@ func TestIngesterPushErrorDuringForcedCompaction(t *testing.T) { pushSingleSampleWithMetadata(t, i) // We mock a flushing by setting the boolean. - db := i.getTSDB(userID) + db, err := i.getTSDB(userID) + require.NoError(t, err) require.NotNil(t, db) require.True(t, db.casState(active, forceCompacting)) @@ -5303,7 +5346,8 @@ func TestIngesterNoFlushWithInFlightRequest(t *testing.T) { // Verifying that compaction won't happen when a request is in flight. // This mocks a request in flight. - db := i.getTSDB(userID) + db, err := i.getTSDB(userID) + require.NoError(t, err) require.NoError(t, db.acquireAppendLock()) // Flush handler only triggers compactions, but doesn't wait for them to finish. We cannot use ?wait=true here, @@ -5323,8 +5367,8 @@ func TestIngesterNoFlushWithInFlightRequest(t *testing.T) { // Let's wait until all head series have been flushed. test.Poll(t, 5*time.Second, uint64(0), func() interface{} { - db := i.getTSDB(userID) - if db == nil { + db, err := i.getTSDB(userID) + if err != nil || db == nil { return false } return db.Head().NumSeries() @@ -5641,7 +5685,8 @@ func TestExpendedPostingsCacheMatchers(t *testing.T) { } } - db := ing.getTSDB(userID) + db, err := ing.getTSDB(userID) + require.NoError(t, err) type testCase struct { matchers []*client.LabelMatcher @@ -6091,7 +6136,7 @@ func TestExpendedPostingsCache(t *testing.T) { test.Poll(t, c.cacheConfig.Blocks.Ttl+c.cacheConfig.Head.Ttl+cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval, 0, func() interface{} { size := 0 for _, userID := range i.getTSDBUsers() { - userDB := i.getTSDB(userID) + userDB, _ := i.getTSDB(userID) size += userDB.postingCache.Size() } return size