diff --git a/cmd/fluxd/main.go b/cmd/fluxd/main.go
index 86d91f6cc..3e714ea19 100644
--- a/cmd/fluxd/main.go
+++ b/cmd/fluxd/main.go
@@ -44,10 +44,11 @@ var version = "unversioned"
 const (
     product = "weave-flux"
 
-    // The number of connections chosen for memcache and remote GETs should match for best performance (hence the single hardcoded value)
-    // Value chosen through performance tests on sock-shop. I was unable to get higher performance than this.
-    defaultRemoteConnections   = 125 // Chosen performance tests on sock-shop. Unable to get higher performance than this.
-    defaultMemcacheConnections = 10  // This doesn't need to be high. The user is only requesting one tag/image at a time.
+    // This is used as the "burst" value for rate limiting, and
+    // therefore also as the limit to the number of concurrent fetches
+    // and memcached connections, since these in general can't do any
+    // more work than is allowed by the burst amount.
+    defaultRemoteConnections = 10
 
     // There are running systems that assume these defaults (by not
     // supplying a value for one or both). Don't change them.
@@ -98,9 +99,8 @@ func main() {
         memcachedHostname = fs.String("memcached-hostname", "memcached", "Hostname for memcached service.")
         memcachedTimeout  = fs.Duration("memcached-timeout", time.Second, "Maximum time to wait before giving up on memcached requests.")
         memcachedService  = fs.String("memcached-service", "memcached", "SRV service used to discover memcache servers.")
-        registryCacheExpiry  = fs.Duration("registry-cache-expiry", 1*time.Hour, "Duration to keep cached image info. Must be < 1 month.")
         registryPollInterval = fs.Duration("registry-poll-interval", 5*time.Minute, "period at which to check for updated images")
-        registryRPS          = fs.Int("registry-rps", 200, "maximum registry requests per second per host")
+        registryRPS          = fs.Float64("registry-rps", 50, "maximum registry requests per second per host")
         registryBurst        = fs.Int("registry-burst", defaultRemoteConnections, "maximum number of warmer connections to remote and memcache")
         registryTrace        = fs.Bool("registry-trace", false, "output trace of image registry requests to log")
         registryInsecure     = fs.StringSlice("registry-insecure-host", []string{}, "use HTTP for this image registry domain (e.g., registry.cluster.local), instead of HTTPS")
@@ -119,7 +119,10 @@ func main() {
         token          = fs.String("token", "", "Authentication token for upstream service")
 
         dockerConfig = fs.String("docker-config", "", "path to a docker config to use for image registry credentials")
+
+        _ = fs.Duration("registry-cache-expiry", 0, "")
     )
+    fs.MarkDeprecated("registry-cache-expiry", "no longer used; cache entries are expired adaptively according to how often they change")
 
     err := fs.Parse(os.Args[1:])
     switch {
@@ -281,7 +284,6 @@ func main() {
     memcacheClient := registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{
         Host:           *memcachedHostname,
         Service:        *memcachedService,
-        Expiry:         *registryCacheExpiry,
         Timeout:        *memcachedTimeout,
         UpdateInterval: 1 * time.Minute,
         Logger:         log.With(logger, "component", "memcached"),
@@ -298,8 +300,9 @@ func main() {
     // Remote client, for warmer to refresh entries
     registryLogger := log.With(logger, "component", "registry")
     registryLimits := &registryMiddleware.RateLimiters{
-        RPS:   *registryRPS,
-        Burst: *registryBurst,
+        RPS:    *registryRPS,
+        Burst:  *registryBurst,
+        Logger: log.With(logger, "component", "ratelimiter"),
     }
     remoteFactory := &registry.RemoteClientFactory{
         Logger: registryLogger,
@@ -441,6 +444,7 @@ func main() {
     cacheWarmer.Notify = daemon.AskForImagePoll
     cacheWarmer.Priority = daemon.ImageRefresh
+    cacheWarmer.Trace = *registryTrace
     shutdownWg.Add(1)
     go cacheWarmer.Loop(log.With(logger, "component", "warmer"), shutdown, shutdownWg, imageCreds)
diff --git a/image/image.go b/image/image.go
index 6311b6fa7..52881464d 100644
--- a/image/image.go
+++ b/image/image.go
@@ -237,6 +237,8 @@ type Info struct {
     ImageID string `json:",omitempty"`
     // the time at which the image pointed at was created
     CreatedAt time.Time `json:",omitempty"`
+    // the last time this image manifest was fetched
+    LastFetched time.Time `json:",omitempty"`
 }
 
 // MarshalJSON returns the Info value in JSON (as bytes). It is
@@ -245,14 +247,18 @@ type Info struct {
 // detect.
 func (im Info) MarshalJSON() ([]byte, error) {
     type InfoAlias Info // alias to shed existing MarshalJSON implementation
-    var t string
+    var ca, lf string
     if !im.CreatedAt.IsZero() {
-        t = im.CreatedAt.UTC().Format(time.RFC3339Nano)
+        ca = im.CreatedAt.UTC().Format(time.RFC3339Nano)
+    }
+    if !im.LastFetched.IsZero() {
+        lf = im.LastFetched.UTC().Format(time.RFC3339Nano)
     }
     encode := struct {
         InfoAlias
-        CreatedAt string `json:",omitempty"`
-    }{InfoAlias(im), t}
+        CreatedAt   string `json:",omitempty"`
+        LastFetched string `json:",omitempty"`
+    }{InfoAlias(im), ca, lf}
     return json.Marshal(encode)
 }
@@ -262,18 +268,28 @@ func (im *Info) UnmarshalJSON(b []byte) error {
     type InfoAlias Info
     unencode := struct {
         InfoAlias
-        CreatedAt string `json:",omitempty"`
+        CreatedAt   string `json:",omitempty"`
+        LastFetched string `json:",omitempty"`
     }{}
     json.Unmarshal(b, &unencode)
     *im = Info(unencode.InfoAlias)
-    if unencode.CreatedAt == "" {
-        im.CreatedAt = time.Time{}
+
+    var err error
+    if err = decodeTime(unencode.CreatedAt, &im.CreatedAt); err == nil {
+        err = decodeTime(unencode.LastFetched, &im.LastFetched)
+    }
+    return err
+}
+
+func decodeTime(s string, t *time.Time) error {
+    if s == "" {
+        *t = time.Time{}
     } else {
-        t, err := time.Parse(time.RFC3339, unencode.CreatedAt)
+        var err error
+        *t, err = time.Parse(time.RFC3339, s)
         if err != nil {
             return err
         }
-        im.CreatedAt = t.UTC()
     }
     return nil
 }
@@ -311,7 +327,7 @@ func NewerBySemver(lhs, rhs *Info) bool {
 }
 
 // Sort orders the given image infos according to `newer` func.
-func Sort(infos []Info, newer func (a, b *Info) bool) {
+func Sort(infos []Info, newer func(a, b *Info) bool) {
     if newer == nil {
         newer = NewerByCreated
     }
diff --git a/image/image_test.go b/image/image_test.go
index 0c93bed68..79a4cec55 100644
--- a/image/image_test.go
+++ b/image/image_test.go
@@ -3,7 +3,6 @@ package image
 import (
     "encoding/json"
     "fmt"
-    "reflect"
     "strconv"
     "testing"
     "time"
@@ -149,9 +148,11 @@ func mustMakeInfo(ref string, created time.Time) Info {
 func TestImageInfoSerialisation(t *testing.T) {
     t0 := time.Now().UTC() // UTC so it has nil location, otherwise it won't compare
+    t1 := time.Now().Add(5 * time.Minute).UTC()
     info := mustMakeInfo("my/image:tag", t0)
     info.Digest = "sha256:digest"
     info.ImageID = "sha256:layerID"
+    info.LastFetched = t1
     bytes, err := json.Marshal(info)
     if err != nil {
         t.Fatal(err)
     }
@@ -160,9 +161,7 @@ func TestImageInfoSerialisation(t *testing.T) {
     if err = json.Unmarshal(bytes, &info1); err != nil {
         t.Fatal(err)
     }
-    if !reflect.DeepEqual(info, info1) {
-        t.Errorf("roundtrip serialisation failed:\n original: %#v\nroundtripped: %#v", info, info1)
-    }
+    assert.Equal(t, info, info1)
 }
 
 func TestImageInfoCreatedAtZero(t *testing.T) {
@@ -248,4 +247,3 @@ func reverse(imgs []Info) {
         imgs[i], imgs[opp] = imgs[opp], imgs[i]
     }
 }
-
diff --git a/registry/cache/cache.go b/registry/cache/cache.go
index eabaabc08..1b4a0fa2b 100644
--- a/registry/cache/cache.go
+++ b/registry/cache/cache.go
@@ -8,11 +8,13 @@ import (
 )
 
 type Reader interface {
+    // GetKey gets the value at a key, along with its refresh deadline
     GetKey(k Keyer) ([]byte, time.Time, error)
 }
 
 type Writer interface {
-    SetKey(k Keyer, v []byte) error
+    // SetKey sets the value at a key, along with its refresh deadline
+    SetKey(k Keyer, deadline time.Time, v []byte) error
 }
 
 type Client interface {
diff --git a/registry/cache/memcached/memcached.go b/registry/cache/memcached/memcached.go
index adbbddfce..e8dfd6b68 100644
--- a/registry/cache/memcached/memcached.go
+++ b/registry/cache/memcached/memcached.go
@@ -1,3 +1,14 @@
+/* This package implements an image DB cache using memcached.
+
+Items are given an expiry based on their refresh deadline, with a
+minimum duration to try to ensure things will expire well after they
+would have been refreshed (i.e., only if they truly need garbage
+collection).
+
+memcached will still evict things when under memory pressure. We can
+recover from that -- we'll just get a cache miss, and fetch it again.
+
+*/
 package memcached
 
 import (
@@ -16,7 +27,8 @@ import (
 )
 
 const (
-    DefaultExpiry = time.Hour
+    // The minimum expiry given to an entry.
+    MinExpiry = time.Hour
 )
 
 // MemcacheClient is a memcache client that gets its server list from SRV
@@ -26,7 +38,6 @@ type MemcacheClient struct {
     serverList *memcache.ServerList
     hostname   string
     service    string
-    ttl        time.Duration
     logger     log.Logger
 
     quit chan struct{}
@@ -37,7 +48,6 @@ type MemcacheClient struct {
 type MemcacheConfig struct {
     Host           string
     Service        string
-    Expiry         time.Duration
     Timeout        time.Duration
     UpdateInterval time.Duration
     Logger         log.Logger
@@ -55,15 +65,10 @@ func NewMemcacheClient(config MemcacheConfig) *MemcacheClient {
         serverList: &servers,
         hostname:   config.Host,
         service:    config.Service,
-        ttl:        config.Expiry,
         logger:     config.Logger,
         quit:       make(chan struct{}),
     }
 
-    if newClient.ttl == 0 {
-        newClient.ttl = DefaultExpiry
-    }
-
     err := newClient.updateMemcacheServers()
     if err != nil {
         config.Logger.Log("err", errors.Wrapf(err, "Error setting memcache servers to '%v'", config.Host))
@@ -86,25 +91,14 @@ func NewFixedServerMemcacheClient(config MemcacheConfig, addresses ...string) *M
         serverList: &servers,
         hostname:   config.Host,
         service:    config.Service,
-        ttl:        config.Expiry,
         logger:     config.Logger,
         quit:       make(chan struct{}),
     }
 
-    if newClient.ttl == 0 {
-        newClient.ttl = DefaultExpiry
-    }
-
     return newClient
 }
 
-// The memcached client does not report the expiry when you GET a
-// value, but we do want to know it, so we can refresh items that are
-// soon to expire (and ignore items that are not). For that reason, we
-// prepend the expiry to the value when setting, and read it back when
-// getting.
-
-// GetKey gets the value and its expiry time from the cache.
+// GetKey gets the value and its refresh deadline from the cache.
 func (c *MemcacheClient) GetKey(k cache.Keyer) ([]byte, time.Time, error) {
     cacheItem, err := c.client.Get(k.Key())
     if err != nil {
@@ -116,19 +110,25 @@ func (c *MemcacheClient) GetKey(k cache.Keyer) ([]byte, time.Time, error) {
             return []byte{}, time.Time{}, err
         }
     }
-    exTime := binary.BigEndian.Uint32(cacheItem.Value)
-    return cacheItem.Value[4:], time.Unix(int64(exTime), 0), nil
+    deadlineTime := binary.BigEndian.Uint32(cacheItem.Value)
+    return cacheItem.Value[4:], time.Unix(int64(deadlineTime), 0), nil
 }
 
-// SetKey sets the value at a key.
-func (c *MemcacheClient) SetKey(k cache.Keyer, v []byte) error {
-    exTime := time.Now().Add(c.ttl).Unix()
-    exBytes := make([]byte, 4, 4)
-    binary.BigEndian.PutUint32(exBytes, uint32(exTime))
+// SetKey sets the value and its refresh deadline at a key. NB the key
+// expiry is set _longer_ than the deadline, to give us a grace period
+// in which to refresh the value.
+func (c *MemcacheClient) SetKey(k cache.Keyer, refreshDeadline time.Time, v []byte) error {
+    expiry := refreshDeadline.Sub(time.Now()) * 2
+    if expiry < MinExpiry {
+        expiry = MinExpiry
+    }
+
+    deadlineBytes := make([]byte, 4, 4)
+    binary.BigEndian.PutUint32(deadlineBytes, uint32(refreshDeadline.Unix()))
     if err := c.client.Set(&memcache.Item{
         Key:        k.Key(),
-        Value:      append(exBytes, v...),
-        Expiration: int32(exTime),
+        Value:      append(deadlineBytes, v...),
+        Expiration: int32(expiry.Seconds()),
     }); err != nil {
         c.logger.Log("err", errors.Wrap(err, "storing in memcache"))
         return err
diff --git a/registry/cache/memcached/memcached_test.go b/registry/cache/memcached/memcached_test.go
index 9c0b6d408..bd4ec413c 100644
--- a/registry/cache/memcached/memcached_test.go
+++ b/registry/cache/memcached/memcached_test.go
@@ -34,20 +34,18 @@ func TestMemcache_ExpiryReadWrite(t *testing.T) {
     }, strings.Fields(*memcachedIPs)...)
 
     // Set some dummy data
-    err := mc.SetKey(key, val)
+    now := time.Now().Round(time.Second)
+    err := mc.SetKey(key, now, val)
     if err != nil {
         t.Fatal(err)
     }
 
-    cached, expiry, err := mc.GetKey(key)
+    cached, deadline, err := mc.GetKey(key)
     if err != nil {
         t.Fatal(err)
     }
 
-    if expiry.IsZero() {
-        t.Fatal("Time should not be zero")
-    }
-    if expiry.Before(time.Now()) {
-        t.Fatal("Expiry should be in the future")
+    if !deadline.Equal(now) {
+        t.Fatalf("Deadline should be %s, but is %s", now.String(), deadline.String())
     }
 
     if string(cached) != string(val) {
diff --git a/registry/cache/monitoring.go b/registry/cache/monitoring.go
index 38d707ff4..359d95a8b 100644
--- a/registry/cache/monitoring.go
+++ b/registry/cache/monitoring.go
@@ -40,12 +40,12 @@ func (i *instrumentedClient) GetKey(k Keyer) (_ []byte, ex time.Time, err error)
     return i.next.GetKey(k)
 }
 
-func (i *instrumentedClient) SetKey(k Keyer, v []byte) (err error) {
+func (i *instrumentedClient) SetKey(k Keyer, d time.Time, v []byte) (err error) {
     defer func(begin time.Time) {
         cacheRequestDuration.With(
             fluxmetrics.LabelMethod, "SetKey",
             fluxmetrics.LabelSuccess, fmt.Sprint(err == nil),
         ).Observe(time.Since(begin).Seconds())
     }(time.Now())
-    return i.next.SetKey(k, v)
+    return i.next.SetKey(k, d, v)
 }
diff --git a/registry/cache/warming.go b/registry/cache/warming.go
index 77f216315..78fa68dda 100644
--- a/registry/cache/warming.go
+++ b/registry/cache/warming.go
@@ -14,15 +14,44 @@ import (
     "github.com/weaveworks/flux/registry"
 )
 
-const refreshWhenExpiryWithin = time.Minute
 const askForNewImagesInterval = time.Minute
 
+// start off assuming an image will change about an hour from first
+// seeing it
+const initialRefresh = 1 * time.Hour
+
+// never try to refresh a tag faster than this
+const minRefresh = 5 * time.Minute
+
+// never set a refresh deadline longer than this
+const maxRefresh = 7 * 24 * time.Hour
+
+// excluded images get a constant, fairly long refresh deadline; we
+// don't expect them to become usable (e.g., to change architecture).
+const excludedRefresh = 24 * time.Hour
+
+// the whole set of image manifests for a repo gets a long refresh; in
+// general we write it back every time we go 'round the loop, so this
+// is mainly for the effect of making garbage collection less likely.
+const repoRefresh = maxRefresh
+
+func clipRefresh(r time.Duration) time.Duration {
+    if r > maxRefresh {
+        return maxRefresh
+    }
+    if r < minRefresh {
+        return minRefresh
+    }
+    return r
+}
+
 // Warmer refreshes the information kept in the cache from remote
 // registries.
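+// It sets each cache entry's refresh deadline adaptively, according to
+// how often the entry's manifest is seen to change.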
 type Warmer struct {
     clientFactory registry.ClientFactory
     cache         Client
     burst         int
+    Trace         bool
     Priority      chan image.Name
     Notify        func()
 }
@@ -66,7 +95,7 @@ func (w *Warmer) Loop(logger log.Logger, stop <-chan struct{}, wg *sync.WaitGrou
     priorityWarm := func(name image.Name) {
         logger.Log("priority", name.String())
         if creds, ok := imageCreds[name]; ok {
-            w.warm(ctx, logger, name, creds)
+            w.warm(ctx, time.Now(), logger, name, creds)
         } else {
             logger.Log("priority", name.String(), "err", "no creds available")
         }
@@ -92,7 +121,7 @@ func (w *Warmer) Loop(logger log.Logger, stop <-chan struct{}, wg *sync.WaitGrou
         if len(backlog) > 0 {
             im := backlog[0]
             backlog = backlog[1:]
-            w.warm(ctx, logger, im.Name, im.Credentials)
+            w.warm(ctx, time.Now(), logger, im.Name, im.Credentials)
         } else {
             select {
             case <-stop:
@@ -118,8 +147,9 @@ func imageCredsToBacklog(imageCreds registry.ImageCreds) []backlogItem {
     return backlog
 }
 
-func (w *Warmer) warm(ctx context.Context, logger log.Logger, id image.Name, creds registry.Credentials) {
+func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id image.Name, creds registry.Credentials) {
     errorLogger := log.With(logger, "canonical_name", id.CanonicalName(), "auth", creds)
+
     client, err := w.clientFactory.ClientFor(id.CanonicalName(), creds)
     if err != nil {
         errorLogger.Log("err", err.Error())
@@ -149,7 +179,7 @@ func (w *Warmer) warm(ctx context.Context, logger log.Logger, id image.Name, cre
     defer func() {
         bytes, err := json.Marshal(repo)
         if err == nil {
-            err = w.cache.SetKey(repoKey, bytes)
+            err = w.cache.SetKey(repoKey, now.Add(repoRefresh), bytes)
         }
         if err != nil {
             errorLogger.Log("err", errors.Wrap(err, "writing result to cache"))
@@ -167,63 +197,99 @@ func (w *Warmer) warm(ctx context.Context, logger log.Logger, id image.Name, cre
 
     newImages := map[string]image.Info{}
 
-    // Create a list of manifests that need updating
-    var toUpdate []image.Ref
-    var missing, expired int
+    // Create a list of images that need updating
+    type update struct {
+        ref             image.Ref
+        previousDigest  string
+        previousRefresh time.Duration
+    }
+    var toUpdate []update
+
+    // Counters for reporting what happened
+    var missing, refresh int
     for _, tag := range tags {
+        if tag == "" {
+            errorLogger.Log("err", "empty tag in fetched tags", "tags", tags)
+            repo.LastError = "empty tag in fetched tags"
+            return // abort and let the error be written
+        }
+
+        // See if we have the manifest already cached
         newID := id.ToRef(tag)
         key := NewManifestKey(newID.CanonicalRef())
-        bytes, expiry, err := w.cache.GetKey(key)
+        bytes, deadline, err := w.cache.GetKey(key)
         // If err, then we don't have it yet. Update.
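+        // A cached entry that is past its refresh deadline is fetched again,
+        // carrying its previous refresh interval so that interval can be
+        // adapted once the new manifest has been compared (see below).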
         switch {
-        case tag == "":
-            errorLogger.Log("err", "empty tag in fetched tags", "tags", tags)
-            repo.LastError = "empty tag in fetched tags"
-            return // abort and let the error be written
-        case err != nil: // by and large these are cache misses, but any error will do
+        case err != nil: // by and large these are cache misses, but any error shall count as "not found"
+            if err != ErrNotCached {
+                errorLogger.Log("warning", "error from cache", "err", err, "ref", newID)
+            }
             missing++
-        case time.Until(expiry) < refreshWhenExpiryWithin:
-            expired++
+            toUpdate = append(toUpdate, update{ref: newID, previousRefresh: initialRefresh})
         case len(bytes) == 0:
-            errorLogger.Log("warning", "empty result from cache", "tag", tag)
+            errorLogger.Log("warning", "empty result from cache", "ref", newID)
             missing++
+            toUpdate = append(toUpdate, update{ref: newID, previousRefresh: initialRefresh})
         default:
             var entry registry.ImageEntry
             if err := json.Unmarshal(bytes, &entry); err == nil {
+                if w.Trace {
+                    errorLogger.Log("trace", "found cached manifest", "ref", newID, "last_fetched", entry.LastFetched.Format(time.RFC3339), "deadline", deadline.Format(time.RFC3339))
+                }
+
                 if entry.ExcludedReason == "" {
                     newImages[tag] = entry.Info
+                    if now.After(deadline) {
+                        previousRefresh := minRefresh
+                        lastFetched := entry.Info.LastFetched
+                        if !lastFetched.IsZero() {
+                            previousRefresh = deadline.Sub(lastFetched)
+                        }
+                        toUpdate = append(toUpdate, update{ref: newID, previousRefresh: previousRefresh, previousDigest: entry.Info.Digest})
+                        refresh++
+                    }
                 } else {
-                    logger.Log("info", "excluded in cache", "ref", newID)
+                    if w.Trace {
+                        logger.Log("trace", "excluded in cache", "ref", newID, "reason", entry.ExcludedReason)
+                    }
+                    if now.After(deadline) {
+                        toUpdate = append(toUpdate, update{ref: newID, previousRefresh: excludedRefresh})
+                        refresh++
+                    }
                 }
-                continue // i.e., no need to update this one
             }
-            missing++
         }
-        toUpdate = append(toUpdate, newID)
     }
 
+    var fetchMx sync.Mutex // also guards access to newImages
     var successCount int
     if len(toUpdate) > 0 {
-        logger.Log("fetching", id.String(), "total", len(toUpdate), "expired", expired, "missing", missing)
-        var successMx sync.Mutex
+        logger.Log("info", "refreshing image", "image", id, "tag_count", len(tags), "to_update", len(toUpdate), "of_which_refresh", refresh, "of_which_missing", missing)
 
         // The upper bound for concurrent fetches against a single host is
         // w.Burst, so limit the number of fetching goroutines to that.
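+        // fetchers acts as a counting semaphore: a slot is taken (by sending)
+        // before each fetch goroutine is started, and released by the
+        // goroutine's deferred receive.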
         fetchers := make(chan struct{}, w.burst)
         awaitFetchers := &sync.WaitGroup{}
+        awaitFetchers.Add(len(toUpdate))
+
     updates:
-        for _, imID := range toUpdate {
+        for _, up := range toUpdate {
             select {
             case <-ctx.Done():
                 break updates
             case fetchers <- struct{}{}:
             }
-            awaitFetchers.Add(1)
-            go func(imageID image.Ref) {
+            go func(update update) {
                 defer func() { awaitFetchers.Done(); <-fetchers }()
+
+                imageID := update.ref
+
+                if w.Trace {
+                    errorLogger.Log("trace", "refreshing manifest", "ref", imageID, "previous_refresh", update.previousRefresh.String())
+                }
+
                 // Get the image from the remote
                 entry, err := client.Manifest(ctx, imageID.Tag)
                 if err != nil {
@@ -234,8 +300,30 @@ func (w *Warmer) warm(ctx context.Context, logger log.Logger, id image.Name, cre
                     errorLogger.Log("err", err, "ref", imageID)
                     return
                 }
-                if entry.ExcludedReason != "" {
-                    errorLogger.Log("excluded", entry.ExcludedReason, "ref", imageID.Tag)
+
+                refresh := update.previousRefresh
+                reason := ""
+                switch {
+                case entry.ExcludedReason != "":
+                    errorLogger.Log("excluded", entry.ExcludedReason, "ref", imageID)
+                    refresh = excludedRefresh
+                    reason = "image is excluded"
+                case update.previousDigest == "":
+                    entry.Info.LastFetched = now
+                    refresh = update.previousRefresh
+                    reason = "no prior cache entry for image"
+                case entry.Info.Digest == update.previousDigest:
+                    entry.Info.LastFetched = now
+                    refresh = clipRefresh(refresh * 2)
+                    reason = "image digest is same"
+                default: // i.e., not excluded, but the digests differ -> the tag was moved
+                    entry.Info.LastFetched = now
+                    refresh = clipRefresh(refresh / 2)
+                    reason = "image digest is different"
+                }
+
+                if w.Trace {
+                    errorLogger.Log("trace", "caching manifest", "ref", imageID, "last_fetched", now.Format(time.RFC3339), "refresh", refresh.String(), "reason", reason)
                 }
 
                 key := NewManifestKey(imageID.CanonicalRef())
@@ -245,21 +333,21 @@ func (w *Warmer) warm(ctx context.Context, logger log.Logger, id image.Name, cre
                     errorLogger.Log("err", err, "ref", imageID)
                     return
                 }
-                err = w.cache.SetKey(key, val)
+                err = w.cache.SetKey(key, now.Add(refresh), val)
                 if err != nil {
                     errorLogger.Log("err", err, "ref", imageID)
                     return
                 }
-                successMx.Lock()
+                fetchMx.Lock()
                 successCount++
                 if entry.ExcludedReason == "" {
                     newImages[imageID.Tag] = entry.Info
                 }
-                successMx.Unlock()
-            }(imID)
+                fetchMx.Unlock()
+            }(up)
         }
         awaitFetchers.Wait()
-        logger.Log("updated", id.String(), "count", successCount)
+        logger.Log("updated", id.String(), "successful", successCount, "attempted", len(toUpdate))
     }
 
     // We managed to fetch new metadata for everything we were missing
@@ -269,6 +357,10 @@ func (w *Warmer) warm(ctx context.Context, logger log.Logger, id image.Name, cre
             LastUpdate: time.Now(),
             Images:     newImages,
         }
+        // If we got through all that without bumping into `HTTP 429
+        // Too Many Requests` (or other problems), we can potentially
+        // creep the rate limit up
+        w.clientFactory.Succeed(id.CanonicalName())
     }
 
     if w.Notify != nil {
diff --git a/registry/cache/warming_test.go b/registry/cache/warming_test.go
index d62ce4e78..8571d86e1 100644
--- a/registry/cache/warming_test.go
+++ b/registry/cache/warming_test.go
@@ -7,24 +7,41 @@ import (
     "time"
 
     "github.com/go-kit/kit/log"
+    "github.com/stretchr/testify/assert"
 
     "github.com/weaveworks/flux/image"
     "github.com/weaveworks/flux/registry"
     "github.com/weaveworks/flux/registry/mock"
 )
 
+type entry struct {
+    b []byte
+    d time.Time
+}
+
 type mem struct {
-    kv map[string][]byte
+    kv map[string]entry
     mx sync.Mutex
 }
 
-func (c *mem) SetKey(k Keyer, v []byte) error {
+var (
+    ref  image.Ref
+    repo image.Name
+)
+
+func init() {
+    ref, _ = image.ParseRef("example.com/path/image:tag")
+    repo = ref.Name
+}
+
+func (c *mem) SetKey(k Keyer, deadline time.Time, v []byte) error {
+    println("set key", k.Key(), deadline.Format(time.RFC3339))
     c.mx.Lock()
     defer c.mx.Unlock()
     if c.kv == nil {
-        c.kv = make(map[string][]byte)
+        c.kv = make(map[string]entry)
     }
-    c.kv[k.Key()] = v
+    c.kv[k.Key()] = entry{v, deadline}
     return nil
 }
 
@@ -32,12 +49,14 @@ func (c *mem) GetKey(k Keyer) ([]byte, time.Time, error) {
     c.mx.Lock()
     defer c.mx.Unlock()
     if c.kv == nil {
-        c.kv = make(map[string][]byte)
+        c.kv = make(map[string]entry)
     }
-    if v, ok := c.kv[k.Key()]; ok {
-        return v, time.Now().Add(time.Hour), nil
+    if e, ok := c.kv[k.Key()]; ok {
+        println("get key", k.Key(), e.d.Format(time.RFC3339))
+        return e.b, e.d, nil
     }
+    println("get key", k.Key(), "nil")
     return nil, time.Time{}, ErrNotCached
 }
 
@@ -45,17 +64,65 @@
 // cache.Registry work together as intended: that is, if you ask the
 // warmer to fetch information, the cached gets populated, and the
 // Registry implementation will see it.
-func TestWarm(t *testing.T) {
-    ref, _ := image.ParseRef("example.com/path/image:tag")
-    repo := ref.Name
+func TestWarmThenQuery(t *testing.T) {
+    digest := "abc"
+    warmer, cache := setup(t, &digest)
+    logger := log.NewNopLogger()
+
+    now := time.Now()
+    warmer.warm(context.TODO(), now, logger, repo, registry.NoCredentials())
+
+    registry := &Cache{Reader: cache}
+    repoInfo, err := registry.GetRepositoryImages(ref.Name)
+    assert.NoError(t, err)
+    // Otherwise, we should get what we put in ...
+    assert.Len(t, repoInfo, 1)
+    assert.Equal(t, ref.String(), repoInfo[0].ID.String())
+}
+
+func TestRefreshDeadline(t *testing.T) {
+    digest := "abc"
+    warmer, cache := setup(t, &digest)
     logger := log.NewNopLogger()
 
+    now0 := time.Now()
+    warmer.warm(context.TODO(), now0, logger, repo, registry.NoCredentials())
+
+    // We should see that there's an entry for the manifest, and that
+    // it's set to be refreshed
+    k := NewManifestKey(ref.CanonicalRef())
+    _, deadline0, err := cache.GetKey(k)
+    assert.NoError(t, err)
+    assert.True(t, deadline0.After(now0))
+
+    // Fast-forward to after the refresh deadline; check that the
+    // entry is given a longer deadline
+    now1 := deadline0.Add(time.Minute)
+    warmer.warm(context.TODO(), now1, logger, repo, registry.NoCredentials())
+    _, deadline1, err := cache.GetKey(k)
+    assert.NoError(t, err)
+    assert.True(t, deadline1.After(now1))
+    assert.True(t, deadline0.Sub(now0) < deadline1.Sub(now1), "%s < %s", deadline0.Sub(now0), deadline1.Sub(now1))
+
+    // Fast-forward again, check that a _differing_ manifest results
+    // in a shorter deadline
+    digest = "cba" // <-- means manifest points at a different image
+    now2 := deadline1.Add(time.Minute)
+    warmer.warm(context.TODO(), now2, logger, repo, registry.NoCredentials())
+    _, deadline2, err := cache.GetKey(k)
+    assert.NoError(t, err)
+    assert.True(t, deadline1.Sub(now1) > deadline2.Sub(now2), "%s > %s", deadline1.Sub(now1), deadline2.Sub(now2))
+}
+
+func setup(t *testing.T, digest *string) (*Warmer, Client) {
     client := &mock.Client{
         TagsFn: func() ([]string, error) {
+            println("asked for tags")
             return []string{"tag"}, nil
         },
         ManifestFn: func(tag string) (registry.ImageEntry, error) {
+            println("asked for manifest", tag)
             if tag != "tag" {
                 t.Errorf("remote client was asked for %q instead of %q", tag, "tag")
             }
@@ -63,6 +130,7 @@
                 Info: image.Info{
                     ID:        ref,
                     CreatedAt: time.Now(),
+                    Digest:    *digest,
                 },
             }, nil
         },
@@ -70,19 +138,5 @@
     factory := &mock.ClientFactory{Client: client}
     c := &mem{}
     warmer := &Warmer{clientFactory: factory, cache: c, burst: 10}
-    warmer.warm(context.TODO(), logger, repo, registry.NoCredentials())
-
-    registry := &Cache{Reader: c}
-    repoInfo, err := registry.GetRepositoryImages(ref.Name)
-    if err != nil {
-        t.Error(err)
-    }
-    // Otherwise, we should get what we put in ...
-    if len(repoInfo) != 1 {
-        t.Errorf("expected an image.Info item; got %#v", repoInfo)
-    } else {
-        if got := repoInfo[0].ID.String(); got != ref.String() {
-            t.Errorf("expected image %q from registry cache; got %q", ref.String(), got)
-        }
-    }
+    return warmer, c
 }
diff --git a/registry/client.go b/registry/client.go
index 5b39b47da..8b4263e4e 100644
--- a/registry/client.go
+++ b/registry/client.go
@@ -69,6 +69,7 @@ type Client interface {
 // implementations.
 type ClientFactory interface {
     ClientFor(image.CanonicalName, Credentials) (Client, error)
+    Succeed(image.CanonicalName)
 }
 
 type Remote struct {
diff --git a/registry/client_factory.go b/registry/client_factory.go
index 1a93a1942..db9f8d33c 100644
--- a/registry/client_factory.go
+++ b/registry/client_factory.go
@@ -108,6 +108,13 @@ func (f *RemoteClientFactory) ClientFor(repo image.CanonicalName, creds Credenti
     return NewInstrumentedClient(client), nil
 }
 
+// Succeed exists merely so that the user of the ClientFactory can
+// bump rate limits up if a repo's metadata has successfully been
+// fetched.
+func (f *RemoteClientFactory) Succeed(repo image.CanonicalName) {
+    f.Limiters.Recover(repo.Domain)
+}
+
 // store adapts a set of pre-selected creds to be an
 // auth.CredentialsStore
 type store struct {
diff --git a/registry/middleware/rate_limiter.go b/registry/middleware/rate_limiter.go
index 87ef3213a..ef8b22418 100644
--- a/registry/middleware/rate_limiter.go
+++ b/registry/middleware/rate_limiter.go
@@ -1,18 +1,94 @@
 package middleware
 
 import (
-    "context"
     "net/http"
+    "strconv"
     "sync"
 
+    "github.com/go-kit/kit/log"
     "github.com/pkg/errors"
     "golang.org/x/time/rate"
 )
 
+const (
+    minLimit  = 0.1
+    backOffBy = 2.0
+    recoverBy = 1.5
+)
+
+// RateLimiters keeps track of per-host rate limiting for an arbitrary
+// set of hosts.
+
+// Use `*RateLimiters.RoundTripper(host)` to obtain a rate limited HTTP
+// transport for an operation. The RoundTripper will react to a `HTTP
+// 429 Too Many Requests` response by reducing the limit for that
+// host. It will only do so once, so that concurrent requests don't
+// *also* reduce the limit.
+//
+// Call `*RateLimiters.Recover(host)` when an operation has succeeded
+// without incident, which will increase the rate limit modestly back
+// towards the given ideal.
 type RateLimiters struct {
-    RPS, Burst int
-    perHost    map[string]*rate.Limiter
-    mu         sync.Mutex
+    RPS     float64
+    Burst   int
+    Logger  log.Logger
+    perHost map[string]*rate.Limiter
+    mu      sync.Mutex
+}
+
+func (limiters *RateLimiters) clip(limit float64) float64 {
+    if limit < minLimit {
+        return minLimit
+    }
+    if limit > limiters.RPS {
+        return limiters.RPS
+    }
+    return limit
+}
+
+// BackOff can be called to explicitly reduce the limit for a
+// particular host. Usually this isn't necessary since a RoundTripper
+// obtained for a host will respond to `HTTP 429` by doing this for
+// you.
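+// BackOff halves the host's current limit, clipped so that it never
+// goes below minLimit.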
+func (limiters *RateLimiters) BackOff(host string) {
+    limiters.mu.Lock()
+    defer limiters.mu.Unlock()
+
+    var limiter *rate.Limiter
+    if limiters.perHost == nil {
+        limiters.perHost = map[string]*rate.Limiter{}
+    }
+    if rl, ok := limiters.perHost[host]; ok {
+        limiter = rl
+    } else {
+        limiter = rate.NewLimiter(rate.Limit(limiters.RPS), limiters.Burst)
+        limiters.perHost[host] = limiter
+    }
+
+    oldLimit := float64(limiter.Limit())
+    newLimit := limiters.clip(oldLimit / backOffBy)
+    if oldLimit != newLimit && limiters.Logger != nil {
+        limiters.Logger.Log("info", "reducing rate limit", "host", host, "limit", strconv.FormatFloat(newLimit, 'f', 2, 64))
+    }
+    limiter.SetLimit(rate.Limit(newLimit))
+}
+
+// Recover should be called when a use of a RoundTripper has
+// succeeded, to bump the limit back up again.
+func (limiters *RateLimiters) Recover(host string) {
+    limiters.mu.Lock()
+    defer limiters.mu.Unlock()
+    if limiters.perHost == nil {
+        return
+    }
+    if limiter, ok := limiters.perHost[host]; ok {
+        oldLimit := float64(limiter.Limit())
+        newLimit := limiters.clip(oldLimit * recoverBy)
+        if newLimit != oldLimit && limiters.Logger != nil {
+            limiters.Logger.Log("info", "increasing rate limit", "host", host, "limit", strconv.FormatFloat(newLimit, 'f', 2, 64))
+        }
+        limiter.SetLimit(rate.Limit(newLimit))
+    }
 }
 
 // Limit returns a RoundTripper for a particular host. We expect to do
@@ -28,32 +104,35 @@ func (limiters *RateLimiters) RoundTripper(rt http.RoundTripper, host string) ht
         rl := rate.NewLimiter(rate.Limit(limiters.RPS), limiters.Burst)
         limiters.perHost[host] = rl
     }
+    var reduceOnce sync.Once
     return &RoundTripRateLimiter{
         rl: limiters.perHost[host],
         tx: rt,
+        slowDown: func() {
+            reduceOnce.Do(func() { limiters.BackOff(host) })
+        },
     }
 }
 
 type RoundTripRateLimiter struct {
-    rl *rate.Limiter
-    tx http.RoundTripper
+    rl       *rate.Limiter
+    tx       http.RoundTripper
+    slowDown func()
 }
 
 func (t *RoundTripRateLimiter) RoundTrip(r *http.Request) (*http.Response, error) {
     // Wait errors out if the request cannot be processed within
-    // the deadline. This is preemptive, instead of waiting the
+    // the deadline. This is pre-emptive, instead of waiting the
     // entire duration.
     if err := t.rl.Wait(r.Context()); err != nil {
         return nil, errors.Wrap(err, "rate limited")
     }
-    return t.tx.RoundTrip(r)
-}
-
-type ContextRoundTripper struct {
-    Transport http.RoundTripper
-    Ctx       context.Context
-}
-
-func (rt *ContextRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
-    return rt.Transport.RoundTrip(r.WithContext(rt.Ctx))
+    resp, err := t.tx.RoundTrip(r)
+    if err != nil {
+        return nil, err
+    }
+    if resp.StatusCode == http.StatusTooManyRequests {
+        t.slowDown()
+    }
+    return resp, err
 }
diff --git a/registry/mock/mock.go b/registry/mock/mock.go
index 9f351ec92..ecd760630 100644
--- a/registry/mock/mock.go
+++ b/registry/mock/mock.go
@@ -2,6 +2,7 @@ package mock
 
 import (
     "context"
+
     "github.com/pkg/errors"
 
     "github.com/weaveworks/flux/image"
@@ -32,6 +33,10 @@ func (m *ClientFactory) ClientFor(repository image.CanonicalName, creds registry
     return m.Client, m.Err
 }
 
+func (_ *ClientFactory) Succeed(_ image.CanonicalName) {
+    return
+}
+
 var _ registry.ClientFactory = &ClientFactory{}
 
 type Registry struct {