Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receive] Export metrics about remote write requests per tenant #5424

Merged
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
4e0a874
Add write metrics to Thanos Receive
douglascamata Jun 13, 2022
81d03c3
Let the middleware count inflight HTTP requests
douglascamata Jun 14, 2022
8878786
Update Receive write metrics type & definition
douglascamata Jun 15, 2022
48d7ef0
Put option back in its place to avoid big diff
douglascamata Jun 15, 2022
d33957f
Fetch tenant from headers instead of context
douglascamata Jun 15, 2022
33f4c6a
Delete unnecessary tenant parser middleware
douglascamata Jun 15, 2022
3254efa
Refactor & reuse code for HTTP instrumentation
douglascamata Jun 17, 2022
7f1cecb
Add missing copyright to some files
douglascamata Jun 17, 2022
51b6d21
Add changelog entry for Receive & new HTTP metrics
douglascamata Jun 20, 2022
fe750d1
Merge branch 'main' of https://github.com/thanos-io/thanos into dcama…
douglascamata Jun 20, 2022
643ffb9
Remove TODO added by accident
douglascamata Jun 20, 2022
607c4e9
Make error handling code shorter
douglascamata Jun 21, 2022
95a7a37
Make switch statement simpler
douglascamata Jun 21, 2022
3b3592b
Remove method label from timeseries' metrics
douglascamata Jun 21, 2022
2dd4592
Count samples of all series instead of each
douglascamata Jun 21, 2022
0141ea9
Remove in-flight requests metric
douglascamata Jun 21, 2022
493ede8
Change timeseries/samples metrics to histograms
douglascamata Jun 21, 2022
26600c1
Merge branch 'main' of https://github.com/thanos-io/thanos into dcama…
douglascamata Jun 21, 2022
c4662b5
Fix Prometheus registry for histograms
douglascamata Jun 21, 2022
77f7bf7
Fix comment in NewHandler functions
douglascamata Jun 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support.
- [#5408](https://github.com/thanos-io/thanos/pull/5391) Receive: Add support for consistent hashrings.
- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Implement api/v1/status/tsdb.
- [#5424](https://github.com/thanos-io/thanos/pull/5424) Receive: export metrics regarding size of remote write requests

### Changed

Expand Down
83 changes: 27 additions & 56 deletions pkg/extprom/http/instrument_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/uber/jaeger-client-go"
)
Expand All @@ -37,81 +36,44 @@ func NewNopInstrumentationMiddleware() InstrumentationMiddleware {
}

type defaultInstrumentationMiddleware struct {
requestDuration *prometheus.HistogramVec
requestSize *prometheus.SummaryVec
requestsTotal *prometheus.CounterVec
responseSize *prometheus.SummaryVec
metrics *defaultMetrics
}

// NewInstrumentationMiddleware provides default InstrumentationMiddleware.
// Passing nil as buckets uses the default buckets.
func NewInstrumentationMiddleware(reg prometheus.Registerer, buckets []float64) InstrumentationMiddleware {
if buckets == nil {
buckets = []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}
return &defaultInstrumentationMiddleware{
metrics: newDefaultMetrics(reg, buckets, []string{}),
}

ins := defaultInstrumentationMiddleware{
requestDuration: promauto.With(reg).NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "Tracks the latencies for HTTP requests.",
Buckets: buckets,
},
[]string{"code", "handler", "method"},
),

requestSize: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Name: "http_request_size_bytes",
Help: "Tracks the size of HTTP requests.",
},
[]string{"code", "handler", "method"},
),

requestsTotal: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "Tracks the number of HTTP requests.",
}, []string{"code", "handler", "method"},
),

responseSize: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Name: "http_response_size_bytes",
Help: "Tracks the size of HTTP responses.",
},
[]string{"code", "handler", "method"},
),
}
return &ins
}

// NewHandler wraps the given HTTP handler for instrumentation. It
// registers four metric collectors (if not already done) and reports HTTP
// registers five metric collectors (if not already done) and reports HTTP
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason behind in-flight? It's usually misleading metrics given 15s interval and requests finishing sooner.. but maybe we can try having it to double check

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bwplotka my attention adding the in-flight is to allow users of Thanos Receive to get better insights in situations of high load, where the number of concurrent requests will be relevant. Also, later on there will be a "metered gate" to limit the amount of in-flight requests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually quite useful to know given atm there is no notion of enqueued requests in receiver. This is particularly useful for troubleshooting issues with load/receiver getting requests that it is not completing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@moadz FYI I am leaving the in-flight requests metric for a follow up PR to limit the scope of this one to Thanos Receive.

Copy link
Contributor Author

@douglascamata douglascamata Jun 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Split PR with in-flight requests metrics: #5440 .

// metrics to the (newly or already) registered collectors: http_requests_total
// (CounterVec), http_request_duration_seconds (Histogram),
// http_request_size_bytes (Summary), http_response_size_bytes (Summary). Each
// has a constant label named "handler" with the provided handlerName as
// value. http_requests_total is a metric vector partitioned by HTTP method
// (label name "method") and HTTP status code (label name "code").
// http_request_size_bytes (Summary), http_response_size_bytes (Summary),
// http_inflight_requests (Gauge). Each has a constant label named "handler"
// with the provided handlerName as value.
func (ins *defaultInstrumentationMiddleware) NewHandler(handlerName string, handler http.Handler) http.HandlerFunc {
baseLabels := prometheus.Labels{"handler": handlerName}
return httpInstrumentationHandler(baseLabels, ins.metrics, handler)
}

func httpInstrumentationHandler(baseLabels prometheus.Labels, metrics *defaultMetrics, next http.Handler) http.HandlerFunc {
return promhttp.InstrumentHandlerRequestSize(
ins.requestSize.MustCurryWith(prometheus.Labels{"handler": handlerName}),
metrics.requestSize.MustCurryWith(baseLabels),
promhttp.InstrumentHandlerCounter(
ins.requestsTotal.MustCurryWith(prometheus.Labels{"handler": handlerName}),
metrics.requestsTotal.MustCurryWith(baseLabels),
promhttp.InstrumentHandlerResponseSize(
ins.responseSize.MustCurryWith(prometheus.Labels{"handler": handlerName}),
metrics.responseSize.MustCurryWith(baseLabels),
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
now := time.Now()

wd := &responseWriterDelegator{w: w}
handler.ServeHTTP(wd, r)
next.ServeHTTP(wd, r)

observer := ins.requestDuration.WithLabelValues(
wd.Status(),
handlerName,
strings.ToLower(r.Method),
)
requestLabels := prometheus.Labels{"code": wd.Status(), "method": strings.ToLower(r.Method)}
observer := metrics.requestDuration.MustCurryWith(baseLabels).With(requestLabels)
observer.Observe(time.Since(now).Seconds())

// If we find a tracingID we'll expose it as Exemplar.
Expand Down Expand Up @@ -164,3 +126,12 @@ func (wd *responseWriterDelegator) StatusCode() int {
func (wd *responseWriterDelegator) Status() string {
return fmt.Sprintf("%d", wd.StatusCode())
}

// NewInstrumentHandlerInflightTenant creates a middleware used to export the current amount of concurrent requests
// being handled. It has an optional tenant label whenever a tenant is present in the context.
func NewInstrumentHandlerInflightTenant(gaugeVec *prometheus.GaugeVec, tenantHeader string, next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
tenant := r.Header.Get(tenantHeader)
promhttp.InstrumentHandlerInFlight(gaugeVec.With(prometheus.Labels{"tenant": tenant}), next).ServeHTTP(w, r)
}
}
42 changes: 42 additions & 0 deletions pkg/extprom/http/instrument_tenant_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package http

import (
"net/http"

"github.com/prometheus/client_golang/prometheus"
)

type tenantInstrumentationMiddleware struct {
metrics *defaultMetrics
tenantHeaderName string
}

// NewTenantInstrumentationMiddleware provides the same instrumentation as defaultInstrumentationMiddleware,
// but with a tenant label fetched from the given tenantHeaderName header.
// Passing nil as buckets uses the default buckets.
func NewTenantInstrumentationMiddleware(tenantHeaderName string, reg prometheus.Registerer, buckets []float64) InstrumentationMiddleware {
return &tenantInstrumentationMiddleware{
tenantHeaderName: tenantHeaderName,
metrics: newDefaultMetrics(reg, buckets, []string{"tenant"}),
}
}

// NewHandler wraps the given HTTP handler for instrumentation. It
// registers five metric collectors (if not already done) and reports HTTP
// metrics to the (newly or already) registered collectors: http_requests_total
// (CounterVec), http_request_duration_seconds (Histogram),
// http_request_size_bytes (Summary), http_response_size_bytes (Summary),
// http_inflight_requests (Gauge). Each has a constant label named "handler"
// with the provided handlerName as value.
func (ins *tenantInstrumentationMiddleware) NewHandler(handlerName string, next http.Handler) http.HandlerFunc {
tenantWrapper := func(w http.ResponseWriter, r *http.Request) {
tenant := r.Header.Get(ins.tenantHeaderName)
baseLabels := prometheus.Labels{"handler": handlerName, "tenant": tenant}
handlerStack := httpInstrumentationHandler(baseLabels, ins.metrics, next)
handlerStack.ServeHTTP(w, r)
}
return tenantWrapper
}
54 changes: 54 additions & 0 deletions pkg/extprom/http/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package http

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type defaultMetrics struct {
requestDuration *prometheus.HistogramVec
requestSize *prometheus.SummaryVec
requestsTotal *prometheus.CounterVec
responseSize *prometheus.SummaryVec
}

func newDefaultMetrics(reg prometheus.Registerer, buckets []float64, extraLabels []string) *defaultMetrics {
if buckets == nil {
buckets = []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}
}

return &defaultMetrics{
requestDuration: promauto.With(reg).NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "Tracks the latencies for HTTP requests.",
Buckets: buckets,
},
append([]string{"code", "handler", "method"}, extraLabels...),
),
requestSize: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Name: "http_request_size_bytes",
Help: "Tracks the size of HTTP requests.",
},
append([]string{"code", "handler", "method"}, extraLabels...),
),
requestsTotal: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "Tracks the number of HTTP requests.",
},
append([]string{"code", "handler", "method"}, extraLabels...),
),
responseSize: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Name: "http_response_size_bytes",
Help: "Tracks the size of HTTP responses.",
},
append([]string{"code", "handler", "method"}, extraLabels...),
),
}
}
76 changes: 57 additions & 19 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type Handler struct {
forwardRequests *prometheus.CounterVec
replications *prometheus.CounterVec
replicationFactor prometheus.Gauge

writeSamplesTotal *prometheus.HistogramVec
writeTimeseriesTotal *prometheus.HistogramVec
}

func NewHandler(logger log.Logger, o *Options) *Handler {
Expand Down Expand Up @@ -159,6 +162,24 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
Help: "The number of times to replicate incoming write requests.",
},
),
writeTimeseriesTotal: promauto.With(o.Registry).NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "thanos",
Subsystem: "receive",
Name: "write_timeseries",
Help: "The number of timeseries received in the incoming write requests.",
Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000},
}, []string{"code", "tenant"},
),
writeSamplesTotal: promauto.With(o.Registry).NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "thanos",
Subsystem: "receive",
Name: "write_samples",
Help: "The number of sampled received in the incoming write requests.",
Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000},
}, []string{"code", "tenant"},
),
}

h.forwardRequests.WithLabelValues(labelSuccess)
Expand All @@ -174,21 +195,34 @@ func NewHandler(logger log.Logger, o *Options) *Handler {

ins := extpromhttp.NewNopInstrumentationMiddleware()
if o.Registry != nil {
ins = extpromhttp.NewInstrumentationMiddleware(o.Registry,
ins = extpromhttp.NewTenantInstrumentationMiddleware(
o.TenantHeader,
o.Registry,
[]float64{0.001, 0.005, 0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.1, 0.25, 0.5, 0.75, 1, 2, 3, 4, 5},
)
}

readyf := h.testReady
instrf := func(name string, next func(w http.ResponseWriter, r *http.Request)) http.HandlerFunc {
next = ins.NewHandler(name, http.HandlerFunc(next))

if o.Tracer != nil {
next = tracing.HTTPMiddleware(o.Tracer, name, logger, http.HandlerFunc(next))
}
return next
}

h.router.Post("/api/v1/receive", instrf("receive", readyf(middleware.RequestID(http.HandlerFunc(h.receiveHTTP)))))
h.router.Post(
"/api/v1/receive",
instrf(
"receive",
readyf(
middleware.RequestID(
http.HandlerFunc(h.receiveHTTP),
),
),
),
)

statusAPI := statusapi.New(statusapi.Options{
GetStats: h.getStats,
Expand Down Expand Up @@ -409,26 +443,30 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
return
}

err = h.handleRequest(ctx, rep, tenant, &wreq)
if err != nil {
responseStatusCode := http.StatusOK
if err = h.handleRequest(ctx, rep, tenant, &wreq); err != nil {
level.Debug(tLogger).Log("msg", "failed to handle request", "err", err)
switch determineWriteErrorCause(err, 1) {
case errNotReady:
responseStatusCode = http.StatusServiceUnavailable
case errUnavailable:
responseStatusCode = http.StatusServiceUnavailable
case errConflict:
responseStatusCode = http.StatusConflict
case errBadReplica:
responseStatusCode = http.StatusBadRequest
default:
level.Error(tLogger).Log("err", err, "msg", "internal server error")
responseStatusCode = http.StatusInternalServerError
}
http.Error(w, err.Error(), responseStatusCode)
}

switch determineWriteErrorCause(err, 1) {
case nil:
return
case errNotReady:
http.Error(w, err.Error(), http.StatusServiceUnavailable)
case errUnavailable:
http.Error(w, err.Error(), http.StatusServiceUnavailable)
case errConflict:
http.Error(w, err.Error(), http.StatusConflict)
case errBadReplica:
http.Error(w, err.Error(), http.StatusBadRequest)
default:
level.Error(tLogger).Log("err", err, "msg", "internal server error")
http.Error(w, err.Error(), http.StatusInternalServerError)
h.writeTimeseriesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(len(wreq.Timeseries)))
totalSamples := 0
for _, timeseries := range wreq.Timeseries {
totalSamples += len(timeseries.Samples)
}
h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(totalSamples))
}

// forward accepts a write request, batches its time series by
Expand Down