From fefb216c1d2c415aa3adf00265c0a6ad0fecf7f5 Mon Sep 17 00:00:00 2001
From: Alfonso Acosta
Date: Sun, 14 Apr 2019 21:42:43 +0200
Subject: [PATCH 1/2] Split Warmer.warm() into smaller functions

`Warmer.warm()` was a huge function (~250 loc), making it really hard to
read. This change moves most of it into a new `repoCacheManager`,
improving readability. After this change `Warmer.warm()` is still ~100
loc, but reads much better.
---
 registry/cache/repocachemanager.go | 268 +++++++++++++++++++++++++++++
 registry/cache/warming.go          | 210 +++-------------------
 2 files changed, 290 insertions(+), 188 deletions(-)
 create mode 100644 registry/cache/repocachemanager.go

diff --git a/registry/cache/repocachemanager.go b/registry/cache/repocachemanager.go
new file mode 100644
index 000000000..ef7dd6a94
--- /dev/null
+++ b/registry/cache/repocachemanager.go
@@ -0,0 +1,268 @@
+package cache
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/pkg/errors"
+
+	"github.com/weaveworks/flux/image"
+	"github.com/weaveworks/flux/registry"
+)
+
+type imageToUpdate struct {
+	ref             image.Ref
+	previousDigest  string
+	previousRefresh time.Duration
+}
+
+// repoCacheManager handles cache operations for a repository
+type repoCacheManager struct {
+	now         time.Time
+	repoID      image.Name
+	burst       int
+	trace       bool
+	logger      log.Logger
+	cacheClient Client
+	sync.Mutex
+}
+
+func newRepoCacheManager(now time.Time, repoId image.Name, burst int, trace bool, logger log.Logger,
+	cacheClient Client) *repoCacheManager {
+	return &repoCacheManager{
+		now:         now,
+		repoID:      repoId,
+		burst:       burst,
+		trace:       trace,
+		logger:      logger,
+		cacheClient: cacheClient,
+	}
+}
+
+// fetchRepository fetches the repository from the cache
+func (c *repoCacheManager) fetchRepository() (ImageRepository, error) {
+	var result ImageRepository
+	repoKey := NewRepositoryKey(c.repoID.CanonicalName())
+	bytes, _, err := c.cacheClient.GetKey(repoKey)
+	if err != nil {
+		return ImageRepository{}, err
+	}
+	if err = json.Unmarshal(bytes, &result); err != nil {
+		return ImageRepository{}, err
+	}
+	return result, nil
+}
+
+// storeRepository stores the repository in the cache
+func (c *repoCacheManager) storeRepository(repo ImageRepository) error {
+	repoKey := NewRepositoryKey(c.repoID.CanonicalName())
+	bytes, err := json.Marshal(repo)
+	if err != nil {
+		return err
+	}
+	return c.cacheClient.SetKey(repoKey, c.now.Add(repoRefresh), bytes)
+}
+
+// fetchImagesResult is the result of fetching images from the cache
+// invariant: len(imagesToUpdate) == imagesToUpdateRefreshCount + imagesToUpdateMissingCount
+type fetchImagesResult struct {
+	imagesFound                map[string]image.Info // images found in the cache
+	imagesToUpdate             []imageToUpdate       // images which need to be updated
+	imagesToUpdateRefreshCount int                   // number of imagesToUpdate which need updating due to their cache entry expiring
+	imagesToUpdateMissingCount int                   // number of imagesToUpdate which need updating due to being missing
+}
+
+// fetchImages attempts to fetch the images with the provided tags from the cache.
+// It returns the images found, those which require updating and details about
+// why they need to be updated.
+func (c *repoCacheManager) fetchImages(tags []string) (fetchImagesResult, error) {
+	images := map[string]image.Info{}
+
+	// Create a list of images that need updating
+	var toUpdate []imageToUpdate
+
+	// Counters for reporting what happened
+	var missing, refresh int
+	for _, tag := range tags {
+		if tag == "" {
+			return fetchImagesResult{}, fmt.Errorf("empty tag in fetched tags")
+		}
+
+		// See if we have the manifest already cached
+		newID := c.repoID.ToRef(tag)
+		key := NewManifestKey(newID.CanonicalRef())
+		bytes, deadline, err := c.cacheClient.GetKey(key)
+		// If err, then we don't have it yet. Update.
+		switch {
+		case err != nil: // by and large these are cache misses, but any error shall count as "not found"
+			if err != ErrNotCached {
+				c.logger.Log("warning", "error from cache", "err", err, "ref", newID)
+			}
+			missing++
+			toUpdate = append(toUpdate, imageToUpdate{ref: newID, previousRefresh: initialRefresh})
+		case len(bytes) == 0:
+			c.logger.Log("warning", "empty result from cache", "ref", newID)
+			missing++
+			toUpdate = append(toUpdate, imageToUpdate{ref: newID, previousRefresh: initialRefresh})
+		default:
+			var entry registry.ImageEntry
+			if err := json.Unmarshal(bytes, &entry); err == nil {
+				if c.trace {
+					c.logger.Log("trace", "found cached manifest", "ref", newID, "last_fetched", entry.LastFetched.Format(time.RFC3339), "deadline", deadline.Format(time.RFC3339))
+				}
+
+				if entry.ExcludedReason == "" {
+					images[tag] = entry.Info
+					if c.now.After(deadline) {
+						previousRefresh := minRefresh
+						lastFetched := entry.Info.LastFetched
+						if !lastFetched.IsZero() {
+							previousRefresh = deadline.Sub(lastFetched)
+						}
+						toUpdate = append(toUpdate, imageToUpdate{ref: newID, previousRefresh: previousRefresh, previousDigest: entry.Info.Digest})
+						refresh++
+					}
+				} else {
+					if c.trace {
+						c.logger.Log("trace", "excluded in cache", "ref", newID, "reason", entry.ExcludedReason)
+					}
+					if c.now.After(deadline) {
+						toUpdate = append(toUpdate, imageToUpdate{ref: newID, previousRefresh: excludedRefresh})
+						refresh++
+					}
+				}
+			}
+		}
+	}
+
+	result := fetchImagesResult{
+		imagesFound:                images,
+		imagesToUpdate:             toUpdate,
+		imagesToUpdateRefreshCount: refresh,
+		imagesToUpdateMissingCount: missing,
+	}
+
+	return result, nil
+}
+
+// updateImages refreshes the cache entries for the images passed. It may not succeed for all images.
+// It returns the values stored in cache, the number of images it succeeded for and the number
+// of images whose manifest wasn't found in the registry.
+func (c *repoCacheManager) updateImages(ctx context.Context, registryClient registry.Client, images []imageToUpdate) (map[string]image.Info, int, int) {
+	// The upper bound for concurrent fetches against a single host is
+	// c.burst, so limit the number of fetching goroutines to that.
+	fetchers := make(chan struct{}, c.burst)
+	awaitFetchers := &sync.WaitGroup{}
+
+	ctxc, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	var successCount int
+	var manifestUnknownCount int
+	var result = map[string]image.Info{}
+	var warnAboutRateLimit sync.Once
+updates:
+	for _, up := range images {
+		// copy the loop variable to avoid a race condition when accessing it in the goroutine
+		upCopy := up
+		select {
+		case <-ctxc.Done():
+			break updates
+		case fetchers <- struct{}{}:
+		}
+		awaitFetchers.Add(1)
+		go func() {
+			defer func() { awaitFetchers.Done(); <-fetchers }()
+			entry, err := c.updateImage(ctxc, registryClient, upCopy)
+			if err != nil {
+				if err, ok := errors.Cause(err).(net.Error); ok && err.Timeout() {
+					// This was due to a context timeout, don't bother logging
+					return
+				}
+				switch {
+				case strings.Contains(err.Error(), "429"):
+					// abort the image tags fetching if we've been rate limited
+					warnAboutRateLimit.Do(func() {
+						c.logger.Log("warn", "aborting image tag fetching due to rate limiting, will try again later")
+						cancel()
+					})
+				case strings.Contains(err.Error(), "manifest unknown"):
+					// Registry is corrupted, keep going, this manifest may not be relevant for automatic updates
+					c.Lock()
+					manifestUnknownCount++
+					c.Unlock()
+					c.logger.Log("warn", fmt.Sprintf("manifest for tag %s missing in repository %s", upCopy.ref.Tag, upCopy.ref.Name),
+						"impact", "flux will fail to auto-release workloads with matching images, ask the repository administrator to fix the inconsistency")
+				default:
+					c.logger.Log("err", err, "ref", upCopy.ref)
+				}
+				return
+			}
+			c.Lock()
+			successCount++
+			if entry.ExcludedReason == "" {
+				result[upCopy.ref.Tag] = entry.Info
+			}
+			c.Unlock()
+		}()
+	}
+	awaitFetchers.Wait()
+	return result, successCount, manifestUnknownCount
+}
+
+func (c *repoCacheManager) updateImage(ctx context.Context, registryClient registry.Client, update imageToUpdate) (registry.ImageEntry, error) {
+	imageID := update.ref
+
+	if c.trace {
+		c.logger.Log("trace", "refreshing manifest", "ref", imageID, "previous_refresh", update.previousRefresh.String())
+	}
+
+	// Get the image from the remote
+	entry, err := registryClient.Manifest(ctx, imageID.Tag)
+	if err != nil {
+		return registry.ImageEntry{}, err
+	}
+
+	refresh := update.previousRefresh
+	reason := ""
+	switch {
+	case entry.ExcludedReason != "":
+		c.logger.Log("excluded", entry.ExcludedReason, "ref", imageID)
+		refresh = excludedRefresh
+		reason = "image is excluded"
+	case update.previousDigest == "":
+		entry.Info.LastFetched = c.now
+		refresh = update.previousRefresh
+		reason = "no prior cache entry for image"
+	case entry.Info.Digest == update.previousDigest:
+		entry.Info.LastFetched = c.now
+		refresh = clipRefresh(refresh * 2)
+		reason = "image digest is same"
+	default: // i.e., not excluded, but the digests differ -> the tag was moved
+		entry.Info.LastFetched = c.now
+		refresh = clipRefresh(refresh / 2)
+		reason = "image digest is different"
+	}
+
+	if c.trace {
+		c.logger.Log("trace", "caching manifest", "ref", imageID, "last_fetched", c.now.Format(time.RFC3339), "refresh", refresh.String(), "reason", reason)
+	}
+
+	key := NewManifestKey(imageID.CanonicalRef())
+	// Write back to memcached
+	val, err := json.Marshal(entry)
+	if err != nil {
+		return registry.ImageEntry{}, err
+	}
+	err = c.cacheClient.SetKey(key, c.now.Add(refresh), val)
+	if err != nil {
+		return registry.ImageEntry{}, err
+	}
+	return entry, nil
+}
diff --git a/registry/cache/warming.go b/registry/cache/warming.go
index ed9bc1373..becc6d8a3 100644
--- a/registry/cache/warming.go
+++ b/registry/cache/warming.go
@@ -2,15 +2,13 @@ package cache
 
 import (
 	"context"
-	"encoding/json"
-	"fmt"
-	"net"
 	"strings"
 	"sync"
 	"time"
 
 	"github.com/go-kit/kit/log"
 	"github.com/pkg/errors"
+
 	"github.com/weaveworks/flux/image"
 	"github.com/weaveworks/flux/registry"
 )
@@ -157,17 +155,12 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id
 		return
 	}
 
+	cacheManager := newRepoCacheManager(now, id, w.burst, w.Trace, errorLogger, w.cache)
+
 	// This is what we're going to write back to the cache
 	var repo ImageRepository
-	repoKey := NewRepositoryKey(id.CanonicalName())
-	bytes, _, err := w.cache.GetKey(repoKey)
-	if err == nil {
-		err = json.Unmarshal(bytes, &repo)
-	} else if err == ErrNotCached {
-		err = nil
-	}
-
-	if err != nil {
+	repo, err = cacheManager.fetchRepository()
+	if err != nil && err != ErrNotCached {
 		errorLogger.Log("err", errors.Wrap(err, "fetching previous result from cache"))
 		return
 	}
@@ -178,11 +171,7 @@
 	// attempting to refresh that value. Whatever happens, at the end
 	// we'll write something back.
 	defer func() {
-		bytes, err := json.Marshal(repo)
-		if err == nil {
-			err = w.cache.SetKey(repoKey, now.Add(repoRefresh), bytes)
-		}
-		if err != nil {
+		if err := cacheManager.storeRepository(repo); err != nil {
 			errorLogger.Log("err", errors.Wrap(err, "writing result to cache"))
 		}
 	}()
@@ -196,187 +185,32 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id
 		return
 	}
 
-	newImages := map[string]image.Info{}
-
-	// Create a list of images that need updating
-	type update struct {
-		ref             image.Ref
-		previousDigest  string
-		previousRefresh time.Duration
-	}
-	var toUpdate []update
-
-	// Counters for reporting what happened
-	var missing, refresh int
-	for _, tag := range tags {
-		if tag == "" {
-			errorLogger.Log("err", "empty tag in fetched tags", "tags", tags)
-			repo.LastError = "empty tag in fetched tags"
-			return // abort and let the error be written
-		}
-
-		// See if we have the manifest already cached
-		newID := id.ToRef(tag)
-		key := NewManifestKey(newID.CanonicalRef())
-		bytes, deadline, err := w.cache.GetKey(key)
-		// If err, then we don't have it yet. Update.
-		switch {
-		case err != nil: // by and large these are cache misses, but any error shall count as "not found"
-			if err != ErrNotCached {
-				errorLogger.Log("warning", "error from cache", "err", err, "ref", newID)
-			}
-			missing++
-			toUpdate = append(toUpdate, update{ref: newID, previousRefresh: initialRefresh})
-		case len(bytes) == 0:
-			errorLogger.Log("warning", "empty result from cache", "ref", newID)
-			missing++
-			toUpdate = append(toUpdate, update{ref: newID, previousRefresh: initialRefresh})
-		default:
-			var entry registry.ImageEntry
-			if err := json.Unmarshal(bytes, &entry); err == nil {
-				if w.Trace {
-					errorLogger.Log("trace", "found cached manifest", "ref", newID, "last_fetched", entry.LastFetched.Format(time.RFC3339), "deadline", deadline.Format(time.RFC3339))
-				}
-
-				if entry.ExcludedReason == "" {
-					newImages[tag] = entry.Info
-					if now.After(deadline) {
-						previousRefresh := minRefresh
-						lastFetched := entry.Info.LastFetched
-						if !lastFetched.IsZero() {
-							previousRefresh = deadline.Sub(lastFetched)
-						}
-						toUpdate = append(toUpdate, update{ref: newID, previousRefresh: previousRefresh, previousDigest: entry.Info.Digest})
-						refresh++
-					}
-				} else {
-					if w.Trace {
-						logger.Log("trace", "excluded in cache", "ref", newID, "reason", entry.ExcludedReason)
-					}
-					if now.After(deadline) {
-						toUpdate = append(toUpdate, update{ref: newID, previousRefresh: excludedRefresh})
-						refresh++
-					}
-				}
-			}
-		}
+	fetchResult, err := cacheManager.fetchImages(tags)
+	if err != nil {
+		logger.Log("err", err, "tags", tags)
+		repo.LastError = err.Error()
+		return // abort and let the error be written
 	}
+	newImages := fetchResult.imagesFound
 
-	var fetchMx sync.Mutex // also guards access to newImages
 	var successCount int
 	var manifestUnknownCount int
-	if len(toUpdate) > 0 {
-		logger.Log("info", "refreshing image", "image", id, "tag_count", len(tags), "to_update", len(toUpdate), "of_which_refresh", refresh, "of_which_missing", missing)
-
-		// The upper bound for concurrent fetches against a single host is
-		// w.Burst, so limit the number of fetching goroutines to that.
-		fetchers := make(chan struct{}, w.burst)
-		awaitFetchers := &sync.WaitGroup{}
-
-		ctxc, cancel := context.WithCancel(ctx)
-		var once sync.Once
-		defer cancel()
-
-	updates:
-		for _, up := range toUpdate {
-			select {
-			case <-ctxc.Done():
-				break updates
-			case fetchers <- struct{}{}:
-			}
-
-			awaitFetchers.Add(1)
-
-			go func(update update) {
-				defer func() { awaitFetchers.Done(); <-fetchers }()
-
-				imageID := update.ref
-
-				if w.Trace {
-					errorLogger.Log("trace", "refreshing manifest", "ref", imageID, "previous_refresh", update.previousRefresh.String())
-				}
-
-				// Get the image from the remote
-				entry, err := client.Manifest(ctxc, imageID.Tag)
-				if err != nil {
-					if err, ok := errors.Cause(err).(net.Error); ok && err.Timeout() {
-						// This was due to a context timeout, don't bother logging
-						return
-					}
-
-					switch {
-					case strings.Contains(err.Error(), "429"):
-						// abort the image tags fetching if we've been rate limited
-						once.Do(func() {
-							errorLogger.Log("warn", "aborting image tag fetching due to rate limiting, will try again later")
-							cancel()
-						})
-					case strings.Contains(err.Error(), "manifest unknown"):
-						// Registry is corrupted, keep going, this manifest may not be relevant for automatic updates
-						fetchMx.Lock()
-						manifestUnknownCount++
-						fetchMx.Unlock()
-						errorLogger.Log("warn", fmt.Sprintf("manifest for tag %s missing in repository %s", imageID.Tag, imageID.Name),
-							"impact", "flux will fail to auto-release workloads with matching images, ask the repository administrator to fix the inconsistency")
-					default:
-						errorLogger.Log("err", err, "ref", imageID)
-					}
-					return
-				}
-
-				refresh := update.previousRefresh
-				reason := ""
-				switch {
-				case entry.ExcludedReason != "":
-					errorLogger.Log("excluded", entry.ExcludedReason, "ref", imageID)
-					refresh = excludedRefresh
-					reason = "image is excluded"
-				case update.previousDigest == "":
-					entry.Info.LastFetched = now
-					refresh = update.previousRefresh
-					reason = "no prior cache entry for image"
-				case entry.Info.Digest == update.previousDigest:
-					entry.Info.LastFetched = now
-					refresh = clipRefresh(refresh * 2)
-					reason = "image digest is same"
-				default: // i.e., not excluded, but the digests differ -> the tag was moved
-					entry.Info.LastFetched = now
-					refresh = clipRefresh(refresh / 2)
-					reason = "image digest is different"
-				}
-
-				if w.Trace {
-					errorLogger.Log("trace", "caching manifest", "ref", imageID, "last_fetched", now.Format(time.RFC3339), "refresh", refresh.String(), "reason", reason)
-				}
-
-				key := NewManifestKey(imageID.CanonicalRef())
-				// Write back to memcached
-				val, err := json.Marshal(entry)
-				if err != nil {
-					errorLogger.Log("err", err, "ref", imageID)
-					return
-				}
-				err = w.cache.SetKey(key, now.Add(refresh), val)
-				if err != nil {
-					errorLogger.Log("err", err, "ref", imageID)
-					return
-				}
-				fetchMx.Lock()
-				successCount++
-				if entry.ExcludedReason == "" {
-					newImages[imageID.Tag] = entry.Info
-				}
-				fetchMx.Unlock()
-			}(up)
+	if len(fetchResult.imagesToUpdate) > 0 {
+		logger.Log("info", "refreshing image", "image", id, "tag_count", len(tags),
+			"to_update", len(fetchResult.imagesToUpdate),
+			"of_which_refresh", fetchResult.imagesToUpdateRefreshCount, "of_which_missing", fetchResult.imagesToUpdateMissingCount)
+		var images map[string]image.Info
+		images, successCount, manifestUnknownCount = cacheManager.updateImages(ctx, client, fetchResult.imagesToUpdate)
+		for k, v := range images {
+			newImages[k] = v
 		}
-		awaitFetchers.Wait()
-		logger.Log("updated", id.String(), "successful", successCount, "attempted", len(toUpdate))
+		logger.Log("updated", id.String(), "successful", successCount, "attempted", len(fetchResult.imagesToUpdate))
 	}
 
 	// We managed to fetch new metadata for everything we needed.
 	// Ratchet the result forward.
-	if successCount+manifestUnknownCount == len(toUpdate) {
+	if successCount+manifestUnknownCount == len(fetchResult.imagesToUpdate) {
 		repo = ImageRepository{
 			LastUpdate: time.Now(),
 			RepositoryMetadata: image.RepositoryMetadata{

From 2fc6258d9bb7afa5ebe6dc04e7b4e757cb12cedc Mon Sep 17 00:00:00 2001
From: Alfonso Acosta
Date: Mon, 15 Apr 2019 18:10:00 +0200
Subject: [PATCH 2/2] Small clarification

---
 registry/cache/repocachemanager.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/registry/cache/repocachemanager.go b/registry/cache/repocachemanager.go
index ef7dd6a94..e5e203435 100644
--- a/registry/cache/repocachemanager.go
+++ b/registry/cache/repocachemanager.go
@@ -22,7 +22,7 @@ type imageToUpdate struct {
 	previousRefresh time.Duration
 }
 
-// repoCacheManager handles cache operations for a repository
+// repoCacheManager handles cache operations for a container image repository
 type repoCacheManager struct {
 	now         time.Time
 	repoID      image.Name