diff --git a/CHANGELOG.md b/CHANGELOG.md index a843f4839b..5a3b11d5d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#4679](https://github.com/thanos-io/thanos/pull/4679) Added `enable-feature` flag to enable negative offsets and @ modifier, similar to Prometheus. - [#4696](https://github.com/thanos-io/thanos/pull/4696) Query: add cache name to tracing spans. - [#4764](https://github.com/thanos-io/thanos/pull/4764) Compactor: add `block-viewer.global.sync-block-timeout` flag to set the timeout of synchronization block metas. +- [#4389](https://github.com/thanos-io/thanos/pull/4389) Querier: add `endpoint.configuration` and `endpoint.configuration-file` for granular endpoint configuration YAML content or file. + - *:warning:* This also deprecates the following flags `store.sd-interval`, `store.sd-dns-interval`, `store.sd-dns-resolver`, `store.sd-files` and all `grpc-client-.*`. They will be removed in v0.27.0. ### Fixed diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 373cd57913..e15ceb7359 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -25,14 +25,16 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" + "github.com/thanos-io/thanos/pkg/exthttp" + extflag "github.com/efficientgo/tools/extkingpin" grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" v1 "github.com/thanos-io/thanos/pkg/api/query" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" - "github.com/thanos-io/thanos/pkg/discovery/cache" "github.com/thanos-io/thanos/pkg/discovery/dns" "github.com/thanos-io/thanos/pkg/exemplars" + "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" "github.com/thanos-io/thanos/pkg/extgrpc" "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/extprom" @@ -40,14 +42,17 @@ import ( "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/logging" "github.com/thanos-io/thanos/pkg/metadata" + "github.com/thanos-io/thanos/pkg/metadata/metadatapb" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/query" "github.com/thanos-io/thanos/pkg/rules" + "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/runutil" grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/targets" + "github.com/thanos-io/thanos/pkg/targets/targetspb" "github.com/thanos-io/thanos/pkg/tls" "github.com/thanos-io/thanos/pkg/ui" ) @@ -65,12 +70,13 @@ func registerQuery(app *extkingpin.App) { httpBindAddr, httpGracePeriod, httpTLSConfig := extkingpin.RegisterHTTPFlags(cmd) grpcBindAddr, grpcGracePeriod, grpcCert, grpcKey, grpcClientCA, grpcMaxConnAge := extkingpin.RegisterGRPCFlags(cmd) - secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool() - skipVerify := cmd.Flag("grpc-client-tls-skip-verify", "Disable TLS certificate verification i.e self signed, signed by fake CA").Default("false").Bool() - cert := cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").String() - key := cmd.Flag("grpc-client-tls-key", "TLS Key for the client's certificate").Default("").String() - caCert := cmd.Flag("grpc-client-tls-ca", "TLS CA Certificates to use to verify gRPC servers").Default("").String() - serverName := cmd.Flag("grpc-client-server-name", "Server name to verify the hostname on the returned gRPC certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").String() + // TODO(bwplotka): Remove in 0.27.0. + secure := cmd.Flag("grpc-client-tls-secure", "Deprecated: Use endpoint.config instead. Use TLS when talking to the gRPC server").Default("false").Bool() + skipVerify := cmd.Flag("grpc-client-tls-skip-verify", "Deprecated: Use endpoint.config instead. Disable TLS certificate verification i.e self signed, signed by fake CA").Default("false").Bool() + cert := cmd.Flag("grpc-client-tls-cert", "Deprecated: Use endpoint.config instead. TLS Certificates to use to identify this client to the server").Default("").String() + key := cmd.Flag("grpc-client-tls-key", "Deprecated: Use endpoint.config instead. TLS Key for the client's certificate").Default("").String() + caCert := cmd.Flag("grpc-client-tls-ca", "Deprecated: Use endpoint.config instead. TLS CA Certificates to use to verify gRPC servers").Default("").String() + serverName := cmd.Flag("grpc-client-server-name", "Deprecated: Use endpoint.config instead. Server name to verify the hostname on the returned gRPC certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").String() webRoutePrefix := cmd.Flag("web.route-prefix", "Prefix for API and UI endpoints. This allows thanos UI to be served on a sub-path. Defaults to the value of --web.external-prefix. This option is analogous to --web.route-prefix of Prometheus.").Default("").String() webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the UI query web interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String() @@ -101,37 +107,33 @@ func registerQuery(app *extkingpin.App) { selectorLabels := cmd.Flag("selector-label", "Query selector labels that will be exposed in info endpoint (repeated)."). PlaceHolder("=\"\"").Strings() - stores := cmd.Flag("store", "Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups."). - PlaceHolder("").Strings() - // TODO(bwplotka): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600. ruleEndpoints := cmd.Flag("rule", "Experimental: Addresses of statically configured rules API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect rule API servers through respective DNS lookups."). Hidden().PlaceHolder("").Strings() - metadataEndpoints := cmd.Flag("metadata", "Experimental: Addresses of statically configured metadata API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect metadata API servers through respective DNS lookups."). Hidden().PlaceHolder("").Strings() - - exemplarEndpoints := cmd.Flag("exemplar", "Experimental: Addresses of statically configured exemplars API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect exemplars API servers through respective DNS lookups."). + exemplarEndpoints := cmd.Flag("exemplar", "Experimental: Use endpoint or endpoint.config. Addresses of statically configured exemplars API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect exemplars API servers through respective DNS lookups."). Hidden().PlaceHolder("").Strings() - - // TODO(atunik): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600. - targetEndpoints := cmd.Flag("target", "Experimental: Addresses of statically configured target API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect target API servers through respective DNS lookups."). + targetEndpoints := cmd.Flag("target", "Experimental: Use endpoint or endpoint.config. Addresses of statically configured target API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect target API servers through respective DNS lookups."). Hidden().PlaceHolder("").Strings() + endpointConfig := extflag.RegisterPathOrContent(cmd, "endpoint.config", "YAML file that contains set of endpoints (e.g Store API) with optional TLS options. To enable TLS either use this option or deprecated ones --grpc-client-tls* .", extflag.WithEnvSubstitution()) + + stores := cmd.Flag("store", "Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups."). + PlaceHolder("").Strings() + strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top."). PlaceHolder("").Strings() - fileSDFiles := cmd.Flag("store.sd-files", "Path to files that contain addresses of store API servers. The path can be a glob pattern (repeatable)."). + // TODO(bwplotka): Remove in 0.27.0. + fileSDFiles := cmd.Flag("store.sd-files", "Deprecated: Use endpoint.config instead. Path to files that contain addresses of store API servers. The path can be a glob pattern (repeatable)."). PlaceHolder("").Strings() - - fileSDInterval := extkingpin.ModelDuration(cmd.Flag("store.sd-interval", "Refresh interval to re-read file SD files. It is used as a resync fallback."). + fileSDInterval := extkingpin.ModelDuration(cmd.Flag("store.sd-interval", "Deprecated: Use endpoint.config instead. Refresh interval to re-read file SD files. It is used as a resync fallback."). Default("5m")) - // TODO(bwplotka): Grab this from TTL at some point. - dnsSDInterval := extkingpin.ModelDuration(cmd.Flag("store.sd-dns-interval", "Interval between DNS resolutions."). + dnsSDInterval := extkingpin.ModelDuration(cmd.Flag("store.sd-dns-interval", "Deprecated: Use endpoint.config instead. Interval between DNS resolutions."). Default("30s")) - - dnsSDResolver := cmd.Flag("store.sd-dns-resolver", fmt.Sprintf("Resolver to use. Possible options: [%s, %s]", dns.GolangResolverType, dns.MiekgdnsResolverType)). + dnsSDResolver := cmd.Flag("store.sd-dns-resolver", fmt.Sprintf("Deprecated: Use endpoint.config instead. Resolver to use. Possible options: [%s, %s]", dns.GolangResolverType, dns.MiekgdnsResolverType)). Default(string(dns.MiekgdnsResolverType)).Hidden().String() unhealthyStoreTimeout := extkingpin.ModelDuration(cmd.Flag("store.unhealthy-timeout", "Timeout before an unhealthy store is cleaned from the store UI page.").Default("5m")) @@ -210,13 +212,21 @@ func registerQuery(app *extkingpin.App) { return errors.Errorf("Address %s is duplicated for --target flag.", dup) } - var fileSD *file.Discovery + endpointConfigYAML, err := endpointConfig.Content() + if err != nil { + return err + } + + if *secure && len(endpointConfigYAML) != 0 { + return errors.Errorf("deprecated flags --grpc-client-tls* and new --endpoint.config flag cannot be specified at the same time; use either of those") + } + + var fileSDConfig *file.SDConfig if len(*fileSDFiles) > 0 { - conf := &file.SDConfig{ + fileSDConfig = &file.SDConfig{ Files: *fileSDFiles, RefreshInterval: *fileSDInterval, } - fileSD = file.NewDiscovery(conf, logger) } if *webRoutePrefix == "" { @@ -275,7 +285,8 @@ func registerQuery(app *extkingpin.App) { *enableTargetPartialResponse, *enableMetricMetadataPartialResponse, *enableExemplarPartialResponse, - fileSD, + fileSDConfig, + endpointConfigYAML, time.Duration(*dnsSDInterval), *dnsSDResolver, time.Duration(*unhealthyStoreTimeout), @@ -340,7 +351,8 @@ func runQuery( enableTargetPartialResponse bool, enableMetricMetadataPartialResponse bool, enableExemplarPartialResponse bool, - fileSD *file.Discovery, + fileSDConfig *file.SDConfig, + endpointConfigYAML []byte, dnsSDInterval time.Duration, dnsSDResolver string, unhealthyStoreTimeout time.Duration, @@ -358,78 +370,97 @@ func runQuery( Help: "The number of times a duplicated store addresses is detected from the different configs in query", }) - dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, secure, skipVerify, cert, key, caCert, serverName) - if err != nil { - return errors.Wrap(err, "building gRPC client") + // TLSConfig for endpoints provided in --store, --store.sd-files and --store-strict. + var TLSConfig exthttp.TLSConfig + if secure { + TLSConfig = exthttp.TLSConfig{ + CertFile: cert, + KeyFile: key, + CAFile: caCert, + ServerName: serverName, + } } - fileSDCache := cache.New() - dnsStoreProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_store_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) + // TODO(bwplotka): Allow filtering by API through config. + combinedAddresses := storeAddrs + combinedAddresses = append(combinedAddresses, ruleAddrs...) + combinedAddresses = append(combinedAddresses, metadataAddrs...) + combinedAddresses = append(combinedAddresses, exemplarAddrs...) + combinedAddresses = append(combinedAddresses, targetAddrs...) - for _, store := range strictStores { - if dns.IsDynamicNode(store) { - return errors.Errorf("%s is a dynamically specified store i.e. it uses SD and that is not permitted under strict mode. Use --store for this", store) - } + // Create endpoint config combining flag-based options with --endpoint.config. + endpointConfig, err := query.LoadConfig(endpointConfigYAML, combinedAddresses, strictStores, fileSDConfig, TLSConfig) + if err != nil { + return errors.Wrap(err, "loading endpoint config") } - dnsRuleProvider := dns.NewProvider( + dnsProvider := dns.NewProvider( logger, - extprom.WrapRegistererWithPrefix("thanos_query_rule_apis_", reg), + extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg), dns.ResolverType(dnsSDResolver), ) - dnsTargetProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_target_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) + engineOpts := promql.EngineOpts{ + Logger: logger, + Reg: reg, + // TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703. + MaxSamples: math.MaxInt32, + Timeout: queryTimeout, + LookbackDelta: lookbackDelta, + NoStepSubqueryIntervalFn: func(int64) int64 { + return defaultEvaluationInterval.Milliseconds() + }, + EnableAtModifier: enableAtModifier, + EnableNegativeOffset: enableNegativeOffset, + } - dnsMetadataProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_metadata_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) + var groups []*query.EndpointGroup + endpointSetGRPCMetrics := extgrpc.ClientGRPCMetrics(reg, "endpointset") + for _, config := range endpointConfig { + dialOpts, err := extgrpc.ClientGRPCOpts(logger, tracer, endpointSetGRPCMetrics, config.GRPCClientConfig) + if err != nil { + return errors.Wrap(err, "building gRPC options") + } - dnsExemplarProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_exemplar_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) + var g *query.EndpointGroup - var ( - endpoints = query.NewEndpointSet( - logger, - reg, - func() (specs []query.EndpointSpec) { - // Add strict & static nodes. - for _, addr := range strictStores { - specs = append(specs, query.NewGRPCEndpointSpec(addr, true)) + var spec []query.EndpointSpec + if config.Mode == query.StrictEndpointMode { + // Add strict & static nodes. + for _, addr := range config.EndpointsConfig.Addresses { + if dns.IsDynamicNode(addr) { + return errors.Errorf("%s is a dynamically specified store i.e. it uses SD and that is not permitted under strict mode. Use --store for this", addr) } + spec = append(spec, query.NewGRPCEndpointSpec(addr, true)) + } - for _, dnsProvider := range []*dns.Provider{dnsStoreProvider, dnsRuleProvider, dnsExemplarProvider, dnsMetadataProvider, dnsTargetProvider} { - var tmpSpecs []query.EndpointSpec + // No dynamic resources when endpoint is strict. + g = query.NewEndpointGroup(nil, dialOpts) + } else { + // TODO(bwplotka): Consider adding provider per config name, for instrumentation purposes, but only if strongly requested. + d, err := extgrpc.NewDiscoverer(logger, config.EndpointsConfig, dnsProvider.Clone()) + if err != nil { + return errors.Wrap(err, "building discoverer") + } - for _, addr := range dnsProvider.Addresses() { - tmpSpecs = append(tmpSpecs, query.NewGRPCEndpointSpec(addr, false)) - } - tmpSpecs = removeDuplicateEndpointSpecs(logger, duplicatedStores, tmpSpecs) - specs = append(specs, tmpSpecs...) - } + addDiscoveryGroups(g, d, ??) + g = query.NewEndpointGroup(d, dialOpts) + } + } - return specs - }, - dialOpts, - unhealthyStoreTimeout, - ) - proxy = store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout) - rulesProxy = rules.NewProxy(logger, endpoints.GetRulesClients) - targetsProxy = targets.NewProxy(logger, endpoints.GetTargetsClients) - metadataProxy = metadata.NewProxy(logger, endpoints.GetMetricMetadataClients) - exemplarsProxy = exemplars.NewProxy(logger, endpoints.GetExemplarsStores, selectorLset) + endpointSet := query.NewEndpointSet( + logger, + reg, + groups, + unhealthyStoreTimeout, + ) + + var ( + proxy = store.NewProxyStore(logger, reg, endpointSet.GetStoreClients, component.Query, selectorLset, storeResponseTimeout) + rulesProxy = rules.NewProxy(logger, endpointSet.GetRulesClients) + targetsProxy = targets.NewProxy(logger, endpointSet.GetTargetsClients) + metadataProxy = metadata.NewProxy(logger, endpointSet.GetMetricMetadataClients) + exemplarsProxy = exemplars.NewProxy(logger, endpointSet.GetExemplarsStores, selectorLset) queryableCreator = query.NewQueryableCreator( logger, extprom.WrapRegistererWithPrefix("thanos_query_", reg), @@ -437,109 +468,13 @@ func runQuery( maxConcurrentSelects, queryTimeout, ) - engineOpts = promql.EngineOpts{ - Logger: logger, - Reg: reg, - // TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703. - MaxSamples: math.MaxInt32, - Timeout: queryTimeout, - LookbackDelta: lookbackDelta, - NoStepSubqueryIntervalFn: func(int64) int64 { - return defaultEvaluationInterval.Milliseconds() - }, - } - ) - - // Periodically update the store set with the addresses we see in our cluster. - { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - return runutil.Repeat(5*time.Second, ctx.Done(), func() error { - endpoints.Update(ctx) - return nil - }) - }, func(error) { - cancel() - endpoints.Close() - }) - } - // Run File Service Discovery and update the store set when the files are modified. - if fileSD != nil { - var fileSDUpdates chan []*targetgroup.Group - ctxRun, cancelRun := context.WithCancel(context.Background()) - - fileSDUpdates = make(chan []*targetgroup.Group) - - g.Add(func() error { - fileSD.Run(ctxRun, fileSDUpdates) - return nil - }, func(error) { - cancelRun() - }) - - engineOpts.EnableAtModifier = enableAtModifier - engineOpts.EnableNegativeOffset = enableNegativeOffset - - ctxUpdate, cancelUpdate := context.WithCancel(context.Background()) - g.Add(func() error { - for { - select { - case update := <-fileSDUpdates: - // Discoverers sometimes send nil updates so need to check for it to avoid panics. - if update == nil { - continue - } - fileSDCache.Update(update) - endpoints.Update(ctxUpdate) - - if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...)); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err) - } - - // Rules apis do not support file service discovery as of now. - case <-ctxUpdate.Done(): - return nil - } - } - }, func(error) { - cancelUpdate() - }) - } - // Periodically update the addresses from static flags and file SD by resolving them using DNS SD if necessary. - { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - return runutil.Repeat(dnsSDInterval, ctx.Done(), func() error { - resolveCtx, resolveCancel := context.WithTimeout(ctx, dnsSDInterval) - defer resolveCancel() - if err := dnsStoreProvider.Resolve(resolveCtx, append(fileSDCache.Addresses(), storeAddrs...)); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err) - } - if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err) - } - if err := dnsTargetProvider.Resolve(ctx, targetAddrs); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err) - } - if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err) - } - if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for exemplarsAPI", "err", err) - } - return nil - }) - }, func(error) { - cancel() - }) - } - - grpcProbe := prober.NewGRPC() - httpProbe := prober.NewHTTP() - statusProber := prober.Combine( - httpProbe, - grpcProbe, - prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)), + grpcProbe = prober.NewGRPC() + httpProbe = prober.NewHTTP() + statusProber = prober.Combine( + httpProbe, + grpcProbe, + prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)), + ) ) // Start query API + UI HTTP server. @@ -565,11 +500,11 @@ func runQuery( ins := extpromhttp.NewInstrumentationMiddleware(reg, nil) // TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting. - ui.NewQueryUI(logger, endpoints, webExternalPrefix, webPrefixHeaderName).Register(router, ins) + ui.NewQueryUI(logger, endpointSet.GetEndpointStatus, webExternalPrefix, webPrefixHeaderName).Register(router, ins) api := v1.NewQueryAPI( logger, - endpoints.GetEndpointStatus, + endpointSet.GetEndpointStatus, engineFactory(promql.NewEngine, engineOpts, dynamicLookbackDelta), queryableCreator, // NOTE: Will share the same replica label as the query for now. @@ -648,23 +583,6 @@ func runQuery( return nil } -func removeDuplicateEndpointSpecs(logger log.Logger, duplicatedStores prometheus.Counter, specs []query.EndpointSpec) []query.EndpointSpec { - set := make(map[string]query.EndpointSpec) - for _, spec := range specs { - addr := spec.Addr() - if _, ok := set[addr]; ok { - level.Warn(logger).Log("msg", "Duplicate store address is provided", "addr", addr) - duplicatedStores.Inc() - } - set[addr] = spec - } - deduplicated := make([]query.EndpointSpec, 0, len(set)) - for _, value := range set { - deduplicated = append(deduplicated, value) - } - return deduplicated -} - // firstDuplicate returns the first duplicate string in the given string slice // or empty string if none was found. func firstDuplicate(ss []string) string { diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index ed0e8794e7..cbeef9af55 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -22,6 +22,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/exthttp" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/exemplars" @@ -120,16 +121,21 @@ func runReceive( return err } + TLSConfig := exthttp.TLSConfig{ + CertFile: conf.rwClientCert, + KeyFile: conf.rwClientKey, + CAFile: conf.rwClientServerCA, + ServerName: conf.rwClientServerName, + } + dialOpts, err := extgrpc.StoreClientGRPCOpts( logger, reg, tracer, + "", *conf.grpcCert != "", *conf.grpcClientCA == "", - conf.rwClientCert, - conf.rwClientKey, - conf.rwClientServerCA, - conf.rwClientServerName, + TLSConfig, ) if err != nil { return err diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index d5893edd2a..8ee9783196 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -33,8 +33,8 @@ import ( "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/util/strutil" "github.com/thanos-io/thanos/pkg/errutil" + "github.com/thanos-io/thanos/pkg/exthttp" "github.com/thanos-io/thanos/pkg/extkingpin" - "github.com/thanos-io/thanos/pkg/httpconfig" extflag "github.com/efficientgo/tools/extkingpin" "github.com/thanos-io/thanos/pkg/alert" @@ -265,29 +265,29 @@ func runRule( ) error { metrics := newRuleMetrics(reg) - var queryCfg []httpconfig.Config + var queryCfg []exthttp.Config var err error if len(conf.queryConfigYAML) > 0 { - queryCfg, err = httpconfig.LoadConfigs(conf.queryConfigYAML) + queryCfg, err = exthttp.LoadConfigs(conf.queryConfigYAML) if err != nil { return err } } else { - queryCfg, err = httpconfig.BuildConfig(conf.query.addrs) + queryCfg, err = exthttp.BuildConfig(conf.query.addrs) if err != nil { return errors.Wrap(err, "query configuration") } // Build the query configuration from the legacy query flags. - var fileSDConfigs []httpconfig.FileSDConfig + var fileSDConfigs []exthttp.FileSDConfig if len(conf.query.sdFiles) > 0 { - fileSDConfigs = append(fileSDConfigs, httpconfig.FileSDConfig{ + fileSDConfigs = append(fileSDConfigs, exthttp.FileSDConfig{ Files: conf.query.sdFiles, RefreshInterval: model.Duration(conf.query.sdInterval), }) queryCfg = append(queryCfg, - httpconfig.Config{ - EndpointsConfig: httpconfig.EndpointsConfig{ + exthttp.Config{ + EndpointsConfig: exthttp.EndpointsConfig{ Scheme: "http", FileSDConfigs: fileSDConfigs, }, @@ -301,22 +301,21 @@ func runRule( extprom.WrapRegistererWithPrefix("thanos_rule_query_apis_", reg), dns.ResolverType(conf.query.dnsSDResolver), ) - var queryClients []*httpconfig.Client + var queryClients []*exthttp.Client queryClientMetrics := extpromhttp.NewClientMetrics(extprom.WrapRegistererWith(prometheus.Labels{"client": "query"}, reg)) for _, cfg := range queryCfg { - cfg.HTTPClientConfig.ClientMetrics = queryClientMetrics - c, err := httpconfig.NewHTTPClient(cfg.HTTPClientConfig, "query") + c, err := exthttp.NewHTTPClient(cfg.HTTPClientConfig, "query", queryClientMetrics) if err != nil { return err } c.Transport = tracing.HTTPTripperware(logger, c.Transport) - queryClient, err := httpconfig.NewClient(logger, cfg.EndpointsConfig, c, queryProvider.Clone()) + queryClient, err := exthttp.NewClient(logger, cfg.EndpointsConfig, c, queryProvider.Clone()) if err != nil { return err } queryClients = append(queryClients, queryClient) // Discover and resolve query addresses. - addDiscoveryGroups(g, queryClient, conf.query.dnsSDInterval) + addDiscoveryGroups(g, queryClient.Discoverer, conf.query.dnsSDInterval) } db, err := tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil) @@ -379,19 +378,18 @@ func runRule( extprom.WrapRegistererWith(prometheus.Labels{"client": "alertmanager"}, reg), ) for _, cfg := range alertingCfg.Alertmanagers { - cfg.HTTPClientConfig.ClientMetrics = amClientMetrics - c, err := httpconfig.NewHTTPClient(cfg.HTTPClientConfig, "alertmanager") + c, err := exthttp.NewHTTPClient(cfg.HTTPClientConfig, "alertmanager", amClientMetrics) if err != nil { return err } c.Transport = tracing.HTTPTripperware(logger, c.Transport) // Each Alertmanager client has a different list of targets thus each needs its own DNS provider. - amClient, err := httpconfig.NewClient(logger, cfg.EndpointsConfig, c, amProvider.Clone()) + amClient, err := exthttp.NewClient(logger, cfg.EndpointsConfig, c, amProvider.Clone()) if err != nil { return err } // Discover and resolve Alertmanager addresses. - addDiscoveryGroups(g, amClient, conf.alertmgr.alertmgrsDNSSDInterval) + addDiscoveryGroups(g, amClient.Discoverer, conf.alertmgr.alertmgrsDNSSDInterval) alertmgrs = append(alertmgrs, alert.NewAlertmanager(logger, amClient, time.Duration(cfg.Timeout), cfg.APIVersion)) } @@ -705,7 +703,7 @@ func removeDuplicateQueryEndpoints(logger log.Logger, duplicatedQueriers prometh func queryFuncCreator( logger log.Logger, - queriers []*httpconfig.Client, + queriers []*exthttp.Client, duplicatedQuery prometheus.Counter, ruleEvalWarnings *prometheus.CounterVec, httpMethod string, @@ -761,10 +759,10 @@ func queryFuncCreator( } } -func addDiscoveryGroups(g *run.Group, c *httpconfig.Client, interval time.Duration) { +func addDiscoveryGroups(g *run.Group, d *exthttp.Discoverer, interval time.Duration) { ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - c.Discover(ctx) + d.Discover(ctx) return nil }, func(error) { cancel() @@ -772,7 +770,7 @@ func addDiscoveryGroups(g *run.Group, c *httpconfig.Client, interval time.Durati g.Add(func() error { return runutil.Repeat(interval, ctx.Done(), func() error { - return c.Resolve(ctx) + return d.Resolve(ctx) }) }, func(error) { cancel() diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 8584492b4f..4187997c57 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -29,7 +29,6 @@ import ( "github.com/thanos-io/thanos/pkg/exthttp" "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/extprom" - "github.com/thanos-io/thanos/pkg/httpconfig" "github.com/thanos-io/thanos/pkg/logging" meta "github.com/thanos-io/thanos/pkg/metadata" thanosmodel "github.com/thanos-io/thanos/pkg/model" @@ -228,7 +227,7 @@ func runSidecar( t := exthttp.NewTransport() t.MaxIdleConnsPerHost = conf.connection.maxIdleConnsPerHost t.MaxIdleConns = conf.connection.maxIdleConns - c := promclient.NewClient(&http.Client{Transport: tracing.HTTPTripperware(logger, t)}, logger, httpconfig.ThanosUserAgent) + c := promclient.NewClient(&http.Client{Transport: tracing.HTTPTripperware(logger, t)}, logger, exthttp.ThanosUserAgent) promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version) if err != nil { diff --git a/docs/components/query.md b/docs/components/query.md index 41e2cccf89..68e15e0da9 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -255,6 +255,18 @@ Flags: --enable-feature= ... Comma separated experimental feature names to enable.The current list of features is promql-negative-offset and promql-at-modifier. + --endpoint.config= + Alternative to 'endpoint.config-file' flag + (mutually exclusive). Content of YAML file that + contains set of endpoints (e.g Store API) with + optional TLS options. To enable TLS either use + this option or deprecated ones + --grpc-client-tls* . + --endpoint.config-file= + Path to YAML file that contains set of + endpoints (e.g Store API) with optional TLS + options. To enable TLS either use this option + or deprecated ones --grpc-client-tls* . --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable diff --git a/pkg/alert/config.go b/pkg/alert/config.go index 1572821cf2..00d1466853 100644 --- a/pkg/alert/config.go +++ b/pkg/alert/config.go @@ -13,7 +13,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/relabel" - "github.com/thanos-io/thanos/pkg/httpconfig" + "github.com/thanos-io/thanos/pkg/exthttp" "gopkg.in/yaml.v2" "github.com/thanos-io/thanos/pkg/discovery/dns" @@ -25,10 +25,10 @@ type AlertingConfig struct { // AlertmanagerConfig represents a client to a cluster of Alertmanager endpoints. type AlertmanagerConfig struct { - HTTPClientConfig httpconfig.ClientConfig `yaml:"http_config"` - EndpointsConfig httpconfig.EndpointsConfig `yaml:",inline"` - Timeout model.Duration `yaml:"timeout"` - APIVersion APIVersion `yaml:"api_version"` + HTTPClientConfig exthttp.ClientConfig `yaml:"http_config"` + EndpointsConfig exthttp.EndpointsConfig `yaml:",inline"` + Timeout model.Duration `yaml:"timeout"` + APIVersion APIVersion `yaml:"api_version"` } // APIVersion represents the API version of the Alertmanager endpoint. @@ -61,10 +61,10 @@ func (v *APIVersion) UnmarshalYAML(unmarshal func(interface{}) error) error { func DefaultAlertmanagerConfig() AlertmanagerConfig { return AlertmanagerConfig{ - EndpointsConfig: httpconfig.EndpointsConfig{ - Scheme: "http", - StaticAddresses: []string{}, - FileSDConfigs: []httpconfig.FileSDConfig{}, + EndpointsConfig: exthttp.EndpointsConfig{ + Scheme: "http", + Addresses: []string{}, + FileSDConfigs: []exthttp.FileSDConfig{}, }, Timeout: model.Duration(time.Second * 10), APIVersion: APIv1, @@ -111,7 +111,7 @@ func BuildAlertmanagerConfig(address string, timeout time.Duration) (Alertmanage break } } - var basicAuth httpconfig.BasicAuth + var basicAuth exthttp.BasicAuth if parsed.User != nil && parsed.User.String() != "" { basicAuth.Username = parsed.User.Username() pw, _ := parsed.User.Password() @@ -119,13 +119,13 @@ func BuildAlertmanagerConfig(address string, timeout time.Duration) (Alertmanage } return AlertmanagerConfig{ - HTTPClientConfig: httpconfig.ClientConfig{ + HTTPClientConfig: exthttp.ClientConfig{ BasicAuth: basicAuth, }, - EndpointsConfig: httpconfig.EndpointsConfig{ - PathPrefix: parsed.Path, - Scheme: scheme, - StaticAddresses: []string{host}, + EndpointsConfig: exthttp.EndpointsConfig{ + PathPrefix: parsed.Path, + Scheme: scheme, + Addresses: []string{host}, }, Timeout: model.Duration(timeout), APIVersion: APIv1, diff --git a/pkg/extgrpc/client.go b/pkg/extgrpc/client.go deleted file mode 100644 index d6a6dc8b3d..0000000000 --- a/pkg/extgrpc/client.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package extgrpc - -import ( - "math" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware/v2" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "github.com/opentracing/opentracing-go" - "github.com/prometheus/client_golang/prometheus" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - - "github.com/thanos-io/thanos/pkg/tls" - "github.com/thanos-io/thanos/pkg/tracing" -) - -// StoreClientGRPCOpts creates gRPC dial options for connecting to a store client. -func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure, skipVerify bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) { - grpcMets := grpc_prometheus.NewClientMetrics() - grpcMets.EnableClientHandlingTimeHistogram( - grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}), - ) - dialOpts := []grpc.DialOption{ - // We want to make sure that we can receive huge gRPC messages from storeAPI. - // On TCP level we can be fine, but the gRPC overhead for huge messages could be significant. - // Current limit is ~2GB. - // TODO(bplotka): Split sent chunks on store node per max 4MB chunks if needed. - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), - grpc.WithUnaryInterceptor( - grpc_middleware.ChainUnaryClient( - grpcMets.UnaryClientInterceptor(), - tracing.UnaryClientInterceptor(tracer), - ), - ), - grpc.WithStreamInterceptor( - grpc_middleware.ChainStreamClient( - grpcMets.StreamClientInterceptor(), - tracing.StreamClientInterceptor(tracer), - ), - ), - } - if reg != nil { - reg.MustRegister(grpcMets) - } - - if !secure { - return append(dialOpts, grpc.WithInsecure()), nil - } - - level.Info(logger).Log("msg", "enabling client to server TLS") - - tlsCfg, err := tls.NewClientConfig(logger, cert, key, caCert, serverName, skipVerify) - if err != nil { - return nil, err - } - return append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))), nil -} diff --git a/pkg/extgrpc/config.go b/pkg/extgrpc/config.go new file mode 100644 index 0000000000..cb870c6cee --- /dev/null +++ b/pkg/extgrpc/config.go @@ -0,0 +1,53 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package extgrpc + +import ( + "strings" + + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/exthttp" +) + +// Config is a structure that allows pointing to various gRPC endpoint, e.g Querier connecting to StoreAPI. +type Config struct { + GRPCClientConfig exthttp.ClientConfig `yaml:"grpc_config"` + EndpointsConfig EndpointsConfig `yaml:",inline"` +} + +func DefaultConfig() Config { + return Config{ + EndpointsConfig: EndpointsConfig{ + Addresses: []string{}, + FileSDConfigs: []exthttp.FileSDConfig{}, + }, + } +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { + *c = DefaultConfig() + type plain Config + return unmarshal((*plain)(c)) +} + +// BuildConfig returns a configuration from a static addresses. +func BuildConfig(addrs []string) ([]Config, error) { + configs := make([]Config, 0, len(addrs)) + for i, addr := range addrs { + if addr == "" { + return nil, errors.Errorf("static address cannot be empty, but was at index %d", i) + } + if strings.Contains(addr, "/") { + return nil, errors.Errorf("gRPC address either has HTTP scheme or path. We expect only host+port with optional dns+ dnssrv+ prefix in it. Got %v", addr) + } + + configs = append(configs, Config{ + EndpointsConfig: EndpointsConfig{ + Addresses: []string{addr}, + }, + }) + } + return configs, nil +} diff --git a/pkg/extgrpc/grpc.go b/pkg/extgrpc/grpc.go new file mode 100644 index 0000000000..d62fda9763 --- /dev/null +++ b/pkg/extgrpc/grpc.go @@ -0,0 +1,98 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package extgrpc + +import ( + "math" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware/v2" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/exthttp" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + + "github.com/thanos-io/thanos/pkg/tls" + "github.com/thanos-io/thanos/pkg/tracing" +) + +// EndpointsConfig configures a cluster of gRPC endpoints from static addresses and +// file service discovery. Similar to exthttp.EndpointConfig but for gRPC. +type EndpointsConfig struct { + // List of addresses with DNS prefixes. + Addresses []string `yaml:"addresses"` + // List of file configurations (our FileSD supports different DNS lookups). + FileSDConfigs []exthttp.FileSDConfig `yaml:"file_sd_configs"` +} + +// NewDiscoverer returns a new exthttp.Discoverer. +func NewDiscoverer(logger log.Logger, cfg EndpointsConfig, provider exthttp.AddressProvider) (*exthttp.Discoverer, error) { + return exthttp.NewDiscoverer(logger, exthttp.EndpointsConfig{ + Addresses: cfg.Addresses, + FileSDConfigs: cfg.FileSDConfigs, + }, provider) +} + +// TODO: Description +func ClientGRPCMetrics(reg *prometheus.Registry, clientName string) *grpc_prometheus.ClientMetrics { + if clientName == "" { + clientName = "default" + } + + grpcMets := grpc_prometheus.NewClientMetrics(grpc_prometheus.WithConstLabels(map[string]string{"client": clientName})) + grpcMets.EnableClientHandlingTimeHistogram( + grpc_prometheus.WithHistogramConstLabels(map[string]string{"client": clientName}), + grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}), + ) + if reg != nil { + reg.MustRegister(grpcMets) + } + + return grpcMets +} + +// ClientGRPCOpts creates gRPC dial options from config.. +func ClientGRPCOpts(logger log.Logger, tracer opentracing.Tracer, metrics *grpc_prometheus.ClientMetrics, config exthttp.ClientConfig) ([]grpc.DialOption, error) { + dialOpts := []grpc.DialOption{ + // We want to make sure that we can receive huge gRPC messages from storeAPI. + // On TCP level we can be fine, but the gRPC overhead for huge messages could be significant. + // Current limit is ~2GB. + // TODO(bplotka): Split sent chunks on store node per max 4MB chunks if needed. + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), + grpc.WithUnaryInterceptor( + grpc_middleware.ChainUnaryClient( + metrics.UnaryClientInterceptor(), + tracing.UnaryClientInterceptor(tracer), + ), + ), + grpc.WithStreamInterceptor( + grpc_middleware.ChainStreamClient( + metrics.StreamClientInterceptor(), + tracing.StreamClientInterceptor(tracer), + ), + ), + } + + // TODO(bwplotka): Add here support for non TLS exthttp.ClientConfig options, for not block them. + if (config.BasicAuth != exthttp.BasicAuth{} || config.BearerToken != "" || config.BearerTokenFile != "" || config.ProxyURL != "") { + return nil, errors.New("basic auth, bearer token and proxy URL options are currently not implemented") + + } + + if (config.TLSConfig == exthttp.TLSConfig{}) { + return append(dialOpts, grpc.WithInsecure()), nil + } + + level.Info(logger).Log("msg", "enabling client to server TLS") + + tlsCfg, err := tls.NewClientConfig(logger, config.TLSConfig.CertFile, config.TLSConfig.KeyFile, config.TLSConfig.CAFile, config.TLSConfig.ServerName, config.TLSConfig.InsecureSkipVerify) + if err != nil { + return nil, err + } + return append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))), nil +} diff --git a/pkg/httpconfig/config.go b/pkg/exthttp/config.go similarity index 88% rename from pkg/httpconfig/config.go rename to pkg/exthttp/config.go index 3280e33378..eedb3b8566 100644 --- a/pkg/httpconfig/config.go +++ b/pkg/exthttp/config.go @@ -1,7 +1,7 @@ // Copyright (c) The Thanos Authors. // Licensed under the Apache License 2.0. -package httpconfig +package exthttp import ( "fmt" @@ -22,9 +22,9 @@ type Config struct { func DefaultConfig() Config { return Config{ EndpointsConfig: EndpointsConfig{ - Scheme: "http", - StaticAddresses: []string{}, - FileSDConfigs: []FileSDConfig{}, + Scheme: "http", + Addresses: []string{}, + FileSDConfigs: []FileSDConfig{}, }, } } @@ -65,9 +65,9 @@ func BuildConfig(addrs []string) ([]Config, error) { } configs = append(configs, Config{ EndpointsConfig: EndpointsConfig{ - Scheme: u.Scheme, - StaticAddresses: []string{u.Host}, - PathPrefix: u.Path, + Scheme: u.Scheme, + Addresses: []string{u.Host}, + PathPrefix: u.Path, }, }) } diff --git a/pkg/httpconfig/config_test.go b/pkg/exthttp/config_test.go similarity index 78% rename from pkg/httpconfig/config_test.go rename to pkg/exthttp/config_test.go index fe876e859b..60727995a6 100644 --- a/pkg/httpconfig/config_test.go +++ b/pkg/exthttp/config_test.go @@ -1,7 +1,7 @@ // Copyright (c) The Thanos Authors. // Licensed under the Apache License 2.0. -package httpconfig +package exthttp import ( "testing" @@ -21,8 +21,8 @@ func TestBuildConfig(t *testing.T) { addresses: []string{"localhost:9093"}, expected: []Config{{ EndpointsConfig: EndpointsConfig{ - StaticAddresses: []string{"localhost:9093"}, - Scheme: "http", + Addresses: []string{"localhost:9093"}, + Scheme: "http", }, }}, }, @@ -32,15 +32,15 @@ func TestBuildConfig(t *testing.T) { expected: []Config{ { EndpointsConfig: EndpointsConfig{ - StaticAddresses: []string{"localhost:9093"}, - Scheme: "http", + Addresses: []string{"localhost:9093"}, + Scheme: "http", }, }, { EndpointsConfig: EndpointsConfig{ - StaticAddresses: []string{"localhost:9094"}, - Scheme: "http", - PathPrefix: "/prefix", + Addresses: []string{"localhost:9094"}, + Scheme: "http", + PathPrefix: "/prefix", }, }, }, @@ -50,8 +50,8 @@ func TestBuildConfig(t *testing.T) { addresses: []string{"http://localhost:9093"}, expected: []Config{{ EndpointsConfig: EndpointsConfig{ - StaticAddresses: []string{"localhost:9093"}, - Scheme: "http", + Addresses: []string{"localhost:9093"}, + Scheme: "http", }, }}, }, @@ -60,8 +60,8 @@ func TestBuildConfig(t *testing.T) { addresses: []string{"https://localhost:9093"}, expected: []Config{{ EndpointsConfig: EndpointsConfig{ - StaticAddresses: []string{"localhost:9093"}, - Scheme: "https", + Addresses: []string{"localhost:9093"}, + Scheme: "https", }, }}, }, diff --git a/pkg/httpconfig/http.go b/pkg/exthttp/http.go similarity index 83% rename from pkg/httpconfig/http.go rename to pkg/exthttp/http.go index b00204e425..b2b7ca6fc3 100644 --- a/pkg/httpconfig/http.go +++ b/pkg/exthttp/http.go @@ -1,8 +1,7 @@ // Copyright (c) The Thanos Authors. // Licensed under the Apache License 2.0. -// Package httpconfig is a wrapper around github.com/prometheus/common/config. -package httpconfig +package exthttp import ( "context" @@ -37,9 +36,6 @@ type ClientConfig struct { ProxyURL string `yaml:"proxy_url"` // TLSConfig to use to connect to the targets. TLSConfig TLSConfig `yaml:"tls_config"` - // ClientMetrics contains metrics that will be used to instrument - // the client that will be created with this config. - ClientMetrics *extpromhttp.ClientMetrics `yaml:"-"` } // TLSConfig configures TLS connections. @@ -69,7 +65,7 @@ func (b BasicAuth) IsZero() bool { } // NewHTTPClient returns a new HTTP client. -func NewHTTPClient(cfg ClientConfig, name string) (*http.Client, error) { +func NewHTTPClient(cfg ClientConfig, name string, metrics *extpromhttp.ClientMetrics) (*http.Client, error) { httpClientConfig := config_util.HTTPClientConfig{ BearerToken: config_util.Secret(cfg.BearerToken), BearerTokenFile: cfg.BearerTokenFile, @@ -107,8 +103,8 @@ func NewHTTPClient(cfg ClientConfig, name string) (*http.Client, error) { tripper := client.Transport - if cfg.ClientMetrics != nil { - tripper = extpromhttp.InstrumentedRoundTripper(tripper, cfg.ClientMetrics) + if metrics != nil { + tripper = extpromhttp.InstrumentedRoundTripper(tripper, metrics) } client.Transport = &userAgentRoundTripper{name: ThanosUserAgent, rt: tripper} @@ -145,7 +141,7 @@ func (u userAgentRoundTripper) RoundTrip(r *http.Request) (*http.Response, error // file service discovery. type EndpointsConfig struct { // List of addresses with DNS prefixes. - StaticAddresses []string `yaml:"static_configs"` + Addresses []string `yaml:"static_configs"` // List of file configurations (our FileSD supports different DNS lookups). FileSDConfigs []FileSDConfig `yaml:"file_sd_configs"` @@ -162,7 +158,7 @@ type FileSDConfig struct { RefreshInterval model.Duration `yaml:"refresh_interval"` } -func (c FileSDConfig) convert() (file.SDConfig, error) { +func (c FileSDConfig) Convert() (file.SDConfig, error) { var fileSDConfig file.SDConfig b, err := yaml.Marshal(c) if err != nil { @@ -177,69 +173,62 @@ type AddressProvider interface { Addresses() []string } -// Client represents a client that can send requests to a cluster of HTTP-based endpoints. -type Client struct { - logger log.Logger - - httpClient *http.Client - scheme string - prefix string - +// Discoverer allows managing and discovering group of targets composed form static and dynamic (file SD) HTTP addresses. It works also for +// gRPC addresses (which are HTTP on underlying protocol). +type Discoverer struct { staticAddresses []string fileSDCache *cache.Cache fileDiscoverers []*file.Discovery + scheme string + prefix string + provider AddressProvider } -// NewClient returns a new Client. -func NewClient(logger log.Logger, cfg EndpointsConfig, client *http.Client, provider AddressProvider) (*Client, error) { +// NewDiscoverer returns a new Discoverer. +func NewDiscoverer(logger log.Logger, cfg EndpointsConfig, provider AddressProvider) (*Discoverer, error) { if logger == nil { logger = log.NewNopLogger() } var discoverers []*file.Discovery for _, sdCfg := range cfg.FileSDConfigs { - fileSDCfg, err := sdCfg.convert() + fileSDCfg, err := sdCfg.Convert() if err != nil { return nil, err } discoverers = append(discoverers, file.NewDiscovery(&fileSDCfg, logger)) } - return &Client{ - logger: logger, - httpClient: client, + return &Discoverer{ scheme: cfg.Scheme, prefix: cfg.PathPrefix, - staticAddresses: cfg.StaticAddresses, + staticAddresses: cfg.Addresses, fileSDCache: cache.New(), fileDiscoverers: discoverers, provider: provider, }, nil } -// Do executes an HTTP request with the underlying HTTP client. -func (c *Client) Do(req *http.Request) (*http.Response, error) { - return c.httpClient.Do(req) -} - // Endpoints returns the list of known endpoints. -func (c *Client) Endpoints() []*url.URL { +func (c *Discoverer) Endpoints() []*url.URL { var urls []*url.URL for _, addr := range c.provider.Addresses() { - urls = append(urls, - &url.URL{ - Scheme: c.scheme, - Host: addr, - Path: path.Join("/", c.prefix), - }, - ) + u := &url.URL{ + Scheme: c.scheme, + Host: addr, + } + + if c.prefix != "" { + u.Path = path.Join("/", c.prefix) + } + urls = append(urls, u) } return urls } -// Discover runs the service to discover endpoints until the given context is done. -func (c *Client) Discover(ctx context.Context) { +// Discover runs the service to discover endpoints from file SD until the given context is done. +func (c *Discoverer) Discover(ctx context.Context) { var wg sync.WaitGroup ch := make(chan []*targetgroup.Group) @@ -269,6 +258,30 @@ func (c *Client) Discover(ctx context.Context) { } // Resolve refreshes and resolves the list of targets. -func (c *Client) Resolve(ctx context.Context) error { +func (c *Discoverer) Resolve(ctx context.Context) error { return c.provider.Resolve(ctx, append(c.fileSDCache.Addresses(), c.staticAddresses...)) } + +// Client represents a client that can send requests to a cluster of HTTP-based endpoints. +type Client struct { + *Discoverer + + httpClient *http.Client +} + +// NewClient returns a new Client. +func NewClient(logger log.Logger, cfg EndpointsConfig, client *http.Client, provider AddressProvider) (*Client, error) { + d, err := NewDiscoverer(logger, cfg, provider) + if err != nil { + return nil, err + } + return &Client{ + httpClient: client, + Discoverer: d, + }, nil +} + +// Do executes an HTTP request with the underlying HTTP client. +func (c *Client) Do(req *http.Request) (*http.Response, error) { + return c.httpClient.Do(req) +} diff --git a/pkg/query/config.go b/pkg/query/config.go new file mode 100644 index 0000000000..3ef301047c --- /dev/null +++ b/pkg/query/config.go @@ -0,0 +1,96 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package query + +import ( + "github.com/thanos-io/thanos/pkg/extgrpc" + "github.com/thanos-io/thanos/pkg/exthttp" + "gopkg.in/yaml.v2" + + "github.com/pkg/errors" + "github.com/prometheus/prometheus/discovery/file" +) + +// EndpointConfig represents the configuration of a set of gRPC Store API endpoints. +// If `tls_config` is omitted then TLS will not be used. +// Configs must have a name and they must be unique. +type EndpointConfig struct { + extgrpc.Config `yaml:",inline"` + + Mode EndpointMode `yaml:"mode"` + + // TODO(bwplotka): Allow filtering by API (e.g someone wants to have endpoint that serves Store and Exemplar API but want to connect to Store only. +} + +type EndpointMode string + +const ( + DefaultEndpointMode EndpointMode = "" + StrictEndpointMode EndpointMode = "strict" +) + +// LoadConfig returns list of per-endpoint TLS config. +func LoadConfig(confYAML []byte, endpointAddrs, strictEndpointAddrs []string, globalFileSDConfig *file.SDConfig, globalTLSConfig exthttp.TLSConfig) ([]EndpointConfig, error) { + var endpointConfig []EndpointConfig + + if len(confYAML) > 0 { + if err := yaml.UnmarshalStrict(confYAML, &endpointConfig); err != nil { + return nil, err + } + + // Checking if wrong mode is provided. + for _, config := range endpointConfig { + if config.Mode != StrictEndpointMode && config.Mode != DefaultEndpointMode { + return nil, errors.Errorf("%s is wrong mode", config.Mode) + } + } + + // No dynamic endpoints in strict mode. + for _, config := range endpointConfig { + if config.Mode == StrictEndpointMode && len(config.EndpointsConfig.FileSDConfigs) != 0 { + return nil, errors.Errorf("no sd-files allowed in strict mode") + } + } + } + + // Adding --store, rule, metadata, target, exemplar and --store.sd-files, if provided. + // Global TLS config applies until deprecated. + if len(endpointAddrs) > 0 || globalFileSDConfig != nil { + cfg := EndpointConfig{} + cfg.GRPCClientConfig.TLSConfig = globalTLSConfig + cfg.EndpointsConfig.Addresses = endpointAddrs + if globalFileSDConfig != nil { + cfg.EndpointsConfig.FileSDConfigs = []exthttp.FileSDConfig{ + { + Files: globalFileSDConfig.Files, + RefreshInterval: globalFileSDConfig.RefreshInterval, + }, + } + } + endpointConfig = append(endpointConfig, cfg) + } + + // Adding --store-strict endpoints, if provided. + // Global TLS config applies until deprecated. + if len(strictEndpointAddrs) > 0 { + cfg := EndpointConfig{} + cfg.GRPCClientConfig.TLSConfig = globalTLSConfig + cfg.EndpointsConfig.Addresses = strictEndpointAddrs + cfg.Mode = StrictEndpointMode + endpointConfig = append(endpointConfig, cfg) + } + + // Checking for duplicates. + // NOTE: This does not check dynamic endpoints of course. + allEndpoints := make(map[string]struct{}) + for _, config := range endpointConfig { + for _, addr := range config.EndpointsConfig.Addresses { + if _, exists := allEndpoints[addr]; exists { + return nil, errors.Errorf("%s endpoint provided more than once", addr) + } + allEndpoints[addr] = struct{}{} + } + } + return endpointConfig, nil +} diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 727299db1f..6c6c96ca8b 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" + "github.com/thanos-io/thanos/pkg/exthttp" "github.com/thanos-io/thanos/pkg/info/infopb" "google.golang.org/grpc" @@ -34,14 +35,16 @@ const ( unhealthyEndpointMessage = "removing endpoint because it's unhealthy or does not exist" noMetadataEndpointMessage = "cannot obtain metadata: neither info nor store client found" - // Default minimum and maximum time values used by Prometheus when they are not passed as query parameter. + // MinTime is a default minimum time used by Prometheus when it's not passed as query parameter. MinTime = -9223309901257974 + // MaxTime is a default maximum time used by Prometheus when it's not passed as query parameter. MaxTime = 9223309901257974 ) type EndpointSpec interface { - // Addr returns Thanos API Address for the endpoint spec. It is used as ID for endpoint. + // Addr returns host port address for the endpoint. It is used as ID for endpoint. Addr() string + // Metadata returns current labels, component type and min, max ranges for store. // It can change for every call for this method. // If metadata call fails we assume that store is no longer accessible and we should not use it. @@ -167,7 +170,13 @@ func (es *grpcEndpointSpec) fillExpectedAPIs(componentType component.Component, Rules: &infopb.RulesInfo{}, } default: - return infopb.InfoResponse{} + // This might break non-native StoreAPI implementation, so assume Store API too. + return infopb.InfoResponse{ + Store: &infopb.StoreInfo{ + MinTime: mintime, + MaxTime: maxTime, + }, + } } } @@ -256,15 +265,63 @@ func (c *endpointSetNodeCollector) Collect(ch chan<- prometheus.Metric) { } } -// EndpointSet maintains a set of active Thanos endpoints. It is backed up by Endpoint Specifications that are dynamically fetched on -// every Update() call. +// EndpointGroup groups common endpoints (having the same gRPC dial Opts and coming from the same DNS discovery) together. +// It is backed up by set of *exthttp.Discoverer structs can give us addresses. +type EndpointGroup struct { + d *exthttp.Discoverer + + dialOpts []grpc.DialOption +} + +func NewEndpointGroup(d *exthttp.Discoverer, dialOpts []grpc.DialOption) *EndpointGroup { + return &EndpointGroup{ + d: d, + dialOpts: dialOpts, + } +} + +func removeDuplicateEndpointSpecs(logger log.Logger, duplicatedStores prometheus.Counter, specs []query.EndpointSpec) []query.EndpointSpec { + set := make(map[string]query.EndpointSpec) + for _, spec := range specs { + addr := spec.Addr() + if _, ok := set[addr]; ok { + level.Warn(logger).Log("msg", "Duplicate store address is provided", "addr", addr) + duplicatedStores.Inc() + } + set[addr] = spec + } + deduplicated := make([]query.EndpointSpec, 0, len(set)) + for _, value := range set { + deduplicated = append(deduplicated, value) + } + return deduplicated +} + +// Spec returns current set of endpoint specs. +// Note that endpoint specifications return can change dynamically. If some component is missing from the list, we assume it is no longer +// accessible and we close gRPC client for it, unless it is strict. +func (g *EndpointGroup) Spec() []EndpointSpec { + g.d.Endpoints() // TODO: ... + + /* + Something like.. + for _, dnsProvider := range []*dns.Provider{dnsStoreProvider, dnsRuleProvider, dnsExemplarProvider, dnsMetadataProvider, dnsTargetProvider} { + var tmpSpecs []query.EndpointSpec + + for _, addr := range dnsProvider.Addresses() { + tmpSpecs = append(tmpSpecs, query.NewGRPCEndpointSpec(addr, false)) + } + tmpSpecs = removeDuplicateEndpointSpecs(logger, duplicatedStores, tmpSpecs) + specs = append(specs, tmpSpecs...) + } + */ +} + +// EndpointSet maintains a set of active Thanos endpoints groups. type EndpointSet struct { logger log.Logger - // Endpoint specifications can change dynamically. If some component is missing from the list, we assume it is no longer - // accessible and we close gRPC client for it, unless it is strict. - endpointSpec func() []EndpointSpec - dialOpts []grpc.DialOption + groups []*EndpointGroup gRPCInfoCallTimeout time.Duration updateMtx sync.Mutex @@ -284,10 +341,10 @@ type EndpointSet struct { func NewEndpointSet( logger log.Logger, reg *prometheus.Registry, - endpointSpecs func() []EndpointSpec, - dialOpts []grpc.DialOption, + groups []*EndpointGroup, unhealthyEndpointTimeout time.Duration, ) *EndpointSet { + // TODO(bwplotka): Consider adding provider per config name, for instrumentation purposes, but only if strongly requested. endpointsMetric := newEndpointSetNodeCollector() if reg != nil { reg.MustRegister(endpointsMetric) @@ -297,19 +354,14 @@ func NewEndpointSet( logger = log.NewNopLogger() } - if endpointSpecs == nil { - endpointSpecs = func() []EndpointSpec { return nil } - } - es := &EndpointSet{ logger: log.With(logger, "component", "endpointset"), - dialOpts: dialOpts, endpointsMetric: endpointsMetric, gRPCInfoCallTimeout: 5 * time.Second, endpoints: make(map[string]*endpointRef), endpointStatuses: make(map[string]*EndpointStatus), unhealthyEndpointTimeout: unhealthyEndpointTimeout, - endpointSpec: endpointSpecs, + groups: groups, } return es } diff --git a/pkg/query/endpointset_test.go b/pkg/query/endpointset_test.go index 5dc7eefa45..d941e59d69 100644 --- a/pkg/query/endpointset_test.go +++ b/pkg/query/endpointset_test.go @@ -300,7 +300,7 @@ func TestEndpointSet_Update(t *testing.T) { // Testing if duplicates can cause weird results. discoveredEndpointAddr = append(discoveredEndpointAddr, discoveredEndpointAddr[0]) - endpointSet := NewEndpointSet(nil, nil, + endpointSet := NewEndpointSet(nil, nil, "", func() (specs []EndpointSpec) { for _, addr := range discoveredEndpointAddr { specs = append(specs, NewGRPCEndpointSpec(addr, false)) @@ -683,7 +683,7 @@ func TestEndpointSet_Update_NoneAvailable(t *testing.T) { endpoints.CloseOne(initialEndpointAddr[0]) endpoints.CloseOne(initialEndpointAddr[1]) - endpointSet := NewEndpointSet(nil, nil, + endpointSet := NewEndpointSet(nil, nil, "", func() (specs []EndpointSpec) { for _, addr := range initialEndpointAddr { specs = append(specs, NewGRPCEndpointSpec(addr, false)) @@ -801,7 +801,7 @@ func TestEndpoint_Update_QuerierStrict(t *testing.T) { staticEndpointAddr := discoveredEndpointAddr[0] slowStaticEndpointAddr := discoveredEndpointAddr[2] - endpointSet := NewEndpointSet(nil, nil, func() (specs []EndpointSpec) { + endpointSet := NewEndpointSet(nil, nil, "", func() (specs []EndpointSpec) { return []EndpointSpec{ NewGRPCEndpointSpec(discoveredEndpointAddr[0], true), NewGRPCEndpointSpec(discoveredEndpointAddr[1], false), @@ -977,7 +977,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { t.Run(tc.name, func(t *testing.T) { currentState := 0 - endpointSet := NewEndpointSet(nil, nil, + endpointSet := NewEndpointSet(nil, nil, "", func() []EndpointSpec { if tc.states[currentState].endpointSpec == nil { return nil diff --git a/pkg/ui/query.go b/pkg/ui/query.go index 1778dc5557..73f385bc81 100644 --- a/pkg/ui/query.go +++ b/pkg/ui/query.go @@ -22,7 +22,7 @@ import ( type Query struct { *BaseUI - endpointSet *query.EndpointSet + endpointStatus func() []query.EndpointStatus externalPrefix, prefixHeader string @@ -32,7 +32,7 @@ type Query struct { now func() model.Time } -func NewQueryUI(logger log.Logger, endpointSet *query.EndpointSet, externalPrefix, prefixHeader string) *Query { +func NewQueryUI(logger log.Logger, endpointStatus func() []query.EndpointStatus, externalPrefix, prefixHeader string) *Query { tmplVariables := map[string]string{ "Component": component.Query.String(), } @@ -43,7 +43,7 @@ func NewQueryUI(logger log.Logger, endpointSet *query.EndpointSet, externalPrefi return &Query{ BaseUI: NewBaseUI(logger, "query_menu.html", tmplFuncs, tmplVariables, externalPrefix, prefixHeader, component.Query), - endpointSet: endpointSet, + endpointStatus: endpointStatus, externalPrefix: externalPrefix, prefixHeader: prefixHeader, cwd: runtimeInfo().CWD, @@ -112,7 +112,7 @@ func (q *Query) status(w http.ResponseWriter, r *http.Request) { func (q *Query) stores(w http.ResponseWriter, r *http.Request) { prefix := GetWebPrefix(q.logger, q.externalPrefix, q.prefixHeader, r) statuses := make(map[component.Component][]query.EndpointStatus) - for _, status := range q.endpointSet.GetEndpointStatus() { + for _, status := range q.endpointStatus() { statuses[status.ComponentType] = append(statuses[status.ComponentType], status) } diff --git a/test/e2e/certs/create.sh b/test/e2e/certs/create.sh new file mode 100644 index 0000000000..ff231df9f8 --- /dev/null +++ b/test/e2e/certs/create.sh @@ -0,0 +1,71 @@ +#!/bin/zsh + +openssl req \ + -new \ + -x509 \ + -nodes \ + -days 99999 \ + -subj '/CN=my-ca' \ + -keyout testca.key \ + -out testca.crt + +openssl genrsa -out testserver.key 2048 +openssl genrsa -out testclient.key 2048 + +openssl req \ + -new \ + -key testserver.key \ + -subj "/CN=e2e_test_query_config-sidecar" \ + -out e2e_test_query_config_server.csr + +openssl req \ + -new \ + -key testclient.key \ + -subj "/CN=e2e_test_query_config-querier" \ + -out e2e_test_query_config_client.csr + +openssl x509 \ + -req \ + -in e2e_test_query_config_server.csr \ + -CA testca.crt \ + -CAkey testca.key \ + -CAcreateserial \ + -days 99999 \ + -extfile <( + cat <<-EOF +basicConstraints = CA:FALSE +nsCertType = server +nsComment = "OpenSSL Generated Server Certificate" +subjectKeyIdentifier = hash +authorityKeyIdentifier = keyid,issuer:always +keyUsage = critical, digitalSignature, keyEncipherment +extendedKeyUsage = serverAuth +subjectAltName = @alt_names +[alt_names] +DNS.1 = e2e_test_query_config-sidecar +EOF + ) \ + -out e2e_test_query_config_server.crt + +openssl x509 \ + -req \ + -in e2e_test_query_config_client.csr \ + -CA testca.crt \ + -CAkey testca.key \ + -CAcreateserial \ + -days 99999 \ + -extfile <( + cat <<-EOF +basicConstraints = CA:FALSE +nsCertType = client +nsComment = "OpenSSL Generated Client Certificate" +subjectKeyIdentifier = hash +authorityKeyIdentifier = keyid,issuer:always +keyUsage = critical, digitalSignature, keyEncipherment +extendedKeyUsage = clientAuth +subjectAltName = @alt_names +[alt_names] +DNS.1 = e2e_test_query_config-querier +EOF + ) \ + -out e2e_test_query_config_client.crt diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index b0d9b1b3ad..65a1e9866c 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -17,14 +17,13 @@ import ( e2edb "github.com/efficientgo/e2e/db" "github.com/efficientgo/tools/core/pkg/backoff" "github.com/pkg/errors" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/pkg/relabel" "github.com/thanos-io/thanos/pkg/httpconfig" "gopkg.in/yaml.v2" "github.com/thanos-io/thanos/pkg/alert" "github.com/thanos-io/thanos/pkg/objstore/client" + "github.com/thanos-io/thanos/pkg/query" "github.com/thanos-io/thanos/pkg/queryfrontend" "github.com/thanos-io/thanos/pkg/receive" ) @@ -103,29 +102,33 @@ func NewPrometheus(e e2e.Environment, name, config, promImage string, enableFeat return prom, container, nil } -func NewPrometheusWithSidecar(e e2e.Environment, name, config, promImage string, enableFeatures ...string) (*e2e.InstrumentedRunnable, *e2e.InstrumentedRunnable, error) { - return NewPrometheusWithSidecarCustomImage(e, name, config, promImage, DefaultImage(), enableFeatures...) +func NewPrometheusWithSidecar(e e2e.Environment, name, config, promImage string, extraArgs []string, enableFeatures ...string) (*e2e.InstrumentedRunnable, *e2e.InstrumentedRunnable, error) { + return NewPrometheusWithSidecarCustomImage(e, name, config, promImage, extraArgs, DefaultImage(), enableFeatures...) } -func NewPrometheusWithSidecarCustomImage(e e2e.Environment, name, config, promImage string, sidecarImage string, enableFeatures ...string) (*e2e.InstrumentedRunnable, *e2e.InstrumentedRunnable, error) { +func NewPrometheusWithSidecarCustomImage(e e2e.Environment, name, config, promImage string, extraArgs []string, sidecarImage string, enableFeatures ...string) (*e2e.InstrumentedRunnable, *e2e.InstrumentedRunnable, error) { prom, dataDir, err := NewPrometheus(e, name, config, promImage, enableFeatures...) if err != nil { return nil, nil, err } + args := e2e.BuildArgs(map[string]string{ + "--debug.name": fmt.Sprintf("sidecar-%v", name), + "--grpc-address": ":9091", + "--grpc-grace-period": "0s", + "--http-address": ":8080", + "--prometheus.url": "http://" + prom.InternalEndpoint("http"), + "--tsdb.path": dataDir, + "--log.level": infoLogLevel, + }) + + args = append(args, extraArgs...) + sidecar := NewService( e, fmt.Sprintf("sidecar-%s", name), - sidecarImage, - e2e.NewCommand("sidecar", e2e.BuildArgs(map[string]string{ - "--debug.name": fmt.Sprintf("sidecar-%v", name), - "--grpc-address": ":9091", - "--grpc-grace-period": "0s", - "--http-address": ":8080", - "--prometheus.url": "http://" + prom.InternalEndpoint("http"), - "--tsdb.path": dataDir, - "--log.level": infoLogLevel, - })...), + DefaultImage(), + e2e.NewCommand("sidecar", args...), e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200), 8080, 9091, @@ -141,13 +144,15 @@ type QuerierBuilder struct { routePrefix string externalPrefix string image string + fileSDPath string + + storeAddresses []string + ruleAddresses []string + metadataAddresses []string + targetAddresses []string + exemplarAddresses []string - storeAddresses []string - fileSDStoreAddresses []string - ruleAddresses []string - metadataAddresses []string - targetAddresses []string - exemplarAddresses []string + endpointConfig []query.EndpointConfig tracingConfig string } @@ -166,9 +171,8 @@ func (q *QuerierBuilder) WithImage(image string) *QuerierBuilder { q.image = image return q } - -func (q *QuerierBuilder) WithFileSDStoreAddresses(fileSDStoreAddresses ...string) *QuerierBuilder { - q.fileSDStoreAddresses = fileSDStoreAddresses +func (q *QuerierBuilder) WithFileSDStoreAddresses(fileSDPath string) *QuerierBuilder { + q.fileSDPath = fileSDPath return q } @@ -252,6 +256,11 @@ func (q *QuerierBuilder) Build() (*e2e.InstrumentedRunnable, error) { return querier, nil } +func (q *QuerierBuilder) WithEndpointConfig(endpointConfig []query.EndpointConfig) *QuerierBuilder { + q.endpointConfig = endpointConfig + return q +} + func (q *QuerierBuilder) collectArgs() ([]string, error) { const replicaLabel = "replica" @@ -286,28 +295,8 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) { args = append(args, "--exemplar="+addr) } - if len(q.fileSDStoreAddresses) > 0 { - queryFileSDDir := filepath.Join(q.sharedDir, "data", "querier", q.name) - container := filepath.Join(ContainerSharedDir, "data", "querier", q.name) - if err := os.MkdirAll(queryFileSDDir, 0750); err != nil { - return nil, errors.Wrap(err, "create query dir failed") - } - - fileSD := []*targetgroup.Group{{}} - for _, a := range q.fileSDStoreAddresses { - fileSD[0].Targets = append(fileSD[0].Targets, model.LabelSet{model.AddressLabel: model.LabelValue(a)}) - } - - b, err := yaml.Marshal(fileSD) - if err != nil { - return nil, err - } - - if err := ioutil.WriteFile(queryFileSDDir+"/filesd.yaml", b, 0600); err != nil { - return nil, errors.Wrap(err, "creating query SD config failed") - } - - args = append(args, "--store.sd-files="+filepath.Join(container, "filesd.yaml")) + if q.fileSDPath != "" { + args = append(args, "--store.sd-files="+q.fileSDPath) } if q.routePrefix != "" { @@ -322,6 +311,14 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) { args = append(args, "--tracing.config="+q.tracingConfig) } + if (len(q.storeAddresses) == 0 && q.fileSDPath == "") && len(q.endpointConfig) > 0 { + endpointCfgBytes, err := yaml.Marshal(q.endpointConfig) + if err != nil { + return nil, errors.Wrapf(err, "generate endpoint config file: %v", q.endpointConfig) + } + args = append(args, "--endpoint.config="+string(endpointCfgBytes)) + } + return args, nil } diff --git a/test/e2e/exemplars_api_test.go b/test/e2e/exemplars_api_test.go index 13aaffca85..6f3e83dd2e 100644 --- a/test/e2e/exemplars_api_test.go +++ b/test/e2e/exemplars_api_test.go @@ -44,6 +44,7 @@ func TestExemplarsAPI_Fanout(t *testing.T) { "prom1", defaultPromConfig("ha", 0, "", "", "localhost:9090", qUnitiated.InternalEndpoint("http")), e2ethanos.DefaultPrometheusImage(), + nil, e2ethanos.FeatureExemplarStorage, ) testutil.Ok(t, err) @@ -52,6 +53,7 @@ func TestExemplarsAPI_Fanout(t *testing.T) { "prom2", defaultPromConfig("ha", 1, "", "", "localhost:9090", qUnitiated.InternalEndpoint("http")), e2ethanos.DefaultPrometheusImage(), + nil, e2ethanos.FeatureExemplarStorage, ) testutil.Ok(t, err) diff --git a/test/e2e/metadata_api_test.go b/test/e2e/metadata_api_test.go index 096560e64e..f7c44c0772 100644 --- a/test/e2e/metadata_api_test.go +++ b/test/e2e/metadata_api_test.go @@ -32,6 +32,7 @@ func TestMetadataAPI_Fanout(t *testing.T) { "prom1", defaultPromConfig("ha", 0, "", "", "localhost:9090", "sidecar-prom1:8080"), e2ethanos.DefaultPrometheusImage(), + nil, ) testutil.Ok(t, err) @@ -40,6 +41,7 @@ func TestMetadataAPI_Fanout(t *testing.T) { "prom2", defaultPromConfig("ha", 1, "", "", "localhost:9090", "sidecar-prom2:8080"), e2ethanos.DefaultPrometheusImage(), + nil, ) testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2)) diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index 6635555ed0..d95495d387 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -32,7 +32,7 @@ func TestQueryFrontend(t *testing.T) { now := time.Now() - prom, sidecar, err := e2ethanos.NewPrometheusWithSidecar(e, "1", defaultPromConfig("test", 0, "", ""), e2ethanos.DefaultPrometheusImage()) + prom, sidecar, err := e2ethanos.NewPrometheusWithSidecar(e, "1", defaultPromConfig("test", 0, "", ""), e2ethanos.DefaultPrometheusImage(), nil) testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar)) @@ -396,7 +396,7 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { now := time.Now() - prom, sidecar, err := e2ethanos.NewPrometheusWithSidecar(e, "1", defaultPromConfig("test", 0, "", ""), e2ethanos.DefaultPrometheusImage()) + prom, sidecar, err := e2ethanos.NewPrometheusWithSidecar(e, "1", defaultPromConfig("test", 0, "", ""), e2ethanos.DefaultPrometheusImage(), nil) testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar)) diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 06c6208763..e19bb14a25 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -6,9 +6,11 @@ package e2e_test import ( "context" "fmt" + "io/ioutil" "net/http/httptest" "net/url" "os" + "os/exec" "path/filepath" "sort" "strings" @@ -21,22 +23,29 @@ import ( "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/file" + "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/thanos-io/thanos/pkg/metadata/metadatapb" "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/targets/targetspb" + "gopkg.in/yaml.v2" "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" "github.com/thanos-io/thanos/pkg/promclient" + "github.com/thanos-io/thanos/pkg/query" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/test/e2e/e2ethanos" ) // NOTE: by using aggregation all results are now unsorted. -const queryUpWithoutInstance = "sum(up) without (instance)" +const ( + queryUpWithoutInstance = "sum(up) without (instance)" + ContainerSharedDir = "/shared" +) // defaultPromConfig returns Prometheus config that sets Prometheus to: // * expose 2 external labels, source and replica. @@ -94,6 +103,31 @@ func sortResults(res model.Vector) { }) } +func createSDFile(sharedDir string, name string, fileSDStoreAddresses []string) (string, error) { + if len(fileSDStoreAddresses) > 0 { + queryFileSDDir := filepath.Join(sharedDir, "data", "querier", name) + container := filepath.Join(ContainerSharedDir, "data", "querier", name) + if err := os.MkdirAll(queryFileSDDir, 0750); err != nil { + return "", errors.Wrap(err, "create query dir failed") + } + + fileSD := []*targetgroup.Group{{}} + for _, a := range fileSDStoreAddresses { + fileSD[0].Targets = append(fileSD[0].Targets, model.LabelSet{model.AddressLabel: model.LabelValue(a)}) + } + + b, err := yaml.Marshal(fileSD) + if err != nil { + return "", err + } + + if err := ioutil.WriteFile(queryFileSDDir+"/filesd.yaml", b, 0600); err != nil { + return "", errors.Wrap(err, "creating query SD config failed") + } + return filepath.Join(container, "filesd.yaml"), nil + } + return "", nil +} func TestQuery(t *testing.T) { t.Parallel() @@ -106,19 +140,153 @@ func TestQuery(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(receiverRunnable)) - prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(e, "alone", defaultPromConfig("prom-alone", 0, "", ""), e2ethanos.DefaultPrometheusImage()) + prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(e, "alone", defaultPromConfig("prom-alone", 0, "", ""), e2ethanos.DefaultPrometheusImage(), nil) testutil.Ok(t, err) - prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(e, "remote-and-sidecar", defaultPromConfig("prom-both-remote-write-and-sidecar", 1234, e2ethanos.RemoteWriteEndpoint(receiver.InternalEndpoint("remote-write")), ""), e2ethanos.DefaultPrometheusImage()) + prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(e, "remote-and-sidecar", defaultPromConfig("prom-both-remote-write-and-sidecar", 1234, e2ethanos.RemoteWriteEndpoint(receiver.InternalEndpoint("remote-write")), ""), e2ethanos.DefaultPrometheusImage(), nil) testutil.Ok(t, err) - prom3, sidecar3, err := e2ethanos.NewPrometheusWithSidecar(e, "ha1", defaultPromConfig("prom-ha", 0, "", filepath.Join(e2ethanos.ContainerSharedDir, "", "*.yaml")), e2ethanos.DefaultPrometheusImage()) + prom3, sidecar3, err := e2ethanos.NewPrometheusWithSidecar(e, "ha1", defaultPromConfig("prom-ha", 0, "", filepath.Join(e2ethanos.ContainerSharedDir, "", "*.yaml")), e2ethanos.DefaultPrometheusImage(), nil) testutil.Ok(t, err) - prom4, sidecar4, err := e2ethanos.NewPrometheusWithSidecar(e, "ha2", defaultPromConfig("prom-ha", 1, "", filepath.Join(e2ethanos.ContainerSharedDir, "", "*.yaml")), e2ethanos.DefaultPrometheusImage()) + prom4, sidecar4, err := e2ethanos.NewPrometheusWithSidecar(e, "ha2", defaultPromConfig("prom-ha", 1, "", filepath.Join(e2ethanos.ContainerSharedDir, "", "*.yaml")), e2ethanos.DefaultPrometheusImage(), nil) testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2, prom3, sidecar3, prom4, sidecar4)) + fileSDPath, err := createSDFile(e.SharedDir(), "1", []string{sidecar3.InternalEndpoint("grpc"), sidecar4.InternalEndpoint("grpc")}) + testutil.Ok(t, err) + // Querier. Both fileSD and directly by flags. q, err := e2ethanos.NewQuerierBuilder(e, "1", sidecar1.InternalEndpoint("grpc"), sidecar2.InternalEndpoint("grpc"), receiver.InternalEndpoint("grpc")). - WithFileSDStoreAddresses(sidecar3.InternalEndpoint("grpc"), sidecar4.InternalEndpoint("grpc")).Build() + WithFileSDStoreAddresses(fileSDPath).Build() + testutil.Ok(t, err) + testutil.Ok(t, e2e.StartAndWaitReady(q)) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + t.Cleanup(cancel) + + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(5), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics())) + + queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, promclient.QueryOptions{ + Deduplicate: false, + }, []model.Metric{ + { + "job": "myself", + "prometheus": "prom-alone", + "replica": "0", + }, + { + "job": "myself", + "prometheus": "prom-both-remote-write-and-sidecar", + "receive": "receive-1", + "replica": "1234", + "tenant_id": "default-tenant", + }, + { + "job": "myself", + "prometheus": "prom-both-remote-write-and-sidecar", + "replica": "1234", + }, + { + "job": "myself", + "prometheus": "prom-ha", + "replica": "0", + }, + { + "job": "myself", + "prometheus": "prom-ha", + "replica": "1", + }, + }) + + // With deduplication. + queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, promclient.QueryOptions{ + Deduplicate: true, + }, []model.Metric{ + { + "job": "myself", + "prometheus": "prom-alone", + }, + { + "job": "myself", + "prometheus": "prom-both-remote-write-and-sidecar", + "receive": "receive-1", + "tenant_id": "default-tenant", + }, + { + "job": "myself", + "prometheus": "prom-both-remote-write-and-sidecar", + }, + { + "job": "myself", + "prometheus": "prom-ha", + }, + }) +} + +func TestQueryWithEndpointConfig(t *testing.T) { + t.Parallel() + + e, err := e2e.NewDockerEnvironment("e2e_test_query_config") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + receiver := e2ethanos.NewUninitiatedReceiver(e, "1") + receiverRunnable, err := e2ethanos.NewRoutingAndIngestingReceiverFromService(receiver, e.SharedDir(), 1) + testutil.Ok(t, err) + testutil.Ok(t, e2e.StartAndWaitReady(receiverRunnable)) + + queryFileSDDir := filepath.Join(e.SharedDir(), "data", "querier", "1") + container := filepath.Join(ContainerSharedDir, "data", "querier", "1") + err = os.MkdirAll(queryFileSDDir, 0750) + testutil.Ok(t, err) + + // Generate certificates from ./test/e2e/certs/create.sh + cmd := exec.Command("/bin/bash", "../../../../certs/create.sh") + cmd.Dir = queryFileSDDir + _, err = cmd.Output() + testutil.Ok(t, err) + + tlsConfig := e2e.BuildArgs(map[string]string{ + "--grpc-server-tls-cert": filepath.Join(container, "e2e_test_query_config_server.crt"), + "--grpc-server-tls-key": filepath.Join(container, "testserver.key"), + "--grpc-server-tls-client-ca": filepath.Join(container, "testca.crt"), + }) + + prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(e, "alone", defaultPromConfig("prom-alone", 0, "", ""), e2ethanos.DefaultPrometheusImage(), tlsConfig) + testutil.Ok(t, err) + prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(e, "remote-and-sidecar", defaultPromConfig("prom-both-remote-write-and-sidecar", 1234, e2ethanos.RemoteWriteEndpoint(receiver.InternalEndpoint("remote-write")), ""), e2ethanos.DefaultPrometheusImage(), tlsConfig) + testutil.Ok(t, err) + prom3, sidecar3, err := e2ethanos.NewPrometheusWithSidecar(e, "ha1", defaultPromConfig("prom-ha", 0, "", filepath.Join(e2ethanos.ContainerSharedDir, "", "*.yaml")), e2ethanos.DefaultPrometheusImage(), nil) + testutil.Ok(t, err) + prom4, sidecar4, err := e2ethanos.NewPrometheusWithSidecar(e, "ha2", defaultPromConfig("prom-ha", 1, "", filepath.Join(e2ethanos.ContainerSharedDir, "", "*.yaml")), e2ethanos.DefaultPrometheusImage(), nil) + testutil.Ok(t, err) + testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2, prom3, sidecar3, prom4, sidecar4)) + + fileSDPath, err := createSDFile(e.SharedDir(), "1", []string{sidecar3.InternalEndpoint("grpc"), sidecar4.InternalEndpoint("grpc")}) + testutil.Ok(t, err) + + endpointConfig := []query.EndpointConfig{ + { + Name: "withTLS", + TLSConfig: query.TLSConfiguration{ + CertFile: filepath.Join(container, "e2e_test_query_config_client.crt"), + KeyFile: filepath.Join(container, "testclient.key"), + CaCertFile: filepath.Join(container, "testca.crt"), + ServerName: "e2e_test_query_config-sidecar", + }, + Endpoints: []string{sidecar1.InternalEndpoint("grpc"), sidecar2.InternalEndpoint("grpc")}, + }, + { + Name: "withoutTLS", + Endpoints: []string{receiver.InternalEndpoint("grpc")}, + EndpointsSD: []file.SDConfig{ + { + Files: []string{fileSDPath}, + RefreshInterval: model.Duration(time.Minute), + }, + }, + }, + } + + q, err := e2ethanos.NewQuerierBuilder(e, "1").WithEndpointConfig(endpointConfig).Build() testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(q)) @@ -260,9 +428,9 @@ func TestQueryLabelNames(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(receiverRunnable)) - prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(e, "alone", defaultPromConfig("prom-alone", 0, "", ""), e2ethanos.DefaultPrometheusImage()) + prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(e, "alone", defaultPromConfig("prom-alone", 0, "", ""), e2ethanos.DefaultPrometheusImage(), nil) testutil.Ok(t, err) - prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(e, "remote-and-sidecar", defaultPromConfig("prom-both-remote-write-and-sidecar", 1234, e2ethanos.RemoteWriteEndpoint(receiver.InternalEndpoint("remote-write")), ""), e2ethanos.DefaultPrometheusImage()) + prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(e, "remote-and-sidecar", defaultPromConfig("prom-both-remote-write-and-sidecar", 1234, e2ethanos.RemoteWriteEndpoint(receiver.InternalEndpoint("remote-write")), ""), e2ethanos.DefaultPrometheusImage(), nil) testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2)) @@ -312,9 +480,9 @@ func TestQueryLabelValues(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(receiverRunnable)) - prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(e, "alone", defaultPromConfig("prom-alone", 0, "", ""), e2ethanos.DefaultPrometheusImage()) + prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(e, "alone", defaultPromConfig("prom-alone", 0, "", ""), e2ethanos.DefaultPrometheusImage(), nil) testutil.Ok(t, err) - prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(e, "remote-and-sidecar", defaultPromConfig("prom-both-remote-write-and-sidecar", 1234, e2ethanos.RemoteWriteEndpoint(receiver.InternalEndpoint("remote-write")), ""), e2ethanos.DefaultPrometheusImage()) + prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar(e, "remote-and-sidecar", defaultPromConfig("prom-both-remote-write-and-sidecar", 1234, e2ethanos.RemoteWriteEndpoint(receiver.InternalEndpoint("remote-write")), ""), e2ethanos.DefaultPrometheusImage(), nil) testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2)) @@ -384,6 +552,7 @@ func TestQueryCompatibilityWithPreInfoAPI(t *testing.T) { "p1", defaultPromConfig("p1", 0, "", filepath.Join(e2ethanos.ContainerSharedDir, promRulesSubDir, "*.yaml"), "localhost:9090", qUninit.InternalEndpoint("http")), e2ethanos.DefaultPrometheusImage(), + nil, tcase.sidecarImage, e2ethanos.FeatureExemplarStorage, ) diff --git a/test/e2e/rules_api_test.go b/test/e2e/rules_api_test.go index 0d94317c8b..dcb21a6d81 100644 --- a/test/e2e/rules_api_test.go +++ b/test/e2e/rules_api_test.go @@ -49,6 +49,7 @@ func TestRulesAPI_Fanout(t *testing.T) { "prom1", defaultPromConfig("ha", 0, "", filepath.Join(e2ethanos.ContainerSharedDir, promRulesSubDir, "*.yaml")), e2ethanos.DefaultPrometheusImage(), + nil, ) testutil.Ok(t, err) prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar( @@ -56,6 +57,7 @@ func TestRulesAPI_Fanout(t *testing.T) { "prom2", defaultPromConfig("ha", 1, "", filepath.Join(e2ethanos.ContainerSharedDir, promRulesSubDir, "*.yaml")), e2ethanos.DefaultPrometheusImage(), + nil, ) testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2)) diff --git a/test/e2e/targets_api_test.go b/test/e2e/targets_api_test.go index a3b2d4a615..0920b71757 100644 --- a/test/e2e/targets_api_test.go +++ b/test/e2e/targets_api_test.go @@ -39,6 +39,7 @@ func TestTargetsAPI_Fanout(t *testing.T) { "prom1", defaultPromConfig("ha", 0, "", "", "localhost:9090", "localhost:80"), e2ethanos.DefaultPrometheusImage(), + nil, ) testutil.Ok(t, err) prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar( @@ -46,6 +47,7 @@ func TestTargetsAPI_Fanout(t *testing.T) { "prom2", defaultPromConfig("ha", 1, "", "", "localhost:9090", "localhost:80"), e2ethanos.DefaultPrometheusImage(), + nil, ) testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2))