diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7ae72d51e4..596fd9e612 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -33,6 +33,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 - [#3970](https://github.com/thanos-io/thanos/pull/3970) Azure: Adds more configuration options for Azure blob storage. This allows for pipeline and reader specific configuration. Implements HTTP transport configuration options. These options allows for more fine-grained control on timeouts and retries. Implements MSI authentication as second method of authentication via a service principal token.
 - [#4406](https://github.com/thanos-io/thanos/pull/4406) Tools: Add retention command for applying retention policy on the bucket.
 - [#4430](https://github.com/thanos-io/thanos/pull/4430) Compact: Add flag `downsample.concurrency` to specify the concurrency of downsampling blocks.
+- [#4487](https://github.com/thanos-io/thanos/pull/4487) Query: Add memcached auto discovery support.
 
 ### Fixed
 
diff --git a/docs/components/query-frontend.md b/docs/components/query-frontend.md
index e16228b670..6e1a6a7f74 100644
--- a/docs/components/query-frontend.md
+++ b/docs/components/query-frontend.md
@@ -74,6 +74,7 @@ config:
   max_item_size: 0
   max_get_multi_batch_size: 0
   dns_provider_update_interval: 0s
+  auto_discovery: false
   expiration: 0s
 ```
 
diff --git a/docs/components/store.md b/docs/components/store.md
index 888a846991..0b955a4b3d 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -290,11 +290,12 @@ config:
   max_item_size: 0
   max_get_multi_batch_size: 0
   dns_provider_update_interval: 0s
+  auto_discovery: false
 ```
 
 The **required** settings are:
 
-- `addresses`: list of memcached addresses, that will get resolved with the [DNS service discovery](../service-discovery.md/#dns-service-discovery) provider.
+- `addresses`: list of memcached addresses, that will get resolved with the [DNS service discovery](../service-discovery.md/#dns-service-discovery) provider. If your cluster supports auto-discovery, set the `auto_discovery` option to `true` instead and point to only *one of* the memcached servers. This typically means specifying a single address that resolves to any of the live memcached servers. Use this for Amazon ElastiCache and other similar services.
 
 While the remaining settings are **optional**:
 
@@ -306,6 +307,7 @@ While the remaining settings are **optional**:
 - `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, internally keys are splitted into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited.
 - `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value of memcached `-I` flag (defaults to 1MB) in order to avoid wasting network round trips to store items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited.
 - `dns_provider_update_interval`: the DNS discovery update interval.
+- `auto_discovery`: whether to use the auto-discovery mechanism for memcached (e.g. Amazon ElastiCache) instead of DNS resolution; see the example below.
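+
+For example, to rely on auto-discovery you might point `addresses` at a single cluster configuration endpoint and enable `auto_discovery` (the address below is only a placeholder for your cluster's endpoint):
+
+```yaml
+type: MEMCACHED
+config:
+  addresses: ["mycluster.cfg.use1.cache.amazonaws.com:11211"]
+  auto_discovery: true
+```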
 
 ## Caching Bucket
 
diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go
index cb5adcb0d0..445138b3e6 100644
--- a/pkg/cacheutil/memcached_client.go
+++ b/pkg/cacheutil/memcached_client.go
@@ -21,6 +21,7 @@ import (
 	"gopkg.in/yaml.v2"
 
 	"github.com/thanos-io/thanos/pkg/discovery/dns"
+	memcacheDiscovery "github.com/thanos-io/thanos/pkg/discovery/memcache"
 	"github.com/thanos-io/thanos/pkg/extprom"
 	"github.com/thanos-io/thanos/pkg/gate"
 	"github.com/thanos-io/thanos/pkg/model"
@@ -53,6 +54,7 @@ var (
 		MaxGetMultiConcurrency:    100,
 		MaxGetMultiBatchSize:      0,
 		DNSProviderUpdateInterval: 10 * time.Second,
+		AutoDiscovery:             false,
 	}
 )
 
@@ -114,6 +116,9 @@ type MemcachedClientConfig struct {
 
 	// DNSProviderUpdateInterval specifies the DNS discovery update interval.
 	DNSProviderUpdateInterval time.Duration `yaml:"dns_provider_update_interval"`
+
+	// AutoDiscovery configures the memcached client to perform auto-discovery instead of DNS resolution.
+	AutoDiscovery bool `yaml:"auto_discovery"`
 }
 
 func (c *MemcachedClientConfig) validate() error {
@@ -153,8 +158,8 @@ type memcachedClient struct {
 	// Name provides an identifier for the instantiated Client
 	name string
 
-	// DNS provider used to keep the memcached servers list updated.
-	dnsProvider *dns.Provider
+	// Address provider used to keep the memcached servers list updated.
+	addressProvider AddressProvider
 
 	// Channel used to notify internal goroutines when they should quit.
 	stop chan struct{}
@@ -177,6 +182,15 @@ type memcachedClient struct {
 	dataSize *prometheus.HistogramVec
 }
 
+// AddressProvider performs node address resolution given a list of clusters.
+type AddressProvider interface {
+	// Resolve resolves the provided list of memcached clusters to the actual nodes.
+	Resolve(context.Context, []string) error
+
+	// Addresses returns the resolved node addresses.
+	Addresses() []string
+}
+
 type memcachedGetMultiResult struct {
 	items map[string]*memcache.Item
 	err   error
@@ -220,20 +234,31 @@ func newMemcachedClient(
 	reg prometheus.Registerer,
 	name string,
 ) (*memcachedClient, error) {
-	dnsProvider := dns.NewProvider(
-		logger,
-		extprom.WrapRegistererWithPrefix("thanos_memcached_", reg),
-		dns.GolangResolverType,
-	)
+	promRegisterer := extprom.WrapRegistererWithPrefix("thanos_memcached_", reg)
+
+	var addressProvider AddressProvider
+	if config.AutoDiscovery {
+		addressProvider = memcacheDiscovery.NewProvider(
+			logger,
+			promRegisterer,
+			config.Timeout,
+		)
+	} else {
+		addressProvider = dns.NewProvider(
+			logger,
+			promRegisterer,
+			dns.GolangResolverType,
+		)
+	}
 
 	c := &memcachedClient{
-		logger:      log.With(logger, "name", name),
-		config:      config,
-		client:      client,
-		selector:    selector,
-		dnsProvider: dnsProvider,
-		asyncQueue:  make(chan func(), config.MaxAsyncBufferSize),
-		stop:        make(chan struct{}, 1),
+		logger:          log.With(logger, "name", name),
+		config:          config,
+		client:          client,
+		selector:        selector,
+		addressProvider: addressProvider,
+		asyncQueue:      make(chan func(), config.MaxAsyncBufferSize),
+		stop:            make(chan struct{}, 1),
 		getMultiGate: gate.New(
 			extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg),
 			config.MaxGetMultiConcurrency,
@@ -561,11 +586,11 @@ func (c *memcachedClient) resolveAddrs() error {
 	defer cancel()
 
 	// If some of the dns resolution fails, log the error.
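+	// Depending on config.AutoDiscovery, the address provider below is either the DNS provider or the memcached auto-discovery provider.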
-	if err := c.dnsProvider.Resolve(ctx, c.config.Addresses); err != nil {
+	if err := c.addressProvider.Resolve(ctx, c.config.Addresses); err != nil {
 		level.Error(c.logger).Log("msg", "failed to resolve addresses for memcached", "addresses", strings.Join(c.config.Addresses, ","), "err", err)
 	}
 	// Fail in case no server address is resolved.
-	servers := c.dnsProvider.Addresses()
+	servers := c.addressProvider.Addresses()
 	if len(servers) == 0 {
 		return fmt.Errorf("no server address resolved for %s", c.name)
 	}
diff --git a/pkg/discovery/memcache/provider.go b/pkg/discovery/memcache/provider.go
new file mode 100644
index 0000000000..9bb1317824
--- /dev/null
+++ b/pkg/discovery/memcache/provider.go
@@ -0,0 +1,114 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package memcache
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/thanos-io/thanos/pkg/errutil"
+	"github.com/thanos-io/thanos/pkg/extprom"
+)
+
+// Provider is a stateful cache for asynchronous memcached auto-discovery resolution. It provides a way to resolve
+// addresses and obtain them.
+type Provider struct {
+	sync.RWMutex
+	resolver       Resolver
+	clusterConfigs map[string]*clusterConfig
+	logger         log.Logger
+
+	configVersion         *extprom.TxGaugeVec
+	resolvedAddresses     *extprom.TxGaugeVec
+	resolverFailuresCount prometheus.Counter
+	resolverLookupsCount  prometheus.Counter
+}
+
+func NewProvider(logger log.Logger, reg prometheus.Registerer, dialTimeout time.Duration) *Provider {
+	p := &Provider{
+		resolver:       &memcachedAutoDiscovery{dialTimeout: dialTimeout},
+		clusterConfigs: map[string]*clusterConfig{},
+		configVersion: extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
+			Name: "auto_discovery_config_version",
+			Help: "The current auto discovery config version",
+		}, []string{"addr"}),
+		resolvedAddresses: extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
+			Name: "auto_discovery_resolved_addresses",
+			Help: "The number of memcached nodes found via auto discovery",
+		}, []string{"addr"}),
+		resolverLookupsCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Name: "auto_discovery_total",
+			Help: "The number of memcached auto discovery attempts",
+		}),
+		resolverFailuresCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Name: "auto_discovery_failures_total",
+			Help: "The number of memcached auto discovery failures",
+		}),
+		logger: logger,
+	}
+	return p
+}
+
+// Resolve stores a list of nodes auto-discovered from the provided addresses.
+func (p *Provider) Resolve(ctx context.Context, addresses []string) error {
+	clusterConfigs := map[string]*clusterConfig{}
+	errs := errutil.MultiError{}
+
+	for _, address := range addresses {
+		clusterConfig, err := p.resolver.Resolve(ctx, address)
+		p.resolverLookupsCount.Inc()
+
+		if err != nil {
+			level.Warn(p.logger).Log(
+				"msg", "failed to perform auto-discovery for memcached",
+				"address", address, "err", err,
+			)
+			errs.Add(err)
+			p.resolverFailuresCount.Inc()
+
+			// Use cached values.
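+			// Reusing the previously discovered configuration for this address keeps the client serving the last known nodes when a discovery attempt fails.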
+			p.RLock()
+			clusterConfigs[address] = p.clusterConfigs[address]
+			p.RUnlock()
+		} else {
+			clusterConfigs[address] = clusterConfig
+		}
+	}
+
+	p.Lock()
+	defer p.Unlock()
+
+	p.resolvedAddresses.ResetTx()
+	p.configVersion.ResetTx()
+	for address, config := range clusterConfigs {
+		p.resolvedAddresses.WithLabelValues(address).Set(float64(len(config.nodes)))
+		p.configVersion.WithLabelValues(address).Set(float64(config.version))
+	}
+	p.resolvedAddresses.Submit()
+	p.configVersion.Submit()
+
+	p.clusterConfigs = clusterConfigs
+
+	return errs.Err()
+}
+
+// Addresses returns the latest addresses present in the Provider.
+func (p *Provider) Addresses() []string {
+	p.RLock()
+	defer p.RUnlock()
+
+	var result []string
+	for _, config := range p.clusterConfigs {
+		for _, node := range config.nodes {
+			result = append(result, fmt.Sprintf("%s:%d", node.dns, node.port))
+		}
+	}
+	return result
+}
diff --git a/pkg/discovery/memcache/provider_test.go b/pkg/discovery/memcache/provider_test.go
new file mode 100644
index 0000000000..6ebbc16657
--- /dev/null
+++ b/pkg/discovery/memcache/provider_test.go
@@ -0,0 +1,81 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package memcache
+
+import (
+	"context"
+	"sort"
+	"testing"
+	"time"
+
+	"github.com/pkg/errors"
+
+	"github.com/go-kit/kit/log"
+	"github.com/thanos-io/thanos/pkg/testutil"
+)
+
+func TestProviderUpdatesAddresses(t *testing.T) {
+	ctx := context.TODO()
+	clusters := []string{"memcached-cluster-1", "memcached-cluster-2"}
+	provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second)
+	resolver := mockResolver{
+		configs: map[string]*clusterConfig{
+			"memcached-cluster-1": {nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}}},
+			"memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
+		},
+	}
+	provider.resolver = &resolver
+
+	testutil.Ok(t, provider.Resolve(ctx, clusters))
+	addresses := provider.Addresses()
+	sort.Strings(addresses)
+	testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)
+
+	resolver.configs = map[string]*clusterConfig{
+		"memcached-cluster-1": {nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-3", ip: "ip-3", port: 11211}}},
+		"memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
+	}
+
+	testutil.Ok(t, provider.Resolve(ctx, clusters))
+	addresses = provider.Addresses()
+	sort.Strings(addresses)
+	testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080", "dns-3:11211"}, addresses)
+}
+
+func TestProviderDoesNotUpdateAddressIfFailed(t *testing.T) {
+	ctx := context.TODO()
+	clusters := []string{"memcached-cluster-1", "memcached-cluster-2"}
+	provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second)
+	resolver := mockResolver{
+		configs: map[string]*clusterConfig{
+			"memcached-cluster-1": {nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}}},
+			"memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
+		},
+	}
+	provider.resolver = &resolver
+
+	testutil.Ok(t, provider.Resolve(ctx, clusters))
+	addresses := provider.Addresses()
+	sort.Strings(addresses)
+	testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)
+
+	resolver.configs = nil
+	resolver.err = errors.New("oops")
+
+	testutil.NotOk(t, provider.Resolve(ctx, clusters))
+	addresses = provider.Addresses()
+	sort.Strings(addresses)
+	testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)
+}
+
+type mockResolver struct {
+	configs map[string]*clusterConfig
+	err     error
+}
+
+func (r *mockResolver) Resolve(_ context.Context, address string) (*clusterConfig, error) {
+	if r.err != nil {
+		return nil, r.err
+	}
+	return r.configs[address], nil
+}
diff --git a/pkg/discovery/memcache/resolver.go b/pkg/discovery/memcache/resolver.go
new file mode 100644
index 0000000000..4e7406cfba
--- /dev/null
+++ b/pkg/discovery/memcache/resolver.go
@@ -0,0 +1,111 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package memcache
+
+import (
+	"bufio"
+	"context"
+	"fmt"
+	"net"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/thanos-io/thanos/pkg/runutil"
+)
+
+type clusterConfig struct {
+	version int
+	nodes   []node
+}
+
+type node struct {
+	dns  string
+	ip   string
+	port int
+}
+
+type Resolver interface {
+	Resolve(ctx context.Context, address string) (*clusterConfig, error)
+}
+
+type memcachedAutoDiscovery struct {
+	dialTimeout time.Duration
+}
+
+func (s *memcachedAutoDiscovery) Resolve(ctx context.Context, address string) (config *clusterConfig, err error) {
+	conn, err := net.DialTimeout("tcp", address, s.dialTimeout)
+	if err != nil {
+		return nil, err
+	}
+	defer runutil.CloseWithErrCapture(&err, conn, "closing connection")
+
+	rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
+	if _, err := fmt.Fprintf(rw, "config get cluster\n"); err != nil {
+		return nil, err
+	}
+	if err := rw.Flush(); err != nil {
+		return nil, err
+	}
+
+	config, err = s.parseConfig(rw.Reader)
+	if err != nil {
+		return nil, err
+	}
+
+	return config, err
+}
+
+func (s *memcachedAutoDiscovery) parseConfig(reader *bufio.Reader) (*clusterConfig, error) {
+	clusterConfig := new(clusterConfig)
+
+	configMeta, err := reader.ReadString('\n')
+	if err != nil {
+		return nil, fmt.Errorf("failed to read config metadata: %s", err)
+	}
+	configMeta = strings.TrimSpace(configMeta)
+
+	// First line should be "CONFIG cluster 0 [length-of-payload]".
+	configMetaComponents := strings.Split(configMeta, " ")
+	if len(configMetaComponents) != 4 {
+		return nil, fmt.Errorf("expected 4 components in config metadata, and received %d, meta: %s", len(configMetaComponents), configMeta)
+	}
+
+	configSize, err := strconv.Atoi(configMetaComponents[3])
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse config size from metadata: %s, error: %s", configMeta, err)
+	}
+
+	configVersion, err := reader.ReadString('\n')
+	if err != nil {
+		return nil, fmt.Errorf("failed to find config version: %s", err)
+	}
+	clusterConfig.version, err = strconv.Atoi(strings.TrimSpace(configVersion))
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse config version: %s", err)
+	}
+
+	nodes, err := reader.ReadString('\n')
+	if err != nil {
+		return nil, fmt.Errorf("failed to read nodes: %s", err)
+	}
+
+	if len(configVersion)+len(nodes) != configSize {
+		return nil, fmt.Errorf("expected %d in config payload, but got %d instead.", configSize, len(configVersion)+len(nodes))
+	}
+
+	for _, host := range strings.Split(strings.TrimSpace(nodes), " ") {
+		dnsIpPort := strings.Split(host, "|")
+		if len(dnsIpPort) != 3 {
+			return nil, fmt.Errorf("node not in expected format: %s", dnsIpPort)
+		}
+		port, err := strconv.Atoi(dnsIpPort[2])
+		if err != nil {
+			return nil, fmt.Errorf("failed to parse port: %s, err: %s", dnsIpPort, err)
+		}
+		clusterConfig.nodes = append(clusterConfig.nodes, node{dns: dnsIpPort[0], ip: dnsIpPort[1], port: port})
+	}
+
+	return clusterConfig, nil
+}
diff --git a/pkg/discovery/memcache/resolver_test.go b/pkg/discovery/memcache/resolver_test.go
new file mode 100644
index 0000000000..a1dad82e74
--- /dev/null
+++ b/pkg/discovery/memcache/resolver_test.go
@@ -0,0 +1,83 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package memcache
+
+import (
+	"bufio"
+	"strings"
+	"testing"
+
+	"github.com/pkg/errors"
+
+	"github.com/thanos-io/thanos/pkg/testutil"
+)
+
+func TestGoodClusterConfigs(t *testing.T) {
+	resolver := memcachedAutoDiscovery{}
+	testCases := []struct {
+		content string
+		config  clusterConfig
+	}{
+		{"CONFIG cluster 0 23\r\n100\r\ndns-1|ip-1|11211\r\nEND\r\n",
+			clusterConfig{nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}}, version: 100},
+		},
+		{"CONFIG cluster 0 37\r\n0\r\ndns-1|ip-1|11211 dns-2|ip-2|8080\r\nEND\r\n",
+			clusterConfig{nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-2", ip: "ip-2", port: 8080}}, version: 0},
+		},
+	}
+
+	for _, testCase := range testCases {
+		reader := bufio.NewReader(strings.NewReader(testCase.content))
+
+		config, err := resolver.parseConfig(reader)
+
+		testutil.Ok(t, err)
+		testutil.Equals(t, testCase.config, *config)
+	}
+}
+
+func TestBadClusterConfigs(t *testing.T) {
+	resolver := memcachedAutoDiscovery{}
+	testCases := []struct {
+		content     string
+		expectedErr error
+	}{
+		{"",
+			errors.New("failed to read config metadata: EOF"),
+		},
+		{"CONFIG cluster\r\n",
+			errors.New("expected 4 components in config metadata, and received 2, meta: CONFIG cluster"),
+		},
+		{"CONFIG cluster 0 configSize\r\n",
+			errors.New("failed to parse config size from metadata: CONFIG cluster 0 configSize, error: strconv.Atoi: parsing \"configSize\": invalid syntax"),
+		},
+		{"CONFIG cluster 0 100\r\n",
+			errors.New("failed to find config version: EOF"),
+		},
+		{"CONFIG cluster 0 100\r\nconfigVersion\r\n",
+			errors.New("failed to parse config version: strconv.Atoi: parsing \"configVersion\": invalid syntax"),
+		},
+		{"CONFIG cluster 0 100\r\n0\r\n",
+			errors.New("failed to read nodes: EOF"),
+		},
+		{"CONFIG cluster 0 0\r\n100\r\ndns-1|ip-1|11211\r\nEND\r\n",
+			errors.New("expected 0 in config payload, but got 23 instead."),
+		},
+		{"CONFIG cluster 0 17\r\n100\r\ndns-1|ip-1\r\nEND\r\n",
+			errors.New("node not in expected format: [dns-1 ip-1]"),
+		},
+		{"CONFIG cluster 0 22\r\n100\r\ndns-1|ip-1|port\r\nEND\r\n",
+			errors.New("failed to parse port: [dns-1 ip-1 port], err: strconv.Atoi: parsing \"port\": invalid syntax"),
+		},
+	}
+
+	for _, testCase := range testCases {
+		reader := bufio.NewReader(strings.NewReader(testCase.content))
+
+		_, err := resolver.parseConfig(reader)
+
+		testutil.Assert(t, testCase.expectedErr.Error() == err.Error(), "expected error '%v', but got '%v'", testCase.expectedErr.Error(), err.Error())
+	}
+}
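For readers unfamiliar with the ElastiCache-style auto-discovery protocol used above: the resolver writes `config get cluster` to the configuration endpoint and reads back a `CONFIG` header line, the config version, and a space-separated list of `dns|ip|port` node entries, as in the test fixtures. The following standalone sketch (not part of the patch; package name and sample values are illustrative) parses a response of that shape with the same steps as `parseConfig`:

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

func main() {
	// Sample auto-discovery response, mirroring the fixtures in resolver_test.go:
	// a "CONFIG cluster 0 <size>" header, the config version, the node list as
	// space-separated "dns|ip|port" triples, and a trailing END marker.
	sample := "CONFIG cluster 0 37\r\n0\r\ndns-1|ip-1|11211 dns-2|ip-2|8080\r\nEND\r\n"
	r := bufio.NewReader(strings.NewReader(sample))

	// Skip the metadata line; the resolver uses its last field as the payload size.
	if _, err := r.ReadString('\n'); err != nil {
		panic(err)
	}

	// Read the configuration version.
	versionLine, err := r.ReadString('\n')
	if err != nil {
		panic(err)
	}
	version, err := strconv.Atoi(strings.TrimSpace(versionLine))
	if err != nil {
		panic(err)
	}

	// Read the node list and split it into dns/ip/port triples.
	nodesLine, err := r.ReadString('\n')
	if err != nil {
		panic(err)
	}
	for _, host := range strings.Split(strings.TrimSpace(nodesLine), " ") {
		parts := strings.Split(host, "|")
		port, _ := strconv.Atoi(parts[2])
		fmt.Printf("config version %d: node dns=%s ip=%s port=%d\n", version, parts[0], parts[1], port)
	}
}
```

Against a live cluster the same payload arrives over TCP after writing `config get cluster` to the configuration endpoint, which is what `memcachedAutoDiscovery.Resolve` does with the configured dial timeout before handing the reader to `parseConfig`.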