Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1970 from 2opremio/set-warmer-timeouts
Browse files Browse the repository at this point in the history
 Add a timeout to all registry requests
  • Loading branch information
Alfonso Acosta authored Apr 24, 2019
2 parents 2a948a1 + d1d3147 commit d4b0ae7
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 36 deletions.
15 changes: 4 additions & 11 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ required = ["k8s.io/code-generator/cmd/client-gen"]
[[constraint]]
name = "github.com/docker/distribution"
branch = "master"
source = "github.com/2opremio/distribution"

# Pin to master branch until there is a more recent stable release:
# https://github.com/prometheus/client_golang/issues/375
Expand Down
72 changes: 52 additions & 20 deletions registry/cache/repocachemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,35 @@ type imageToUpdate struct {

// repoCacheManager handles cache operations for a container image repository
type repoCacheManager struct {
now time.Time
repoID image.Name
burst int
trace bool
logger log.Logger
cacheClient Client
now time.Time
repoID image.Name
client registry.Client
clientTimeout time.Duration
burst int
trace bool
logger log.Logger
cacheClient Client
sync.Mutex
}

func newRepoCacheManager(now time.Time, repoId image.Name, burst int, trace bool, logger log.Logger,
cacheClient Client) *repoCacheManager {
return &repoCacheManager{
now: now,
repoID: repoId,
burst: burst,
trace: trace,
logger: logger,
cacheClient: cacheClient,
func newRepoCacheManager(now time.Time,
repoID image.Name, clientFactory registry.ClientFactory, creds registry.Credentials, repoClientTimeout time.Duration,
burst int, trace bool, logger log.Logger, cacheClient Client) (*repoCacheManager, error) {
client, err := clientFactory.ClientFor(repoID.CanonicalName(), creds)
if err != nil {
return nil, err
}
manager := &repoCacheManager{
now: now,
repoID: repoID,
client: client,
clientTimeout: repoClientTimeout,
burst: burst,
trace: trace,
logger: logger,
cacheClient: cacheClient,
}
return manager, nil
}

// fetchRepository fetches the repository from the cache
Expand All @@ -59,6 +69,17 @@ func (c *repoCacheManager) fetchRepository() (ImageRepository, error) {
return result, nil
}

// getTags gets the tags from the repository
func (c *repoCacheManager) getTags(ctx context.Context) ([]string, error) {
ctx, cancel := context.WithTimeout(ctx, c.clientTimeout)
defer cancel()
tags, err := c.client.Tags(ctx)
if ctx.Err() == context.DeadlineExceeded {
return nil, c.clientTimeoutError()
}
return tags, err
}

// storeRepository stores the repository from the cache
func (c *repoCacheManager) storeRepository(repo ImageRepository) error {
repoKey := NewRepositoryKey(c.repoID.CanonicalName())
Expand Down Expand Up @@ -154,7 +175,7 @@ func (c *repoCacheManager) fetchImages(tags []string) (fetchImagesResult, error)
// updateImages, refreshes the cache entries for the images passed. It may not succeed for all images.
// It returns the values stored in cache, the number of images it succeeded for and the number
// of images whose manifest wasn't found in the registry.
func (c *repoCacheManager) updateImages(ctx context.Context, registryClient registry.Client, images []imageToUpdate) (map[string]image.Info, int, int) {
func (c *repoCacheManager) updateImages(ctx context.Context, images []imageToUpdate) (map[string]image.Info, int, int) {
// The upper bound for concurrent fetches against a single host is
// w.Burst, so limit the number of fetching goroutines to that.
fetchers := make(chan struct{}, c.burst)
Expand All @@ -179,9 +200,11 @@ updates:
awaitFetchers.Add(1)
go func() {
defer func() { awaitFetchers.Done(); <-fetchers }()
entry, err := c.updateImage(ctxc, registryClient, upCopy)
ctxcc, cancel := context.WithTimeout(ctxc, c.clientTimeout)
defer cancel()
entry, err := c.updateImage(ctxcc, upCopy)
if err != nil {
if err, ok := errors.Cause(err).(net.Error); ok && err.Timeout() {
if err, ok := errors.Cause(err).(net.Error); (ok && err.Timeout()) || ctxcc.Err() == context.DeadlineExceeded {
// This was due to a context timeout, don't bother logging
return
}
Expand Down Expand Up @@ -216,16 +239,21 @@ updates:
return result, successCount, manifestUnknownCount
}

func (c *repoCacheManager) updateImage(ctx context.Context, registryClient registry.Client, update imageToUpdate) (registry.ImageEntry, error) {
func (c *repoCacheManager) updateImage(ctx context.Context, update imageToUpdate) (registry.ImageEntry, error) {
imageID := update.ref

if c.trace {
c.logger.Log("trace", "refreshing manifest", "ref", imageID, "previous_refresh", update.previousRefresh.String())
}

ctx, cancel := context.WithTimeout(ctx, c.clientTimeout)
defer cancel()
// Get the image from the remote
entry, err := registryClient.Manifest(ctx, imageID.Tag)
entry, err := c.client.Manifest(ctx, imageID.Tag)
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
return registry.ImageEntry{}, c.clientTimeoutError()
}
return registry.ImageEntry{}, err
}

Expand Down Expand Up @@ -266,3 +294,7 @@ func (c *repoCacheManager) updateImage(ctx context.Context, registryClient regis
}
return entry, nil
}

func (r *repoCacheManager) clientTimeoutError() error {
return fmt.Errorf("client timeout (%s) exceeded", r.clientTimeout)
}
59 changes: 59 additions & 0 deletions registry/cache/repocachemanager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package cache

import (
"context"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/stretchr/testify/assert"

"github.com/weaveworks/flux/image"
"github.com/weaveworks/flux/registry"
"github.com/weaveworks/flux/registry/middleware"
)

func Test_ClientTimeouts(t *testing.T) {
timeout := 2 * time.Millisecond
server := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
// make sure we exceed the timeout
time.Sleep(timeout * 10)
}))
defer server.Close()
url, err := url.Parse(server.URL)
assert.NoError(t, err)
logger := log.NewLogfmtLogger(os.Stdout)
cf := &registry.RemoteClientFactory{
Logger: log.NewLogfmtLogger(os.Stdout),
Limiters: &middleware.RateLimiters{
RPS: 100,
Burst: 100,
Logger: logger,
},
Trace: false,
InsecureHosts: []string{url.Host},
}
name := image.Name{
Domain: url.Host,
Image: "foo/bar",
}
rcm, err := newRepoCacheManager(
time.Now(),
name,
cf,
registry.NoCredentials(),
timeout,
100,
false,
logger,
nil,
)
assert.NoError(t, err)
_, err = rcm.getTags(context.Background())
assert.Error(t, err)
assert.Equal(t, "client timeout (2ms) exceeded", err.Error())
}
8 changes: 3 additions & 5 deletions registry/cache/warming.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,12 @@ func imageCredsToBacklog(imageCreds registry.ImageCreds) []backlogItem {
func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id image.Name, creds registry.Credentials) {
errorLogger := log.With(logger, "canonical_name", id.CanonicalName(), "auth", creds)

client, err := w.clientFactory.ClientFor(id.CanonicalName(), creds)
cacheManager, err := newRepoCacheManager(now, id, w.clientFactory, creds, time.Minute, w.burst, w.Trace, errorLogger, w.cache)
if err != nil {
errorLogger.Log("err", err.Error())
return
}

cacheManager := newRepoCacheManager(now, id, w.burst, w.Trace, errorLogger, w.cache)

// This is what we're going to write back to the cache
var repo ImageRepository
repo, err = cacheManager.fetchRepository()
Expand All @@ -176,7 +174,7 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id
}
}()

tags, err := client.Tags(ctx)
tags, err := cacheManager.getTags(ctx)
if err != nil {
if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) && !strings.Contains(err.Error(), "net/http: request canceled") {
errorLogger.Log("err", errors.Wrap(err, "requesting tags"))
Expand All @@ -201,7 +199,7 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id
"to_update", len(fetchResult.imagesToUpdate),
"of_which_refresh", fetchResult.imagesToUpdateRefreshCount, "of_which_missing", fetchResult.imagesToUpdateMissingCount)
var images map[string]image.Info
images, successCount, manifestUnknownCount = cacheManager.updateImages(ctx, client, fetchResult.imagesToUpdate)
images, successCount, manifestUnknownCount = cacheManager.updateImages(ctx, fetchResult.imagesToUpdate)
for k, v := range images {
newImages[k] = v
}
Expand Down

0 comments on commit d4b0ae7

Please sign in to comment.