Address review comments
electron0zero committed Feb 9, 2023
1 parent 8f69ea6 commit 249b145
Showing 8 changed files with 45 additions and 115 deletions.
1 change: 0 additions & 1 deletion .gitignore
@@ -13,4 +13,3 @@
/cmd/tempo-serverless/vendor/
/pkg/traceql/y.output
private-key.key
image.sh
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
## main / unreleased

* [ENHANCEMENT] Add Throughput and SLO Metrics with SLOConfig in Query Frontend [#2008](https://github.com/grafana/tempo/pull/2008) (@electron0zero)
  - **BREAKING CHANGE** `query_frontend_result_metrics_inspected_bytes` metric removed in favour of `query_frontend_bytes_processed_per_seconds`
* [FEATURE] Add flag to optionally enable all available Go runtime metrics [#2005](https://github.com/grafana/tempo/pull/2005) (@andreasgerstmayr)
* [BUGFIX] Suppress logspam in single binary mode when metrics generator is disabled. [#2058](https://github.com/grafana/tempo/pull/2058) (@joe-elliott)
* [BUGFIX] Error more gracefully while reading some blocks written by an interim commit between 1.5 and 2.0 [#2055](https://github.com/grafana/tempo/pull/2055) (@mdisibio)
20 changes: 6 additions & 14 deletions docs/sources/configuration/_index.md
@@ -390,18 +390,12 @@ query_frontend:
[query_ingesters_until: <duration>]
# If set to a non-zero value, its value will be used to decide if the query is within SLO or not.
# tempo_query_frontend_search_queries_within_slo_total is incremented if the query
# completes in less than the value of duration_slo.
# This works as a boolean OR with the throughput_slo config:
# Query is within SLO if it returned within duration_slo seconds OR processed throughput_slo bytes/s data.
[duration_slo: <duration> | default = 5s ]
# Query is within SLO if it returned 200 within duration_slo seconds OR processed throughput_slo bytes/s data.
[duration_slo: <duration> | default = 0s ]
# If set to a non-zero value, its value will be used to decide if the query is within SLO or not.
# tempo_query_frontend_search_queries_within_slo_total is incremented if the query
# throughput (data processed per second) is more than the value of throughput_slo.
# This works as a boolean OR with the duration_slo config:
# Query is within SLO if it returned within duration_slo seconds OR processed throughput_slo bytes/s data.
[throughput_slo: <float> | default = 104857600 ]
# Query is within SLO if it returned 200 within duration_slo seconds OR processed throughput_slo bytes/s data.
[throughput_slo: <float> | default = 0 ]
# Trace by ID lookup configuration
@@ -419,10 +413,8 @@ query_frontend:
[hedge_requests_up_to: <int> | default = 2 ]
# If set to a non-zero value, its value will be used to decide if the query is within SLO or not.
# tempo_query_frontend_tracebyid_queries_within_slo_total is incremented if the query
# completes in less than the value of duration_slo.
# Query is within SLO if it returned within duration_slo seconds.
[duration_slo: <duration> | default = 5s ]
# Query is within SLO if it returned 200 within duration_slo seconds.
[duration_slo: <duration> | default = 0s ]
```

## Querier
4 changes: 2 additions & 2 deletions modules/frontend/config.go
@@ -48,8 +48,8 @@ type SLOConfig struct {

func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
slo := SLOConfig{
DurationSLO: 5 * time.Second,
ThroughputSLO: 100 * 1024 * 1024, // 100 MB
DurationSLO: 0,
ThroughputSLO: 0,
}

cfg.Config.MaxOutstandingPerTenant = 2000
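With both defaults zeroed out above, the SLO metrics become opt-in: the query frontend only records them when the operator sets the thresholds (the search path checks `DurationSLO != 0 && ThroughputSLO != 0` further down in this diff). A minimal, illustrative sketch of what an explicit configuration might look like, using a mirrored struct rather than the real `modules/frontend` package:

```go
package main

import (
	"fmt"
	"time"
)

// SLOConfig mirrors the two fields shown in config.go above; this is an
// illustrative copy for the sketch, not the actual Tempo type.
type SLOConfig struct {
	DurationSLO   time.Duration
	ThroughputSLO float64
}

func main() {
	// Roughly the pre-change defaults, now supplied explicitly by the operator:
	// a query is "good" if it finishes within 5s or sustains ~100 MB/s.
	slo := SLOConfig{
		DurationSLO:   5 * time.Second,
		ThroughputSLO: 100 * 1024 * 1024,
	}

	enabled := slo.DurationSLO != 0 && slo.ThroughputSLO != 0
	fmt.Printf("SLO metrics enabled: %v\n", enabled)
}
```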
4 changes: 0 additions & 4 deletions modules/frontend/frontend.go
@@ -53,10 +53,6 @@ func New(cfg Config, next http.RoundTripper, o *overrides.Overrides, store stora
return nil, fmt.Errorf("frontend search target bytes per request should be greater than 0")
}

if cfg.Search.SLO.ThroughputSLO <= 0 || cfg.TraceByID.SLO.ThroughputSLO <= 0 {
return nil, fmt.Errorf("frontend search or trace by id throughput slo should be greater than 0")
}

if cfg.Search.Sharder.QueryIngestersUntil < cfg.Search.Sharder.QueryBackendAfter {
return nil, fmt.Errorf("query backend after should be less than or equal to query ingester until")
}
38 changes: 0 additions & 38 deletions modules/frontend/frontend_test.go
@@ -133,42 +133,4 @@ func TestFrontendBadConfigFails(t *testing.T) {
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "query backend after should be less than or equal to query ingester until")
assert.Nil(t, f)

f, err = New(Config{
TraceByID: TraceByIDConfig{
QueryShards: maxQueryShards,
SLO: SLOConfig{
ThroughputSLO: -1,
DurationSLO: 1 * time.Second,
},
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
SLO: sloCfg,
},
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend search or trace by id throughput slo should be greater than 0")
assert.Nil(t, f)

f, err = New(Config{
TraceByID: TraceByIDConfig{
QueryShards: maxQueryShards,
SLO: sloCfg,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
SLO: SLOConfig{
ThroughputSLO: -1,
DurationSLO: 1 * time.Second,
},
},
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend search or trace by id throughput slo should be greater than 0")
assert.Nil(t, f)
}
56 changes: 24 additions & 32 deletions modules/frontend/searchsharding.go
@@ -7,7 +7,6 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"

@@ -32,18 +31,23 @@
)

var (
searchThroughput = promauto.NewHistogramVec(prometheus.HistogramOpts{
queryThroughput = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "tempo",
Name: "query_frontend_search_bytes_processed_per_seconds",
Help: "Bytes processed per second in the search query",
Name: "query_frontend_bytes_processed_per_seconds",
Help: "Bytes processed per second in the query per tenant",
Buckets: prometheus.ExponentialBuckets(1024*1024, 2, 10), // from 1MB up to 1GB
}, []string{"tenant", "status"})
}, []string{"tenant", "op"})

sloSearchPerTenant = promauto.NewCounterVec(prometheus.CounterOpts{
searchThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": searchOp})

sloQueriesPerTenant = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Name: "query_frontend_search_queries_within_slo_total",
Help: "Total search queries within SLO per tenant",
}, []string{"tenant", "status"})
Name: "query_frontend_queries_within_slo_total",
Help: "Total Queries within SLO per tenant",
}, []string{"tenant", "op"})

sloTraceByIDCounter = sloQueriesPerTenant.MustCurryWith(prometheus.Labels{"op": traceByIDOp})
sloSearchCounter = sloQueriesPerTenant.MustCurryWith(prometheus.Labels{"op": searchOp})
)
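The single histogram and counter above replace the per-endpoint metrics, and `MustCurryWith` pins the `op` label so each call site (like `sloSearchCounter` and `sloTraceByIDCounter`) only supplies the tenant. A small self-contained sketch of the same currying pattern; the metric name and the `op` label values here are made up for illustration, not the ones Tempo registers:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// One counter vector with two labels, as in the diff above.
	queries := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "example",
		Name:      "queries_within_slo_total",
		Help:      "Illustration of currying an 'op' label onto a shared counter.",
	}, []string{"tenant", "op"})

	// Pre-bind the op label once; callers then pass only the tenant.
	searchCounter := queries.MustCurryWith(prometheus.Labels{"op": "search"})
	traceByIDCounter := queries.MustCurryWith(prometheus.Labels{"op": "traceByID"})

	searchCounter.WithLabelValues("tenant-a").Inc()
	traceByIDCounter.WithLabelValues("tenant-a").Inc()

	fmt.Println("incremented example_queries_within_slo_total for both ops")
}
```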

type searchSharder struct {
@@ -226,9 +230,8 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
// print out request metrics
cancelledReqs := startedReqs - overallResponse.finishedRequests
reqTime := time.Since(reqStart)
// FIXME: InspectedBytes is 0 for TraceQL Search; SLO and throughput will be incorrect
throughput := float64(overallResponse.resultsMetrics.InspectedBytes) / reqTime.Seconds()
var statusCode int
searchThroughput.WithLabelValues(tenantID).Observe(throughput)

query, _ := url.PathUnescape(r.URL.RawQuery)
span.SetTag("query", query)
@@ -261,19 +264,15 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
"totalBlockBytes", overallResponse.resultsMetrics.TotalBlockBytes)

if overallResponse.err != nil {
statusCode = overallResponse.statusCode
s.recordMetrics(tenantID, statusCode, throughput, reqTime)
return nil, overallResponse.err
}

if overallResponse.statusCode != http.StatusOK {
// translate all non-200s into 500s. if, for instance, we get a 400 back from an internal component
// it means that we created a bad request. 400 should not be propagated back to the user b/c
// the bad request was due to a bug on our side, so return 500 instead.
statusCode = http.StatusInternalServerError
s.recordMetrics(tenantID, statusCode, throughput, reqTime)
return &http.Response{
StatusCode: statusCode,
StatusCode: http.StatusInternalServerError,
Header: http.Header{},
Body: io.NopCloser(strings.NewReader(overallResponse.statusMsg)),
}, nil
@@ -282,15 +281,19 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
m := &jsonpb.Marshaler{}
bodyString, err := m.MarshalToString(overallResponse.result())
if err != nil {
statusCode = http.StatusInternalServerError
s.recordMetrics(tenantID, statusCode, throughput, reqTime)
return nil, err
}

statusCode = http.StatusOK
s.recordMetrics(tenantID, statusCode, throughput, reqTime)
// only record metric when it's enabled and within slo
if s.sloCfg.DurationSLO != 0 && s.sloCfg.ThroughputSLO != 0 {
if reqTime < s.sloCfg.DurationSLO || throughput > s.sloCfg.ThroughputSLO {
// query is within SLO if query returned 200 within DurationSLO seconds OR processed ThroughputSLO bytes/s data
sloSearchCounter.WithLabelValues(tenantID).Inc()
}
}

return &http.Response{
StatusCode: statusCode,
StatusCode: http.StatusOK,
Header: http.Header{
api.HeaderContentType: {api.HeaderAcceptJSON},
},
@@ -299,17 +302,6 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
}, nil
}

// recordSearchMetrics records throughput and SLO metrics
func (s *searchSharder) recordMetrics(tenantID string, status int, throughput float64, reqTime time.Duration) {
sts := strconv.Itoa(status)
searchThroughput.WithLabelValues(tenantID, sts).Observe(throughput)

// query is within SLO if query returned within DurationSLO seconds OR processed ThroughputSLO bytes/s data
if reqTime < s.sloCfg.DurationSLO || throughput > s.sloCfg.ThroughputSLO {
sloSearchPerTenant.WithLabelValues(tenantID, sts).Inc()
}
}

// blockMetas returns all relevant blockMetas given a start/end
func (s *searchSharder) blockMetas(start, end int64, tenantID string) []*backend.BlockMeta {
// reduce metas to those in the requested range
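The inline block above replaces the removed `recordMetrics` helper: the SLO counter is only incremented when both thresholds are configured, and the duration and throughput conditions combine as a boolean OR on the 200 path. As a worked example, a search that inspected 2 GiB over 8 s sustained about 268 MB/s, which clears a 100 MB/s throughput target even though it missed a 5 s duration target. A hedged, self-contained sketch of that decision (the `withinSLO` helper is illustrative, not Tempo's):

```go
package main

import (
	"fmt"
	"time"
)

// withinSLO mirrors the check in searchsharding.go: a successful (200) query is
// within SLO if it finished inside durationSLO OR sustained more than
// throughputSLO bytes/s. With either threshold unset (zero), nothing is recorded.
func withinSLO(reqTime time.Duration, inspectedBytes uint64, durationSLO time.Duration, throughputSLO float64) bool {
	if durationSLO == 0 || throughputSLO == 0 {
		return false // SLO recording disabled
	}
	throughput := float64(inspectedBytes) / reqTime.Seconds()
	return reqTime < durationSLO || throughput > throughputSLO
}

func main() {
	// 2 GiB inspected in 8s: misses the 5s duration target but clears the
	// 100 MB/s throughput target, so it still counts as within SLO.
	fmt.Println(withinSLO(8*time.Second, 2<<30, 5*time.Second, 100*1024*1024)) // true
}
```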
36 changes: 12 additions & 24 deletions modules/frontend/tracebyidsharding.go
@@ -8,7 +8,6 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"sync"
"time"
@@ -21,8 +20,6 @@
"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/user"
)

@@ -31,14 +28,6 @@
maxQueryShards = 256
)

var (
sloTraceByIDPerTenant = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Name: "query_frontend_tracebyid_queries_within_slo_total",
Help: "Total TraceByID Search queries within SLO per tenant",
}, []string{"tenant", "status"})
)

func newTraceByIDSharder(queryShards, maxFailedBlocks int, sloCfg SLOConfig, logger log.Logger) Middleware {
return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
return shardQuery{
@@ -173,7 +162,7 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
reqTime := time.Since(reqStart) // time request

if overallError != nil {
s.recordMetrics(tenantID, statusCode, reqTime)
// s.recordMetrics(tenantID, statusCode, reqTime)
return nil, overallError
}

@@ -189,7 +178,7 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
if statusCode != http.StatusNotFound {
statusCode = 500
}
s.recordMetrics(tenantID, statusCode, reqTime)
// s.recordMetrics(tenantID, statusCode, reqTime)

return &http.Response{
StatusCode: statusCode,
@@ -206,11 +195,19 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
})
if err != nil {
_ = level.Error(s.logger).Log("msg", "error marshalling response to proto", "err", err)
s.recordMetrics(tenantID, statusCode, reqTime)
// s.recordMetrics(tenantID, statusCode, reqTime)
return nil, err
}

s.recordMetrics(tenantID, statusCode, reqTime)
// only record metric when it's enabled and within slo
if s.sloCfg.DurationSLO != 0 {
if reqTime < s.sloCfg.DurationSLO {
// we are within SLO if query returned 200 within DurationSLO seconds
// TODO: we don't have throughput metrics for TraceByID.
sloTraceByIDCounter.WithLabelValues(tenantID).Inc()
}
}

return &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{
@@ -221,15 +218,6 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
}, nil
}

// recordTraceByIDMetrics records throughput and SLO metrics
func (s shardQuery) recordMetrics(tenantID string, status int, reqTime time.Duration) {
// we are within SLO if query returned within DurationSLO seconds
// TODO: we don't have throughput metrics for TraceByID yet
if reqTime < s.sloCfg.DurationSLO {
sloTraceByIDPerTenant.WithLabelValues(tenantID, strconv.Itoa(status)).Inc()
}
}

// buildShardedRequests returns a slice of requests sharded on the precalculated
// block boundaries
func (s *shardQuery) buildShardedRequests(parent *http.Request) ([]*http.Request, error) {
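On the trace-by-ID side the check is duration-only for now; per the TODO in the diff there is no throughput measurement for this endpoint yet, so the guard reduces to a single comparison. A minimal sketch under that assumption (the helper name is hypothetical, not part of Tempo):

```go
package main

import (
	"fmt"
	"time"
)

// traceByIDWithinSLO mirrors the duration-only check in tracebyidsharding.go:
// record the query as within SLO only if the SLO is configured and the request
// finished inside the duration target.
func traceByIDWithinSLO(reqTime, durationSLO time.Duration) bool {
	return durationSLO != 0 && reqTime < durationSLO
}

func main() {
	fmt.Println(traceByIDWithinSLO(900*time.Millisecond, time.Second)) // true
	fmt.Println(traceByIDWithinSLO(900*time.Millisecond, 0))           // false: disabled by default
}
```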
