Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Bring registry tests up to date
Browse files Browse the repository at this point in the history
 - rate_limiter_test.go tested that contexts were not shared between
   transports; but we no longer implement the transport under test.
 - the use of memcached has changed
 - removed some spurious cache warmer tests
 - move the mock registry objects (which are handy elsewhere) to
   registry/mock
 - remove the tests that check if the registry assembles manifests
   from individual cache entries; it no longer does that
 - check that the cache warmer populates the cache, then the registry
   can read the result
 - move and rewrite the registry integration test

and a change made /en passant/:

 - supply the registry cache expiry argument to the cache and use it
   - and don't supply it to the warmer, which doesn't use it
  • Loading branch information
squaremo committed Nov 30, 2017
1 parent 390d1d1 commit 5514161
Show file tree
Hide file tree
Showing 15 changed files with 222 additions and 434 deletions.
6 changes: 3 additions & 3 deletions cmd/fluxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func main() {
memcachedHostname = fs.String("memcached-hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.")
memcachedTimeout = fs.Duration("memcached-timeout", time.Second, "Maximum time to wait before giving up on memcached requests.")
memcachedService = fs.String("memcached-service", "memcached", "SRV service used to discover memcache servers.")
registryCacheExpiry = fs.Duration("registry-cache-expiry", 20*time.Minute, "Duration to keep cached registry tag info. Must be < 1 month.")
registryPollInterval = fs.Duration("registry-poll-interval", 5*time.Minute, "period at which to poll registry for new images")
registryCacheExpiry = fs.Duration("registry-cache-expiry", 1*time.Hour, "Duration to keep cached image info. Must be < 1 month.")
registryPollInterval = fs.Duration("registry-poll-interval", 5*time.Minute, "period at which to check for updated images")
registryRPS = fs.Int("registry-rps", 200, "maximum registry requests per second per host")
registryBurst = fs.Int("registry-burst", defaultRemoteConnections, "maximum number of warmer connections to remote and memcache")

Expand Down Expand Up @@ -240,6 +240,7 @@ func main() {
memcacheClient := registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{
Host: *memcachedHostname,
Service: *memcachedService,
Expiry: *registryCacheExpiry,
Timeout: *memcachedTimeout,
UpdateInterval: 1 * time.Minute,
Logger: log.With(logger, "component", "memcached"),
Expand Down Expand Up @@ -270,7 +271,6 @@ func main() {
cacheWarmer = &cache.Warmer{
Logger: warmerLogger,
ClientFactory: remoteFactory,
Expiry: *registryCacheExpiry,
Cache: cacheClient,
Burst: *registryBurst,
}
Expand Down
13 changes: 8 additions & 5 deletions daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/weaveworks/flux/job"
"github.com/weaveworks/flux/policy"
"github.com/weaveworks/flux/registry"
registryMock "github.com/weaveworks/flux/registry/mock"
"github.com/weaveworks/flux/remote"
"github.com/weaveworks/flux/resource"
"github.com/weaveworks/flux/update"
Expand Down Expand Up @@ -405,11 +406,13 @@ func mockDaemon(t *testing.T) (*Daemon, func(), *cluster.Mock, *mockEventWriter)
img1 := makeImageInfo(currentHelloImage, time.Now())
img2 := makeImageInfo(newHelloImage, time.Now().Add(1*time.Second))
img3 := makeImageInfo("another/service:latest", time.Now().Add(1*time.Second))
imageRegistry = registry.NewMockRegistry([]image.Info{
img1,
img2,
img3,
}, nil)
imageRegistry = &registryMock.Registry{
Images: []image.Info{
img1,
img2,
img3,
},
}
}

events := &mockEventWriter{}
Expand Down
4 changes: 2 additions & 2 deletions daemon/loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/weaveworks/flux/git"
"github.com/weaveworks/flux/git/gittest"
"github.com/weaveworks/flux/job"
"github.com/weaveworks/flux/registry"
registryMock "github.com/weaveworks/flux/registry/mock"
"github.com/weaveworks/flux/resource"
)

Expand Down Expand Up @@ -66,7 +66,7 @@ func daemon(t *testing.T) (*Daemon, func()) {
d := &Daemon{
Cluster: k8s,
Manifests: k8s,
Registry: registry.NewMockRegistry(nil, nil),
Registry: &registryMock.Registry{},
Checkout: working,
Jobs: jobs,
JobStatusCache: &job.StatusCache{Size: 100},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
// +build integration

package registry
package memcached

import (
"flag"
"os"
"strings"
"sync"
Expand All @@ -13,20 +12,19 @@ import (
"github.com/go-kit/kit/log"

"github.com/weaveworks/flux/image"
"github.com/weaveworks/flux/registry"
"github.com/weaveworks/flux/registry/cache"
"github.com/weaveworks/flux/registry/middleware"
)

var (
memcachedIPs = flag.String("memcached-ips", "127.0.0.1:11211", "space-separated host:port values for memcached to connect to")
)
// memcachedIPs flag from memcached_test.go

// This tests a real memcache cache and a request to a real docker
// repository. It is intended to be an end-to-end integration test
// for the warmer since I had a few bugs introduced when
// refactoring. This should cover against these bugs.
// This tests a real memcached cache and a request to a real docker
// repository. It is intended to be an end-to-end integration test for
// the warmer since I had a few bugs introduced when refactoring. This
// should cover against these bugs.
func TestWarming_WarmerWriteCacheRead(t *testing.T) {
mc := cache.NewFixedServerMemcacheClient(cache.MemcacheConfig{
mc := NewFixedServerMemcacheClient(MemcacheConfig{
Timeout: time.Second,
UpdateInterval: 1 * time.Minute,
Logger: log.With(log.NewLogfmtLogger(os.Stderr), "component", "memcached"),
Expand All @@ -36,30 +34,18 @@ func TestWarming_WarmerWriteCacheRead(t *testing.T) {

logger := log.NewLogfmtLogger(os.Stderr)

remote := NewRemoteClientFactory(
log.With(logger, "component", "client"),
middleware.RateLimiterConfig{200, 10},
)

cache := NewCacheClientFactory(
log.With(logger, "component", "cache"),
mc,
time.Hour,
)
remote := &registry.RemoteClientFactory{
Logger: log.With(logger, "component", "client"),
Limiters: &middleware.RateLimiters{RPS: 200, Burst: 10},
Trace: true,
}

r := NewRegistry(
cache,
log.With(logger, "component", "registry"),
512,
)
r := &cache.Cache{mc}

w := Warmer{
w := &cache.Warmer{
Logger: log.With(logger, "component", "warmer"),
ClientFactory: remote,
Creds: NoCredentials(),
Expiry: time.Hour,
Reader: mc,
Writer: mc,
Cache: mc,
Burst: 125,
}

Expand All @@ -71,9 +57,9 @@ func TestWarming_WarmerWriteCacheRead(t *testing.T) {
}()

shutdownWg.Add(1)
go w.Loop(shutdown, shutdownWg, func() ImageCreds {
return ImageCreds{
id.Name: NoCredentials(),
go w.Loop(shutdown, shutdownWg, func() registry.ImageCreds {
return registry.ImageCreds{
id.Name: registry.NoCredentials(),
}
})

Expand Down
28 changes: 20 additions & 8 deletions registry/cache/memcached/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

const (
expiry = time.Hour
DefaultExpiry = time.Hour
)

// MemcacheClient is a memcache client that gets its server list from SRV
Expand All @@ -26,6 +26,7 @@ type MemcacheClient struct {
serverList *memcache.ServerList
hostname string
service string
ttl time.Duration
logger log.Logger

quit chan struct{}
Expand All @@ -36,6 +37,7 @@ type MemcacheClient struct {
type MemcacheConfig struct {
Host string
Service string
Expiry time.Duration
Timeout time.Duration
UpdateInterval time.Duration
Logger log.Logger
Expand All @@ -53,10 +55,15 @@ func NewMemcacheClient(config MemcacheConfig) *MemcacheClient {
serverList: &servers,
hostname: config.Host,
service: config.Service,
ttl: config.Expiry,
logger: config.Logger,
quit: make(chan struct{}),
}

if newClient.ttl == 0 {
newClient.ttl = DefaultExpiry
}

err := newClient.updateMemcacheServers()
if err != nil {
config.Logger.Log("err", errors.Wrapf(err, "Error setting memcache servers to '%v'", config.Host))
Expand All @@ -79,10 +86,15 @@ func NewFixedServerMemcacheClient(config MemcacheConfig, addresses ...string) *M
serverList: &servers,
hostname: config.Host,
service: config.Service,
ttl: config.Expiry,
logger: config.Logger,
quit: make(chan struct{}),
}

if newClient.ttl == 0 {
newClient.ttl = DefaultExpiry
}

return newClient
}

Expand All @@ -104,19 +116,19 @@ func (c *MemcacheClient) GetKey(k cache.Keyer) ([]byte, time.Time, error) {
return []byte{}, time.Time{}, err
}
}
expiry := binary.BigEndian.Uint32(cacheItem.Value)
return cacheItem.Value[4:], time.Unix(int64(expiry), 0), nil
exTime := binary.BigEndian.Uint32(cacheItem.Value)
return cacheItem.Value[4:], time.Unix(int64(exTime), 0), nil
}

// SetKey sets the value at a key.
func (c *MemcacheClient) SetKey(k cache.Keyer, v []byte) error {
expiry := time.Now().Add(expiry).Unix()
expiryBytes := make([]byte, 4, 4)
binary.BigEndian.PutUint32(expiryBytes, uint32(expiry))
exTime := time.Now().Add(c.ttl).Unix()
exBytes := make([]byte, 4, 4)
binary.BigEndian.PutUint32(exBytes, uint32(exTime))
if err := c.client.Set(&memcache.Item{
Key: k.Key(),
Value: append(expiryBytes, v...),
Expiration: int32(expiry),
Value: append(exBytes, v...),
Expiration: int32(exTime),
}); err != nil {
c.logger.Log("err", errors.Wrap(err, "storing in memcache"))
return err
Expand Down
24 changes: 2 additions & 22 deletions registry/cache/memcached/memcached_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// +build integration

package memcached

import (
Expand Down Expand Up @@ -38,8 +39,7 @@ func TestMemcache_ExpiryReadWrite(t *testing.T) {
t.Fatal(err)
}

// Get the expiry
expiry, err := mc.GetExpiration(key)
cached, expiry, err := mc.GetKey(key)
if err != nil {
t.Fatal(err)
}
Expand All @@ -49,27 +49,7 @@ func TestMemcache_ExpiryReadWrite(t *testing.T) {
if expiry.Before(time.Now()) {
t.Fatal("Expiry should be in the future")
}
}

func TestMemcache_ReadWrite(t *testing.T) {
// Memcache client
mc := NewFixedServerMemcacheClient(MemcacheConfig{
Timeout: time.Second,
UpdateInterval: 1 * time.Minute,
Logger: log.With(log.NewLogfmtLogger(os.Stderr), "component", "memcached"),
}, strings.Fields(*memcachedIPs)...)

// Set some dummy data
err := mc.SetKey(key, val)
if err != nil {
t.Fatal(err)
}

// Get the data
cached, err := mc.GetKey(key)
if err != nil {
t.Fatal(err)
}
if string(cached) != string(val) {
t.Fatalf("Should have returned %q, but got %q", string(val), string(cached))
}
Expand Down
5 changes: 2 additions & 3 deletions registry/cache/warming.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ const askForNewImagesInterval = time.Minute
// registries.
type Warmer struct {
Logger log.Logger
ClientFactory *registry.RemoteClientFactory
Expiry time.Duration
ClientFactory registry.ClientFactory
Cache Client
Burst int
Priority chan image.Name
Expand All @@ -40,7 +39,7 @@ type backlogItem struct {
func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFunc func() registry.ImageCreds) {
defer wg.Done()

if w.Logger == nil || w.ClientFactory == nil || w.Expiry == 0 || w.Cache == nil {
if w.Logger == nil || w.ClientFactory == nil || w.Cache == nil {
panic("registry.Warmer fields are nil")
}

Expand Down
Loading

0 comments on commit 5514161

Please sign in to comment.