diff --git a/CHANGELOG.md b/CHANGELOG.md index 953fa7d3cab..e3c341a2faa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -181,6 +181,9 @@ * [CHANGE] Compactor: change default of `-compactor.first-level-compaction-wait-period` to 25m. #5128 * [CHANGE] Ruler: changed default of `-ruler.poll-interval` from `1m` to `10m`. Starting from this release, the configured rule groups will also be re-synced each time they're modified calling the ruler configuration API. #5170 * [FEATURE] Query-frontend: add `-query-frontend.log-query-request-headers` to enable logging of request headers in query logs. #5030 +* [FEATURE] Store-gateway: add experimental feature to retain lazy-loaded index headers between restarts by eagerly loading them during startup. This is disabled by default and can only be enabled if lazy loading is enabled. To enable this set the following: #5606 + * `-blocks-storage.bucket-store.index-header-lazy-loading-enabled` must be set to true + * `-blocks-storage.bucket-store.index-header.eager-loading-startup-enabled` must be set to true * [ENHANCEMENT] Add per-tenant limit `-validation.max-native-histogram-buckets` to be able to ignore native histogram samples that have too many buckets. #4765 * [ENHANCEMENT] Store-gateway: reduce memory usage in some LabelValues calls. #4789 * [ENHANCEMENT] Store-gateway: add a `stage` label to the metric `cortex_bucket_store_series_data_touched`. This label now applies to `data_type="chunks"` and `data_type="series"`. The `stage` label has 2 values: `processed` - the number of series that parsed - and `returned` - the number of series selected from the processed bytes to satisfy the query. #4797 #4830 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index c07c7181185..9fa91041634 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -7516,13 +7516,24 @@ "kind": "field", "name": "max_idle_file_handles", "required": false, - "desc": "Maximum number of idle file handles the store-gateway keeps open for each index header file.", + "desc": "Maximum number of idle file handles the store-gateway keeps open for each index-header file.", "fieldValue": null, "fieldDefaultValue": 1, "fieldFlag": "blocks-storage.bucket-store.index-header.max-idle-file-handles", "fieldType": "int", "fieldCategory": "advanced" }, + { + "kind": "field", + "name": "eager_loading_startup_enabled", + "required": false, + "desc": "If enabled, store-gateway will periodically persist block IDs of lazy loaded index-headers and load them eagerly during startup. It is not valid to enable this if index-header lazy loading is disabled.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "blocks-storage.bucket-store.index-header.eager-loading-startup-enabled", + "fieldType": "boolean", + "fieldCategory": "experimental" + }, { "kind": "field", "name": "verify_on_load", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index f92f8e7a226..4560b6b6548 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -503,8 +503,10 @@ Usage of ./cmd/mimir/mimir: If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity. (default 1h0m0s) -blocks-storage.bucket-store.index-header-sparse-persistence-enabled [experimental] If enabled, store-gateway will persist a sparse version of the index-header to disk on construction and load sparse index-headers from disk instead of the whole index-header. + -blocks-storage.bucket-store.index-header.eager-loading-startup-enabled + [experimental] If enabled, store-gateway will periodically persist block IDs of lazy loaded index-headers and load them eagerly during startup. It is not valid to enable this if index-header lazy loading is disabled. -blocks-storage.bucket-store.index-header.max-idle-file-handles uint - Maximum number of idle file handles the store-gateway keeps open for each index header file. (default 1) + Maximum number of idle file handles the store-gateway keeps open for each index-header file. (default 1) -blocks-storage.bucket-store.index-header.verify-on-load If true, verify the checksum of index headers upon loading them (either on startup or lazily when lazy loading is enabled). Setting to true helps detect disk corruption at the cost of slowing down index header loading. -blocks-storage.bucket-store.max-chunk-pool-bytes uint diff --git a/docs/sources/mimir/references/configuration-parameters/index.md b/docs/sources/mimir/references/configuration-parameters/index.md index 0661d615ff8..eb00cf5e806 100644 --- a/docs/sources/mimir/references/configuration-parameters/index.md +++ b/docs/sources/mimir/references/configuration-parameters/index.md @@ -3412,10 +3412,16 @@ bucket_store: index_header: # (advanced) Maximum number of idle file handles the store-gateway keeps - # open for each index header file. + # open for each index-header file. # CLI flag: -blocks-storage.bucket-store.index-header.max-idle-file-handles [max_idle_file_handles: | default = 1] + # (experimental) If enabled, store-gateway will periodically persist block + # IDs of lazy loaded index-headers and load them eagerly during startup. It + # is not valid to enable this if index-header lazy loading is disabled. + # CLI flag: -blocks-storage.bucket-store.index-header.eager-loading-startup-enabled + [eager_loading_startup_enabled: | default = false] + # (advanced) If true, verify the checksum of index headers upon loading them # (either on startup or lazily when lazy loading is enabled). Setting to # true helps detect disk corruption at the cost of slowing down index header diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index d34f70e8f4e..2589fadcf13 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -498,6 +498,9 @@ func (cfg *BucketStoreConfig) Validate(logger log.Logger) error { if cfg.SeriesSelectionStrategyName == WorstCasePostingsStrategy && cfg.SelectionStrategies.WorstCaseSeriesPreference <= 0 { return errors.New("invalid worst-case series preference; must be positive") } + if err := cfg.IndexHeader.Validate(cfg.IndexHeaderLazyLoadingEnabled); err != nil { + return errors.Wrap(err, "index-header configuration") + } if cfg.IndexHeaderLazyLoadingConcurrency < 0 { return errInvalidIndexHeaderLazyLoadingConcurrency } diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index a0be36b51e6..891acc9c96a 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -42,6 +42,7 @@ import ( "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/storage/sharding" + "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" "github.com/grafana/mimir/pkg/storage/tsdb/bucketcache" "github.com/grafana/mimir/pkg/storegateway/chunkscache" @@ -227,18 +228,11 @@ func NewBucketStore( bkt objstore.InstrumentedBucketReader, fetcher block.MetadataFetcher, dir string, - maxSeriesPerBatch int, - numChunksRangesPerSeries int, + bucketStoreConfig tsdb.BucketStoreConfig, postingsStrategy postingsSelectionStrategy, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, partitioners blockPartitioners, - blockSyncConcurrency int, - postingOffsetsInMemSampling int, - indexHeaderCfg indexheader.Config, - lazyIndexReaderEnabled bool, - lazyIndexReaderIdleTimeout time.Duration, - sparsePersistenceEnabled bool, seriesHashCache *hashcache.SeriesHashCache, metrics *BucketStoreMetrics, options ...BucketStoreOption, @@ -252,19 +246,19 @@ func NewBucketStore( chunksCache: chunkscache.NoopCache{}, blocks: map[ulid.ULID]*bucketBlock{}, blockSet: newBucketBlockSet(), - blockSyncConcurrency: blockSyncConcurrency, + blockSyncConcurrency: bucketStoreConfig.BlockSyncConcurrency, queryGate: gate.NewNoop(), lazyLoadingGate: gate.NewNoop(), chunksLimiterFactory: chunksLimiterFactory, seriesLimiterFactory: seriesLimiterFactory, partitioners: partitioners, - postingOffsetsInMemSampling: postingOffsetsInMemSampling, - indexHeaderCfg: indexHeaderCfg, + postingOffsetsInMemSampling: bucketStoreConfig.PostingOffsetsInMemSampling, + indexHeaderCfg: bucketStoreConfig.IndexHeader, seriesHashCache: seriesHashCache, metrics: metrics, userID: userID, - maxSeriesPerBatch: maxSeriesPerBatch, - numChunksRangesPerSeries: numChunksRangesPerSeries, + maxSeriesPerBatch: bucketStoreConfig.StreamingBatchSize, + numChunksRangesPerSeries: bucketStoreConfig.ChunkRangesPerSeries, postingsStrategy: postingsStrategy, } @@ -272,8 +266,13 @@ func NewBucketStore( option(s) } + lazyLoadedSnapshotConfig := indexheader.LazyLoadedHeadersSnapshotConfig{ + Path: dir, + UserID: userID, + EagerLoadingEnabled: bucketStoreConfig.IndexHeader.IndexHeaderEagerLoadingStartupEnabled, + } // Depend on the options - s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, sparsePersistenceEnabled, s.lazyLoadingGate, metrics.indexHeaderReaderMetrics) + s.indexReaderPool = indexheader.NewReaderPool(s.logger, bucketStoreConfig.IndexHeaderLazyLoadingEnabled, bucketStoreConfig.IndexHeaderLazyLoadingIdleTimeout, bucketStoreConfig.IndexHeaderSparsePersistenceEnabled, s.lazyLoadingGate, metrics.indexHeaderReaderMetrics, lazyLoadedSnapshotConfig) if err := os.MkdirAll(dir, 0750); err != nil { return nil, errors.Wrap(err, "create dir") @@ -306,6 +305,10 @@ func (s *BucketStore) Stats() BucketStoreStats { // SyncBlocks synchronizes the stores state with the Bucket bucket. // It will reuse disk space as persistent cache based on s.dir param. func (s *BucketStore) SyncBlocks(ctx context.Context) error { + return s.syncBlocks(ctx, false) +} + +func (s *BucketStore) syncBlocks(ctx context.Context, initialSync bool) error { metas, _, metaFetchErr := s.fetcher.Fetch(ctx) // For partial view allow adding new blocks at least. if metaFetchErr != nil && metas == nil { @@ -319,7 +322,7 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { wg.Add(1) go func() { for meta := range blockc { - if err := s.addBlock(ctx, meta); err != nil { + if err := s.addBlock(ctx, meta, initialSync); err != nil { continue } } @@ -361,7 +364,7 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { // InitialSync perform blocking sync with extra step at the end to delete locally saved blocks that are no longer // present in the bucket. The mismatch of these can only happen between restarts, so we can do that only once per startup. func (s *BucketStore) InitialSync(ctx context.Context) error { - if err := s.SyncBlocks(ctx); err != nil { + if err := s.syncBlocks(ctx, true); err != nil { return errors.Wrap(err, "sync block") } @@ -397,7 +400,7 @@ func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock { return s.blocks[id] } -func (s *BucketStore) addBlock(ctx context.Context, meta *block.Meta) (err error) { +func (s *BucketStore) addBlock(ctx context.Context, meta *block.Meta, initialSync bool) (err error) { dir := filepath.Join(s.dir, meta.ULID.String()) start := time.Now() @@ -423,6 +426,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *block.Meta) (err error meta.ULID, s.postingOffsetsInMemSampling, s.indexHeaderCfg, + initialSync, ) if err != nil { return errors.Wrap(err, "create index header reader") diff --git a/pkg/storegateway/bucket_e2e_test.go b/pkg/storegateway/bucket_e2e_test.go index 4f503f8851e..452edfd7889 100644 --- a/pkg/storegateway/bucket_e2e_test.go +++ b/pkg/storegateway/bucket_e2e_test.go @@ -197,18 +197,22 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS objstore.WithNoopInstr(bkt), metaFetcher, cfg.tempDir, - cfg.maxSeriesPerBatch, - 1, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: cfg.maxSeriesPerBatch, + ChunkRangesPerSeries: 1, + BlockSyncConcurrency: 20, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + IndexHeaderEagerLoadingStartupEnabled: true, + }, + IndexHeaderLazyLoadingEnabled: true, + IndexHeaderLazyLoadingIdleTimeout: time.Minute, + IndexHeaderSparsePersistenceEnabled: true, + }, cfg.postingsStrategy, cfg.chunksLimiterFactory, cfg.seriesLimiterFactory, newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 20, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.Config{}, - true, - time.Minute, - true, hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(s.metricsRegistry), storeOpts..., diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index f0333f6a777..2e3bde2d12e 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -482,8 +482,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) { userBkt, fetcher, u.syncDirForUser(userID), - u.cfg.BucketStore.StreamingBatchSize, - u.cfg.BucketStore.ChunkRangesPerSeries, + u.cfg.BucketStore, selectPostingsStrategy(u.logger, u.cfg.BucketStore.SeriesSelectionStrategyName, u.cfg.BucketStore.SelectionStrategies.WorstCaseSeriesPreference), NewChunksLimiterFactory(func() uint64 { return uint64(u.limits.MaxChunksPerQuery(userID)) @@ -492,12 +491,6 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) { return uint64(u.limits.MaxFetchedSeriesPerQuery(userID)) }), u.partitioners, - u.cfg.BucketStore.BlockSyncConcurrency, - u.cfg.BucketStore.PostingOffsetsInMemSampling, - u.cfg.BucketStore.IndexHeader, - u.cfg.BucketStore.IndexHeaderLazyLoadingEnabled, - u.cfg.BucketStore.IndexHeaderLazyLoadingIdleTimeout, - u.cfg.BucketStore.IndexHeaderSparsePersistenceEnabled, u.seriesHashCache, u.bucketStoreMetrics, bucketStoreOpts..., diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index 1b20add0693..b5671d1cb7a 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -1480,18 +1480,22 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries ibkt, f, tmpDir, - testData.maxSeriesPerBatch, - 1, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: testData.maxSeriesPerBatch, + ChunkRangesPerSeries: 1, + BlockSyncConcurrency: 1, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + IndexHeaderEagerLoadingStartupEnabled: false, + }, + IndexHeaderLazyLoadingEnabled: false, + IndexHeaderLazyLoadingIdleTimeout: 0, + IndexHeaderSparsePersistenceEnabled: true, + }, selectAllStrategy{}, newStaticChunksLimiterFactory(0), newStaticSeriesLimiterFactory(0), newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 1, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.Config{}, - false, - 0, - true, hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(reg), testData.options..., @@ -1599,24 +1603,29 @@ func TestBucketStore_Series_Concurrency(t *testing.T) { metaFetcher, err := block.NewMetaFetcher(logger, 1, instrumentedBucket, "", nil, nil) assert.NoError(t, err) + // Create the bucket store. store, err := NewBucketStore( "test-user", instrumentedBucket, metaFetcher, tmpDir, - batchSize, - 1, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: batchSize, + ChunkRangesPerSeries: 1, + BlockSyncConcurrency: 1, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + IndexHeaderEagerLoadingStartupEnabled: false, + }, + IndexHeaderLazyLoadingEnabled: false, + IndexHeaderLazyLoadingIdleTimeout: 0, + IndexHeaderSparsePersistenceEnabled: true, + }, selectAllStrategy{}, newStaticChunksLimiterFactory(0), newStaticSeriesLimiterFactory(0), newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 1, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.Config{}, - false, // Lazy index-header loading disabled. - 0, - true, // Sparse index-header persistence enabled. hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(nil), WithLogger(logger), @@ -1764,7 +1773,7 @@ func TestBucketStore_Series_OneBlock_InMemIndexCacheSegfault(t *testing.T) { logger: logger, indexCache: indexCache, chunksCache: chunkscache.NoopCache{}, - indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, true, gate.NewNoop(), indexheader.NewReaderPoolMetrics(nil)), + indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, true, gate.NewNoop(), indexheader.NewReaderPoolMetrics(nil), indexheader.LazyLoadedHeadersSnapshotConfig{}), metrics: NewBucketStoreMetrics(nil), blockSet: &bucketBlockSet{blocks: []*bucketBlock{b1, b2}}, blocks: map[ulid.ULID]*bucketBlock{ @@ -1925,18 +1934,22 @@ func TestBucketStore_Series_ErrorUnmarshallingRequestHints(t *testing.T) { instrBkt, fetcher, tmpDir, - 5000, - 1, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: 5000, + ChunkRangesPerSeries: 1, + BlockSyncConcurrency: 10, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + IndexHeaderEagerLoadingStartupEnabled: false, + }, + IndexHeaderLazyLoadingEnabled: false, + IndexHeaderLazyLoadingIdleTimeout: 0, + IndexHeaderSparsePersistenceEnabled: true, + }, selectAllStrategy{}, newStaticChunksLimiterFactory(10000/MaxSamplesPerChunk), newStaticSeriesLimiterFactory(0), newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 10, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.Config{}, - false, - 0, - true, hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(nil), WithLogger(logger), @@ -1981,18 +1994,22 @@ func TestBucketStore_Series_CanceledRequest(t *testing.T) { instrBkt, fetcher, tmpDir, - 5000, - 1, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: 5000, + ChunkRangesPerSeries: 1, + BlockSyncConcurrency: 10, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + IndexHeaderEagerLoadingStartupEnabled: false, + }, + IndexHeaderLazyLoadingEnabled: false, + IndexHeaderLazyLoadingIdleTimeout: 0, + IndexHeaderSparsePersistenceEnabled: true, + }, selectAllStrategy{}, newStaticChunksLimiterFactory(10000/MaxSamplesPerChunk), newStaticSeriesLimiterFactory(0), newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 10, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.Config{}, - false, - 0, - true, hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(nil), WithLogger(logger), @@ -2044,18 +2061,22 @@ func TestBucketStore_Series_InvalidRequest(t *testing.T) { instrBkt, fetcher, tmpDir, - 5000, - 1, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: 5000, + ChunkRangesPerSeries: 1, + BlockSyncConcurrency: 10, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + IndexHeaderEagerLoadingStartupEnabled: false, + }, + IndexHeaderLazyLoadingEnabled: false, + IndexHeaderLazyLoadingIdleTimeout: 0, + IndexHeaderSparsePersistenceEnabled: true, + }, selectAllStrategy{}, newStaticChunksLimiterFactory(10000/MaxSamplesPerChunk), newStaticSeriesLimiterFactory(0), newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 10, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.Config{}, - false, - 0, - true, hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(nil), WithLogger(logger), @@ -2166,18 +2187,22 @@ func testBucketStoreSeriesBlockWithMultipleChunks( instrBkt, fetcher, tmpDir, - 5000, - 1, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: 5000, + ChunkRangesPerSeries: 1, + BlockSyncConcurrency: 10, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + IndexHeaderEagerLoadingStartupEnabled: false, + }, + IndexHeaderLazyLoadingEnabled: false, + IndexHeaderLazyLoadingIdleTimeout: 0, + IndexHeaderSparsePersistenceEnabled: true, + }, selectAllStrategy{}, newStaticChunksLimiterFactory(100000/MaxSamplesPerChunk), newStaticSeriesLimiterFactory(0), newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 10, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.Config{}, - false, - 0, - true, hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(nil), WithLogger(logger), @@ -2329,18 +2354,22 @@ func TestBucketStore_Series_Limits(t *testing.T) { instrBkt, fetcher, tmpDir, - batchSize, - 1, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: batchSize, + ChunkRangesPerSeries: 1, + BlockSyncConcurrency: 10, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + IndexHeaderEagerLoadingStartupEnabled: false, + }, + IndexHeaderLazyLoadingEnabled: false, + IndexHeaderLazyLoadingIdleTimeout: 0, + IndexHeaderSparsePersistenceEnabled: true, + }, selectAllStrategy{}, newStaticChunksLimiterFactory(testData.chunksLimit), newStaticSeriesLimiterFactory(testData.seriesLimit), newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 10, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.Config{}, - false, - 0, - true, hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(nil), ) @@ -2445,18 +2474,22 @@ func setupStoreForHintsTest(t *testing.T, maxSeriesPerBatch int, opts ...BucketS instrBkt, fetcher, tmpDir, - maxSeriesPerBatch, - 1, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: maxSeriesPerBatch, + ChunkRangesPerSeries: 1, + BlockSyncConcurrency: 10, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + IndexHeaderEagerLoadingStartupEnabled: false, + }, + IndexHeaderLazyLoadingEnabled: false, + IndexHeaderLazyLoadingIdleTimeout: 0, + IndexHeaderSparsePersistenceEnabled: true, + }, selectAllStrategy{}, newStaticChunksLimiterFactory(10000/MaxSamplesPerChunk), newStaticSeriesLimiterFactory(0), newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 10, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.Config{}, - false, - 0, - true, hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(nil), opts..., diff --git a/pkg/storegateway/indexheader/header.go b/pkg/storegateway/indexheader/header.go index 4371a21ca0f..b981f0346cb 100644 --- a/pkg/storegateway/indexheader/header.go +++ b/pkg/storegateway/indexheader/header.go @@ -18,6 +18,8 @@ import ( // NotFoundRangeErr is an error returned by PostingsOffset when there is no posting for given name and value pairs. var NotFoundRangeErr = errors.New("range not found") //nolint:revive +var errEagerLoadingStartupEnabledLazyLoadDisabled = errors.New("invalid configuration: store-gateway index header eager-loading is enabled, but lazy-loading is disabled") + // Reader is an interface allowing to read essential, minimal number of index fields from the small portion of index file called header. type Reader interface { io.Closer @@ -52,11 +54,20 @@ type Reader interface { } type Config struct { - MaxIdleFileHandles uint `yaml:"max_idle_file_handles" category:"advanced"` - VerifyOnLoad bool `yaml:"verify_on_load" category:"advanced"` + MaxIdleFileHandles uint `yaml:"max_idle_file_handles" category:"advanced"` + IndexHeaderEagerLoadingStartupEnabled bool `yaml:"eager_loading_startup_enabled" category:"experimental"` + VerifyOnLoad bool `yaml:"verify_on_load" category:"advanced"` } func (cfg *Config) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { - f.UintVar(&cfg.MaxIdleFileHandles, prefix+"max-idle-file-handles", 1, "Maximum number of idle file handles the store-gateway keeps open for each index header file.") + f.UintVar(&cfg.MaxIdleFileHandles, prefix+"max-idle-file-handles", 1, "Maximum number of idle file handles the store-gateway keeps open for each index-header file.") + f.BoolVar(&cfg.IndexHeaderEagerLoadingStartupEnabled, prefix+"eager-loading-startup-enabled", false, "If enabled, store-gateway will periodically persist block IDs of lazy loaded index-headers and load them eagerly during startup. It is not valid to enable this if index-header lazy loading is disabled.") f.BoolVar(&cfg.VerifyOnLoad, prefix+"verify-on-load", false, "If true, verify the checksum of index headers upon loading them (either on startup or lazily when lazy loading is enabled). Setting to true helps detect disk corruption at the cost of slowing down index header loading.") } + +func (cfg *Config) Validate(lazyLoadingEnabled bool) error { + if !lazyLoadingEnabled && cfg.IndexHeaderEagerLoadingStartupEnabled { + return errEagerLoadingStartupEnabledLazyLoadDisabled + } + return nil +} diff --git a/pkg/storegateway/indexheader/lazy_binary_reader.go b/pkg/storegateway/indexheader/lazy_binary_reader.go index 3a971a900d5..7109a2d6095 100644 --- a/pkg/storegateway/indexheader/lazy_binary_reader.go +++ b/pkg/storegateway/indexheader/lazy_binary_reader.go @@ -85,6 +85,8 @@ type LazyBinaryReader struct { // Keep track of the last time it was used. usedAt *atomic.Int64 + + blockID ulid.ULID } // NewLazyBinaryReader makes a new LazyBinaryReader. If the index-header does not exist @@ -127,6 +129,7 @@ func NewLazyBinaryReader( usedAt: atomic.NewInt64(time.Now().UnixNano()), onClosed: onClosed, readerFactory: readerFactory, + blockID: id, lazyLoadingGate: lazyLoadingGate, ctx: ctx, }, nil @@ -221,6 +224,19 @@ func (r *LazyBinaryReader) LabelNames() ([]string, error) { return r.reader.LabelNames() } +// EagerLoad attempts to eagerly load this index header. +func (r *LazyBinaryReader) EagerLoad() { + r.readerMx.RLock() + defer r.readerMx.RUnlock() + + if err := r.load(); err != nil { + level.Warn(r.logger).Log("msg", "eager loading of lazy loaded index-header failed; skipping", "err", err) + return + } + + r.usedAt.Store(time.Now().UnixNano()) +} + // load ensures the underlying binary index-header reader has been successfully loaded. Returns // an error on failure. This function MUST be called with the read lock already acquired. func (r *LazyBinaryReader) load() error { diff --git a/pkg/storegateway/indexheader/lazy_binary_reader_test.go b/pkg/storegateway/indexheader/lazy_binary_reader_test.go index c8a5eedc9ac..12424d4cd56 100644 --- a/pkg/storegateway/indexheader/lazy_binary_reader_test.go +++ b/pkg/storegateway/indexheader/lazy_binary_reader_test.go @@ -39,25 +39,8 @@ func TestNewLazyBinaryReader_ShouldFailIfUnableToBuildIndexHeader(t *testing.T) }) } -func prepareTempDirBucketAndBlock(t *testing.T) (string, *filesystem.Bucket, ulid.ULID) { - tmpDir := filepath.Join(t.TempDir(), "test-indexheader") - bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, bkt.Close()) }) - - // Create block. - blockID, err := block.CreateBlock(context.Background(), tmpDir, []labels.Labels{ - labels.FromStrings("a", "1"), - labels.FromStrings("a", "2"), - labels.FromStrings("a", "3"), - }, 100, 0, 1000, labels.FromStrings("ext1", "1")) - require.NoError(t, err) - require.NoError(t, block.Upload(context.Background(), log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), nil)) - return tmpDir, bkt, blockID -} - func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { - tmpDir, bkt, blockID := prepareTempDirBucketAndBlock(t) + tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) testLazyBinaryReader(t, bkt, tmpDir, blockID, func(t *testing.T, r *LazyBinaryReader, err error) { require.NoError(t, err) @@ -86,7 +69,7 @@ func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { } func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { - tmpDir, bkt, blockID := prepareTempDirBucketAndBlock(t) + tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) // Write a corrupted index-header for the block. headerFilename := filepath.Join(tmpDir, blockID.String(), block.IndexHeaderFilename) @@ -114,7 +97,7 @@ func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { } func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { - tmpDir, bkt, blockID := prepareTempDirBucketAndBlock(t) + tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) testLazyBinaryReader(t, bkt, tmpDir, blockID, func(t *testing.T, r *LazyBinaryReader, err error) { require.NoError(t, err) @@ -150,7 +133,7 @@ func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { } func TestLazyBinaryReader_unload_ShouldReturnErrorIfNotIdle(t *testing.T) { - tmpDir, bkt, blockID := prepareTempDirBucketAndBlock(t) + tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) testLazyBinaryReader(t, bkt, tmpDir, blockID, func(t *testing.T, r *LazyBinaryReader, err error) { require.NoError(t, err) @@ -188,7 +171,7 @@ func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) { // Run the test for a fixed amount of time. const runDuration = 5 * time.Second - tmpDir, bkt, blockID := prepareTempDirBucketAndBlock(t) + tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) testLazyBinaryReader(t, bkt, tmpDir, blockID, func(t *testing.T, r *LazyBinaryReader, err error) { require.NoError(t, err) @@ -236,6 +219,56 @@ func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) { }) } +func TestNewLazyBinaryReader_EagerLoadLazyLoadedIndexHeaders(t *testing.T) { + tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) + + testLazyBinaryReader(t, bkt, tmpDir, blockID, func(t *testing.T, r *LazyBinaryReader, err error) { + r.EagerLoad() + + require.NoError(t, err) + require.NotNil(t, r.reader, "t.reader must already eagerly loaded") + t.Cleanup(func() { + require.NoError(t, r.Close()) + }) + + require.Equal(t, float64(1), promtestutil.ToFloat64(r.metrics.loadCount)) + require.Equal(t, float64(0), promtestutil.ToFloat64(r.metrics.unloadCount)) + + // The index should already be loaded, the following call will return reader already loaded above + v, err := r.IndexVersion() + require.NoError(t, err) + require.Equal(t, 2, v) + require.True(t, r.reader != nil) + require.Equal(t, float64(1), promtestutil.ToFloat64(r.metrics.loadCount)) + require.Equal(t, float64(0), promtestutil.ToFloat64(r.metrics.unloadCount)) + + labelNames, err := r.LabelNames() + require.NoError(t, err) + require.Equal(t, []string{"a"}, labelNames) + require.Equal(t, float64(1), promtestutil.ToFloat64(r.metrics.loadCount)) + require.Equal(t, float64(0), promtestutil.ToFloat64(r.metrics.unloadCount)) + }) +} + +func initBucketAndBlocksForTest(t *testing.T) (string, *filesystem.Bucket, ulid.ULID) { + ctx := context.Background() + + tmpDir := filepath.Join(t.TempDir(), "test-indexheader") + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, bkt.Close()) }) + + // Create block. + blockID, err := block.CreateBlock(ctx, tmpDir, []labels.Labels{ + labels.FromStrings("a", "1"), + labels.FromStrings("a", "2"), + labels.FromStrings("a", "3"), + }, 100, 0, 1000, labels.FromStrings("ext1", "1")) + require.NoError(t, err) + require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), nil)) + return tmpDir, bkt, blockID +} + func testLazyBinaryReader(t *testing.T, bkt objstore.BucketReader, dir string, id ulid.ULID, test func(t *testing.T, r *LazyBinaryReader, err error)) { ctx := context.Background() logger := log.NewNopLogger() @@ -250,7 +283,7 @@ func testLazyBinaryReader(t *testing.T, bkt objstore.BucketReader, dir string, i // TestLazyBinaryReader_ShouldBlockMaxConcurrency tests if LazyBinaryReader blocks // concurrent loads such that it doesn't pass the configured maximum. func TestLazyBinaryReader_ShouldBlockMaxConcurrency(t *testing.T) { - tmpDir, bkt, blockID := prepareTempDirBucketAndBlock(t) + tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) logger := log.NewNopLogger() @@ -305,7 +338,7 @@ func TestLazyBinaryReader_ShouldBlockMaxConcurrency(t *testing.T) { } func TestLazyBinaryReader_ConcurrentLoadingOfSameIndexReader(t *testing.T) { - tmpDir, bkt, blockID := prepareTempDirBucketAndBlock(t) + tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) const ( maxLazyLoadConcurrency = 1 diff --git a/pkg/storegateway/indexheader/reader_pool.go b/pkg/storegateway/indexheader/reader_pool.go index 876f73cff51..4faa2cd9e6b 100644 --- a/pkg/storegateway/indexheader/reader_pool.go +++ b/pkg/storegateway/indexheader/reader_pool.go @@ -6,7 +6,11 @@ package indexheader import ( + "bytes" "context" + "encoding/json" + "os" + "path/filepath" "sync" "time" @@ -17,8 +21,12 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" + + "github.com/grafana/mimir/pkg/util/atomicfs" ) +var lazyLoadedHeadersListFileName = "lazy-loaded.json" + // ReaderPoolMetrics holds metrics tracked by ReaderPool. type ReaderPoolMetrics struct { lazyReader *LazyBinaryReaderMetrics @@ -51,27 +59,96 @@ type ReaderPool struct { close chan struct{} // Keep track of all readers managed by the pool. - lazyReadersMx sync.Mutex - lazyReaders map[*LazyBinaryReader]struct{} + lazyReadersMx sync.Mutex + lazyReaders map[*LazyBinaryReader]struct{} + preShutdownLoadedBlocks *lazyLoadedHeadersSnapshot +} + +// LazyLoadedHeadersSnapshotConfig stores information needed to track lazy loaded index headers. +type LazyLoadedHeadersSnapshotConfig struct { + // Path stores where lazy loaded blocks will be tracked in a single file per tenant + Path string + UserID string + EagerLoadingEnabled bool +} + +type lazyLoadedHeadersSnapshot struct { + // IndexHeaderLastUsedTime is map of index header ulid.ULID to timestamp in millisecond. + IndexHeaderLastUsedTime map[ulid.ULID]int64 `json:"index_header_last_used_time"` + UserID string `json:"user_id"` } -// NewReaderPool makes a new ReaderPool and starts a background task for unloading idle Readers if enabled. -func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, sparsePersistenceEnabled bool, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics) *ReaderPool { - p := newReaderPool(logger, lazyReaderEnabled, lazyReaderIdleTimeout, sparsePersistenceEnabled, lazyLoadingGate, metrics) +// persist atomically writes this snapshot to persistDir. +func (l lazyLoadedHeadersSnapshot) persist(persistDir string) error { + // Create temporary path for fsync. + // We don't use temporary folder because the process might not have access to the temporary folder. + tmpPath := filepath.Join(persistDir, "tmp-"+lazyLoadedHeadersListFileName) + // the actual path we want to store the file in + finalPath := filepath.Join(persistDir, lazyLoadedHeadersListFileName) + + data, err := json.Marshal(l) + if err != nil { + return err + } + + return atomicfs.CreateFileAndMove(tmpPath, finalPath, bytes.NewReader(data)) +} + +// NewReaderPool makes a new ReaderPool. If lazy-loading is enabled, NewReaderPool also starts a background task for unloading idle Readers and persisting a list of loaded Readers to disk. +func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, sparsePersistenceEnabled bool, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics, lazyLoadedSnapshotConfig LazyLoadedHeadersSnapshotConfig) *ReaderPool { + var snapshot *lazyLoadedHeadersSnapshot + if lazyReaderEnabled && lazyLoadedSnapshotConfig.EagerLoadingEnabled { + lazyLoadedSnapshotFileName := filepath.Join(lazyLoadedSnapshotConfig.Path, lazyLoadedHeadersListFileName) + var err error + snapshot, err = loadLazyLoadedHeadersSnapshot(lazyLoadedSnapshotFileName) + if err != nil { + level.Warn(logger).Log("msg", "loading the list of index-headers from snapshot file failed; not eagerly loading index-headers for tenant", "file", lazyLoadedSnapshotFileName, "err", err, "tenant", lazyLoadedSnapshotConfig.UserID) + } + // We will remove the file regardless whether err is nil or not nil. + // In the case such as snapshot loading causing OOM, we will still + // remove the snapshot and lazy load after server is restarted. + if err := os.Remove(lazyLoadedSnapshotFileName); err != nil { + level.Warn(logger).Log("msg", "removing the lazy-loaded index-header snapshot failed", "file", lazyLoadedSnapshotFileName, "err", err) + } + } + + p := newReaderPool(logger, lazyReaderEnabled, lazyReaderIdleTimeout, sparsePersistenceEnabled, lazyLoadingGate, metrics, snapshot) // Start a goroutine to close idle readers (only if required). if p.lazyReaderEnabled && p.lazyReaderIdleTimeout > 0 { checkFreq := p.lazyReaderIdleTimeout / 10 go func() { + tickerIdleReader := time.NewTicker(checkFreq) + defer tickerIdleReader.Stop() + + var lazyLoadC <-chan time.Time + + if lazyLoadedSnapshotConfig.EagerLoadingEnabled { + tickerLazyLoadPersist := time.NewTicker(time.Minute) + defer tickerLazyLoadPersist.Stop() + + lazyLoadC = tickerLazyLoadPersist.C + } + for { select { case <-p.close: return - case <-time.After(checkFreq): + case <-tickerIdleReader.C: p.closeIdleReaders() + case <-lazyLoadC: + snapshot := lazyLoadedHeadersSnapshot{ + IndexHeaderLastUsedTime: p.LoadedBlocks(), + UserID: lazyLoadedSnapshotConfig.UserID, + } + + if err := snapshot.persist(lazyLoadedSnapshotConfig.Path); err != nil { + level.Warn(p.logger).Log("msg", "failed to persist list of lazy-loaded index headers", "err", err) + } } } + }() } @@ -79,23 +156,37 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime } // newReaderPool makes a new ReaderPool. -func newReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, sparsePersistenceEnabled bool, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics) *ReaderPool { +func newReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, sparsePersistenceEnabled bool, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics, lazyLoadedHeadersSnapshot *lazyLoadedHeadersSnapshot) *ReaderPool { return &ReaderPool{ logger: logger, metrics: metrics, lazyReaderEnabled: lazyReaderEnabled, lazyReaderIdleTimeout: lazyReaderIdleTimeout, sparsePersistenceEnabled: sparsePersistenceEnabled, - lazyLoadingGate: lazyLoadingGate, lazyReaders: make(map[*LazyBinaryReader]struct{}), close: make(chan struct{}), + preShutdownLoadedBlocks: lazyLoadedHeadersSnapshot, + lazyLoadingGate: lazyLoadingGate, + } +} + +func loadLazyLoadedHeadersSnapshot(fileName string) (*lazyLoadedHeadersSnapshot, error) { + snapshotBytes, err := os.ReadFile(fileName) + if err != nil { + return nil, err + } + snapshot := &lazyLoadedHeadersSnapshot{} + err = json.Unmarshal(snapshotBytes, snapshot) + if err != nil { + return nil, err } + return snapshot, nil } // NewBinaryReader creates and returns a new binary reader. If the pool has been configured // with lazy reader enabled, this function will return a lazy reader. The returned lazy reader // is tracked by the pool and automatically closed once the idle timeout expires. -func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, cfg Config) (Reader, error) { +func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, cfg Config, initialSync bool) (Reader, error) { var readerFactory func() (Reader, error) var reader Reader var err error @@ -105,7 +196,19 @@ func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt } if p.lazyReaderEnabled { - reader, err = NewLazyBinaryReader(ctx, readerFactory, logger, bkt, dir, id, p.metrics.lazyReader, p.onLazyReaderClosed, p.lazyLoadingGate) + lazyBinaryReader, lazyErr := NewLazyBinaryReader(ctx, readerFactory, logger, bkt, dir, id, p.metrics.lazyReader, p.onLazyReaderClosed, p.lazyLoadingGate) + if lazyErr != nil { + return nil, lazyErr + } + + // we only try to eager load only during initialSync + if initialSync && p.preShutdownLoadedBlocks != nil { + // we only eager load if we have preShutdownLoadedBlocks for the given block id + if p.preShutdownLoadedBlocks.IndexHeaderLastUsedTime[id] > 0 { + lazyBinaryReader.EagerLoad() + } + } + reader, err = lazyBinaryReader, lazyErr } else { reader, err = readerFactory() } @@ -171,3 +274,18 @@ func (p *ReaderPool) onLazyReaderClosed(r *LazyBinaryReader) { // be used anymore, so we can automatically remove it from the pool. delete(p.lazyReaders, r) } + +// LoadedBlocks returns a new map of lazy-loaded block IDs and the last time they were used in milliseconds. +func (p *ReaderPool) LoadedBlocks() map[ulid.ULID]int64 { + p.lazyReadersMx.Lock() + defer p.lazyReadersMx.Unlock() + + blocks := make(map[ulid.ULID]int64, len(p.lazyReaders)) + for r := range p.lazyReaders { + if r.reader != nil { + blocks[r.blockID] = r.usedAt.Load() / int64(time.Millisecond) + } + } + + return blocks +} diff --git a/pkg/storegateway/indexheader/reader_pool_test.go b/pkg/storegateway/indexheader/reader_pool_test.go index d986e9cc5a2..b5be645e8f6 100644 --- a/pkg/storegateway/indexheader/reader_pool_test.go +++ b/pkg/storegateway/indexheader/reader_pool_test.go @@ -7,6 +7,8 @@ package indexheader import ( "context" + "crypto/rand" + "fmt" "os" "path/filepath" "testing" @@ -14,97 +16,136 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/gate" + "github.com/oklog/ulid" promtestutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore/providers/filesystem" + "go.uber.org/atomic" "github.com/grafana/mimir/pkg/storage/tsdb/block" ) func TestReaderPool_NewBinaryReader(t *testing.T) { tests := map[string]struct { - lazyReaderEnabled bool - lazyReaderIdleTimeout time.Duration + lazyReaderEnabled bool + lazyReaderIdleTimeout time.Duration + eagerLoadReaderEnabled bool + initialSync bool + createLazyLoadedHeadersSnapshotFn func(blockId ulid.ULID) lazyLoadedHeadersSnapshot + expectedLoadCountMetricBeforeLabelNamesCall int + expectedLoadCountMetricAfterLabelNamesCall int }{ "lazy reader is disabled": { - lazyReaderEnabled: false, + lazyReaderEnabled: false, + expectedLoadCountMetricAfterLabelNamesCall: 0, // no lazy loading }, "lazy reader is enabled but close on idle timeout is disabled": { - lazyReaderEnabled: true, - lazyReaderIdleTimeout: 0, + lazyReaderEnabled: true, + lazyReaderIdleTimeout: 0, + expectedLoadCountMetricAfterLabelNamesCall: 1, }, "lazy reader and close on idle timeout are both enabled": { - lazyReaderEnabled: true, - lazyReaderIdleTimeout: time.Minute, + lazyReaderEnabled: true, + lazyReaderIdleTimeout: time.Minute, + expectedLoadCountMetricAfterLabelNamesCall: 1, + }, + "block is present in pre-shutdown loaded blocks and eager-loading is enabled, loading index header during initial sync": { + lazyReaderEnabled: true, + lazyReaderIdleTimeout: time.Minute, + eagerLoadReaderEnabled: true, + initialSync: true, + expectedLoadCountMetricBeforeLabelNamesCall: 1, // the index header will be eagerly loaded before the operation + expectedLoadCountMetricAfterLabelNamesCall: 1, + createLazyLoadedHeadersSnapshotFn: func(blockId ulid.ULID) lazyLoadedHeadersSnapshot { + return lazyLoadedHeadersSnapshot{ + IndexHeaderLastUsedTime: map[ulid.ULID]int64{blockId: time.Now().UnixMilli()}, + UserID: "anonymous", + } + }, + }, + "block is present in pre-shutdown loaded blocks and eager-loading is enabled, loading index header after initial sync": { + lazyReaderEnabled: true, + lazyReaderIdleTimeout: time.Minute, + eagerLoadReaderEnabled: true, + initialSync: false, + expectedLoadCountMetricBeforeLabelNamesCall: 0, // the index header is not eager loaded if not during initial-sync + expectedLoadCountMetricAfterLabelNamesCall: 1, + createLazyLoadedHeadersSnapshotFn: func(blockId ulid.ULID) lazyLoadedHeadersSnapshot { + return lazyLoadedHeadersSnapshot{ + IndexHeaderLastUsedTime: map[ulid.ULID]int64{blockId: time.Now().UnixMilli()}, + UserID: "anonymous", + } + }, + }, + "block is not present in pre-shutdown loaded blocks snapshot and eager-loading is enabled": { + lazyReaderEnabled: true, + lazyReaderIdleTimeout: time.Minute, + eagerLoadReaderEnabled: true, + initialSync: true, + expectedLoadCountMetricBeforeLabelNamesCall: 0, // although eager loading is enabled, this test will not do eager loading because the block ID is not in the lazy loaded file. + expectedLoadCountMetricAfterLabelNamesCall: 1, + createLazyLoadedHeadersSnapshotFn: func(_ ulid.ULID) lazyLoadedHeadersSnapshot { + // let's create a random fake blockID to be stored in lazy loaded headers file + fakeBlockID := ulid.MustNew(ulid.Now(), rand.Reader) + // this snapshot will refer to fake block, hence eager load wouldn't be executed for the real block that we test + + return lazyLoadedHeadersSnapshot{ + IndexHeaderLastUsedTime: map[ulid.ULID]int64{fakeBlockID: time.Now().UnixMilli()}, + UserID: "anonymous", + } + }, }, } ctx := context.Background() - - tmpDir, err := os.MkdirTemp("", "test-indexheader") - require.NoError(t, err) - defer func() { require.NoError(t, os.RemoveAll(tmpDir)) }() - - bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) - require.NoError(t, err) - defer func() { require.NoError(t, bkt.Close()) }() - - // Create block. - blockID, err := block.CreateBlock(ctx, tmpDir, []labels.Labels{ - labels.FromStrings("a", "1"), - labels.FromStrings("a", "2"), - labels.FromStrings("a", "3"), - }, 100, 0, 1000, labels.FromStrings("ext1", "1")) - require.NoError(t, err) - require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), nil)) + tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, true, gate.NewNoop(), NewReaderPoolMetrics(nil)) + snapshotConfig := LazyLoadedHeadersSnapshotConfig{ + Path: tmpDir, + UserID: "anonymous", + EagerLoadingEnabled: testData.eagerLoadReaderEnabled, + } + if testData.createLazyLoadedHeadersSnapshotFn != nil { + lazyLoadedSnapshot := testData.createLazyLoadedHeadersSnapshotFn(blockID) + err := lazyLoadedSnapshot.persist(snapshotConfig.Path) + require.NoError(t, err) + } + + metrics := NewReaderPoolMetrics(nil) + pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, true, gate.NewNoop(), metrics, snapshotConfig) defer pool.Close() - r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, Config{}) + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, Config{IndexHeaderEagerLoadingStartupEnabled: testData.eagerLoadReaderEnabled}, testData.initialSync) require.NoError(t, err) defer func() { require.NoError(t, r.Close()) }() + require.Equal(t, float64(testData.expectedLoadCountMetricBeforeLabelNamesCall), promtestutil.ToFloat64(metrics.lazyReader.loadCount)) + // Ensure it can read data. labelNames, err := r.LabelNames() require.NoError(t, err) require.Equal(t, []string{"a"}, labelNames) + + require.Equal(t, float64(testData.expectedLoadCountMetricAfterLabelNamesCall), promtestutil.ToFloat64(metrics.lazyReader.loadCount)) }) } } func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { const idleTimeout = time.Second - - ctx := context.Background() - - tmpDir, err := os.MkdirTemp("", "test-indexheader") - require.NoError(t, err) + ctx, tmpDir, bkt, blockID, metrics := prepareReaderPool(t) defer func() { require.NoError(t, os.RemoveAll(tmpDir)) }() - - bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) - require.NoError(t, err) defer func() { require.NoError(t, bkt.Close()) }() - // Create block. - blockID, err := block.CreateBlock(ctx, tmpDir, []labels.Labels{ - labels.FromStrings("a", "1"), - labels.FromStrings("a", "2"), - labels.FromStrings("a", "3"), - }, 100, 0, 1000, labels.FromStrings("ext1", "1")) - require.NoError(t, err) - require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), nil)) - - metrics := NewReaderPoolMetrics(nil) // Note that we are creating a ReaderPool that doesn't run a background cleanup task for idle // Reader instances. We'll manually invoke the cleanup task when we need it as part of this test. - pool := newReaderPool(log.NewNopLogger(), true, idleTimeout, true, gate.NewNoop(), metrics) + pool := newReaderPool(log.NewNopLogger(), true, idleTimeout, true, gate.NewNoop(), metrics, nil) defer pool.Close() - r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, Config{}) + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, Config{}, false) require.NoError(t, err) defer func() { require.NoError(t, r.Close()) }() @@ -138,3 +179,101 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { require.Equal(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.loadCount)) require.Equal(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.unloadCount)) } + +func TestReaderPool_LoadedBlocks(t *testing.T) { + usedAt := time.Now() + id, err := ulid.New(ulid.Now(), rand.Reader) + require.NoError(t, err) + + lb := LazyBinaryReader{ + blockID: id, + usedAt: atomic.NewInt64(usedAt.UnixNano()), + // we just set to make reader != nil + reader: &StreamBinaryReader{}, + } + rp := ReaderPool{ + lazyReaderEnabled: true, + lazyReaders: map[*LazyBinaryReader]struct{}{&lb: {}}, + } + require.Equal(t, map[ulid.ULID]int64{id: usedAt.UnixMilli()}, rp.LoadedBlocks()) +} + +func TestReaderPool_PersistLazyLoadedBlock(t *testing.T) { + const idleTimeout = time.Second + ctx, tmpDir, bkt, blockID, metrics := prepareReaderPool(t) + + // Note that we are creating a ReaderPool that doesn't run a background cleanup task for idle + // Reader instances. We'll manually invoke the cleanup task when we need it as part of this test. + pool := newReaderPool(log.NewNopLogger(), true, idleTimeout, false, gate.NewNoop(), metrics, nil) + defer pool.Close() + + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, Config{}, false) + require.NoError(t, err) + defer func() { require.NoError(t, r.Close()) }() + + // Ensure it can read data. + labelNames, err := r.LabelNames() + require.NoError(t, err) + require.Equal(t, []string{"a"}, labelNames) + require.Equal(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.loadCount)) + require.Equal(t, float64(0), promtestutil.ToFloat64(metrics.lazyReader.unloadCount)) + + snapshot := lazyLoadedHeadersSnapshot{ + IndexHeaderLastUsedTime: pool.LoadedBlocks(), + UserID: "anonymous", + } + + err = snapshot.persist(tmpDir) + require.NoError(t, err) + + persistedFile := filepath.Join(tmpDir, lazyLoadedHeadersListFileName) + persistedData, err := os.ReadFile(persistedFile) + require.NoError(t, err) + + var expected string + // we know that there is only one lazyReader, hence just use formatter to set the ULID and timestamp. + require.Equal(t, 1, len(pool.lazyReaders), "expecting only one lazyReaders") + for r := range pool.lazyReaders { + expected = fmt.Sprintf(`{"index_header_last_used_time":{"%s":%d},"user_id":"anonymous"}`, r.blockID, r.usedAt.Load()/int64(time.Millisecond)) + } + require.JSONEq(t, expected, string(persistedData)) + + // Wait enough time before checking it. + time.Sleep(idleTimeout * 2) + pool.closeIdleReaders() + + // LoadedBlocks will update the IndexHeaderLastUsedTime map with the removal of + // idle blocks. + snapshot.IndexHeaderLastUsedTime = pool.LoadedBlocks() + err = snapshot.persist(tmpDir) + require.NoError(t, err) + + persistedData, err = os.ReadFile(persistedFile) + require.NoError(t, err) + + require.JSONEq(t, `{"index_header_last_used_time":{},"user_id":"anonymous"}`, string(persistedData), "index_header_last_used_time should be cleared") +} + +func prepareReaderPool(t *testing.T) (context.Context, string, *filesystem.Bucket, ulid.ULID, *ReaderPoolMetrics) { + ctx := context.Background() + + tmpDir := t.TempDir() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, bkt.Close()) + }) + + // Create block. + blockID, err := block.CreateBlock(ctx, tmpDir, []labels.Labels{ + labels.FromStrings("a", "1"), + labels.FromStrings("a", "2"), + labels.FromStrings("a", "3"), + }, 100, 0, 1000, labels.FromStrings("ext1", "1")) + require.NoError(t, err) + require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), nil)) + + metrics := NewReaderPoolMetrics(nil) + return ctx, tmpDir, bkt, blockID, metrics +} diff --git a/pkg/util/atomicfs/fsync.go b/pkg/util/atomicfs/fsync.go new file mode 100644 index 00000000000..3e9fe27e52a --- /dev/null +++ b/pkg/util/atomicfs/fsync.go @@ -0,0 +1,64 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package atomicfs + +import ( + "bytes" + "io" + "os" + "path" + + "github.com/grafana/dskit/multierror" +) + +// CreateFile creates a file in the filePath, write the data into the file and then execute +// fsync operation to make sure the file and its content are stored atomically. If the file already +// exists, it will be overwritten. +func CreateFile(filePath string, data io.Reader) error { + // Write the file, fsync it, then fsync the containing directory in order to guarantee + // it is persisted to disk. From https://man7.org/linux/man-pages/man2/fsync.2.html + // + // > Calling fsync() does not necessarily ensure that the entry in the + // > directory containing the file has also reached disk. For that an + // > explicit fsync() on a file descriptor for the directory is also + // > needed. + file, err := os.Create(filePath) + if err != nil { + return err + } + + merr := multierror.New() + buf := new(bytes.Buffer) + _, err = buf.ReadFrom(data) + merr.Add(err) + _, err = file.Write(buf.Bytes()) + merr.Add(err) + merr.Add(file.Sync()) + merr.Add(file.Close()) + + if err := merr.Err(); err != nil { + return err + } + + dir, err := os.OpenFile(path.Dir(file.Name()), os.O_RDONLY, 0777) + if err != nil { + return err + } + + merr.Add(dir.Sync()) + merr.Add(dir.Close()) + return merr.Err() +} + +// CreateFileAndMove creates a file in the tmpPath, write the data into the file and then execute +// fsync operation to make sure the file and its content are stored atomically. After that it will move +// file to the finalPath to make sure if there is a failure in writing to the tmpPath, we can retry and +// ensure integrity of the file in the finalPath. +func CreateFileAndMove(tmpPath, finalPath string, data io.Reader) error { + if err := CreateFile(tmpPath, data); err != nil { + return err + } + defer os.Remove(tmpPath) + // we rely on the atomicity of this on Unix systems for this method to behave correctly + return os.Rename(tmpPath, finalPath) +} diff --git a/pkg/util/atomicfs/fsync_test.go b/pkg/util/atomicfs/fsync_test.go new file mode 100644 index 00000000000..ecbb9b9c6fd --- /dev/null +++ b/pkg/util/atomicfs/fsync_test.go @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package atomicfs + +import ( + "os" + "path/filepath" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCreateFile(t *testing.T) { + path := filepath.Join(t.TempDir(), "test") + wantData := "test1" + if err := CreateFile(path, strings.NewReader(wantData)); err != nil { + t.Errorf("CreateFile() error = %v, wantErr %v", err, false) + } + data, err := os.ReadFile(path) + require.NoError(t, err) + + require.Equal(t, wantData, string(data)) +} + +func TestCreateFileAndMove(t *testing.T) { + tmpPath := filepath.Join(t.TempDir(), "test") + finalPath := filepath.Join(t.TempDir(), "testFinal") + wantData := "test1" + err := CreateFileAndMove(tmpPath, finalPath, strings.NewReader(wantData)) + require.NoError(t, err) + _, err = os.ReadFile(tmpPath) + require.Error(t, err, "we expect error because the file in tmpPath should already been removed") + + data, err := os.ReadFile(finalPath) + require.NoError(t, err) + + require.Equal(t, wantData, string(data)) +} diff --git a/pkg/util/shutdownmarker/shutdown_marker.go b/pkg/util/shutdownmarker/shutdown_marker.go index 853a4a68165..a07a64f5493 100644 --- a/pkg/util/shutdownmarker/shutdown_marker.go +++ b/pkg/util/shutdownmarker/shutdown_marker.go @@ -5,9 +5,12 @@ package shutdownmarker import ( "os" "path" + "strings" "time" "github.com/grafana/dskit/multierror" + + "github.com/grafana/mimir/pkg/util/atomicfs" ) const shutdownMarkerFilename = "shutdown-requested.txt" @@ -16,36 +19,7 @@ const shutdownMarkerFilename = "shutdown-requested.txt" // going to be scaled down in the future. The presence of this file means that a component // should perform some operations specified by the component itself before being shutdown. func Create(p string) error { - // Write the file, fsync it, then fsync the containing directory in order to guarantee - // it is persisted to disk. From https://man7.org/linux/man-pages/man2/fsync.2.html - // - // > Calling fsync() does not necessarily ensure that the entry in the - // > directory containing the file has also reached disk. For that an - // > explicit fsync() on a file descriptor for the directory is also - // > needed. - file, err := os.Create(p) - if err != nil { - return err - } - - merr := multierror.New() - _, err = file.WriteString(time.Now().UTC().Format(time.RFC3339)) - merr.Add(err) - merr.Add(file.Sync()) - merr.Add(file.Close()) - - if err := merr.Err(); err != nil { - return err - } - - dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0777) - if err != nil { - return err - } - - merr.Add(dir.Sync()) - merr.Add(dir.Close()) - return merr.Err() + return atomicfs.CreateFile(p, strings.NewReader(time.Now().UTC().Format(time.RFC3339))) } // Remove removes the shutdown marker file on the given path if it exists.