diff --git a/cmd/fluxd/main.go b/cmd/fluxd/main.go index 0e8a133440..35b38915a6 100644 --- a/cmd/fluxd/main.go +++ b/cmd/fluxd/main.go @@ -38,11 +38,12 @@ import ( var version string const ( - defaultRemoteConnections = 125 // Chosen performance tests on sock-shop. Unable to get higher performance than this. - // Memcache: There is only one thread to accept new client connections. If you are cycling connections very quickly, you can overwhelm the thread. Use persistent connections or UDP in this case. - // See https://github.com/memcached/memcached/wiki/Performance - // Hence, keep the number of new connections low. - defaultMemcacheConnections = 10 + // The number of connections chosen for memcache and remote GETs must match. + // If they do not match then the remote either creates too many memcache connections + // or not enough to be efficient. + // Value chosen through performance tests on sock-shop. I was unable to get higher performance than this. + defaultRemoteConnections = 125 // Chosen performance tests on sock-shop. Unable to get higher performance than this. + defaultMemcacheConnections = 10 // This doesn't need to be high. The user is only requesting one tag/image at a time. ) func optionalVar(fs *pflag.FlagSet, value ssh.OptionalValue, name, usage string) ssh.OptionalValue { @@ -78,11 +79,11 @@ func main() { memcachedHostname = fs.String("memcached-hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.") memcachedTimeout = fs.Duration("memcached-timeout", time.Second, "Maximum time to wait before giving up on memcached requests.") memcachedService = fs.String("memcached-service", "memcached", "SRV service used to discover memcache servers.") - memcachedConnections = fs.Int("memcached-connections", defaultMemcacheConnections, "maximum number of connections to memcache") + memcachedConnections = fs.Int("memcached-connections", defaultMemcacheConnections, "maximum number of registry connections to memcache") registryCacheExpiry = fs.Duration("registry-cache-expiry", 20*time.Minute, "Duration to keep cached registry tag info. Must be < 1 month.") registryPollInterval = fs.Duration("registry-poll-interval", 5*time.Minute, "period at which to poll registry for new images") registryRPS = fs.Int("registry-rps", 200, "maximum registry requests per second per host") - registryBurst = fs.Int("registry-burst", defaultRemoteConnections, "maximum registry request burst per host (default matched to number of http worker goroutines)") + registryBurst = fs.Int("registry-burst", defaultRemoteConnections, "maximum number of warmer connections to remote and memcache") // k8s-secret backed ssh keyring configuration k8sSecretName = fs.String("k8s-secret-name", "flux-git-deploy", "Name of the k8s secret used to store the private SSH key") k8sSecretVolumeMountPath = fs.String("k8s-secret-volume-mount-path", "/etc/fluxd/ssh", "Mount location of the k8s secret storing the private SSH key") @@ -210,22 +211,36 @@ func main() { var cacheWarmer registry.Warmer { // Cache - var memcacheClient registryMemcache.Client + var memcacheRegistry registryMemcache.Client if *memcachedHostname != "" { - memcacheClient = registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{ + memcacheRegistry = registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{ Host: *memcachedHostname, Service: *memcachedService, Timeout: *memcachedTimeout, UpdateInterval: 1 * time.Minute, Logger: log.NewContext(logger).With("component", "memcached"), + MaxConnections: *memcachedConnections, }) - memcacheClient = registryMemcache.InstrumentMemcacheClient(memcacheClient) - defer memcacheClient.Stop() + memcacheRegistry = registryMemcache.InstrumentMemcacheClient(memcacheRegistry) + defer memcacheRegistry.Stop() + } + var memcacheWarmer registryMemcache.Client + if *memcachedHostname != "" { + memcacheWarmer = registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{ + Host: *memcachedHostname, + Service: *memcachedService, + Timeout: *memcachedTimeout, + UpdateInterval: 1 * time.Minute, + Logger: log.NewContext(logger).With("component", "memcached"), + MaxConnections: *registryBurst, + }) + memcacheWarmer = registryMemcache.InstrumentMemcacheClient(memcacheWarmer) + defer memcacheWarmer.Stop() } cacheLogger := log.NewContext(logger).With("component", "cache") cache = registry.NewRegistry( - registry.NewCacheClientFactory(cacheLogger, memcacheClient, *registryCacheExpiry), + registry.NewCacheClientFactory(cacheLogger, memcacheRegistry, *registryCacheExpiry), cacheLogger, *memcachedConnections, ) @@ -244,8 +259,8 @@ func main() { Logger: warmerLogger, ClientFactory: remoteFactory, Expiry: *registryCacheExpiry, - Reader: memcacheClient, - Writer: memcacheClient, + Reader: memcacheWarmer, + Writer: memcacheWarmer, Burst: *registryBurst, } } diff --git a/registry/cache/memcached.go b/registry/cache/memcached.go index 4c44d4dca6..5e1c1e5656 100644 --- a/registry/cache/memcached.go +++ b/registry/cache/memcached.go @@ -75,12 +75,14 @@ type MemcacheConfig struct { Timeout time.Duration UpdateInterval time.Duration Logger log.Logger + MaxConnections int } func NewMemcacheClient(config MemcacheConfig) Client { var servers memcache.ServerList client := memcache.NewFromSelector(&servers) client.Timeout = config.Timeout + client.MaxIdleConns = config.MaxConnections newClient := &memcacheClient{ Client: client, diff --git a/vendor/manifest b/vendor/manifest index 9bb9b9e31b..8be48f625a 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -88,7 +88,7 @@ "importpath": "github.com/bradfitz/gomemcache/memcache", "repository": "https://github.com/bradfitz/gomemcache", "vcs": "git", - "revision": "2fafb84a66c4911e11a8f50955b01e74fe3ab9c5", + "revision": "1952afaa557dc08e8e0d89eafab110fb501c1a2b", "branch": "master", "path": "/memcache", "notests": true