store-gateway: use bucket index instead of scanning the bucket #6808

Merged · 17 commits · Dec 4, 2023
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -38,7 +38,7 @@
 
 ### Grafana Mimir
 
-* [CHANGE] The following deprecated configurations have been removed: #6673
+* [CHANGE] The following deprecated configurations have been removed: #6673 #6779 #6808
 * `-querier.iterators`
 * `-querier.batch-iterators`
 * `-blocks-storage.bucket-store.max-chunk-pool-bytes`
1 change: 1 addition & 0 deletions integration/configs.go
@@ -148,6 +148,7 @@ var (
 "-blocks-storage.tsdb.ship-interval": "1m",
 "-blocks-storage.tsdb.head-compaction-interval": "1s",
 "-querier.query-store-after": "0",
+"-compactor.cleanup-interval": "2s",
 }
 }
 
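Note: `-compactor.cleanup-interval` becomes part of the shared integration-test defaults here because the store-gateway now depends on the compactor-maintained bucket index. A test that needs a different refresh cadence can still override the default; a minimal sketch using the `mergeFlags` helper that already appears elsewhere in this diff (assuming the usual semantics where the later map wins):

```go
// Sketch: override the shared default for a single test. mergeFlags and
// BlocksStorageFlags are the existing integration-test helpers used in this PR.
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
	"-compactor.cleanup-interval": "1s", // rebuild the bucket index every second
})
```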
1 change: 1 addition & 0 deletions integration/getting_started_with_gossiped_ring_test.go
@@ -39,6 +39,7 @@ func TestGettingStartedWithGossipedRing(t *testing.T) {
 // decrease timeouts to make test faster. should still be fine with two instances only
 "-ingester.ring.observe-period": "5s", // to avoid conflicts in tokens
 "-blocks-storage.bucket-store.sync-interval": "1s", // sync continuously
+"-compactor.cleanup-interval": "1s", // update bucket index continuously
 "-blocks-storage.bucket-store.ignore-blocks-within": "0",
 "-blocks-storage.backend": "s3",
 "-blocks-storage.s3.bucket-name": blocksBucketName,
2 changes: 1 addition & 1 deletion integration/read_write_mode_test.go
@@ -308,7 +308,6 @@ func TestReadWriteModeCompaction(t *testing.T) {
 // Frequently cleanup old blocks.
 // While this doesn't test the compaction functionality of the compactor, it does verify that the compactor
 // is correctly configured and able to interact with storage, which is the intention of this test.
-"-compactor.cleanup-interval": "2s",
 "-compactor.blocks-retention-period": "5s",
 })
 
@@ -341,6 +340,7 @@ func startReadWriteModeCluster(t *testing.T, s *e2e.Scenario, extraFlags ...map[
 
 flagSets := []map[string]string{
 CommonStorageBackendFlags(),
+BlocksStorageFlags(),
 {
 "-memberlist.join": "mimir-backend-1",
 },
1 change: 0 additions & 1 deletion integration/single_binary_test.go
@@ -54,7 +54,6 @@ func TestMimirShouldStartInSingleBinaryModeWithAllMemcachedConfigured(t *testing
 // Compactor.
 "-compactor.ring.store": "consul",
 "-compactor.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
-"-compactor.cleanup-interval": "2s", // Update bucket index often.
 })
 
 // Ensure Mimir successfully starts.
21 changes: 12 additions & 9 deletions integration/store_gateway_limits_hit_test.go
@@ -102,12 +102,15 @@ func Test_MaxSeriesAndChunksPerQueryLimitHit(t *testing.T) {
 
 for testName, testData := range tests {
 t.Run(testName, func(t *testing.T) {
-// Start Mimir read components and wait until ready. The querier and store-gateway will be ready after
-// they discovered the blocks in the storage.
+// Start Mimir read components and wait until ready.
+// Compactor needs to start before store-gateway so that the bucket index is updated.
+compactor := e2emimir.NewCompactor("compactor", consul.NetworkHTTPEndpoint(), flags)
+require.NoError(t, scenario.StartAndWaitReady(compactor))
+
+// The querier and store-gateway will be ready after they discovered the blocks in the storage.
 querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), mergeFlags(flags, testData.additionalQuerierFlags))
 storeGateway := e2emimir.NewStoreGateway("store-gateway", consul.NetworkHTTPEndpoint(), mergeFlags(flags, testData.additionalStoreGatewayFlags))
-compactor := e2emimir.NewCompactor("compactor", consul.NetworkHTTPEndpoint(), flags)
-require.NoError(t, scenario.StartAndWaitReady(querier, storeGateway, compactor))
+require.NoError(t, scenario.StartAndWaitReady(querier, storeGateway))
 t.Cleanup(func() {
 require.NoError(t, scenario.Stop(querier, storeGateway, compactor))
 })
@@ -116,15 +119,15 @@ func Test_MaxSeriesAndChunksPerQueryLimitHit(t *testing.T) {
 require.NoError(t, err)
 
 // Verify we can successfully query timeseries between timeStamp1 and timeStamp2 (excluded)
-rangeResultResponse, _, err := client.QueryRangeRaw("{__name__=~\"series_.+\"}", timeStamp1, timeStamp1.Add(time.Second), time.Second)
+rangeResultResponse, rangeResultBody, err := client.QueryRangeRaw("{__name__=~\"series_.+\"}", timeStamp1, timeStamp1.Add(time.Second), time.Second)
 require.NoError(t, err)
-require.Equal(t, http.StatusOK, rangeResultResponse.StatusCode)
+require.Equal(t, http.StatusOK, rangeResultResponse.StatusCode, string(rangeResultBody))
 
 // Verify we cannot successfully query timeseries between timeSeries1 and timeSeries2 (included) because the limit is hit, and the status code 422 is returned
-rangeResultResponse, rangeResultBody, err := client.QueryRangeRaw("{__name__=~\"series_.+\"}", timeStamp1, timeStamp2.Add(time.Second), time.Second)
+rangeResultResponse, rangeResultBody, err = client.QueryRangeRaw("{__name__=~\"series_.+\"}", timeStamp1, timeStamp2.Add(time.Second), time.Second)
 require.NoError(t, err)
-require.Equal(t, http.StatusUnprocessableEntity, rangeResultResponse.StatusCode)
-require.True(t, strings.Contains(string(rangeResultBody), testData.expectedErrorKey))
+require.Equal(t, http.StatusUnprocessableEntity, rangeResultResponse.StatusCode, string(rangeResultBody))
+require.True(t, strings.Contains(string(rangeResultBody), testData.expectedErrorKey), string(rangeResultBody))
 })
 }
 }
22 changes: 6 additions & 16 deletions pkg/storegateway/bucket_stores.go
@@ -446,32 +446,22 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) {
 // but if the store-gateway removes redundant blocks before the querier discovers them, the
 // consistency check on the querier will fail.
 }
-
-// Instantiate a different blocks metadata fetcher based on whether bucket index is enabled or not.
-var (
-fetcher block.MetadataFetcher
-err error
-)
-fetcher, err = block.NewMetaFetcher(
-userLogger,
-u.cfg.BucketStore.MetaSyncConcurrency,
-userBkt,
-u.syncDirForUser(userID), // The fetcher stores cached metas in the "meta-syncer/" sub directory
+fetcher := NewBucketIndexMetadataFetcher(
+userID,
+u.bucket,
+u.limits,
+u.logger,
 fetcherReg,
 filters,
 )
-if err != nil {
-return nil, err
-}
-
 bucketStoreOpts := []BucketStoreOption{
 WithLogger(userLogger),
 WithIndexCache(u.indexCache),
 WithQueryGate(u.queryGate),
 WithLazyLoadingGate(u.lazyLoadingGate),
 }
 
-bs, err = NewBucketStore(
+bs, err := NewBucketStore(
 userID,
 userBkt,
 fetcher,
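This is the heart of the PR: instead of each store-gateway listing every block in the tenant's bucket through `block.NewMetaFetcher`, block discovery now goes through the bucket index that the compactor keeps up to date. A rough sketch of the idea, assuming Mimir's `bucketindex.ReadIndex` API (names and signatures are illustrative, not the exact implementation behind `NewBucketIndexMetadataFetcher`):

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-kit/log"
	"github.com/thanos-io/objstore"

	"github.com/grafana/mimir/pkg/storage/tsdb/bucketindex"
)

// discoverBlocks resolves a tenant's blocks with a single object-store Get of
// the bucket index, instead of an Iter over every block directory.
func discoverBlocks(ctx context.Context, bkt objstore.Bucket, userID string) error {
	idx, err := bucketindex.ReadIndex(ctx, bkt, userID, nil, log.NewNopLogger())
	if err != nil {
		// Until the compactor writes the index for the first time, reads fail —
		// hence the test changes that start the compactor before the store-gateway.
		return err
	}
	for _, b := range idx.Blocks {
		fmt.Printf("discovered block %s (minT=%d maxT=%d)\n", b.ID, b.MinTime, b.MaxTime)
	}
	return nil
}
```

The trade-off is freshness: the store-gateway only sees new blocks after the compactor refreshes the index, which is why the integration tests above shorten `-compactor.cleanup-interval`.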
18 changes: 14 additions & 4 deletions pkg/storegateway/bucket_stores_test.go
@@ -81,7 +81,9 @@ func TestBucketStores_InitialSync(t *testing.T) {
 assert.Empty(t, warnings)
 assert.Empty(t, seriesSet)
 }
-
+for userID := range userToMetric {
+createBucketIndex(t, bucket, userID)
+}
 require.NoError(t, stores.InitialSync(ctx))
 
 // Query series after the initial sync.
@@ -135,16 +137,17 @@
 func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) {
 test.VerifyNoLeak(t)
 
+const tenantID = "user-1"
 ctx := context.Background()
 cfg := prepareStorageConfig(t)
 
 storageDir := t.TempDir()
 
 // Generate a block for the user in the storage.
-generateStorageBlock(t, storageDir, "user-1", "series_1", 10, 100, 15)
-
+generateStorageBlock(t, storageDir, tenantID, "series_1", 10, 100, 15)
 bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
 require.NoError(t, err)
+createBucketIndex(t, bucket, tenantID)
 
 // Wrap the bucket to fail the 1st Get() request.
 bucket = &failFirstGetBucket{Bucket: bucket}
@@ -157,7 +160,7 @@
 require.NoError(t, stores.InitialSync(ctx))
 
 // Query series after the initial sync.
-seriesSet, warnings, err := querySeries(t, stores, "user-1", "series_1", 20, 40)
+seriesSet, warnings, err := querySeries(t, stores, tenantID, "series_1", 20, 40)
 require.NoError(t, err)
 assert.Empty(t, warnings)
 require.Len(t, seriesSet, 1)
@@ -216,6 +219,7 @@ func TestBucketStores_SyncBlocks(t *testing.T) {
 
 // Run an initial sync to discover 1 block.
 generateStorageBlock(t, storageDir, userID, metricName, 10, 100, 15)
+createBucketIndex(t, bucket, userID)
 require.NoError(t, stores.InitialSync(ctx))
 
 // Query a range for which we have no samples.
@@ -226,6 +230,7 @@
 
 // Generate another block and sync blocks again.
 generateStorageBlock(t, storageDir, userID, metricName, 100, 200, 15)
+createBucketIndex(t, bucket, userID)
 require.NoError(t, stores.SyncBlocks(ctx))
 
 seriesSet, warnings, err = querySeries(t, stores, userID, metricName, 150, 180)
@@ -423,6 +428,7 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t
 stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), log.NewNopLogger(), reg)
 require.NoError(t, err)
 
+createBucketIndex(t, bucket, userID)
 require.NoError(t, stores.InitialSync(ctx))
 
 tests := map[string]struct {
@@ -501,6 +507,7 @@ func TestBucketStore_Series_ShouldQueryBlockWithOutOfOrderChunks(t *testing.T) {
 reg := prometheus.NewPedanticRegistry()
 stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), log.NewNopLogger(), reg)
 require.NoError(t, err)
+createBucketIndex(t, bucket, userID)
 require.NoError(t, stores.InitialSync(ctx))
 
 tests := map[string]struct {
@@ -653,6 +660,9 @@ func TestBucketStores_deleteLocalFilesForExcludedTenants(t *testing.T) {
 
 bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
 require.NoError(t, err)
+for userID := range userToMetric {
+createBucketIndex(t, bucket, userID)
+}
 
 sharding := userShardingStrategy{}
 
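The `createBucketIndex` helper these tests call is not part of the displayed diff. A plausible reconstruction, assuming Mimir's `bucketindex` updater API (the exact helper in the PR may differ):

```go
package storegateway

import (
	"context"
	"testing"

	"github.com/go-kit/log"
	"github.com/stretchr/testify/require"
	"github.com/thanos-io/objstore"

	"github.com/grafana/mimir/pkg/storage/tsdb/bucketindex"
)

// createBucketIndex builds the tenant's bucket index from the blocks currently
// in the bucket and uploads it, standing in for the compactor's cleanup job
// that maintains the index in production.
func createBucketIndex(t *testing.T, bkt objstore.Bucket, userID string) {
	updater := bucketindex.NewUpdater(bkt, userID, nil, log.NewNopLogger())
	idx, _, err := updater.UpdateIndex(context.Background(), nil)
	require.NoError(t, err)
	require.NoError(t, bucketindex.WriteIndex(context.Background(), bkt, userID, nil, idx))
}
```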
27 changes: 18 additions & 9 deletions pkg/storegateway/gateway_test.go
@@ -133,7 +133,14 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) {
 ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
 t.Cleanup(func() { assert.NoError(t, closer.Close()) })
 
-bucketClient := &bucket.ClientMock{}
+onBucketIndexGet := func() {}
+
+bucketClient := &bucket.ErrorInjectedBucketClient{Bucket: objstore.NewInMemBucket(), Injector: func(op bucket.Operation, name string) error {
+if op == bucket.OpGet && strings.HasSuffix(name, bucketindex.IndexCompressedFilename) {
+onBucketIndexGet()
+}
+return nil
+}}
 
 // Setup the initial instance state in the ring.
 if testData.initialExists {
@@ -149,16 +156,18 @@
 t.Cleanup(func() { assert.NoError(t, services.StopAndAwaitTerminated(ctx, g)) })
 assert.False(t, g.ringLifecycler.IsRegistered())
 
-bucketClient.MockIterWithCallback("", []string{"user-1", "user-2"}, nil, func() {
+for _, userID := range []string{"user-1", "user-2"} {
+createBucketIndex(t, bucketClient, userID)
+}
+
+onBucketIndexGet = func() {
 // During the initial sync, we expect the instance to always be in the JOINING
 // state within the ring.
 assert.True(t, g.ringLifecycler.IsRegistered())
 assert.Equal(t, ring.JOINING, g.ringLifecycler.GetState())
 assert.Equal(t, ringNumTokensDefault, len(g.ringLifecycler.GetTokens()))
 assert.Subset(t, g.ringLifecycler.GetTokens(), testData.initialTokens)
-})
-bucketClient.MockIter("user-1/", []string{}, nil)
-bucketClient.MockIter("user-2/", []string{}, nil)
+}
 
 // Once successfully started, the instance should be ACTIVE in the ring.
 require.NoError(t, services.StartAndAwaitRunning(ctx, g))
@@ -184,13 +193,11 @@ func TestStoreGateway_InitialSyncFailure(t *testing.T) {
 ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
 t.Cleanup(func() { assert.NoError(t, closer.Close()) })
 
-bucketClient := &bucket.ClientMock{}
+bucketClient := &bucket.ErrorInjectedBucketClient{Injector: func(operation bucket.Operation, s string) error { return assert.AnError }}
 
-g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, defaultLimitsOverrides(t), log.NewNopLogger(), nil, nil)
+g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, defaultLimitsOverrides(t), log.NewLogfmtLogger(os.Stdout), nil, nil)
 require.NoError(t, err)
 
-bucketClient.MockIter("", []string{}, errors.New("network error"))
-
 require.NoError(t, g.StartAsync(ctx))
 err = g.AwaitRunning(ctx)
 assert.Error(t, err)
@@ -768,6 +775,7 @@ func TestStoreGateway_SyncShouldKeepPreviousBlocksIfInstanceIsUnhealthyInTheRing
 bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
 require.NoError(t, err)
 generateStorageBlock(t, storageDir, userID, metricName, 10, 100, 15)
+createBucketIndex(t, bucket, userID)
 
 g, err := newStoreGateway(gatewayCfg, storageCfg, bucket, ringStore, defaultLimitsOverrides(t), log.NewNopLogger(), reg, nil)
 require.NoError(t, err)
@@ -1431,6 +1439,7 @@ func TestStoreGateway_SeriesQueryingShouldEnforceMaxChunksPerQueryLimit(t *testi
 
 bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
 require.NoError(t, err)
+createBucketIndex(t, bucketClient, userID)
 
 // Prepare the request to query back all series (1 chunk per series in this test).
 req := &storepb.SeriesRequest{