Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Bring registry tests up to date
Browse files Browse the repository at this point in the history
 - rate_limiter_test.go tested that contexts were not shared between
   transports; but we no longer implement the transport under test.
 - the use of memcached has changed
 - removed some spurious cache warmer tests
 - move the mock registry objects (which are handy elsewhere) to
   registry/mock
 - remove the tests that check if the registry assembles manifests
   from individual cache entries; it no longer does that
 - check that the cache warmer populates the cache, then the registry
   can read the result.

and a change made /en passant/:

 - supply the registry cache expiry argument to the cache and use it
   - and don't supply it to the warmer, which doesn't use it
  • Loading branch information
squaremo committed Nov 30, 2017
1 parent 390d1d1 commit 321694c
Show file tree
Hide file tree
Showing 14 changed files with 203 additions and 401 deletions.
6 changes: 3 additions & 3 deletions cmd/fluxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func main() {
memcachedHostname = fs.String("memcached-hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.")
memcachedTimeout = fs.Duration("memcached-timeout", time.Second, "Maximum time to wait before giving up on memcached requests.")
memcachedService = fs.String("memcached-service", "memcached", "SRV service used to discover memcache servers.")
registryCacheExpiry = fs.Duration("registry-cache-expiry", 20*time.Minute, "Duration to keep cached registry tag info. Must be < 1 month.")
registryPollInterval = fs.Duration("registry-poll-interval", 5*time.Minute, "period at which to poll registry for new images")
registryCacheExpiry = fs.Duration("registry-cache-expiry", 1*time.Hour, "Duration to keep cached image info. Must be < 1 month.")
registryPollInterval = fs.Duration("registry-poll-interval", 5*time.Minute, "period at which to check for updated images")
registryRPS = fs.Int("registry-rps", 200, "maximum registry requests per second per host")
registryBurst = fs.Int("registry-burst", defaultRemoteConnections, "maximum number of warmer connections to remote and memcache")

Expand Down Expand Up @@ -240,6 +240,7 @@ func main() {
memcacheClient := registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{
Host: *memcachedHostname,
Service: *memcachedService,
Expiry: *registryCacheExpiry,
Timeout: *memcachedTimeout,
UpdateInterval: 1 * time.Minute,
Logger: log.With(logger, "component", "memcached"),
Expand Down Expand Up @@ -270,7 +271,6 @@ func main() {
cacheWarmer = &cache.Warmer{
Logger: warmerLogger,
ClientFactory: remoteFactory,
Expiry: *registryCacheExpiry,
Cache: cacheClient,
Burst: *registryBurst,
}
Expand Down
13 changes: 8 additions & 5 deletions daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/weaveworks/flux/job"
"github.com/weaveworks/flux/policy"
"github.com/weaveworks/flux/registry"
registryMock "github.com/weaveworks/flux/registry/mock"
"github.com/weaveworks/flux/remote"
"github.com/weaveworks/flux/resource"
"github.com/weaveworks/flux/update"
Expand Down Expand Up @@ -405,11 +406,13 @@ func mockDaemon(t *testing.T) (*Daemon, func(), *cluster.Mock, *mockEventWriter)
img1 := makeImageInfo(currentHelloImage, time.Now())
img2 := makeImageInfo(newHelloImage, time.Now().Add(1*time.Second))
img3 := makeImageInfo("another/service:latest", time.Now().Add(1*time.Second))
imageRegistry = registry.NewMockRegistry([]image.Info{
img1,
img2,
img3,
}, nil)
imageRegistry = &registryMock.Registry{
Images: []image.Info{
img1,
img2,
img3,
},
}
}

events := &mockEventWriter{}
Expand Down
4 changes: 2 additions & 2 deletions daemon/loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/weaveworks/flux/git"
"github.com/weaveworks/flux/git/gittest"
"github.com/weaveworks/flux/job"
"github.com/weaveworks/flux/registry"
registryMock "github.com/weaveworks/flux/registry/mock"
"github.com/weaveworks/flux/resource"
)

Expand Down Expand Up @@ -66,7 +66,7 @@ func daemon(t *testing.T) (*Daemon, func()) {
d := &Daemon{
Cluster: k8s,
Manifests: k8s,
Registry: registry.NewMockRegistry(nil, nil),
Registry: &registryMock.Registry{},
Checkout: working,
Jobs: jobs,
JobStatusCache: &job.StatusCache{Size: 100},
Expand Down
28 changes: 20 additions & 8 deletions registry/cache/memcached/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

const (
expiry = time.Hour
DefaultExpiry = time.Hour
)

// MemcacheClient is a memcache client that gets its server list from SRV
Expand All @@ -26,6 +26,7 @@ type MemcacheClient struct {
serverList *memcache.ServerList
hostname string
service string
ttl time.Duration
logger log.Logger

quit chan struct{}
Expand All @@ -36,6 +37,7 @@ type MemcacheClient struct {
type MemcacheConfig struct {
Host string
Service string
Expiry time.Duration
Timeout time.Duration
UpdateInterval time.Duration
Logger log.Logger
Expand All @@ -53,10 +55,15 @@ func NewMemcacheClient(config MemcacheConfig) *MemcacheClient {
serverList: &servers,
hostname: config.Host,
service: config.Service,
ttl: config.Expiry,
logger: config.Logger,
quit: make(chan struct{}),
}

if newClient.ttl == 0 {
newClient.ttl = DefaultExpiry
}

err := newClient.updateMemcacheServers()
if err != nil {
config.Logger.Log("err", errors.Wrapf(err, "Error setting memcache servers to '%v'", config.Host))
Expand All @@ -79,10 +86,15 @@ func NewFixedServerMemcacheClient(config MemcacheConfig, addresses ...string) *M
serverList: &servers,
hostname: config.Host,
service: config.Service,
ttl: config.Expiry,
logger: config.Logger,
quit: make(chan struct{}),
}

if newClient.ttl == 0 {
newClient.ttl = DefaultExpiry
}

return newClient
}

Expand All @@ -104,19 +116,19 @@ func (c *MemcacheClient) GetKey(k cache.Keyer) ([]byte, time.Time, error) {
return []byte{}, time.Time{}, err
}
}
expiry := binary.BigEndian.Uint32(cacheItem.Value)
return cacheItem.Value[4:], time.Unix(int64(expiry), 0), nil
exTime := binary.BigEndian.Uint32(cacheItem.Value)
return cacheItem.Value[4:], time.Unix(int64(exTime), 0), nil
}

// SetKey sets the value at a key.
func (c *MemcacheClient) SetKey(k cache.Keyer, v []byte) error {
expiry := time.Now().Add(expiry).Unix()
expiryBytes := make([]byte, 4, 4)
binary.BigEndian.PutUint32(expiryBytes, uint32(expiry))
exTime := time.Now().Add(c.ttl).Unix()
exBytes := make([]byte, 4, 4)
binary.BigEndian.PutUint32(exBytes, uint32(exTime))
if err := c.client.Set(&memcache.Item{
Key: k.Key(),
Value: append(expiryBytes, v...),
Expiration: int32(expiry),
Value: append(exBytes, v...),
Expiration: int32(exTime),
}); err != nil {
c.logger.Log("err", errors.Wrap(err, "storing in memcache"))
return err
Expand Down
24 changes: 2 additions & 22 deletions registry/cache/memcached/memcached_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// +build integration

package memcached

import (
Expand Down Expand Up @@ -38,8 +39,7 @@ func TestMemcache_ExpiryReadWrite(t *testing.T) {
t.Fatal(err)
}

// Get the expiry
expiry, err := mc.GetExpiration(key)
cached, expiry, err := mc.GetKey(key)
if err != nil {
t.Fatal(err)
}
Expand All @@ -49,27 +49,7 @@ func TestMemcache_ExpiryReadWrite(t *testing.T) {
if expiry.Before(time.Now()) {
t.Fatal("Expiry should be in the future")
}
}

func TestMemcache_ReadWrite(t *testing.T) {
// Memcache client
mc := NewFixedServerMemcacheClient(MemcacheConfig{
Timeout: time.Second,
UpdateInterval: 1 * time.Minute,
Logger: log.With(log.NewLogfmtLogger(os.Stderr), "component", "memcached"),
}, strings.Fields(*memcachedIPs)...)

// Set some dummy data
err := mc.SetKey(key, val)
if err != nil {
t.Fatal(err)
}

// Get the data
cached, err := mc.GetKey(key)
if err != nil {
t.Fatal(err)
}
if string(cached) != string(val) {
t.Fatalf("Should have returned %q, but got %q", string(val), string(cached))
}
Expand Down
5 changes: 2 additions & 3 deletions registry/cache/warming.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ const askForNewImagesInterval = time.Minute
// registries.
type Warmer struct {
Logger log.Logger
ClientFactory *registry.RemoteClientFactory
Expiry time.Duration
ClientFactory registry.ClientFactory
Cache Client
Burst int
Priority chan image.Name
Expand All @@ -40,7 +39,7 @@ type backlogItem struct {
func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFunc func() registry.ImageCreds) {
defer wg.Done()

if w.Logger == nil || w.ClientFactory == nil || w.Expiry == 0 || w.Cache == nil {
if w.Logger == nil || w.ClientFactory == nil || w.Cache == nil {
panic("registry.Warmer fields are nil")
}

Expand Down
89 changes: 69 additions & 20 deletions registry/cache/warming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,83 @@ package cache

import (
"context"
"sync"
"testing"
"time"

"github.com/pkg/errors"
"github.com/go-kit/kit/log"

"github.com/weaveworks/flux/image"
"github.com/weaveworks/flux/registry"
"github.com/weaveworks/flux/registry/mock"
)

func TestWarming_ExpiryBuffer(t *testing.T) {
testTime := time.Now()
for _, x := range []struct {
expiresIn, buffer time.Duration
expectedResult bool
}{
{time.Minute, time.Second, false},
{time.Second, time.Minute, true},
} {
if withinExpiryBuffer(testTime.Add(x.expiresIn), x.buffer) != x.expectedResult {
t.Fatalf("Should return %t", x.expectedResult)
}
type mem struct {
kv map[string][]byte
mx sync.Mutex
}

func (c *mem) SetKey(k Keyer, v []byte) error {
c.mx.Lock()
defer c.mx.Unlock()
if c.kv == nil {
c.kv = make(map[string][]byte)
}
c.kv[k.Key()] = v
return nil
}

func (c *mem) GetKey(k Keyer) ([]byte, time.Time, error) {
c.mx.Lock()
defer c.mx.Unlock()
if c.kv == nil {
c.kv = make(map[string][]byte)
}

if v, ok := c.kv[k.Key()]; ok {
return v, time.Now().Add(time.Hour), nil
}
return nil, time.Time{}, ErrNotCached
}

func TestName(t *testing.T) {
err := errors.Wrap(context.DeadlineExceeded, "getting remote manifest")
t.Log(err.Error())
err = errors.Cause(err)
if err == context.DeadlineExceeded {
t.Log("OK")
// WarmTest effectively checks that the cache.Warmer and
// cache.Registry work together as intended: that is, if you ask the
// warmer to fetch information, the cached gets populated, and the
// Registry implementation will see it.
func TestWarm(t *testing.T) {
ref, _ := image.ParseRef("example.com/path/image:tag")
repo := ref.Name

client := &mock.Client{
TagsFn: func() ([]string, error) {
return []string{"tag"}, nil
},
ManifestFn: func(tag string) (image.Info, error) {
if tag != "tag" {
t.Errorf("remote client was asked for %q instead of %q", tag, "tag")
}
return image.Info{
ID: ref,
CreatedAt: time.Now(),
}, nil
},
}
factory := &mock.ClientFactory{Client: client}
c := &mem{}
warmer := &Warmer{Logger: log.NewNopLogger(), ClientFactory: factory, Cache: c, Burst: 10}
warmer.warm(context.TODO(), repo, registry.NoCredentials())

registry := &Cache{Reader: c}
repoInfo, err := registry.GetRepository(ref.Name)
if err != nil {
t.Error(err)
}
// Otherwise, we should get what we put in ...
if len(repoInfo) != 1 {
t.Errorf("expected an image.Info item; got %#v", repoInfo)
} else {
t.Log("Not OK")
if got := repoInfo[0].ID.String(); got != ref.String() {
t.Errorf("expected image %q from registry cache; got %q", ref.String(), got)
}
}
}
7 changes: 7 additions & 0 deletions registry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ type Client interface {
Manifest(ctx context.Context, ref string) (image.Info, error)
}

// ClientFactory supplies Client implementations for a given repo,
// with credentials. This is an interface so we can provide fake
// implementations.
type ClientFactory interface {
ClientFor(image.CanonicalName, Credentials) (Client, error)
}

type Remote struct {
transport http.RoundTripper
repo image.CanonicalName
Expand Down
2 changes: 1 addition & 1 deletion registry/credentials.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func ParseCredentials(b []byte) (Credentials, error) {

// Some users were passing in credentials in the form of
// http://docker.io and http://docker.io/v1/, etc.
// So strip everything down to it's base host.
// So strip everything down to the host.
// Also, the registry might be local and on a different port.
// So we need to check for that because url.Parse won't parse the ip:port format very well.
u, err := url.Parse(host)
Expand Down
Loading

0 comments on commit 321694c

Please sign in to comment.