From 757677fa41253d44c4ffe606fa03f8af7a7a46ee Mon Sep 17 00:00:00 2001
From: Michael Bridgen
Date: Mon, 18 Dec 2017 12:00:29 +0000
Subject: [PATCH] Clarify which fields of cache.Warmer are required

The cache.Warmer struct has some mandatory fields and some optional
ones. With a constructor we can better enforce this distinction. In
particular, the cache client implementation is now mandatory, but was
still described as optional in examples and help text.

We can simplify (and, in examples, omit) the default values by
assuming memcached runs in the same namespace as fluxd.
---
 cmd/fluxd/main.go                            | 39 ++++++-----
 deploy/flux-deployment.yaml                  | 17 ++---
 registry/cache/memcached/integration_test.go | 10 +--
 registry/cache/warming.go                    | 68 +++++++++++---------
 registry/cache/warming_test.go               |  6 +-
 5 files changed, 69 insertions(+), 71 deletions(-)

diff --git a/cmd/fluxd/main.go b/cmd/fluxd/main.go
index 9a79d18552..df543d2b95 100644
--- a/cmd/fluxd/main.go
+++ b/cmd/fluxd/main.go
@@ -86,7 +86,7 @@ func main() {
 		gitPollInterval = fs.Duration("git-poll-interval", 5*time.Minute, "period at which to poll git repo for new commits")
 
 		// registry
-		memcachedHostname = fs.String("memcached-hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.")
+		memcachedHostname = fs.String("memcached-hostname", "memcached", "Hostname for memcached service.")
 		memcachedTimeout = fs.Duration("memcached-timeout", time.Second, "Maximum time to wait before giving up on memcached requests.")
 		memcachedService = fs.String("memcached-service", "memcached", "SRV service used to discover memcache servers.")
 		registryCacheExpiry = fs.Duration("registry-cache-expiry", 1*time.Hour, "Duration to keep cached image info. Must be < 1 month.")
@@ -235,19 +235,17 @@ func main() {
 	{ // Cache client, for use by registry and cache warmer
 		var cacheClient cache.Client
-		if *memcachedHostname != "" {
-			memcacheClient := registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{
-				Host:           *memcachedHostname,
-				Service:        *memcachedService,
-				Expiry:         *registryCacheExpiry,
-				Timeout:        *memcachedTimeout,
-				UpdateInterval: 1 * time.Minute,
-				Logger:         log.With(logger, "component", "memcached"),
-				MaxIdleConns:   *registryBurst,
-			})
-			defer memcacheClient.Stop()
-			cacheClient = cache.InstrumentClient(memcacheClient)
-		}
+		memcacheClient := registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{
+			Host:           *memcachedHostname,
+			Service:        *memcachedService,
+			Expiry:         *registryCacheExpiry,
+			Timeout:        *memcachedTimeout,
+			UpdateInterval: 1 * time.Minute,
+			Logger:         log.With(logger, "component", "memcached"),
+			MaxIdleConns:   *registryBurst,
+		})
+		defer memcacheClient.Stop()
+		cacheClient = cache.InstrumentClient(memcacheClient)
 
 		cacheRegistry = &cache.Cache{
 			Reader: cacheClient,
@@ -266,12 +264,11 @@ func main() {
 		}
 
 		// Warmer
-		warmerLogger := log.With(logger, "component", "warmer")
-		cacheWarmer = &cache.Warmer{
-			Logger:        warmerLogger,
-			ClientFactory: remoteFactory,
-			Cache:         cacheClient,
-			Burst:         *registryBurst,
+		var err error
+		cacheWarmer, err = cache.NewWarmer(remoteFactory, cacheClient, *registryBurst)
+		if err != nil {
+			logger.Log("err", err)
+			os.Exit(1)
 		}
 	}
@@ -446,7 +443,7 @@ func main() {
 	cacheWarmer.Notify = daemon.AskForImagePoll
 	cacheWarmer.Priority = daemon.ImageRefresh
 	shutdownWg.Add(1)
-	go cacheWarmer.Loop(shutdown, shutdownWg, image_creds)
+	go cacheWarmer.Loop(log.With(logger, "component", "warmer"), shutdown, shutdownWg, image_creds)
 
 	// Update daemonRef so that upstream and handlers point to fully working daemon
 	daemonRef.UpdatePlatform(daemon)
diff --git a/deploy/flux-deployment.yaml b/deploy/flux-deployment.yaml
index de1625f803..b79aad8885 100644
--- a/deploy/flux-deployment.yaml
+++ b/deploy/flux-deployment.yaml
@@ -32,13 +32,13 @@ spec:
         - name: git-key
           mountPath: /etc/fluxd/ssh
         args:
-        # if you deployed memcached, you can supply these arguments to
-        # tell fluxd to use it. You may need to change the namespace
-        # (`default`) if you run fluxd in another namespace.
-        - --memcached-hostname=memcached.default.svc.cluster.local
-        - --memcached-timeout=100ms
-        - --memcached-service=memcached
-        - --registry-cache-expiry=20m
+
+        # if you deployed memcached in a different namespace to flux,
+        # or with a different service name, you can supply the
+        # following two arguments to tell fluxd how to connect to it.
+        # - --memcached-hostname=memcached.default.svc.cluster.local
+        # - --memcached-service=memcached
+
         # replace (at least) the following URL
         - --git-url=git@github.com:weaveworks/flux-example
         - --git-branch=master
@@ -46,6 +46,3 @@ spec:
         # (e.g., Weave Cloud). The token is particular to the service.
         # - --connect=wss://cloud.weave.works/api/flux
         # - --token=abc123abc123abc123abc123
-        # override -b and -t arguments to ssh-keygen
-        # - --ssh-keygen-bits=2048
-        # - --ssh-keygen-type=ed25519
diff --git a/registry/cache/memcached/integration_test.go b/registry/cache/memcached/integration_test.go
index ea209d2d85..92b9b4ffc4 100644
--- a/registry/cache/memcached/integration_test.go
+++ b/registry/cache/memcached/integration_test.go
@@ -42,13 +42,7 @@ func TestWarming_WarmerWriteCacheRead(t *testing.T) {
 
 	r := &cache.Cache{mc}
 
-	w := &cache.Warmer{
-		Logger:        log.With(logger, "component", "warmer"),
-		ClientFactory: remote,
-		Cache:         mc,
-		Burst:         125,
-	}
-
+	w, _ := cache.NewWarmer(remote, mc, 125)
 	shutdown := make(chan struct{})
 	shutdownWg := &sync.WaitGroup{}
 	defer func() {
@@ -57,7 +51,7 @@ func TestWarming_WarmerWriteCacheRead(t *testing.T) {
 	}()
 
 	shutdownWg.Add(1)
-	go w.Loop(shutdown, shutdownWg, func() registry.ImageCreds {
+	go w.Loop(log.With(logger, "component", "warmer"), shutdown, shutdownWg, func() registry.ImageCreds {
 		return registry.ImageCreds{
 			id.Name: registry.NoCredentials(),
 		}
diff --git a/registry/cache/warming.go b/registry/cache/warming.go
index f7923bb7ca..6c15bd39b3 100644
--- a/registry/cache/warming.go
+++ b/registry/cache/warming.go
@@ -20,14 +20,26 @@ const askForNewImagesInterval = time.Minute
 // Warmer refreshes the information kept in the cache from remote
 // registries.
 type Warmer struct {
-	Logger        log.Logger
-	ClientFactory registry.ClientFactory
-	Cache         Client
-	Burst         int
+	clientFactory registry.ClientFactory
+	cache         Client
+	burst         int
 	Priority chan image.Name
 	Notify   func()
 }
 
+// NewWarmer creates a cache warmer that (when Loop is invoked) will
+// periodically refresh the values kept in the cache.
+func NewWarmer(cf registry.ClientFactory, cacheClient Client, burst int) (*Warmer, error) {
+	if cf == nil || cacheClient == nil || burst <= 0 {
+		return nil, errors.New("arguments must be non-nil (or > 0 in the case of burst)")
+	}
+	return &Warmer{
+		clientFactory: cf,
+		cache:         cacheClient,
+		burst:         burst,
+	}, nil
+}
+
 // .. and this is what we keep in the backlog
 type backlogItem struct {
 	image.Name
@@ -36,13 +48,9 @@ type backlogItem struct {
 
 // Continuously get the images to populate the cache with, and
 // populate the cache with them.
-func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFunc func() registry.ImageCreds) {
+func (w *Warmer) Loop(logger log.Logger, stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFunc func() registry.ImageCreds) {
 	defer wg.Done()
 
-	if w.Logger == nil || w.ClientFactory == nil || w.Cache == nil {
-		panic("registry.Warmer fields are nil")
-	}
-
 	refresh := time.Tick(askForNewImagesInterval)
 	imageCreds := imagesToFetchFunc()
 	backlog := imageCredsToBacklog(imageCreds)
@@ -61,17 +69,17 @@ func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFun
 	for {
 		select {
 		case <-stop:
-			w.Logger.Log("stopping", "true")
+			logger.Log("stopping", "true")
 			return
 		case name := <-w.Priority:
-			w.Logger.Log("priority", name.String())
+			logger.Log("priority", name.String())
 			// NB the implicit contract here is that the prioritised
 			// image has to have been running the last time we
 			// requested the credentials.
 			if creds, ok := imageCreds[name]; ok {
-				w.warm(ctx, name, creds)
+				w.warm(ctx, logger, name, creds)
 			} else {
-				w.Logger.Log("priority", name.String(), "err", "no creds available")
+				logger.Log("priority", name.String(), "err", "no creds available")
 			}
 			continue
 		default:
@@ -80,7 +88,7 @@ func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFun
 			if len(backlog) > 0 {
 				im := backlog[0]
 				backlog = backlog[1:]
-				w.warm(ctx, im.Name, im.Credentials)
+				w.warm(ctx, logger, im.Name, im.Credentials)
 			} else {
 				select {
 				case <-refresh:
@@ -102,17 +110,17 @@ func imageCredsToBacklog(imageCreds registry.ImageCreds) []backlogItem {
 	return backlog
 }
 
-func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credentials) {
-	client, err := w.ClientFactory.ClientFor(id.CanonicalName(), creds)
+func (w *Warmer) warm(ctx context.Context, logger log.Logger, id image.Name, creds registry.Credentials) {
+	client, err := w.clientFactory.ClientFor(id.CanonicalName(), creds)
 	if err != nil {
-		w.Logger.Log("err", err.Error())
+		logger.Log("err", err.Error())
 		return
 	}
 
 	// This is what we're going to write back to the cache
 	var repo ImageRepository
 	repoKey := NewRepositoryKey(id.CanonicalName())
-	bytes, _, err := w.Cache.GetKey(repoKey)
+	bytes, _, err := w.cache.GetKey(repoKey)
 	if err == nil {
 		err = json.Unmarshal(bytes, &repo)
 	} else if err == ErrNotCached {
@@ -120,7 +128,7 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent
 	}
 
 	if err != nil {
-		w.Logger.Log("err", errors.Wrap(err, "fetching previous result from cache"))
+		logger.Log("err", errors.Wrap(err, "fetching previous result from cache"))
 		return
 	}
 
@@ -133,17 +141,17 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent
 	defer func() {
 		bytes, err := json.Marshal(repo)
 		if err == nil {
-			err = w.Cache.SetKey(repoKey, bytes)
+			err = w.cache.SetKey(repoKey, bytes)
 		}
 		if err != nil {
-			w.Logger.Log("err", errors.Wrap(err, "writing result to cache"))
+			logger.Log("err", errors.Wrap(err, "writing result to cache"))
 		}
 	}()
 
 	tags, err := client.Tags(ctx)
 	if err != nil {
 		if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) && !strings.Contains(err.Error(), "net/http: request canceled") {
-			w.Logger.Log("err", errors.Wrap(err, "requesting tags"))
+			logger.Log("err", errors.Wrap(err, "requesting tags"))
 			repo.LastError = err.Error()
 		}
 		return
@@ -158,7 +166,7 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent
 		// See if we have the manifest already cached
 		newID := id.ToRef(tag)
 		key := NewManifestKey(newID.CanonicalRef())
-		bytes, expiry, err := w.Cache.GetKey(key)
+		bytes, expiry, err := w.cache.GetKey(key)
 		// If err, then we don't have it yet. Update.
 		switch {
 		case err != nil:
@@ -179,12 +187,12 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent
 
 	var successCount int
 	if len(toUpdate) > 0 {
-		w.Logger.Log("fetching", id.String(), "total", len(toUpdate), "expired", expired, "missing", missing)
+		logger.Log("fetching", id.String(), "total", len(toUpdate), "expired", expired, "missing", missing)
 		var successMx sync.Mutex
 
 		// The upper bound for concurrent fetches against a single host is
 		// w.Burst, so limit the number of fetching goroutines to that.
-		fetchers := make(chan struct{}, w.Burst)
+		fetchers := make(chan struct{}, w.burst)
 		awaitFetchers := &sync.WaitGroup{}
 	updates:
 		for _, imID := range toUpdate {
@@ -204,7 +212,7 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent
 					// This was due to a context timeout, don't bother logging
 					return
 				}
-				w.Logger.Log("err", errors.Wrap(err, "requesting manifests"))
+				logger.Log("err", errors.Wrap(err, "requesting manifests"))
 				return
 			}
 
@@ -212,12 +220,12 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent
 			// Write back to memcached
 			val, err := json.Marshal(img)
 			if err != nil {
-				w.Logger.Log("err", errors.Wrap(err, "serializing tag to store in cache"))
+				logger.Log("err", errors.Wrap(err, "serializing tag to store in cache"))
 				return
 			}
-			err = w.Cache.SetKey(key, val)
+			err = w.cache.SetKey(key, val)
 			if err != nil {
-				w.Logger.Log("err", errors.Wrap(err, "storing manifests in cache"))
+				logger.Log("err", errors.Wrap(err, "storing manifests in cache"))
 				return
 			}
 			successMx.Lock()
@@ -227,7 +235,7 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent
 			}(imID)
 		}
 		awaitFetchers.Wait()
-		w.Logger.Log("updated", id.String(), "count", successCount)
+		logger.Log("updated", id.String(), "count", successCount)
 	}
 
 	// We managed to fetch new metadata for everything we were missing
diff --git a/registry/cache/warming_test.go b/registry/cache/warming_test.go
index 829e5896ef..29159a55d7 100644
--- a/registry/cache/warming_test.go
+++ b/registry/cache/warming_test.go
@@ -49,6 +49,8 @@ func TestWarm(t *testing.T) {
 	ref, _ := image.ParseRef("example.com/path/image:tag")
 	repo := ref.Name
 
+	logger := log.NewNopLogger()
+
 	client := &mock.Client{
 		TagsFn: func() ([]string, error) {
 			return []string{"tag"}, nil
@@ -65,8 +67,8 @@ func TestWarm(t *testing.T) {
 	}
 	factory := &mock.ClientFactory{Client: client}
 	c := &mem{}
-	warmer := &Warmer{Logger: log.NewNopLogger(), ClientFactory: factory, Cache: c, Burst: 10}
-	warmer.warm(context.TODO(), repo, registry.NoCredentials())
+	warmer := &Warmer{clientFactory: factory, cache: c, burst: 10}
+	warmer.warm(context.TODO(), logger, repo, registry.NoCredentials())
 
 	registry := &Cache{Reader: c}
 	repoInfo, err := registry.GetRepository(ref.Name)
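
Note for reviewers: the sketch below shows how a caller wires up the Warmer
after this change, condensed from the cmd/fluxd/main.go hunks above. It is
illustrative only: the helper name startWarmer, the burst value of 125 (taken
from the integration test), the config values, and the import paths (go-kit
log and this repo's package layout) are assumptions, not part of the patch.

package main

import (
	"os"
	"sync"
	"time"

	"github.com/go-kit/kit/log"

	"github.com/weaveworks/flux/registry"
	"github.com/weaveworks/flux/registry/cache"
	registryMemcache "github.com/weaveworks/flux/registry/cache/memcached"
)

// startWarmer wires up a Warmer the way cmd/fluxd/main.go now does:
// the memcached client is constructed unconditionally, NewWarmer
// validates its mandatory arguments, and the logger is passed to Loop
// rather than carried as a struct field.
func startWarmer(logger log.Logger, remoteFactory registry.ClientFactory, imageCreds func() registry.ImageCreds) func() {
	memcacheClient := registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{
		Host:           "memcached", // the new default: memcached in the same namespace as fluxd
		Service:        "memcached",
		Expiry:         time.Hour,
		Timeout:        time.Second,
		UpdateInterval: time.Minute,
		Logger:         log.With(logger, "component", "memcached"),
	})
	cacheClient := cache.InstrumentClient(memcacheClient)

	// A nil client factory or cache client now fails here, at startup,
	// instead of panicking later inside Loop.
	warmer, err := cache.NewWarmer(remoteFactory, cacheClient, 125)
	if err != nil {
		logger.Log("err", err)
		os.Exit(1)
	}

	shutdown := make(chan struct{})
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go warmer.Loop(log.With(logger, "component", "warmer"), shutdown, wg, imageCreds)

	// The returned function stops the warming loop, then the memcached client.
	return func() {
		close(shutdown)
		wg.Wait()
		memcacheClient.Stop()
	}
}

Compared with the old struct-literal construction (w := &cache.Warmer{...}),
misconfiguration now surfaces as an error from the constructor rather than a
runtime panic inside the warming loop, and since clientFactory, cache, and
burst are unexported, callers outside the package can no longer set them
inconsistently.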