Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proxy: Only generate debug messages in debug mode #6228

Merged
merged 3 commits into from
Mar 23, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6201](https://github.com/thanos-io/thanos/pull/6201) Query-Frontend: Disable absent and absent_over_time for vertical sharding.
- [#6212](https://github.com/thanos-io/thanos/pull/6212) Query-Frontend: Disable scalar for vertical sharding.
- [#6107](https://github.com/thanos-io/thanos/pull/6082) Change default user id in container image from 0(root) to 1001
- [#6228](https://github.com/thanos-io/thanos/pull/6228) Conditionally generate debug messages in ProxyStore to avoid memory bloat.

### Removed

11 changes: 9 additions & 2 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
@@ -229,7 +229,7 @@ func registerQuery(app *extkingpin.App) {
var storeRateLimits store.SeriesSelectLimits
storeRateLimits.RegisterFlags(cmd)

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error {
selectorLset, err := parseFlagLabels(*selectorLabels)
if err != nil {
return errors.Wrap(err, "parse federation labels")
@@ -278,6 +278,7 @@ func registerQuery(app *extkingpin.App) {
return runQuery(
g,
logger,
debugLogging,
reg,
tracer,
httpLogOpts,
@@ -353,6 +354,7 @@ func registerQuery(app *extkingpin.App) {
func runQuery(
g *run.Group,
logger log.Logger,
debugLogging bool,
reg *prometheus.Registry,
tracer opentracing.Tracer,
httpLogOpts []logging.Option,
@@ -490,6 +492,11 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
)

options := []store.ProxyStoreOption{}
if debugLogging {
options = append(options, store.WithProxyStoreDebugLogging())
}

var (
endpoints = query.NewEndpointSet(
time.Now,
@@ -541,7 +548,7 @@ func runQuery(
endpointInfoTimeout,
queryConnMetricLabels...,
)
proxy = store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy))
proxy = store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy), options...)
rulesProxy = rules.NewProxy(logger, endpoints.GetRulesClients)
targetsProxy = targets.NewProxy(logger, endpoints.GetTargetsClients)
metadataProxy = metadata.NewProxy(logger, endpoints.GetMetricMetadataClients)
10 changes: 9 additions & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
@@ -58,7 +58,7 @@ func registerReceive(app *extkingpin.App) {
conf := &receiveConfig{}
conf.registerFlag(cmd)

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error {
lset, err := parseFlagLabels(conf.labelStrs)
if err != nil {
return errors.Wrap(err, "parse labels")
@@ -97,6 +97,7 @@ func registerReceive(app *extkingpin.App) {
return runReceive(
g,
logger,
debugLogging,
reg,
tracer,
grpcLogOpts, tagOpts,
@@ -113,6 +114,7 @@ func registerReceive(app *extkingpin.App) {
func runReceive(
g *run.Group,
logger log.Logger,
debugLogging bool,
reg *prometheus.Registry,
tracer opentracing.Tracer,
grpcLogOpts []grpc_logging.Option,
@@ -305,6 +307,11 @@ func runReceive(
return errors.Wrap(err, "setup gRPC server")
}

options := []store.ProxyStoreOption{}
if debugLogging {
options = append(options, store.WithProxyStoreDebugLogging())
}

proxy := store.NewProxyStore(
logger,
reg,
@@ -313,6 +320,7 @@ func runReceive(
labels.Labels{},
0,
store.LazyRetrieval,
options...,
)
mts := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits)
rw := store.ReadWriteTSDBStore{
42 changes: 35 additions & 7 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
@@ -76,6 +76,7 @@ type ProxyStore struct {
responseTimeout time.Duration
metrics *proxyStoreMetrics
retrievalStrategy RetrievalStrategy
debugLogging bool
}

type proxyStoreMetrics struct {
@@ -99,6 +100,16 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(*
}
}

// BucketStoreOption are functions that configure BucketStore.
type ProxyStoreOption func(s *ProxyStore)

// WithProxyStoreDebugLogging enables debug logging.
func WithProxyStoreDebugLogging() ProxyStoreOption {
return func(s *ProxyStore) {
s.debugLogging = true
}
}

// NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client.
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL).
func NewProxyStore(
@@ -109,6 +120,7 @@ func NewProxyStore(
selectorLabels labels.Labels,
responseTimeout time.Duration,
retrievalStrategy RetrievalStrategy,
options ...ProxyStoreOption,
) *ProxyStore {
if logger == nil {
logger = log.NewNopLogger()
@@ -128,6 +140,11 @@ func NewProxyStore(
metrics: metrics,
retrievalStrategy: retrievalStrategy,
}

for _, option := range options {
option(s)
}

return s
}

@@ -273,7 +290,9 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
for _, st := range s.stores() {
// We might be able to skip the store if its meta information indicates it cannot have series matching our query.
if ok, reason := storeMatches(srv.Context(), st, originalRequest.MinTime, originalRequest.MaxTime, matchers...); !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out: %v", st, reason))
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out: %v", st, reason))
}
continue
}

@@ -289,8 +308,9 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.

for _, st := range stores {
st := st

storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))
}

respSet, err := newAsyncRespSet(srv.Context(), st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses)
if err != nil {
@@ -412,10 +432,14 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques

// We might be able to skip the store if its meta information indicates it cannot have series matching our query.
if ok, reason := storeMatches(gctx, st, r.Start, r.End); !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to %v", st, reason))
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to %v", st, reason))
}
continue
}
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))
}

g.Go(func() error {
resp, err := st.LabelNames(gctx, &storepb.LabelNamesRequest{
@@ -486,10 +510,14 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ

// We might be able to skip the store if its meta information indicates it cannot have series matching our query.
if ok, reason := storeMatches(gctx, st, r.Start, r.End); !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to %v", st, reason))
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to %v", st, reason))
}
continue
}
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))
}

g.Go(func() error {
resp, err := st.LabelValues(gctx, &storepb.LabelValuesRequest{