Skip to content

Commit

Permalink
[receive] Export metrics about remote write requests per tenant (#5424)
Browse files Browse the repository at this point in the history
* Add write metrics to Thanos Receive

Signed-off-by: Douglas Camata <[email protected]>

* Let the middleware count inflight HTTP requests

Signed-off-by: Douglas Camata <[email protected]>

* Update Receive write metrics type & definition

Signed-off-by: Douglas Camata <[email protected]>

* Put option back in its place to avoid big diff

Signed-off-by: Douglas Camata <[email protected]>

* Fetch tenant from headers instead of context

It might not be in the context in some cases.

Signed-off-by: Douglas Camata <[email protected]>

* Delete unnecessary tenant parser middleware

Signed-off-by: Douglas Camata <[email protected]>

* Refactor & reuse code for HTTP instrumentation

Signed-off-by: Douglas Camata <[email protected]>

* Add missing copyright to some files

Signed-off-by: Douglas Camata <[email protected]>

* Add changelog entry for Receive & new HTTP metrics

Signed-off-by: Douglas Camata <[email protected]>

* Remove TODO added by accident

Signed-off-by: Douglas Camata <[email protected]>

* Make error handling code shorter

Co-authored-by: Bartlomiej Plotka <[email protected]>
Signed-off-by: Douglas Camata <[email protected]>

* Make switch statement simpler

Signed-off-by: Douglas Camata <[email protected]>

* Remove method label from timeseries' metrics

Signed-off-by: Douglas Camata <[email protected]>

* Count samples of all series instead of each

Signed-off-by: Douglas Camata <[email protected]>

* Remove in-flight requests metric

Will add this in a follow-up PR to keep this small.

Signed-off-by: Douglas Camata <[email protected]>

* Change timeseries/samples metrics to histograms

The buckets were picked based on the fact that Prometheus' default
remote write configuration
(see https://prometheus.io/docs/practices/remote_write/#memory-usage)
set a max of 500 samples sent per second.

Signed-off-by: Douglas Camata <[email protected]>

* Fix Prometheus registry for histograms

Signed-off-by: Douglas Camata <[email protected]>

* Fix comment in NewHandler functions

There are now four metrics instead of five.

Signed-off-by: Douglas Camata <[email protected]>

Co-authored-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
douglascamata and bwplotka authored Jun 23, 2022
1 parent ce84ec5 commit a0f4181
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 74 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support.
- [#5408](https://github.com/thanos-io/thanos/pull/5391) Receive: Add support for consistent hashrings.
- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Implement api/v1/status/tsdb.
- [#5424](https://github.com/thanos-io/thanos/pull/5424) Receive: export metrics regarding size of remote write requests

### Changed

Expand Down
80 changes: 25 additions & 55 deletions pkg/extprom/http/instrument_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/uber/jaeger-client-go"
)
Expand All @@ -37,81 +36,43 @@ func NewNopInstrumentationMiddleware() InstrumentationMiddleware {
}

type defaultInstrumentationMiddleware struct {
requestDuration *prometheus.HistogramVec
requestSize *prometheus.SummaryVec
requestsTotal *prometheus.CounterVec
responseSize *prometheus.SummaryVec
metrics *defaultMetrics
}

// NewInstrumentationMiddleware provides default InstrumentationMiddleware.
// Passing nil as buckets uses the default buckets.
func NewInstrumentationMiddleware(reg prometheus.Registerer, buckets []float64) InstrumentationMiddleware {
if buckets == nil {
buckets = []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}
return &defaultInstrumentationMiddleware{
metrics: newDefaultMetrics(reg, buckets, []string{}),
}

ins := defaultInstrumentationMiddleware{
requestDuration: promauto.With(reg).NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "Tracks the latencies for HTTP requests.",
Buckets: buckets,
},
[]string{"code", "handler", "method"},
),

requestSize: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Name: "http_request_size_bytes",
Help: "Tracks the size of HTTP requests.",
},
[]string{"code", "handler", "method"},
),

requestsTotal: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "Tracks the number of HTTP requests.",
}, []string{"code", "handler", "method"},
),

responseSize: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Name: "http_response_size_bytes",
Help: "Tracks the size of HTTP responses.",
},
[]string{"code", "handler", "method"},
),
}
return &ins
}

// NewHandler wraps the given HTTP handler for instrumentation. It
// registers four metric collectors (if not already done) and reports HTTP
// metrics to the (newly or already) registered collectors: http_requests_total
// (CounterVec), http_request_duration_seconds (Histogram),
// http_request_size_bytes (Summary), http_response_size_bytes (Summary). Each
// has a constant label named "handler" with the provided handlerName as
// value. http_requests_total is a metric vector partitioned by HTTP method
// (label name "method") and HTTP status code (label name "code").
// http_request_size_bytes (Summary), http_response_size_bytes (Summary).
// Each has a constant label named "handler" with the provided handlerName as value.
func (ins *defaultInstrumentationMiddleware) NewHandler(handlerName string, handler http.Handler) http.HandlerFunc {
baseLabels := prometheus.Labels{"handler": handlerName}
return httpInstrumentationHandler(baseLabels, ins.metrics, handler)
}

func httpInstrumentationHandler(baseLabels prometheus.Labels, metrics *defaultMetrics, next http.Handler) http.HandlerFunc {
return promhttp.InstrumentHandlerRequestSize(
ins.requestSize.MustCurryWith(prometheus.Labels{"handler": handlerName}),
metrics.requestSize.MustCurryWith(baseLabels),
promhttp.InstrumentHandlerCounter(
ins.requestsTotal.MustCurryWith(prometheus.Labels{"handler": handlerName}),
metrics.requestsTotal.MustCurryWith(baseLabels),
promhttp.InstrumentHandlerResponseSize(
ins.responseSize.MustCurryWith(prometheus.Labels{"handler": handlerName}),
metrics.responseSize.MustCurryWith(baseLabels),
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
now := time.Now()

wd := &responseWriterDelegator{w: w}
handler.ServeHTTP(wd, r)
next.ServeHTTP(wd, r)

observer := ins.requestDuration.WithLabelValues(
wd.Status(),
handlerName,
strings.ToLower(r.Method),
)
requestLabels := prometheus.Labels{"code": wd.Status(), "method": strings.ToLower(r.Method)}
observer := metrics.requestDuration.MustCurryWith(baseLabels).With(requestLabels)
observer.Observe(time.Since(now).Seconds())

// If we find a tracingID we'll expose it as Exemplar.
Expand Down Expand Up @@ -164,3 +125,12 @@ func (wd *responseWriterDelegator) StatusCode() int {
func (wd *responseWriterDelegator) Status() string {
return fmt.Sprintf("%d", wd.StatusCode())
}

// NewInstrumentHandlerInflightTenant creates a middleware used to export the current amount of concurrent requests
// being handled. It has an optional tenant label whenever a tenant is present in the context.
func NewInstrumentHandlerInflightTenant(gaugeVec *prometheus.GaugeVec, tenantHeader string, next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
tenant := r.Header.Get(tenantHeader)
promhttp.InstrumentHandlerInFlight(gaugeVec.With(prometheus.Labels{"tenant": tenant}), next).ServeHTTP(w, r)
}
}
41 changes: 41 additions & 0 deletions pkg/extprom/http/instrument_tenant_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package http

import (
"net/http"

"github.com/prometheus/client_golang/prometheus"
)

type tenantInstrumentationMiddleware struct {
metrics *defaultMetrics
tenantHeaderName string
}

// NewTenantInstrumentationMiddleware provides the same instrumentation as defaultInstrumentationMiddleware,
// but with a tenant label fetched from the given tenantHeaderName header.
// Passing nil as buckets uses the default buckets.
func NewTenantInstrumentationMiddleware(tenantHeaderName string, reg prometheus.Registerer, buckets []float64) InstrumentationMiddleware {
return &tenantInstrumentationMiddleware{
tenantHeaderName: tenantHeaderName,
metrics: newDefaultMetrics(reg, buckets, []string{"tenant"}),
}
}

// NewHandler wraps the given HTTP handler for instrumentation. It
// registers four metric collectors (if not already done) and reports HTTP
// metrics to the (newly or already) registered collectors: http_requests_total
// (CounterVec), http_request_duration_seconds (Histogram),
// http_request_size_bytes (Summary), http_response_size_bytes (Summary).
// Each has a constant label named "handler" with the provided handlerName as value.
func (ins *tenantInstrumentationMiddleware) NewHandler(handlerName string, next http.Handler) http.HandlerFunc {
tenantWrapper := func(w http.ResponseWriter, r *http.Request) {
tenant := r.Header.Get(ins.tenantHeaderName)
baseLabels := prometheus.Labels{"handler": handlerName, "tenant": tenant}
handlerStack := httpInstrumentationHandler(baseLabels, ins.metrics, next)
handlerStack.ServeHTTP(w, r)
}
return tenantWrapper
}
54 changes: 54 additions & 0 deletions pkg/extprom/http/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package http

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type defaultMetrics struct {
requestDuration *prometheus.HistogramVec
requestSize *prometheus.SummaryVec
requestsTotal *prometheus.CounterVec
responseSize *prometheus.SummaryVec
}

func newDefaultMetrics(reg prometheus.Registerer, buckets []float64, extraLabels []string) *defaultMetrics {
if buckets == nil {
buckets = []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}
}

return &defaultMetrics{
requestDuration: promauto.With(reg).NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "Tracks the latencies for HTTP requests.",
Buckets: buckets,
},
append([]string{"code", "handler", "method"}, extraLabels...),
),
requestSize: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Name: "http_request_size_bytes",
Help: "Tracks the size of HTTP requests.",
},
append([]string{"code", "handler", "method"}, extraLabels...),
),
requestsTotal: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "Tracks the number of HTTP requests.",
},
append([]string{"code", "handler", "method"}, extraLabels...),
),
responseSize: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Name: "http_response_size_bytes",
Help: "Tracks the size of HTTP responses.",
},
append([]string{"code", "handler", "method"}, extraLabels...),
),
}
}
76 changes: 57 additions & 19 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type Handler struct {
forwardRequests *prometheus.CounterVec
replications *prometheus.CounterVec
replicationFactor prometheus.Gauge

writeSamplesTotal *prometheus.HistogramVec
writeTimeseriesTotal *prometheus.HistogramVec
}

func NewHandler(logger log.Logger, o *Options) *Handler {
Expand Down Expand Up @@ -159,6 +162,24 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
Help: "The number of times to replicate incoming write requests.",
},
),
writeTimeseriesTotal: promauto.With(registerer).NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "thanos",
Subsystem: "receive",
Name: "write_timeseries",
Help: "The number of timeseries received in the incoming write requests.",
Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000},
}, []string{"code", "tenant"},
),
writeSamplesTotal: promauto.With(registerer).NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "thanos",
Subsystem: "receive",
Name: "write_samples",
Help: "The number of sampled received in the incoming write requests.",
Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000},
}, []string{"code", "tenant"},
),
}

h.forwardRequests.WithLabelValues(labelSuccess)
Expand All @@ -174,21 +195,34 @@ func NewHandler(logger log.Logger, o *Options) *Handler {

ins := extpromhttp.NewNopInstrumentationMiddleware()
if o.Registry != nil {
ins = extpromhttp.NewInstrumentationMiddleware(o.Registry,
ins = extpromhttp.NewTenantInstrumentationMiddleware(
o.TenantHeader,
o.Registry,
[]float64{0.001, 0.005, 0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.1, 0.25, 0.5, 0.75, 1, 2, 3, 4, 5},
)
}

readyf := h.testReady
instrf := func(name string, next func(w http.ResponseWriter, r *http.Request)) http.HandlerFunc {
next = ins.NewHandler(name, http.HandlerFunc(next))

if o.Tracer != nil {
next = tracing.HTTPMiddleware(o.Tracer, name, logger, http.HandlerFunc(next))
}
return next
}

h.router.Post("/api/v1/receive", instrf("receive", readyf(middleware.RequestID(http.HandlerFunc(h.receiveHTTP)))))
h.router.Post(
"/api/v1/receive",
instrf(
"receive",
readyf(
middleware.RequestID(
http.HandlerFunc(h.receiveHTTP),
),
),
),
)

statusAPI := statusapi.New(statusapi.Options{
GetStats: h.getStats,
Expand Down Expand Up @@ -409,26 +443,30 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
return
}

err = h.handleRequest(ctx, rep, tenant, &wreq)
if err != nil {
responseStatusCode := http.StatusOK
if err = h.handleRequest(ctx, rep, tenant, &wreq); err != nil {
level.Debug(tLogger).Log("msg", "failed to handle request", "err", err)
switch determineWriteErrorCause(err, 1) {
case errNotReady:
responseStatusCode = http.StatusServiceUnavailable
case errUnavailable:
responseStatusCode = http.StatusServiceUnavailable
case errConflict:
responseStatusCode = http.StatusConflict
case errBadReplica:
responseStatusCode = http.StatusBadRequest
default:
level.Error(tLogger).Log("err", err, "msg", "internal server error")
responseStatusCode = http.StatusInternalServerError
}
http.Error(w, err.Error(), responseStatusCode)
}

switch determineWriteErrorCause(err, 1) {
case nil:
return
case errNotReady:
http.Error(w, err.Error(), http.StatusServiceUnavailable)
case errUnavailable:
http.Error(w, err.Error(), http.StatusServiceUnavailable)
case errConflict:
http.Error(w, err.Error(), http.StatusConflict)
case errBadReplica:
http.Error(w, err.Error(), http.StatusBadRequest)
default:
level.Error(tLogger).Log("err", err, "msg", "internal server error")
http.Error(w, err.Error(), http.StatusInternalServerError)
h.writeTimeseriesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(len(wreq.Timeseries)))
totalSamples := 0
for _, timeseries := range wreq.Timeseries {
totalSamples += len(timeseries.Samples)
}
h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(totalSamples))
}

// forward accepts a write request, batches its time series by
Expand Down

0 comments on commit a0f4181

Please sign in to comment.