diff --git a/CHANGELOG.md b/CHANGELOG.md
index ad31ba3e8ec..25fc70d4968 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -22,7 +22,7 @@
 
 ### Grafana Mimir
 
-* [CHANGE] The following deprecated configurations have been removed: #6673
+* [CHANGE] The following deprecated configurations have been removed: #6673 #6779 #6808
   * `-querier.iterators`
   * `-querier.batch-iterators`
   * `-blocks-storage.bucket-store.max-chunk-pool-bytes`
diff --git a/integration/configs.go b/integration/configs.go
index 0f7f0417697..32bab1eae42 100644
--- a/integration/configs.go
+++ b/integration/configs.go
@@ -148,6 +148,7 @@ var (
             "-blocks-storage.tsdb.ship-interval":            "1m",
             "-blocks-storage.tsdb.head-compaction-interval": "1s",
             "-querier.query-store-after":                    "0",
+            "-compactor.cleanup-interval":                   "2s",
         }
     }
 
diff --git a/integration/getting_started_with_gossiped_ring_test.go b/integration/getting_started_with_gossiped_ring_test.go
index 1325a95ffd0..00112f21bfc 100644
--- a/integration/getting_started_with_gossiped_ring_test.go
+++ b/integration/getting_started_with_gossiped_ring_test.go
@@ -39,6 +39,7 @@ func TestGettingStartedWithGossipedRing(t *testing.T) {
         // decrease timeouts to make test faster. should still be fine with two instances only
         "-ingester.ring.observe-period":                     "5s", // to avoid conflicts in tokens
         "-blocks-storage.bucket-store.sync-interval":        "1s", // sync continuously
+        "-compactor.cleanup-interval":                       "1s", // update bucket index continuously
         "-blocks-storage.bucket-store.ignore-blocks-within": "0",
         "-blocks-storage.backend":                           "s3",
         "-blocks-storage.s3.bucket-name":                    blocksBucketName,
diff --git a/integration/read_write_mode_test.go b/integration/read_write_mode_test.go
index 5289439913f..2e8b70dcd30 100644
--- a/integration/read_write_mode_test.go
+++ b/integration/read_write_mode_test.go
@@ -308,7 +308,6 @@ func TestReadWriteModeCompaction(t *testing.T) {
         // Frequently cleanup old blocks.
         // While this doesn't test the compaction functionality of the compactor, it does verify that the compactor
         // is correctly configured and able to interact with storage, which is the intention of this test.
-        "-compactor.cleanup-interval":        "2s",
         "-compactor.blocks-retention-period": "5s",
     })
 
@@ -341,6 +340,7 @@ func startReadWriteModeCluster(t *testing.T, s *e2e.Scenario, extraFlags ...map[
 
     flagSets := []map[string]string{
         CommonStorageBackendFlags(),
+        BlocksStorageFlags(),
         {
             "-memberlist.join": "mimir-backend-1",
         },
diff --git a/integration/single_binary_test.go b/integration/single_binary_test.go
index 379435625a4..b83f571e4d1 100644
--- a/integration/single_binary_test.go
+++ b/integration/single_binary_test.go
@@ -54,7 +54,6 @@ func TestMimirShouldStartInSingleBinaryModeWithAllMemcachedConfigured(t *testing
         // Compactor.
         "-compactor.ring.store":           "consul",
         "-compactor.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
-        "-compactor.cleanup-interval":     "2s", // Update bucket index often.
     })
 
     // Ensure Mimir successfully starts.
diff --git a/integration/store_gateway_limits_hit_test.go b/integration/store_gateway_limits_hit_test.go
index 334fb9bc74d..88bca0d02f4 100644
--- a/integration/store_gateway_limits_hit_test.go
+++ b/integration/store_gateway_limits_hit_test.go
@@ -102,12 +102,15 @@ func Test_MaxSeriesAndChunksPerQueryLimitHit(t *testing.T) {
 
     for testName, testData := range tests {
         t.Run(testName, func(t *testing.T) {
-            // Start Mimir read components and wait until ready. The querier and store-gateway will be ready after
-            // they discovered the blocks in the storage.
+            // Start Mimir read components and wait until ready.
+            // Compactor needs to start before store-gateway so that the bucket index is updated.
+            compactor := e2emimir.NewCompactor("compactor", consul.NetworkHTTPEndpoint(), flags)
+            require.NoError(t, scenario.StartAndWaitReady(compactor))
+
+            // The querier and store-gateway will be ready after they discovered the blocks in the storage.
             querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), mergeFlags(flags, testData.additionalQuerierFlags))
             storeGateway := e2emimir.NewStoreGateway("store-gateway", consul.NetworkHTTPEndpoint(), mergeFlags(flags, testData.additionalStoreGatewayFlags))
-            compactor := e2emimir.NewCompactor("compactor", consul.NetworkHTTPEndpoint(), flags)
-            require.NoError(t, scenario.StartAndWaitReady(querier, storeGateway, compactor))
+            require.NoError(t, scenario.StartAndWaitReady(querier, storeGateway))
             t.Cleanup(func() { require.NoError(t, scenario.Stop(querier, storeGateway, compactor)) })
 
@@ -116,15 +119,15 @@ func Test_MaxSeriesAndChunksPerQueryLimitHit(t *testing.T) {
             require.NoError(t, err)
 
             // Verify we can successfully query timeseries between timeStamp1 and timeStamp2 (excluded)
-            rangeResultResponse, _, err := client.QueryRangeRaw("{__name__=~\"series_.+\"}", timeStamp1, timeStamp1.Add(time.Second), time.Second)
+            rangeResultResponse, rangeResultBody, err := client.QueryRangeRaw("{__name__=~\"series_.+\"}", timeStamp1, timeStamp1.Add(time.Second), time.Second)
             require.NoError(t, err)
-            require.Equal(t, http.StatusOK, rangeResultResponse.StatusCode)
+            require.Equal(t, http.StatusOK, rangeResultResponse.StatusCode, string(rangeResultBody))
 
             // Verify we cannot successfully query timeseries between timeSeries1 and timeSeries2 (included) because the limit is hit, and the status code 422 is returned
-            rangeResultResponse, rangeResultBody, err := client.QueryRangeRaw("{__name__=~\"series_.+\"}", timeStamp1, timeStamp2.Add(time.Second), time.Second)
+            rangeResultResponse, rangeResultBody, err = client.QueryRangeRaw("{__name__=~\"series_.+\"}", timeStamp1, timeStamp2.Add(time.Second), time.Second)
             require.NoError(t, err)
-            require.Equal(t, http.StatusUnprocessableEntity, rangeResultResponse.StatusCode)
-            require.True(t, strings.Contains(string(rangeResultBody), testData.expectedErrorKey))
+            require.Equal(t, http.StatusUnprocessableEntity, rangeResultResponse.StatusCode, string(rangeResultBody))
+            require.True(t, strings.Contains(string(rangeResultBody), testData.expectedErrorKey), string(rangeResultBody))
         })
     }
 }
diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go
index 05da739c931..5c75eec3cf4 100644
--- a/pkg/storegateway/bucket_stores.go
+++ b/pkg/storegateway/bucket_stores.go
@@ -446,24 +446,14 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) {
         // but if the store-gateway removes redundant blocks before the querier discovers them, the
         // consistency check on the querier will fail.
     }
-
-    // Instantiate a different blocks metadata fetcher based on whether bucket index is enabled or not.
-    var (
-        fetcher block.MetadataFetcher
-        err     error
-    )
-    fetcher, err = block.NewMetaFetcher(
-        userLogger,
-        u.cfg.BucketStore.MetaSyncConcurrency,
-        userBkt,
-        u.syncDirForUser(userID), // The fetcher stores cached metas in the "meta-syncer/" sub directory
+    fetcher := NewBucketIndexMetadataFetcher(
+        userID,
+        u.bucket,
+        u.limits,
+        u.logger,
         fetcherReg,
         filters,
     )
-    if err != nil {
-        return nil, err
-    }
-
     bucketStoreOpts := []BucketStoreOption{
         WithLogger(userLogger),
         WithIndexCache(u.indexCache),
@@ -471,7 +461,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) {
         WithLazyLoadingGate(u.lazyLoadingGate),
     }
 
-    bs, err = NewBucketStore(
+    bs, err := NewBucketStore(
         userID,
         userBkt,
         fetcher,
diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go
index 17895a78d9c..971bec49e8f 100644
--- a/pkg/storegateway/bucket_stores_test.go
+++ b/pkg/storegateway/bucket_stores_test.go
@@ -81,7 +81,9 @@ func TestBucketStores_InitialSync(t *testing.T) {
         assert.Empty(t, warnings)
         assert.Empty(t, seriesSet)
     }
-
+    for userID := range userToMetric {
+        createBucketIndex(t, bucket, userID)
+    }
     require.NoError(t, stores.InitialSync(ctx))
 
     // Query series after the initial sync.
@@ -135,16 +137,17 @@ func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) {
     test.VerifyNoLeak(t)
 
+    const tenantID = "user-1"
     ctx := context.Background()
     cfg := prepareStorageConfig(t)
 
     storageDir := t.TempDir()
 
     // Generate a block for the user in the storage.
-    generateStorageBlock(t, storageDir, "user-1", "series_1", 10, 100, 15)
-
+    generateStorageBlock(t, storageDir, tenantID, "series_1", 10, 100, 15)
     bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
     require.NoError(t, err)
+    createBucketIndex(t, bucket, tenantID)
 
     // Wrap the bucket to fail the 1st Get() request.
     bucket = &failFirstGetBucket{Bucket: bucket}
 
@@ -157,7 +160,7 @@ func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) {
     require.NoError(t, stores.InitialSync(ctx))
 
     // Query series after the initial sync.
-    seriesSet, warnings, err := querySeries(t, stores, "user-1", "series_1", 20, 40)
+    seriesSet, warnings, err := querySeries(t, stores, tenantID, "series_1", 20, 40)
     require.NoError(t, err)
     assert.Empty(t, warnings)
     require.Len(t, seriesSet, 1)
@@ -216,6 +219,7 @@ func TestBucketStores_SyncBlocks(t *testing.T) {
 
     // Run an initial sync to discover 1 block.
     generateStorageBlock(t, storageDir, userID, metricName, 10, 100, 15)
+    createBucketIndex(t, bucket, userID)
     require.NoError(t, stores.InitialSync(ctx))
 
     // Query a range for which we have no samples.
@@ -226,6 +230,7 @@ func TestBucketStores_SyncBlocks(t *testing.T) {
 
     // Generate another block and sync blocks again.
     generateStorageBlock(t, storageDir, userID, metricName, 100, 200, 15)
+    createBucketIndex(t, bucket, userID)
     require.NoError(t, stores.SyncBlocks(ctx))
 
     seriesSet, warnings, err = querySeries(t, stores, userID, metricName, 150, 180)
@@ -423,6 +428,7 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t
     stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), log.NewNopLogger(), reg)
     require.NoError(t, err)
 
+    createBucketIndex(t, bucket, userID)
     require.NoError(t, stores.InitialSync(ctx))
 
     tests := map[string]struct {
@@ -501,6 +507,7 @@ func TestBucketStore_Series_ShouldQueryBlockWithOutOfOrderChunks(t *testing.T) {
     reg := prometheus.NewPedanticRegistry()
     stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), log.NewNopLogger(), reg)
     require.NoError(t, err)
+    createBucketIndex(t, bucket, userID)
     require.NoError(t, stores.InitialSync(ctx))
 
     tests := map[string]struct {
@@ -653,6 +660,9 @@ func TestBucketStores_deleteLocalFilesForExcludedTenants(t *testing.T) {
 
     bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
     require.NoError(t, err)
+    for userID := range userToMetric {
+        createBucketIndex(t, bucket, userID)
+    }
 
     sharding := userShardingStrategy{}
 
diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go
index 23f71017593..667fbb9dc5f 100644
--- a/pkg/storegateway/gateway_test.go
+++ b/pkg/storegateway/gateway_test.go
@@ -133,7 +133,14 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) {
             ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
             t.Cleanup(func() { assert.NoError(t, closer.Close()) })
 
-            bucketClient := &bucket.ClientMock{}
+            onBucketIndexGet := func() {}
+
+            bucketClient := &bucket.ErrorInjectedBucketClient{Bucket: objstore.NewInMemBucket(), Injector: func(op bucket.Operation, name string) error {
+                if op == bucket.OpGet && strings.HasSuffix(name, bucketindex.IndexCompressedFilename) {
+                    onBucketIndexGet()
+                }
+                return nil
+            }}
 
             // Setup the initial instance state in the ring.
             if testData.initialExists {
@@ -149,16 +156,18 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) {
             t.Cleanup(func() { assert.NoError(t, services.StopAndAwaitTerminated(ctx, g)) })
             assert.False(t, g.ringLifecycler.IsRegistered())
 
-            bucketClient.MockIterWithCallback("", []string{"user-1", "user-2"}, nil, func() {
+            for _, userID := range []string{"user-1", "user-2"} {
+                createBucketIndex(t, bucketClient, userID)
+            }
+
+            onBucketIndexGet = func() {
                 // During the initial sync, we expect the instance to always be in the JOINING
                 // state within the ring.
                 assert.True(t, g.ringLifecycler.IsRegistered())
                 assert.Equal(t, ring.JOINING, g.ringLifecycler.GetState())
                 assert.Equal(t, ringNumTokensDefault, len(g.ringLifecycler.GetTokens()))
                 assert.Subset(t, g.ringLifecycler.GetTokens(), testData.initialTokens)
-            })
-            bucketClient.MockIter("user-1/", []string{}, nil)
-            bucketClient.MockIter("user-2/", []string{}, nil)
+            }
 
             // Once successfully started, the instance should be ACTIVE in the ring.
             require.NoError(t, services.StartAndAwaitRunning(ctx, g))
@@ -184,13 +193,11 @@ func TestStoreGateway_InitialSyncFailure(t *testing.T) {
     ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
    t.Cleanup(func() { assert.NoError(t, closer.Close()) })
 
-    bucketClient := &bucket.ClientMock{}
+    bucketClient := &bucket.ErrorInjectedBucketClient{Injector: func(operation bucket.Operation, s string) error { return assert.AnError }}
 
-    g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, defaultLimitsOverrides(t), log.NewNopLogger(), nil, nil)
+    g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, defaultLimitsOverrides(t), log.NewLogfmtLogger(os.Stdout), nil, nil)
     require.NoError(t, err)
 
-    bucketClient.MockIter("", []string{}, errors.New("network error"))
-
     require.NoError(t, g.StartAsync(ctx))
     err = g.AwaitRunning(ctx)
     assert.Error(t, err)
@@ -768,6 +775,7 @@ func TestStoreGateway_SyncShouldKeepPreviousBlocksIfInstanceIsUnhealthyInTheRing
         bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
         require.NoError(t, err)
         generateStorageBlock(t, storageDir, userID, metricName, 10, 100, 15)
+        createBucketIndex(t, bucket, userID)
 
         g, err := newStoreGateway(gatewayCfg, storageCfg, bucket, ringStore, defaultLimitsOverrides(t), log.NewNopLogger(), reg, nil)
         require.NoError(t, err)
@@ -1431,6 +1439,7 @@ func TestStoreGateway_SeriesQueryingShouldEnforceMaxChunksPerQueryLimit(t *testi
 
     bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
     require.NoError(t, err)
+    createBucketIndex(t, bucketClient, userID)
 
     // Prepare the request to query back all series (1 chunk per series in this test).
     req := &storepb.SeriesRequest{