Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query: add memcached autodiscovery support #4487

Merged
merged 7 commits into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#3970](https://github.com/thanos-io/thanos/pull/3970) Azure: Adds more configuration options for Azure blob storage. This allows for pipeline and reader specific configuration. Implements HTTP transport configuration options. These options allows for more fine-grained control on timeouts and retries. Implements MSI authentication as second method of authentication via a service principal token.
- [#4406](https://github.com/thanos-io/thanos/pull/4406) Tools: Add retention command for applying retention policy on the bucket.
- [#4430](https://github.com/thanos-io/thanos/pull/4430) Compact: Add flag `downsample.concurrency` to specify the concurrency of downsampling blocks.
- [#4487](https://github.com/thanos-io/thanos/pull/4487) Query: Add memcached auto discovery support.

### Fixed

Expand Down
1 change: 1 addition & 0 deletions docs/components/query-frontend.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ config:
max_item_size: 0
max_get_multi_batch_size: 0
dns_provider_update_interval: 0s
auto_discovery: false
expiration: 0s
```

Expand Down
4 changes: 3 additions & 1 deletion docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,11 +290,12 @@ config:
max_item_size: 0
max_get_multi_batch_size: 0
dns_provider_update_interval: 0s
auto_discovery: false
```

The **required** settings are:

- `addresses`: list of memcached addresses, that will get resolved with the [DNS service discovery](../service-discovery.md/#dns-service-discovery) provider.
- `addresses`: list of memcached addresses, that will get resolved with the [DNS service discovery](../service-discovery.md/#dns-service-discovery) provider. If your cluster supports auto-discovery, you should use the flag `auto_discovery` instead and only point to *one of* the memcached servers. This typically means that there should be only one address specified that resolves to any of the alive memcached servers. Use this for Amazon ElastiCache and other similar services.

While the remaining settings are **optional**:

Expand All @@ -306,6 +307,7 @@ While the remaining settings are **optional**:
- `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, internally keys are split into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited.
- `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value of memcached `-I` flag (defaults to 1MB) in order to avoid wasting network round trips to store items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited.
- `dns_provider_update_interval`: the DNS discovery update interval.
- `auto_discovery`: whether to use the auto-discovery mechanism for memcached.

## Caching Bucket

Expand Down
57 changes: 41 additions & 16 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/discovery/dns"
memcacheDiscovery "github.com/thanos-io/thanos/pkg/discovery/memcache"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
Expand Down Expand Up @@ -53,6 +54,7 @@ var (
MaxGetMultiConcurrency: 100,
MaxGetMultiBatchSize: 0,
DNSProviderUpdateInterval: 10 * time.Second,
AutoDiscovery: false,
}
)

Expand Down Expand Up @@ -114,6 +116,9 @@ type MemcachedClientConfig struct {

// DNSProviderUpdateInterval specifies the DNS discovery update interval.
DNSProviderUpdateInterval time.Duration `yaml:"dns_provider_update_interval"`

// AutoDiscovery configures the memcached client to perform auto-discovery instead of DNS resolution.
AutoDiscovery bool `yaml:"auto_discovery"`
}

func (c *MemcachedClientConfig) validate() error {
Expand Down Expand Up @@ -153,8 +158,8 @@ type memcachedClient struct {
// Name provides an identifier for the instantiated Client
name string

// DNS provider used to keep the memcached servers list updated.
dnsProvider *dns.Provider
// Address provider used to keep the memcached servers list updated.
addressProvider AddressProvider

// Channel used to notify internal goroutines when they should quit.
stop chan struct{}
Expand All @@ -177,6 +182,15 @@ type memcachedClient struct {
dataSize *prometheus.HistogramVec
}

// AddressProvider performs node address resolution given a list of clusters.
type AddressProvider interface {
	// Resolve resolves the provided list of memcached clusters to the actual nodes.
	// It returns an error when resolution fails.
	Resolve(context.Context, []string) error

	// Addresses returns the nodes.
	Addresses() []string
}

type memcachedGetMultiResult struct {
items map[string]*memcache.Item
err error
Expand Down Expand Up @@ -220,20 +234,31 @@ func newMemcachedClient(
reg prometheus.Registerer,
name string,
) (*memcachedClient, error) {
dnsProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_memcached_", reg),
dns.GolangResolverType,
)
promRegisterer := extprom.WrapRegistererWithPrefix("thanos_memcached_", reg)

var addressProvider AddressProvider
if config.AutoDiscovery {
addressProvider = memcacheDiscovery.NewProvider(
logger,
promRegisterer,
config.Timeout,
)
} else {
addressProvider = dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_memcached_", reg),
dns.GolangResolverType,
)
}

c := &memcachedClient{
logger: log.With(logger, "name", name),
config: config,
client: client,
selector: selector,
dnsProvider: dnsProvider,
asyncQueue: make(chan func(), config.MaxAsyncBufferSize),
stop: make(chan struct{}, 1),
logger: log.With(logger, "name", name),
config: config,
client: client,
selector: selector,
addressProvider: addressProvider,
asyncQueue: make(chan func(), config.MaxAsyncBufferSize),
stop: make(chan struct{}, 1),
getMultiGate: gate.New(
extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg),
config.MaxGetMultiConcurrency,
Expand Down Expand Up @@ -561,11 +586,11 @@ func (c *memcachedClient) resolveAddrs() error {
defer cancel()

// If resolution fails for some of the addresses, log the error.
if err := c.dnsProvider.Resolve(ctx, c.config.Addresses); err != nil {
if err := c.addressProvider.Resolve(ctx, c.config.Addresses); err != nil {
level.Error(c.logger).Log("msg", "failed to resolve addresses for memcached", "addresses", strings.Join(c.config.Addresses, ","), "err", err)
}
// Fail in case no server address is resolved.
servers := c.dnsProvider.Addresses()
servers := c.addressProvider.Addresses()
if len(servers) == 0 {
return fmt.Errorf("no server address resolved for %s", c.name)
}
Expand Down
111 changes: 111 additions & 0 deletions pkg/discovery/memcache/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package memcache

import (
"context"
"fmt"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/errutil"
"github.com/thanos-io/thanos/pkg/extprom"
)

// Provider is a stateful cache for asynchronous memcached auto-discovery resolution. It provides a way to resolve
// addresses and obtain them.
type Provider struct {
roystchiang marked this conversation as resolved.
Show resolved Hide resolved
sync.RWMutex
resolver Resolver
clusterConfigs map[string]*clusterConfig
logger log.Logger

configVersion *extprom.TxGaugeVec
resolvedAddresses *extprom.TxGaugeVec
resolverFailuresCount prometheus.Counter
resolverLookupsCount prometheus.Counter
}

// NewProvider returns a Provider with an empty address cache. It uses a
// memcachedAutoDiscovery resolver configured with dialTimeout, and creates its gauges and
// counters against reg.
func NewProvider(logger log.Logger, reg prometheus.Registerer, dialTimeout time.Duration) *Provider {
	// Metrics are created in the same order as the fields they back.
	versionGauge := extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
		Name: "auto_discovery_config_version",
		Help: "The current auto discovery config version",
	}, []string{"addr"})
	nodesGauge := extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
		Name: "auto_discovery_resolved_addresses",
		Help: "The number of memcached nodes found via auto discovery",
	}, []string{"addr"})
	lookups := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "auto_discovery_total",
		Help: "The number of memcache auto discovery attempts",
	})
	failures := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "auto_discovery_failures_total",
		Help: "The number of memcache auto discovery failures",
	})

	return &Provider{
		resolver:              &memcachedAutoDiscovery{dialTimeout: dialTimeout},
		clusterConfigs:        map[string]*clusterConfig{},
		configVersion:         versionGauge,
		resolvedAddresses:     nodesGauge,
		resolverLookupsCount:  lookups,
		resolverFailuresCount: failures,
		logger:                logger,
	}
}

// Resolve stores a list of nodes auto-discovered from the provided addresses.
//
// Each address is looked up through the configured resolver. When a lookup fails, the config
// cached for that address by an earlier call is kept, if there is one. An address with no
// usable config (never resolved successfully, or resolved to nil) is left out of the stored
// state instead of being recorded as a nil entry. The returned error aggregates the lookup
// failures.
func (p *Provider) Resolve(ctx context.Context, addresses []string) error {
	clusterConfigs := make(map[string]*clusterConfig, len(addresses))
	errs := errutil.MultiError{}

	for _, address := range addresses {
		config, err := p.resolver.Resolve(ctx, address)
		p.resolverLookupsCount.Inc()

		if err != nil {
			level.Warn(p.logger).Log(
				"msg", "failed to perform auto-discovery for memcached",
				"address", address,
				"err", err,
			)
			errs.Add(err)
			p.resolverFailuresCount.Inc()

			// Use cached values.
			p.RLock()
			config = p.clusterConfigs[address]
			p.RUnlock()
		}

		// A nil config would be dereferenced below when the gauges are updated, so it
		// must never be stored.
		if config == nil {
			continue
		}
		clusterConfigs[address] = config
	}

	p.Lock()
	defer p.Unlock()

	p.resolvedAddresses.ResetTx()
	p.configVersion.ResetTx()
	for address, config := range clusterConfigs {
		p.resolvedAddresses.WithLabelValues(address).Set(float64(len(config.nodes)))
		p.configVersion.WithLabelValues(address).Set(float64(config.version))
	}
	p.resolvedAddresses.Submit()
	p.configVersion.Submit()

	p.clusterConfigs = clusterConfigs

	return errs.Err()
}

// Addresses returns the latest addresses present in the Provider.
func (p *Provider) Addresses() []string {
var result []string
for _, config := range p.clusterConfigs {
roystchiang marked this conversation as resolved.
Show resolved Hide resolved
for _, node := range config.nodes {
result = append(result, fmt.Sprintf("%s:%d", node.dns, node.port))
}
}
return result
}
81 changes: 81 additions & 0 deletions pkg/discovery/memcache/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package memcache

import (
"context"
"sort"
"testing"
"time"

"github.com/pkg/errors"

"github.com/go-kit/kit/log"
"github.com/thanos-io/thanos/pkg/testutil"
)

// TestProviderUpdatesAddresses checks that a second Resolve call picks up nodes that were
// added to a cluster since the first call.
func TestProviderUpdatesAddresses(t *testing.T) {
	ctx := context.TODO()
	clusters := []string{"memcached-cluster-1", "memcached-cluster-2"}
	provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second)
	resolver := mockResolver{
		configs: map[string]*clusterConfig{
			"memcached-cluster-1": {nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}}},
			"memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
		},
	}
	provider.resolver = &resolver

	testutil.Ok(t, provider.Resolve(ctx, clusters))
	addresses := provider.Addresses()
	// Addresses iterates over a map, so the result must be sorted before it is compared
	// against a fixed order; otherwise the assertion fails intermittently.
	sort.Strings(addresses)
	testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)

	resolver.configs = map[string]*clusterConfig{
		"memcached-cluster-1": {nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-3", ip: "ip-3", port: 11211}}},
		"memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
	}

	testutil.Ok(t, provider.Resolve(ctx, clusters))
	addresses = provider.Addresses()
	sort.Strings(addresses)
	testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080", "dns-3:11211"}, addresses)
}

func TestProviderDoesNotUpdateAddressIfFailed(t *testing.T) {
ctx := context.TODO()
clusters := []string{"memcached-cluster-1", "memcached-cluster-2"}
provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second)
resolver := mockResolver{
configs: map[string]*clusterConfig{
"memcached-cluster-1": {nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}}},
"memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
},
}
provider.resolver = &resolver

testutil.Ok(t, provider.Resolve(ctx, clusters))
addresses := provider.Addresses()
sort.Strings(addresses)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)

resolver.configs = nil
resolver.err = errors.New("oops")

testutil.NotOk(t, provider.Resolve(ctx, clusters))
addresses = provider.Addresses()
sort.Strings(addresses)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)
}

// mockResolver is a test double whose Resolve answers from a fixed table or with a fixed error.
type mockResolver struct {
	// configs maps an address to the config Resolve returns for it.
	configs map[string]*clusterConfig
	// err, when non-nil, is returned by every Resolve call instead of a config.
	err error
}

// Resolve returns the configured error when one is set; otherwise it returns the entry stored
// for address in configs, which is nil for an unknown address.
func (r *mockResolver) Resolve(_ context.Context, address string) (*clusterConfig, error) {
	if r.err == nil {
		return r.configs[address], nil
	}
	return nil, r.err
}
Loading