Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Querier/Sidecar/StoreGW: Implement Info service and add --endpoint flag in Querier #4282

Merged
merged 16 commits into from
Nov 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,21 +101,24 @@ func registerQuery(app *extkingpin.App) {
selectorLabels := cmd.Flag("selector-label", "Query selector labels that will be exposed in info endpoint (repeated).").
PlaceHolder("<name>=\"<value>\"").Strings()

stores := cmd.Flag("store", "Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups.").
endpoints := cmd.Flag("endpoint", "Addresses of statically configured Thanos API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Thanos API servers through respective DNS lookups.").
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
PlaceHolder("<endpoint>").Strings()

stores := cmd.Flag("store", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups.").
PlaceHolder("<store>").Strings()

// TODO(bwplotka): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600.
ruleEndpoints := cmd.Flag("rule", "Experimental: Addresses of statically configured rules API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect rule API servers through respective DNS lookups.").
ruleEndpoints := cmd.Flag("rule", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured rules API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect rule API servers through respective DNS lookups.").
Hidden().PlaceHolder("<rule>").Strings()

metadataEndpoints := cmd.Flag("metadata", "Experimental: Addresses of statically configured metadata API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect metadata API servers through respective DNS lookups.").
metadataEndpoints := cmd.Flag("metadata", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured metadata API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect metadata API servers through respective DNS lookups.").
Hidden().PlaceHolder("<metadata>").Strings()

exemplarEndpoints := cmd.Flag("exemplar", "Experimental: Addresses of statically configured exemplars API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect exemplars API servers through respective DNS lookups.").
exemplarEndpoints := cmd.Flag("exemplar", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured exemplars API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect exemplars API servers through respective DNS lookups.").
Hidden().PlaceHolder("<exemplar>").Strings()

// TODO(atunik): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600.
targetEndpoints := cmd.Flag("target", "Experimental: Addresses of statically configured target API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect target API servers through respective DNS lookups.").
targetEndpoints := cmd.Flag("target", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured target API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect target API servers through respective DNS lookups.").
Hidden().PlaceHolder("<target>").Strings()

strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top.").
Expand Down Expand Up @@ -264,6 +267,7 @@ func registerQuery(app *extkingpin.App) {
*queryReplicaLabels,
selectorLset,
getFlagsMap(cmd.Flags()),
*endpoints,
*stores,
*ruleEndpoints,
*targetEndpoints,
Expand Down Expand Up @@ -329,6 +333,7 @@ func runQuery(
queryReplicaLabels []string,
selectorLset labels.Labels,
flagsMap map[string]string,
endpointAddrs []string,
storeAddrs []string,
ruleAddrs []string,
targetAddrs []string,
Expand Down Expand Up @@ -376,6 +381,12 @@ func runQuery(
}
}

dnsEndpointProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg),
dns.ResolverType(dnsSDResolver),
)

dnsRuleProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_rule_apis_", reg),
Expand Down Expand Up @@ -410,7 +421,14 @@ func runQuery(
specs = append(specs, query.NewGRPCEndpointSpec(addr, true))
}

for _, dnsProvider := range []*dns.Provider{dnsStoreProvider, dnsRuleProvider, dnsExemplarProvider, dnsMetadataProvider, dnsTargetProvider} {
for _, dnsProvider := range []*dns.Provider{
dnsStoreProvider,
dnsRuleProvider,
dnsExemplarProvider,
dnsMetadataProvider,
dnsTargetProvider,
dnsEndpointProvider,
} {
var tmpSpecs []query.EndpointSpec

for _, addr := range dnsProvider.Addresses() {
Expand Down Expand Up @@ -527,6 +545,10 @@ func runQuery(
if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for exemplarsAPI", "err", err)
}
if err := dnsEndpointProvider.Resolve(resolveCtx, endpointAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses passed using endpoint flag", "err", err)

}
return nil
})
}, func(error) {
Expand Down
40 changes: 39 additions & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,23 @@ import (
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/httpconfig"
"github.com/thanos-io/thanos/pkg/info"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/logging"
meta "github.com/thanos-io/thanos/pkg/metadata"
thanosmodel "github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/reloader"
"github.com/thanos-io/thanos/pkg/rules"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"
Expand Down Expand Up @@ -241,12 +245,46 @@ func runSidecar(
return errors.Wrap(err, "setup gRPC server")
}

exemplarSrv := exemplars.NewPrometheus(conf.prometheus.url, c, m.Labels)

infoSrv := info.NewInfoServer(
component.Sidecar.String(),
info.WithLabelSet(func() []labelpb.ZLabelSet {
return promStore.LabelSet()
}),
info.WithStoreInfo(func() *infopb.StoreInfo {
mint, maxt := promStore.Timestamps()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
}
}),
info.WithExemplarsInfo(func() *infopb.ExemplarsInfo {
// Currently Exemplars API does not expose metadata such as min/max time,
// so we are using default minimum and maximum possible values as min/max time.
return &infopb.ExemplarsInfo{
MinTime: query.MinTime,
MaxTime: query.MaxTime,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use global variable for the two values to avoid calculating them each time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hitanshu-mehta We are still calling .Unix() every time. Can't we just initialize v1.MinTime and v1.MaxTime as int64? Are they used anywhere else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, they are not being used anywhere else. Agree, I should've used int64. I'll correct it. Thank you!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is we can have kind of constants somewhere in global level. Can we do that? It's still to be imp[roved?

}),
info.WithRulesInfo(func() *infopb.RulesInfo {
return &infopb.RulesInfo{}
}),
info.WithTargetInfo(func() *infopb.TargetsInfo {
return &infopb.TargetsInfo{}
}),
info.WithMetricMetadataInfo(func() *infopb.MetricMetadataInfo {
return &infopb.MetricMetadataInfo{}
}),
)

s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(promStore)),
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))),
grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplars.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplarSrv)),
grpcserver.WithServer(info.RegisterInfoServer(infoSrv)),
grpcserver.WithListen(conf.grpc.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
19 changes: 19 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/info"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
Expand All @@ -40,6 +42,7 @@ import (
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/ui"
)
Expand Down Expand Up @@ -382,6 +385,21 @@ func runStore(
cancel()
})
}

infoSrv := info.NewInfoServer(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the deprecation plan for ths old Info methods? Can we unify implementation somehow at least?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really sorry for delay.
Here is the deprecation plan I had in mind.

  1. Remove old flags (i.e. --store, --rule, --metadata, --exemplar, --target) and all the boilerplate code we have for all commands. ( eg. duplication check, dnsProviders ...)
  2. Remove storeset.go file ( I am working on to move new endpoint flow in new file as discussed here ) and all dependencies of this file from the code base.
  3. Remove Info rpc from proto file of Store API and all its corresponding implementations. ( to be specific there 5 different implementations we have of StoreClient - BucketStore, MultiTSDBStore, PrometheusStore, ProxyStore and TSDBStore)

All these steps will be done in single PR. I think this plan should work .
Please let me know if im missing something :)

Copy link
Contributor Author

@hitanshu-mehta hitanshu-mehta Jul 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we unify implementation somehow at least?

Why do we need to unify old info methods?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. When we want to change flag we usually deprecate flag first (add in help that it will be removed after next 2 releases). Then add new flags that works simultaneously. Fail if both new and old way is used. Map old flags to new one really before starting the program.
  2. Agree
  3. Generally makes sense, but we need to make sure that e.g old Querier having a client against old StoreAPI will work with new InfoAPI. Probably we will need to leave old Info Method (add deprecated comment) in old APIs. In the same time the new Querier should work with old APIs.
    • Potential flow: 1. Try (reflect)? InfoAPI end endpoint. If gRPC server does not support it we fallback to store, if store does it work then ruler ..... only for first call / error.
    • e2e test
    • IMPORTANT: This does not mean we need to implement separate info method for every server. We can implement one Info method and use the same one for both InfoAPI and e.g StoreAPI.

component.Store.String(),
info.WithLabelSet(func() []labelpb.ZLabelSet {
return bs.LabelSet()
}),
info.WithStoreInfo(func() *infopb.StoreInfo {
mint, maxt := bs.TimeRange()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
}
}),
)

// Start query (proxy) gRPC StoreAPI.
{
tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpcConfig.tlsSrvCert, conf.grpcConfig.tlsSrvKey, conf.grpcConfig.tlsSrvClientCA)
Expand All @@ -391,6 +409,7 @@ func runStore(

s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, conf.component, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(bs)),
grpcserver.WithServer(info.RegisterInfoServer(infoSrv)),
grpcserver.WithListen(conf.grpcConfig.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpcConfig.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
16 changes: 11 additions & 5 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,11 @@ Flags:
--enable-feature= ... Comma separated experimental feature names to
enable.The current list of features is
promql-negative-offset and promql-at-modifier.
--endpoint=<endpoint> ... Addresses of statically configured Thanos API
servers (repeatable). The scheme may be
prefixed with 'dns+' or 'dnssrv+' to detect
Thanos API servers through respective DNS
lookups.
--grpc-address="0.0.0.0:10901"
Listen ip:port address for gRPC endpoints
(StoreAPI). Make sure this address is routable
Expand Down Expand Up @@ -367,11 +372,12 @@ Flags:
--selector-label=<name>="<value>" ...
Query selector labels that will be exposed in
info endpoint (repeated).
--store=<store> ... Addresses of statically configured store API
servers (repeatable). The scheme may be
prefixed with 'dns+' or 'dnssrv+' to detect
store API servers through respective DNS
lookups.
--store=<store> ... Deprecation Warning - This flag is deprecated
and replaced with `endpoint`. Addresses of
statically configured store API servers
(repeatable). The scheme may be prefixed with
'dns+' or 'dnssrv+' to detect store API servers
through respective DNS lookups.
--store-strict=<staticstore> ...
Addresses of only statically configured store
API servers that are always used, even if the
Expand Down
122 changes: 122 additions & 0 deletions pkg/info/info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package info

import (
"context"

"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"google.golang.org/grpc"
)

type InfoServer struct {
infopb.UnimplementedInfoServer

component string

getLabelSet func() []labelpb.ZLabelSet
getStoreInfo func() *infopb.StoreInfo
getExemplarsInfo func() *infopb.ExemplarsInfo
getRulesInfo func() *infopb.RulesInfo
getTargetsInfo func() *infopb.TargetsInfo
getMetricMetadataInfo func() *infopb.MetricMetadataInfo
}

func NewInfoServer(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional nice to have: Would be cool to use functional options for this, instead of having to pass nil for the ones not available.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late response.

SGTM!

component string,
options ...func(*InfoServer),
) *InfoServer {
srv := &InfoServer{
component: component,
}

for _, o := range options {
o(srv)
}

return srv
}

func WithLabelSet(getLabelSet func() []labelpb.ZLabelSet) func(*InfoServer) {
return func(s *InfoServer) {
s.getLabelSet = getLabelSet
}
}

func WithStoreInfo(getStoreInfo func() *infopb.StoreInfo) func(*InfoServer) {
return func(s *InfoServer) {
s.getStoreInfo = getStoreInfo
}
}

func WithRulesInfo(getRulesInfo func() *infopb.RulesInfo) func(*InfoServer) {
return func(s *InfoServer) {
s.getRulesInfo = getRulesInfo
}
}

func WithExemplarsInfo(getExemplarsInfo func() *infopb.ExemplarsInfo) func(*InfoServer) {
return func(s *InfoServer) {
s.getExemplarsInfo = getExemplarsInfo
}
}

func WithTargetInfo(getTargetsInfo func() *infopb.TargetsInfo) func(*InfoServer) {
return func(s *InfoServer) {
s.getTargetsInfo = getTargetsInfo
}
}

func WithMetricMetadataInfo(getMetricMetadataInfo func() *infopb.MetricMetadataInfo) func(*InfoServer) {
return func(s *InfoServer) {
s.getMetricMetadataInfo = getMetricMetadataInfo
}
}

// RegisterInfoServer register info server.
func RegisterInfoServer(infoSrv infopb.InfoServer) func(*grpc.Server) {
return func(s *grpc.Server) {
infopb.RegisterInfoServer(s, infoSrv)
}
}

func (srv *InfoServer) Info(ctx context.Context, req *infopb.InfoRequest) (*infopb.InfoResponse, error) {

if srv.getLabelSet == nil {
srv.getLabelSet = func() []labelpb.ZLabelSet { return nil }
}

if srv.getStoreInfo == nil {
srv.getStoreInfo = func() *infopb.StoreInfo { return nil }
}

if srv.getExemplarsInfo == nil {
srv.getExemplarsInfo = func() *infopb.ExemplarsInfo { return nil }
}

if srv.getRulesInfo == nil {
srv.getRulesInfo = func() *infopb.RulesInfo { return nil }
}

if srv.getTargetsInfo == nil {
srv.getTargetsInfo = func() *infopb.TargetsInfo { return nil }
}

if srv.getMetricMetadataInfo == nil {
srv.getMetricMetadataInfo = func() *infopb.MetricMetadataInfo { return nil }
}

resp := &infopb.InfoResponse{
LabelSets: srv.getLabelSet(),
ComponentType: srv.component,
Store: srv.getStoreInfo(),
Exemplars: srv.getExemplarsInfo(),
Rules: srv.getRulesInfo(),
Targets: srv.getTargetsInfo(),
MetricMetadata: srv.getMetricMetadataInfo(),
}

return resp, nil
}
18 changes: 11 additions & 7 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,16 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) {
return mint, maxt
}

func (s *BucketStore) LabelSet() []labelpb.ZLabelSet {
labelSets := s.advLabelSets

if s.enableCompatibilityLabel && len(labelSets) > 0 {
labelSets = append(labelSets, labelpb.ZLabelSet{Labels: []labelpb.ZLabel{{Name: CompatibilityTypeLabelName, Value: "store"}}})
}

return labelSets
}

// Info implements the storepb.StoreServer interface.
func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) {
mint, maxt := s.TimeRange()
Expand All @@ -691,14 +701,8 @@ func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.Info
}

s.mtx.RLock()
res.LabelSets = s.advLabelSets
res.LabelSets = s.LabelSet()
s.mtx.RUnlock()

if s.enableCompatibilityLabel && len(res.LabelSets) > 0 {
// This is for compatibility with Querier v0.7.0.
// See query.StoreCompatibilityTypeLabelName comment for details.
res.LabelSets = append(res.LabelSets, labelpb.ZLabelSet{Labels: []labelpb.ZLabel{{Name: CompatibilityTypeLabelName, Value: "store"}}})
}
return res, nil
}

Expand Down
Loading