From 2109b06ff9a27d496a7092527d88b78158312bae Mon Sep 17 00:00:00 2001 From: Jon Kartago Lamida Date: Fri, 11 Aug 2023 14:59:42 +0800 Subject: [PATCH] store-gateway: Eagerly Load Lazy Loaded Index Header (#5596) * Start to do eager load of index header during startup Signed-off-by: Jon Kartago Lamida * Make EagerLoading as public method Signed-off-by: Jon Kartago Lamida * Add eager loading test Signed-off-by: Jon Kartago Lamida * Small refactoring of test Signed-off-by: Jon Kartago Lamida * Add godoc Signed-off-by: Jon Kartago Lamida * Update reader_pool_test with eager loading logic Signed-off-by: Jon Kartago Lamida * Small refactoring to make test clearer Signed-off-by: Jon Kartago Lamida * Appease the linter Signed-off-by: Jon Kartago Lamida * Rename private method for test Signed-off-by: Jon Kartago Lamida * Add config to enable or disable index headers eager loading Signed-off-by: Jon Kartago Lamida * Changelog Signed-off-by: Jon Kartago Lamida * Changelog Signed-off-by: Jon Kartago Lamida * Go select can't be inside method Signed-off-by: Jon Kartago Lamida * Run make doc Signed-off-by: Jon Kartago Lamida * Fix test Signed-off-by: Jon Kartago Lamida * Update CHANGELOG.md Co-authored-by: Charles Korn * Update pkg/storegateway/indexheader/lazy_binary_reader.go Co-authored-by: Charles Korn * Apply configs PR feedbacks Signed-off-by: Jon Kartago Lamida * Pass BucketStoreConfig as argument Signed-off-by: Jon Kartago Lamida * Pass eagerLoadIndexReaderEnabled into snapshotConfig Signed-off-by: Jon Kartago Lamida * Don't pass lazyLoadedSnapshot into LazyBinaryReader Signed-off-by: Jon Kartago Lamida * Only initialize tickerLazyLoad when flag is enabled Signed-off-by: Jon Kartago Lamida * Apply PR feedbacks Signed-off-by: Jon Kartago Lamida * Update config description Signed-off-by: Jon Kartago Lamida * Update usedAt to current time in eager loading Signed-off-by: Jon Kartago Lamida * Update pkg/storegateway/indexheader/reader_pool.go Co-authored-by: Dimitar Dimitrov * Fix snapshotByte name Signed-off-by: Jon Kartago Lamida * Address PR feedbacks Signed-off-by: Jon Kartago Lamida * Rename initLazyBinaryForTest Signed-off-by: Jon Kartago Lamida * Apply PR suggestion Signed-off-by: Jon Kartago Lamida * Refactor test Signed-off-by: Jon Kartago Lamida * Fix comment Signed-off-by: Jon Kartago Lamida * Apply more PR suggestions Signed-off-by: Jon Kartago Lamida * Refactor configs Signed-off-by: Jon Kartago Lamida * Header eager loading only must be done during initialSync Signed-off-by: Jon Kartago Lamida * Remove unnecesary comment Signed-off-by: Jon Kartago Lamida * Remove snapshot file regardless snapshot loading result Signed-off-by: Jon Kartago Lamida * Fix compilation error Signed-off-by: Jon Kartago Lamida * Update pkg/storegateway/indexheader/header.go Co-authored-by: Charles Korn * Regenerate doc for updated command line args Signed-off-by: Jon Kartago Lamida * Update error log Signed-off-by: Jon Kartago Lamida * Update comment Signed-off-by: Jon Kartago Lamida * Update comment Signed-off-by: Jon Kartago Lamida * Update test Signed-off-by: Jon Kartago Lamida * Add test case Signed-off-by: Jon Kartago Lamida * Update pkg/storegateway/indexheader/reader_pool.go Co-authored-by: Charles Korn * Update pkg/storegateway/indexheader/reader_pool_test.go Co-authored-by: Charles Korn * Update pkg/storegateway/indexheader/reader_pool_test.go Co-authored-by: Charles Korn * Update pkg/storegateway/indexheader/lazy_binary_reader.go Co-authored-by: Charles Korn * Update 
pkg/storegateway/indexheader/reader_pool.go Co-authored-by: Charles Korn * Update pkg/storegateway/indexheader/header.go Co-authored-by: Charles Korn * Update changelog Signed-off-by: Jon Kartago Lamida --------- Signed-off-by: Jon Kartago Lamida Co-authored-by: Charles Korn Co-authored-by: Dimitar Dimitrov --- CHANGELOG.md | 1 + cmd/mimir/config-descriptor.json | 13 +- cmd/mimir/help-all.txt.tmpl | 4 +- .../configuration-parameters/index.md | 8 +- pkg/storage/tsdb/config.go | 3 + pkg/storegateway/bucket.go | 38 ++-- pkg/storegateway/bucket_e2e_test.go | 20 +- pkg/storegateway/bucket_stores.go | 9 +- pkg/storegateway/bucket_test.go | 197 ++++++++++-------- pkg/storegateway/indexheader/header.go | 17 +- .../indexheader/lazy_binary_reader.go | 13 ++ .../indexheader/lazy_binary_reader_test.go | 81 ++++--- pkg/storegateway/indexheader/reader_pool.go | 80 +++++-- .../indexheader/reader_pool_test.go | 114 +++++++--- 14 files changed, 408 insertions(+), 190 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 953fa7d3cab..a645dcbe1bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -181,6 +181,7 @@ * [CHANGE] Compactor: change default of `-compactor.first-level-compaction-wait-period` to 25m. #5128 * [CHANGE] Ruler: changed default of `-ruler.poll-interval` from `1m` to `10m`. Starting from this release, the configured rule groups will also be re-synced each time they're modified calling the ruler configuration API. #5170 * [FEATURE] Query-frontend: add `-query-frontend.log-query-request-headers` to enable logging of request headers in query logs. #5030 +* [FEATURE] Store-gateway: add experimental feature to retain lazy-loaded index headers between restarts by eagerly loading them during startup. This is disabled by default and can only be enabled if lazy loading is enabled. #5606 * [ENHANCEMENT] Add per-tenant limit `-validation.max-native-histogram-buckets` to be able to ignore native histogram samples that have too many buckets. #4765 * [ENHANCEMENT] Store-gateway: reduce memory usage in some LabelValues calls. #4789 * [ENHANCEMENT] Store-gateway: add a `stage` label to the metric `cortex_bucket_store_series_data_touched`. This label now applies to `data_type="chunks"` and `data_type="series"`. The `stage` label has 2 values: `processed` - the number of series that parsed - and `returned` - the number of series selected from the processed bytes to satisfy the query. #4797 #4830 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index c07c7181185..9fa91041634 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -7516,13 +7516,24 @@ "kind": "field", "name": "max_idle_file_handles", "required": false, - "desc": "Maximum number of idle file handles the store-gateway keeps open for each index header file.", + "desc": "Maximum number of idle file handles the store-gateway keeps open for each index-header file.", "fieldValue": null, "fieldDefaultValue": 1, "fieldFlag": "blocks-storage.bucket-store.index-header.max-idle-file-handles", "fieldType": "int", "fieldCategory": "advanced" }, + { + "kind": "field", + "name": "eager_loading_startup_enabled", + "required": false, + "desc": "If enabled, store-gateway will periodically persist block IDs of lazy loaded index-headers and load them eagerly during startup. 
It is not valid to enable this if index-header lazy loading is disabled.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "blocks-storage.bucket-store.index-header.eager-loading-startup-enabled", + "fieldType": "boolean", + "fieldCategory": "experimental" + }, { "kind": "field", "name": "verify_on_load", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index f92f8e7a226..4560b6b6548 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -503,8 +503,10 @@ Usage of ./cmd/mimir/mimir: If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity. (default 1h0m0s) -blocks-storage.bucket-store.index-header-sparse-persistence-enabled [experimental] If enabled, store-gateway will persist a sparse version of the index-header to disk on construction and load sparse index-headers from disk instead of the whole index-header. + -blocks-storage.bucket-store.index-header.eager-loading-startup-enabled + [experimental] If enabled, store-gateway will periodically persist block IDs of lazy loaded index-headers and load them eagerly during startup. It is not valid to enable this if index-header lazy loading is disabled. -blocks-storage.bucket-store.index-header.max-idle-file-handles uint - Maximum number of idle file handles the store-gateway keeps open for each index header file. (default 1) + Maximum number of idle file handles the store-gateway keeps open for each index-header file. (default 1) -blocks-storage.bucket-store.index-header.verify-on-load If true, verify the checksum of index headers upon loading them (either on startup or lazily when lazy loading is enabled). Setting to true helps detect disk corruption at the cost of slowing down index header loading. -blocks-storage.bucket-store.max-chunk-pool-bytes uint diff --git a/docs/sources/mimir/references/configuration-parameters/index.md b/docs/sources/mimir/references/configuration-parameters/index.md index 0661d615ff8..eb00cf5e806 100644 --- a/docs/sources/mimir/references/configuration-parameters/index.md +++ b/docs/sources/mimir/references/configuration-parameters/index.md @@ -3412,10 +3412,16 @@ bucket_store: index_header: # (advanced) Maximum number of idle file handles the store-gateway keeps - # open for each index header file. + # open for each index-header file. # CLI flag: -blocks-storage.bucket-store.index-header.max-idle-file-handles [max_idle_file_handles: | default = 1] + # (experimental) If enabled, store-gateway will periodically persist block + # IDs of lazy loaded index-headers and load them eagerly during startup. It + # is not valid to enable this if index-header lazy loading is disabled. + # CLI flag: -blocks-storage.bucket-store.index-header.eager-loading-startup-enabled + [eager_loading_startup_enabled: | default = false] + # (advanced) If true, verify the checksum of index headers upon loading them # (either on startup or lazily when lazy loading is enabled). 
Setting to # true helps detect disk corruption at the cost of slowing down index header diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index d34f70e8f4e..2589fadcf13 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -498,6 +498,9 @@ func (cfg *BucketStoreConfig) Validate(logger log.Logger) error { if cfg.SeriesSelectionStrategyName == WorstCasePostingsStrategy && cfg.SelectionStrategies.WorstCaseSeriesPreference <= 0 { return errors.New("invalid worst-case series preference; must be positive") } + if err := cfg.IndexHeader.Validate(cfg.IndexHeaderLazyLoadingEnabled); err != nil { + return errors.Wrap(err, "index-header configuration") + } if cfg.IndexHeaderLazyLoadingConcurrency < 0 { return errInvalidIndexHeaderLazyLoadingConcurrency } diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index 6e9a2dc1f1b..7ab785a3f68 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -42,6 +42,7 @@ import ( "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/storage/sharding" + "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" "github.com/grafana/mimir/pkg/storage/tsdb/bucketcache" "github.com/grafana/mimir/pkg/storegateway/chunkscache" @@ -227,18 +228,11 @@ func NewBucketStore( bkt objstore.InstrumentedBucketReader, fetcher block.MetadataFetcher, dir string, - maxSeriesPerBatch int, - numChunksRangesPerSeries int, + bucketStoreConfig tsdb.BucketStoreConfig, postingsStrategy postingsSelectionStrategy, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, partitioners blockPartitioners, - blockSyncConcurrency int, - postingOffsetsInMemSampling int, - indexHeaderCfg indexheader.Config, - lazyIndexReaderEnabled bool, - lazyIndexReaderIdleTimeout time.Duration, - sparsePersistenceEnabled bool, seriesHashCache *hashcache.SeriesHashCache, metrics *BucketStoreMetrics, options ...BucketStoreOption, @@ -252,19 +246,19 @@ func NewBucketStore( chunksCache: chunkscache.NoopCache{}, blocks: map[ulid.ULID]*bucketBlock{}, blockSet: newBucketBlockSet(), - blockSyncConcurrency: blockSyncConcurrency, + blockSyncConcurrency: bucketStoreConfig.BlockSyncConcurrency, queryGate: gate.NewNoop(), lazyLoadingGate: gate.NewNoop(), chunksLimiterFactory: chunksLimiterFactory, seriesLimiterFactory: seriesLimiterFactory, partitioners: partitioners, - postingOffsetsInMemSampling: postingOffsetsInMemSampling, - indexHeaderCfg: indexHeaderCfg, + postingOffsetsInMemSampling: bucketStoreConfig.PostingOffsetsInMemSampling, + indexHeaderCfg: bucketStoreConfig.IndexHeader, seriesHashCache: seriesHashCache, metrics: metrics, userID: userID, - maxSeriesPerBatch: maxSeriesPerBatch, - numChunksRangesPerSeries: numChunksRangesPerSeries, + maxSeriesPerBatch: bucketStoreConfig.StreamingBatchSize, + numChunksRangesPerSeries: bucketStoreConfig.ChunkRangesPerSeries, postingsStrategy: postingsStrategy, } @@ -274,11 +268,12 @@ func NewBucketStore( lazyLoadedSnapshotConfig := indexheader.LazyLoadedHeadersSnapshotConfig{ // Path stores where lazy loaded blocks will be tracked in a single file per tenant - Path: dir, - UserID: userID, + Path: dir, + UserID: userID, + EagerLoadingEnabled: bucketStoreConfig.IndexHeader.IndexHeaderEagerLoadingStartupEnabled, } // Depend on the options - s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, sparsePersistenceEnabled, s.lazyLoadingGate, 
metrics.indexHeaderReaderMetrics, lazyLoadedSnapshotConfig) + s.indexReaderPool = indexheader.NewReaderPool(s.logger, bucketStoreConfig.IndexHeaderLazyLoadingEnabled, bucketStoreConfig.IndexHeaderLazyLoadingIdleTimeout, bucketStoreConfig.IndexHeaderSparsePersistenceEnabled, s.lazyLoadingGate, metrics.indexHeaderReaderMetrics, lazyLoadedSnapshotConfig) if err := os.MkdirAll(dir, 0750); err != nil { return nil, errors.Wrap(err, "create dir") @@ -311,6 +306,10 @@ func (s *BucketStore) Stats() BucketStoreStats { // SyncBlocks synchronizes the stores state with the Bucket bucket. // It will reuse disk space as persistent cache based on s.dir param. func (s *BucketStore) SyncBlocks(ctx context.Context) error { + return s.syncBlocks(ctx, false) +} + +func (s *BucketStore) syncBlocks(ctx context.Context, initialSync bool) error { metas, _, metaFetchErr := s.fetcher.Fetch(ctx) // For partial view allow adding new blocks at least. if metaFetchErr != nil && metas == nil { @@ -324,7 +323,7 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { wg.Add(1) go func() { for meta := range blockc { - if err := s.addBlock(ctx, meta); err != nil { + if err := s.addBlock(ctx, meta, initialSync); err != nil { continue } } @@ -366,7 +365,7 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { // InitialSync perform blocking sync with extra step at the end to delete locally saved blocks that are no longer // present in the bucket. The mismatch of these can only happen between restarts, so we can do that only once per startup. func (s *BucketStore) InitialSync(ctx context.Context) error { - if err := s.SyncBlocks(ctx); err != nil { + if err := s.syncBlocks(ctx, true); err != nil { return errors.Wrap(err, "sync block") } @@ -402,7 +401,7 @@ func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock { return s.blocks[id] } -func (s *BucketStore) addBlock(ctx context.Context, meta *block.Meta) (err error) { +func (s *BucketStore) addBlock(ctx context.Context, meta *block.Meta, initialSync bool) (err error) { dir := filepath.Join(s.dir, meta.ULID.String()) start := time.Now() @@ -428,6 +427,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *block.Meta) (err error meta.ULID, s.postingOffsetsInMemSampling, s.indexHeaderCfg, + initialSync, ) if err != nil { return errors.Wrap(err, "create index header reader") diff --git a/pkg/storegateway/bucket_e2e_test.go b/pkg/storegateway/bucket_e2e_test.go index 4f503f8851e..452edfd7889 100644 --- a/pkg/storegateway/bucket_e2e_test.go +++ b/pkg/storegateway/bucket_e2e_test.go @@ -197,18 +197,22 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS objstore.WithNoopInstr(bkt), metaFetcher, cfg.tempDir, - cfg.maxSeriesPerBatch, - 1, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: cfg.maxSeriesPerBatch, + ChunkRangesPerSeries: 1, + BlockSyncConcurrency: 20, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + IndexHeaderEagerLoadingStartupEnabled: true, + }, + IndexHeaderLazyLoadingEnabled: true, + IndexHeaderLazyLoadingIdleTimeout: time.Minute, + IndexHeaderSparsePersistenceEnabled: true, + }, cfg.postingsStrategy, cfg.chunksLimiterFactory, cfg.seriesLimiterFactory, newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 20, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.Config{}, - true, - time.Minute, - true, hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(s.metricsRegistry), storeOpts..., diff --git 
a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index f0333f6a777..2e3bde2d12e 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -482,8 +482,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) { userBkt, fetcher, u.syncDirForUser(userID), - u.cfg.BucketStore.StreamingBatchSize, - u.cfg.BucketStore.ChunkRangesPerSeries, + u.cfg.BucketStore, selectPostingsStrategy(u.logger, u.cfg.BucketStore.SeriesSelectionStrategyName, u.cfg.BucketStore.SelectionStrategies.WorstCaseSeriesPreference), NewChunksLimiterFactory(func() uint64 { return uint64(u.limits.MaxChunksPerQuery(userID)) @@ -492,12 +491,6 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) { return uint64(u.limits.MaxFetchedSeriesPerQuery(userID)) }), u.partitioners, - u.cfg.BucketStore.BlockSyncConcurrency, - u.cfg.BucketStore.PostingOffsetsInMemSampling, - u.cfg.BucketStore.IndexHeader, - u.cfg.BucketStore.IndexHeaderLazyLoadingEnabled, - u.cfg.BucketStore.IndexHeaderLazyLoadingIdleTimeout, - u.cfg.BucketStore.IndexHeaderSparsePersistenceEnabled, u.seriesHashCache, u.bucketStoreMetrics, bucketStoreOpts..., diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index bb25f80bcb2..5b956378129 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -1480,18 +1480,22 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries ibkt, f, tmpDir, - testData.maxSeriesPerBatch, - 1, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: testData.maxSeriesPerBatch, + ChunkRangesPerSeries: 1, + BlockSyncConcurrency: 1, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + IndexHeaderEagerLoadingStartupEnabled: false, + }, + IndexHeaderLazyLoadingEnabled: false, + IndexHeaderLazyLoadingIdleTimeout: 0, + IndexHeaderSparsePersistenceEnabled: true, + }, selectAllStrategy{}, newStaticChunksLimiterFactory(0), newStaticSeriesLimiterFactory(0), newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 1, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.Config{}, - false, - 0, - true, hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(reg), testData.options..., @@ -1597,32 +1601,37 @@ func TestBucketStore_Series_Concurrency(t *testing.T) { // Reset the memory pool tracker. seriesChunkRefsSetPool.(*pool.TrackedPool).Reset() - metaFetcher, err := block.NewMetaFetcher(logger, 1, instrumentedBucket, "", nil, nil) - assert.NoError(t, err) - // Create the bucket store. - store, err := NewBucketStore( - "test-user", - instrumentedBucket, - metaFetcher, - tmpDir, - batchSize, - 1, - selectAllStrategy{}, - newStaticChunksLimiterFactory(0), - newStaticSeriesLimiterFactory(0), - newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 1, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.Config{}, - false, // Lazy index-header loading disabled. - 0, - true, // Sparse index-header persistence enabled. - hashcache.NewSeriesHashCache(1024*1024), - NewBucketStoreMetrics(nil), - WithLogger(logger), - ) - require.NoError(t, err) - require.NoError(t, store.SyncBlocks(ctx)) + metaFetcher, err := block.NewMetaFetcher(logger, 1, instrumentedBucket, "", nil, nil) + assert.NoError(t, err) + + // Create the bucket store. 
+ store, err := NewBucketStore( + "test-user", + instrumentedBucket, + metaFetcher, + tmpDir, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: batchSize, + ChunkRangesPerSeries: 1, + BlockSyncConcurrency: 1, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + IndexHeaderEagerLoadingStartupEnabled: false, + }, + IndexHeaderLazyLoadingEnabled: false, + IndexHeaderLazyLoadingIdleTimeout: 0, + IndexHeaderSparsePersistenceEnabled: true, + }, + selectAllStrategy{}, + newStaticChunksLimiterFactory(0), + newStaticSeriesLimiterFactory(0), + newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), + hashcache.NewSeriesHashCache(1024*1024), + NewBucketStoreMetrics(nil), + WithLogger(logger), + ) + require.NoError(t, err) + require.NoError(t, store.SyncBlocks(ctx)) // Run workers. wg := sync.WaitGroup{} @@ -1925,18 +1934,22 @@ func TestBucketStore_Series_ErrorUnmarshallingRequestHints(t *testing.T) { instrBkt, fetcher, tmpDir, - 5000, - 1, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: 5000, + ChunkRangesPerSeries: 1, + BlockSyncConcurrency: 10, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + IndexHeaderEagerLoadingStartupEnabled: false, + }, + IndexHeaderLazyLoadingEnabled: false, + IndexHeaderLazyLoadingIdleTimeout: 0, + IndexHeaderSparsePersistenceEnabled: true, + }, selectAllStrategy{}, newStaticChunksLimiterFactory(10000/MaxSamplesPerChunk), newStaticSeriesLimiterFactory(0), newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 10, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.Config{}, - false, - 0, - true, hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(nil), WithLogger(logger), @@ -1981,18 +1994,22 @@ func TestBucketStore_Series_CanceledRequest(t *testing.T) { instrBkt, fetcher, tmpDir, - 5000, - 1, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: 5000, + ChunkRangesPerSeries: 1, + BlockSyncConcurrency: 10, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + IndexHeaderEagerLoadingStartupEnabled: false, + }, + IndexHeaderLazyLoadingEnabled: false, + IndexHeaderLazyLoadingIdleTimeout: 0, + IndexHeaderSparsePersistenceEnabled: true, + }, selectAllStrategy{}, newStaticChunksLimiterFactory(10000/MaxSamplesPerChunk), newStaticSeriesLimiterFactory(0), newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 10, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.Config{}, - false, - 0, - true, hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(nil), WithLogger(logger), @@ -2044,18 +2061,22 @@ func TestBucketStore_Series_InvalidRequest(t *testing.T) { instrBkt, fetcher, tmpDir, - 5000, - 1, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: 5000, + ChunkRangesPerSeries: 1, + BlockSyncConcurrency: 10, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + IndexHeaderEagerLoadingStartupEnabled: false, + }, + IndexHeaderLazyLoadingEnabled: false, + IndexHeaderLazyLoadingIdleTimeout: 0, + IndexHeaderSparsePersistenceEnabled: true, + }, selectAllStrategy{}, newStaticChunksLimiterFactory(10000/MaxSamplesPerChunk), newStaticSeriesLimiterFactory(0), newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 10, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.Config{}, - false, - 0, - true, 
hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(nil), WithLogger(logger), @@ -2166,18 +2187,22 @@ func testBucketStoreSeriesBlockWithMultipleChunks( instrBkt, fetcher, tmpDir, - 5000, - 1, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: 5000, + ChunkRangesPerSeries: 1, + BlockSyncConcurrency: 10, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + IndexHeaderEagerLoadingStartupEnabled: false, + }, + IndexHeaderLazyLoadingEnabled: false, + IndexHeaderLazyLoadingIdleTimeout: 0, + IndexHeaderSparsePersistenceEnabled: true, + }, selectAllStrategy{}, newStaticChunksLimiterFactory(100000/MaxSamplesPerChunk), newStaticSeriesLimiterFactory(0), newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 10, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.Config{}, - false, - 0, - true, hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(nil), WithLogger(logger), @@ -2329,18 +2354,22 @@ func TestBucketStore_Series_Limits(t *testing.T) { instrBkt, fetcher, tmpDir, - batchSize, - 1, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: batchSize, + ChunkRangesPerSeries: 1, + BlockSyncConcurrency: 10, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + IndexHeaderEagerLoadingStartupEnabled: false, + }, + IndexHeaderLazyLoadingEnabled: false, + IndexHeaderLazyLoadingIdleTimeout: 0, + IndexHeaderSparsePersistenceEnabled: true, + }, selectAllStrategy{}, newStaticChunksLimiterFactory(testData.chunksLimit), newStaticSeriesLimiterFactory(testData.seriesLimit), newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 10, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.Config{}, - false, - 0, - true, hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(nil), ) @@ -2445,18 +2474,22 @@ func setupStoreForHintsTest(t *testing.T, maxSeriesPerBatch int, opts ...BucketS instrBkt, fetcher, tmpDir, - maxSeriesPerBatch, - 1, + mimir_tsdb.BucketStoreConfig{ + StreamingBatchSize: maxSeriesPerBatch, + ChunkRangesPerSeries: 1, + BlockSyncConcurrency: 10, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + IndexHeaderEagerLoadingStartupEnabled: false, + }, + IndexHeaderLazyLoadingEnabled: false, + IndexHeaderLazyLoadingIdleTimeout: 0, + IndexHeaderSparsePersistenceEnabled: true, + }, selectAllStrategy{}, newStaticChunksLimiterFactory(10000/MaxSamplesPerChunk), newStaticSeriesLimiterFactory(0), newGapBasedPartitioners(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 10, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.Config{}, - false, - 0, - true, hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(nil), opts..., diff --git a/pkg/storegateway/indexheader/header.go b/pkg/storegateway/indexheader/header.go index 4371a21ca0f..b981f0346cb 100644 --- a/pkg/storegateway/indexheader/header.go +++ b/pkg/storegateway/indexheader/header.go @@ -18,6 +18,8 @@ import ( // NotFoundRangeErr is an error returned by PostingsOffset when there is no posting for given name and value pairs. 
var NotFoundRangeErr = errors.New("range not found") //nolint:revive +var errEagerLoadingStartupEnabledLazyLoadDisabled = errors.New("invalid configuration: store-gateway index header eager-loading is enabled, but lazy-loading is disabled") + // Reader is an interface allowing to read essential, minimal number of index fields from the small portion of index file called header. type Reader interface { io.Closer @@ -52,11 +54,20 @@ type Reader interface { } type Config struct { - MaxIdleFileHandles uint `yaml:"max_idle_file_handles" category:"advanced"` - VerifyOnLoad bool `yaml:"verify_on_load" category:"advanced"` + MaxIdleFileHandles uint `yaml:"max_idle_file_handles" category:"advanced"` + IndexHeaderEagerLoadingStartupEnabled bool `yaml:"eager_loading_startup_enabled" category:"experimental"` + VerifyOnLoad bool `yaml:"verify_on_load" category:"advanced"` } func (cfg *Config) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { - f.UintVar(&cfg.MaxIdleFileHandles, prefix+"max-idle-file-handles", 1, "Maximum number of idle file handles the store-gateway keeps open for each index header file.") + f.UintVar(&cfg.MaxIdleFileHandles, prefix+"max-idle-file-handles", 1, "Maximum number of idle file handles the store-gateway keeps open for each index-header file.") + f.BoolVar(&cfg.IndexHeaderEagerLoadingStartupEnabled, prefix+"eager-loading-startup-enabled", false, "If enabled, store-gateway will periodically persist block IDs of lazy loaded index-headers and load them eagerly during startup. It is not valid to enable this if index-header lazy loading is disabled.") f.BoolVar(&cfg.VerifyOnLoad, prefix+"verify-on-load", false, "If true, verify the checksum of index headers upon loading them (either on startup or lazily when lazy loading is enabled). Setting to true helps detect disk corruption at the cost of slowing down index header loading.") } + +func (cfg *Config) Validate(lazyLoadingEnabled bool) error { + if !lazyLoadingEnabled && cfg.IndexHeaderEagerLoadingStartupEnabled { + return errEagerLoadingStartupEnabledLazyLoadDisabled + } + return nil +} diff --git a/pkg/storegateway/indexheader/lazy_binary_reader.go b/pkg/storegateway/indexheader/lazy_binary_reader.go index 33dda9507ce..613cbbc9a6d 100644 --- a/pkg/storegateway/indexheader/lazy_binary_reader.go +++ b/pkg/storegateway/indexheader/lazy_binary_reader.go @@ -224,6 +224,19 @@ func (r *LazyBinaryReader) LabelNames() ([]string, error) { return r.reader.LabelNames() } +// EagerLoad attempts to eagerly load this index header. +func (r *LazyBinaryReader) EagerLoad() { + r.readerMx.RLock() + defer r.readerMx.RUnlock() + + if err := r.load(); err != nil { + level.Warn(r.logger).Log("msg", "eager loading of lazy loaded index-header failed; skipping", "err", err) + return + } + + r.usedAt.Store(time.Now().UnixNano()) +} + // load ensures the underlying binary index-header reader has been successfully loaded. Returns // an error on failure. This function MUST be called with the read lock already acquired. 
func (r *LazyBinaryReader) load() error { diff --git a/pkg/storegateway/indexheader/lazy_binary_reader_test.go b/pkg/storegateway/indexheader/lazy_binary_reader_test.go index c8a5eedc9ac..12424d4cd56 100644 --- a/pkg/storegateway/indexheader/lazy_binary_reader_test.go +++ b/pkg/storegateway/indexheader/lazy_binary_reader_test.go @@ -39,25 +39,8 @@ func TestNewLazyBinaryReader_ShouldFailIfUnableToBuildIndexHeader(t *testing.T) }) } -func prepareTempDirBucketAndBlock(t *testing.T) (string, *filesystem.Bucket, ulid.ULID) { - tmpDir := filepath.Join(t.TempDir(), "test-indexheader") - bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, bkt.Close()) }) - - // Create block. - blockID, err := block.CreateBlock(context.Background(), tmpDir, []labels.Labels{ - labels.FromStrings("a", "1"), - labels.FromStrings("a", "2"), - labels.FromStrings("a", "3"), - }, 100, 0, 1000, labels.FromStrings("ext1", "1")) - require.NoError(t, err) - require.NoError(t, block.Upload(context.Background(), log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), nil)) - return tmpDir, bkt, blockID -} - func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { - tmpDir, bkt, blockID := prepareTempDirBucketAndBlock(t) + tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) testLazyBinaryReader(t, bkt, tmpDir, blockID, func(t *testing.T, r *LazyBinaryReader, err error) { require.NoError(t, err) @@ -86,7 +69,7 @@ func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { } func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { - tmpDir, bkt, blockID := prepareTempDirBucketAndBlock(t) + tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) // Write a corrupted index-header for the block. headerFilename := filepath.Join(tmpDir, blockID.String(), block.IndexHeaderFilename) @@ -114,7 +97,7 @@ func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { } func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { - tmpDir, bkt, blockID := prepareTempDirBucketAndBlock(t) + tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) testLazyBinaryReader(t, bkt, tmpDir, blockID, func(t *testing.T, r *LazyBinaryReader, err error) { require.NoError(t, err) @@ -150,7 +133,7 @@ func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { } func TestLazyBinaryReader_unload_ShouldReturnErrorIfNotIdle(t *testing.T) { - tmpDir, bkt, blockID := prepareTempDirBucketAndBlock(t) + tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) testLazyBinaryReader(t, bkt, tmpDir, blockID, func(t *testing.T, r *LazyBinaryReader, err error) { require.NoError(t, err) @@ -188,7 +171,7 @@ func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) { // Run the test for a fixed amount of time. 
const runDuration = 5 * time.Second - tmpDir, bkt, blockID := prepareTempDirBucketAndBlock(t) + tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) testLazyBinaryReader(t, bkt, tmpDir, blockID, func(t *testing.T, r *LazyBinaryReader, err error) { require.NoError(t, err) @@ -236,6 +219,56 @@ }) } +func TestNewLazyBinaryReader_EagerLoadLazyLoadedIndexHeaders(t *testing.T) { + tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) + + testLazyBinaryReader(t, bkt, tmpDir, blockID, func(t *testing.T, r *LazyBinaryReader, err error) { + r.EagerLoad() + + require.NoError(t, err) + require.NotNil(t, r.reader, "r.reader must already be eagerly loaded") + t.Cleanup(func() { + require.NoError(t, r.Close()) + }) + + require.Equal(t, float64(1), promtestutil.ToFloat64(r.metrics.loadCount)) + require.Equal(t, float64(0), promtestutil.ToFloat64(r.metrics.unloadCount)) + + // The index header should already be loaded; the following calls reuse the reader loaded above. + v, err := r.IndexVersion() + require.NoError(t, err) + require.Equal(t, 2, v) + require.True(t, r.reader != nil) + require.Equal(t, float64(1), promtestutil.ToFloat64(r.metrics.loadCount)) + require.Equal(t, float64(0), promtestutil.ToFloat64(r.metrics.unloadCount)) + + labelNames, err := r.LabelNames() + require.NoError(t, err) + require.Equal(t, []string{"a"}, labelNames) + require.Equal(t, float64(1), promtestutil.ToFloat64(r.metrics.loadCount)) + require.Equal(t, float64(0), promtestutil.ToFloat64(r.metrics.unloadCount)) + }) +} + +func initBucketAndBlocksForTest(t *testing.T) (string, *filesystem.Bucket, ulid.ULID) { + ctx := context.Background() + + tmpDir := filepath.Join(t.TempDir(), "test-indexheader") + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, bkt.Close()) }) + + // Create block. + blockID, err := block.CreateBlock(ctx, tmpDir, []labels.Labels{ + labels.FromStrings("a", "1"), + labels.FromStrings("a", "2"), + labels.FromStrings("a", "3"), + }, 100, 0, 1000, labels.FromStrings("ext1", "1")) + require.NoError(t, err) + require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), nil)) + return tmpDir, bkt, blockID +} + func testLazyBinaryReader(t *testing.T, bkt objstore.BucketReader, dir string, id ulid.ULID, test func(t *testing.T, r *LazyBinaryReader, err error)) { ctx := context.Background() logger := log.NewNopLogger() @@ -250,7 +283,7 @@ func testLazyBinaryReader(t *testing.T, bkt objstore.BucketReader, dir string, i // TestLazyBinaryReader_ShouldBlockMaxConcurrency tests if LazyBinaryReader blocks // concurrent loads such that it doesn't pass the configured maximum.
func TestLazyBinaryReader_ShouldBlockMaxConcurrency(t *testing.T) { - tmpDir, bkt, blockID := prepareTempDirBucketAndBlock(t) + tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) logger := log.NewNopLogger() @@ -305,7 +338,7 @@ func TestLazyBinaryReader_ShouldBlockMaxConcurrency(t *testing.T) { } func TestLazyBinaryReader_ConcurrentLoadingOfSameIndexReader(t *testing.T) { - tmpDir, bkt, blockID := prepareTempDirBucketAndBlock(t) + tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) const ( maxLazyLoadConcurrency = 1 diff --git a/pkg/storegateway/indexheader/reader_pool.go b/pkg/storegateway/indexheader/reader_pool.go index 829ffd27733..ef530b34710 100644 --- a/pkg/storegateway/indexheader/reader_pool.go +++ b/pkg/storegateway/indexheader/reader_pool.go @@ -9,6 +9,7 @@ import ( "bytes" "context" "encoding/json" + "os" "path/filepath" "sync" "time" @@ -47,6 +48,7 @@ func NewReaderPoolMetrics(reg prometheus.Registerer) *ReaderPoolMetrics { type ReaderPool struct { lazyReaderEnabled bool lazyReaderIdleTimeout time.Duration + eagerLoadReaderEnabled bool sparsePersistenceEnabled bool logger log.Logger metrics *ReaderPoolMetrics @@ -58,14 +60,16 @@ type ReaderPool struct { close chan struct{} // Keep track of all readers managed by the pool. - lazyReadersMx sync.Mutex - lazyReaders map[*LazyBinaryReader]struct{} + lazyReadersMx sync.Mutex + lazyReaders map[*LazyBinaryReader]struct{} + preShutdownLoadedBlocks *lazyLoadedHeadersSnapshot } // LazyLoadedHeadersSnapshotConfig stores information needed to track lazy loaded index headers. type LazyLoadedHeadersSnapshotConfig struct { - Path string - UserID string + Path string + UserID string + EagerLoadingEnabled bool } type lazyLoadedHeadersSnapshot struct { @@ -92,26 +96,48 @@ func (l lazyLoadedHeadersSnapshot) persist(persistDir string) error { // NewReaderPool makes a new ReaderPool. If lazy-loading is enabled, NewReaderPool also starts a background task for unloading idle Readers and persisting a list of loaded Readers to disk. func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, sparsePersistenceEnabled bool, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics, lazyLoadedSnapshotConfig LazyLoadedHeadersSnapshotConfig) *ReaderPool { - p := newReaderPool(logger, lazyReaderEnabled, lazyReaderIdleTimeout, sparsePersistenceEnabled, lazyLoadingGate, metrics) + var snapshot *lazyLoadedHeadersSnapshot + if lazyReaderEnabled && lazyLoadedSnapshotConfig.EagerLoadingEnabled { + lazyLoadedSnapshotFileName := filepath.Join(lazyLoadedSnapshotConfig.Path, lazyLoadedHeadersListFile) + var err error + snapshot, err = loadLazyLoadedHeadersSnapshot(lazyLoadedSnapshotFileName) + if err != nil { + level.Warn(logger).Log("msg", "loading the list of index-headers from snapshot file failed; not eagerly loading index-headers for tenant", "file", lazyLoadedSnapshotFileName, "err", err, "tenant", lazyLoadedSnapshotConfig.UserID) + } + // We remove the snapshot file regardless of whether loading it returned an error. + // Even in cases such as the snapshot loading causing an OOM, we still + // remove the snapshot and fall back to lazy loading after the server restarts.
+ if err := os.Remove(lazyLoadedSnapshotFileName); err != nil { + level.Warn(logger).Log("msg", "removing the lazy-loaded index-header snapshot failed", "file", lazyLoadedSnapshotFileName, "err", err) + } + } + + p := newReaderPool(logger, lazyReaderEnabled, lazyReaderIdleTimeout, lazyLoadedSnapshotConfig.EagerLoadingEnabled, sparsePersistenceEnabled, lazyLoadingGate, metrics, snapshot) // Start a goroutine to close idle readers (only if required). if p.lazyReaderEnabled && p.lazyReaderIdleTimeout > 0 { checkFreq := p.lazyReaderIdleTimeout / 10 go func() { - tickerLazyLoadPersist := time.NewTicker(time.Minute) - defer tickerLazyLoadPersist.Stop() - tickerIdleReader := time.NewTicker(checkFreq) defer tickerIdleReader.Stop() + var lazyLoadC <-chan time.Time + + if p.eagerLoadReaderEnabled { + tickerLazyLoadPersist := time.NewTicker(time.Minute) + defer tickerLazyLoadPersist.Stop() + + lazyLoadC = tickerLazyLoadPersist.C + } + for { select { case <-p.close: return case <-tickerIdleReader.C: p.closeIdleReaders() - case <-tickerLazyLoadPersist.C: + case <-lazyLoadC: snapshot := lazyLoadedHeadersSnapshot{ IndexHeaderLastUsedTime: p.LoadedBlocks(), UserID: lazyLoadedSnapshotConfig.UserID, @@ -122,6 +148,7 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime } } } + }() } @@ -129,23 +156,38 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime } // newReaderPool makes a new ReaderPool. -func newReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, sparsePersistenceEnabled bool, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics) *ReaderPool { +func newReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, eagerLoadReaderEnabled bool, sparsePersistenceEnabled bool, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics, lazyLoadedHeadersSnapshot *lazyLoadedHeadersSnapshot) *ReaderPool { return &ReaderPool{ logger: logger, metrics: metrics, lazyReaderEnabled: lazyReaderEnabled, lazyReaderIdleTimeout: lazyReaderIdleTimeout, + eagerLoadReaderEnabled: eagerLoadReaderEnabled, sparsePersistenceEnabled: sparsePersistenceEnabled, - lazyLoadingGate: lazyLoadingGate, lazyReaders: make(map[*LazyBinaryReader]struct{}), close: make(chan struct{}), + preShutdownLoadedBlocks: lazyLoadedHeadersSnapshot, + lazyLoadingGate: lazyLoadingGate, } } +func loadLazyLoadedHeadersSnapshot(fileName string) (*lazyLoadedHeadersSnapshot, error) { + snapshotBytes, err := os.ReadFile(fileName) + if err != nil { + return nil, err + } + snapshot := &lazyLoadedHeadersSnapshot{} + err = json.Unmarshal(snapshotBytes, snapshot) + if err != nil { + return nil, err + } + return snapshot, nil +} + // NewBinaryReader creates and returns a new binary reader. If the pool has been configured // with lazy reader enabled, this function will return a lazy reader. The returned lazy reader // is tracked by the pool and automatically closed once the idle timeout expires. 
-func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, cfg Config) (Reader, error) { +func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, cfg Config, initialSync bool) (Reader, error) { var readerFactory func() (Reader, error) var reader Reader var err error @@ -155,7 +197,19 @@ func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt } if p.lazyReaderEnabled { - reader, err = NewLazyBinaryReader(ctx, readerFactory, logger, bkt, dir, id, p.metrics.lazyReader, p.onLazyReaderClosed, p.lazyLoadingGate) + lazyBinaryReader, lazyErr := NewLazyBinaryReader(ctx, readerFactory, logger, bkt, dir, id, p.metrics.lazyReader, p.onLazyReaderClosed, p.lazyLoadingGate) + if lazyErr != nil { + return nil, lazyErr + } + + // We only attempt to eagerly load the index-header during the initial sync. + if initialSync && p.eagerLoadReaderEnabled && p.preShutdownLoadedBlocks != nil { + // We only eagerly load if preShutdownLoadedBlocks contains an entry for the given block ID. + if p.preShutdownLoadedBlocks.IndexHeaderLastUsedTime[id] > 0 { + lazyBinaryReader.EagerLoad() + } + } + reader, err = lazyBinaryReader, lazyErr } else { reader, err = readerFactory() } diff --git a/pkg/storegateway/indexheader/reader_pool_test.go b/pkg/storegateway/indexheader/reader_pool_test.go index 51bf9ee27c7..acf3a336649 100644 --- a/pkg/storegateway/indexheader/reader_pool_test.go +++ b/pkg/storegateway/indexheader/reader_pool_test.go @@ -29,54 +29,108 @@ import ( func TestReaderPool_NewBinaryReader(t *testing.T) { tests := map[string]struct { - lazyReaderEnabled bool - lazyReaderIdleTimeout time.Duration + lazyReaderEnabled bool + lazyReaderIdleTimeout time.Duration + eagerLoadReaderEnabled bool + initialSync bool + createLazyLoadedHeadersSnapshotFn func(blockId ulid.ULID) lazyLoadedHeadersSnapshot + expectedLoadCountMetricBeforeLabelNamesCall int + expectedLoadCountMetricAfterLabelNamesCall int }{ "lazy reader is disabled": { - lazyReaderEnabled: false, + lazyReaderEnabled: false, + expectedLoadCountMetricAfterLabelNamesCall: 0, // no lazy loading }, "lazy reader is enabled but close on idle timeout is disabled": { - lazyReaderEnabled: true, - lazyReaderIdleTimeout: 0, + lazyReaderEnabled: true, + lazyReaderIdleTimeout: 0, + expectedLoadCountMetricAfterLabelNamesCall: 1, }, "lazy reader and close on idle timeout are both enabled": { - lazyReaderEnabled: true, - lazyReaderIdleTimeout: time.Minute, + lazyReaderEnabled: true, + lazyReaderIdleTimeout: time.Minute, + expectedLoadCountMetricAfterLabelNamesCall: 1, + }, + "block is present in pre-shutdown loaded blocks and eager-loading is enabled, loading index header during initial sync": { + lazyReaderEnabled: true, + lazyReaderIdleTimeout: time.Minute, + eagerLoadReaderEnabled: true, + initialSync: true, + expectedLoadCountMetricBeforeLabelNamesCall: 1, // the index header will be eagerly loaded before the operation + expectedLoadCountMetricAfterLabelNamesCall: 1, + createLazyLoadedHeadersSnapshotFn: func(blockId ulid.ULID) lazyLoadedHeadersSnapshot { + return lazyLoadedHeadersSnapshot{ + IndexHeaderLastUsedTime: map[ulid.ULID]int64{blockId: time.Now().UnixMilli()}, + UserID: "anonymous", + } + }, + }, + "block is present in pre-shutdown loaded blocks and eager-loading is enabled, loading index header after initial sync": { + lazyReaderEnabled: true, + lazyReaderIdleTimeout:
time.Minute, + eagerLoadReaderEnabled: true, + initialSync: false, + expectedLoadCountMetricBeforeLabelNamesCall: 0, // the index header is not eager loaded if not during initial-sync + expectedLoadCountMetricAfterLabelNamesCall: 1, + createLazyLoadedHeadersSnapshotFn: func(blockId ulid.ULID) lazyLoadedHeadersSnapshot { + return lazyLoadedHeadersSnapshot{ + IndexHeaderLastUsedTime: map[ulid.ULID]int64{blockId: time.Now().UnixMilli()}, + UserID: "anonymous", + } + }, + }, + "block is not present in pre-shutdown loaded blocks snapshot and eager-loading is enabled": { + lazyReaderEnabled: true, + lazyReaderIdleTimeout: time.Minute, + eagerLoadReaderEnabled: true, + initialSync: true, + expectedLoadCountMetricBeforeLabelNamesCall: 0, // although eager loading is enabled, this test will not do eager loading because the block ID is not in the lazy loaded file. + expectedLoadCountMetricAfterLabelNamesCall: 1, + createLazyLoadedHeadersSnapshotFn: func(_ ulid.ULID) lazyLoadedHeadersSnapshot { + // let's create a random fake blockID to be stored in lazy loaded headers file + fakeBlockID, _ := ulid.New(ulid.Now(), rand.Reader) + // this snapshot will refer to fake block, hence eager load wouldn't be executed for the real block that we test + + return lazyLoadedHeadersSnapshot{ + IndexHeaderLastUsedTime: map[ulid.ULID]int64{fakeBlockID: time.Now().UnixMilli()}, + UserID: "anonymous", + } + }, }, } ctx := context.Background() - - tmpDir, err := os.MkdirTemp("", "test-indexheader") - require.NoError(t, err) - defer func() { require.NoError(t, os.RemoveAll(tmpDir)) }() - - bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) - require.NoError(t, err) - defer func() { require.NoError(t, bkt.Close()) }() - - // Create block. - blockID, err := block.CreateBlock(ctx, tmpDir, []labels.Labels{ - labels.FromStrings("a", "1"), - labels.FromStrings("a", "2"), - labels.FromStrings("a", "3"), - }, 100, 0, 1000, labels.FromStrings("ext1", "1")) - require.NoError(t, err) - require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), nil)) + tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, true, gate.NewNoop(), NewReaderPoolMetrics(nil), LazyLoadedHeadersSnapshotConfig{}) + snapshotConfig := LazyLoadedHeadersSnapshotConfig{ + Path: tmpDir, + UserID: "anonymous", + EagerLoadingEnabled: testData.eagerLoadReaderEnabled, + } + if testData.createLazyLoadedHeadersSnapshotFn != nil { + lazyLoadedSnapshot := testData.createLazyLoadedHeadersSnapshotFn(blockID) + err := lazyLoadedSnapshot.persist(snapshotConfig.Path) + require.NoError(t, err) + } + + metrics := NewReaderPoolMetrics(nil) + pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, true, gate.NewNoop(), metrics, snapshotConfig) defer pool.Close() - r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, Config{}) + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, Config{IndexHeaderEagerLoadingStartupEnabled: testData.eagerLoadReaderEnabled}, testData.initialSync) require.NoError(t, err) defer func() { require.NoError(t, r.Close()) }() + require.Equal(t, float64(testData.expectedLoadCountMetricBeforeLabelNamesCall), promtestutil.ToFloat64(metrics.lazyReader.loadCount)) + // Ensure it can read data. 
labelNames, err := r.LabelNames() require.NoError(t, err) require.Equal(t, []string{"a"}, labelNames) + + require.Equal(t, float64(testData.expectedLoadCountMetricAfterLabelNamesCall), promtestutil.ToFloat64(metrics.lazyReader.loadCount)) }) } } @@ -89,10 +143,10 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { // Note that we are creating a ReaderPool that doesn't run a background cleanup task for idle // Reader instances. We'll manually invoke the cleanup task when we need it as part of this test. - pool := newReaderPool(log.NewNopLogger(), true, idleTimeout, true, gate.NewNoop(), metrics) + pool := newReaderPool(log.NewNopLogger(), true, idleTimeout, false, true, gate.NewNoop(), metrics, nil) defer pool.Close() - r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, Config{}) + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, Config{}, false) require.NoError(t, err) defer func() { require.NoError(t, r.Close()) }() @@ -151,10 +205,10 @@ func TestReaderPool_PersistLazyLoadedBlock(t *testing.T) { // Note that we are creating a ReaderPool that doesn't run a background cleanup task for idle // Reader instances. We'll manually invoke the cleanup task when we need it as part of this test. - pool := newReaderPool(log.NewNopLogger(), true, idleTimeout, metrics) + pool := newReaderPool(log.NewNopLogger(), true, idleTimeout, true, false, gate.NewNoop(), metrics, nil) defer pool.Close() - r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, Config{}) + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, Config{}, false) require.NoError(t, err) defer func() { require.NoError(t, r.Close()) }()
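
A minimal configuration sketch of how the new option is meant to be used. Only `eager_loading_startup_enabled` (CLI flag `-blocks-storage.bucket-store.index-header.eager-loading-startup-enabled`) is introduced by this patch; the surrounding `blocks_storage.bucket_store` keys, including the pre-existing lazy-loading settings, are assumed from the Mimir configuration reference. Index-header lazy loading must remain enabled, otherwise `BucketStoreConfig.Validate` rejects the configuration with the new `errEagerLoadingStartupEnabledLazyLoadDisabled` error.

    blocks_storage:
      bucket_store:
        # Pre-existing settings (names assumed from the configuration reference);
        # lazy loading must stay enabled for eager loading at startup to be valid.
        index_header_lazy_loading_enabled: true
        index_header_lazy_loading_idle_timeout: 1h
        index_header:
          # New experimental setting added by this patch.
          eager_loading_startup_enabled: true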