diff --git a/.gitignore b/.gitignore
index 8d780757c5e..da19d6ca435 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,4 +12,4 @@
 /tempodb/encoding/benchmark_block
 /cmd/tempo-serverless/vendor/
 /pkg/traceql/y.output
-private-key.key
\ No newline at end of file
+private-key.key
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 48ba2d1b4b2..714b7ba90b2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,8 @@
 ## main / unreleased
 * [FEATURE] Add support for Azure Workload Identity authentication [#2195](https://github.com/grafana/tempo/pull/2195) (@LambArchie)
+* [ENHANCEMENT] Add Throughput and SLO Metrics with SLOConfig in Query Frontend [#2008](https://github.com/grafana/tempo/pull/2008) (@electron0zero)
+  - **BREAKING CHANGE** `query_frontend_result_metrics_inspected_bytes` metric removed in favour of `query_frontend_bytes_processed_per_second`
 * [FEATURE] Add flag to check configuration [#2131](https://github.com/grafana/tempo/issues/2131) (@robertscherbarth @agrib-01)
 * [FEATURE] Add flag to optionally enable all available Go runtime metrics [#2005](https://github.com/grafana/tempo/pull/2005) (@andreasgerstmayr)
 * [FEATURE] Add support for span `kind` to TraceQL [#2217](https://github.com/grafana/tempo/pull/2217) (@joe-elliott)
diff --git a/docs/sources/tempo/configuration/_index.md b/docs/sources/tempo/configuration/_index.md
index 482e7d65f13..74d1a8ad397 100644
--- a/docs/sources/tempo/configuration/_index.md
+++ b/docs/sources/tempo/configuration/_index.md
@@ -400,6 +400,16 @@ query_frontend:
     # (default: 30m)
     [query_ingesters_until: <duration>]
+    # If set to a non-zero value, its value will be used to decide if the query is within SLO or not.
+    # A query is within SLO if it returned 200 within duration_slo seconds OR processed throughput_bytes_slo bytes/s of data.
+    # NOTE: `duration_slo` and `throughput_bytes_slo` must both be configured for this to take effect.
+    [duration_slo: <duration> | default = 0s ]
+
+    # If set to a non-zero value, its value will be used to decide if the query is within SLO or not.
+    # A query is within SLO if it returned 200 within duration_slo seconds OR processed throughput_bytes_slo bytes/s of data.
+    [throughput_bytes_slo: <float> | default = 0 ]
+
+
     # Trace by ID lookup configuration
     trace_by_id:

        # The number of shards to split a trace by id query into.
@@ -413,6 +423,10 @@ query_frontend:
         # The maximum number of requests to execute when hedging.
         # Requires hedge_requests_at to be set. Must be greater than 0.
         [hedge_requests_up_to: <int> | default = 2 ]
+
+        # If set to a non-zero value, its value will be used to decide if the query is within SLO or not.
+        # A query is within SLO if it returned 200 within duration_slo seconds.
+        [duration_slo: <duration> | default = 0s ]
 ```

 ## Querier
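The two search-path settings above combine with OR semantics: a query counts toward the SLO when it returns 200 within `duration_slo`, or when it processes at least `throughput_bytes_slo` bytes per second, and both must be non-zero for the search SLO counter to be recorded at all. A minimal sketch of the corresponding Go values follows; the struct mirrors the `SLOConfig` added in `modules/frontend/config.go` below, and the numbers are illustrative, not defaults from this change.

```go
package main

import (
	"fmt"
	"time"
)

// SLOConfig mirrors the struct introduced in modules/frontend/config.go.
type SLOConfig struct {
	DurationSLO        time.Duration `yaml:"duration_slo,omitempty"`
	ThroughputBytesSLO float64       `yaml:"throughput_bytes_slo,omitempty"`
}

func main() {
	// Illustrative values only (the shipped defaults are 0, i.e. disabled):
	// count a search query as within SLO if it finishes in under 10s OR
	// sustains at least 100 MiB/s of processed data.
	slo := SLOConfig{
		DurationSLO:        10 * time.Second,
		ThroughputBytesSLO: 100 * 1024 * 1024,
	}
	fmt.Printf("duration_slo=%s throughput_bytes_slo=%.0f\n", slo.DurationSLO, slo.ThroughputBytesSLO)
}
```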
diff --git a/modules/frontend/config.go b/modules/frontend/config.go
index 1faee69983a..902a52cee44 100644
--- a/modules/frontend/config.go
+++ b/modules/frontend/config.go
@@ -2,9 +2,8 @@ package frontend

 import (
 	"flag"
-	"time"
-
 	"net/http"
+	"time"

 	"github.com/go-kit/log"
 	"github.com/prometheus/client_golang/prometheus"
@@ -28,11 +27,13 @@ type Config struct {

 type SearchConfig struct {
 	Sharder SearchSharderConfig `yaml:",inline"`
+	SLO     SLOConfig           `yaml:",inline"`
 }

 type TraceByIDConfig struct {
 	QueryShards int           `yaml:"query_shards,omitempty"`
 	Hedging     HedgingConfig `yaml:",inline"`
+	SLO         SLOConfig     `yaml:",inline"`
 }

 type HedgingConfig struct {
@@ -40,7 +41,17 @@ type HedgingConfig struct {
 	HedgeRequestsUpTo int           `yaml:"hedge_requests_up_to"`
 }

+type SLOConfig struct {
+	DurationSLO        time.Duration `yaml:"duration_slo,omitempty"`
+	ThroughputBytesSLO float64       `yaml:"throughput_bytes_slo,omitempty"`
+}
+
 func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
+	slo := SLOConfig{
+		DurationSLO:        0,
+		ThroughputBytesSLO: 0,
+	}
+
 	cfg.Config.MaxOutstandingPerTenant = 2000
 	cfg.MaxRetries = 2
 	cfg.TolerateFailedBlocks = 0
@@ -54,9 +65,11 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
 			ConcurrentRequests:    defaultConcurrentRequests,
 			TargetBytesPerRequest: defaultTargetBytesPerRequest,
 		},
+		SLO: slo,
 	}
 	cfg.TraceByID = TraceByIDConfig{
 		QueryShards: 50,
+		SLO:         slo,
 		Hedging: HedgingConfig{
 			HedgeRequestsAt:   2 * time.Second,
 			HedgeRequestsUpTo: 2,
diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go
index 906996a175f..732d93fe608 100644
--- a/modules/frontend/frontend.go
+++ b/modules/frontend/frontend.go
@@ -34,7 +34,6 @@ const (

 type QueryFrontend struct {
 	TraceByID, Search http.Handler
 	logger            log.Logger
-	queriesPerTenant  *prometheus.CounterVec
 	store             storage.Store
 }

@@ -70,21 +69,16 @@ func New(cfg Config, next http.RoundTripper, o *overrides.Overrides, store stora
 	traceByIDMiddleware := MergeMiddlewares(newTraceByIDMiddleware(cfg, logger), retryWare)
 	searchMiddleware := MergeMiddlewares(newSearchMiddleware(cfg, o, store, logger), retryWare)

-	traceByIDCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{
-		"op": traceByIDOp,
-	})
-	searchCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{
-		"op": searchOp,
-	})
+	traceByIDCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{"op": traceByIDOp})
+	searchCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{"op": searchOp})

 	traces := traceByIDMiddleware.Wrap(next)
 	search := searchMiddleware.Wrap(next)
 	return &QueryFrontend{
-		TraceByID:        newHandler(traces, traceByIDCounter, logger),
-		Search:           newHandler(search, searchCounter, logger),
-		logger:           logger,
-		queriesPerTenant: queriesPerTenant,
-		store:            store,
+		TraceByID: newHandler(traces, traceByIDCounter, logger),
+		Search:    newHandler(search, searchCounter, logger),
+		logger:    logger,
+		store:     store,
 	}, nil
 }

@@ -98,7 +92,7 @@ func newTraceByIDMiddleware(cfg Config, logger log.Logger) Middleware {
 		rt := NewRoundTripper(
 			next,
 			newDeduper(logger),
-			newTraceByIDSharder(cfg.TraceByID.QueryShards, cfg.TolerateFailedBlocks, logger),
+			newTraceByIDSharder(cfg.TraceByID.QueryShards, cfg.TolerateFailedBlocks, cfg.TraceByID.SLO, logger),
 			newHedgedRequestWare(cfg.TraceByID.Hedging),
 		)

@@ -181,7 +175,7 @@ func newTraceByIDMiddleware(cfg Config, logger log.Logger) Middleware {
 func newSearchMiddleware(cfg Config, o *overrides.Overrides, reader tempodb.Reader, logger log.Logger) Middleware {
 	return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
 		ingesterSearchRT := next
-		backendSearchRT := NewRoundTripper(next, newSearchSharder(reader, o, cfg.Search.Sharder, logger))
+		backendSearchRT := NewRoundTripper(next, newSearchSharder(reader, o, cfg.Search.Sharder, cfg.Search.SLO, logger))

 		return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
 			// backend search queries require sharding so we pass through a special roundtripper
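Both `frontend.go` above and `searchsharding.go` below pre-bind the `op` label with `MustCurryWith`, so request handlers only have to fill in the tenant at observation time. A self-contained sketch of that client_golang pattern, with an illustrative metric name and illustrative label values:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// A counter partitioned by tenant and op, like queriesPerTenant in the frontend.
	queries := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "tempo",
		Name:      "example_queries_total", // illustrative name, not a real Tempo metric
		Help:      "Example: total queries per tenant and op",
	}, []string{"tenant", "op"})

	// MustCurryWith fixes the "op" label up front; the curried vector
	// then only needs the remaining "tenant" label at call sites.
	searchQueries := queries.MustCurryWith(prometheus.Labels{"op": "search"})
	traceByIDQueries := queries.MustCurryWith(prometheus.Labels{"op": "traces"}) // label values illustrative

	searchQueries.WithLabelValues("tenant-a").Inc()
	traceByIDQueries.WithLabelValues("tenant-a").Inc()

	fmt.Println("curried counters incremented")
}
```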
diff --git a/modules/frontend/frontend_test.go b/modules/frontend/frontend_test.go
index 66d70d87947..352d88d5405 100644
--- a/modules/frontend/frontend_test.go
+++ b/modules/frontend/frontend_test.go
@@ -27,12 +27,14 @@ func TestFrontendRoundTripsSearch(t *testing.T) {
 	f, err := New(Config{
 		TraceByID: TraceByIDConfig{
 			QueryShards: minQueryShards,
+			SLO:         testSLOcfg,
 		},
 		Search: SearchConfig{
 			Sharder: SearchSharderConfig{
 				ConcurrentRequests:    defaultConcurrentRequests,
 				TargetBytesPerRequest: defaultTargetBytesPerRequest,
 			},
+			SLO: testSLOcfg,
 		},
 	}, next, nil, nil, log.NewNopLogger(), nil)
 	require.NoError(t, err)
@@ -55,6 +57,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
 				ConcurrentRequests:    defaultConcurrentRequests,
 				TargetBytesPerRequest: defaultTargetBytesPerRequest,
 			},
+			SLO: testSLOcfg,
 		},
 	}, nil, nil, nil, log.NewNopLogger(), nil)
 	assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
@@ -63,12 +66,14 @@ func TestFrontendBadConfigFails(t *testing.T) {
 	f, err = New(Config{
 		TraceByID: TraceByIDConfig{
 			QueryShards: maxQueryShards + 1,
+			SLO:         testSLOcfg,
 		},
 		Search: SearchConfig{
 			Sharder: SearchSharderConfig{
 				ConcurrentRequests:    defaultConcurrentRequests,
 				TargetBytesPerRequest: defaultTargetBytesPerRequest,
 			},
+			SLO: testSLOcfg,
 		},
 	}, nil, nil, nil, log.NewNopLogger(), nil)
 	assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
@@ -77,12 +82,14 @@ func TestFrontendBadConfigFails(t *testing.T) {
 	f, err = New(Config{
 		TraceByID: TraceByIDConfig{
 			QueryShards: maxQueryShards,
+			SLO:         testSLOcfg,
 		},
 		Search: SearchConfig{
 			Sharder: SearchSharderConfig{
 				ConcurrentRequests:    0,
 				TargetBytesPerRequest: defaultTargetBytesPerRequest,
 			},
+			SLO: testSLOcfg,
 		},
 	}, nil, nil, nil, log.NewNopLogger(), nil)
 	assert.EqualError(t, err, "frontend search concurrent requests should be greater than 0")
@@ -91,12 +98,14 @@ func TestFrontendBadConfigFails(t *testing.T) {
 	f, err = New(Config{
 		TraceByID: TraceByIDConfig{
 			QueryShards: maxQueryShards,
+			SLO:         testSLOcfg,
 		},
 		Search: SearchConfig{
 			Sharder: SearchSharderConfig{
 				ConcurrentRequests:    defaultConcurrentRequests,
 				TargetBytesPerRequest: 0,
 			},
+			SLO: testSLOcfg,
 		},
 	}, nil, nil, nil, log.NewNopLogger(), nil)
 	assert.EqualError(t, err, "frontend search target bytes per request should be greater than 0")
@@ -105,6 +114,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
 	f, err = New(Config{
 		TraceByID: TraceByIDConfig{
 			QueryShards: maxQueryShards,
+			SLO:         testSLOcfg,
 		},
 		Search: SearchConfig{
 			Sharder: SearchSharderConfig{
@@ -113,6 +123,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
 				QueryIngestersUntil:   time.Minute,
 				QueryBackendAfter:     time.Hour,
 			},
+			SLO: testSLOcfg,
 		},
 	}, nil, nil, nil, log.NewNopLogger(), nil)
 	assert.EqualError(t, err, "query backend after should be less than or equal to query ingester until")
diff --git a/modules/frontend/searchsharding.go b/modules/frontend/searchsharding.go
index a2b13da8178..a66c37b5ac4 100644
--- a/modules/frontend/searchsharding.go
+++ b/modules/frontend/searchsharding.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+	"net/url"
 	"strings"
 	"time"

@@ -30,12 +31,23 @@ const (
 )

 var (
-	metricInspectedBytes = promauto.NewHistogram(prometheus.HistogramOpts{
+	queryThroughput = promauto.NewHistogramVec(prometheus.HistogramOpts{
 		Namespace: "tempo",
-		Name:      "query_frontend_result_metrics_inspected_bytes",
-		Help:      "Inspected Bytes in a search query",
+		Name:      "query_frontend_bytes_processed_per_second",
+		Help:      "Bytes processed per second in the query per tenant",
 		Buckets:   prometheus.ExponentialBuckets(1024*1024, 2, 10), // from 1MB up to 1GB
-	})
+	}, []string{"tenant", "op"})
+
+	searchThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": searchOp})
+
+	sloQueriesPerTenant = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "tempo",
+		Name:      "query_frontend_queries_within_slo_total",
+		Help:      "Total Queries within SLO per tenant",
+	}, []string{"tenant", "op"})
+
+	sloTraceByIDCounter = sloQueriesPerTenant.MustCurryWith(prometheus.Labels{"op": traceByIDOp})
+	sloSearchCounter    = sloQueriesPerTenant.MustCurryWith(prometheus.Labels{"op": searchOp})
 )

 type searchSharder struct {
@@ -44,6 +56,7 @@ type searchSharder struct {
 	overrides *overrides.Overrides

 	cfg    SearchSharderConfig
+	sloCfg SLOConfig
 	logger log.Logger
 }

@@ -58,14 +71,15 @@ type SearchSharderConfig struct {
 }

 // newSearchSharder creates a sharding middleware for search
-func newSearchSharder(reader tempodb.Reader, o *overrides.Overrides, cfg SearchSharderConfig, logger log.Logger) Middleware {
+func newSearchSharder(reader tempodb.Reader, o *overrides.Overrides, cfg SearchSharderConfig, sloCfg SLOConfig, logger log.Logger) Middleware {
 	return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
 		return searchSharder{
 			next:      next,
 			reader:    reader,
 			overrides: o,
-			logger:    logger,
 			cfg:       cfg,
+			sloCfg:    sloCfg,
+			logger:    logger,
 		}
 	})
 }
@@ -78,8 +92,6 @@ func newSearchSharder(reader tempodb.Reader, o *overrides.Overrides, cfg SearchS
 // start=<epoch seconds>
 // end=<epoch seconds>
 func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
-	// create search related query metrics here??
-	// this is the place where we agg search response.
 	searchReq, err := api.ParseSearchRequest(r)
 	if err != nil {
 		return &http.Response{
@@ -102,6 +114,7 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
 	span, ctx := opentracing.StartSpanFromContext(ctx, "frontend.ShardSearch")
 	defer span.Finish()

+	reqStart := time.Now()
 	// sub context to cancel in-progress sub requests
 	subCtx, subCancel := context.WithCancel(ctx)
 	defer subCancel()
@@ -216,25 +229,21 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {

 	// print out request metrics
 	cancelledReqs := startedReqs - overallResponse.finishedRequests
-	level.Info(s.logger).Log(
-		"msg", "sharded search query request stats",
-		"raw_query", r.URL.RawQuery,
-		"total", len(reqs),
-		"started", startedReqs,
-		"finished", overallResponse.finishedRequests,
-		"cancelled", cancelledReqs)
-
-	// all goroutines have finished, we can safely access searchResults fields directly now
-	span.SetTag("inspectedBlocks", overallResponse.resultsMetrics.InspectedBlocks)
-	span.SetTag("skippedBlocks", overallResponse.resultsMetrics.SkippedBlocks)
-	span.SetTag("inspectedBytes", overallResponse.resultsMetrics.InspectedBytes)
-	span.SetTag("inspectedTraces", overallResponse.resultsMetrics.InspectedTraces)
-	span.SetTag("skippedTraces", overallResponse.resultsMetrics.SkippedTraces)
-	span.SetTag("totalBlockBytes", overallResponse.resultsMetrics.TotalBlockBytes)
+	reqTime := time.Since(reqStart)
+	throughput := float64(overallResponse.resultsMetrics.InspectedBytes) / reqTime.Seconds()
+	searchThroughput.WithLabelValues(tenantID).Observe(throughput)

+	query, _ := url.PathUnescape(r.URL.RawQuery)
+	span.SetTag("query", query)
 	level.Info(s.logger).Log(
-		"msg", "sharded search query SearchMetrics",
-		"raw_query", r.URL.RawQuery,
+		"msg", "sharded search query request stats and SearchMetrics",
+		"query", query,
+		"duration_seconds", reqTime,
+		"request_throughput", throughput,
+		"total_requests", len(reqs),
+		"started_requests", startedReqs,
+		"cancelled_requests", cancelledReqs,
+		"finished_requests", overallResponse.finishedRequests,
 		"inspectedBlocks", overallResponse.resultsMetrics.InspectedBlocks,
 		"skippedBlocks", overallResponse.resultsMetrics.SkippedBlocks,
 		"inspectedBytes", overallResponse.resultsMetrics.InspectedBytes,
@@ -242,8 +251,13 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
 		"skippedTraces", overallResponse.resultsMetrics.SkippedTraces,
 		"totalBlockBytes", overallResponse.resultsMetrics.TotalBlockBytes)

-	// Collect inspectedBytes metrics
-	metricInspectedBytes.Observe(float64(overallResponse.resultsMetrics.InspectedBytes))
+	// all goroutines have finished, we can safely access searchResults fields directly now
+	span.SetTag("inspectedBlocks", overallResponse.resultsMetrics.InspectedBlocks)
+	span.SetTag("skippedBlocks", overallResponse.resultsMetrics.SkippedBlocks)
+	span.SetTag("inspectedBytes", overallResponse.resultsMetrics.InspectedBytes)
+	span.SetTag("inspectedTraces", overallResponse.resultsMetrics.InspectedTraces)
+	span.SetTag("skippedTraces", overallResponse.resultsMetrics.SkippedTraces)
+	span.SetTag("totalBlockBytes", overallResponse.resultsMetrics.TotalBlockBytes)

 	if overallResponse.err != nil {
 		return nil, overallResponse.err
@@ -266,6 +280,15 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
 		return nil, err
 	}

+	// only record metric when it's enabled and within slo
+	if s.sloCfg.DurationSLO != 0 && s.sloCfg.ThroughputBytesSLO != 0 {
+		if reqTime < s.sloCfg.DurationSLO || throughput > s.sloCfg.ThroughputBytesSLO {
+			// query is within SLO if query returned 200 within DurationSLO seconds OR
+			// processed ThroughputBytesSLO bytes/s data
+			sloSearchCounter.WithLabelValues(tenantID).Inc()
+		}
+	}
+
 	return &http.Response{
 		StatusCode: http.StatusOK,
 		Header: http.Header{
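The sharder above derives throughput as `InspectedBytes` divided by the elapsed wall-clock seconds, and bumps the SLO counter only when both thresholds are configured and at least one of them is met. A standalone sketch of that decision with worked numbers; the helper name and the example values are illustrative, not part of this change.

```go
package main

import (
	"fmt"
	"time"
)

// withinSearchSLO follows the check in searchsharding.go: SLO accounting is
// skipped unless both thresholds are set, and a query passes when it either
// finished quickly enough OR processed data quickly enough.
func withinSearchSLO(durationSLO time.Duration, throughputSLO float64, reqTime time.Duration, inspectedBytes uint64) bool {
	if durationSLO == 0 || throughputSLO == 0 {
		return false // disabled unless both are configured
	}
	throughput := float64(inspectedBytes) / reqTime.Seconds()
	return reqTime < durationSLO || throughput > throughputSLO
}

func main() {
	durationSLO := 10 * time.Second      // illustrative
	throughputSLO := 100.0 * 1024 * 1024 // 100 MiB/s, illustrative

	// A 30s query that scanned 6 GiB (about 205 MiB/s) misses the duration
	// target but clears the throughput target, so it still counts as within SLO.
	fmt.Println(withinSearchSLO(durationSLO, throughputSLO, 30*time.Second, 6<<30))

	// A 30s query that scanned 1 GiB (about 34 MiB/s) fails both conditions.
	fmt.Println(withinSearchSLO(durationSLO, throughputSLO, 30*time.Second, 1<<30))
}
```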
diff --git a/modules/frontend/searchsharding_test.go b/modules/frontend/searchsharding_test.go
index 6c1ada0133b..116faf97f9e 100644
--- a/modules/frontend/searchsharding_test.go
+++ b/modules/frontend/searchsharding_test.go
@@ -29,6 +29,11 @@ import (
 	"github.com/grafana/tempo/tempodb/encoding/common"
 )

+var testSLOcfg = SLOConfig{
+	ThroughputBytesSLO: 0,
+	DurationSLO:        0,
+}
+
 // implements tempodb.Reader interface
 type mockReader struct {
 	metas []*backend.BlockMeta
@@ -551,7 +556,7 @@ func TestSearchSharderRoundTrip(t *testing.T) {
 			}, o, SearchSharderConfig{
 				ConcurrentRequests:    1, // 1 concurrent request to force order
 				TargetBytesPerRequest: defaultTargetBytesPerRequest,
-			}, log.NewNopLogger())
+			}, testSLOcfg, log.NewNopLogger())
 			testRT := NewRoundTripper(next, sharder)

 			req := httptest.NewRequest("GET", "/?start=1000&end=1500", nil)
@@ -597,7 +602,7 @@ func TestSearchSharderRoundTripBadRequest(t *testing.T) {
 		ConcurrentRequests:    defaultConcurrentRequests,
 		TargetBytesPerRequest: defaultTargetBytesPerRequest,
 		MaxDuration:           5 * time.Minute,
-	}, log.NewNopLogger())
+	}, testSLOcfg, log.NewNopLogger())
 	testRT := NewRoundTripper(next, sharder)

 	// no org id
@@ -626,7 +631,7 @@ func TestSearchSharderRoundTripBadRequest(t *testing.T) {
 		ConcurrentRequests:    defaultConcurrentRequests,
 		TargetBytesPerRequest: defaultTargetBytesPerRequest,
 		MaxDuration:           5 * time.Minute,
-	}, log.NewNopLogger())
+	}, testSLOcfg, log.NewNopLogger())
 	testRT = NewRoundTripper(next, sharder)

 	req = httptest.NewRequest("GET", "/?start=1000&end=1500", nil)
diff --git a/modules/frontend/tracebyidsharding.go b/modules/frontend/tracebyidsharding.go
index 3decb5a13cc..a5b14606b53 100644
--- a/modules/frontend/tracebyidsharding.go
+++ b/modules/frontend/tracebyidsharding.go
@@ -10,6 +10,7 @@ import (
 	"net/http"
 	"strings"
 	"sync"
+	"time"

 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
@@ -27,11 +28,12 @@ const (
 	maxQueryShards = 256
 )

-func newTraceByIDSharder(queryShards, maxFailedBlocks int, logger log.Logger) Middleware {
+func newTraceByIDSharder(queryShards, maxFailedBlocks int, sloCfg SLOConfig, logger log.Logger) Middleware {
 	return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
 		return shardQuery{
 			next:            next,
 			queryShards:     queryShards,
+			sloCfg:          sloCfg,
 			logger:          logger,
 			blockBoundaries: createBlockBoundaries(queryShards - 1), // one shard will be used to query ingesters
 			maxFailedBlocks: uint32(maxFailedBlocks),
@@ -42,6 +44,7 @@ func newTraceByIDSharder(queryShards, maxFailedBlocks int, logger log.Logger) Mi
 type shardQuery struct {
 	next            http.RoundTripper
 	queryShards     int
+	sloCfg          SLOConfig
 	logger          log.Logger
 	blockBoundaries [][]byte
 	maxFailedBlocks uint32
@@ -53,6 +56,16 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
 	span, ctx := opentracing.StartSpanFromContext(ctx, "frontend.ShardQuery")
 	defer span.Finish()

+	tenantID, err := user.ExtractOrgID(ctx)
+	if err != nil {
+		return &http.Response{
+			StatusCode: http.StatusBadRequest,
+			Body:       io.NopCloser(strings.NewReader(err.Error())),
+		}, nil
+	}
+
+	reqStart := time.Now()
+
 	// context propagation
 	r = r.WithContext(ctx)
 	reqs, err := s.buildShardedRequests(r)
@@ -146,6 +159,8 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
 	}
 	wg.Wait()

+	reqTime := time.Since(reqStart)
+
 	if overallError != nil {
 		return nil, overallError
 	}
@@ -181,6 +196,15 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
 		return nil, err
 	}

+	// only record metric when it's enabled and within slo
+	if s.sloCfg.DurationSLO != 0 {
+		if reqTime < s.sloCfg.DurationSLO {
+			// we are within SLO if query returned 200 within DurationSLO seconds
+			// TODO: we don't have throughput metrics for TraceByID.
+			sloTraceByIDCounter.WithLabelValues(tenantID).Inc()
+		}
+	}
+
 	return &http.Response{
 		StatusCode: http.StatusOK,
 		Header: http.Header{
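On the trace-by-ID path only the duration condition applies (the TODO above notes that throughput is not measured for TraceByID yet), so a single non-zero `duration_slo` is enough to enable its SLO counter. A minimal sketch of that duration-only check, with an illustrative threshold:

```go
package main

import (
	"fmt"
	"time"
)

// withinTraceByIDSLO follows the duration-only check in tracebyidsharding.go.
func withinTraceByIDSLO(durationSLO, reqTime time.Duration) bool {
	return durationSLO != 0 && reqTime < durationSLO
}

func main() {
	durationSLO := 5 * time.Second // illustrative; the default is 0s (disabled)
	fmt.Println(withinTraceByIDSLO(durationSLO, 800*time.Millisecond)) // true
	fmt.Println(withinTraceByIDSLO(durationSLO, 7*time.Second))        // false
	fmt.Println(withinTraceByIDSLO(0, 800*time.Millisecond))           // false: disabled
}
```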
diff --git a/modules/frontend/tracebyidsharding_test.go b/modules/frontend/tracebyidsharding_test.go
index 0693b52e44e..192efcb6279 100644
--- a/modules/frontend/tracebyidsharding_test.go
+++ b/modules/frontend/tracebyidsharding_test.go
@@ -264,7 +264,7 @@ func TestShardingWareDoRequest(t *testing.T) {

 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
-			sharder := newTraceByIDSharder(2, 2, log.NewNopLogger())
+			sharder := newTraceByIDSharder(2, 2, testSLOcfg, log.NewNopLogger())
 			next := RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
 				var testTrace *tempopb.Trace