diff --git a/cluster/kubernetes/sync.go b/cluster/kubernetes/sync.go index 83897ee87..9d4cda88c 100644 --- a/cluster/kubernetes/sync.go +++ b/cluster/kubernetes/sync.go @@ -258,7 +258,7 @@ func (c *Cluster) getAllowedResourcesBySelector(selector string) (map[string]*ku func (c *Cluster) listAllowedResources( namespaced bool, gvr schema.GroupVersionResource, options meta_v1.ListOptions) ([]unstructured.Unstructured, error) { if !namespaced { - // The resource is not namespaced + // The resource is not namespaced, everything is allowed resourceClient := c.client.dynamicClient.Resource(gvr) data, err := resourceClient.List(options) if err != nil { diff --git a/cluster/kubernetes/sync_test.go b/cluster/kubernetes/sync_test.go index 7c4a055df..12f042e79 100644 --- a/cluster/kubernetes/sync_test.go +++ b/cluster/kubernetes/sync_test.go @@ -21,6 +21,7 @@ import ( // k8sclient "k8s.io/client-go/kubernetes" "github.com/stretchr/testify/assert" "k8s.io/client-go/discovery" + k8sclient "k8s.io/client-go/kubernetes" corefake "k8s.io/client-go/kubernetes/fake" k8s_testing "k8s.io/client-go/testing" @@ -95,10 +96,10 @@ func fakeClients() ExtendedClient { // enough for checking whether sync operations succeeded and had the // correct effect, which is either to "upsert", or delete, resources. type fakeApplier struct { - client dynamic.Interface - discovery discovery.DiscoveryInterface - defaultNS string - commandRun bool + dynamicClient dynamic.Interface + coreClient k8sclient.Interface + defaultNS string + commandRun bool } func groupVersionResource(res *unstructured.Unstructured) schema.GroupVersionResource { @@ -126,12 +127,12 @@ func (a fakeApplier) apply(_ log.Logger, cs changeSet, errored map[flux.Resource } gvr := groupVersionResource(res) - c := a.client.Resource(gvr) + c := a.dynamicClient.Resource(gvr) // This is an approximation to what `kubectl` does in filling // in the fallback namespace (from config). In the case of // non-namespaced entities, it will be ignored by the fake // client (FIXME: make sure of this). - apiRes := findAPIResource(gvr, a.discovery) + apiRes := findAPIResource(gvr, a.coreClient.Discovery()) if apiRes == nil { panic("no APIResource found for " + gvr.String()) } @@ -159,11 +160,40 @@ func (a fakeApplier) apply(_ log.Logger, cs changeSet, errored map[flux.Resource errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, err}) return } + if res.GetKind() == "Namespace" { + // We also create namespaces in the core fake client since the dynamic client + // and core clients don't share resources + var ns corev1.Namespace + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstruct, &ns); err != nil { + errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, err}) + return + } + _, err := a.coreClient.CoreV1().Namespaces().Get(ns.Name, metav1.GetOptions{}) + switch { + case errors.IsNotFound(err): + _, err = a.coreClient.CoreV1().Namespaces().Create(&ns) + case err == nil: + _, err = a.coreClient.CoreV1().Namespaces().Update(&ns) + } + if err != nil { + errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, err}) + return + } + } + } else if cmd == "delete" { if err := dc.Delete(name, &metav1.DeleteOptions{}); err != nil { errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, err}) return } + if res.GetKind() == "Namespace" { + // We also create namespaces in the core fake client since the dynamic client + // and core clients don't share resources + if err := a.coreClient.CoreV1().Namespaces().Delete(res.GetName(), &metav1.DeleteOptions{}); err != nil { + errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, err}) + return + } + } } else { panic("unknown action: " + cmd) } @@ -202,7 +232,7 @@ func findAPIResource(gvr schema.GroupVersionResource, disco discovery.DiscoveryI func setup(t *testing.T) (*Cluster, *fakeApplier) { clients := fakeClients() - applier := &fakeApplier{client: clients.dynamicClient, discovery: clients.coreClient.Discovery(), defaultNS: defaultTestNamespace} + applier := &fakeApplier{dynamicClient: clients.dynamicClient, coreClient: clients.coreClient, defaultNS: defaultTestNamespace} kube := &Cluster{ applier: applier, client: clients, @@ -222,6 +252,13 @@ func TestSyncNop(t *testing.T) { } func TestSync(t *testing.T) { + const ns1 = `--- +apiVersion: v1 +kind: Namespace +metadata: + name: foobar +` + const defs1 = `--- apiVersion: apps/v1 kind: Deployment @@ -238,6 +275,13 @@ metadata: namespace: foobar ` + const ns3 = `--- +apiVersion: v1 +kind: Namespace +metadata: + name: other +` + const defs3 = `--- apiVersion: apps/v1 kind: Deployment @@ -245,7 +289,6 @@ metadata: name: dep3 namespace: other ` - // checkSame is a check that a result returned from the cluster is // the same as an expected. labels and annotations may be altered // by the sync process; we'll look at the "spec" field as an @@ -320,15 +363,15 @@ metadata: // without GC on, resources persist if they are not mentioned in subsequent syncs. test(t, kube, "", "", false) - test(t, kube, defs1, defs1, false) - test(t, kube, defs1+defs2, defs1+defs2, false) - test(t, kube, defs3, defs1+defs2+defs3, false) + test(t, kube, ns1+defs1, ns1+defs1, false) + test(t, kube, ns1+defs1+defs2, ns1+defs1+defs2, false) + test(t, kube, ns3+defs3, ns1+defs1+defs2+ns3+defs3, false) // Now with GC switched on. That means if we don't include a // resource in a sync, it should be deleted. kube.GC = true - test(t, kube, defs2+defs3, defs3+defs2, false) - test(t, kube, defs1+defs2, defs1+defs2, false) + test(t, kube, ns1+defs2+ns3+defs3, ns1+defs2+ns3+defs3, false) + test(t, kube, ns1+defs1+defs2, ns1+defs1+defs2, false) test(t, kube, "", "", false) }) @@ -385,7 +428,7 @@ metadata: `, depName, depNS) // Add dep to the cluster through syncing - test(t, kube, dep, dep, false) + test(t, kube, ns1+dep, ns1+dep, false) // Add a copy of dep (including the GCmark label) with different name directly to the cluster gvr := schema.GroupVersionResource{ @@ -421,7 +464,7 @@ metadata: kube, _ := setup(t) kube.GC = true - const defs1invalid = ` + const defs1invalid = `--- apiVersion: apps/v1 kind: Deployment metadata: @@ -430,8 +473,8 @@ metadata: annotations: error: fail to apply this ` - test(t, kube, defs1, defs1, false) - test(t, kube, defs1invalid, defs1, true) + test(t, kube, ns1+defs1, ns1+defs1, false) + test(t, kube, ns1+defs1invalid, ns1+defs1invalid, true) }) t.Run("sync doesn't apply or delete manifests marked with ignore", func(t *testing.T) { @@ -459,7 +502,7 @@ metadata: ` // dep1 is created, but dep2 is ignored - test(t, kube, dep1+dep2, dep1, false) + test(t, kube, ns1+dep1+dep2, ns1+dep1, false) const dep1ignored = `--- apiVersion: apps/v1 @@ -474,11 +517,11 @@ spec: labels: {app: bar} ` // dep1 is not updated, but neither is it deleted - test(t, kube, dep1ignored+dep2, dep1, false) + test(t, kube, ns1+dep1ignored+dep2, ns1+dep1, false) }) t.Run("sync doesn't update a cluster resource marked with ignore", func(t *testing.T) { - const dep1 = ` + const dep1 = `--- apiVersion: apps/v1 kind: Deployment metadata: @@ -491,7 +534,7 @@ spec: ` kube, _ := setup(t) // This just checks the starting assumption: dep1 exists in the cluster - test(t, kube, dep1, dep1, false) + test(t, kube, ns1+dep1, ns1+dep1, false) // Now we'll mark it as ignored _in the cluster_ (i.e., the // equivalent of `kubectl annotate`) @@ -512,7 +555,7 @@ spec: t.Fatal(err) } - const mod1 = ` + const mod1 = `--- apiVersion: apps/v1 kind: Deployment metadata: @@ -525,7 +568,7 @@ spec: ` // Check that dep1, which is marked ignore in the cluster, is // neither updated or deleted - test(t, kube, mod1, dep1, false) + test(t, kube, ns1+mod1, ns1+dep1, false) }) t.Run("sync doesn't update or delete a pre-existing resource marked with ignore", func(t *testing.T) { @@ -547,7 +590,12 @@ spec: assert.NoError(t, err) dep1res := &unstructured.Unstructured{Object: dep1obj} gvr := groupVersionResource(dep1res) + var ns1obj corev1.Namespace + err = yaml.Unmarshal([]byte(ns1), &ns1obj) + assert.NoError(t, err) // Put the pre-existing resource in the cluster + _, err = kube.client.coreClient.CoreV1().Namespaces().Create(&ns1obj) + assert.NoError(t, err) dc := kube.client.dynamicClient.Resource(gvr).Namespace(dep1res.GetNamespace()) _, err = dc.Create(dep1res) assert.NoError(t, err)