Expose tsdb status in receiver #5402
@@ -0,0 +1,126 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package status

import (
	"fmt"
	"math"
	"net/http"

	"github.com/go-kit/log"
	"github.com/opentracing/opentracing-go"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/route"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/index"
	v1 "github.com/prometheus/prometheus/web/api/v1"
	"github.com/thanos-io/thanos/pkg/api"
	extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
	"github.com/thanos-io/thanos/pkg/logging"
)

// Stat holds the information about individual cardinality.
type Stat struct {
	Name  string `json:"name"`
	Value uint64 `json:"value"`
}

func convertStats(stats []index.Stat) []Stat {
	result := make([]Stat, 0, len(stats))
	for _, item := range stats {
		item := Stat{Name: item.Name, Value: item.Count}
		result = append(result, item)
	}
	return result
}

// TSDBStatus has information of cardinality statistics from postings.
type TSDBStatus struct {
	HeadStats                   v1.HeadStats `json:"headStats"`
	SeriesCountByMetricName     []Stat       `json:"seriesCountByMetricName"`
	LabelValueCountByLabelName  []Stat       `json:"labelValueCountByLabelName"`
	MemoryInBytesByLabelName    []Stat       `json:"memoryInBytesByLabelName"`
	SeriesCountByLabelValuePair []Stat       `json:"seriesCountByLabelValuePair"`
}

type GetStatsFunc func(r *http.Request, statsByLabelName string) (*tsdb.Stats, error)

type Options struct {
	GetStats GetStatsFunc
	Registry *prometheus.Registry
}

// TODO(fpetkovski): replace with upstream struct after dependency update.
type StatusAPI struct {
	getTSDBStats GetStatsFunc
	options      Options
}

func New(opts Options) *StatusAPI {
	return &StatusAPI{
		getTSDBStats: opts.GetStats,
		options:      opts,
	}
}

func (sapi *StatusAPI) Register(r *route.Router, tracer opentracing.Tracer, logger log.Logger, ins extpromhttp.InstrumentationMiddleware, logMiddleware *logging.HTTPServerMiddleware) {
	instr := api.GetInstr(tracer, logger, ins, logMiddleware, false)
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Other federated APIs like rules, targets, exemplars are exposed as gRPC APIs at the store, and federations are done at the Querier level. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So we have this issue at the moment: #5395 Maybe we can discuss federation there, but I think it can be a good idea to do it. We just have to see how merging is going to work, and whether we want to merge the results in a single json object, or have one object per store target. I think we have the same question if we want to expose tsdb stats for all tenants. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Regarding merging, I'm wondering how data replication for Receive will affect it. AFAIU, we use quorum-based logic for replication, so maybe merging this would lead to some inaccuracies in TSDB statistics for a tenant? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes agreed. The stats can get very inaccurate if we merge across replicas. |
}

func (sapi *StatusAPI) httpServeStats(r *http.Request) (interface{}, []error, *api.ApiError) {
	s, err := sapi.getTSDBStats(r, labels.MetricName)
Review thread on this line:
- So this API can only fetch the TSDB stats for a specific tenant. Admins cannot use this API to see the global TSDB stats (cross-tenant).
- Yes, that's correct. We could extend the endpoint to provide a global view by traversing all tenants and merging the results. This could maybe happen when an explicit parameter is set.
- Sounds good. Let's have an issue to track it after this PR is merged.
	if err != nil {
		return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: err}
	}

	if s == nil {
		return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: fmt.Errorf("unknown tenant")}
	}

	metrics, err := sapi.options.Registry.Gather()
	if err != nil {
		return nil, []error{err}, nil
	}

	chunkCount := int64(math.NaN())
	for _, mF := range metrics {
		if *mF.Name == "prometheus_tsdb_head_chunks" {
			m := *mF.Metric[0]
			if m.Gauge != nil {
				chunkCount = int64(m.Gauge.GetValue())
				break
			}
		}
	}

	return TSDBStatus{
		HeadStats: v1.HeadStats{
			NumSeries:     s.NumSeries,
			ChunkCount:    chunkCount,
			MinTime:       s.MinTime,
			MaxTime:       s.MaxTime,
			NumLabelPairs: s.IndexPostingStats.NumLabelPairs,
		},
		SeriesCountByMetricName:     convertStats(s.IndexPostingStats.CardinalityMetricsStats),
		LabelValueCountByLabelName:  convertStats(s.IndexPostingStats.CardinalityLabelStats),
		MemoryInBytesByLabelName:    convertStats(s.IndexPostingStats.LabelValueStats),
		SeriesCountByLabelValuePair: convertStats(s.IndexPostingStats.LabelValuePairsStats),
	}, nil, nil
}
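The review thread above discusses federating this endpoint and merging per-tenant results. As a rough illustration only (not part of this PR), a cross-tenant merge of the cardinality breakdowns could sum counts per name and keep the largest entries. The mergeStats helper below, its sort-based truncation, and the limit parameter are all hypothetical and would need an additional "sort" import; as noted in the thread, summing across replicas of the same tenant would over-count, so a merge like this is only meaningful across distinct tenants.

// Hypothetical sketch, not part of the PR: merge per-tenant cardinality
// stats by summing counts per name and keeping the top `limit` entries.
func mergeStats(limit int, perTenant ...[]Stat) []Stat {
	totals := make(map[string]uint64)
	for _, stats := range perTenant {
		for _, s := range stats {
			totals[s.Name] += s.Value
		}
	}
	merged := make([]Stat, 0, len(totals))
	for name, value := range totals {
		merged = append(merged, Stat{Name: name, Value: value})
	}
	// Highest cardinality first, truncated to the requested number of entries.
	sort.Slice(merged, func(i, j int) bool { return merged[i].Value > merged[j].Value })
	if len(merged) > limit {
		merged = merged[:limit]
	}
	return merged
}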
@@ -17,6 +17,9 @@ import (
	"sync"
	"time"

+	statusapi "github.com/thanos-io/thanos/pkg/api/status"
+	"github.com/thanos-io/thanos/pkg/logging"
+
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/gogo/protobuf/proto"
@@ -72,7 +75,7 @@ var (
type Options struct {
	Writer *Writer
	ListenAddress string
-	Registry prometheus.Registerer
+	Registry *prometheus.Registry
	TenantHeader string
	DefaultTenantID string
	ReplicaHeader string
@@ -84,6 +87,7 @@ type Options struct {
	DialOpts []grpc.DialOption
	ForwardTimeout time.Duration
	RelabelConfigs []*relabel.Config
+	GetTSDBStats GetStatsFunc
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
@@ -111,6 +115,11 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
		logger = log.NewNopLogger()
	}

+	var registerer prometheus.Registerer = nil
+	if o.Registry != nil {
+		registerer = o.Registry
+	}
+
	h := &Handler{
		logger: logger,
		writer: o.Writer,
@@ -124,19 +133,19 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
			Max:    30 * time.Second,
			Jitter: true,
		},
-		forwardRequests: promauto.With(o.Registry).NewCounterVec(
+		forwardRequests: promauto.With(registerer).NewCounterVec(
			prometheus.CounterOpts{
				Name: "thanos_receive_forward_requests_total",
				Help: "The number of forward requests.",
			}, []string{"result"},
		),
-		replications: promauto.With(o.Registry).NewCounterVec(
+		replications: promauto.With(registerer).NewCounterVec(
			prometheus.CounterOpts{
				Name: "thanos_receive_replications_total",
				Help: "The number of replication operations done by the receiver. The success of replication is fulfilled when a quorum is met.",
			}, []string{"result"},
		),
-		replicationFactor: promauto.With(o.Registry).NewGauge(
+		replicationFactor: promauto.With(registerer).NewGauge(
			prometheus.GaugeOpts{
				Name: "thanos_receive_replication_factor",
				Help: "The number of times to replicate incoming write requests.",
@@ -173,6 +182,12 @@ func NewHandler(logger log.Logger, o *Options) *Handler {

	h.router.Post("/api/v1/receive", instrf("receive", readyf(middleware.RequestID(http.HandlerFunc(h.receiveHTTP)))))

+	statusAPI := statusapi.New(statusapi.Options{
+		GetStats: h.getStats,
+		Registry: o.Registry,
+	})
+	statusAPI.Register(h.router, o.Tracer, logger, ins, logging.NewHTTPServerMiddleware(logger))
+
	return h
}

@@ -214,6 +229,19 @@ func (h *Handler) testReady(f http.HandlerFunc) http.HandlerFunc {
	}
}

+func (h *Handler) getStats(r *http.Request, statsByLabelName string) (*tsdb.Stats, error) {
+	if !h.isReady() {
+		return nil, fmt.Errorf("service unavailable")
+	}
+
+	tenantID := r.Header.Get(h.options.TenantHeader)
+	if tenantID == "" {
+		tenantID = h.options.DefaultTenantID
Review comment on this line:
- Hmm, this is going to hit us IMO; better to fail 🤔 I can imagine lots of confusion here. But the trade-off is that it works for a default receive with no configuration, I guess... Maybe fixable with documentation.
+	}
+
+	return h.options.GetTSDBStats(tenantID, statsByLabelName), nil
+}
+
// Close stops the Handler.
func (h *Handler) Close() {
	if h.listener != nil {
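As a usage illustration (not from the PR), the sketch below queries the new endpoint on a receiver. The listen address, port, and tenant value are placeholders, and the header name assumes the receiver's default THANOS-TENANT tenant header; the header name is configurable via the handler's TenantHeader option.

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Placeholder address: the endpoint is served by the receive handler's
	// router, alongside /api/v1/receive.
	const receiveAddr = "http://localhost:19291"

	req, err := http.NewRequest(http.MethodGet, receiveAddr+"/api/v1/status/tsdb", nil)
	if err != nil {
		panic(err)
	}
	// Assumed default tenant header; without it the handler falls back to
	// the configured default tenant ID.
	req.Header.Set("THANOS-TENANT", "tenant-a")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	// The payload mirrors Prometheus' /api/v1/status/tsdb: headStats plus the
	// four cardinality breakdowns defined in TSDBStatus above.
	fmt.Println(resp.Status)
	fmt.Println(string(body))
}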
Review thread:
- We could remove these structs if prometheus/prometheus#10783 gets merged.
- It was merged, we can re-use the struct now 👍
- Do I need to go get the latest Prometheus version from master/main, or do we need to wait for a new point release? I'm not sure how Thanos manages dependencies.
- We can use the latest main directly.
- I am getting the following conflict. It seems that Prometheus has introduced some changes in public functions, and the Cortex fork that Thanos uses is not up to date yet. Should I submit a PR there to update it to the latest Prometheus?
- Yes, so while adding groupcache we had to make some changes in Thanos, which affected Cortex imports just a little. Then we decided to update my fork of Cortex and point Thanos to it, and once the groupcache changes were merged we were going to create a PR to Cortex to have those changes merged and point Thanos back to the original Cortex. Now #4651 is open; it's been reviewed by @GiedriusS and @bwplotka but not by any Cortex maintainer yet. I am not sure what the next step is. @GiedriusS, would you like to help me out with this?
- While waiting for that to get merged, should we maybe rebase the fork against the latest master in Cortex in order to get this PR unblocked? Otherwise, we will be blocked whenever we try to update the prometheus/prometheus dependency.
- No need for a release.
- We could rebase the Cortex fork.
- I think it's time to remove the dependency on Cortex; it's been quite painful recently (: