From 6906b52bc06acfad6214df7a98396240e54e7db9 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 16 May 2024 12:50:04 -0700 Subject: [PATCH 1/8] allow configurable request logger for Store Gateway Signed-off-by: Ben Ye --- cmd/thanos/store.go | 8 +++ pkg/store/bucket.go | 94 ++++++++++++++++++++------------- pkg/store/bucket_test.go | 38 +++++++------ pkg/store/lazy_postings.go | 8 +-- pkg/store/lazy_postings_test.go | 4 +- 5 files changed, 89 insertions(+), 63 deletions(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 6c752ce15d..b292ec62ea 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -6,6 +6,7 @@ package main import ( "context" "fmt" + "github.com/thanos-io/thanos/pkg/server/http/middleware" "strconv" "strings" "time" @@ -394,6 +395,13 @@ func runStore( options := []store.BucketStoreOption{ store.WithLogger(logger), + store.WithRequestLoggerFunc(func(ctx context.Context, logger log.Logger) log.Logger { + reqID, ok := middleware.RequestIDFromContext(ctx) + if ok { + return log.With(logger, "request-id", reqID) + } + return logger + }), store.WithRegistry(reg), store.WithIndexCache(indexCache), store.WithQueryGate(queriesGate), diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 43ac0d6c1a..b31adaf9b9 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -413,6 +413,8 @@ type BucketStore struct { blockEstimatedMaxChunkFunc BlockEstimator indexHeaderLazyDownloadStrategy indexheader.LazyDownloadIndexHeaderFunc + + requestLoggerFunc RequestLoggerFunc } func (s *BucketStore) validate() error { @@ -449,6 +451,16 @@ func WithLogger(logger log.Logger) BucketStoreOption { } } +type RequestLoggerFunc func(ctx context.Context, log log.Logger) log.Logger + +// WithRequestLoggerFunc sets the BucketStore to use the passed RequestLoggerFunc +// to initialize logger during query time. +func WithRequestLoggerFunc(loggerFunc RequestLoggerFunc) BucketStoreOption { + return func(s *BucketStore) { + s.requestLoggerFunc = loggerFunc + } +} + // WithRegistry sets a registry that BucketStore uses to register metrics with. 
func WithRegistry(reg prometheus.Registerer) BucketStoreOption { return func(s *BucketStore) { @@ -583,6 +595,9 @@ func NewBucketStore( seriesBatchSize: SeriesBatchSize, sortingStrategy: sortingStrategyStore, indexHeaderLazyDownloadStrategy: indexheader.AlwaysEagerDownloadIndexHeader, + requestLoggerFunc: func(ctx context.Context, logger log.Logger) log.Logger { + return logger + }, } for _, option := range options { @@ -779,7 +794,6 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er b, err := newBucketBlock( ctx, - log.With(s.logger, "block", meta.ULID), s.metrics, meta, s.bkt, @@ -1116,7 +1130,7 @@ func (b *blockSeriesClient) ExpandPostings( matchers sortedMatchers, seriesLimiter SeriesLimiter, ) error { - ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.lazyExpandedPostingSizeBytes, b.tenant) + ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.lazyExpandedPostingSizeBytes, b.tenant, b.logger) if err != nil { return errors.Wrap(err, "expanded matching posting") } @@ -1206,7 +1220,7 @@ func (b *blockSeriesClient) nextBatch(tenant string) error { b.chunkr.reset() } - if err := b.indexr.PreloadSeries(b.ctx, postingsBatch, b.bytesLimiter, b.tenant); err != nil { + if err := b.indexr.PreloadSeries(b.ctx, postingsBatch, b.bytesLimiter, b.tenant, b.logger); err != nil { return errors.Wrap(err, "preload series") } @@ -1298,7 +1312,7 @@ OUTER: } if !b.skipChunks { - if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter, b.tenant); err != nil { + if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter, b.tenant, b.logger); err != nil { return errors.Wrap(err, "load chunks") } } @@ -1469,6 +1483,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) queryStatsEnabled = false + + logger = s.requestLoggerFunc(ctx, s.logger) ) if req.Hints != nil { @@ -1505,7 +1521,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow, reqBlockMatchers) if s.debugLogging { - debugFoundBlockSetOverview(s.logger, req.MinTime, req.MaxTime, req.MaxResolutionWindow, bs.labels, blocks) + debugFoundBlockSetOverview(logger, req.MinTime, req.MaxTime, req.MaxResolutionWindow, bs.labels, blocks) } for _, b := range blocks { @@ -1521,7 +1537,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store blockClient := newBlockSeriesClient( srv.Context(), - s.logger, + log.With(logger, "block", blk.meta.ULID), blk, req, seriesLimiter, @@ -1633,7 +1649,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store s.metrics.cachedPostingsCompressedSizeBytes.WithLabelValues(tenant).Add(float64(stats.CachedPostingsCompressedSizeSum)) s.metrics.postingsSizeBytes.WithLabelValues(tenant).Observe(float64(int(stats.PostingsFetchedSizeSum) + int(stats.PostingsTouchedSizeSum))) - level.Debug(s.logger).Log("msg", "stats query processed", + level.Debug(logger).Log("msg", "stats query processed", "request", req, "tenant", tenant, "stats", fmt.Sprintf("%+v", stats), "err", err) @@ -1764,6 +1780,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq var sets [][]string var seriesLimiter = 
s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant)) + var logger = s.requestLoggerFunc(ctx, s.logger) for _, b := range s.blocks { b := b @@ -1787,6 +1804,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq indexr := b.indexReader() + blockLogger := log.With(logger, "block", b.meta.ULID) + g.Go(func() error { span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_label_names", tracing.Tags{ "block.id": b.meta.ULID, @@ -1795,7 +1814,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq "block.resolution": b.meta.Thanos.Downsample.Resolution, }) defer span.Finish() - defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names") + defer runutil.CloseWithLogOnErr(blockLogger, indexr, "label names") var result []string if len(reqSeriesMatchersNoExtLabels) == 0 { @@ -1826,7 +1845,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq } blockClient := newBlockSeriesClient( newCtx, - s.logger, + blockLogger, b, seriesReq, seriesLimiter, @@ -1973,6 +1992,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR var sets [][]string var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant)) + var logger = s.requestLoggerFunc(ctx, s.logger) for _, b := range s.blocks { b := b @@ -2005,6 +2025,9 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR resHints.AddQueriedBlock(b.meta.ULID) indexr := b.indexReader() + + blockLogger := log.With(logger, "block", b.meta.ULID) + g.Go(func() error { span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_label_values", tracing.Tags{ "block.id": b.meta.ULID, @@ -2013,7 +2036,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR "block.resolution": b.meta.Thanos.Downsample.Resolution, }) defer span.Finish() - defer runutil.CloseWithLogOnErr(s.logger, indexr, "label values") + defer runutil.CloseWithLogOnErr(blockLogger, indexr, "label values") var result []string if len(reqSeriesMatchersNoExtLabels) == 0 { @@ -2037,7 +2060,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR } blockClient := newBlockSeriesClient( newCtx, - s.logger, + blockLogger, b, seriesReq, seriesLimiter, @@ -2267,7 +2290,6 @@ func (s *bucketBlockSet) labelMatchers(matchers ...*labels.Matcher) ([]*labels.M // bucketBlock represents a block that is located in a bucket. It holds intermediate // state for the block on local disk. 
type bucketBlock struct { - logger log.Logger metrics *bucketStoreMetrics bkt objstore.BucketReader meta *metadata.Meta @@ -2294,7 +2316,6 @@ type bucketBlock struct { func newBucketBlock( ctx context.Context, - logger log.Logger, metrics *bucketStoreMetrics, meta *metadata.Meta, bkt objstore.BucketReader, @@ -2319,7 +2340,6 @@ func newBucketBlock( extLset := labels.FromMap(meta.Thanos.Labels) relabelLabels := labels.NewBuilder(extLset).Set(block.BlockIDLabel, meta.ULID.String()).Labels() b = &bucketBlock{ - logger: logger, metrics: metrics, bkt: bkt, indexCache: indexCache, @@ -2358,12 +2378,12 @@ func (b *bucketBlock) indexFilename() string { return path.Join(b.meta.ULID.String(), block.IndexFilename) } -func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([]byte, error) { +func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64, logger log.Logger) ([]byte, error) { r, err := b.bkt.GetRange(ctx, b.indexFilename(), off, length) if err != nil { return nil, errors.Wrap(err, "get range reader") } - defer runutil.CloseWithLogOnErr(b.logger, r, "readIndexRange close range reader") + defer runutil.CloseWithLogOnErr(logger, r, "readIndexRange close range reader") // Preallocate the buffer with the exact size so we don't waste allocations // while progressively growing an initial small buffer. The buffer capacity @@ -2376,7 +2396,7 @@ func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([] return buf.Bytes(), nil } -func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length int64, chunkRanges byteRanges) (*[]byte, error) { +func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length int64, chunkRanges byteRanges, logger log.Logger) (*[]byte, error) { if seq < 0 || seq >= len(b.chunkObjs) { return nil, errors.Errorf("unknown segment file for index %d", seq) } @@ -2386,7 +2406,7 @@ func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length i if err != nil { return nil, errors.Wrap(err, "get range reader") } - defer runutil.CloseWithLogOnErr(b.logger, reader, "readChunkRange close range reader") + defer runutil.CloseWithLogOnErr(logger, reader, "readChunkRange close range reader") // Get a buffer from the pool. chunkBuffer, err := b.chunkPool.Get(chunkRanges.size()) @@ -2494,14 +2514,14 @@ func (r *bucketIndexReader) reset(size int) { // Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by // single label name=value. -func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter, tenant string) (*lazyExpandedPostings, error) { +func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter, tenant string, logger log.Logger) (*lazyExpandedPostings, error) { // Shortcut the case of `len(postingGroups) == 0`. It will only happen when no // matchers specified, and we don't need to fetch expanded postings from cache. 
if len(ms) == 0 { return nil, nil } - hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter, tenant) + hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter, tenant, logger) if err != nil { return nil, err } @@ -2546,7 +2566,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil)) } - ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, lazyExpandedPostingSizeBytes, tenant) + ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, lazyExpandedPostingSizeBytes, tenant, logger) if err != nil { return nil, errors.Wrap(err, "fetch and expand postings") } @@ -2844,7 +2864,7 @@ type postingPtr struct { ptr index.Range } -func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string) (bool, []storage.SeriesRef, error) { +func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string, logger log.Logger) (bool, []storage.SeriesRef, error) { dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms, tenant) if !hit { return false, nil, nil @@ -2863,13 +2883,13 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, }() // If failed to decode or expand cached postings, return and expand postings again. if err != nil { - level.Error(r.block.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err) + level.Error(logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err) return false, nil, nil } ps, err := ExpandPostingsWithContext(ctx, p) if err != nil { - level.Error(r.block.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err) + level.Error(logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err) return false, nil, nil } @@ -2910,7 +2930,7 @@ var bufioReaderPool = sync.Pool{ // fetchPostings fill postings requested by posting groups. // It returns one posting for each key, in the same order. // If postings for given key is not fetched, entry at given index will be nil. 
-func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) { +func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter, tenant string, logger log.Logger) ([]index.Postings, []func(), error) { var closeFns []func() timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration.WithLabelValues(tenant)) @@ -3009,7 +3029,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab if err != nil { return errors.Wrap(err, "read postings range") } - defer runutil.CloseWithLogOnErr(r.block.logger, partReader, "readIndexRange close range reader") + defer runutil.CloseWithLogOnErr(logger, partReader, "readIndexRange close range reader") brdr.Reset(partReader) rdr := newPostingsReaderBuilder(ctx, brdr, ptrs[i:j], start, length) @@ -3146,7 +3166,7 @@ func (it *bigEndianPostings) length() int { return len(it.list) / 4 } -func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter, tenant string) error { +func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter, tenant string, logger log.Logger) error { timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration.WithLabelValues(tenant)) defer func() { d := timer.ObserveDuration() @@ -3174,13 +3194,13 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser i, j := p.ElemRng[0], p.ElemRng[1] g.Go(func() error { - return r.loadSeries(ctx, ids[i:j], false, s, e, bytesLimiter, tenant) + return r.loadSeries(ctx, ids[i:j], false, s, e, bytesLimiter, tenant, logger) }) } return g.Wait() } -func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64, bytesLimiter BytesLimiter, tenant string) error { +func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64, bytesLimiter BytesLimiter, tenant string, logger log.Logger) error { begin := time.Now() stats := new(queryStats) defer func() { @@ -3194,7 +3214,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series stats.DataDownloadedSizeSum += units.Base2Bytes(end - start) } - b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start)) + b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start), logger) if err != nil { return errors.Wrap(err, "read series range") } @@ -3218,10 +3238,10 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series // Inefficient, but should be rare. r.block.metrics.seriesRefetches.WithLabelValues(tenant).Inc() - level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", r.block.estimatedMaxSeriesSize) + level.Warn(logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", r.block.estimatedMaxSeriesSize) // Fetch plus to get the size of next one if exists. 
- return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), bytesLimiter, tenant) + return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), bytesLimiter, tenant, logger) } c = c[n : n+int(l)] r.loadedSeriesMtx.Lock() @@ -3465,7 +3485,7 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int) } // load loads all added chunks and saves resulting aggrs to refs. -func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter, tenant string) error { +func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter, tenant string, logger log.Logger) error { r.loadingChunksMtx.Lock() r.loadingChunks = true r.loadingChunksMtx.Unlock() @@ -3503,7 +3523,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [ p := p indices := pIdxs[p.ElemRng[0]:p.ElemRng[1]] g.Go(func() error { - return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum, bytesLimiter, tenant) + return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum, bytesLimiter, tenant, logger) }) } } @@ -3512,7 +3532,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [ // loadChunks will read range [start, end] from the segment file with sequence number seq. // This data range covers chunks starting at supplied offsets. -func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool, bytesLimiter BytesLimiter, tenant string) error { +func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool, bytesLimiter BytesLimiter, tenant string, logger log.Logger) error { fetchBegin := time.Now() stats := new(queryStats) defer func() { @@ -3525,7 +3545,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a if err != nil { return errors.Wrap(err, "get range reader") } - defer runutil.CloseWithLogOnErr(r.block.logger, reader, "readChunkRange close range reader") + defer runutil.CloseWithLogOnErr(logger, reader, "readChunkRange close range reader") bufReader := bufio.NewReaderSize(reader, r.block.estimatedMaxChunkSize) stats.chunksFetchCount++ @@ -3605,7 +3625,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a } stats.DataDownloadedSizeSum += units.Base2Bytes(chunkLen) - nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}}) + nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}}, logger) if err != nil { return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chunkLen) } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e6c64621e1..9872f56e83 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -216,7 +216,7 @@ func TestBucketFilterExtLabelsMatchers(t *testing.T) { }, }, } - b, _ := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, nil) + b, _ := newBucketBlock(context.Background(), newBucketStoreMetrics(nil), meta, bkt, 
path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, nil) ms := []*labels.Matcher{ {Type: labels.MatchNotEqual, Name: "a", Value: "b"}, } @@ -264,7 +264,7 @@ func TestBucketBlock_matchLabels(t *testing.T) { }, } - b, err := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, nil) + b, err := newBucketBlock(context.Background(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, nil) testutil.Ok(t, err) cases := []struct { @@ -1080,10 +1080,10 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { }, }, bkt: bkt, - logger: log.NewNopLogger(), metrics: s, indexCache: noopCache{}, } + logger := log.NewNopLogger() buf := encoding.Encbuf{} buf.PutByte(0) @@ -1103,7 +1103,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { } // Success with no refetches. - testutil.Ok(t, r.loadSeries(ctx, []storage.SeriesRef{2, 13, 24}, false, 2, 100, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant)) + testutil.Ok(t, r.loadSeries(ctx, []storage.SeriesRef{2, 13, 24}, false, 2, 100, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant, logger)) testutil.Equals(t, map[storage.SeriesRef][]byte{ 2: []byte("aaaaaaaaaa"), 13: []byte("bbbbbbbbbb"), @@ -1113,7 +1113,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { // Success with 2 refetches. r.loadedSeries = map[storage.SeriesRef][]byte{} - testutil.Ok(t, r.loadSeries(ctx, []storage.SeriesRef{2, 13, 24}, false, 2, 15, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant)) + testutil.Ok(t, r.loadSeries(ctx, []storage.SeriesRef{2, 13, 24}, false, 2, 15, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant, logger)) testutil.Equals(t, map[storage.SeriesRef][]byte{ 2: []byte("aaaaaaaaaa"), 13: []byte("bbbbbbbbbb"), @@ -1123,7 +1123,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { // Success with refetch on first element. r.loadedSeries = map[storage.SeriesRef][]byte{} - testutil.Ok(t, r.loadSeries(ctx, []storage.SeriesRef{2}, false, 2, 5, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant)) + testutil.Ok(t, r.loadSeries(ctx, []storage.SeriesRef{2}, false, 2, 5, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant, logger)) testutil.Equals(t, map[storage.SeriesRef][]byte{ 2: []byte("aaaaaaaaaa"), }, r.loadedSeries) @@ -1137,7 +1137,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { testutil.Ok(t, bkt.Upload(ctx, filepath.Join(b.meta.ULID.String(), block.IndexFilename), bytes.NewReader(buf.Get()))) // Fail, but no recursion at least. 
- testutil.NotOk(t, r.loadSeries(ctx, []storage.SeriesRef{2, 13, 24}, false, 1, 15, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant)) + testutil.NotOk(t, r.loadSeries(ctx, []storage.SeriesRef{2, 13, 24}, false, 1, 15, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant, logger)) } func TestBucketIndexReader_ExpandedPostings(t *testing.T) { @@ -1277,6 +1277,7 @@ func benchmarkExpandedPostings( }) iRegexBigValueSet := labels.MustNewMatcher(labels.MatchRegexp, "uniq", strings.Join(bigValueSet, "|")) + logger := log.NewNopLogger() series = series / 5 cases := []struct { name string @@ -1309,7 +1310,6 @@ func benchmarkExpandedPostings( for _, c := range cases { t.Run(c.name, func(t testutil.TB) { b := &bucketBlock{ - logger: log.NewNopLogger(), metrics: newBucketStoreMetrics(nil), indexHeaderReader: r, indexCache: noopCache{}, @@ -1322,7 +1322,7 @@ func benchmarkExpandedPostings( t.ResetTimer() for i := 0; i < t.N(); i++ { - p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant) + p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant, logger) testutil.Ok(t, err) testutil.Equals(t, c.expectedLen, len(p.postings)) } @@ -1342,7 +1342,6 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) { r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, id, DefaultPostingOffsetInMemorySampling, indexheader.NewBinaryReaderMetrics(nil)) testutil.Ok(t, err) b := &bucketBlock{ - logger: log.NewNopLogger(), metrics: newBucketStoreMetrics(nil), indexHeaderReader: r, indexCache: noopCache{}, @@ -1351,13 +1350,14 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) { partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize), } + logger := log.NewNopLogger() indexr := newBucketIndexReader(b) matcher1 := labels.MustNewMatcher(labels.MatchEqual, "j", "foo") // Match nothing. matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*") ctx := context.Background() dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"}) - ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant) + ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant, logger) testutil.Ok(t, err) testutil.Equals(t, ps, (*lazyExpandedPostings)(nil)) // Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings. @@ -1376,7 +1376,6 @@ func TestLazyExpandedPostingsEmptyPostings(t *testing.T) { r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, id, DefaultPostingOffsetInMemorySampling, indexheader.NewBinaryReaderMetrics(nil)) testutil.Ok(t, err) b := &bucketBlock{ - logger: log.NewNopLogger(), metrics: newBucketStoreMetrics(nil), indexHeaderReader: r, indexCache: noopCache{}, @@ -1386,6 +1385,7 @@ func TestLazyExpandedPostingsEmptyPostings(t *testing.T) { estimatedMaxSeriesSize: 20, } + logger := log.NewNopLogger() indexr := newBucketIndexReader(b) // matcher1 and matcher2 will match nothing after intersection. 
matcher1 := labels.MustNewMatcher(labels.MatchEqual, "j", "foo") @@ -1393,7 +1393,7 @@ func TestLazyExpandedPostingsEmptyPostings(t *testing.T) { matcher3 := labels.MustNewMatcher(labels.MatchRegexp, "i", ".+") ctx := context.Background() dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"}) - ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, dummyCounter, tenancy.DefaultTenant) + ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, dummyCounter, tenancy.DefaultTenant, logger) testutil.Ok(t, err) // We expect emptyLazyPostings rather than lazy postings with 0 length but with matchers. testutil.Equals(t, ps, emptyLazyPostings) @@ -1716,7 +1716,6 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { b1 = &bucketBlock{ indexCache: indexCache, - logger: logger, metrics: newBucketStoreMetrics(nil), bkt: bkt, meta: meta, @@ -1757,7 +1756,6 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { b2 = &bucketBlock{ indexCache: indexCache, - logger: logger, metrics: newBucketStoreMetrics(nil), bkt: bkt, meta: meta, @@ -2754,7 +2752,7 @@ func BenchmarkBucketBlock_readChunkRange(b *testing.B) { testutil.Ok(b, err) // Create a bucket block with only the dependencies we need for the benchmark. - blk, err := newBucketBlock(context.Background(), logger, newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, nil, chunkPool, nil, nil, nil, nil) + blk, err := newBucketBlock(context.Background(), newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, nil, chunkPool, nil, nil, nil, nil) testutil.Ok(b, err) b.ResetTimer() @@ -2763,7 +2761,7 @@ func BenchmarkBucketBlock_readChunkRange(b *testing.B) { offset := int64(0) length := readLengths[n%len(readLengths)] - _, err := blk.readChunkRange(ctx, 0, offset, length, byteRanges{{offset: 0, length: int(length)}}) + _, err := blk.readChunkRange(ctx, 0, offset, length, byteRanges{{offset: 0, length: int(length)}}, logger) if err != nil { b.Fatal(err.Error()) } @@ -2843,7 +2841,7 @@ func prepareBucket(b *testing.B, resolutionLevel compact.ResolutionLevel) (*buck testutil.Ok(b, err) // Create a bucket block with only the dependencies we need for the benchmark. - blk, err := newBucketBlock(context.Background(), logger, newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, indexCache, chunkPool, indexHeaderReader, partitioner, nil, nil) + blk, err := newBucketBlock(context.Background(), newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, indexCache, chunkPool, indexHeaderReader, partitioner, nil, nil) testutil.Ok(b, err) return blk, blockMeta } @@ -3459,6 +3457,7 @@ func TestExpandedPostingsRace(t *testing.T) { testutil.Ok(t, bkt.Close()) }) + logger := log.NewNopLogger() // Create a block. 
head, _ := storetestutil.CreateHeadWithSeries(t, 0, storetestutil.HeadGenOptions{ TSDBDir: filepath.Join(tmpDir, "head"), @@ -3498,7 +3497,6 @@ func TestExpandedPostingsRace(t *testing.T) { blk, err := newBucketBlock( context.Background(), - log.NewLogfmtLogger(os.Stderr), newBucketStoreMetrics(nil), m, bkt, @@ -3543,7 +3541,7 @@ func TestExpandedPostingsRace(t *testing.T) { i := i bb := bb go func(i int, bb *bucketBlock) { - refs, err := bb.indexReader().ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant) + refs, err := bb.indexReader().ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant, logger) testutil.Ok(t, err) defer wg.Done() diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go index cfcf987e14..3eb50e6751 100644 --- a/pkg/store/lazy_postings.go +++ b/pkg/store/lazy_postings.go @@ -8,6 +8,7 @@ import ( "math" "strings" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -192,6 +193,7 @@ func fetchLazyExpandedPostings( lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter, tenant string, + logger log.Logger, ) (*lazyExpandedPostings, error) { var ( err error @@ -222,7 +224,7 @@ func fetchLazyExpandedPostings( } } - ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter, tenant) + ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter, tenant, logger) if err != nil { return nil, err } @@ -267,9 +269,9 @@ func keysToFetchFromPostingGroups(postingGroups []*postingGroup) ([]labels.Label return keys, lazyMatchers } -func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, postingGroups []*postingGroup, bytesLimiter BytesLimiter, tenant string) ([]storage.SeriesRef, []*labels.Matcher, error) { +func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, postingGroups []*postingGroup, bytesLimiter BytesLimiter, tenant string, logger log.Logger) ([]storage.SeriesRef, []*labels.Matcher, error) { keys, lazyMatchers := keysToFetchFromPostingGroups(postingGroups) - fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter, tenant) + fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter, tenant, logger) defer func() { for _, closeFn := range closeFns { closeFn() diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go index 08e30b6d5b..9f1293beeb 100644 --- a/pkg/store/lazy_postings_test.go +++ b/pkg/store/lazy_postings_test.go @@ -9,7 +9,6 @@ import ( "testing" "github.com/efficientgo/core/testutil" - "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -254,7 +253,6 @@ func (h *mockIndexHeaderReader) LabelNames() ([]string, error) { return nil, nil func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { ctx := context.Background() - logger := log.NewNopLogger() dir := t.TempDir() bkt, err := filesystem.NewBucket(dir) testutil.Ok(t, err) @@ -555,7 +553,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { t.Run(tc.name, func(t *testing.T) { headerReader := &mockIndexHeaderReader{postings: tc.inputPostings, err: tc.inputError} registry := prometheus.NewRegistry() - block, err := newBucketBlock(ctx, logger, newBucketStoreMetrics(registry), meta, bkt, path.Join(dir, 
blockID.String()), nil, nil, headerReader, nil, nil, nil) + block, err := newBucketBlock(ctx, newBucketStoreMetrics(registry), meta, bkt, path.Join(dir, blockID.String()), nil, nil, headerReader, nil, nil, nil) testutil.Ok(t, err) ir := newBucketIndexReader(block) dummyCounter := promauto.With(registry).NewCounter(prometheus.CounterOpts{Name: "test"}) From b2581ab54dbbd47541c2e1c62bdfb40a1c884605 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 16 May 2024 12:53:56 -0700 Subject: [PATCH 2/8] lint Signed-off-by: Ben Ye --- cmd/thanos/store.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index b292ec62ea..8e1253544c 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -6,7 +6,6 @@ package main import ( "context" "fmt" - "github.com/thanos-io/thanos/pkg/server/http/middleware" "strconv" "strings" "time" @@ -18,7 +17,6 @@ import ( grpclogging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tags" "github.com/oklog/run" - "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" commonmodel "github.com/prometheus/common/model" @@ -46,6 +44,7 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" httpserver "github.com/thanos-io/thanos/pkg/server/http" + "github.com/thanos-io/thanos/pkg/server/http/middleware" "github.com/thanos-io/thanos/pkg/store" storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" From fbf2b502c749c75497fe34074b93e36f0c0d2494 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 16 May 2024 12:56:47 -0700 Subject: [PATCH 3/8] lint Signed-off-by: Ben Ye --- cmd/thanos/store.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 8e1253544c..30e9776bdd 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -17,6 +17,7 @@ import ( grpclogging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tags" "github.com/oklog/run" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" commonmodel "github.com/prometheus/common/model" From a7e2e73c2fd3aa61eee98b5aaba9af862ddc2241 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 16 May 2024 12:58:33 -0700 Subject: [PATCH 4/8] fix tests Signed-off-by: Ben Ye --- pkg/store/lazy_postings.go | 5 +++-- pkg/store/lazy_postings_test.go | 4 +++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go index 3eb50e6751..4bccc1b5f4 100644 --- a/pkg/store/lazy_postings.go +++ b/pkg/store/lazy_postings.go @@ -40,7 +40,7 @@ func (p *lazyExpandedPostings) lazyExpanded() bool { return p != nil && len(p.matchers) > 0 } -func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups []*postingGroup, seriesMaxSize int64, seriesMatchRatio float64, lazyExpandedPostingSizeBytes prometheus.Counter) ([]*postingGroup, bool, error) { +func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups []*postingGroup, seriesMaxSize int64, seriesMatchRatio float64, lazyExpandedPostingSizeBytes prometheus.Counter, logger log.Logger) ([]*postingGroup, bool, error) { if len(postingGroups) <= 1 { return 
postingGroups, false, nil } @@ -61,7 +61,7 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups continue } if rng.End <= rng.Start { - level.Error(r.block.logger).Log("msg", "invalid index range, fallback to non lazy posting optimization") + level.Error(logger).Log("msg", "invalid index range, fallback to non lazy posting optimization") return postingGroups, false, nil } // Each range starts from the #entries field which is 4 bytes. @@ -215,6 +215,7 @@ func fetchLazyExpandedPostings( int64(r.block.estimatedMaxSeriesSize), 0.5, // TODO(yeya24): Expose this as a flag. lazyExpandedPostingSizeBytes, + logger, ) if err != nil { return nil, err diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go index 9f1293beeb..72075f457e 100644 --- a/pkg/store/lazy_postings_test.go +++ b/pkg/store/lazy_postings_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -258,6 +259,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { testutil.Ok(t, err) defer func() { testutil.Ok(t, bkt.Close()) }() + logger := log.NewNopLogger() inputError := errors.New("random") blockID := ulid.MustNew(1, nil) meta := &metadata.Meta{ @@ -557,7 +559,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { testutil.Ok(t, err) ir := newBucketIndexReader(block) dummyCounter := promauto.With(registry).NewCounter(prometheus.CounterOpts{Name: "test"}) - pgs, emptyPosting, err := optimizePostingsFetchByDownloadedBytes(ir, tc.postingGroups, tc.seriesMaxSize, tc.seriesMatchRatio, dummyCounter) + pgs, emptyPosting, err := optimizePostingsFetchByDownloadedBytes(ir, tc.postingGroups, tc.seriesMaxSize, tc.seriesMatchRatio, dummyCounter, logger) if err != nil { testutil.Equals(t, tc.expectedError, err.Error()) return From 7252b34699671b9036bab80d85ad4911669717be Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 16 May 2024 13:47:58 -0700 Subject: [PATCH 5/8] fix test Signed-off-by: Ben Ye --- pkg/store/bucket.go | 8 +++++--- pkg/store/bucket_test.go | 1 + 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index b31adaf9b9..21a7dbef88 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -453,6 +453,10 @@ func WithLogger(logger log.Logger) BucketStoreOption { type RequestLoggerFunc func(ctx context.Context, log log.Logger) log.Logger +func NoopRequestLoggerFunc(_ context.Context, logger log.Logger) log.Logger { + return logger +} + // WithRequestLoggerFunc sets the BucketStore to use the passed RequestLoggerFunc // to initialize logger during query time. 
func WithRequestLoggerFunc(loggerFunc RequestLoggerFunc) BucketStoreOption { @@ -595,9 +599,7 @@ func NewBucketStore( seriesBatchSize: SeriesBatchSize, sortingStrategy: sortingStrategyStore, indexHeaderLazyDownloadStrategy: indexheader.AlwaysEagerDownloadIndexHeader, - requestLoggerFunc: func(ctx context.Context, logger log.Logger) log.Logger { - return logger - }, + requestLoggerFunc: NoopRequestLoggerFunc, } for _, option := range options { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 9872f56e83..a46620cba3 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1787,6 +1787,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { seriesLimiterFactory: NewSeriesLimiterFactory(0), bytesLimiterFactory: NewBytesLimiterFactory(0), seriesBatchSize: SeriesBatchSize, + requestLoggerFunc: NoopRequestLoggerFunc, } t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) { From 627d662f8409a9d132b40fa49517d6e44f5a09e1 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 17 May 2024 09:46:06 -0700 Subject: [PATCH 6/8] address comments Signed-off-by: Ben Ye --- pkg/store/bucket.go | 62 +++++++++++++++++---------------- pkg/store/bucket_test.go | 14 ++++---- pkg/store/lazy_postings.go | 13 +++---- pkg/store/lazy_postings_test.go | 4 +-- 4 files changed, 46 insertions(+), 47 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 21a7dbef88..0aed3368b1 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1053,7 +1053,7 @@ func newBlockSeriesClient( ) *blockSeriesClient { var chunkr *bucketChunkReader if !req.SkipChunks { - chunkr = b.chunkReader() + chunkr = b.chunkReader(logger) } extLset := b.extLset @@ -1069,7 +1069,7 @@ func newBlockSeriesClient( mint: req.MinTime, maxt: req.MaxTime, - indexr: b.indexReader(), + indexr: b.indexReader(logger), chunkr: chunkr, seriesLimiter: seriesLimiter, chunksLimiter: chunksLimiter, @@ -1132,7 +1132,7 @@ func (b *blockSeriesClient) ExpandPostings( matchers sortedMatchers, seriesLimiter SeriesLimiter, ) error { - ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.lazyExpandedPostingSizeBytes, b.tenant, b.logger) + ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.lazyExpandedPostingSizeBytes, b.tenant) if err != nil { return errors.Wrap(err, "expanded matching posting") } @@ -1314,7 +1314,7 @@ OUTER: } if !b.skipChunks { - if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter, b.tenant, b.logger); err != nil { + if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter, b.tenant); err != nil { return errors.Wrap(err, "load chunks") } } @@ -1804,9 +1804,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq resHints.AddQueriedBlock(b.meta.ULID) - indexr := b.indexReader() - blockLogger := log.With(logger, "block", b.meta.ULID) + indexr := b.indexReader(blockLogger) g.Go(func() error { span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_label_names", tracing.Tags{ @@ -2026,9 +2025,8 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR resHints.AddQueriedBlock(b.meta.ULID) - indexr := b.indexReader() - blockLogger := log.With(logger, "block", b.meta.ULID) + indexr := b.indexReader(blockLogger) g.Go(func() error { span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_label_values", tracing.Tags{ @@ 
-2432,14 +2430,14 @@ func (b *bucketBlock) chunkRangeReader(ctx context.Context, seq int, off, length return b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length) } -func (b *bucketBlock) indexReader() *bucketIndexReader { +func (b *bucketBlock) indexReader(logger log.Logger) *bucketIndexReader { b.pendingReaders.Add(1) - return newBucketIndexReader(b) + return newBucketIndexReader(b, logger) } -func (b *bucketBlock) chunkReader() *bucketChunkReader { +func (b *bucketBlock) chunkReader(logger log.Logger) *bucketChunkReader { b.pendingReaders.Add(1) - return newBucketChunkReader(b) + return newBucketChunkReader(b, logger) } // matchRelabelLabels verifies whether the block matches the given matchers. @@ -2476,9 +2474,10 @@ type bucketIndexReader struct { loadedSeries map[storage.SeriesRef][]byte indexVersion int + logger log.Logger } -func newBucketIndexReader(block *bucketBlock) *bucketIndexReader { +func newBucketIndexReader(block *bucketBlock, logger log.Logger) *bucketIndexReader { r := &bucketIndexReader{ block: block, dec: &index.Decoder{ @@ -2486,6 +2485,7 @@ func newBucketIndexReader(block *bucketBlock) *bucketIndexReader { }, stats: &queryStats{}, loadedSeries: map[storage.SeriesRef][]byte{}, + logger: logger, } return r } @@ -2516,14 +2516,14 @@ func (r *bucketIndexReader) reset(size int) { // Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by // single label name=value. -func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter, tenant string, logger log.Logger) (*lazyExpandedPostings, error) { +func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter, tenant string) (*lazyExpandedPostings, error) { // Shortcut the case of `len(postingGroups) == 0`. It will only happen when no // matchers specified, and we don't need to fetch expanded postings from cache. 
if len(ms) == 0 { return nil, nil } - hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter, tenant, logger) + hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter, tenant) if err != nil { return nil, err } @@ -2568,7 +2568,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil)) } - ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, lazyExpandedPostingSizeBytes, tenant, logger) + ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, lazyExpandedPostingSizeBytes, tenant) if err != nil { return nil, errors.Wrap(err, "fetch and expand postings") } @@ -2866,7 +2866,7 @@ type postingPtr struct { ptr index.Range } -func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string, logger log.Logger) (bool, []storage.SeriesRef, error) { +func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string) (bool, []storage.SeriesRef, error) { dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms, tenant) if !hit { return false, nil, nil @@ -2885,13 +2885,13 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, }() // If failed to decode or expand cached postings, return and expand postings again. if err != nil { - level.Error(logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err) + level.Error(r.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err) return false, nil, nil } ps, err := ExpandPostingsWithContext(ctx, p) if err != nil { - level.Error(logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err) + level.Error(r.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err) return false, nil, nil } @@ -2932,7 +2932,7 @@ var bufioReaderPool = sync.Pool{ // fetchPostings fill postings requested by posting groups. // It returns one posting for each key, in the same order. // If postings for given key is not fetched, entry at given index will be nil. 
-func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter, tenant string, logger log.Logger) ([]index.Postings, []func(), error) { +func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) { var closeFns []func() timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration.WithLabelValues(tenant)) @@ -3031,7 +3031,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab if err != nil { return errors.Wrap(err, "read postings range") } - defer runutil.CloseWithLogOnErr(logger, partReader, "readIndexRange close range reader") + defer runutil.CloseWithLogOnErr(r.logger, partReader, "readIndexRange close range reader") brdr.Reset(partReader) rdr := newPostingsReaderBuilder(ctx, brdr, ptrs[i:j], start, length) @@ -3168,7 +3168,7 @@ func (it *bigEndianPostings) length() int { return len(it.list) / 4 } -func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter, tenant string, logger log.Logger) error { +func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter, tenant string) error { timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration.WithLabelValues(tenant)) defer func() { d := timer.ObserveDuration() @@ -3196,13 +3196,13 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser i, j := p.ElemRng[0], p.ElemRng[1] g.Go(func() error { - return r.loadSeries(ctx, ids[i:j], false, s, e, bytesLimiter, tenant, logger) + return r.loadSeries(ctx, ids[i:j], false, s, e, bytesLimiter, tenant) }) } return g.Wait() } -func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64, bytesLimiter BytesLimiter, tenant string, logger log.Logger) error { +func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64, bytesLimiter BytesLimiter, tenant string) error { begin := time.Now() stats := new(queryStats) defer func() { @@ -3216,7 +3216,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series stats.DataDownloadedSizeSum += units.Base2Bytes(end - start) } - b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start), logger) + b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start), r.logger) if err != nil { return errors.Wrap(err, "read series range") } @@ -3430,17 +3430,19 @@ type bucketChunkReader struct { chunkBytesMtx sync.Mutex stats *queryStats chunkBytes []*[]byte // Byte slice to return to the chunk pool on close. + logger log.Logger loadingChunksMtx sync.Mutex loadingChunks bool finishLoadingChks chan struct{} } -func newBucketChunkReader(block *bucketBlock) *bucketChunkReader { +func newBucketChunkReader(block *bucketBlock, logger log.Logger) *bucketChunkReader { return &bucketChunkReader{ block: block, stats: &queryStats{}, toLoad: make([][]loadIdx, len(block.chunkObjs)), + logger: logger, } } @@ -3487,7 +3489,7 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int) } // load loads all added chunks and saves resulting aggrs to refs. 
-func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter, tenant string, logger log.Logger) error { +func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter, tenant string) error { r.loadingChunksMtx.Lock() r.loadingChunks = true r.loadingChunksMtx.Unlock() @@ -3525,7 +3527,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [ p := p indices := pIdxs[p.ElemRng[0]:p.ElemRng[1]] g.Go(func() error { - return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum, bytesLimiter, tenant, logger) + return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum, bytesLimiter, tenant) }) } } @@ -3534,7 +3536,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [ // loadChunks will read range [start, end] from the segment file with sequence number seq. // This data range covers chunks starting at supplied offsets. -func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool, bytesLimiter BytesLimiter, tenant string, logger log.Logger) error { +func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool, bytesLimiter BytesLimiter, tenant string) error { fetchBegin := time.Now() stats := new(queryStats) defer func() { @@ -3547,7 +3549,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a if err != nil { return errors.Wrap(err, "get range reader") } - defer runutil.CloseWithLogOnErr(logger, reader, "readChunkRange close range reader") + defer runutil.CloseWithLogOnErr(r.logger, reader, "readChunkRange close range reader") bufReader := bufio.NewReaderSize(reader, r.block.estimatedMaxChunkSize) stats.chunksFetchCount++ diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index a46620cba3..9058ccb0bf 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1318,11 +1318,11 @@ func benchmarkExpandedPostings( partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize), } - indexr := newBucketIndexReader(b) + indexr := newBucketIndexReader(b, logger) t.ResetTimer() for i := 0; i < t.N(); i++ { - p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant, logger) + p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant) testutil.Ok(t, err) testutil.Equals(t, c.expectedLen, len(p.postings)) } @@ -1351,13 +1351,13 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) { } logger := log.NewNopLogger() - indexr := newBucketIndexReader(b) + indexr := newBucketIndexReader(b, logger) matcher1 := labels.MustNewMatcher(labels.MatchEqual, "j", "foo") // Match nothing. 
matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*") ctx := context.Background() dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"}) - ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant, logger) + ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant) testutil.Ok(t, err) testutil.Equals(t, ps, (*lazyExpandedPostings)(nil)) // Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings. @@ -1386,14 +1386,14 @@ func TestLazyExpandedPostingsEmptyPostings(t *testing.T) { } logger := log.NewNopLogger() - indexr := newBucketIndexReader(b) + indexr := newBucketIndexReader(b, logger) // matcher1 and matcher2 will match nothing after intersection. matcher1 := labels.MustNewMatcher(labels.MatchEqual, "j", "foo") matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "n", "1_.*") matcher3 := labels.MustNewMatcher(labels.MatchRegexp, "i", ".+") ctx := context.Background() dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"}) - ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, dummyCounter, tenancy.DefaultTenant, logger) + ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, dummyCounter, tenancy.DefaultTenant) testutil.Ok(t, err) // We expect emptyLazyPostings rather than lazy postings with 0 length but with matchers. 
testutil.Equals(t, ps, emptyLazyPostings) @@ -3542,7 +3542,7 @@ func TestExpandedPostingsRace(t *testing.T) { i := i bb := bb go func(i int, bb *bucketBlock) { - refs, err := bb.indexReader().ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant, logger) + refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant) testutil.Ok(t, err) defer wg.Done() diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go index 4bccc1b5f4..1858b7dee4 100644 --- a/pkg/store/lazy_postings.go +++ b/pkg/store/lazy_postings.go @@ -8,7 +8,6 @@ import ( "math" "strings" - "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -40,7 +39,7 @@ func (p *lazyExpandedPostings) lazyExpanded() bool { return p != nil && len(p.matchers) > 0 } -func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups []*postingGroup, seriesMaxSize int64, seriesMatchRatio float64, lazyExpandedPostingSizeBytes prometheus.Counter, logger log.Logger) ([]*postingGroup, bool, error) { +func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups []*postingGroup, seriesMaxSize int64, seriesMatchRatio float64, lazyExpandedPostingSizeBytes prometheus.Counter) ([]*postingGroup, bool, error) { if len(postingGroups) <= 1 { return postingGroups, false, nil } @@ -61,7 +60,7 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups continue } if rng.End <= rng.Start { - level.Error(logger).Log("msg", "invalid index range, fallback to non lazy posting optimization") + level.Error(r.logger).Log("msg", "invalid index range, fallback to non lazy posting optimization") return postingGroups, false, nil } // Each range starts from the #entries field which is 4 bytes. @@ -193,7 +192,6 @@ func fetchLazyExpandedPostings( lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter, tenant string, - logger log.Logger, ) (*lazyExpandedPostings, error) { var ( err error @@ -215,7 +213,6 @@ func fetchLazyExpandedPostings( int64(r.block.estimatedMaxSeriesSize), 0.5, // TODO(yeya24): Expose this as a flag. 
 			lazyExpandedPostingSizeBytes,
-			logger,
 		)
 		if err != nil {
 			return nil, err
@@ -225,7 +222,7 @@ func fetchLazyExpandedPostings(
 		}
 	}
 
-	ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter, tenant, logger)
+	ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter, tenant)
 	if err != nil {
 		return nil, err
 	}
@@ -270,9 +267,9 @@ func keysToFetchFromPostingGroups(postingGroups []*postingGroup) ([]labels.Label
 	return keys, lazyMatchers
 }
 
-func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, postingGroups []*postingGroup, bytesLimiter BytesLimiter, tenant string, logger log.Logger) ([]storage.SeriesRef, []*labels.Matcher, error) {
+func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, postingGroups []*postingGroup, bytesLimiter BytesLimiter, tenant string) ([]storage.SeriesRef, []*labels.Matcher, error) {
 	keys, lazyMatchers := keysToFetchFromPostingGroups(postingGroups)
-	fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter, tenant, logger)
+	fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter, tenant)
 	defer func() {
 		for _, closeFn := range closeFns {
 			closeFn()
diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go
index 72075f457e..bec95ac413 100644
--- a/pkg/store/lazy_postings_test.go
+++ b/pkg/store/lazy_postings_test.go
@@ -557,9 +557,9 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
 			registry := prometheus.NewRegistry()
 			block, err := newBucketBlock(ctx, newBucketStoreMetrics(registry), meta, bkt, path.Join(dir, blockID.String()), nil, nil, headerReader, nil, nil, nil)
 			testutil.Ok(t, err)
-			ir := newBucketIndexReader(block)
+			ir := newBucketIndexReader(block, logger)
 			dummyCounter := promauto.With(registry).NewCounter(prometheus.CounterOpts{Name: "test"})
-			pgs, emptyPosting, err := optimizePostingsFetchByDownloadedBytes(ir, tc.postingGroups, tc.seriesMaxSize, tc.seriesMatchRatio, dummyCounter, logger)
+			pgs, emptyPosting, err := optimizePostingsFetchByDownloadedBytes(ir, tc.postingGroups, tc.seriesMaxSize, tc.seriesMatchRatio, dummyCounter)
 			if err != nil {
 				testutil.Equals(t, tc.expectedError, err.Error())
 				return

From b8ba0e72c12a48e1df93593a2a50eb6785447499 Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Fri, 17 May 2024 09:49:02 -0700
Subject: [PATCH 7/8] fix tests

Signed-off-by: Ben Ye
---
 pkg/store/bucket.go      | 8 ++++----
 pkg/store/bucket_test.go | 9 +++++----
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 0aed3368b1..d68d08fdc1 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -1222,7 +1222,7 @@ func (b *blockSeriesClient) nextBatch(tenant string) error {
 		b.chunkr.reset()
 	}
 
-	if err := b.indexr.PreloadSeries(b.ctx, postingsBatch, b.bytesLimiter, b.tenant, b.logger); err != nil {
+	if err := b.indexr.PreloadSeries(b.ctx, postingsBatch, b.bytesLimiter, b.tenant); err != nil {
 		return errors.Wrap(err, "preload series")
 	}
 
@@ -3240,10 +3240,10 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
 			// Inefficient, but should be rare.
 			r.block.metrics.seriesRefetches.WithLabelValues(tenant).Inc()
-			level.Warn(logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", r.block.estimatedMaxSeriesSize)
+			level.Warn(r.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", r.block.estimatedMaxSeriesSize)
 
 			// Fetch plus to get the size of next one if exists.
-			return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), bytesLimiter, tenant, logger)
+			return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), bytesLimiter, tenant)
 		}
 		c = c[n : n+int(l)]
 		r.loadedSeriesMtx.Lock()
@@ -3629,7 +3629,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
 		}
 		stats.DataDownloadedSizeSum += units.Base2Bytes(chunkLen)
 
-		nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}}, logger)
+		nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}}, r.logger)
 		if err != nil {
 			return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chunkLen)
 		}
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 9058ccb0bf..0314a7ad30 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -1100,10 +1100,11 @@ func TestReadIndexCache_LoadSeries(t *testing.T) {
 		block:        b,
 		stats:        &queryStats{},
 		loadedSeries: map[storage.SeriesRef][]byte{},
+		logger:       logger,
 	}
 
 	// Success with no refetches.
-	testutil.Ok(t, r.loadSeries(ctx, []storage.SeriesRef{2, 13, 24}, false, 2, 100, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant, logger))
+	testutil.Ok(t, r.loadSeries(ctx, []storage.SeriesRef{2, 13, 24}, false, 2, 100, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant))
 	testutil.Equals(t, map[storage.SeriesRef][]byte{
 		2:  []byte("aaaaaaaaaa"),
 		13: []byte("bbbbbbbbbb"),
@@ -1113,7 +1114,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) {
 
 	// Success with 2 refetches.
 	r.loadedSeries = map[storage.SeriesRef][]byte{}
-	testutil.Ok(t, r.loadSeries(ctx, []storage.SeriesRef{2, 13, 24}, false, 2, 15, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant, logger))
+	testutil.Ok(t, r.loadSeries(ctx, []storage.SeriesRef{2, 13, 24}, false, 2, 15, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant))
 	testutil.Equals(t, map[storage.SeriesRef][]byte{
 		2:  []byte("aaaaaaaaaa"),
 		13: []byte("bbbbbbbbbb"),
@@ -1123,7 +1124,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) {
 
 	// Success with refetch on first element.
 	r.loadedSeries = map[storage.SeriesRef][]byte{}
-	testutil.Ok(t, r.loadSeries(ctx, []storage.SeriesRef{2}, false, 2, 5, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant, logger))
+	testutil.Ok(t, r.loadSeries(ctx, []storage.SeriesRef{2}, false, 2, 5, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant))
 	testutil.Equals(t, map[storage.SeriesRef][]byte{
 		2: []byte("aaaaaaaaaa"),
 	}, r.loadedSeries)
@@ -1137,7 +1138,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) {
 	testutil.Ok(t, bkt.Upload(ctx, filepath.Join(b.meta.ULID.String(), block.IndexFilename), bytes.NewReader(buf.Get())))
 
 	// Fail, but no recursion at least.
-	testutil.NotOk(t, r.loadSeries(ctx, []storage.SeriesRef{2, 13, 24}, false, 1, 15, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant, logger))
+	testutil.NotOk(t, r.loadSeries(ctx, []storage.SeriesRef{2, 13, 24}, false, 1, 15, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant))
 }
 
 func TestBucketIndexReader_ExpandedPostings(t *testing.T) {

From a3703211dcdccd4bb400bfacd78d8c317750e1f1 Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Fri, 17 May 2024 10:01:12 -0700
Subject: [PATCH 8/8] changelog

Signed-off-by: Ben Ye
---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 418ca3b3e3..5d19a70879 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 ### Added
 
 - [#7317](https://github.com/thanos-io/thanos/pull/7317) Tracing: allow specifying resource attributes for the OTLP configuration.
+- [#7367](https://github.com/thanos-io/thanos/pull/7367) Store Gateway: log request ID in request logs.
 
 ### Changed
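
For reference, a minimal self-contained Go sketch (not part of the patch) of the pattern this series converges on: derive a request-scoped logger once per request and inject it into the reader at construction time, rather than threading a logger argument through every call. The ctxKey type and requestIDFromContext helper below are hypothetical stand-ins for whatever middleware stores the request ID on the context.

package main

import (
	"context"
	"os"

	"github.com/go-kit/log"
)

// ctxKey and requestIDFromContext are hypothetical stand-ins for the
// middleware that stores a request ID on the context.
type ctxKey struct{}

func requestIDFromContext(ctx context.Context) (string, bool) {
	id, ok := ctx.Value(ctxKey{}).(string)
	return id, ok
}

// reader mirrors the shape of the refactor: the logger is injected once at
// construction, so methods log via r.logger instead of accepting a logger
// parameter on every call.
type reader struct {
	logger log.Logger
}

func newReader(logger log.Logger) *reader { return &reader{logger: logger} }

func (r *reader) load() error {
	return r.logger.Log("msg", "loading series")
}

func main() {
	base := log.NewLogfmtLogger(os.Stderr)
	ctx := context.WithValue(context.Background(), ctxKey{}, "abc-123")

	// Derive the request-scoped logger once; every log line emitted through
	// this reader then carries the request ID automatically.
	reqLogger := base
	if id, ok := requestIDFromContext(ctx); ok {
		reqLogger = log.With(base, "request-id", id)
	}
	_ = newReader(reqLogger).load() // emits: request-id=abc-123 msg="loading series"
}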