From 02604e87e5cdbcdfaba6db09e8cadc5edf048bd6 Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Fri, 17 May 2024 13:37:36 -0700
Subject: [PATCH] Allow configurable request logger in Store Gateway (#7367)

* allow configurable request logger for Store Gateway

Signed-off-by: Ben Ye

* lint

Signed-off-by: Ben Ye

* lint

Signed-off-by: Ben Ye

* fix tests

Signed-off-by: Ben Ye

* fix test

Signed-off-by: Ben Ye

* address comments

Signed-off-by: Ben Ye

* fix tests

Signed-off-by: Ben Ye

* changelog

Signed-off-by: Ben Ye

---------

Signed-off-by: Ben Ye
---
 CHANGELOG.md                    |  1 +
 cmd/thanos/store.go             |  8 +++
 pkg/store/bucket.go             | 88 +++++++++++++++++++++------------
 pkg/store/bucket_test.go        | 32 ++++++------
 pkg/store/lazy_postings.go      |  2 +-
 pkg/store/lazy_postings_test.go |  6 +--
 6 files changed, 85 insertions(+), 52 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 512780fe06..28be55b046 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 ### Added

 - [#7317](https://github.com/thanos-io/thanos/pull/7317) Tracing: allow specifying resource attributes for the OTLP configuration.
+- [#7367](https://github.com/thanos-io/thanos/pull/7367) Store Gateway: log request ID in request logs.
 - [#7363](https://github.com/thanos-io/thanos/pull/7363) Query-frontend: set value of remote_user field in Slow Query Logs from HTTP header

 ### Changed
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index 6c752ce15d..30e9776bdd 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -45,6 +45,7 @@ import (
     "github.com/thanos-io/thanos/pkg/runutil"
     grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
     httpserver "github.com/thanos-io/thanos/pkg/server/http"
+    "github.com/thanos-io/thanos/pkg/server/http/middleware"
     "github.com/thanos-io/thanos/pkg/store"
     storecache "github.com/thanos-io/thanos/pkg/store/cache"
     "github.com/thanos-io/thanos/pkg/store/labelpb"
@@ -394,6 +395,13 @@ func runStore(
     options := []store.BucketStoreOption{
         store.WithLogger(logger),
+        store.WithRequestLoggerFunc(func(ctx context.Context, logger log.Logger) log.Logger {
+            reqID, ok := middleware.RequestIDFromContext(ctx)
+            if ok {
+                return log.With(logger, "request-id", reqID)
+            }
+            return logger
+        }),
         store.WithRegistry(reg),
         store.WithIndexCache(indexCache),
         store.WithQueryGate(queriesGate),
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index f0fd1feda6..ad3dfa0c19 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -413,6 +413,8 @@ type BucketStore struct {
     blockEstimatedMaxChunkFunc BlockEstimator

     indexHeaderLazyDownloadStrategy indexheader.LazyDownloadIndexHeaderFunc
+
+    requestLoggerFunc RequestLoggerFunc
 }

 func (s *BucketStore) validate() error {
@@ -449,6 +451,20 @@ func WithLogger(logger log.Logger) BucketStoreOption {
     }
 }

+type RequestLoggerFunc func(ctx context.Context, log log.Logger) log.Logger
+
+func NoopRequestLoggerFunc(_ context.Context, logger log.Logger) log.Logger {
+    return logger
+}
+
+// WithRequestLoggerFunc sets the BucketStore to use the passed RequestLoggerFunc
+// to initialize logger during query time.
+func WithRequestLoggerFunc(loggerFunc RequestLoggerFunc) BucketStoreOption {
+    return func(s *BucketStore) {
+        s.requestLoggerFunc = loggerFunc
+    }
+}
+
 // WithRegistry sets a registry that BucketStore uses to register metrics with.
 func WithRegistry(reg prometheus.Registerer) BucketStoreOption {
     return func(s *BucketStore) {
@@ -583,6 +599,7 @@ func NewBucketStore(
         seriesBatchSize:                 SeriesBatchSize,
         sortingStrategy:                 sortingStrategyStore,
         indexHeaderLazyDownloadStrategy: indexheader.AlwaysEagerDownloadIndexHeader,
+        requestLoggerFunc:               NoopRequestLoggerFunc,
     }

     for _, option := range options {
@@ -789,7 +806,6 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
     b, err := newBucketBlock(
         ctx,
-        log.With(s.logger, "block", meta.ULID),
         s.metrics,
         meta,
         s.bkt,
@@ -1047,7 +1063,7 @@ func newBlockSeriesClient(
 ) *blockSeriesClient {
     var chunkr *bucketChunkReader
     if !req.SkipChunks {
-        chunkr = b.chunkReader()
+        chunkr = b.chunkReader(logger)
     }

     extLset := b.extLset
@@ -1063,7 +1079,7 @@ func newBlockSeriesClient(
         mint: req.MinTime,
         maxt: req.MaxTime,

-        indexr:        b.indexReader(),
+        indexr:        b.indexReader(logger),
         chunkr:        chunkr,
         seriesLimiter: seriesLimiter,
         chunksLimiter: chunksLimiter,
@@ -1479,6 +1495,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
         seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))

         queryStatsEnabled = false
+
+        logger = s.requestLoggerFunc(ctx, s.logger)
     )

     if req.Hints != nil {
@@ -1515,7 +1533,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
     blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow, reqBlockMatchers)

     if s.debugLogging {
-        debugFoundBlockSetOverview(s.logger, req.MinTime, req.MaxTime, req.MaxResolutionWindow, bs.labels, blocks)
+        debugFoundBlockSetOverview(logger, req.MinTime, req.MaxTime, req.MaxResolutionWindow, bs.labels, blocks)
     }

     for _, b := range blocks {
@@ -1531,7 +1549,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
         blockClient := newBlockSeriesClient(
             srv.Context(),
-            s.logger,
+            log.With(logger, "block", blk.meta.ULID),
             blk,
             req,
             seriesLimiter,
@@ -1641,7 +1659,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
         s.metrics.cachedPostingsCompressedSizeBytes.WithLabelValues(tenant).Add(float64(stats.CachedPostingsCompressedSizeSum))
         s.metrics.postingsSizeBytes.WithLabelValues(tenant).Observe(float64(int(stats.PostingsFetchedSizeSum) + int(stats.PostingsTouchedSizeSum)))

-        level.Debug(s.logger).Log("msg", "stats query processed",
+        level.Debug(logger).Log("msg", "stats query processed",
             "request", req, "tenant", tenant,
             "stats", fmt.Sprintf("%+v", stats), "err", err)
@@ -1772,6 +1790,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
     var sets [][]string
     var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))
     var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant))
+    var logger = s.requestLoggerFunc(ctx, s.logger)

     for _, b := range s.blocks {
         b := b
@@ -1793,7 +1812,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
         resHints.AddQueriedBlock(b.meta.ULID)

-        indexr := b.indexReader()
+        blockLogger := log.With(logger, "block", b.meta.ULID)
+        indexr := b.indexReader(blockLogger)

         g.Go(func() error {
             span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_label_names", tracing.Tags{
@@ -1803,7 +1823,7 @@
                 "block.resolution": b.meta.Thanos.Downsample.Resolution,
             })
             defer span.Finish()
-            defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names")
+            defer runutil.CloseWithLogOnErr(blockLogger, indexr, "label names")

             var result []string
             if len(reqSeriesMatchersNoExtLabels) == 0 {
@@ -1834,7 +1854,7 @@
                 }
                 blockClient := newBlockSeriesClient(
                     newCtx,
-                    s.logger,
+                    blockLogger,
                     b,
                     seriesReq,
                     seriesLimiter,
@@ -1981,6 +2001,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
     var sets [][]string
     var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))
     var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant))
+    var logger = s.requestLoggerFunc(ctx, s.logger)

     for _, b := range s.blocks {
         b := b
@@ -2012,7 +2033,9 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
         resHints.AddQueriedBlock(b.meta.ULID)

-        indexr := b.indexReader()
+        blockLogger := log.With(logger, "block", b.meta.ULID)
+        indexr := b.indexReader(blockLogger)
+
         g.Go(func() error {
             span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_label_values", tracing.Tags{
                 "block.id": b.meta.ULID,
@@ -2021,7 +2044,7 @@
                 "block.resolution": b.meta.Thanos.Downsample.Resolution,
             })
             defer span.Finish()
-            defer runutil.CloseWithLogOnErr(s.logger, indexr, "label values")
+            defer runutil.CloseWithLogOnErr(blockLogger, indexr, "label values")

             var result []string
             if len(reqSeriesMatchersNoExtLabels) == 0 {
@@ -2045,7 +2068,7 @@
                 }
                 blockClient := newBlockSeriesClient(
                     newCtx,
-                    s.logger,
+                    blockLogger,
                     b,
                     seriesReq,
                     seriesLimiter,
@@ -2275,7 +2298,6 @@ func (s *bucketBlockSet) labelMatchers(matchers ...*labels.Matcher) ([]*labels.M
 // bucketBlock represents a block that is located in a bucket. It holds intermediate
 // state for the block on local disk.
 type bucketBlock struct {
-    logger  log.Logger
     metrics *bucketStoreMetrics
     bkt     objstore.BucketReader
     meta    *metadata.Meta
@@ -2302,7 +2324,6 @@ type bucketBlock struct {

 func newBucketBlock(
     ctx context.Context,
-    logger log.Logger,
     metrics *bucketStoreMetrics,
     meta *metadata.Meta,
     bkt objstore.BucketReader,
@@ -2327,7 +2348,6 @@ func newBucketBlock(
     extLset := labels.FromMap(meta.Thanos.Labels)
     relabelLabels := labels.NewBuilder(extLset).Set(block.BlockIDLabel, meta.ULID.String()).Labels()
     b = &bucketBlock{
-        logger:     logger,
         metrics:    metrics,
         bkt:        bkt,
         indexCache: indexCache,
@@ -2366,12 +2386,12 @@
 func (b *bucketBlock) indexFilename() string {
     return path.Join(b.meta.ULID.String(), block.IndexFilename)
 }

-func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([]byte, error) {
+func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64, logger log.Logger) ([]byte, error) {
     r, err := b.bkt.GetRange(ctx, b.indexFilename(), off, length)
     if err != nil {
         return nil, errors.Wrap(err, "get range reader")
     }
-    defer runutil.CloseWithLogOnErr(b.logger, r, "readIndexRange close range reader")
+    defer runutil.CloseWithLogOnErr(logger, r, "readIndexRange close range reader")

     // Preallocate the buffer with the exact size so we don't waste allocations
     // while progressively growing an initial small buffer. The buffer capacity
@@ -2384,7 +2404,7 @@ func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([]
     return buf.Bytes(), nil
 }

-func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length int64, chunkRanges byteRanges) (*[]byte, error) {
+func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length int64, chunkRanges byteRanges, logger log.Logger) (*[]byte, error) {
     if seq < 0 || seq >= len(b.chunkObjs) {
         return nil, errors.Errorf("unknown segment file for index %d", seq)
     }
@@ -2394,7 +2414,7 @@ func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length i
     if err != nil {
         return nil, errors.Wrap(err, "get range reader")
     }
-    defer runutil.CloseWithLogOnErr(b.logger, reader, "readChunkRange close range reader")
+    defer runutil.CloseWithLogOnErr(logger, reader, "readChunkRange close range reader")

     // Get a buffer from the pool.
     chunkBuffer, err := b.chunkPool.Get(chunkRanges.size())
@@ -2418,14 +2438,14 @@ func (b *bucketBlock) chunkRangeReader(ctx context.Context, seq int, off, length
     return b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length)
 }

-func (b *bucketBlock) indexReader() *bucketIndexReader {
+func (b *bucketBlock) indexReader(logger log.Logger) *bucketIndexReader {
     b.pendingReaders.Add(1)
-    return newBucketIndexReader(b)
+    return newBucketIndexReader(b, logger)
 }

-func (b *bucketBlock) chunkReader() *bucketChunkReader {
+func (b *bucketBlock) chunkReader(logger log.Logger) *bucketChunkReader {
     b.pendingReaders.Add(1)
-    return newBucketChunkReader(b)
+    return newBucketChunkReader(b, logger)
 }

 // matchRelabelLabels verifies whether the block matches the given matchers.
@@ -2462,9 +2482,10 @@ type bucketIndexReader struct {
     loadedSeries map[storage.SeriesRef][]byte

     indexVersion int
+    logger       log.Logger
 }

-func newBucketIndexReader(block *bucketBlock) *bucketIndexReader {
+func newBucketIndexReader(block *bucketBlock, logger log.Logger) *bucketIndexReader {
     r := &bucketIndexReader{
         block: block,
         dec: &index.Decoder{
@@ -2472,6 +2493,7 @@ func newBucketIndexReader(block *bucketBlock) *bucketIndexReader {
         },
         stats:        &queryStats{},
         loadedSeries: map[storage.SeriesRef][]byte{},
+        logger:       logger,
     }
     return r
 }
@@ -2871,13 +2893,13 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
     }()

     // If failed to decode or expand cached postings, return and expand postings again.
     if err != nil {
-        level.Error(r.block.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
+        level.Error(r.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
         return false, nil, nil
     }

     ps, err := ExpandPostingsWithContext(ctx, p)
     if err != nil {
-        level.Error(r.block.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
+        level.Error(r.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
         return false, nil, nil
     }
@@ -3017,7 +3039,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
             if err != nil {
                 return errors.Wrap(err, "read postings range")
             }
-            defer runutil.CloseWithLogOnErr(r.block.logger, partReader, "readIndexRange close range reader")
+            defer runutil.CloseWithLogOnErr(r.logger, partReader, "readIndexRange close range reader")
             brdr.Reset(partReader)

             rdr := newPostingsReaderBuilder(ctx, brdr, ptrs[i:j], start, length)
@@ -3202,7 +3224,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
         stats.DataDownloadedSizeSum += units.Base2Bytes(end - start)
     }

-    b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start))
+    b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start), r.logger)
     if err != nil {
         return errors.Wrap(err, "read series range")
     }
@@ -3226,7 +3248,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series

             // Inefficient, but should be rare.
             r.block.metrics.seriesRefetches.WithLabelValues(tenant).Inc()
-            level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", r.block.estimatedMaxSeriesSize)
+            level.Warn(r.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", r.block.estimatedMaxSeriesSize)

             // Fetch plus to get the size of next one if exists.
             return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), bytesLimiter, tenant)
@@ -3416,17 +3438,19 @@ type bucketChunkReader struct {
     chunkBytesMtx sync.Mutex
     stats         *queryStats
     chunkBytes    []*[]byte // Byte slice to return to the chunk pool on close.
+    logger        log.Logger

     loadingChunksMtx  sync.Mutex
     loadingChunks     bool
     finishLoadingChks chan struct{}
 }

-func newBucketChunkReader(block *bucketBlock) *bucketChunkReader {
+func newBucketChunkReader(block *bucketBlock, logger log.Logger) *bucketChunkReader {
     return &bucketChunkReader{
         block:  block,
         stats:  &queryStats{},
         toLoad: make([][]loadIdx, len(block.chunkObjs)),
+        logger: logger,
     }
 }
@@ -3533,7 +3557,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
     if err != nil {
         return errors.Wrap(err, "get range reader")
     }
-    defer runutil.CloseWithLogOnErr(r.block.logger, reader, "readChunkRange close range reader")
+    defer runutil.CloseWithLogOnErr(r.logger, reader, "readChunkRange close range reader")

     bufReader := bufio.NewReaderSize(reader, r.block.estimatedMaxChunkSize)
     stats.chunksFetchCount++
@@ -3620,7 +3644,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
             }
             stats.DataDownloadedSizeSum += units.Base2Bytes(chunkLen)

-            nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}})
+            nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}}, r.logger)
             if err != nil {
                 return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chunkLen)
             }
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 39ebb6f99e..3d9e1f2daa 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -216,7 +216,7 @@ func TestBucketFilterExtLabelsMatchers(t *testing.T) {
             },
         },
     }
-    b, _ := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, nil)
+    b, _ := newBucketBlock(context.Background(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, nil)
     ms := []*labels.Matcher{
         {Type: labels.MatchNotEqual, Name: "a", Value: "b"},
     }
@@ -264,7 +264,7 @@ func TestBucketBlock_matchLabels(t *testing.T) {
         },
     }

-    b, err := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, nil)
+    b, err := newBucketBlock(context.Background(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, nil)
     testutil.Ok(t, err)

     cases := []struct {
@@ -1080,10 +1080,10 @@ func TestReadIndexCache_LoadSeries(t *testing.T) {
             },
         },
         bkt:        bkt,
-        logger:     log.NewNopLogger(),
         metrics:    s,
         indexCache: noopCache{},
     }
+    logger := log.NewNopLogger()

     buf := encoding.Encbuf{}
     buf.PutByte(0)
@@ -1100,6 +1100,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) {
         block:        b,
         stats:        &queryStats{},
         loadedSeries: map[storage.SeriesRef][]byte{},
+        logger:       logger,
     }

     // Success with no refetches.
@@ -1277,6 +1278,7 @@ func benchmarkExpandedPostings(
     })
     iRegexBigValueSet := labels.MustNewMatcher(labels.MatchRegexp, "uniq", strings.Join(bigValueSet, "|"))

+    logger := log.NewNopLogger()
     series = series / 5
     cases := []struct {
         name string
@@ -1309,7 +1311,6 @@ func benchmarkExpandedPostings(
     for _, c := range cases {
         t.Run(c.name, func(t testutil.TB) {
             b := &bucketBlock{
-                logger:            log.NewNopLogger(),
                 metrics:           newBucketStoreMetrics(nil),
                 indexHeaderReader: r,
                 indexCache:        noopCache{},
@@ -1318,7 +1319,7 @@
                 partitioner:       NewGapBasedPartitioner(PartitionerMaxGapSize),
             }

-            indexr := newBucketIndexReader(b)
+            indexr := newBucketIndexReader(b, logger)

             t.ResetTimer()
             for i := 0; i < t.N(); i++ {
@@ -1342,7 +1343,6 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) {
     r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, id, DefaultPostingOffsetInMemorySampling, indexheader.NewBinaryReaderMetrics(nil))
     testutil.Ok(t, err)
     b := &bucketBlock{
-        logger:            log.NewNopLogger(),
         metrics:           newBucketStoreMetrics(nil),
         indexHeaderReader: r,
         indexCache:        noopCache{},
@@ -1351,7 +1351,8 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) {
         partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize),
     }

-    indexr := newBucketIndexReader(b)
+    logger := log.NewNopLogger()
+    indexr := newBucketIndexReader(b, logger)
     matcher1 := labels.MustNewMatcher(labels.MatchEqual, "j", "foo")
     // Match nothing.
     matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*")
@@ -1376,7 +1377,6 @@ func TestLazyExpandedPostingsEmptyPostings(t *testing.T) {
     r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, id, DefaultPostingOffsetInMemorySampling, indexheader.NewBinaryReaderMetrics(nil))
     testutil.Ok(t, err)
     b := &bucketBlock{
-        logger:            log.NewNopLogger(),
         metrics:           newBucketStoreMetrics(nil),
         indexHeaderReader: r,
         indexCache:        noopCache{},
@@ -1386,7 +1386,8 @@ func TestLazyExpandedPostingsEmptyPostings(t *testing.T) {
         estimatedMaxSeriesSize: 20,
     }

-    indexr := newBucketIndexReader(b)
+    logger := log.NewNopLogger()
+    indexr := newBucketIndexReader(b, logger)
     // matcher1 and matcher2 will match nothing after intersection.
     matcher1 := labels.MustNewMatcher(labels.MatchEqual, "j", "foo")
     matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "n", "1_.*")
@@ -1716,7 +1717,6 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {

         b1 = &bucketBlock{
             indexCache: indexCache,
-            logger:     logger,
             metrics:    newBucketStoreMetrics(nil),
             bkt:        bkt,
             meta:       meta,
@@ -1757,7 +1757,6 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {

         b2 = &bucketBlock{
             indexCache: indexCache,
-            logger:     logger,
             metrics:    newBucketStoreMetrics(nil),
             bkt:        bkt,
             meta:       meta,
@@ -1789,6 +1788,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
         seriesLimiterFactory: NewSeriesLimiterFactory(0),
         bytesLimiterFactory:  NewBytesLimiterFactory(0),
         seriesBatchSize:      SeriesBatchSize,
+        requestLoggerFunc:    NoopRequestLoggerFunc,
     }

     t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) {
@@ -2754,7 +2754,7 @@ func BenchmarkBucketBlock_readChunkRange(b *testing.B) {
     testutil.Ok(b, err)

     // Create a bucket block with only the dependencies we need for the benchmark.
-    blk, err := newBucketBlock(context.Background(), logger, newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, nil, chunkPool, nil, nil, nil, nil)
+    blk, err := newBucketBlock(context.Background(), newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, nil, chunkPool, nil, nil, nil, nil)
     testutil.Ok(b, err)

     b.ResetTimer()
@@ -2763,7 +2763,7 @@ func BenchmarkBucketBlock_readChunkRange(b *testing.B) {
         offset := int64(0)
         length := readLengths[n%len(readLengths)]

-        _, err := blk.readChunkRange(ctx, 0, offset, length, byteRanges{{offset: 0, length: int(length)}})
+        _, err := blk.readChunkRange(ctx, 0, offset, length, byteRanges{{offset: 0, length: int(length)}}, logger)
         if err != nil {
             b.Fatal(err.Error())
         }
@@ -2843,7 +2843,7 @@ func prepareBucket(b *testing.B, resolutionLevel compact.ResolutionLevel) (*buck
     testutil.Ok(b, err)

     // Create a bucket block with only the dependencies we need for the benchmark.
-    blk, err := newBucketBlock(context.Background(), logger, newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, indexCache, chunkPool, indexHeaderReader, partitioner, nil, nil)
+    blk, err := newBucketBlock(context.Background(), newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, indexCache, chunkPool, indexHeaderReader, partitioner, nil, nil)
     testutil.Ok(b, err)
     return blk, blockMeta
 }
@@ -3459,6 +3459,7 @@ func TestExpandedPostingsRace(t *testing.T) {
         testutil.Ok(t, bkt.Close())
     })

+    logger := log.NewNopLogger()
     // Create a block.
     head, _ := storetestutil.CreateHeadWithSeries(t, 0, storetestutil.HeadGenOptions{
         TSDBDir: filepath.Join(tmpDir, "head"),
@@ -3498,7 +3499,6 @@ func TestExpandedPostingsRace(t *testing.T) {
         blk, err := newBucketBlock(
             context.Background(),
-            log.NewLogfmtLogger(os.Stderr),
             newBucketStoreMetrics(nil),
             m,
             bkt,
@@ -3543,7 +3543,7 @@ func TestExpandedPostingsRace(t *testing.T) {
         i := i
         bb := bb
         go func(i int, bb *bucketBlock) {
-            refs, err := bb.indexReader().ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant)
+            refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant)
             testutil.Ok(t, err)

             defer wg.Done()
diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go
index cfcf987e14..1858b7dee4 100644
--- a/pkg/store/lazy_postings.go
+++ b/pkg/store/lazy_postings.go
@@ -60,7 +60,7 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups
             continue
         }
         if rng.End <= rng.Start {
-            level.Error(r.block.logger).Log("msg", "invalid index range, fallback to non lazy posting optimization")
+            level.Error(r.logger).Log("msg", "invalid index range, fallback to non lazy posting optimization")
             return postingGroups, false, nil
         }
         // Each range starts from the #entries field which is 4 bytes.
diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go
index 08e30b6d5b..bec95ac413 100644
--- a/pkg/store/lazy_postings_test.go
+++ b/pkg/store/lazy_postings_test.go
@@ -254,12 +254,12 @@ func (h *mockIndexHeaderReader) LabelNames() ([]string, error) { return nil, nil
 func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
     ctx := context.Background()
-    logger := log.NewNopLogger()
     dir := t.TempDir()
     bkt, err := filesystem.NewBucket(dir)
     testutil.Ok(t, err)
     defer func() { testutil.Ok(t, bkt.Close()) }()

+    logger := log.NewNopLogger()
     inputError := errors.New("random")
     blockID := ulid.MustNew(1, nil)
     meta := &metadata.Meta{
@@ -555,9 +555,9 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
         t.Run(tc.name, func(t *testing.T) {
             headerReader := &mockIndexHeaderReader{postings: tc.inputPostings, err: tc.inputError}
             registry := prometheus.NewRegistry()
-            block, err := newBucketBlock(ctx, logger, newBucketStoreMetrics(registry), meta, bkt, path.Join(dir, blockID.String()), nil, nil, headerReader, nil, nil, nil)
+            block, err := newBucketBlock(ctx, newBucketStoreMetrics(registry), meta, bkt, path.Join(dir, blockID.String()), nil, nil, headerReader, nil, nil, nil)
             testutil.Ok(t, err)
-            ir := newBucketIndexReader(block)
+            ir := newBucketIndexReader(block, logger)
             dummyCounter := promauto.With(registry).NewCounter(prometheus.CounterOpts{Name: "test"})
             pgs, emptyPosting, err := optimizePostingsFetchByDownloadedBytes(ir, tc.postingGroups, tc.seriesMaxSize, tc.seriesMatchRatio, dummyCounter)
             if err != nil {
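
Usage sketch (not part of the patch itself): the Go snippet below shows how an embedding application might wire its own RequestLoggerFunc when building a BucketStore, mirroring the wiring the patch adds to cmd/thanos/store.go. It assumes the canonical import paths (github.com/thanos-io/thanos, github.com/go-kit/log) and the middleware.RequestIDFromContext helper referenced above; any other per-request value one might attach (tenant, trace ID) would need a caller-provided helper that is not shown here.

package example

import (
	"context"

	"github.com/go-kit/log"

	"github.com/thanos-io/thanos/pkg/server/http/middleware"
	"github.com/thanos-io/thanos/pkg/store"
)

// requestLogger has the shape of store.RequestLoggerFunc: it is called once per
// query and returns the base logger enriched with fields from the request context.
func requestLogger(ctx context.Context, logger log.Logger) log.Logger {
	if reqID, ok := middleware.RequestIDFromContext(ctx); ok {
		logger = log.With(logger, "request-id", reqID)
	}
	// Other context-scoped values (tenant, trace ID, ...) could be attached the
	// same way with log.With, using whatever extractor the caller already has.
	return logger
}

// bucketStoreOptions shows where the new option slots in next to the base logger.
func bucketStoreOptions(logger log.Logger) []store.BucketStoreOption {
	return []store.BucketStoreOption{
		store.WithLogger(logger),
		// Omitting WithRequestLoggerFunc keeps the default NoopRequestLoggerFunc,
		// which returns the base logger unchanged.
		store.WithRequestLoggerFunc(requestLogger),
	}
}

With this option set, Series, LabelNames and LabelValues derive their per-request logger once via requestLoggerFunc and then scope it per block with log.With(logger, "block", ULID), which is why the bucketBlock and reader types no longer need to carry a logger of their own.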