Skip to content

Commit

Permalink
Implement suggestions
Browse files Browse the repository at this point in the history
Signed-off-by: Saswata Mukherjee <[email protected]>
  • Loading branch information
saswatamcode committed Sep 14, 2022
1 parent 07762c5 commit f1cade9
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 41 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func runReceive(
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(15*time.Second, ctx.Done(), func() error {
if err := webHandler.Limiter.HeadSeriesLimiter.QueryMetaMonitoring(ctx, log.With(logger, "component", "receive-meta-monitoring")); err != nil {
if err := webHandler.Limiter.HeadSeriesLimiter.QueryMetaMonitoring(ctx); err != nil {
level.Error(logger).Log("msg", "failed to query meta-monitoring", "err", err.Error())
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,12 @@ Every Receive Router/RouterIngestor node, queries meta-monitoring for active ser

To use the feature, one should specify the following limiting config options:

Under `global`,
Under `global`:
- `meta_monitoring_url`: Specifies Prometheus Query API compatible meta-monitoring endpoint.
- `meta_monitoring_limit_query`: Option to specify PromQL query to execute against meta-monitoring. If not specified it is set to `sum(prometheus_tsdb_head_series) by (tenant)` by default.
- `meta_monitoring_http_client`: Optional YAML field specifying HTTP client config for meta-monitoring.

Under `default` and per `tenant`,
Under `default` and per `tenant`:
- `head_series_limit`: Specifies the total number of active (head) series for any tenant, across all replicas (including data replication), allowed by Thanos Receive.

NOTE:
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
}
defer h.Limiter.writeGate.Done()

under, err := h.Limiter.HeadSeriesLimiter.isUnderLimit(tenant, tLogger)
under, err := h.Limiter.HeadSeriesLimiter.isUnderLimit(tenant)
if err != nil {
level.Error(tLogger).Log("msg", "error while limiting", "err", err.Error())
}
Expand Down
45 changes: 18 additions & 27 deletions pkg/receive/head_series_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,7 @@ import (
"github.com/thanos-io/thanos/pkg/promclient"
)

// headSeriesLimiter encompasses active series limiting logic.
type headSeriesLimiter interface {
QueryMetaMonitoring(context.Context, log.Logger) error
isUnderLimit(string, log.Logger) (bool, error)
}

// headSeriesLimit implements activeSeriesLimiter interface.
// headSeriesLimit implements headSeriesLimiter interface.
type headSeriesLimit struct {
mtx sync.RWMutex
limit map[string]uint64
Expand All @@ -36,10 +30,11 @@ type headSeriesLimit struct {
metaMonitoringClient *http.Client
metaMonitoringQuery string

configuredTenantLimit *prometheus.GaugeVec
configuredDefaultLimit prometheus.Gauge
limitedRequests *prometheus.CounterVec
metaMonitoringErr prometheus.Counter
configuredTenantLimit *prometheus.GaugeVec
limitedRequests *prometheus.CounterVec
metaMonitoringErr prometheus.Counter

logger log.Logger
}

func NewHeadSeriesLimit(w WriteLimitsConfig, registerer prometheus.Registerer, logger log.Logger) *headSeriesLimit {
Expand All @@ -49,16 +44,10 @@ func NewHeadSeriesLimit(w WriteLimitsConfig, registerer prometheus.Registerer, l
defaultLimit: w.DefaultLimits.HeadSeriesLimit,
configuredTenantLimit: promauto.With(registerer).NewGaugeVec(
prometheus.GaugeOpts{
Name: "thanos_receive_tenant_head_series_limit",
Name: "thanos_receive_head_series_limit",
Help: "The configured limit for active (head) series of tenants.",
}, []string{"tenant"},
),
configuredDefaultLimit: promauto.With(registerer).NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_default_head_series_limit",
Help: "The configured default limit for active (head) series of tenants.",
},
),
limitedRequests: promauto.With(registerer).NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_receive_head_series_limited_requests_total",
Expand All @@ -71,9 +60,11 @@ func NewHeadSeriesLimit(w WriteLimitsConfig, registerer prometheus.Registerer, l
Help: "The total number of meta-monitoring queries that failed while limiting.",
},
),
logger: log.With(logger, "component", "receive-head-series-limiter"),
}

limit.configuredDefaultLimit.Set(float64(w.DefaultLimits.HeadSeriesLimit))
// Record default limit with empty tenant label.
limit.configuredTenantLimit.WithLabelValues("").Set(float64(limit.defaultLimit))

// Initialize map for configured limits of each tenant.
limit.limit = map[string]uint64{}
Expand Down Expand Up @@ -112,16 +103,16 @@ func NewHeadSeriesLimit(w WriteLimitsConfig, registerer prometheus.Registerer, l
// QueryMetaMonitoring queries any Prometheus Query API compatible meta-monitoring
// solution with the configured query for getting current active (head) series of all tenants.
// It then populates tenantCurrentSeries map with result.
func (h *headSeriesLimit) QueryMetaMonitoring(ctx context.Context, logger log.Logger) error {
c := promclient.NewWithTracingClient(logger, h.metaMonitoringClient, httpconfig.ThanosUserAgent)
func (h *headSeriesLimit) QueryMetaMonitoring(ctx context.Context) error {
c := promclient.NewWithTracingClient(h.logger, h.metaMonitoringClient, httpconfig.ThanosUserAgent)

vectorRes, _, err := c.QueryInstant(ctx, h.metaMonitoringURL, h.metaMonitoringQuery, time.Now(), promclient.QueryOptions{})
if err != nil {
h.metaMonitoringErr.Inc()
return err
}

level.Debug(logger).Log("msg", "successfully queried meta-monitoring", "vectors", len(vectorRes))
level.Debug(h.logger).Log("msg", "successfully queried meta-monitoring", "vectors", len(vectorRes))

h.mtx.Lock()
defer h.mtx.Unlock()
Expand All @@ -130,7 +121,7 @@ func (h *headSeriesLimit) QueryMetaMonitoring(ctx context.Context, logger log.Lo
for k, v := range e.Metric {
if k == "tenant" {
h.tenantCurrentSeriesMap[string(v)] = float64(e.Value)
level.Debug(logger).Log("msg", "tenant value queried", "tenant", string(v), "value", e.Value)
level.Debug(h.logger).Log("msg", "tenant value queried", "tenant", string(v), "value", e.Value)
}
}
}
Expand All @@ -140,7 +131,7 @@ func (h *headSeriesLimit) QueryMetaMonitoring(ctx context.Context, logger log.Lo

// isUnderLimit ensures that the current number of active series for a tenant does not exceed given limit.
// It does so in a best-effort way, i.e, in case meta-monitoring is unreachable, it does not impose limits.
func (h *headSeriesLimit) isUnderLimit(tenant string, logger log.Logger) (bool, error) {
func (h *headSeriesLimit) isUnderLimit(tenant string) (bool, error) {
h.mtx.RLock()
defer h.mtx.RUnlock()
if len(h.limit) == 0 && h.defaultLimit == 0 {
Expand All @@ -165,7 +156,7 @@ func (h *headSeriesLimit) isUnderLimit(tenant string, logger log.Logger) (bool,
}

if v >= float64(limit) {
level.Error(logger).Log("msg", "tenant above limit", "currentSeries", v, "limit", limit)
level.Error(h.logger).Log("msg", "tenant above limit", "tenant", tenant, "currentSeries", v, "limit", limit)
h.limitedRequests.WithLabelValues(tenant).Inc()
return false, nil
}
Expand All @@ -180,10 +171,10 @@ func NewNopSeriesLimit() *nopSeriesLimit {
return &nopSeriesLimit{}
}

func (a *nopSeriesLimit) QueryMetaMonitoring(_ context.Context, _ log.Logger) error {
func (a *nopSeriesLimit) QueryMetaMonitoring(_ context.Context) error {
return nil
}

func (a *nopSeriesLimit) isUnderLimit(_ string, _ log.Logger) (bool, error) {
func (a *nopSeriesLimit) isUnderLimit(_ string) (bool, error) {
return true, nil
}
15 changes: 15 additions & 0 deletions pkg/receive/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package receive

import (
"context"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/extprom"
Expand All @@ -16,6 +18,19 @@ type limiter struct {
HeadSeriesLimiter headSeriesLimiter
}

// requestLimiter encompasses logic for limiting remote write requests.
type requestLimiter interface {
AllowSizeBytes(tenant string, contentLengthBytes int64) bool
AllowSeries(tenant string, amount int64) bool
AllowSamples(tenant string, amount int64) bool
}

// headSeriesLimiter encompasses active/head series limiting logic.
type headSeriesLimiter interface {
QueryMetaMonitoring(context.Context) error
isUnderLimit(tenant string) (bool, error)
}

func newLimiter(root *RootLimitsConfig, reg prometheus.Registerer, r ReceiverMode, logger log.Logger) *limiter {
limiter := &limiter{
writeGate: gate.NewNoop(),
Expand Down
4 changes: 2 additions & 2 deletions pkg/receive/limiter_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type WriteLimitConfig struct {
HeadSeriesLimit *uint64 `yaml:"head_series_limit"`
}

// Utils for initialzing.
// Utils for initializing.
func NewEmptyWriteLimitConfig() *WriteLimitConfig {
return &WriteLimitConfig{}
}
Expand All @@ -106,7 +106,7 @@ type requestLimitsConfig struct {
SamplesLimit *int64 `yaml:"samples_limit"`
}

// Utils for initialzing.
// Utils for initializing.
func newEmptyRequestLimitsConfig() *requestLimitsConfig {
return &requestLimitsConfig{}
}
Expand Down
7 changes: 1 addition & 6 deletions pkg/receive/request_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@ var unlimitedRequestLimitsConfig = newEmptyRequestLimitsConfig().
SetSeriesLimit(0).
SetSamplesLimit(0)

type requestLimiter interface {
AllowSizeBytes(tenant string, contentLengthBytes int64) bool
AllowSeries(tenant string, amount int64) bool
AllowSamples(tenant string, amount int64) bool
}

// configRequestLimiter implements requestLimiter interface.
type configRequestLimiter struct {
tenantLimits map[string]*requestLimitsConfig
cachedDefaultLimits *requestLimitsConfig
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ func TestReceive(t *testing.T) {

// Here for exceed-tenant we go above limit by 10, which results in 0 value.
queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string {
return "sum(prometheus_tsdb_head_series{tenant=\"exceed-tenant\"}) - on() thanos_receive_default_head_series_limit{instance=\"e2e-multitenant-active-series-limiting-receive-i1:8080\", job=\"receive-i1\"}"
return "sum(prometheus_tsdb_head_series{tenant=\"exceed-tenant\"}) - on() thanos_receive_head_series_limit{instance=\"e2e-multitenant-active-series-limiting-receive-i1:8080\", job=\"receive-i1\"}"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
Expand All @@ -739,7 +739,7 @@ func TestReceive(t *testing.T) {

// For under-tenant we stay at -5, as we have only pushed 5 series.
queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string {
return "sum(prometheus_tsdb_head_series{tenant=\"under-tenant\"}) - on() thanos_receive_default_head_series_limit{instance=\"e2e-multitenant-active-series-limiting-receive-i1:8080\", job=\"receive-i1\"}"
return "sum(prometheus_tsdb_head_series{tenant=\"under-tenant\"}) - on() thanos_receive_head_series_limit{instance=\"e2e-multitenant-active-series-limiting-receive-i1:8080\", job=\"receive-i1\"}"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
Expand Down

0 comments on commit f1cade9

Please sign in to comment.