diff --git a/registry/cache/repocachemanager.go b/registry/cache/repocachemanager.go
new file mode 100644
index 000000000..ef7dd6a94
--- /dev/null
+++ b/registry/cache/repocachemanager.go
@@ -0,0 +1,268 @@
+package cache
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/pkg/errors"
+
+	"github.com/weaveworks/flux/image"
+	"github.com/weaveworks/flux/registry"
+)
+
+type imageToUpdate struct {
+	ref             image.Ref
+	previousDigest  string
+	previousRefresh time.Duration
+}
+
+// repoCacheManager handles cache operations for a repository
+type repoCacheManager struct {
+	now         time.Time
+	repoID      image.Name
+	burst       int
+	trace       bool
+	logger      log.Logger
+	cacheClient Client
+	sync.Mutex
+}
+
+func newRepoCacheManager(now time.Time, repoId image.Name, burst int, trace bool, logger log.Logger,
+	cacheClient Client) *repoCacheManager {
+	return &repoCacheManager{
+		now:         now,
+		repoID:      repoId,
+		burst:       burst,
+		trace:       trace,
+		logger:      logger,
+		cacheClient: cacheClient,
+	}
+}
+
+// fetchRepository fetches the repository from the cache
+func (c *repoCacheManager) fetchRepository() (ImageRepository, error) {
+	var result ImageRepository
+	repoKey := NewRepositoryKey(c.repoID.CanonicalName())
+	bytes, _, err := c.cacheClient.GetKey(repoKey)
+	if err != nil {
+		return ImageRepository{}, err
+	}
+	if err = json.Unmarshal(bytes, &result); err != nil {
+		return ImageRepository{}, err
+	}
+	return result, nil
+}
+
+// storeRepository stores the repository from the cache
+func (c *repoCacheManager) storeRepository(repo ImageRepository) error {
+	repoKey := NewRepositoryKey(c.repoID.CanonicalName())
+	bytes, err := json.Marshal(repo)
+	if err != nil {
+		return err
+	}
+	return c.cacheClient.SetKey(repoKey, c.now.Add(repoRefresh), bytes)
+}
+
+// fetchImagesResult is the result of fetching images from the cache
+// invariant: len(imagesToUpdate) == imagesToUpdateRefreshCount + imagesToUpdateMissingCount
+type fetchImagesResult struct {
+	imagesFound                map[string]image.Info // images found in the cache
+	imagesToUpdate             []imageToUpdate       // images which need to be updated
+	imagesToUpdateRefreshCount int                   // number of imagesToUpdate which need updating due to their cache entry expiring
+	imagesToUpdateMissingCount int                   // number of imagesToUpdate which need updating due to being missing
+}
+
+// fetchImages attemps to fetch the images with the provided tags from the cache.
+// It returns the images found, those which require updating and details about
+// why they need to be updated.
+func (c *repoCacheManager) fetchImages(tags []string) (fetchImagesResult, error) {
+	images := map[string]image.Info{}
+
+	// Create a list of images that need updating
+	var toUpdate []imageToUpdate
+
+	// Counters for reporting what happened
+	var missing, refresh int
+	for _, tag := range tags {
+		if tag == "" {
+			return fetchImagesResult{}, fmt.Errorf("empty tag in fetched tags")
+		}
+
+		// See if we have the manifest already cached
+		newID := c.repoID.ToRef(tag)
+		key := NewManifestKey(newID.CanonicalRef())
+		bytes, deadline, err := c.cacheClient.GetKey(key)
+		// If err, then we don't have it yet. Update.
+		switch {
+		case err != nil: // by and large these are cache misses, but any error shall count as "not found"
+			if err != ErrNotCached {
+				c.logger.Log("warning", "error from cache", "err", err, "ref", newID)
+			}
+			missing++
+			toUpdate = append(toUpdate, imageToUpdate{ref: newID, previousRefresh: initialRefresh})
+		case len(bytes) == 0:
+			c.logger.Log("warning", "empty result from cache", "ref", newID)
+			missing++
+			toUpdate = append(toUpdate, imageToUpdate{ref: newID, previousRefresh: initialRefresh})
+		default:
+			var entry registry.ImageEntry
+			if err := json.Unmarshal(bytes, &entry); err == nil {
+				if c.trace {
+					c.logger.Log("trace", "found cached manifest", "ref", newID, "last_fetched", entry.LastFetched.Format(time.RFC3339), "deadline", deadline.Format(time.RFC3339))
+				}
+
+				if entry.ExcludedReason == "" {
+					images[tag] = entry.Info
+					if c.now.After(deadline) {
+						previousRefresh := minRefresh
+						lastFetched := entry.Info.LastFetched
+						if !lastFetched.IsZero() {
+							previousRefresh = deadline.Sub(lastFetched)
+						}
+						toUpdate = append(toUpdate, imageToUpdate{ref: newID, previousRefresh: previousRefresh, previousDigest: entry.Info.Digest})
+						refresh++
+					}
+				} else {
+					if c.trace {
+						c.logger.Log("trace", "excluded in cache", "ref", newID, "reason", entry.ExcludedReason)
+					}
+					if c.now.After(deadline) {
+						toUpdate = append(toUpdate, imageToUpdate{ref: newID, previousRefresh: excludedRefresh})
+						refresh++
+					}
+				}
+			}
+		}
+	}
+
+	result := fetchImagesResult{
+		imagesFound:                images,
+		imagesToUpdate:             toUpdate,
+		imagesToUpdateRefreshCount: refresh,
+		imagesToUpdateMissingCount: missing,
+	}
+
+	return result, nil
+}
+
+// updateImages, refreshes the cache entries for the images passed. It may not succeed for all images.
+// It returns the values stored in cache, the number of images it succeeded for and the number
+// of images whose manifest wasn't found in the registry.
+func (c *repoCacheManager) updateImages(ctx context.Context, registryClient registry.Client, images []imageToUpdate) (map[string]image.Info, int, int) {
+	// The upper bound for concurrent fetches against a single host is
+	// w.Burst, so limit the number of fetching goroutines to that.
+	fetchers := make(chan struct{}, c.burst)
+	awaitFetchers := &sync.WaitGroup{}
+
+	ctxc, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	var successCount int
+	var manifestUnknownCount int
+	var result = map[string]image.Info{}
+	var warnAboutRateLimit sync.Once
+updates:
+	for _, up := range images {
+		// to avoid race condition, when accessing it in the go routine
+		upCopy := up
+		select {
+		case <-ctxc.Done():
+			break updates
+		case fetchers <- struct{}{}:
+		}
+		awaitFetchers.Add(1)
+		go func() {
+			defer func() { awaitFetchers.Done(); <-fetchers }()
+			entry, err := c.updateImage(ctxc, registryClient, upCopy)
+			if err != nil {
+				if err, ok := errors.Cause(err).(net.Error); ok && err.Timeout() {
+					// This was due to a context timeout, don't bother logging
+					return
+				}
+				switch {
+				case strings.Contains(err.Error(), "429"):
+					// abort the image tags fetching if we've been rate limited
+					warnAboutRateLimit.Do(func() {
+						c.logger.Log("warn", "aborting image tag fetching due to rate limiting, will try again later")
+						cancel()
+					})
+				case strings.Contains(err.Error(), "manifest unknown"):
+					// Registry is corrupted, keep going, this manifest may not be relevant for automatic updates
+					c.Lock()
+					manifestUnknownCount++
+					c.Unlock()
+					c.logger.Log("warn", fmt.Sprintf("manifest for tag %s missing in repository %s", up.ref.Tag, up.ref.Name),
+						"impact", "flux will fail to auto-release workloads with matching images, ask the repository administrator to fix the inconsistency")
+				default:
+					c.logger.Log("err", err, "ref", up.ref)
+				}
+				return
+			}
+			c.Lock()
+			successCount++
+			if entry.ExcludedReason == "" {
+				result[upCopy.ref.Tag] = entry.Info
+			}
+			c.Unlock()
+		}()
+	}
+	awaitFetchers.Wait()
+	return result, successCount, manifestUnknownCount
+}
+
+func (c *repoCacheManager) updateImage(ctx context.Context, registryClient registry.Client, update imageToUpdate) (registry.ImageEntry, error) {
+	imageID := update.ref
+
+	if c.trace {
+		c.logger.Log("trace", "refreshing manifest", "ref", imageID, "previous_refresh", update.previousRefresh.String())
+	}
+
+	// Get the image from the remote
+	entry, err := registryClient.Manifest(ctx, imageID.Tag)
+	if err != nil {
+		return registry.ImageEntry{}, err
+	}
+
+	refresh := update.previousRefresh
+	reason := ""
+	switch {
+	case entry.ExcludedReason != "":
+		c.logger.Log("excluded", entry.ExcludedReason, "ref", imageID)
+		refresh = excludedRefresh
+		reason = "image is excluded"
+	case update.previousDigest == "":
+		entry.Info.LastFetched = c.now
+		refresh = update.previousRefresh
+		reason = "no prior cache entry for image"
+	case entry.Info.Digest == update.previousDigest:
+		entry.Info.LastFetched = c.now
+		refresh = clipRefresh(refresh * 2)
+		reason = "image digest is same"
+	default: // i.e., not excluded, but the digests differ -> the tag was moved
+		entry.Info.LastFetched = c.now
+		refresh = clipRefresh(refresh / 2)
+		reason = "image digest is different"
+	}
+
+	if c.trace {
+		c.logger.Log("trace", "caching manifest", "ref", imageID, "last_fetched", c.now.Format(time.RFC3339), "refresh", refresh.String(), "reason", reason)
+	}
+
+	key := NewManifestKey(imageID.CanonicalRef())
+	// Write back to memcached
+	val, err := json.Marshal(entry)
+	if err != nil {
+		return registry.ImageEntry{}, err
+	}
+	err = c.cacheClient.SetKey(key, c.now.Add(refresh), val)
+	if err != nil {
+		return registry.ImageEntry{}, err
+	}
+	return entry, nil
+}
diff --git a/registry/cache/warming.go b/registry/cache/warming.go
index ed9bc1373..becc6d8a3 100644
--- a/registry/cache/warming.go
+++ b/registry/cache/warming.go
@@ -2,15 +2,13 @@ package cache
 
 import (
 	"context"
-	"encoding/json"
-	"fmt"
-	"net"
 	"strings"
 	"sync"
 	"time"
 
 	"github.com/go-kit/kit/log"
 	"github.com/pkg/errors"
+
 	"github.com/weaveworks/flux/image"
 	"github.com/weaveworks/flux/registry"
 )
@@ -157,17 +155,12 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id
 		return
 	}
 
+	cacheManager := newRepoCacheManager(now, id, w.burst, w.Trace, errorLogger, w.cache)
+
 	// This is what we're going to write back to the cache
 	var repo ImageRepository
-	repoKey := NewRepositoryKey(id.CanonicalName())
-	bytes, _, err := w.cache.GetKey(repoKey)
-	if err == nil {
-		err = json.Unmarshal(bytes, &repo)
-	} else if err == ErrNotCached {
-		err = nil
-	}
-
-	if err != nil {
+	repo, err = cacheManager.fetchRepository()
+	if err != nil && err != ErrNotCached {
 		errorLogger.Log("err", errors.Wrap(err, "fetching previous result from cache"))
 		return
 	}
@@ -178,11 +171,7 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id
 	// attempting to refresh that value. Whatever happens, at the end
 	// we'll write something back.
 	defer func() {
-		bytes, err := json.Marshal(repo)
-		if err == nil {
-			err = w.cache.SetKey(repoKey, now.Add(repoRefresh), bytes)
-		}
-		if err != nil {
+		if err := cacheManager.storeRepository(repo); err != nil {
 			errorLogger.Log("err", errors.Wrap(err, "writing result to cache"))
 		}
 	}()
@@ -196,187 +185,32 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id
 		return
 	}
 
-	newImages := map[string]image.Info{}
-
-	// Create a list of images that need updating
-	type update struct {
-		ref             image.Ref
-		previousDigest  string
-		previousRefresh time.Duration
-	}
-	var toUpdate []update
-
-	// Counters for reporting what happened
-	var missing, refresh int
-	for _, tag := range tags {
-		if tag == "" {
-			errorLogger.Log("err", "empty tag in fetched tags", "tags", tags)
-			repo.LastError = "empty tag in fetched tags"
-			return // abort and let the error be written
-		}
-
-		// See if we have the manifest already cached
-		newID := id.ToRef(tag)
-		key := NewManifestKey(newID.CanonicalRef())
-		bytes, deadline, err := w.cache.GetKey(key)
-		// If err, then we don't have it yet. Update.
-		switch {
-		case err != nil: // by and large these are cache misses, but any error shall count as "not found"
-			if err != ErrNotCached {
-				errorLogger.Log("warning", "error from cache", "err", err, "ref", newID)
-			}
-			missing++
-			toUpdate = append(toUpdate, update{ref: newID, previousRefresh: initialRefresh})
-		case len(bytes) == 0:
-			errorLogger.Log("warning", "empty result from cache", "ref", newID)
-			missing++
-			toUpdate = append(toUpdate, update{ref: newID, previousRefresh: initialRefresh})
-		default:
-			var entry registry.ImageEntry
-			if err := json.Unmarshal(bytes, &entry); err == nil {
-				if w.Trace {
-					errorLogger.Log("trace", "found cached manifest", "ref", newID, "last_fetched", entry.LastFetched.Format(time.RFC3339), "deadline", deadline.Format(time.RFC3339))
-				}
-
-				if entry.ExcludedReason == "" {
-					newImages[tag] = entry.Info
-					if now.After(deadline) {
-						previousRefresh := minRefresh
-						lastFetched := entry.Info.LastFetched
-						if !lastFetched.IsZero() {
-							previousRefresh = deadline.Sub(lastFetched)
-						}
-						toUpdate = append(toUpdate, update{ref: newID, previousRefresh: previousRefresh, previousDigest: entry.Info.Digest})
-						refresh++
-					}
-				} else {
-					if w.Trace {
-						logger.Log("trace", "excluded in cache", "ref", newID, "reason", entry.ExcludedReason)
-					}
-					if now.After(deadline) {
-						toUpdate = append(toUpdate, update{ref: newID, previousRefresh: excludedRefresh})
-						refresh++
-					}
-				}
-			}
-		}
+	fetchResult, err := cacheManager.fetchImages(tags)
+	if err != nil {
+		logger.Log("err", err, "tags", tags)
+		repo.LastError = err.Error()
+		return // abort and let the error be written
 	}
+	newImages := fetchResult.imagesFound
 
-	var fetchMx sync.Mutex // also guards access to newImages
 	var successCount int
 	var manifestUnknownCount int
 
-	if len(toUpdate) > 0 {
-		logger.Log("info", "refreshing image", "image", id, "tag_count", len(tags), "to_update", len(toUpdate), "of_which_refresh", refresh, "of_which_missing", missing)
-
-		// The upper bound for concurrent fetches against a single host is
-		// w.Burst, so limit the number of fetching goroutines to that.
-		fetchers := make(chan struct{}, w.burst)
-		awaitFetchers := &sync.WaitGroup{}
-
-		ctxc, cancel := context.WithCancel(ctx)
-		var once sync.Once
-		defer cancel()
-
-	updates:
-		for _, up := range toUpdate {
-			select {
-			case <-ctxc.Done():
-				break updates
-			case fetchers <- struct{}{}:
-			}
-
-			awaitFetchers.Add(1)
-
-			go func(update update) {
-				defer func() { awaitFetchers.Done(); <-fetchers }()
-
-				imageID := update.ref
-
-				if w.Trace {
-					errorLogger.Log("trace", "refreshing manifest", "ref", imageID, "previous_refresh", update.previousRefresh.String())
-				}
-
-				// Get the image from the remote
-				entry, err := client.Manifest(ctxc, imageID.Tag)
-				if err != nil {
-					if err, ok := errors.Cause(err).(net.Error); ok && err.Timeout() {
-						// This was due to a context timeout, don't bother logging
-						return
-					}
-
-					switch {
-					case strings.Contains(err.Error(), "429"):
-						// abort the image tags fetching if we've been rate limited
-						once.Do(func() {
-							errorLogger.Log("warn", "aborting image tag fetching due to rate limiting, will try again later")
-							cancel()
-						})
-					case strings.Contains(err.Error(), "manifest unknown"):
-						// Registry is corrupted, keep going, this manifest may not be relevant for automatic updates
-						fetchMx.Lock()
-						manifestUnknownCount++
-						fetchMx.Unlock()
-						errorLogger.Log("warn", fmt.Sprintf("manifest for tag %s missing in repository %s", imageID.Tag, imageID.Name),
-							"impact", "flux will fail to auto-release workloads with matching images, ask the repository administrator to fix the inconsistency")
-					default:
-						errorLogger.Log("err", err, "ref", imageID)
-					}
-					return
-				}
-
-				refresh := update.previousRefresh
-				reason := ""
-				switch {
-				case entry.ExcludedReason != "":
-					errorLogger.Log("excluded", entry.ExcludedReason, "ref", imageID)
-					refresh = excludedRefresh
-					reason = "image is excluded"
-				case update.previousDigest == "":
-					entry.Info.LastFetched = now
-					refresh = update.previousRefresh
-					reason = "no prior cache entry for image"
-				case entry.Info.Digest == update.previousDigest:
-					entry.Info.LastFetched = now
-					refresh = clipRefresh(refresh * 2)
-					reason = "image digest is same"
-				default: // i.e., not excluded, but the digests differ -> the tag was moved
-					entry.Info.LastFetched = now
-					refresh = clipRefresh(refresh / 2)
-					reason = "image digest is different"
-				}
-
-				if w.Trace {
-					errorLogger.Log("trace", "caching manifest", "ref", imageID, "last_fetched", now.Format(time.RFC3339), "refresh", refresh.String(), "reason", reason)
-				}
-
-				key := NewManifestKey(imageID.CanonicalRef())
-				// Write back to memcached
-				val, err := json.Marshal(entry)
-				if err != nil {
-					errorLogger.Log("err", err, "ref", imageID)
-					return
-				}
-				err = w.cache.SetKey(key, now.Add(refresh), val)
-				if err != nil {
-					errorLogger.Log("err", err, "ref", imageID)
-					return
-				}
-				fetchMx.Lock()
-				successCount++
-				if entry.ExcludedReason == "" {
-					newImages[imageID.Tag] = entry.Info
-				}
-				fetchMx.Unlock()
-			}(up)
+	if len(fetchResult.imagesToUpdate) > 0 {
+		logger.Log("info", "refreshing image", "image", id, "tag_count", len(tags),
+			"to_update", len(fetchResult.imagesToUpdate),
+			"of_which_refresh", fetchResult.imagesToUpdateRefreshCount, "of_which_missing", fetchResult.imagesToUpdateMissingCount)
+		var images map[string]image.Info
+		images, successCount, manifestUnknownCount = cacheManager.updateImages(ctx, client, fetchResult.imagesToUpdate)
+		for k, v := range images {
+			newImages[k] = v
 		}
-		awaitFetchers.Wait()
-		logger.Log("updated", id.String(), "successful", successCount, "attempted", len(toUpdate))
+		logger.Log("updated", id.String(), "successful", successCount, "attempted", len(fetchResult.imagesToUpdate))
 	}
 
 	// We managed to fetch new metadata for everything we needed.
 	// Ratchet the result forward.
-	if successCount+manifestUnknownCount == len(toUpdate) {
+	if successCount+manifestUnknownCount == len(fetchResult.imagesToUpdate) {
 		repo = ImageRepository{
 			LastUpdate: time.Now(),
 			RepositoryMetadata: image.RepositoryMetadata{