From f11b18cfce4fb644b62a81d366bf57a7627316b3 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Tue, 26 Feb 2019 11:27:21 +0000 Subject: [PATCH] Refresh the default namespace periodically --- cluster/kubernetes/cached_disco_test.go | 14 ++++++- cluster/kubernetes/namespacer.go | 49 ++++++++++++++++++++++++- cluster/kubernetes/namespacer_test.go | 23 +++++++++++- cluster/kubernetes/sync_test.go | 18 +++++++-- cmd/fluxd/main.go | 8 +++- 5 files changed, 102 insertions(+), 10 deletions(-) diff --git a/cluster/kubernetes/cached_disco_test.go b/cluster/kubernetes/cached_disco_test.go index 0efed8bd78..54b6e58c15 100644 --- a/cluster/kubernetes/cached_disco_test.go +++ b/cluster/kubernetes/cached_disco_test.go @@ -1,9 +1,12 @@ package kubernetes import ( + "os" + "sync" "testing" "time" + "github.com/go-kit/kit/log" crdv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" crdfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -69,8 +72,15 @@ func TestCachedDiscovery(t *testing.T) { } cachedDisco, store, _ := makeCachedDiscovery(coreClient.Discovery(), crdClient, shutdown, makeHandler) - - namespacer, err := NewNamespacer(namespaceDefaulterFake("bar-ns"), cachedDisco) + shutdownWg := &sync.WaitGroup{} + shutdownWg.Add(1) + namespacer, err := NewNamespacer( + namespaceDefaulterFake("bar-ns"), + cachedDisco, + log.NewLogfmtLogger(os.Stdout), + shutdown, + shutdownWg, + ) if err != nil { t.Fatal(err) } diff --git a/cluster/kubernetes/namespacer.go b/cluster/kubernetes/namespacer.go index 4f2d4c41ed..d67cf96706 100644 --- a/cluster/kubernetes/namespacer.go +++ b/cluster/kubernetes/namespacer.go @@ -2,15 +2,22 @@ package kubernetes import ( "fmt" + "sync" + "time" + "github.com/go-kit/kit/log" "k8s.io/client-go/discovery" kresource "github.com/weaveworks/flux/cluster/kubernetes/resource" ) +var updateFallbackNSPeroid = time.Minute * 2 + type namespaceViaDiscovery struct { fallbackNamespace string disco discovery.DiscoveryInterface + defaulter namespaceDefaulter + sync.RWMutex } type namespaceDefaulter interface { @@ -18,12 +25,21 @@ type namespaceDefaulter interface { } // NewNamespacer creates an implementation of Namespacer -func NewNamespacer(ns namespaceDefaulter, d discovery.DiscoveryInterface) (*namespaceViaDiscovery, error) { +func NewNamespacer( + ns namespaceDefaulter, + d discovery.DiscoveryInterface, + l log.Logger, + shutdown chan struct{}, + shutdownWg *sync.WaitGroup) (*namespaceViaDiscovery, error) { + fallback, err := ns.GetDefaultNamespace() if err != nil { return nil, err } - return &namespaceViaDiscovery{fallbackNamespace: fallback, disco: d}, nil + result := &namespaceViaDiscovery{fallbackNamespace: fallback, disco: d, defaulter: ns} + shutdownWg.Add(1) + go result.updateFallbackNSLoop(updateFallbackNSPeroid, l, shutdown, shutdownWg) + return result, nil } // effectiveNamespace yields the namespace that would be used for this @@ -35,6 +51,8 @@ func (n *namespaceViaDiscovery) EffectiveNamespace(m kresource.KubeManifest) (st case err != nil: return "", err case namespaced && m.GetNamespace() == "": + n.RLock() + defer n.RUnlock() return n.fallbackNamespace, nil case !namespaced: return "", nil @@ -54,3 +72,30 @@ func (n *namespaceViaDiscovery) lookupNamespaced(groupVersion, kind string) (boo } return false, fmt.Errorf("resource not found for API %s, kind %s", groupVersion, kind) } + +func (n *namespaceViaDiscovery) updateFallbackNSLoop( + period time.Duration, + logger log.Logger, + shutdown chan struct{}, + shutdownWg *sync.WaitGroup) { + + defer shutdownWg.Done() + ticker := time.NewTicker(period) + keepGoing := true + for keepGoing { + select { + case <-ticker.C: + fallbackNamespace, err := n.defaulter.GetDefaultNamespace() + if err != nil { + logger.Log("err", err) + break + } + n.Lock() + n.fallbackNamespace = fallbackNamespace + n.Unlock() + case <-shutdown: + ticker.Stop() + keepGoing = false + } + } +} diff --git a/cluster/kubernetes/namespacer_test.go b/cluster/kubernetes/namespacer_test.go index b0ec71b689..4c66349c85 100644 --- a/cluster/kubernetes/namespacer_test.go +++ b/cluster/kubernetes/namespacer_test.go @@ -1,8 +1,12 @@ package kubernetes import ( + "os" + "sync" "testing" + "time" + "github.com/go-kit/kit/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" corefake "k8s.io/client-go/kubernetes/fake" @@ -39,11 +43,28 @@ func makeFakeClient() *corefake.Clientset { } func TestNamespaceDefaulting(t *testing.T) { + coreClient := makeFakeClient() - nser, err := NewNamespacer(namespaceDefaulterFake("fallback-ns"), coreClient.Discovery()) + shutdown := make(chan struct{}) + defer close(shutdown) + shutdownWg := &sync.WaitGroup{} + shutdownWg.Add(1) + savedUpdateFallbackNSPeroid := updateFallbackNSPeroid + updateFallbackNSPeroid = 20 * time.Millisecond + defer func() { updateFallbackNSPeroid = savedUpdateFallbackNSPeroid }() + + nser, err := NewNamespacer( + namespaceDefaulterFake("fallback-ns"), + coreClient.Discovery(), + log.NewLogfmtLogger(os.Stdout), + shutdown, + shutdownWg, + ) if err != nil { t.Fatal(err) } + // Make sure we test the namespace update loop + time.Sleep(updateFallbackNSPeroid * 3) const defs = `--- apiVersion: apps/v1 diff --git a/cluster/kubernetes/sync_test.go b/cluster/kubernetes/sync_test.go index 5520ac3b2e..6be5900c68 100644 --- a/cluster/kubernetes/sync_test.go +++ b/cluster/kubernetes/sync_test.go @@ -2,8 +2,10 @@ package kubernetes import ( "fmt" + "os" "sort" "strings" + "sync" "testing" "github.com/ghodss/yaml" @@ -28,7 +30,7 @@ import ( "github.com/weaveworks/flux/cluster" kresource "github.com/weaveworks/flux/cluster/kubernetes/resource" fluxfake "github.com/weaveworks/flux/integrations/client/clientset/versioned/fake" - "github.com/weaveworks/flux/sync" + fluxsync "github.com/weaveworks/flux/sync" ) const ( @@ -262,7 +264,17 @@ metadata: } test := func(t *testing.T, kube *Cluster, defs, expectedAfterSync string, expectErrors bool) { - namespacer, err := NewNamespacer(namespaceDefaulterFake(defaultTestNamespace), kube.client.coreClient.Discovery()) + shutdownWg := &sync.WaitGroup{} + shutdownWg.Add(1) + shutdown := make(chan struct{}) + defer close(shutdown) + namespacer, err := NewNamespacer( + namespaceDefaulterFake(defaultTestNamespace), + kube.client.coreClient.Discovery(), + log.NewLogfmtLogger(os.Stdout), + shutdown, + shutdownWg, + ) if err != nil { t.Fatal(err) } @@ -278,7 +290,7 @@ metadata: t.Fatal(err) } - err = sync.Sync("testset", resources, kube) + err = fluxsync.Sync("testset", resources, kube) if !expectErrors && err != nil { t.Error(err) } diff --git a/cmd/fluxd/main.go b/cmd/fluxd/main.go index f9af51dc88..bd303abe59 100644 --- a/cmd/fluxd/main.go +++ b/cmd/fluxd/main.go @@ -317,8 +317,12 @@ func main() { // There is only one way we currently interpret a repo of // files as manifests, and that's as Kubernetes yamels. k8sManifests = &kubernetes.Manifests{} - k8sManifests.Namespacer, err = kubernetes.NewNamespacer(kubectlApplier, discoClientset) - + k8sManifests.Namespacer, err = kubernetes.NewNamespacer( + kubectlApplier, + discoClientset, + log.With(logger, "component", "namespacer"), + shutdown, + shutdownWg) if err != nil { logger.Log("err", err) os.Exit(1)