Add Throughput and SLO Metrics with SLOConfig in Query Frontend (#2008)

* Collect inspectedBytes from SearchMetrics

* rename metric

* Add throughput and request duration metric

* Add slo counter

* add log lines

* Add SLO metric for tracebyid path

* Remove queriesPerTenant from QueryFrontend

* cleanup

* Add SLOConfig and SLOConfig Docs

* Update CHANGELOG.md

* Add frontend config test and fix search test

* log url decoded query

* Apply suggestions from code review

Co-authored-by: Kim Nylander <[email protected]>

* Address review comments

* Default SLO config in tests

* cleanup

* address review comments

* Address review comments

---------

Co-authored-by: Kim Nylander <[email protected]>
electron0zero and knylander-grafana authored Mar 28, 2023
1 parent 00ae6e0 commit 415afbf
Showing 10 changed files with 135 additions and 49 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -12,4 +12,4 @@
/tempodb/encoding/benchmark_block
/cmd/tempo-serverless/vendor/
/pkg/traceql/y.output
private-key.key
private-key.key
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,8 @@
## main / unreleased

* [FEATURE] Add support for Azure Workload Identity authentication [#2195](https://github.com/grafana/tempo/pull/2195) (@LambArchie)
* [ENHANCEMENT] Add Throughput and SLO Metrics with SLOConfig in Query Frontend [#2008](https://github.com/grafana/tempo/pull/2008) (@electron0zero)
- **BREAKING CHANGE** `query_frontend_result_metrics_inspected_bytes` metric removed in favour of `query_frontend_bytes_processed_per_second`
* [FEATURE] Add flag to check configuration [#2131](https://github.com/grafana/tempo/issues/2131) (@robertscherbarth @agrib-01)
* [FEATURE] Add flag to optionally enable all available Go runtime metrics [#2005](https://github.com/grafana/tempo/pull/2005) (@andreasgerstmayr)
* [FEATURE] Add support for span `kind` to TraceQL [#2217](https://github.com/grafana/tempo/pull/2217) (@joe-elliott)
14 changes: 14 additions & 0 deletions docs/sources/tempo/configuration/_index.md
@@ -400,6 +400,16 @@ query_frontend:
# (default: 30m)
[query_ingesters_until: <duration>]
# If set to a non-zero value, its value will be used to decide whether a query is within SLO or not.
# A query is within SLO if it returned 200 within duration_slo seconds OR processed throughput_bytes_slo bytes/s of data.
# NOTE: `duration_slo` and `throughput_bytes_slo` must both be configured for this to take effect.
[duration_slo: <duration> | default = 0s ]

# If set to a non-zero value, its value will be used to decide whether a query is within SLO or not.
# A query is within SLO if it returned 200 within duration_slo seconds OR processed throughput_bytes_slo bytes/s of data.
[throughput_bytes_slo: <float> | default = 0 ]

# Trace by ID lookup configuration
trace_by_id:
# The number of shards to split a trace by id query into.
@@ -413,6 +423,10 @@ query_frontend:
# The maximum number of requests to execute when hedging.
# Requires hedge_requests_at to be set. Must be greater than 0.
[hedge_requests_up_to: <int> | default = 2 ]

# If set to a non-zero value, its value will be used to decide whether a query is within SLO or not.
# A query is within SLO if it returned 200 within duration_slo seconds.
[duration_slo: <duration> | default = 0s ]
```
## Querier
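For reference, a minimal `query_frontend` block exercising the new options might look like the sketch below. The values are illustrative assumptions rather than recommendations; note that the search SLO counter is only recorded when both `duration_slo` and `throughput_bytes_slo` are set.

```yaml
query_frontend:
  search:
    # a search query is within SLO if it returns 200 in under 10s
    # OR processes at least ~100 MB/s of data
    duration_slo: 10s
    throughput_bytes_slo: 104857600
  trace_by_id:
    # a trace-by-ID query is within SLO if it returns 200 in under 5s
    duration_slo: 5s
```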
17 changes: 15 additions & 2 deletions modules/frontend/config.go
@@ -2,9 +2,8 @@ package frontend

import (
"flag"
"time"

"net/http"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
@@ -28,19 +27,31 @@ type Config struct {

type SearchConfig struct {
Sharder SearchSharderConfig `yaml:",inline"`
SLO SLOConfig `yaml:",inline"`
}

type TraceByIDConfig struct {
QueryShards int `yaml:"query_shards,omitempty"`
Hedging HedgingConfig `yaml:",inline"`
SLO SLOConfig `yaml:",inline"`
}

type HedgingConfig struct {
HedgeRequestsAt time.Duration `yaml:"hedge_requests_at"`
HedgeRequestsUpTo int `yaml:"hedge_requests_up_to"`
}

type SLOConfig struct {
DurationSLO time.Duration `yaml:"duration_slo,omitempty"`
ThroughputBytesSLO float64 `yaml:"throughput_bytes_slo,omitempty"`
}

func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
slo := SLOConfig{
DurationSLO: 0,
ThroughputBytesSLO: 0,
}

cfg.Config.MaxOutstandingPerTenant = 2000
cfg.MaxRetries = 2
cfg.TolerateFailedBlocks = 0
Expand All @@ -54,9 +65,11 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
SLO: slo,
}
cfg.TraceByID = TraceByIDConfig{
QueryShards: 50,
SLO: slo,
Hedging: HedgingConfig{
HedgeRequestsAt: 2 * time.Second,
HedgeRequestsUpTo: 2,
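Because the new `SLO` field is tagged `yaml:",inline"`, the two SLO options appear directly under `search:` and `trace_by_id:` rather than under a nested key, which is what the docs excerpt above reflects. A minimal, self-contained sketch of that behaviour, using simplified stand-in types and assuming `gopkg.in/yaml.v2` semantics (where duration strings like `5s` decode into `time.Duration`):

```go
package main

import (
	"fmt"
	"time"

	"gopkg.in/yaml.v2"
)

// Simplified stand-ins for the frontend types above; the real SearchConfig
// also inlines SearchSharderConfig, omitted here for brevity.
type SLOConfig struct {
	DurationSLO        time.Duration `yaml:"duration_slo,omitempty"`
	ThroughputBytesSLO float64       `yaml:"throughput_bytes_slo,omitempty"`
}

type SearchConfig struct {
	SLO SLOConfig `yaml:",inline"`
}

func main() {
	// Because SLOConfig is inlined, duration_slo and throughput_bytes_slo sit
	// directly under the search block rather than under a nested slo: key.
	raw := []byte("duration_slo: 5s\nthroughput_bytes_slo: 104857600\n")

	var cfg SearchConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.SLO.DurationSLO, cfg.SLO.ThroughputBytesSLO) // 5s 1.048576e+08
}
```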
22 changes: 8 additions & 14 deletions modules/frontend/frontend.go
@@ -34,7 +34,6 @@ const (
type QueryFrontend struct {
TraceByID, Search http.Handler
logger log.Logger
queriesPerTenant *prometheus.CounterVec
store storage.Store
}

@@ -70,21 +69,16 @@ func New(cfg Config, next http.RoundTripper, o *overrides.Overrides, store stora
traceByIDMiddleware := MergeMiddlewares(newTraceByIDMiddleware(cfg, logger), retryWare)
searchMiddleware := MergeMiddlewares(newSearchMiddleware(cfg, o, store, logger), retryWare)

traceByIDCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{
"op": traceByIDOp,
})
searchCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{
"op": searchOp,
})
traceByIDCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{"op": traceByIDOp})
searchCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{"op": searchOp})

traces := traceByIDMiddleware.Wrap(next)
search := searchMiddleware.Wrap(next)
return &QueryFrontend{
TraceByID: newHandler(traces, traceByIDCounter, logger),
Search: newHandler(search, searchCounter, logger),
logger: logger,
queriesPerTenant: queriesPerTenant,
store: store,
TraceByID: newHandler(traces, traceByIDCounter, logger),
Search: newHandler(search, searchCounter, logger),
logger: logger,
store: store,
}, nil
}

@@ -98,7 +92,7 @@ func newTraceByIDMiddleware(cfg Config, logger log.Logger) Middleware {
rt := NewRoundTripper(
next,
newDeduper(logger),
newTraceByIDSharder(cfg.TraceByID.QueryShards, cfg.TolerateFailedBlocks, logger),
newTraceByIDSharder(cfg.TraceByID.QueryShards, cfg.TolerateFailedBlocks, cfg.TraceByID.SLO, logger),
newHedgedRequestWare(cfg.TraceByID.Hedging),
)

@@ -181,7 +175,7 @@ func newTraceByIDMiddleware(cfg Config, logger log.Logger) Middleware {
func newSearchMiddleware(cfg Config, o *overrides.Overrides, reader tempodb.Reader, logger log.Logger) Middleware {
return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
ingesterSearchRT := next
backendSearchRT := NewRoundTripper(next, newSearchSharder(reader, o, cfg.Search.Sharder, logger))
backendSearchRT := NewRoundTripper(next, newSearchSharder(reader, o, cfg.Search.Sharder, cfg.Search.SLO, logger))

return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
// backend search queries require sharding so we pass through a special roundtripper
11 changes: 11 additions & 0 deletions modules/frontend/frontend_test.go
@@ -27,12 +27,14 @@ func TestFrontendRoundTripsSearch(t *testing.T) {
f, err := New(Config{
TraceByID: TraceByIDConfig{
QueryShards: minQueryShards,
SLO: testSLOcfg,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
SLO: testSLOcfg,
},
}, next, nil, nil, log.NewNopLogger(), nil)
require.NoError(t, err)
@@ -55,6 +57,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
@@ -63,12 +66,14 @@ func TestFrontendBadConfigFails(t *testing.T) {
f, err = New(Config{
TraceByID: TraceByIDConfig{
QueryShards: maxQueryShards + 1,
SLO: testSLOcfg,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
@@ -77,12 +82,14 @@ func TestFrontendBadConfigFails(t *testing.T) {
f, err = New(Config{
TraceByID: TraceByIDConfig{
QueryShards: maxQueryShards,
SLO: testSLOcfg,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
ConcurrentRequests: 0,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend search concurrent requests should be greater than 0")
@@ -91,12 +98,14 @@ func TestFrontendBadConfigFails(t *testing.T) {
f, err = New(Config{
TraceByID: TraceByIDConfig{
QueryShards: maxQueryShards,
SLO: testSLOcfg,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: 0,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend search target bytes per request should be greater than 0")
@@ -105,6 +114,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
f, err = New(Config{
TraceByID: TraceByIDConfig{
QueryShards: maxQueryShards,
SLO: testSLOcfg,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
@@ -113,6 +123,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
QueryIngestersUntil: time.Minute,
QueryBackendAfter: time.Hour,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "query backend after should be less than or equal to query ingester until")
77 changes: 50 additions & 27 deletions modules/frontend/searchsharding.go
@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"

@@ -30,12 +31,23 @@
)

var (
metricInspectedBytes = promauto.NewHistogram(prometheus.HistogramOpts{
queryThroughput = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "tempo",
Name: "query_frontend_result_metrics_inspected_bytes",
Help: "Inspected Bytes in a search query",
Name: "query_frontend_bytes_processed_per_second",
Help: "Bytes processed per second in the query per tenant",
Buckets: prometheus.ExponentialBuckets(1024*1024, 2, 10), // from 1MB up to 1GB
})
}, []string{"tenant", "op"})

searchThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": searchOp})

sloQueriesPerTenant = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Name: "query_frontend_queries_within_slo_total",
Help: "Total Queries within SLO per tenant",
}, []string{"tenant", "op"})

sloTraceByIDCounter = sloQueriesPerTenant.MustCurryWith(prometheus.Labels{"op": traceByIDOp})
sloSearchCounter = sloQueriesPerTenant.MustCurryWith(prometheus.Labels{"op": searchOp})
)

type searchSharder struct {
@@ -44,6 +56,7 @@ type searchSharder struct {
overrides *overrides.Overrides

cfg SearchSharderConfig
sloCfg SLOConfig
logger log.Logger
}

@@ -58,14 +71,15 @@ type SearchSharderConfig struct {
}

// newSearchSharder creates a sharding middleware for search
func newSearchSharder(reader tempodb.Reader, o *overrides.Overrides, cfg SearchSharderConfig, logger log.Logger) Middleware {
func newSearchSharder(reader tempodb.Reader, o *overrides.Overrides, cfg SearchSharderConfig, sloCfg SLOConfig, logger log.Logger) Middleware {
return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
return searchSharder{
next: next,
reader: reader,
overrides: o,
logger: logger,
cfg: cfg,
sloCfg: sloCfg,
logger: logger,
}
})
}
@@ -78,8 +92,6 @@ func newSearchSharder(reader tempodb.Reader, o *overrides.Overrides, cfg SearchS
// start=<unix epoch seconds>
// end=<unix epoch seconds>
func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
// create search related query metrics here??
// this is the place where we agg search response.
searchReq, err := api.ParseSearchRequest(r)
if err != nil {
return &http.Response{
Expand All @@ -102,6 +114,7 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "frontend.ShardSearch")
defer span.Finish()

reqStart := time.Now()
// sub context to cancel in-progress sub requests
subCtx, subCancel := context.WithCancel(ctx)
defer subCancel()
@@ -216,34 +229,35 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {

// print out request metrics
cancelledReqs := startedReqs - overallResponse.finishedRequests
level.Info(s.logger).Log(
"msg", "sharded search query request stats",
"raw_query", r.URL.RawQuery,
"total", len(reqs),
"started", startedReqs,
"finished", overallResponse.finishedRequests,
"cancelled", cancelledReqs)

// all goroutines have finished, we can safely access searchResults fields directly now
span.SetTag("inspectedBlocks", overallResponse.resultsMetrics.InspectedBlocks)
span.SetTag("skippedBlocks", overallResponse.resultsMetrics.SkippedBlocks)
span.SetTag("inspectedBytes", overallResponse.resultsMetrics.InspectedBytes)
span.SetTag("inspectedTraces", overallResponse.resultsMetrics.InspectedTraces)
span.SetTag("skippedTraces", overallResponse.resultsMetrics.SkippedTraces)
span.SetTag("totalBlockBytes", overallResponse.resultsMetrics.TotalBlockBytes)
reqTime := time.Since(reqStart)
throughput := float64(overallResponse.resultsMetrics.InspectedBytes) / reqTime.Seconds()
searchThroughput.WithLabelValues(tenantID).Observe(throughput)

query, _ := url.PathUnescape(r.URL.RawQuery)
span.SetTag("query", query)
level.Info(s.logger).Log(
"msg", "sharded search query SearchMetrics",
"raw_query", r.URL.RawQuery,
"msg", "sharded search query request stats and SearchMetrics",
"query", query,
"duration_seconds", reqTime,
"request_throughput", throughput,
"total_requests", len(reqs),
"started_requests", startedReqs,
"cancelled_requests", cancelledReqs,
"finished_requests", overallResponse.finishedRequests,
"inspectedBlocks", overallResponse.resultsMetrics.InspectedBlocks,
"skippedBlocks", overallResponse.resultsMetrics.SkippedBlocks,
"inspectedBytes", overallResponse.resultsMetrics.InspectedBytes,
"inspectedTraces", overallResponse.resultsMetrics.InspectedTraces,
"skippedTraces", overallResponse.resultsMetrics.SkippedTraces,
"totalBlockBytes", overallResponse.resultsMetrics.TotalBlockBytes)

// Collect inspectedBytes metrics
metricInspectedBytes.Observe(float64(overallResponse.resultsMetrics.InspectedBytes))
// all goroutines have finished, we can safely access searchResults fields directly now
span.SetTag("inspectedBlocks", overallResponse.resultsMetrics.InspectedBlocks)
span.SetTag("skippedBlocks", overallResponse.resultsMetrics.SkippedBlocks)
span.SetTag("inspectedBytes", overallResponse.resultsMetrics.InspectedBytes)
span.SetTag("inspectedTraces", overallResponse.resultsMetrics.InspectedTraces)
span.SetTag("skippedTraces", overallResponse.resultsMetrics.SkippedTraces)
span.SetTag("totalBlockBytes", overallResponse.resultsMetrics.TotalBlockBytes)

if overallResponse.err != nil {
return nil, overallResponse.err
Expand All @@ -266,6 +280,15 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
return nil, err
}

// only record metric when it's enabled and within slo
if s.sloCfg.DurationSLO != 0 && s.sloCfg.ThroughputBytesSLO != 0 {
if reqTime < s.sloCfg.DurationSLO || throughput > s.sloCfg.ThroughputBytesSLO {
// query is within SLO if query returned 200 within DurationSLO seconds OR
// processed ThroughputBytesSLO bytes/s data
sloSearchCounter.WithLabelValues(tenantID).Inc()
}
}

return &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{
(The remaining 3 changed files in this commit are not shown above.)
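Among the files that did not render is the trace-by-ID sharder that increments `sloTraceByIDCounter`. As a rough, hypothetical sketch only — not the actual implementation — the SLO decisions on the two paths reduce to something like:

```go
package main

import (
	"fmt"
	"time"
)

// SLOConfig is a stand-in for the frontend config type defined above.
type SLOConfig struct {
	DurationSLO        time.Duration
	ThroughputBytesSLO float64
}

// withinSearchSLO mirrors the check at the end of searchSharder.RoundTrip:
// a search query is within SLO when it returned 200 inside DurationSLO
// seconds OR sustained more than ThroughputBytesSLO bytes/s. Recording is
// skipped unless both thresholds are configured.
func withinSearchSLO(reqTime time.Duration, throughput float64, cfg SLOConfig) bool {
	if cfg.DurationSLO == 0 || cfg.ThroughputBytesSLO == 0 {
		return false
	}
	return reqTime < cfg.DurationSLO || throughput > cfg.ThroughputBytesSLO
}

// withinTraceByIDSLO is a duration-only variant. The trace-by-ID sharder diff
// is not shown above, so this is an assumption based on the docs comment that
// its SLO considers only duration_slo.
func withinTraceByIDSLO(reqTime time.Duration, cfg SLOConfig) bool {
	return cfg.DurationSLO != 0 && reqTime < cfg.DurationSLO
}

func main() {
	cfg := SLOConfig{DurationSLO: 5 * time.Second, ThroughputBytesSLO: 100 * 1024 * 1024}
	fmt.Println(withinSearchSLO(8*time.Second, 150*1024*1024, cfg)) // true: throughput threshold met
	fmt.Println(withinTraceByIDSLO(2*time.Second, cfg))             // true: fast enough
}
```

The search check follows the `if` block at the end of `RoundTrip` above; the trace-by-ID variant is inferred from the documentation, not copied from the unrendered file.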
