diff --git a/CHANGELOG.md b/CHANGELOG.md
index b441f99460..2c26dc918f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@
 * [ENHANCEMENT] Distributor/Ring: Allow disabling detailed ring metrics by ring member. #5931
 * [ENHANCEMENT] KV: Etcd Added etcd.ping-without-stream-allowed parameter to disable/enable PermitWithoutStream #5933
 * [ENHANCEMENT] Ingester: Add a new `max_series_per_label_set` limit. This limit functions similarly to `max_series_per_metric`, but allowing users to define the maximum number of series per LabelSet. #5950
+* [ENHANCEMENT] Store Gateway: Log gRPC requests together with headers configured in `http_request_headers_to_log`. #5958
 * [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906
 * [CHANGE] Query Frontend/Ruler: Omit empty data field in API response. #5953 #5954
 * [BUGFIX] Configsdb: Fix endline issue in db password. #5920
diff --git a/go.mod b/go.mod
index 043d867495..677e2b58d6 100644
--- a/go.mod
+++ b/go.mod
@@ -53,7 +53,7 @@ require (
     github.com/stretchr/testify v1.9.0
     github.com/thanos-io/objstore v0.0.0-20240309075357-e8336a5fd5f3
     github.com/thanos-io/promql-engine v0.0.0-20240405095051-b7d0da367508
-    github.com/thanos-io/thanos v0.34.2-0.20240501161908-1e745af6720c
+    github.com/thanos-io/thanos v0.35.1-0.20240517203736-9e6cbd9fdd9d
     github.com/uber/jaeger-client-go v2.30.0+incompatible
     github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5
     go.etcd.io/etcd/api/v3 v3.5.13
diff --git a/go.sum b/go.sum
index 2d4f6926dd..578ff101fa 100644
--- a/go.sum
+++ b/go.sum
@@ -1421,8 +1421,8 @@ github.com/thanos-io/objstore v0.0.0-20240309075357-e8336a5fd5f3 h1:Q0BjHI7FMe5K
 github.com/thanos-io/objstore v0.0.0-20240309075357-e8336a5fd5f3/go.mod h1:ptMYNPgbyAR7a2Ab2t7zHA2/0be2ePyawVR7lp7fZtg=
 github.com/thanos-io/promql-engine v0.0.0-20240405095051-b7d0da367508 h1:4X0ThYb7/wTTKS73wT13ixw0lj5OJ87g45RWIZhPZDA=
 github.com/thanos-io/promql-engine v0.0.0-20240405095051-b7d0da367508/go.mod h1:FEPnabuTql1bDA4OUM41mwcZOJ20R436k8vq+xtGEG0=
-github.com/thanos-io/thanos v0.34.2-0.20240501161908-1e745af6720c h1:clWAhj5L7+Dnw/apO874hUaVxDRHfkm9If4Qv/6CbIo=
-github.com/thanos-io/thanos v0.34.2-0.20240501161908-1e745af6720c/go.mod h1:WHGZyM/qwp857mJr8Q0d7K6eQoLtLv+6p7RNpT/yeIE=
+github.com/thanos-io/thanos v0.35.1-0.20240517203736-9e6cbd9fdd9d h1:Uomb1Yvuz1HDCJL6s0rTiHpaJpHvr8NRarO5TprM7Cs=
+github.com/thanos-io/thanos v0.35.1-0.20240517203736-9e6cbd9fdd9d/go.mod h1:mwjTxpNgULRgeOr5qWmM2IKiyu4SNh/1JypUyPtlrQA=
 github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
 github.com/uber/jaeger-client-go v2.28.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
 github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o=
diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go
index d07149dd38..1bd6121134 100644
--- a/pkg/storegateway/bucket_stores.go
+++ b/pkg/storegateway/bucket_stores.go
@@ -583,6 +583,9 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
     bucketStoreReg := prometheus.NewRegistry()
     bucketStoreOpts := []store.BucketStoreOption{
         store.WithLogger(userLogger),
+        store.WithRequestLoggerFunc(func(ctx context.Context, logger log.Logger) log.Logger {
+            return util_log.HeadersFromContext(ctx, logger)
+        }),
         store.WithRegistry(bucketStoreReg),
         store.WithIndexCache(u.indexCache),
         store.WithQueryGate(u.queryGate),
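The option wired in above lets Thanos derive a per-request logger from the gRPC request context, so the headers Cortex captured via `http_request_headers_to_log` show up on store-gateway query logs. A minimal sketch of the pattern, assuming a plain go-kit logger; the `headerKey`/`headersFromRequestContext` helpers below are illustrative stand-ins for Cortex's `util_log.HeaderMapFromContext`/`HeadersFromContext` (the real ones appear in the next file), not the actual implementation:

```go
package main

import (
	"context"
	"os"

	"github.com/go-kit/log"
)

// headerKey is a hypothetical context key standing in for Cortex's header-map plumbing.
type headerKey struct{}

// headersFromRequestContext mirrors the HeadersFromContext pattern: every header stored
// in the context becomes a structured field on the returned logger.
func headersFromRequestContext(ctx context.Context, l log.Logger) log.Logger {
	hdrs, _ := ctx.Value(headerKey{}).(map[string]string)
	for name, value := range hdrs {
		l = log.With(l, name, value)
	}
	return l
}

func main() {
	base := log.NewLogfmtLogger(os.Stdout)
	ctx := context.WithValue(context.Background(), headerKey{}, map[string]string{"X-Request-Id": "abc123"})

	// This is roughly what the RequestLoggerFunc above returns for each request.
	reqLogger := headersFromRequestContext(ctx, base)
	_ = reqLogger.Log("msg", "stats query processed") // emits X-Request-Id=abc123 as an extra field
}
```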
diff --git a/pkg/util/log/wrappers.go b/pkg/util/log/wrappers.go
index b6fd2c7591..24266bdedf 100644
--- a/pkg/util/log/wrappers.go
+++ b/pkg/util/log/wrappers.go
@@ -33,7 +33,7 @@ func WithTraceID(traceID string, l log.Logger) log.Logger {
 //   log := util.WithContext(ctx)
 //   log.Errorf("Could not chunk chunks: %v", err)
 func WithContext(ctx context.Context, l log.Logger) log.Logger {
-    l = headersFromContext(ctx, l)
+    l = HeadersFromContext(ctx, l)
     // Weaveworks uses "orgs" and "orgID" to represent Cortex users,
     // even though the code-base generally uses `userID` to refer to the same thing.
@@ -57,7 +57,7 @@ func WithSourceIPs(sourceIPs string, l log.Logger) log.Logger {
 }
 
 // HeadersFromContext enables the logging of specified HTTP Headers that have been added to a context
-func headersFromContext(ctx context.Context, l log.Logger) log.Logger {
+func HeadersFromContext(ctx context.Context, l log.Logger) log.Logger {
     headerContentsMap := HeaderMapFromContext(ctx)
     for header, contents := range headerContentsMap {
         l = log.With(l, header, contents)
diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/metadata/markers.go b/vendor/github.com/thanos-io/thanos/pkg/block/metadata/markers.go
index 83273eb343..0a351a5fab 100644
--- a/vendor/github.com/thanos-io/thanos/pkg/block/metadata/markers.go
+++ b/vendor/github.com/thanos-io/thanos/pkg/block/metadata/markers.go
@@ -79,6 +79,8 @@ const (
     IndexSizeExceedingNoCompactReason = "index-size-exceeding"
     // OutOfOrderChunksNoCompactReason is a reason of to no compact block with index contains out of order chunk so that the compaction is not blocked.
     OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk"
+    // DownsampleVerticalCompactionNoCompactReason is a reason to not compact overlapping downsampled blocks as it does not make sense e.g. how to vertically compact the average.
+    DownsampleVerticalCompactionNoCompactReason = "downsample-vertical-compaction"
 )
 
 // NoCompactMark marker stores reason of block being excluded from compaction if needed.
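For context on how the new constant gets used: the planner change later in this diff calls `block.MarkForNoCompact` with `DownsampleVerticalCompactionNoCompactReason` whenever a downsampled block shows up in an overlapping (vertical) compaction plan. Below is a rough, self-contained sketch of that call, mirroring the invocation in planner.go; the in-memory bucket, nop logger, counter, and ULID are throwaway stand-ins, and the `oklog/ulid` import path is an assumption:

```go
package main

import (
	"context"
	"crypto/rand"

	"github.com/go-kit/log"
	"github.com/oklog/ulid"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/objstore"

	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/block/metadata"
)

func main() {
	var (
		ctx     = context.Background()
		logger  = log.NewNopLogger()
		bkt     = objstore.NewInMemBucket() // stand-in for the real object-store bucket
		counter = prometheus.NewCounter(prometheus.CounterOpts{Name: "blocks_marked_for_no_compaction_total"})
		blockID = ulid.MustNew(ulid.Now(), rand.Reader) // stand-in for a downsampled block's ULID
	)

	// Uploads <blockID>/no-compact-mark.json carrying the new reason and bumps the counter,
	// which is how the verticalCompactionDownsampleFilter excludes such blocks from vertical compaction.
	_ = block.MarkForNoCompact(ctx, logger, bkt, blockID,
		metadata.DownsampleVerticalCompactionNoCompactReason,
		"downsampled block cannot be vertically compacted", counter)
}
```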
diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go b/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go
index a479ee242d..11567fb06e 100644
--- a/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go
+++ b/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go
@@ -20,12 +20,12 @@ import (
     "github.com/pkg/errors"
     "github.com/prometheus/prometheus/model/labels"
     "github.com/prometheus/prometheus/model/relabel"
-    "github.com/prometheus/prometheus/promql/parser"
     "github.com/prometheus/prometheus/tsdb"
     "github.com/prometheus/prometheus/tsdb/fileutil"
     "github.com/prometheus/prometheus/tsdb/tombstones"
     "gopkg.in/yaml.v3"
 
+    "github.com/thanos-io/thanos/pkg/extpromql"
     "github.com/thanos-io/thanos/pkg/runutil"
 )
@@ -136,7 +136,7 @@ type Rewrite struct {
 type Matchers []*labels.Matcher
 
 func (m *Matchers) UnmarshalYAML(value *yaml.Node) (err error) {
-    *m, err = parser.ParseMetricSelector(value.Value)
+    *m, err = extpromql.ParseMetricSelector(value.Value)
     if err != nil {
         return errors.Wrapf(err, "parse metric selector %v", value.Value)
     }
diff --git a/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go b/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go
index 2f0dbfdb27..7232119b0b 100644
--- a/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go
+++ b/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go
@@ -10,6 +10,7 @@ import (
     "os"
     "path/filepath"
     "sort"
+    "strings"
     "sync"
     "time"
 
@@ -871,6 +872,21 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp
         return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
     }
 
+    defer func() {
+        if p := recover(); p != nil {
+            var sb strings.Builder
+
+            cgIDs := cg.IDs()
+            for i, blid := range cgIDs {
+                _, _ = sb.WriteString(blid.String())
+                if i < len(cgIDs)-1 {
+                    _, _ = sb.WriteString(",")
+                }
+            }
+            rerr = fmt.Errorf("paniced while compacting %s: %v", sb.String(), p)
+        }
+    }()
+
     errChan := make(chan error, 1)
     err := tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) (err error) {
         shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp, blockDeletableChecker, compactionLifecycleCallback, errChan)
diff --git a/vendor/github.com/thanos-io/thanos/pkg/compact/planner.go b/vendor/github.com/thanos-io/thanos/pkg/compact/planner.go
index 783191cacf..6d7d03eea2 100644
--- a/vendor/github.com/thanos-io/thanos/pkg/compact/planner.go
+++ b/vendor/github.com/thanos-io/thanos/pkg/compact/planner.go
@@ -234,6 +234,67 @@ type largeTotalIndexSizeFilter struct {
 
 var _ Planner = &largeTotalIndexSizeFilter{}
 
+type verticalCompactionDownsampleFilter struct {
+    bkt                objstore.Bucket
+    markedForNoCompact prometheus.Counter
+
+    *largeTotalIndexSizeFilter
+}
+
+var _ Planner = &verticalCompactionDownsampleFilter{}
+
+func WithVerticalCompactionDownsampleFilter(with *largeTotalIndexSizeFilter, bkt objstore.Bucket, markedForNoCompact prometheus.Counter) Planner {
+    return &verticalCompactionDownsampleFilter{
+        markedForNoCompact:        markedForNoCompact,
+        bkt:                       bkt,
+        largeTotalIndexSizeFilter: with,
+    }
+}
+
+func (v *verticalCompactionDownsampleFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) {
+    noCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, 0)
+PlanLoop:
+    for {
+        plan, err := v.plan(ctx, noCompactMarked, metasByMinTime)
+        if err != nil {
+            return nil, err
+        }
+
+        if len(selectOverlappingMetas(plan)) == 0 {
+            return plan, nil
+        }
+
+        // If we have downsampled blocks, we need to mark them as no compact because it's impossible to do that with vertical compaction.
+        // Technically, the resolution is part of the group key but do not attach ourselves to that level of detail.
+        var marked = false
+        for _, m := range plan {
+            if m.Thanos.Downsample.Resolution == 0 {
+                continue
+            }
+            if err := block.MarkForNoCompact(
+                ctx,
+                v.logger,
+                v.bkt,
+                m.ULID,
+                metadata.DownsampleVerticalCompactionNoCompactReason,
+                "verticalCompactionDownsampleFilter: Downsampled block, see https://github.com/thanos-io/thanos/issues/6775",
+                v.markedForNoCompact,
+            ); err != nil {
+                return nil, errors.Wrapf(err, "mark %v for no compaction", m.ULID.String())
+            }
+            noCompactMarked[m.ULID] = &metadata.NoCompactMark{ID: m.ULID, Version: metadata.NoCompactMarkVersion1}
+            marked = true
+        }
+
+        if marked {
+            continue PlanLoop
+        }
+
+        return plan, nil
+
+    }
+}
+
 // WithLargeTotalIndexSizeFilter wraps Planner with largeTotalIndexSizeFilter that checks the given plans and estimates total index size.
 // When found, it marks block for no compaction by placing no-compact-mark.json and updating cache.
 // NOTE: The estimation is very rough as it assumes extreme cases of indexes sharing no bytes, thus summing all source index sizes.
@@ -243,16 +304,19 @@ func WithLargeTotalIndexSizeFilter(with *tsdbBasedPlanner, bkt objstore.Bucket,
     return &largeTotalIndexSizeFilter{tsdbBasedPlanner: with, bkt: bkt, totalMaxIndexSizeBytes: totalMaxIndexSizeBytes, markedForNoCompact: markedForNoCompact}
 }
 
-func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) {
+func (t *largeTotalIndexSizeFilter) plan(ctx context.Context, extraNoCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
     noCompactMarked := t.noCompBlocksFunc()
-    copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked))
+    copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked)+len(extraNoCompactMarked))
     for k, v := range noCompactMarked {
         copiedNoCompactMarked[k] = v
     }
+    for k, v := range extraNoCompactMarked {
+        copiedNoCompactMarked[k] = v
+    }
 
 PlanLoop:
     for {
-        plan, err := t.plan(copiedNoCompactMarked, metasByMinTime)
+        plan, err := t.tsdbBasedPlanner.plan(copiedNoCompactMarked, metasByMinTime)
         if err != nil {
             return nil, err
         }
@@ -303,3 +367,7 @@ PlanLoop:
         return plan, nil
     }
 }
+
+func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) {
+    return t.plan(ctx, nil, metasByMinTime)
+}
diff --git a/vendor/github.com/thanos-io/thanos/pkg/extpromql/parser.go b/vendor/github.com/thanos-io/thanos/pkg/extpromql/parser.go
new file mode 100644
index 0000000000..43d7188fdc
--- /dev/null
+++ b/vendor/github.com/thanos-io/thanos/pkg/extpromql/parser.go
@@ -0,0 +1,66 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package extpromql
+
+import (
+    "fmt"
+    "strings"
+
+    "github.com/pkg/errors"
+    "github.com/prometheus/prometheus/model/labels"
+    "github.com/prometheus/prometheus/promql/parser"
+
+    "github.com/thanos-io/promql-engine/execution/function"
+)
+
+// ParseExpr parses the input PromQL expression and returns the parsed representation.
+func ParseExpr(input string) (parser.Expr, error) {
+    allFuncs := make(map[string]*parser.Function, len(function.XFunctions)+len(parser.Functions))
+    for k, v := range parser.Functions {
+        allFuncs[k] = v
+    }
+    for k, v := range function.XFunctions {
+        allFuncs[k] = v
+    }
+    p := parser.NewParser(input, parser.WithFunctions(allFuncs))
+    defer p.Close()
+    return p.ParseExpr()
+}
+
+// ParseMetricSelector parses the provided textual metric selector into a list of
+// label matchers.
+func ParseMetricSelector(input string) ([]*labels.Matcher, error) {
+    expr, err := ParseExpr(input)
+    // because of the AST checking present in the ParseExpr function,
+    // we need to ignore the error if it is just the check for empty name matcher.
+    if err != nil && !isEmptyNameMatcherErr(err) {
+        return nil, err
+    }
+
+    vs, ok := expr.(*parser.VectorSelector)
+    if !ok {
+        return nil, fmt.Errorf("expected type *parser.VectorSelector, got %T", expr)
+    }
+
+    matchers := make([]*labels.Matcher, len(vs.LabelMatchers))
+    for i, lm := range vs.LabelMatchers {
+        matchers[i] = &labels.Matcher{
+            Type:  lm.Type,
+            Name:  lm.Name,
+            Value: lm.Value,
+        }
+    }
+
+    return matchers, nil
+}
+
+func isEmptyNameMatcherErr(err error) bool {
+    var parseErrs parser.ParseErrors
+    if errors.As(err, &parseErrs) {
+        return len(parseErrs) == 1 &&
+            strings.HasSuffix(parseErrs[0].Error(), "vector selector must contain at least one non-empty matcher")
+    }
+
+    return false
+}
diff --git a/vendor/github.com/thanos-io/thanos/pkg/query/remote_engine.go b/vendor/github.com/thanos-io/thanos/pkg/query/remote_engine.go
index 2b0e67e056..16e4b7cb31 100644
--- a/vendor/github.com/thanos-io/thanos/pkg/query/remote_engine.go
+++ b/vendor/github.com/thanos-io/thanos/pkg/query/remote_engine.go
@@ -13,6 +13,7 @@ import (
 
     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
+    "github.com/opentracing/opentracing-go"
     "github.com/pkg/errors"
     "github.com/prometheus/prometheus/model/labels"
     "github.com/prometheus/prometheus/promql"
@@ -187,10 +188,11 @@ func (r *remoteEngine) NewRangeQuery(_ context.Context, _ promql.QueryOpts, plan
         client: r.client,
         opts:   r.opts,
 
-        plan:     plan,
-        start:    start,
-        end:      end,
-        interval: interval,
+        plan:       plan,
+        start:      start,
+        end:        end,
+        interval:   interval,
+        remoteAddr: r.client.GetAddress(),
     }, nil
 }
 
@@ -200,10 +202,11 @@ func (r *remoteEngine) NewInstantQuery(_ context.Context, _ promql.QueryOpts, pl
         client: r.client,
         opts:   r.opts,
 
-        plan:     plan,
-        start:    ts,
-        end:      ts,
-        interval: 0,
+        plan:       plan,
+        start:      ts,
+        end:        ts,
+        interval:   0,
+        remoteAddr: r.client.GetAddress(),
     }, nil
 }
 
@@ -212,10 +215,11 @@ type remoteQuery struct {
     client Client
     opts   Opts
 
-    plan     api.RemoteQuery
-    start    time.Time
-    end      time.Time
-    interval time.Duration
+    plan       api.RemoteQuery
+    start      time.Time
+    end        time.Time
+    interval   time.Duration
+    remoteAddr string
 
     cancel context.CancelFunc
 }
@@ -227,6 +231,18 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
     r.cancel = cancel
     defer cancel()
 
+    queryRange := r.end.Sub(r.start)
+    span, qctx := opentracing.StartSpanFromContext(qctx, "remote_query_exec", opentracing.Tags{
+        "query":            r.plan.String(),
+        "remote_address":   r.remoteAddr,
+        "start":            r.start.UTC().String(),
+        "end":              r.end.UTC().String(),
+        "interval_seconds": r.interval.Seconds(),
+        "range_seconds":    queryRange.Seconds(),
+        "range_human":      queryRange,
+    })
+    defer span.Finish()
+
     var maxResolution int64
     if r.opts.AutoDownsample {
         maxResolution = int64(r.interval.Seconds() / 5)
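A minimal usage sketch for the new `extpromql` package, since several vendored call sites below (query sharding, tenancy enforcement, block metadata) switch to it: `ParseExpr` accepts both standard PromQL functions and the promql-engine `XFunctions`, and `ParseMetricSelector` returns plain label matchers. The selector and query strings are arbitrary examples:

```go
package main

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/extpromql"
)

func main() {
	// Selector parsing: returns the label matchers a caller can feed to storage lookups.
	matchers, err := extpromql.ParseMetricSelector(`http_requests_total{job="api", code=~"5.."}`)
	if err != nil {
		panic(err)
	}
	for _, m := range matchers {
		fmt.Println(m.Name, m.Type, m.Value)
	}

	// Full expression parsing: unlike promql/parser.ParseExpr, this also recognizes the
	// experimental "x" functions registered by the promql-engine.
	expr, err := extpromql.ParseExpr(`sum(rate(http_requests_total[5m]))`)
	if err != nil {
		panic(err)
	}
	fmt.Println(expr.String())
}
```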
diff --git a/vendor/github.com/thanos-io/thanos/pkg/querysharding/analyzer.go b/vendor/github.com/thanos-io/thanos/pkg/querysharding/analyzer.go
index 7b8e849bca..80cb8cb3a8 100644
--- a/vendor/github.com/thanos-io/thanos/pkg/querysharding/analyzer.go
+++ b/vendor/github.com/thanos-io/thanos/pkg/querysharding/analyzer.go
@@ -22,6 +22,8 @@ import (
     lru "github.com/hashicorp/golang-lru/v2"
     "github.com/prometheus/common/model"
     "github.com/prometheus/prometheus/promql/parser"
+
+    "github.com/thanos-io/thanos/pkg/extpromql"
 )
 
 var (
@@ -88,7 +90,7 @@ func (a *CachedQueryAnalyzer) Analyze(query string) (QueryAnalysis, error) {
 //
 // The le label is excluded from sharding.
 func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) {
-    expr, err := parser.ParseExpr(query)
+    expr, err := extpromql.ParseExpr(query)
     if err != nil {
         return nonShardableQuery(), err
     }
diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go
index 43ac0d6c1a..d68d08fdc1 100644
--- a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go
+++ b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go
@@ -413,6 +413,8 @@ type BucketStore struct {
     blockEstimatedMaxChunkFunc BlockEstimator
 
     indexHeaderLazyDownloadStrategy indexheader.LazyDownloadIndexHeaderFunc
+
+    requestLoggerFunc RequestLoggerFunc
 }
 
 func (s *BucketStore) validate() error {
@@ -449,6 +451,20 @@ func WithLogger(logger log.Logger) BucketStoreOption {
     }
 }
 
+type RequestLoggerFunc func(ctx context.Context, log log.Logger) log.Logger
+
+func NoopRequestLoggerFunc(_ context.Context, logger log.Logger) log.Logger {
+    return logger
+}
+
+// WithRequestLoggerFunc sets the BucketStore to use the passed RequestLoggerFunc
+// to initialize logger during query time.
+func WithRequestLoggerFunc(loggerFunc RequestLoggerFunc) BucketStoreOption {
+    return func(s *BucketStore) {
+        s.requestLoggerFunc = loggerFunc
+    }
+}
+
 // WithRegistry sets a registry that BucketStore uses to register metrics with.
 func WithRegistry(reg prometheus.Registerer) BucketStoreOption {
     return func(s *BucketStore) {
@@ -583,6 +599,7 @@ func NewBucketStore(
         seriesBatchSize:                 SeriesBatchSize,
         sortingStrategy:                 sortingStrategyStore,
         indexHeaderLazyDownloadStrategy: indexheader.AlwaysEagerDownloadIndexHeader,
+        requestLoggerFunc:               NoopRequestLoggerFunc,
     }
 
     for _, option := range options {
@@ -779,7 +796,6 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
     b, err := newBucketBlock(
         ctx,
-        log.With(s.logger, "block", meta.ULID),
         s.metrics,
         meta,
         s.bkt,
@@ -1037,7 +1053,7 @@ func newBlockSeriesClient(
 ) *blockSeriesClient {
     var chunkr *bucketChunkReader
     if !req.SkipChunks {
-        chunkr = b.chunkReader()
+        chunkr = b.chunkReader(logger)
     }
 
     extLset := b.extLset
@@ -1053,7 +1069,7 @@ func newBlockSeriesClient(
         mint:          req.MinTime,
         maxt:          req.MaxTime,
-        indexr:        b.indexReader(),
+        indexr:        b.indexReader(logger),
         chunkr:        chunkr,
         seriesLimiter: seriesLimiter,
         chunksLimiter: chunksLimiter,
@@ -1469,6 +1485,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
         seriesLimiter     = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))
         queryStatsEnabled = false
+
+        logger = s.requestLoggerFunc(ctx, s.logger)
     )
 
     if req.Hints != nil {
@@ -1505,7 +1523,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
         blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow, reqBlockMatchers)
 
         if s.debugLogging {
-            debugFoundBlockSetOverview(s.logger, req.MinTime, req.MaxTime, req.MaxResolutionWindow, bs.labels, blocks)
+            debugFoundBlockSetOverview(logger, req.MinTime, req.MaxTime, req.MaxResolutionWindow, bs.labels, blocks)
         }
 
         for _, b := range blocks {
@@ -1521,7 +1539,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
             blockClient := newBlockSeriesClient(
                 srv.Context(),
-                s.logger,
+                log.With(logger, "block", blk.meta.ULID),
                 blk,
                 req,
                 seriesLimiter,
@@ -1633,7 +1651,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
     s.metrics.cachedPostingsCompressedSizeBytes.WithLabelValues(tenant).Add(float64(stats.CachedPostingsCompressedSizeSum))
     s.metrics.postingsSizeBytes.WithLabelValues(tenant).Observe(float64(int(stats.PostingsFetchedSizeSum) + int(stats.PostingsTouchedSizeSum)))
 
-    level.Debug(s.logger).Log("msg", "stats query processed",
+    level.Debug(logger).Log("msg", "stats query processed",
         "request", req,
         "tenant", tenant,
         "stats", fmt.Sprintf("%+v", stats), "err", err)
@@ -1764,6 +1782,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
     var sets [][]string
     var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))
     var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant))
+    var logger = s.requestLoggerFunc(ctx, s.logger)
 
     for _, b := range s.blocks {
         b := b
@@ -1785,7 +1804,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
         resHints.AddQueriedBlock(b.meta.ULID)
 
-        indexr := b.indexReader()
+        blockLogger := log.With(logger, "block", b.meta.ULID)
+        indexr := b.indexReader(blockLogger)
 
         g.Go(func() error {
             span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_label_names", tracing.Tags{
@@ -1795,7 +1815,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
                 "block.resolution": b.meta.Thanos.Downsample.Resolution,
             })
             defer span.Finish()
-            defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names")
+            defer runutil.CloseWithLogOnErr(blockLogger, indexr, "label names")
 
             var result []string
             if len(reqSeriesMatchersNoExtLabels) == 0 {
@@ -1826,7 +1846,7 @@
                 }
                 blockClient := newBlockSeriesClient(
                     newCtx,
-                    s.logger,
+                    blockLogger,
                     b,
                     seriesReq,
                     seriesLimiter,
@@ -1973,6 +1993,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
     var sets [][]string
     var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))
    var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant))
+    var logger = s.requestLoggerFunc(ctx, s.logger)
 
     for _, b := range s.blocks {
         b := b
@@ -2004,7 +2025,9 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
         resHints.AddQueriedBlock(b.meta.ULID)
 
-        indexr := b.indexReader()
+        blockLogger := log.With(logger, "block", b.meta.ULID)
+        indexr := b.indexReader(blockLogger)
+
         g.Go(func() error {
             span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_label_values", tracing.Tags{
                 "block.id": b.meta.ULID,
@@ -2013,7 +2036,7 @@
                 "block.resolution": b.meta.Thanos.Downsample.Resolution,
             })
             defer span.Finish()
-            defer runutil.CloseWithLogOnErr(s.logger, indexr, "label values")
+            defer runutil.CloseWithLogOnErr(blockLogger, indexr, "label values")
 
             var result []string
             if len(reqSeriesMatchersNoExtLabels) == 0 {
@@ -2037,7 +2060,7 @@
                 }
                 blockClient := newBlockSeriesClient(
                     newCtx,
-                    s.logger,
+                    blockLogger,
                     b,
                     seriesReq,
                     seriesLimiter,
@@ -2267,7 +2290,6 @@ func (s *bucketBlockSet) labelMatchers(matchers ...*labels.Matcher) ([]*labels.M
 // bucketBlock represents a block that is located in a bucket. It holds intermediate
 // state for the block on local disk.
 type bucketBlock struct {
-    logger     log.Logger
     metrics    *bucketStoreMetrics
     bkt        objstore.BucketReader
     meta       *metadata.Meta
@@ -2294,7 +2316,6 @@ type bucketBlock struct {
 func newBucketBlock(
     ctx context.Context,
-    logger log.Logger,
     metrics *bucketStoreMetrics,
     meta *metadata.Meta,
     bkt objstore.BucketReader,
@@ -2319,7 +2340,6 @@ func newBucketBlock(
     extLset := labels.FromMap(meta.Thanos.Labels)
     relabelLabels := labels.NewBuilder(extLset).Set(block.BlockIDLabel, meta.ULID.String()).Labels()
     b = &bucketBlock{
-        logger:     logger,
         metrics:    metrics,
         bkt:        bkt,
         indexCache: indexCache,
@@ -2358,12 +2378,12 @@ func (b *bucketBlock) indexFilename() string {
     return path.Join(b.meta.ULID.String(), block.IndexFilename)
 }
 
-func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([]byte, error) {
+func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64, logger log.Logger) ([]byte, error) {
     r, err := b.bkt.GetRange(ctx, b.indexFilename(), off, length)
     if err != nil {
         return nil, errors.Wrap(err, "get range reader")
     }
-    defer runutil.CloseWithLogOnErr(b.logger, r, "readIndexRange close range reader")
+    defer runutil.CloseWithLogOnErr(logger, r, "readIndexRange close range reader")
 
     // Preallocate the buffer with the exact size so we don't waste allocations
     // while progressively growing an initial small buffer. The buffer capacity
@@ -2376,7 +2396,7 @@ func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([]
     return buf.Bytes(), nil
 }
 
-func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length int64, chunkRanges byteRanges) (*[]byte, error) {
+func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length int64, chunkRanges byteRanges, logger log.Logger) (*[]byte, error) {
     if seq < 0 || seq >= len(b.chunkObjs) {
         return nil, errors.Errorf("unknown segment file for index %d", seq)
     }
@@ -2386,7 +2406,7 @@ func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length i
     if err != nil {
         return nil, errors.Wrap(err, "get range reader")
     }
-    defer runutil.CloseWithLogOnErr(b.logger, reader, "readChunkRange close range reader")
+    defer runutil.CloseWithLogOnErr(logger, reader, "readChunkRange close range reader")
 
     // Get a buffer from the pool.
     chunkBuffer, err := b.chunkPool.Get(chunkRanges.size())
@@ -2410,14 +2430,14 @@ func (b *bucketBlock) chunkRangeReader(ctx context.Context, seq int, off, length
     return b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length)
 }
 
-func (b *bucketBlock) indexReader() *bucketIndexReader {
+func (b *bucketBlock) indexReader(logger log.Logger) *bucketIndexReader {
     b.pendingReaders.Add(1)
-    return newBucketIndexReader(b)
+    return newBucketIndexReader(b, logger)
 }
 
-func (b *bucketBlock) chunkReader() *bucketChunkReader {
+func (b *bucketBlock) chunkReader(logger log.Logger) *bucketChunkReader {
     b.pendingReaders.Add(1)
-    return newBucketChunkReader(b)
+    return newBucketChunkReader(b, logger)
 }
 
 // matchRelabelLabels verifies whether the block matches the given matchers.
@@ -2454,9 +2474,10 @@ type bucketIndexReader struct {
     loadedSeries map[storage.SeriesRef][]byte
 
     indexVersion int
+    logger       log.Logger
 }
 
-func newBucketIndexReader(block *bucketBlock) *bucketIndexReader {
+func newBucketIndexReader(block *bucketBlock, logger log.Logger) *bucketIndexReader {
     r := &bucketIndexReader{
         block: block,
         dec: &index.Decoder{
@@ -2464,6 +2485,7 @@ func newBucketIndexReader(block *bucketBlock) *bucketIndexReader {
         },
         stats:        &queryStats{},
         loadedSeries: map[storage.SeriesRef][]byte{},
+        logger:       logger,
     }
     return r
 }
@@ -2863,13 +2885,13 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
     }()
 
     // If failed to decode or expand cached postings, return and expand postings again.
     if err != nil {
-        level.Error(r.block.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
+        level.Error(r.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
         return false, nil, nil
     }
 
     ps, err := ExpandPostingsWithContext(ctx, p)
     if err != nil {
-        level.Error(r.block.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
+        level.Error(r.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
         return false, nil, nil
     }
 
@@ -3009,7 +3031,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
         if err != nil {
             return errors.Wrap(err, "read postings range")
         }
-        defer runutil.CloseWithLogOnErr(r.block.logger, partReader, "readIndexRange close range reader")
+        defer runutil.CloseWithLogOnErr(r.logger, partReader, "readIndexRange close range reader")
 
         brdr.Reset(partReader)
         rdr := newPostingsReaderBuilder(ctx, brdr, ptrs[i:j], start, length)
@@ -3194,7 +3216,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
         stats.DataDownloadedSizeSum += units.Base2Bytes(end - start)
     }
 
-    b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start))
+    b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start), r.logger)
     if err != nil {
         return errors.Wrap(err, "read series range")
     }
@@ -3218,7 +3240,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
             // Inefficient, but should be rare.
             r.block.metrics.seriesRefetches.WithLabelValues(tenant).Inc()
-            level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", r.block.estimatedMaxSeriesSize)
+            level.Warn(r.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", r.block.estimatedMaxSeriesSize)
 
             // Fetch plus to get the size of next one if exists.
             return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), bytesLimiter, tenant)
@@ -3408,17 +3430,19 @@ type bucketChunkReader struct {
     chunkBytesMtx sync.Mutex
     stats         *queryStats
     chunkBytes    []*[]byte // Byte slice to return to the chunk pool on close.
+    logger        log.Logger
 
     loadingChunksMtx  sync.Mutex
     loadingChunks     bool
     finishLoadingChks chan struct{}
 }
 
-func newBucketChunkReader(block *bucketBlock) *bucketChunkReader {
+func newBucketChunkReader(block *bucketBlock, logger log.Logger) *bucketChunkReader {
     return &bucketChunkReader{
         block:  block,
         stats:  &queryStats{},
         toLoad: make([][]loadIdx, len(block.chunkObjs)),
+        logger: logger,
     }
 }
@@ -3525,7 +3549,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
     if err != nil {
         return errors.Wrap(err, "get range reader")
     }
-    defer runutil.CloseWithLogOnErr(r.block.logger, reader, "readChunkRange close range reader")
+    defer runutil.CloseWithLogOnErr(r.logger, reader, "readChunkRange close range reader")
 
     bufReader := bufio.NewReaderSize(reader, r.block.estimatedMaxChunkSize)
     stats.chunksFetchCount++
@@ -3605,7 +3629,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
         }
         stats.DataDownloadedSizeSum += units.Base2Bytes(chunkLen)
 
-        nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}})
+        nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}}, r.logger)
         if err != nil {
             return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chunkLen)
         }
diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go b/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go
index cfcf987e14..1858b7dee4 100644
--- a/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go
+++ b/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go
@@ -60,7 +60,7 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups
             continue
         }
         if rng.End <= rng.Start {
-            level.Error(r.block.logger).Log("msg", "invalid index range, fallback to non lazy posting optimization")
+            level.Error(r.logger).Log("msg", "invalid index range, fallback to non lazy posting optimization")
             return postingGroups, false, nil
         }
         // Each range starts from the #entries field which is 4 bytes.
diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/proxy.go b/vendor/github.com/thanos-io/thanos/pkg/store/proxy.go
index 778879e117..6dbe5df7a3 100644
--- a/vendor/github.com/thanos-io/thanos/pkg/store/proxy.go
+++ b/vendor/github.com/thanos-io/thanos/pkg/store/proxy.go
@@ -427,7 +427,7 @@ func storeMatches(ctx context.Context, s Client, debugLogging bool, mint, maxt i
     }
 
     extLset := s.LabelSets()
-    if !labelSetsMatch(matchers, extLset...) {
+    if !LabelSetsMatch(matchers, extLset...) {
         if debugLogging {
             reason = fmt.Sprintf("external labels %v does not match request label matchers: %v", extLset, matchers)
         }
@@ -449,7 +449,7 @@ func storeMatchDebugMetadata(s Client, storeDebugMatchers [][]*labels.Matcher) (
     match := false
     for _, sm := range storeDebugMatchers {
-        match = match || labelSetsMatch(sm, labels.FromStrings("__address__", addr))
+        match = match || LabelSetsMatch(sm, labels.FromStrings("__address__", addr))
     }
     if !match {
         return false, fmt.Sprintf("__address__ %v does not match debug store metadata matchers: %v", addr, storeDebugMatchers)
@@ -457,8 +457,8 @@ func storeMatchDebugMetadata(s Client, storeDebugMatchers [][]*labels.Matcher) (
     return true, ""
 }
 
-// labelSetsMatch returns false if all label-set do not match the matchers (aka: OR is between all label-sets).
-func labelSetsMatch(matchers []*labels.Matcher, lset ...labels.Labels) bool {
+// LabelSetsMatch returns false if all label-set do not match the matchers (aka: OR is between all label-sets).
+func LabelSetsMatch(matchers []*labels.Matcher, lset ...labels.Labels) bool {
     if len(lset) == 0 {
         return true
     }
diff --git a/vendor/github.com/thanos-io/thanos/pkg/tenancy/tenancy.go b/vendor/github.com/thanos-io/thanos/pkg/tenancy/tenancy.go
index aec0bad86a..9da1372933 100644
--- a/vendor/github.com/thanos-io/thanos/pkg/tenancy/tenancy.go
+++ b/vendor/github.com/thanos-io/thanos/pkg/tenancy/tenancy.go
@@ -11,8 +11,9 @@ import (
     "github.com/pkg/errors"
     "github.com/prometheus-community/prom-label-proxy/injectproxy"
     "github.com/prometheus/prometheus/model/labels"
-    "github.com/prometheus/prometheus/promql/parser"
     "google.golang.org/grpc/metadata"
+
+    "github.com/thanos-io/thanos/pkg/extpromql"
 )
 
 type contextKey int
@@ -148,7 +149,7 @@ func EnforceQueryTenancy(tenantLabel string, tenant string, query string) (strin
     e := injectproxy.NewEnforcer(false, labelMatcher)
 
-    expr, err := parser.ParseExpr(query)
+    expr, err := extpromql.ParseExpr(query)
     if err != nil {
         return "", errors.Wrap(err, "error parsing query string, when enforcing tenenacy")
     }
@@ -178,7 +179,7 @@ func getLabelMatchers(formMatchers []string, tenant string, enforceTenancy bool,
     }
 
     for _, s := range formMatchers {
-        matchers, err := parser.ParseMetricSelector(s)
+        matchers, err := extpromql.ParseMetricSelector(s)
         if err != nil {
             return nil, err
         }
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 6aeeae769f..a79e225713 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -969,7 +969,7 @@ github.com/thanos-io/promql-engine/query
 github.com/thanos-io/promql-engine/ringbuffer
 github.com/thanos-io/promql-engine/storage
 github.com/thanos-io/promql-engine/storage/prometheus
-# github.com/thanos-io/thanos v0.34.2-0.20240501161908-1e745af6720c
+# github.com/thanos-io/thanos v0.35.1-0.20240517203736-9e6cbd9fdd9d
 ## explicit; go 1.21
 github.com/thanos-io/thanos/pkg/api/query/querypb
 github.com/thanos-io/thanos/pkg/block
@@ -993,6 +993,7 @@ github.com/thanos-io/thanos/pkg/extgrpc/snappy
 github.com/thanos-io/thanos/pkg/extkingpin
 github.com/thanos-io/thanos/pkg/extprom
 github.com/thanos-io/thanos/pkg/extprom/http
+github.com/thanos-io/thanos/pkg/extpromql
 github.com/thanos-io/thanos/pkg/gate
 github.com/thanos-io/thanos/pkg/info/infopb
 github.com/thanos-io/thanos/pkg/losertree