Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Refresh the default namespace periodically
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio committed Feb 26, 2019
1 parent 1d86d57 commit f11b18c
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 10 deletions.
14 changes: 12 additions & 2 deletions cluster/kubernetes/cached_disco_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package kubernetes

import (
"os"
"sync"
"testing"
"time"

"github.com/go-kit/kit/log"
crdv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
crdfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -69,8 +72,15 @@ func TestCachedDiscovery(t *testing.T) {
}

cachedDisco, store, _ := makeCachedDiscovery(coreClient.Discovery(), crdClient, shutdown, makeHandler)

namespacer, err := NewNamespacer(namespaceDefaulterFake("bar-ns"), cachedDisco)
shutdownWg := &sync.WaitGroup{}
shutdownWg.Add(1)
namespacer, err := NewNamespacer(
namespaceDefaulterFake("bar-ns"),
cachedDisco,
log.NewLogfmtLogger(os.Stdout),
shutdown,
shutdownWg,
)
if err != nil {
t.Fatal(err)
}
Expand Down
49 changes: 47 additions & 2 deletions cluster/kubernetes/namespacer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,44 @@ package kubernetes

import (
"fmt"
"sync"
"time"

"github.com/go-kit/kit/log"
"k8s.io/client-go/discovery"

kresource "github.com/weaveworks/flux/cluster/kubernetes/resource"
)

var updateFallbackNSPeroid = time.Minute * 2

type namespaceViaDiscovery struct {
fallbackNamespace string
disco discovery.DiscoveryInterface
defaulter namespaceDefaulter
sync.RWMutex
}

type namespaceDefaulter interface {
GetDefaultNamespace() (string, error)
}

// NewNamespacer creates an implementation of Namespacer
func NewNamespacer(ns namespaceDefaulter, d discovery.DiscoveryInterface) (*namespaceViaDiscovery, error) {
func NewNamespacer(
ns namespaceDefaulter,
d discovery.DiscoveryInterface,
l log.Logger,
shutdown chan struct{},
shutdownWg *sync.WaitGroup) (*namespaceViaDiscovery, error) {

fallback, err := ns.GetDefaultNamespace()
if err != nil {
return nil, err
}
return &namespaceViaDiscovery{fallbackNamespace: fallback, disco: d}, nil
result := &namespaceViaDiscovery{fallbackNamespace: fallback, disco: d, defaulter: ns}
shutdownWg.Add(1)
go result.updateFallbackNSLoop(updateFallbackNSPeroid, l, shutdown, shutdownWg)
return result, nil
}

// effectiveNamespace yields the namespace that would be used for this
Expand All @@ -35,6 +51,8 @@ func (n *namespaceViaDiscovery) EffectiveNamespace(m kresource.KubeManifest) (st
case err != nil:
return "", err
case namespaced && m.GetNamespace() == "":
n.RLock()
defer n.RUnlock()
return n.fallbackNamespace, nil
case !namespaced:
return "", nil
Expand All @@ -54,3 +72,30 @@ func (n *namespaceViaDiscovery) lookupNamespaced(groupVersion, kind string) (boo
}
return false, fmt.Errorf("resource not found for API %s, kind %s", groupVersion, kind)
}

func (n *namespaceViaDiscovery) updateFallbackNSLoop(
period time.Duration,
logger log.Logger,
shutdown chan struct{},
shutdownWg *sync.WaitGroup) {

defer shutdownWg.Done()
ticker := time.NewTicker(period)
keepGoing := true
for keepGoing {
select {
case <-ticker.C:
fallbackNamespace, err := n.defaulter.GetDefaultNamespace()
if err != nil {
logger.Log("err", err)
break
}
n.Lock()
n.fallbackNamespace = fallbackNamespace
n.Unlock()
case <-shutdown:
ticker.Stop()
keepGoing = false
}
}
}
23 changes: 22 additions & 1 deletion cluster/kubernetes/namespacer_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package kubernetes

import (
"os"
"sync"
"testing"
"time"

"github.com/go-kit/kit/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corefake "k8s.io/client-go/kubernetes/fake"

Expand Down Expand Up @@ -39,11 +43,28 @@ func makeFakeClient() *corefake.Clientset {
}

func TestNamespaceDefaulting(t *testing.T) {

coreClient := makeFakeClient()
nser, err := NewNamespacer(namespaceDefaulterFake("fallback-ns"), coreClient.Discovery())
shutdown := make(chan struct{})
defer close(shutdown)
shutdownWg := &sync.WaitGroup{}
shutdownWg.Add(1)
savedUpdateFallbackNSPeroid := updateFallbackNSPeroid
updateFallbackNSPeroid = 20 * time.Millisecond
defer func() { updateFallbackNSPeroid = savedUpdateFallbackNSPeroid }()

nser, err := NewNamespacer(
namespaceDefaulterFake("fallback-ns"),
coreClient.Discovery(),
log.NewLogfmtLogger(os.Stdout),
shutdown,
shutdownWg,
)
if err != nil {
t.Fatal(err)
}
// Make sure we test the namespace update loop
time.Sleep(updateFallbackNSPeroid * 3)

const defs = `---
apiVersion: apps/v1
Expand Down
18 changes: 15 additions & 3 deletions cluster/kubernetes/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package kubernetes

import (
"fmt"
"os"
"sort"
"strings"
"sync"
"testing"

"github.com/ghodss/yaml"
Expand All @@ -28,7 +30,7 @@ import (
"github.com/weaveworks/flux/cluster"
kresource "github.com/weaveworks/flux/cluster/kubernetes/resource"
fluxfake "github.com/weaveworks/flux/integrations/client/clientset/versioned/fake"
"github.com/weaveworks/flux/sync"
fluxsync "github.com/weaveworks/flux/sync"
)

const (
Expand Down Expand Up @@ -262,7 +264,17 @@ metadata:
}

test := func(t *testing.T, kube *Cluster, defs, expectedAfterSync string, expectErrors bool) {
namespacer, err := NewNamespacer(namespaceDefaulterFake(defaultTestNamespace), kube.client.coreClient.Discovery())
shutdownWg := &sync.WaitGroup{}
shutdownWg.Add(1)
shutdown := make(chan struct{})
defer close(shutdown)
namespacer, err := NewNamespacer(
namespaceDefaulterFake(defaultTestNamespace),
kube.client.coreClient.Discovery(),
log.NewLogfmtLogger(os.Stdout),
shutdown,
shutdownWg,
)
if err != nil {
t.Fatal(err)
}
Expand All @@ -278,7 +290,7 @@ metadata:
t.Fatal(err)
}

err = sync.Sync("testset", resources, kube)
err = fluxsync.Sync("testset", resources, kube)
if !expectErrors && err != nil {
t.Error(err)
}
Expand Down
8 changes: 6 additions & 2 deletions cmd/fluxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,12 @@ func main() {
// There is only one way we currently interpret a repo of
// files as manifests, and that's as Kubernetes yamels.
k8sManifests = &kubernetes.Manifests{}
k8sManifests.Namespacer, err = kubernetes.NewNamespacer(kubectlApplier, discoClientset)

k8sManifests.Namespacer, err = kubernetes.NewNamespacer(
kubectlApplier,
discoClientset,
log.With(logger, "component", "namespacer"),
shutdown,
shutdownWg)
if err != nil {
logger.Log("err", err)
os.Exit(1)
Expand Down

0 comments on commit f11b18c

Please sign in to comment.