diff --git a/Gopkg.lock b/Gopkg.lock index f9696fc42..c1c4f6a82 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -622,6 +622,14 @@ revision = "0599d764e054d4e983bb120e30759179fafe3942" version = "v1.2.0" +[[projects]] + branch = "master" + digest = "1:2e43242796ee48ff0256eaf784ffaca015614ea5cb284cbc7b6e0fb65c219887" + name = "github.com/whilp/git-urls" + packages = ["."] + pruneopts = "" + revision = "31bac0d230fa29f36ed1b3279c2343752e7196c0" + [[projects]] branch = "master" digest = "1:2ea6df0f542cc95a5e374e9cdd81eaa599ed0d55366eef92d2f6b9efa2795c07" @@ -830,12 +838,21 @@ version = "kubernetes-1.11.0" [[projects]] - branch = "master" - digest = "1:0a865e7f317907161f1c2df5f270595b9d7ba09850b7eb39a2b11cc8f9a50d28" + digest = "1:26842d8ec9a7f675635f1c2248991f190610109c446f8b8f405be916d8241f12" name = "k8s.io/apiextensions-apiserver" - packages = ["pkg/features"] + packages = [ + "pkg/apis/apiextensions", + "pkg/apis/apiextensions/v1beta1", + "pkg/client/clientset/clientset", + "pkg/client/clientset/clientset/fake", + "pkg/client/clientset/clientset/scheme", + "pkg/client/clientset/clientset/typed/apiextensions/v1beta1", + "pkg/client/clientset/clientset/typed/apiextensions/v1beta1/fake", + "pkg/features", + ] pruneopts = "" - revision = "84f7c7786e298ad1479d00ff314a2cfa0006cd0c" + revision = "3de98c57bc05a81cf463e0ad7a0af4cec8a5b510" + version = "kubernetes-1.11.0" [[projects]] digest = "1:b6b2fb7b4da1ac973b64534ace2299a02504f16bc7820cb48edb8ca4077183e1" @@ -906,7 +923,9 @@ name = "k8s.io/client-go" packages = [ "discovery", + "discovery/cached", "discovery/fake", + "dynamic", "kubernetes", "kubernetes/fake", "kubernetes/scheme", @@ -1185,6 +1204,7 @@ "github.com/google/go-cmp/cmp", "github.com/gorilla/mux", "github.com/gorilla/websocket", + "github.com/imdario/mergo", "github.com/justinbarrick/go-k8s-portforward", "github.com/ncabatoff/go-seq/seq", "github.com/opencontainers/go-digest", @@ -1198,14 +1218,20 @@ "github.com/stretchr/testify/assert", "github.com/weaveworks/common/middleware", "github.com/weaveworks/go-checkpoint", + "github.com/whilp/git-urls", "golang.org/x/sys/unix", "golang.org/x/time/rate", "gopkg.in/yaml.v2", "k8s.io/api/apps/v1", "k8s.io/api/batch/v1beta1", "k8s.io/api/core/v1", + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1", + "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset", + "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake", "k8s.io/apimachinery/pkg/api/errors", + "k8s.io/apimachinery/pkg/api/meta", "k8s.io/apimachinery/pkg/apis/meta/v1", + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured", "k8s.io/apimachinery/pkg/labels", "k8s.io/apimachinery/pkg/runtime", "k8s.io/apimachinery/pkg/runtime/schema", @@ -1215,7 +1241,9 @@ "k8s.io/apimachinery/pkg/util/wait", "k8s.io/apimachinery/pkg/watch", "k8s.io/client-go/discovery", + "k8s.io/client-go/discovery/cached", "k8s.io/client-go/discovery/fake", + "k8s.io/client-go/dynamic", "k8s.io/client-go/kubernetes", "k8s.io/client-go/kubernetes/fake", "k8s.io/client-go/kubernetes/scheme", diff --git a/Gopkg.toml b/Gopkg.toml index 884f7acb5..4d09c1c77 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -26,6 +26,10 @@ required = ["k8s.io/code-generator/cmd/client-gen"] name = "k8s.io/apimachinery" version = "kubernetes-1.11.0" +[[constraint]] + name = "k8s.io/apiextensions-apiserver" + version = "kubernetes-1.11.0" + [[constraint]] name = "k8s.io/client-go" version = "8.0.0" @@ -57,3 +61,7 @@ required = ["k8s.io/code-generator/cmd/client-gen"] 
[[override]] name = "github.com/BurntSushi/toml" version = "v0.3.1" + +[[constraint]] + name = "github.com/imdario/mergo" + version = "0.3.2" diff --git a/cluster/cluster.go b/cluster/cluster.go index d153ff328..b97278127 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -29,7 +29,7 @@ type Cluster interface { SomeControllers([]flux.ResourceID) ([]Controller, error) Ping() error Export() ([]byte, error) - Sync(SyncDef) error + Sync(SyncSet) error PublicSSHKey(regenerate bool) (ssh.PublicKey, error) } @@ -74,7 +74,7 @@ type Controller struct { Rollout RolloutStatus // Errors during the recurring sync from the Git repository to the // cluster will surface here. - SyncError error + SyncError error Containers ContainersOrExcuse } diff --git a/cluster/kubernetes/cached_disco.go b/cluster/kubernetes/cached_disco.go new file mode 100644 index 000000000..5f0a7e69a --- /dev/null +++ b/cluster/kubernetes/cached_disco.go @@ -0,0 +1,102 @@ +package kubernetes + +import ( + "sync" + "time" + + crdv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + crd "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/discovery" + discocache "k8s.io/client-go/discovery/cached" + toolscache "k8s.io/client-go/tools/cache" +) + +// This exists so that we can do our own invalidation. +type cachedDiscovery struct { + discovery.CachedDiscoveryInterface + + invalidMu sync.Mutex + invalid bool +} + +// The k8s.io/client-go v8.0.0 implementation of MemCacheDiscovery +// refreshes the cached values, synchronously, when Invalidate() is +// called. Since we want to invalidate every time a CRD changes, but +// only refresh values when we need to read the cached values, this +// method defers the invalidation until a read is done. +func (d *cachedDiscovery) Invalidate() { + d.invalidMu.Lock() + d.invalid = true + d.invalidMu.Unlock() +} + +// ServerResourcesForGroupVersion is the method used by the +// namespacer; so, this is the one where we check whether the cache +// has been invalidated. A cachedDiscovery implementation for more +// general use would do this for all methods (that weren't implemented +// purely in terms of other methods). +func (d *cachedDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) { + d.invalidMu.Lock() + invalid := d.invalid + d.invalid = false + d.invalidMu.Unlock() + if invalid { + d.CachedDiscoveryInterface.Invalidate() + } + return d.CachedDiscoveryInterface.ServerResourcesForGroupVersion(groupVersion) +} + +// MakeCachedDiscovery constructs a CachedDicoveryInterface that will +// be invalidated whenever the set of CRDs change. The idea is that +// the only avenue of a change to the API resources in a running +// system is CRDs being added, updated or deleted. 
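Before the constructor itself, a dependency-free sketch of the deferred-invalidation pattern described above (a hypothetical type, not part of this patch; sync is the only import it needs). Invalidate merely marks the cache stale, and the expensive reload happens on the next read, which mirrors how cachedDiscovery pushes the work into ServerResourcesForGroupVersion:

type staleable struct {
    mu    sync.Mutex
    stale bool
    value string
    load  func() string // stands in for an expensive discovery call
}

// Invalidate is cheap: it only records that the cached value is out of date.
func (s *staleable) Invalidate() {
    s.mu.Lock()
    s.stale = true
    s.mu.Unlock()
}

// Get refreshes lazily, so frequent CRD churn does not trigger repeated
// reloads unless something actually reads the cache in between.
func (s *staleable) Get() string {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.stale || s.value == "" {
        s.value = s.load()
        s.stale = false
    }
    return s.value
}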
+func MakeCachedDiscovery(d discovery.DiscoveryInterface, c crd.Interface, shutdown <-chan struct{}) discovery.CachedDiscoveryInterface { + result, _, _ := makeCachedDiscovery(d, c, shutdown, makeInvalidatingHandler) + return result +} + +// --- + +func makeInvalidatingHandler(cached discovery.CachedDiscoveryInterface) toolscache.ResourceEventHandler { + var handler toolscache.ResourceEventHandler = toolscache.ResourceEventHandlerFuncs{ + AddFunc: func(_ interface{}) { + cached.Invalidate() + }, + UpdateFunc: func(_, _ interface{}) { + cached.Invalidate() + }, + DeleteFunc: func(_ interface{}) { + cached.Invalidate() + }, + } + return handler +} + +type makeHandle func(discovery.CachedDiscoveryInterface) toolscache.ResourceEventHandler + +// makeCachedDiscovery constructs a cached discovery client, with more +// flexibility than MakeCachedDiscovery; e.g., with extra handlers for +// testing. +func makeCachedDiscovery(d discovery.DiscoveryInterface, c crd.Interface, shutdown <-chan struct{}, handlerFn makeHandle) (*cachedDiscovery, toolscache.Store, toolscache.Controller) { + cachedDisco := &cachedDiscovery{CachedDiscoveryInterface: discocache.NewMemCacheClient(d)} + // We have an empty cache, so it's _a priori_ invalid. (Yes, that's the zero value, but better safe than sorry) + cachedDisco.Invalidate() + + crdClient := c.ApiextensionsV1beta1().CustomResourceDefinitions() + lw := &toolscache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return crdClient.List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return crdClient.Watch(options) + }, + } + + handler := handlerFn(cachedDisco) + store, controller := toolscache.NewInformer(lw, &crdv1beta1.CustomResourceDefinition{}, 5*time.Minute, handler) + go controller.Run(shutdown) + return cachedDisco, store, controller +} diff --git a/cluster/kubernetes/cached_disco_test.go b/cluster/kubernetes/cached_disco_test.go new file mode 100644 index 000000000..82c5dfc6c --- /dev/null +++ b/cluster/kubernetes/cached_disco_test.go @@ -0,0 +1,134 @@ +package kubernetes + +import ( + "testing" + "time" + + crdv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + crdfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/discovery" + toolscache "k8s.io/client-go/tools/cache" +) + +type chainHandler struct { + first toolscache.ResourceEventHandler + next toolscache.ResourceEventHandler +} + +func (h chainHandler) OnAdd(obj interface{}) { + h.first.OnAdd(obj) + h.next.OnAdd(obj) +} + +func (h chainHandler) OnUpdate(old, new interface{}) { + h.first.OnUpdate(old, new) + h.next.OnUpdate(old, new) +} + +func (h chainHandler) OnDelete(old interface{}) { + h.first.OnDelete(old) + h.next.OnDelete(old) +} + +func TestCachedDiscovery(t *testing.T) { + coreClient := makeFakeClient() + + myCRD := &crdv1beta1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: "custom", + }, + } + crdClient := crdfake.NewSimpleClientset(myCRD) + + // Here's my fake API resource + myAPI := &metav1.APIResourceList{ + GroupVersion: "foo/v1", + APIResources: []metav1.APIResource{ + {Name: "customs", SingularName: "custom", Namespaced: true, Kind: "Custom", Verbs: getAndList}, + }, + } + + apiResources := coreClient.Fake.Resources + coreClient.Fake.Resources = append(apiResources, myAPI) + + shutdown := make(chan struct{}) + defer close(shutdown) + + // this extra handler means we can synchronise 
on the add later + // being processed + allowAdd := make(chan interface{}) + + addHandler := toolscache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + allowAdd <- obj + }, + } + makeHandler := func(d discovery.CachedDiscoveryInterface) toolscache.ResourceEventHandler { + return chainHandler{first: addHandler, next: makeInvalidatingHandler(d)} + } + + cachedDisco, store, _ := makeCachedDiscovery(coreClient.Discovery(), crdClient, shutdown, makeHandler) + + saved := getDefaultNamespace + getDefaultNamespace = func() (string, error) { return "bar-ns", nil } + defer func() { getDefaultNamespace = saved }() + namespacer, err := NewNamespacer(cachedDisco) + if err != nil { + t.Fatal(err) + } + + namespaced, err := namespacer.lookupNamespaced("foo/v1", "Custom") + if err != nil { + t.Fatal(err) + } + if !namespaced { + t.Error("got false from lookupNamespaced, expecting true") + } + + // In a cluster, we'd rely on the apiextensions server to reflect + // changes to CRDs to changes in the API resources. Here I will be + // more narrow, and just test that the API resources are reloaded + // when a CRD is updated or deleted. + + // This is delicate: we can't just change the value in-place, + // since that will update everyone's record of it, and the test + // below will trivially succeed. + updatedAPI := &metav1.APIResourceList{ + GroupVersion: "foo/v1", + APIResources: []metav1.APIResource{ + {Name: "customs", SingularName: "custom", Namespaced: false /* <-- changed */, Kind: "Custom", Verbs: getAndList}, + }, + } + coreClient.Fake.Resources = append(apiResources, updatedAPI) + + // Provoke the cached discovery client into invalidating + _, err = crdClient.ApiextensionsV1beta1().CustomResourceDefinitions().Update(myCRD) + if err != nil { + t.Fatal(err) + } + + // Wait for the update to "go through" + select { + case <-allowAdd: + break + case <-time.After(time.Second): + t.Fatal("timed out waiting for Add to happen") + } + + _, exists, err := store.Get(myCRD) + if err != nil { + t.Error(err) + } + if !exists { + t.Error("does not exist") + } + + namespaced, err = namespacer.lookupNamespaced("foo/v1", "Custom") + if err != nil { + t.Fatal(err) + } + if namespaced { + t.Error("got true from lookupNamespaced, expecting false (after changing it)") + } +} diff --git a/cluster/kubernetes/doc.go b/cluster/kubernetes/doc.go index 9d8d86e63..230cdba94 100644 --- a/cluster/kubernetes/doc.go +++ b/cluster/kubernetes/doc.go @@ -3,5 +3,4 @@ Package kubernetes provides implementations of `Cluster` and `Manifests` that interact with the Kubernetes API (using kubectl or the k8s API client). */ - package kubernetes diff --git a/cluster/kubernetes/fakedynamicclient_test.go b/cluster/kubernetes/fakedynamicclient_test.go new file mode 100644 index 000000000..7e4f3df42 --- /dev/null +++ b/cluster/kubernetes/fakedynamicclient_test.go @@ -0,0 +1,387 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package kubernetes + +/* +This file was obtained from +https://github.com/kubernetes/client-go/blob/4b43750b963d2b6e0f7527fe558e71c47bfc5045/dynamic/fake/simple.go +and modified in the following way(s): + + - the package was changed to `kubernetes` + +This file is here because it has a fix for +https://github.com/kubernetes/client-go/issues/465, which is included +in client-go v9, which we're not able to vendor at this time. + +It can be removed, and the fake clientset from the original package +used, when we are ready to vendor client-go v9 and kubernetes-1.12. +*/ + +import ( + "strings" + gotesting "testing" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/testing" +) + +func TestFakeClientConstruction(t *gotesting.T) { +} + +func NewSimpleDynamicClient(scheme *runtime.Scheme, objects ...runtime.Object) *FakeDynamicClient { + // In order to use List with this client, you have to have the v1.List registered in your scheme. Neat thing though + // it does NOT have to be the *same* list + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "fake-dynamic-client-group", Version: "v1", Kind: "List"}, &unstructured.UnstructuredList{}) + + codecs := serializer.NewCodecFactory(scheme) + o := testing.NewObjectTracker(scheme, codecs.UniversalDecoder()) + for _, obj := range objects { + if err := o.Add(obj); err != nil { + panic(err) + } + } + + cs := &FakeDynamicClient{} + cs.AddReactor("*", "*", testing.ObjectReaction(o)) + cs.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) { + gvr := action.GetResource() + ns := action.GetNamespace() + watch, err := o.Watch(gvr, ns) + if err != nil { + return false, nil, err + } + return true, watch, nil + }) + + return cs +} + +// Clientset implements clientset.Interface. Meant to be embedded into a +// struct to get a default implementation. This makes faking out just the method +// you want to test easier. +type FakeDynamicClient struct { + testing.Fake + scheme *runtime.Scheme +} + +type dynamicResourceClient struct { + client *FakeDynamicClient + namespace string + resource schema.GroupVersionResource +} + +var _ dynamic.Interface = &FakeDynamicClient{} + +func (c *FakeDynamicClient) Resource(resource schema.GroupVersionResource) dynamic.NamespaceableResourceInterface { + return &dynamicResourceClient{client: c, resource: resource} +} + +func (c *dynamicResourceClient) Namespace(ns string) dynamic.ResourceInterface { + ret := *c + ret.namespace = ns + return &ret +} + +func (c *dynamicResourceClient) Create(obj *unstructured.Unstructured, subresources ...string) (*unstructured.Unstructured, error) { + var uncastRet runtime.Object + var err error + switch { + case len(c.namespace) == 0 && len(subresources) == 0: + uncastRet, err = c.client.Fake. + Invokes(testing.NewRootCreateAction(c.resource, obj), obj) + + case len(c.namespace) == 0 && len(subresources) > 0: + accessor, err := meta.Accessor(obj) + if err != nil { + return nil, err + } + name := accessor.GetName() + uncastRet, err = c.client.Fake. 
+ Invokes(testing.NewRootCreateSubresourceAction(c.resource, name, strings.Join(subresources, "/"), obj), obj) + + case len(c.namespace) > 0 && len(subresources) == 0: + uncastRet, err = c.client.Fake. + Invokes(testing.NewCreateAction(c.resource, c.namespace, obj), obj) + + case len(c.namespace) > 0 && len(subresources) > 0: + accessor, err := meta.Accessor(obj) + if err != nil { + return nil, err + } + name := accessor.GetName() + uncastRet, err = c.client.Fake. + Invokes(testing.NewCreateSubresourceAction(c.resource, name, strings.Join(subresources, "/"), c.namespace, obj), obj) + + } + + if err != nil { + return nil, err + } + if uncastRet == nil { + return nil, err + } + + ret := &unstructured.Unstructured{} + if err := c.client.scheme.Convert(uncastRet, ret, nil); err != nil { + return nil, err + } + return ret, err +} + +func (c *dynamicResourceClient) Update(obj *unstructured.Unstructured, subresources ...string) (*unstructured.Unstructured, error) { + var uncastRet runtime.Object + var err error + switch { + case len(c.namespace) == 0 && len(subresources) == 0: + uncastRet, err = c.client.Fake. + Invokes(testing.NewRootUpdateAction(c.resource, obj), obj) + + case len(c.namespace) == 0 && len(subresources) > 0: + uncastRet, err = c.client.Fake. + Invokes(testing.NewRootUpdateSubresourceAction(c.resource, strings.Join(subresources, "/"), obj), obj) + + case len(c.namespace) > 0 && len(subresources) == 0: + uncastRet, err = c.client.Fake. + Invokes(testing.NewUpdateAction(c.resource, c.namespace, obj), obj) + + case len(c.namespace) > 0 && len(subresources) > 0: + uncastRet, err = c.client.Fake. + Invokes(testing.NewUpdateSubresourceAction(c.resource, strings.Join(subresources, "/"), c.namespace, obj), obj) + + } + + if err != nil { + return nil, err + } + if uncastRet == nil { + return nil, err + } + + ret := &unstructured.Unstructured{} + if err := c.client.scheme.Convert(uncastRet, ret, nil); err != nil { + return nil, err + } + return ret, err +} + +func (c *dynamicResourceClient) UpdateStatus(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + var uncastRet runtime.Object + var err error + switch { + case len(c.namespace) == 0: + uncastRet, err = c.client.Fake. + Invokes(testing.NewRootUpdateSubresourceAction(c.resource, "status", obj), obj) + + case len(c.namespace) > 0: + uncastRet, err = c.client.Fake. + Invokes(testing.NewUpdateSubresourceAction(c.resource, "status", c.namespace, obj), obj) + + } + + if err != nil { + return nil, err + } + if uncastRet == nil { + return nil, err + } + + ret := &unstructured.Unstructured{} + if err := c.client.scheme.Convert(uncastRet, ret, nil); err != nil { + return nil, err + } + return ret, err +} + +func (c *dynamicResourceClient) Delete(name string, opts *metav1.DeleteOptions, subresources ...string) error { + var err error + switch { + case len(c.namespace) == 0 && len(subresources) == 0: + _, err = c.client.Fake. + Invokes(testing.NewRootDeleteAction(c.resource, name), &metav1.Status{Status: "dynamic delete fail"}) + + case len(c.namespace) == 0 && len(subresources) > 0: + _, err = c.client.Fake. + Invokes(testing.NewRootDeleteSubresourceAction(c.resource, strings.Join(subresources, "/"), name), &metav1.Status{Status: "dynamic delete fail"}) + + case len(c.namespace) > 0 && len(subresources) == 0: + _, err = c.client.Fake. + Invokes(testing.NewDeleteAction(c.resource, c.namespace, name), &metav1.Status{Status: "dynamic delete fail"}) + + case len(c.namespace) > 0 && len(subresources) > 0: + _, err = c.client.Fake. 
+ Invokes(testing.NewDeleteSubresourceAction(c.resource, strings.Join(subresources, "/"), c.namespace, name), &metav1.Status{Status: "dynamic delete fail"}) + } + + return err +} + +func (c *dynamicResourceClient) DeleteCollection(opts *metav1.DeleteOptions, listOptions metav1.ListOptions) error { + var err error + switch { + case len(c.namespace) == 0: + action := testing.NewRootDeleteCollectionAction(c.resource, listOptions) + _, err = c.client.Fake.Invokes(action, &metav1.Status{Status: "dynamic deletecollection fail"}) + + case len(c.namespace) > 0: + action := testing.NewDeleteCollectionAction(c.resource, c.namespace, listOptions) + _, err = c.client.Fake.Invokes(action, &metav1.Status{Status: "dynamic deletecollection fail"}) + + } + + return err +} + +func (c *dynamicResourceClient) Get(name string, opts metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error) { + var uncastRet runtime.Object + var err error + switch { + case len(c.namespace) == 0 && len(subresources) == 0: + uncastRet, err = c.client.Fake. + Invokes(testing.NewRootGetAction(c.resource, name), &metav1.Status{Status: "dynamic get fail"}) + + case len(c.namespace) == 0 && len(subresources) > 0: + uncastRet, err = c.client.Fake. + Invokes(testing.NewRootGetSubresourceAction(c.resource, strings.Join(subresources, "/"), name), &metav1.Status{Status: "dynamic get fail"}) + + case len(c.namespace) > 0 && len(subresources) == 0: + uncastRet, err = c.client.Fake. + Invokes(testing.NewGetAction(c.resource, c.namespace, name), &metav1.Status{Status: "dynamic get fail"}) + + case len(c.namespace) > 0 && len(subresources) > 0: + uncastRet, err = c.client.Fake. + Invokes(testing.NewGetSubresourceAction(c.resource, c.namespace, strings.Join(subresources, "/"), name), &metav1.Status{Status: "dynamic get fail"}) + } + + if err != nil { + return nil, err + } + if uncastRet == nil { + return nil, err + } + + ret := &unstructured.Unstructured{} + if err := c.client.scheme.Convert(uncastRet, ret, nil); err != nil { + return nil, err + } + return ret, err +} + +func (c *dynamicResourceClient) List(opts metav1.ListOptions) (*unstructured.UnstructuredList, error) { + var obj runtime.Object + var err error + switch { + case len(c.namespace) == 0: + obj, err = c.client.Fake. + Invokes(testing.NewRootListAction(c.resource, schema.GroupVersionKind{Group: "fake-dynamic-client-group", Version: "v1", Kind: "" /*List is appended by the tracker automatically*/}, opts), &metav1.Status{Status: "dynamic list fail"}) + + case len(c.namespace) > 0: + obj, err = c.client.Fake. 
+ Invokes(testing.NewListAction(c.resource, schema.GroupVersionKind{Group: "fake-dynamic-client-group", Version: "v1", Kind: "" /*List is appended by the tracker automatically*/}, c.namespace, opts), &metav1.Status{Status: "dynamic list fail"}) + + } + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + + retUnstructured := &unstructured.Unstructured{} + if err := c.client.scheme.Convert(obj, retUnstructured, nil); err != nil { + return nil, err + } + entireList, err := retUnstructured.ToList() + if err != nil { + return nil, err + } + + list := &unstructured.UnstructuredList{} + for i := range entireList.Items { + item := &entireList.Items[i] + metadata, err := meta.Accessor(item) + if err != nil { + return nil, err + } + if label.Matches(labels.Set(metadata.GetLabels())) { + list.Items = append(list.Items, *item) + } + } + return list, nil +} + +func (c *dynamicResourceClient) Watch(opts metav1.ListOptions) (watch.Interface, error) { + switch { + case len(c.namespace) == 0: + return c.client.Fake. + InvokesWatch(testing.NewRootWatchAction(c.resource, opts)) + + case len(c.namespace) > 0: + return c.client.Fake. + InvokesWatch(testing.NewWatchAction(c.resource, c.namespace, opts)) + + } + + panic("math broke") +} + +func (c *dynamicResourceClient) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (*unstructured.Unstructured, error) { + var uncastRet runtime.Object + var err error + switch { + case len(c.namespace) == 0 && len(subresources) == 0: + uncastRet, err = c.client.Fake. + Invokes(testing.NewRootPatchAction(c.resource, name, data), &metav1.Status{Status: "dynamic patch fail"}) + + case len(c.namespace) == 0 && len(subresources) > 0: + uncastRet, err = c.client.Fake. + Invokes(testing.NewRootPatchSubresourceAction(c.resource, name, data, subresources...), &metav1.Status{Status: "dynamic patch fail"}) + + case len(c.namespace) > 0 && len(subresources) == 0: + uncastRet, err = c.client.Fake. + Invokes(testing.NewPatchAction(c.resource, c.namespace, name, data), &metav1.Status{Status: "dynamic patch fail"}) + + case len(c.namespace) > 0 && len(subresources) > 0: + uncastRet, err = c.client.Fake. 
+ Invokes(testing.NewPatchSubresourceAction(c.resource, c.namespace, name, data, subresources...), &metav1.Status{Status: "dynamic patch fail"}) + + } + + if err != nil { + return nil, err + } + if uncastRet == nil { + return nil, err + } + + ret := &unstructured.Unstructured{} + if err := c.client.scheme.Convert(uncastRet, ret, nil); err != nil { + return nil, err + } + return ret, err +} diff --git a/cluster/kubernetes/images.go b/cluster/kubernetes/images.go index acea81cce..8370ddb88 100644 --- a/cluster/kubernetes/images.go +++ b/cluster/kubernetes/images.go @@ -17,7 +17,7 @@ import ( func mergeCredentials(log func(...interface{}) error, includeImage func(imageName string) bool, - client extendedClient, + client ExtendedClient, namespace string, podTemplate apiv1.PodTemplateSpec, imageCreds registry.ImageCreds, seenCreds map[string]registry.Credentials) { diff --git a/cluster/kubernetes/images_test.go b/cluster/kubernetes/images_test.go index ba63812d8..a94682c5a 100644 --- a/cluster/kubernetes/images_test.go +++ b/cluster/kubernetes/images_test.go @@ -64,7 +64,7 @@ func TestMergeCredentials(t *testing.T) { makeServiceAccount(ns, saName, []string{secretName2}), makeImagePullSecret(ns, secretName1, "docker.io"), makeImagePullSecret(ns, secretName2, "quay.io")) - client := extendedClient{clientset, nil} + client := ExtendedClient{coreClient: clientset} creds := registry.ImageCreds{} @@ -97,7 +97,7 @@ func TestMergeCredentials_ImageExclusion(t *testing.T) { } clientset := fake.NewSimpleClientset() - client := extendedClient{clientset, nil} + client := ExtendedClient{coreClient: clientset} var includeImage = func(imageName string) bool { for _, exp := range []string{"k8s.gcr.io/*", "*test*"} { diff --git a/cluster/kubernetes/kubernetes.go b/cluster/kubernetes/kubernetes.go index 7f4459f12..da67544c2 100644 --- a/cluster/kubernetes/kubernetes.go +++ b/cluster/kubernetes/kubernetes.go @@ -8,48 +8,38 @@ import ( k8syaml "github.com/ghodss/yaml" "github.com/go-kit/kit/log" "github.com/pkg/errors" - fhrclient "github.com/weaveworks/flux/integrations/client/clientset/versioned" - "gopkg.in/yaml.v2" apiv1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/discovery" + k8sclientdynamic "k8s.io/client-go/dynamic" k8sclient "k8s.io/client-go/kubernetes" "github.com/weaveworks/flux" "github.com/weaveworks/flux/cluster" - "github.com/weaveworks/flux/resource" + fhrclient "github.com/weaveworks/flux/integrations/client/clientset/versioned" "github.com/weaveworks/flux/ssh" ) type coreClient k8sclient.Interface +type dynamicClient k8sclientdynamic.Interface type fluxHelmClient fhrclient.Interface +type discoveryClient discovery.DiscoveryInterface -type extendedClient struct { +type ExtendedClient struct { coreClient + dynamicClient fluxHelmClient + discoveryClient } -// --- internal types for keeping track of syncing - -type metadata struct { - Name string `yaml:"name"` - Namespace string `yaml:"namespace"` -} - -type apiObject struct { - resource.Resource - Kind string `yaml:"kind"` - Metadata metadata `yaml:"metadata"` -} - -// A convenience for getting an minimal object from some bytes. 
-func parseObj(def []byte) (*apiObject, error) { - obj := apiObject{} - return &obj, yaml.Unmarshal(def, &obj) -} - -func (o *apiObject) hasNamespace() bool { - return o.Metadata.Namespace != "" +func MakeClusterClientset(core coreClient, dyn dynamicClient, fluxhelm fluxHelmClient, disco discoveryClient) ExtendedClient { + return ExtendedClient{ + coreClient: core, + dynamicClient: dyn, + fluxHelmClient: fluxhelm, + discoveryClient: disco, + } } // --- add-ons @@ -90,8 +80,12 @@ func isAddon(obj k8sObject) bool { // Cluster is a handle to a Kubernetes API server. // (Typically, this code is deployed into the same cluster.) type Cluster struct { - client extendedClient - applier Applier + // Do garbage collection when syncing resources + GC bool + + client ExtendedClient + applier Applier + version string // string response for the version command. logger log.Logger sshKeyRing ssh.KeyRing @@ -109,19 +103,9 @@ type Cluster struct { } // NewCluster returns a usable cluster. -func NewCluster(clientset k8sclient.Interface, - fluxHelmClientset fhrclient.Interface, - applier Applier, - sshKeyRing ssh.KeyRing, - logger log.Logger, - nsWhitelist []string, - imageExcludeList []string) *Cluster { - +func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing, logger log.Logger, nsWhitelist []string, imageExcludeList []string) *Cluster { c := &Cluster{ - client: extendedClient{ - clientset, - fluxHelmClientset, - }, + client: client, applier: applier, logger: logger, sshKeyRing: sshKeyRing, @@ -212,61 +196,12 @@ func (c *Cluster) AllControllers(namespace string) (res []cluster.Controller, er return allControllers, nil } -// Sync performs the given actions on resources. Operations are -// asynchronous, but serialised. -func (c *Cluster) Sync(spec cluster.SyncDef) error { - logger := log.With(c.logger, "method", "Sync") - - cs := makeChangeSet() - var errs cluster.SyncError - for _, action := range spec.Actions { - stages := []struct { - res resource.Resource - cmd string - }{ - {action.Delete, "delete"}, - {action.Apply, "apply"}, - } - for _, stage := range stages { - if stage.res == nil { - continue - } - obj, err := parseObj(stage.res.Bytes()) - if err == nil { - obj.Resource = stage.res - cs.stage(stage.cmd, obj) - } else { - errs = append(errs, cluster.ResourceError{Resource: stage.res, Error: err}) - break - } - } - } - - c.mu.Lock() - defer c.mu.Unlock() - c.muSyncErrors.RLock() - if applyErrs := c.applier.apply(logger, cs, c.syncErrors); len(applyErrs) > 0 { - errs = append(errs, applyErrs...) - } - c.muSyncErrors.RUnlock() - - // If `nil`, errs is a cluster.SyncError(nil) rather than error(nil) - if errs == nil { - return nil - } - - // It is expected that Cluster.Sync is invoked with *all* resources. - // Otherwise it will override previously recorded sync errors. 
- c.setSyncErrors(errs) - return errs -} - func (c *Cluster) setSyncErrors(errs cluster.SyncError) { c.muSyncErrors.Lock() defer c.muSyncErrors.Unlock() c.syncErrors = make(map[flux.ResourceID]error) for _, e := range errs { - c.syncErrors[e.ResourceID()] = e.Error + c.syncErrors[e.ResourceID] = e.Error } } @@ -322,22 +257,6 @@ func (c *Cluster) Export() ([]byte, error) { return config.Bytes(), nil } -// kind & apiVersion must be passed separately as the object's TypeMeta is not populated -func appendYAML(buffer *bytes.Buffer, apiVersion, kind string, object interface{}) error { - yamlBytes, err := k8syaml.Marshal(object) - if err != nil { - return err - } - buffer.WriteString("---\n") - buffer.WriteString("apiVersion: ") - buffer.WriteString(apiVersion) - buffer.WriteString("\nkind: ") - buffer.WriteString(kind) - buffer.WriteString("\n") - buffer.Write(yamlBytes) - return nil -} - func (c *Cluster) PublicSSHKey(regenerate bool) (ssh.PublicKey, error) { if regenerate { if err := c.sshKeyRing.Regenerate(); err != nil { @@ -380,3 +299,19 @@ func (c *Cluster) getAllowedNamespaces() ([]apiv1.Namespace, error) { } return namespaces.Items, nil } + +// kind & apiVersion must be passed separately as the object's TypeMeta is not populated +func appendYAML(buffer *bytes.Buffer, apiVersion, kind string, object interface{}) error { + yamlBytes, err := k8syaml.Marshal(object) + if err != nil { + return err + } + buffer.WriteString("---\n") + buffer.WriteString("apiVersion: ") + buffer.WriteString(apiVersion) + buffer.WriteString("\nkind: ") + buffer.WriteString(kind) + buffer.WriteString("\n") + buffer.Write(yamlBytes) + return nil +} diff --git a/cluster/kubernetes/kubernetes_test.go b/cluster/kubernetes/kubernetes_test.go index e11538d17..0d80fd56a 100644 --- a/cluster/kubernetes/kubernetes_test.go +++ b/cluster/kubernetes/kubernetes_test.go @@ -25,8 +25,8 @@ func newNamespace(name string) *apiv1.Namespace { func testGetAllowedNamespaces(t *testing.T, namespace []string, expected []string) { clientset := fakekubernetes.NewSimpleClientset(newNamespace("default"), newNamespace("kube-system")) - - c := NewCluster(clientset, nil, nil, nil, log.NewNopLogger(), namespace, []string{}) + client := ExtendedClient{coreClient: clientset} + c := NewCluster(client, nil, nil, log.NewNopLogger(), namespace, []string{}) namespaces, err := c.getAllowedNamespaces() if err != nil { diff --git a/cluster/kubernetes/manifests.go b/cluster/kubernetes/manifests.go index 18c424161..4bf8eecac 100644 --- a/cluster/kubernetes/manifests.go +++ b/cluster/kubernetes/manifests.go @@ -7,15 +7,46 @@ import ( "github.com/weaveworks/flux/resource" ) +// namespacer assigns namespaces to manifests that need it (or "" if +// the manifest should not have a namespace. +type namespacer interface { + // EffectiveNamespace gives the namespace that would be used were + // the manifest to be applied. This may be "", indicating that it + // should not have a namespace (i.e., it's a cluster-level + // resource). + EffectiveNamespace(kresource.KubeManifest) (string, error) +} + +// Manifests is an implementation of cluster.Manifests, particular to +// Kubernetes. Aside from loading manifests from files, it does some +// "post-processsing" to make sure the view of the manifests is what +// would be applied; in particular, it fills in the namespace of +// manifests that would be given a default namespace when applied. 
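As a hedged usage sketch of the post-processing described above (hypothetical helper and repository path, not part of this diff; nser is assumed to come from NewNamespacer in namespacer.go):

// hypothetical, for illustration only
func printManifestIDs(nser namespacer) error {
    m := &Manifests{Namespacer: nser}
    resources, err := m.LoadManifests("/path/to/repo", []string{"deploy"})
    if err != nil {
        return err
    }
    // A deployment file that omits metadata.namespace is now keyed by the
    // namespace it would be given when applied (the kubeconfig fallback),
    // rather than an assumed "default".
    for id := range resources {
        fmt.Println(id) // e.g. "team-ns:deployment/helloworld"
    }
    return nil
}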
type Manifests struct { + Namespacer namespacer } -func (c *Manifests) LoadManifests(base string, paths []string) (map[string]resource.Resource, error) { - return kresource.Load(base, paths) +func postProcess(manifests map[string]kresource.KubeManifest, nser namespacer) (map[string]resource.Resource, error) { + result := map[string]resource.Resource{} + for _, km := range manifests { + if nser != nil { + ns, err := nser.EffectiveNamespace(km) + if err != nil { + return nil, err + } + km.SetNamespace(ns) + } + result[km.ResourceID().String()] = km + } + return result, nil } -func (c *Manifests) ParseManifests(allDefs []byte) (map[string]resource.Resource, error) { - return kresource.ParseMultidoc(allDefs, "exported") +func (c *Manifests) LoadManifests(base string, paths []string) (map[string]resource.Resource, error) { + manifests, err := kresource.Load(base, paths) + if err != nil { + return nil, err + } + return postProcess(manifests, c.Namespacer) } func (c *Manifests) UpdateImage(def []byte, id flux.ResourceID, container string, image image.Ref) ([]byte, error) { diff --git a/cluster/kubernetes/namespacer.go b/cluster/kubernetes/namespacer.go new file mode 100644 index 000000000..8386e7425 --- /dev/null +++ b/cluster/kubernetes/namespacer.go @@ -0,0 +1,82 @@ +package kubernetes + +import ( + "fmt" + + "k8s.io/client-go/discovery" + "k8s.io/client-go/tools/clientcmd" + + kresource "github.com/weaveworks/flux/cluster/kubernetes/resource" +) + +// The namespace to presume if something doesn't have one, and we +// haven't been told what to use as a fallback. This is what +// `kubectl` uses when there's no config setting the fallback +// namespace. +const defaultFallbackNamespace = "default" + +type namespaceViaDiscovery struct { + fallbackNamespace string + disco discovery.DiscoveryInterface +} + +// NewNamespacer creates an implementation of Namespacer +func NewNamespacer(d discovery.DiscoveryInterface) (*namespaceViaDiscovery, error) { + fallback, err := getDefaultNamespace() + if err != nil { + return nil, err + } + return &namespaceViaDiscovery{fallbackNamespace: fallback, disco: d}, nil +} + +// getDefaultNamespace returns the fallback namespace used by the +// when a namespaced resource doesn't have one specified. This is +// used when syncing to anticipate the identity of a resource in the +// cluster given the manifest from a file (which may be missing the +// namespace). +// A variable is used for mocking in tests. +var getDefaultNamespace = func() (string, error) { + config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + clientcmd.NewDefaultClientConfigLoadingRules(), + &clientcmd.ConfigOverrides{}, + ).RawConfig() + if err != nil { + return "", err + } + + cc := config.CurrentContext + if c, ok := config.Contexts[cc]; ok && c.Namespace != "" { + return c.Namespace, nil + } + + return defaultFallbackNamespace, nil +} + +// effectiveNamespace yields the namespace that would be used for this +// resource were it applied, taking into account the kind of the +// resource, and local configuration. 
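One plausible wiring of the new pieces, shown before the method itself (a sketch only, not part of this diff: restConfig and the shutdown channel are assumed to exist, and the imports are rest, discovery and the apiextensions clientset aliased as crd, as in cached_disco.go):

// hypothetical constructor, for illustration only
func makeManifests(restConfig *rest.Config, shutdown <-chan struct{}) (*Manifests, error) {
    disco, err := discovery.NewDiscoveryClientForConfig(restConfig)
    if err != nil {
        return nil, err
    }
    crdClient, err := crd.NewForConfig(restConfig)
    if err != nil {
        return nil, err
    }
    cachedDisco := MakeCachedDiscovery(disco, crdClient, shutdown)
    nser, err := NewNamespacer(cachedDisco)
    if err != nil {
        return nil, err
    }
    return &Manifests{Namespacer: nser}, nil
}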
+func (n *namespaceViaDiscovery) EffectiveNamespace(m kresource.KubeManifest) (string, error) { + namespaced, err := n.lookupNamespaced(m.GroupVersion(), m.GetKind()) + switch { + case err != nil: + return "", err + case namespaced && m.GetNamespace() == "": + return n.fallbackNamespace, nil + case !namespaced: + return "", nil + } + return m.GetNamespace(), nil +} + +func (n *namespaceViaDiscovery) lookupNamespaced(groupVersion, kind string) (bool, error) { + resourceList, err := n.disco.ServerResourcesForGroupVersion(groupVersion) + if err != nil { + return false, fmt.Errorf("error looking up API resources for %s.%s: %s", kind, groupVersion, err.Error()) + } + for _, resource := range resourceList.APIResources { + if resource.Kind == kind { + return resource.Namespaced, nil + } + } + return false, fmt.Errorf("resource not found for API %s, kind %s", groupVersion, kind) +} diff --git a/cluster/kubernetes/namespacer_test.go b/cluster/kubernetes/namespacer_test.go new file mode 100644 index 000000000..6a1a97476 --- /dev/null +++ b/cluster/kubernetes/namespacer_test.go @@ -0,0 +1,117 @@ +package kubernetes + +import ( + "io/ioutil" + "os" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corefake "k8s.io/client-go/kubernetes/fake" + + kresource "github.com/weaveworks/flux/cluster/kubernetes/resource" +) + +var getAndList = metav1.Verbs([]string{"get", "list"}) + +func makeFakeClient() *corefake.Clientset { + apiResources := []*metav1.APIResourceList{ + { + GroupVersion: "apps/v1", + APIResources: []metav1.APIResource{ + {Name: "deployments", SingularName: "deployment", Namespaced: true, Kind: "Deployment", Verbs: getAndList}, + }, + }, + { + GroupVersion: "v1", + APIResources: []metav1.APIResource{ + {Name: "namespaces", SingularName: "namespace", Namespaced: false, Kind: "Namespace", Verbs: getAndList}, + }, + }, + } + + coreClient := corefake.NewSimpleClientset() + coreClient.Fake.Resources = apiResources + return coreClient +} + +func TestNamespaceDefaulting(t *testing.T) { + testKubeconfig := `apiVersion: v1 +clusters: [] +contexts: +- context: + cluster: cluster + namespace: namespace + user: user + name: context +current-context: context +kind: Config +preferences: {} +users: [] +` + err := ioutil.WriteFile("testkubeconfig", []byte(testKubeconfig), 0600) + if err != nil { + t.Fatal("cannot create test kubeconfig file") + } + defer os.Remove("testkubeconfig") + + os.Setenv("KUBECONFIG", "testkubeconfig") + defer os.Unsetenv("KUBECONFIG") + coreClient := makeFakeClient() + + ns, err := getDefaultNamespace() + if err != nil { + t.Fatal("cannot get default namespace") + } + if ns != "namespace" { + t.Fatal("unexpected default namespace", ns) + } + + nser, err := NewNamespacer(coreClient.Discovery()) + if err != nil { + t.Fatal(err) + } + + const defs = `--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: hasNamespace + namespace: foo-ns +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: noNamespace +--- +apiVersion: v1 +kind: Namespace +metadata: + name: notNamespaced + namespace: spurious +` + + manifests, err := kresource.ParseMultidoc([]byte(defs), "") + if err != nil { + t.Fatal(err) + } + + assertEffectiveNamespace := func(id, expected string) { + res, ok := manifests[id] + if !ok { + t.Errorf("manifest for %q not found", id) + return + } + got, err := nser.EffectiveNamespace(res) + if err != nil { + t.Errorf("error getting effective namespace for %q: %s", id, err.Error()) + return + } + if got != expected { + t.Errorf("expected effective 
namespace of %q, got %q", expected, got) + } + } + + assertEffectiveNamespace("foo-ns:deployment/hasNamespace", "foo-ns") + assertEffectiveNamespace(":deployment/noNamespace", "namespace") + assertEffectiveNamespace("spurious:namespace/notNamespaced", "") +} diff --git a/cluster/kubernetes/resource/list.go b/cluster/kubernetes/resource/list.go index f9f9fc743..3b53c3682 100644 --- a/cluster/kubernetes/resource/list.go +++ b/cluster/kubernetes/resource/list.go @@ -1,10 +1,6 @@ package resource -import ( - "github.com/weaveworks/flux/resource" -) - type List struct { baseObject - Items []resource.Resource + Items []KubeManifest } diff --git a/cluster/kubernetes/resource/load.go b/cluster/kubernetes/resource/load.go index 8803f2a3b..69f3efec4 100644 --- a/cluster/kubernetes/resource/load.go +++ b/cluster/kubernetes/resource/load.go @@ -9,17 +9,16 @@ import ( "path/filepath" "github.com/pkg/errors" - "github.com/weaveworks/flux/resource" ) // Load takes paths to directories or files, and creates an object set // based on the file(s) therein. Resources are named according to the // file content, rather than the file name of directory structure. -func Load(base string, paths []string) (map[string]resource.Resource, error) { +func Load(base string, paths []string) (map[string]KubeManifest, error) { if _, err := os.Stat(base); os.IsNotExist(err) { return nil, fmt.Errorf("git path %q not found", base) } - objs := map[string]resource.Resource{} + objs := map[string]KubeManifest{} charts, err := newChartTracker(base) if err != nil { return nil, errors.Wrapf(err, "walking %q for chartdirs", base) @@ -127,14 +126,14 @@ func looksLikeChart(dir string) bool { // ParseMultidoc takes a dump of config (a multidoc YAML) and // constructs an object set from the resources represented therein. 
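A brief in-package sketch of what the new return type buys (hypothetical function, not part of this diff): every parsed object is a KubeManifest, so the Kubernetes-specific accessors are available without type assertions:

// hypothetical, for illustration only
func printKinds(multidoc []byte) error {
    manifests, err := ParseMultidoc(multidoc, "example")
    if err != nil {
        return err
    }
    for id, m := range manifests {
        // GroupVersion, GetKind and GetNamespace come from the KubeManifest
        // interface; callers previously saw only resource.Resource.
        fmt.Printf("%s: %s %s in %q\n", id, m.GroupVersion(), m.GetKind(), m.GetNamespace())
    }
    return nil
}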
-func ParseMultidoc(multidoc []byte, source string) (map[string]resource.Resource, error) { - objs := map[string]resource.Resource{} +func ParseMultidoc(multidoc []byte, source string) (map[string]KubeManifest, error) { + objs := map[string]KubeManifest{} chunks := bufio.NewScanner(bytes.NewReader(multidoc)) initialBuffer := make([]byte, 4096) // Matches startBufSize in bufio/scan.go chunks.Buffer(initialBuffer, 1024*1024) // Allow growth to 1MB chunks.Split(splitYAMLDocument) - var obj resource.Resource + var obj KubeManifest var err error for chunks.Scan() { // It's not guaranteed that the return value of Bytes() will not be mutated later: diff --git a/cluster/kubernetes/resource/load_test.go b/cluster/kubernetes/resource/load_test.go index 15b102c11..1b99c31a6 100644 --- a/cluster/kubernetes/resource/load_test.go +++ b/cluster/kubernetes/resource/load_test.go @@ -161,9 +161,11 @@ items: - kind: Deployment metadata: name: foo + namespace: ns - kind: Service metadata: name: bar + namespace: ns ` res, err := unmarshalObject("", []byte(doc)) if err != nil { @@ -177,8 +179,8 @@ items: t.Fatalf("expected two items, got %+v", list.Items) } for i, id := range []flux.ResourceID{ - flux.MustParseResourceID("default:deployment/foo"), - flux.MustParseResourceID("default:service/bar")} { + flux.MustParseResourceID("ns:deployment/foo"), + flux.MustParseResourceID("ns:service/bar")} { if list.Items[i].ResourceID() != id { t.Errorf("At %d, expected %q, got %q", i, id, list.Items[i].ResourceID()) } diff --git a/cluster/kubernetes/resource/resource.go b/cluster/kubernetes/resource/resource.go index f73d90b67..67701c3c7 100644 --- a/cluster/kubernetes/resource/resource.go +++ b/cluster/kubernetes/resource/resource.go @@ -13,39 +13,75 @@ import ( const ( PolicyPrefix = "flux.weave.works/" + ClusterScope = "" ) +// KubeManifest represents a manifest for a Kubernetes resource. For +// some Kubernetes-specific purposes we need more information that can +// be obtained from `resource.Resource`. +type KubeManifest interface { + resource.Resource + GroupVersion() string + GetKind() string + GetNamespace() string + SetNamespace(string) +} + // -- unmarshaling code for specific object and field types // struct to embed in objects, to provide default implementation type baseObject struct { source string bytes []byte - Kind string `yaml:"kind"` - Meta struct { + + // these are present for unmarshalling into the struct + APIVersion string `yaml:"apiVersion"` + Kind string `yaml:"kind"` + Meta struct { Namespace string `yaml:"namespace"` Name string `yaml:"name"` Annotations map[string]string `yaml:"annotations,omitempty"` } `yaml:"metadata"` } +// GroupVersion implements KubeManifest.GroupVersion, so things with baseObject embedded are < KubeManifest +func (o baseObject) GroupVersion() string { + return o.APIVersion +} + +// GetNamespace implements KubeManifest.GetNamespace, so things embedding baseObject are < KubeManifest +func (o baseObject) GetNamespace() string { + return o.Meta.Namespace +} + +// GetKind implements KubeManifest.GetKind +func (o baseObject) GetKind() string { + return o.Kind +} + func (o baseObject) ResourceID() flux.ResourceID { ns := o.Meta.Namespace if ns == "" { - ns = "default" + ns = ClusterScope } return flux.MakeResourceID(ns, o.Kind, o.Meta.Name) } +// SetNamespace implements KubeManifest.SetNamespace, so things with +// *baseObject embedded are < KubeManifest. NB pointer receiver. 
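The "NB pointer receiver" note above is load-bearing: with a value receiver, SetNamespace would mutate a copy and the caller would never see the change. A minimal illustration with a hypothetical type:

type widget struct{ ns string }

func (w widget) setByValue(ns string)    { w.ns = ns } // mutates a copy; lost when the method returns
func (w *widget) setByPointer(ns string) { w.ns = ns } // mutates the caller's value

func demo() {
    var w widget
    w.setByValue("foo-ns")   // w.ns is still ""
    w.setByPointer("foo-ns") // w.ns is now "foo-ns"
}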
+func (o *baseObject) SetNamespace(ns string) { + o.Meta.Namespace = ns +} + // It's useful for comparisons in tests to be able to remove the // record of bytes func (o *baseObject) debyte() { o.bytes = nil } -func (o baseObject) Policy() policy.Set { +func PolicyFromAnnotations(annotations map[string]string) policy.Set { set := policy.Set{} - for k, v := range o.Meta.Annotations { + for k, v := range annotations { if strings.HasPrefix(k, PolicyPrefix) { p := strings.TrimPrefix(k, PolicyPrefix) if v == "true" { @@ -58,6 +94,10 @@ func (o baseObject) Policy() policy.Set { return set } +func (o baseObject) Policy() policy.Set { + return PolicyFromAnnotations(o.Meta.Annotations) +} + func (o baseObject) Source() string { return o.source } @@ -66,7 +106,7 @@ func (o baseObject) Bytes() []byte { return o.bytes } -func unmarshalObject(source string, bytes []byte) (resource.Resource, error) { +func unmarshalObject(source string, bytes []byte) (KubeManifest, error) { var base = baseObject{source: source, bytes: bytes} if err := yaml.Unmarshal(bytes, &base); err != nil { return nil, err @@ -78,7 +118,7 @@ func unmarshalObject(source string, bytes []byte) (resource.Resource, error) { return r, nil } -func unmarshalKind(base baseObject, bytes []byte) (resource.Resource, error) { +func unmarshalKind(base baseObject, bytes []byte) (KubeManifest, error) { switch base.Kind { case "CronJob": var cj = CronJob{baseObject: base} @@ -144,7 +184,7 @@ type rawList struct { func unmarshalList(base baseObject, raw *rawList, list *List) error { list.baseObject = base - list.Items = make([]resource.Resource, len(raw.Items), len(raw.Items)) + list.Items = make([]KubeManifest, len(raw.Items), len(raw.Items)) for i, item := range raw.Items { bytes, err := yaml.Marshal(item) if err != nil { diff --git a/cluster/kubernetes/sync.go b/cluster/kubernetes/sync.go index a2bd1b0f1..2dfba3b1d 100644 --- a/cluster/kubernetes/sync.go +++ b/cluster/kubernetes/sync.go @@ -2,6 +2,8 @@ package kubernetes import ( "bytes" + "crypto/sha1" + "encoding/hex" "fmt" "io" "os/exec" @@ -9,24 +11,288 @@ import ( "strings" "time" - rest "k8s.io/client-go/rest" - "github.com/go-kit/kit/log" + "github.com/imdario/mergo" "github.com/pkg/errors" + "gopkg.in/yaml.v2" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + rest "k8s.io/client-go/rest" + "github.com/weaveworks/flux" "github.com/weaveworks/flux/cluster" + kresource "github.com/weaveworks/flux/cluster/kubernetes/resource" + "github.com/weaveworks/flux/policy" + "github.com/weaveworks/flux/resource" +) + +const ( + syncSetLabel = kresource.PolicyPrefix + "sync-set" + checksumAnnotation = kresource.PolicyPrefix + "sync-checksum" ) +// Sync takes a definition of what should be running in the cluster, +// and attempts to make the cluster conform. An error return does not +// necessarily indicate complete failure; some resources may succeed +// in being synced, and some may fail (for example, they may be +// malformed). +func (c *Cluster) Sync(spec cluster.SyncSet) error { + logger := log.With(c.logger, "method", "Sync") + + // Keep track of the checksum of each resource, so we can compare + // them during garbage collection. + checksums := map[string]string{} + + // NB we get all resources, since we care about leaving unsynced, + // _ignored_ resources alone. 
+ clusterResources, err := c.getResourcesBySelector("") + if err != nil { + return errors.Wrap(err, "collating resources in cluster for sync") + } + + cs := makeChangeSet() + var errs cluster.SyncError + for _, res := range spec.Resources { + id := res.ResourceID().String() + // make a record of the checksum, whether we stage it to + // be applied or not, so that we don't delete it later. + csum := sha1.Sum(res.Bytes()) + checkHex := hex.EncodeToString(csum[:]) + checksums[id] = checkHex + if res.Policy().Has(policy.Ignore) { + logger.Log("info", "not applying resource; ignore annotation in file", "resource", res.ResourceID(), "source", res.Source()) + continue + } + // It's possible to give a cluster resource the "ignore" + // annotation directly -- e.g., with `kubectl annotate` -- so + // we need to examine the cluster resource here too. + if cres, ok := clusterResources[id]; ok && cres.Policy().Has(policy.Ignore) { + logger.Log("info", "not applying resource; ignore annotation in cluster resource", "resource", cres.ResourceID()) + continue + } + resBytes, err := applyMetadata(res, spec.Name, checkHex) + if err == nil { + cs.stage("apply", res.ResourceID(), res.Source(), resBytes) + } else { + errs = append(errs, cluster.ResourceError{ResourceID: res.ResourceID(), Source: res.Source(), Error: err}) + break + } + } + + c.mu.Lock() + defer c.mu.Unlock() + c.muSyncErrors.RLock() + if applyErrs := c.applier.apply(logger, cs, c.syncErrors); len(applyErrs) > 0 { + errs = append(errs, applyErrs...) + } + c.muSyncErrors.RUnlock() + + if c.GC { + deleteErrs, gcFailure := c.collectGarbage(spec, checksums, logger) + if gcFailure != nil { + return gcFailure + } + errs = append(errs, deleteErrs...) + } + + // If `nil`, errs is a cluster.SyncError(nil) rather than error(nil), so it cannot be returned directly. + if errs == nil { + return nil + } + + // It is expected that Cluster.Sync is invoked with *all* resources. + // Otherwise it will override previously recorded sync errors. + c.setSyncErrors(errs) + return errs +} + +func (c *Cluster) collectGarbage( + spec cluster.SyncSet, + checksums map[string]string, + logger log.Logger) (cluster.SyncError, error) { + + orphanedResources := makeChangeSet() + + clusterResources, err := c.getResourcesInSyncSet(spec.Name) + if err != nil { + return nil, errors.Wrap(err, "collating resources in cluster for calculating garbage collection") + } + + for resourceID, res := range clusterResources { + actual := res.GetChecksum() + expected, ok := checksums[resourceID] + + switch { + case !ok: // was not recorded as having been staged for application + c.logger.Log("info", "cluster resource not in resources to be synced; deleting", "resource", resourceID) + orphanedResources.stage("delete", res.ResourceID(), "", res.IdentifyingBytes()) + case actual != expected: + c.logger.Log("warning", "resource to be synced has not been updated; skipping", "resource", resourceID) + continue + default: + // The checksum is the same, indicating that it was + // applied earlier. Leave it alone. + } + } + + return c.applier.apply(logger, orphanedResources, nil), nil +} + +// --- internals in support of Sync + +type kuberesource struct { + obj *unstructured.Unstructured + namespaced bool +} + +// ResourceID returns the ResourceID for this resource loaded from the +// cluster. 
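An in-package sketch (in a test body, say; the object is made up) of why the namespaced flag matters: cluster-scoped objects read back from the API carry no namespace, and mapping them to ClusterScope keeps their IDs identical to those of the corresponding file manifests, which is what the checksum comparison in collectGarbage relies on:

u := &unstructured.Unstructured{Object: map[string]interface{}{
    "apiVersion": "v1",
    "kind":       "Namespace",
    "metadata":   map[string]interface{}{"name": "foo-ns"},
}}
res := &kuberesource{obj: u, namespaced: false}
fmt.Println(res.ResourceID()) // namespace component is ClusterScope (""), matching the file manifest's ID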
+func (r *kuberesource) ResourceID() flux.ResourceID { + ns, kind, name := r.obj.GetNamespace(), r.obj.GetKind(), r.obj.GetName() + if !r.namespaced { + ns = kresource.ClusterScope + } + return flux.MakeResourceID(ns, kind, name) +} + +// Bytes returns a byte slice description, including enough info to +// identify the resource (but not momre) +func (r *kuberesource) IdentifyingBytes() []byte { + return []byte(fmt.Sprintf(` +apiVersion: %s +kind: %s +metadata: + namespace: %q + name: %q +`, r.obj.GetAPIVersion(), r.obj.GetKind(), r.obj.GetNamespace(), r.obj.GetName())) +} + +func (r *kuberesource) Policy() policy.Set { + return kresource.PolicyFromAnnotations(r.obj.GetAnnotations()) +} + +// GetChecksum returns the checksum recorded on the resource from +// Kubernetes, or an empty string if it's not present. +func (r *kuberesource) GetChecksum() string { + return r.obj.GetAnnotations()[checksumAnnotation] +} + +func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesource, error) { + listOptions := meta_v1.ListOptions{} + if selector != "" { + listOptions.LabelSelector = selector + } + + resources, err := c.client.discoveryClient.ServerResources() + if err != nil { + return nil, err + } + + result := map[string]*kuberesource{} + + contains := func(a []string, x string) bool { + for _, n := range a { + if x == n { + return true + } + } + return false + } + + for _, resource := range resources { + for _, apiResource := range resource.APIResources { + verbs := apiResource.Verbs + if !contains(verbs, "list") { + continue + } + + groupVersion, err := schema.ParseGroupVersion(resource.GroupVersion) + if err != nil { + return nil, err + } + + resourceClient := c.client.dynamicClient.Resource(groupVersion.WithResource(apiResource.Name)) + data, err := resourceClient.List(listOptions) + if err != nil { + return nil, err + } + + for i, item := range data.Items { + apiVersion := item.GetAPIVersion() + kind := item.GetKind() + + itemDesc := fmt.Sprintf("%s:%s", apiVersion, kind) + // https://github.com/kontena/k8s-client/blob/6e9a7ba1f03c255bd6f06e8724a1c7286b22e60f/lib/k8s/stack.rb#L17-L22 + if itemDesc == "v1:ComponentStatus" || itemDesc == "v1:Endpoints" { + continue + } + // TODO(michael) also exclude anything that has an ownerReference (that isn't "standard"?) + + res := &kuberesource{obj: &data.Items[i], namespaced: apiResource.Namespaced} + result[res.ResourceID().String()] = res + } + } + } + + return result, nil +} + +// exportResourcesInStack collates all the resources that belong to a +// stack, i.e., were applied by flux. 
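A small hedged sketch of the selector convention (hypothetical helper, not part of this diff): the equality selector built by getResourcesInSyncSet, defined just below, means "has the sync-set label with exactly this value", whereas passing the empty string, as Sync does, applies no label selector at all:

// hypothetical, for illustration only
func (c *Cluster) countSyncSetResources(name string) (int, error) {
    owned, err := c.getResourcesInSyncSet(name) // label selector flux.weave.works/sync-set=<name>
    if err != nil {
        return 0, err
    }
    return len(owned), nil
}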
+func (c *Cluster) getResourcesInSyncSet(name string) (map[string]*kuberesource, error) { + return c.getResourcesBySelector(fmt.Sprintf("%s=%s", syncSetLabel, name)) // means "has label <>" +} + +func applyMetadata(res resource.Resource, set, checksum string) ([]byte, error) { + definition := map[interface{}]interface{}{} + if err := yaml.Unmarshal(res.Bytes(), &definition); err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("failed to parse yaml from %s", res.Source())) + } + + mixin := map[string]interface{}{} + + if set != "" { + mixinLabels := map[string]string{} + mixinLabels[syncSetLabel] = set + mixin["labels"] = mixinLabels + } + + if checksum != "" { + mixinAnnotations := map[string]string{} + mixinAnnotations[checksumAnnotation] = checksum + mixin["annotations"] = mixinAnnotations + } + + mergo.Merge(&definition, map[interface{}]interface{}{ + "metadata": mixin, + }) + + bytes, err := yaml.Marshal(definition) + if err != nil { + return nil, errors.Wrap(err, "failed to serialize yaml after applying metadata") + } + return bytes, nil +} + +// --- internal types for keeping track of syncing + +type applyObject struct { + ResourceID flux.ResourceID + Source string + Payload []byte +} + type changeSet struct { - objs map[string][]*apiObject + objs map[string][]applyObject } func makeChangeSet() changeSet { - return changeSet{objs: make(map[string][]*apiObject)} + return changeSet{objs: make(map[string][]applyObject)} } -func (c *changeSet) stage(cmd string, o *apiObject) { - c.objs[cmd] = append(c.objs[cmd], o) +func (c *changeSet) stage(cmd string, id flux.ResourceID, source string, bytes []byte) { + c.objs[cmd] = append(c.objs[cmd], applyObject{id, source, bytes}) } // Applier is something that will apply a changeset to the cluster. @@ -76,18 +342,18 @@ func (c *Kubectl) connectArgs() []string { // in the partial ordering of Kubernetes resources, according to which // kinds depend on which (derived by hand). func rankOfKind(kind string) int { - switch kind { + switch strings.ToLower(kind) { // Namespaces answer to NOONE - case "Namespace": + case "namespace": return 0 // These don't go in namespaces; or do, but don't depend on anything else - case "CustomResourceDefinition", "ServiceAccount", "ClusterRole", "Role", "PersistentVolume", "Service": + case "customresourcedefinition", "serviceaccount", "clusterrole", "role", "persistentvolume", "service": return 1 // These depend on something above, but not each other - case "ResourceQuota", "LimitRange", "Secret", "ConfigMap", "RoleBinding", "ClusterRoleBinding", "PersistentVolumeClaim", "Ingress": + case "resourcequota", "limitrange", "secret", "configmap", "rolebinding", "clusterrolebinding", "persistentvolumeclaim", "ingress": return 2 // Same deal, next layer - case "DaemonSet", "Deployment", "ReplicationController", "ReplicaSet", "Job", "CronJob", "StatefulSet": + case "daemonset", "deployment", "replicationcontroller", "replicaset", "job", "cronjob", "statefulset": return 3 // Assumption: anything not mentioned isn't depended _upon_, so // can come last. 
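To make the effect of the applyMetadata helper above concrete, a standalone sketch using the same yaml.v2-plus-mergo approach (the manifest and checksum value are made up; the merge leaves the manifest's own metadata intact and only mixes in the sync-set label and checksum annotation):

package main

import (
    "fmt"

    "github.com/imdario/mergo"
    "gopkg.in/yaml.v2"
)

func main() {
    manifest := []byte(`apiVersion: v1
kind: ConfigMap
metadata:
  name: app-config
`)
    def := map[interface{}]interface{}{}
    if err := yaml.Unmarshal(manifest, &def); err != nil {
        panic(err)
    }
    mixin := map[string]interface{}{
        "labels":      map[string]string{"flux.weave.works/sync-set": "default"},
        "annotations": map[string]string{"flux.weave.works/sync-checksum": "deadbeef"},
    }
    if err := mergo.Merge(&def, map[interface{}]interface{}{"metadata": mixin}); err != nil {
        panic(err)
    }
    out, err := yaml.Marshal(def)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(out)) // metadata now also carries the sync-set label and checksum annotation
}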
@@ -96,7 +362,7 @@ func rankOfKind(kind string) int { } } -type applyOrder []*apiObject +type applyOrder []applyObject func (objs applyOrder) Len() int { return len(objs) @@ -107,27 +373,29 @@ func (objs applyOrder) Swap(i, j int) { } func (objs applyOrder) Less(i, j int) bool { - ranki, rankj := rankOfKind(objs[i].Kind), rankOfKind(objs[j].Kind) + _, ki, ni := objs[i].ResourceID.Components() + _, kj, nj := objs[j].ResourceID.Components() + ranki, rankj := rankOfKind(ki), rankOfKind(kj) if ranki == rankj { - return objs[i].Metadata.Name < objs[j].Metadata.Name + return ni < nj } return ranki < rankj } func (c *Kubectl) apply(logger log.Logger, cs changeSet, errored map[flux.ResourceID]error) (errs cluster.SyncError) { - f := func(objs []*apiObject, cmd string, args ...string) { + f := func(objs []applyObject, cmd string, args ...string) { if len(objs) == 0 { return } logger.Log("cmd", cmd, "args", strings.Join(args, " "), "count", len(objs)) args = append(args, cmd) - var multi, single []*apiObject + var multi, single []applyObject if len(errored) == 0 { multi = objs } else { for _, obj := range objs { - if _, ok := errored[obj.ResourceID()]; ok { + if _, ok := errored[obj.ResourceID]; ok { // Resources that errored before shall be applied separately single = append(single, obj) } else { @@ -143,16 +411,20 @@ func (c *Kubectl) apply(logger log.Logger, cs changeSet, errored map[flux.Resour } } for _, obj := range single { - r := bytes.NewReader(obj.Bytes()) + r := bytes.NewReader(obj.Payload) if err := c.doCommand(logger, r, args...); err != nil { - errs = append(errs, cluster.ResourceError{obj.Resource, err}) + errs = append(errs, cluster.ResourceError{ + ResourceID: obj.ResourceID, + Source: obj.Source, + Error: err, + }) } } } // When deleting objects, the only real concern is that we don't // try to delete things that have already been deleted by - // Kubernete's GC -- most notably, resources in a namespace which + // Kubernetes' GC -- most notably, resources in a namespace which // is also being deleted. GC does not have the dependency ranking, // but we can use it as a shortcut to avoid the above problem at // least. 
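For illustration, here is a minimal sketch of how a ranking like `rankOfKind` orders work in both directions. The `rank` function below is a simplified stand-in, not the one in this patch, and walking the order in reverse for deletes is one plausible convention rather than something this diff prescribes.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// rank is a simplified stand-in for rankOfKind above: lower ranks
// must be applied before higher ones.
func rank(kind string) int {
	switch strings.ToLower(kind) {
	case "namespace":
		return 0
	case "serviceaccount", "service":
		return 1
	case "configmap", "secret":
		return 2
	default:
		return 3 // anything not mentioned isn't depended upon, so it can come last
	}
}

func main() {
	kinds := []string{"Deployment", "ConfigMap", "Namespace", "Service"}

	// Apply in dependency order: namespaces first, workloads last.
	sort.SliceStable(kinds, func(i, j int) bool { return rank(kinds[i]) < rank(kinds[j]) })
	fmt.Println("apply order:", kinds)

	// For deletion, walking the same ordering in reverse keeps
	// dependents ahead of the things they depend on.
	for i := len(kinds) - 1; i >= 0; i-- {
		fmt.Println("delete:", kinds[i])
	}
}
```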
@@ -185,11 +457,11 @@ func (c *Kubectl) doCommand(logger log.Logger, r io.Reader, args ...string) erro return err } -func makeMultidoc(objs []*apiObject) *bytes.Buffer { +func makeMultidoc(objs []applyObject) *bytes.Buffer { buf := &bytes.Buffer{} for _, obj := range objs { buf.WriteString("\n---\n") - buf.Write(obj.Bytes()) + buf.Write(obj.Payload) } return buf } diff --git a/cluster/kubernetes/sync_test.go b/cluster/kubernetes/sync_test.go index b3c09cc44..d1de2d7a2 100644 --- a/cluster/kubernetes/sync_test.go +++ b/cluster/kubernetes/sync_test.go @@ -1,54 +1,211 @@ package kubernetes import ( + "fmt" "sort" + "strings" "testing" + "github.com/ghodss/yaml" "github.com/go-kit/kit/log" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + // "k8s.io/apimachinery/pkg/runtime/serializer" + // "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" + // dynamicfake "k8s.io/client-go/dynamic/fake" + // k8sclient "k8s.io/client-go/kubernetes" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/discovery" + corefake "k8s.io/client-go/kubernetes/fake" + k8s_testing "k8s.io/client-go/testing" "github.com/weaveworks/flux" "github.com/weaveworks/flux/cluster" - "github.com/weaveworks/flux/policy" + kresource "github.com/weaveworks/flux/cluster/kubernetes/resource" + fluxfake "github.com/weaveworks/flux/integrations/client/clientset/versioned/fake" + "github.com/weaveworks/flux/sync" ) -type mockApplier struct { - commandRun bool -} +const ( + defaultTestNamespace = "unusual-default" +) + +func fakeClients() ExtendedClient { + scheme := runtime.NewScheme() -func (m *mockApplier) apply(_ log.Logger, c changeSet, errored map[flux.ResourceID]error) cluster.SyncError { - if len(c.objs) != 0 { - m.commandRun = true + // Set this to `true` to output a trace of the API actions called + // while running the tests + const debug = false + + getAndList := metav1.Verbs([]string{"get", "list"}) + // Adding these means the fake dynamic client will find them, and + // be able to enumerate (list and get) the resources that we care + // about + apiResources := []*metav1.APIResourceList{ + { + GroupVersion: "apps/v1", + APIResources: []metav1.APIResource{ + {Name: "deployments", SingularName: "deployment", Namespaced: true, Kind: "Deployment", Verbs: getAndList}, + }, + }, + { + GroupVersion: "v1", + APIResources: []metav1.APIResource{ + {Name: "namespaces", SingularName: "namespace", Namespaced: false, Kind: "Namespace", Verbs: getAndList}, + }, + }, } - return nil -} -type rsc struct { - id string - bytes []byte + coreClient := corefake.NewSimpleClientset(&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: defaultTestNamespace}}) + fluxClient := fluxfake.NewSimpleClientset() + dynamicClient := NewSimpleDynamicClient(scheme) // NB from this package, rather than the official one, since we needed a patched version + + // Assigned here, since this is _also_ used by the (fake) + // discovery client therein, and ultimately by + // getResourcesInStack since that uses the core clientset to + // enumerate the namespaces. 
+ coreClient.Fake.Resources = apiResources + + if debug { + for _, fake := range []*k8s_testing.Fake{&coreClient.Fake, &fluxClient.Fake, &dynamicClient.Fake} { + fake.PrependReactor("*", "*", func(action k8s_testing.Action) (bool, runtime.Object, error) { + gvr := action.GetResource() + fmt.Printf("[DEBUG] action: %s ns:%s %s/%s %s\n", action.GetVerb(), action.GetNamespace(), gvr.Group, gvr.Version, gvr.Resource) + return false, nil, nil + }) + } + } + + return ExtendedClient{ + coreClient: coreClient, + fluxHelmClient: fluxClient, + dynamicClient: dynamicClient, + discoveryClient: coreClient.Discovery(), + } } -func (r rsc) ResourceID() flux.ResourceID { - return flux.MustParseResourceID(r.id) +// fakeApplier is an Applier that just forwards changeset operations +// to a dynamic client. It doesn't try to properly patch resources +// when that might be expected; it just overwrites them. But this is +// enough for checking whether sync operations succeeded and had the +// correct effect, which is either to "upsert", or delete, resources. +type fakeApplier struct { + client dynamic.Interface + discovery discovery.DiscoveryInterface + defaultNS string + commandRun bool } -func (r rsc) Bytes() []byte { - return r.bytes +func groupVersionResource(res *unstructured.Unstructured) schema.GroupVersionResource { + gvk := res.GetObjectKind().GroupVersionKind() + return schema.GroupVersionResource{Group: gvk.Group, Version: gvk.Version, Resource: strings.ToLower(gvk.Kind) + "s"} } -func (r rsc) Policy() policy.Set { - return nil +func (a fakeApplier) apply(_ log.Logger, cs changeSet, errored map[flux.ResourceID]error) cluster.SyncError { + var errs []cluster.ResourceError + + operate := func(obj applyObject, cmd string) { + a.commandRun = true + var unstruct map[string]interface{} + if err := yaml.Unmarshal(obj.Payload, &unstruct); err != nil { + errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, err}) + return + } + res := &unstructured.Unstructured{Object: unstruct} + + // This is a special case trapdoor, for testing failure to + // apply a resource. + if errStr := res.GetAnnotations()["error"]; errStr != "" { + errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, fmt.Errorf(errStr)}) + return + } + + gvr := groupVersionResource(res) + c := a.client.Resource(gvr) + // This is an approximation to what `kubectl` does in filling + // in the fallback namespace (from config). In the case of + // non-namespaced entities, it will be ignored by the fake + // client (FIXME: make sure of this). 
+ apiRes := findAPIResource(gvr, a.discovery) + if apiRes == nil { + panic("no APIResource found for " + gvr.String()) + } + + var dc dynamic.ResourceInterface = c + ns := res.GetNamespace() + if apiRes.Namespaced { + if ns == "" { + ns = a.defaultNS + res.SetNamespace(ns) + } + dc = c.Namespace(ns) + } + name := res.GetName() + + if cmd == "apply" { + _, err := dc.Get(name, metav1.GetOptions{}) + switch { + case errors.IsNotFound(err): + _, err = dc.Create(res) //, &metav1.CreateOptions{}) + case err == nil: + _, err = dc.Update(res) //, &metav1.UpdateOptions{}) + } + if err != nil { + errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, err}) + return + } + } else if cmd == "delete" { + if err := dc.Delete(name, &metav1.DeleteOptions{}); err != nil { + errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, err}) + return + } + } else { + panic("unknown action: " + cmd) + } + } + + for _, obj := range cs.objs["delete"] { + operate(obj, "delete") + } + for _, obj := range cs.objs["apply"] { + operate(obj, "apply") + } + if len(errs) == 0 { + return nil + } + return errs } -func (r rsc) Source() string { - return "test" +func findAPIResource(gvr schema.GroupVersionResource, disco discovery.DiscoveryInterface) *metav1.APIResource { + groupVersion := gvr.Version + if gvr.Group != "" { + groupVersion = gvr.Group + "/" + groupVersion + } + reses, err := disco.ServerResourcesForGroupVersion(groupVersion) + if err != nil { + return nil + } + for _, res := range reses.APIResources { + if res.Name == gvr.Resource { + return &res + } + } + return nil } // --- -func setup(t *testing.T) (*Cluster, *mockApplier) { - applier := &mockApplier{} +func setup(t *testing.T) (*Cluster, *fakeApplier) { + clients := fakeClients() + applier := &fakeApplier{client: clients.dynamicClient, discovery: clients.coreClient.Discovery(), defaultNS: defaultTestNamespace} kube := &Cluster{ applier: applier, + client: clients, logger: log.NewNopLogger(), } return kube, applier @@ -56,7 +213,7 @@ func setup(t *testing.T) (*Cluster, *mockApplier) { func TestSyncNop(t *testing.T) { kube, mock := setup(t) - if err := kube.Sync(cluster.SyncDef{}); err != nil { + if err := kube.Sync(cluster.SyncSet{}); err != nil { t.Errorf("%#v", err) } if mock.commandRun { @@ -64,49 +221,340 @@ func TestSyncNop(t *testing.T) { } } -func TestSyncMalformed(t *testing.T) { - kube, mock := setup(t) - err := kube.Sync(cluster.SyncDef{ - Actions: []cluster.SyncAction{ - cluster.SyncAction{ - Apply: rsc{"default:deployment/trash", []byte("garbage")}, - }, - }, - }) - if err == nil { - t.Error("expected error because malformed resource def, but got nil") +func TestSync(t *testing.T) { + const defs1 = `--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: dep1 + namespace: foobar +` + + const defs2 = `--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: dep2 + namespace: foobar +` + + const defs3 = `--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: dep3 + namespace: other +` + + // checkSame is a check that a result returned from the cluster is + // the same as an expected. labels and annotations may be altered + // by the sync process; we'll look at the "spec" field as an + // indication of whether the resources are equivalent or not. 
+ checkSame := func(t *testing.T, expected []byte, actual *unstructured.Unstructured) { + var expectedSpec struct{ Spec map[string]interface{} } + if err := yaml.Unmarshal(expected, &expectedSpec); err != nil { + t.Error(err) + return + } + if expectedSpec.Spec != nil { + assert.Equal(t, expectedSpec.Spec, actual.Object["spec"]) + } } - if mock.commandRun { - t.Error("expected no commands run") + + test := func(t *testing.T, kube *Cluster, defs, expectedAfterSync string, expectErrors bool) { + saved := getDefaultNamespace + getDefaultNamespace = func() (string, error) { return defaultTestNamespace, nil } + defer func() { getDefaultNamespace = saved }() + namespacer, err := NewNamespacer(kube.client.coreClient.Discovery()) + if err != nil { + t.Fatal(err) + } + + resources0, err := kresource.ParseMultidoc([]byte(defs), "before") + if err != nil { + t.Fatal(err) + } + + // Needed to get from KubeManifest to resource.Resource + resources, err := postProcess(resources0, namespacer) + if err != nil { + t.Fatal(err) + } + + err = sync.Sync("testset", resources, kube) + if !expectErrors && err != nil { + t.Error(err) + } + expected, err := kresource.ParseMultidoc([]byte(expectedAfterSync), "after") + if err != nil { + panic(err) + } + + // Now check that the resources were created + actual, err := kube.getResourcesInSyncSet("testset") + if err != nil { + t.Fatal(err) + } + + for id := range actual { + if _, ok := expected[id]; !ok { + t.Errorf("resource present after sync but not in resources applied: %q (present: %v)", id, actual) + if j, err := yaml.Marshal(actual[id].obj); err == nil { + println(string(j)) + } + continue + } + checkSame(t, expected[id].Bytes(), actual[id].obj) + } + for id := range expected { + if _, ok := actual[id]; !ok { + t.Errorf("resource supposed to be synced but not present: %q (present: %v)", id, actual) + } + // no need to compare values, since we already considered + // the intersection of actual and expected above. + } } + + t.Run("sync adds and GCs resources", func(t *testing.T) { + kube, _ := setup(t) + + // without GC on, resources persist if they are not mentioned in subsequent syncs. + test(t, kube, "", "", false) + test(t, kube, defs1, defs1, false) + test(t, kube, defs1+defs2, defs1+defs2, false) + test(t, kube, defs3, defs1+defs2+defs3, false) + + // Now with GC switched on. That means if we don't include a + // resource in a sync, it should be deleted. + kube.GC = true + test(t, kube, defs2+defs3, defs3+defs2, false) + test(t, kube, defs1+defs2, defs1+defs2, false) + test(t, kube, "", "", false) + }) + + t.Run("sync won't delete non-namespaced resources", func(t *testing.T) { + kube, _ := setup(t) + kube.GC = true + + const nsDef = ` +apiVersion: v1 +kind: Namespace +metadata: + name: bar-ns +` + test(t, kube, nsDef, nsDef, false) + }) + + t.Run("sync won't delete resources that got the fallback namespace when created", func(t *testing.T) { + // NB: this tests the fake client implementation to some + // extent as well. It relies on it to reflect the kubectl + // behaviour of giving things that need a namespace some + // fallback (this would come from kubeconfig usually); and, + // for things that _don't_ have a namespace to have it + // stripped out. 
+ kube, _ := setup(t) + kube.GC = true + const withoutNS = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: depFallbackNS +` + const withNS = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: depFallbackNS + namespace: ` + defaultTestNamespace + ` +` + test(t, kube, withoutNS, withNS, false) + }) + + t.Run("sync won't delete if apply failed", func(t *testing.T) { + kube, _ := setup(t) + kube.GC = true + + const defs1invalid = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + namespace: foobar + name: dep1 + annotations: + error: fail to apply this +` + test(t, kube, defs1, defs1, false) + test(t, kube, defs1invalid, defs1, true) + }) + + t.Run("sync doesn't apply or delete manifests marked with ignore", func(t *testing.T) { + kube, _ := setup(t) + kube.GC = true + + const dep1 = `--- +apiVersion: apps/v1 +kind: Deployment +metadata: + namespace: foobar + name: dep1 +spec: + metadata: + labels: {app: foo} +` + + const dep2 = `--- +apiVersion: apps/v1 +kind: Deployment +metadata: + namespace: foobar + name: dep2 + annotations: {flux.weave.works/ignore: "true"} +` + + // dep1 is created, but dep2 is ignored + test(t, kube, dep1+dep2, dep1, false) + + const dep1ignored = `--- +apiVersion: apps/v1 +kind: Deployment +metadata: + namespace: foobar + name: dep1 + annotations: + flux.weave.works/ignore: "true" +spec: + metadata: + labels: {app: bar} +` + // dep1 is not updated, but neither is it deleted + test(t, kube, dep1ignored+dep2, dep1, false) + }) + + t.Run("sync doesn't update a cluster resource marked with ignore", func(t *testing.T) { + const dep1 = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + namespace: foobar + name: dep1 +spec: + metadata: + labels: + app: original +` + kube, _ := setup(t) + // This just checks the starting assumption: dep1 exists in the cluster + test(t, kube, dep1, dep1, false) + + // Now we'll mark it as ignored _in the cluster_ (i.e., the + // equivalent of `kubectl annotate`) + dc := kube.client.dynamicClient + rc := dc.Resource(schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "deployments", + }) + res, err := rc.Namespace("foobar").Get("dep1", metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + annots := res.GetAnnotations() + annots["flux.weave.works/ignore"] = "true" + res.SetAnnotations(annots) + if _, err = rc.Namespace("foobar").Update(res); err != nil { + t.Fatal(err) + } + + const mod1 = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + namespace: foobar + name: dep1 +spec: + metadata: + labels: + app: modified +` + // Check that dep1, which is marked ignore in the cluster, is + // neither updated or deleted + test(t, kube, mod1, dep1, false) + }) + + t.Run("sync doesn't update or delete a pre-existing resource marked with ignore", func(t *testing.T) { + kube, _ := setup(t) + + const existing = `--- +apiVersion: apps/v1 +kind: Deployment +metadata: + namespace: foobar + name: dep1 + annotations: {flux.weave.works/ignore: "true"} +spec: + metadata: + labels: {foo: original} +` + var dep1obj map[string]interface{} + err := yaml.Unmarshal([]byte(existing), &dep1obj) + assert.NoError(t, err) + dep1res := &unstructured.Unstructured{Object: dep1obj} + gvr := groupVersionResource(dep1res) + // Put the pre-existing resource in the cluster + dc := kube.client.dynamicClient.Resource(gvr).Namespace(dep1res.GetNamespace()) + _, err = dc.Create(dep1res) + assert.NoError(t, err) + + // Check that our resource-getting also sees the pre-existing resource + resources, err := 
kube.getResourcesBySelector("") + assert.NoError(t, err) + assert.Contains(t, resources, "foobar:deployment/dep1") + + // NB test checks the _synced_ resources, so this just asserts + // the precondition, that nothing is synced + test(t, kube, "", "", false) + + // .. but, our resource is still there. + r, err := dc.Get(dep1res.GetName(), metav1.GetOptions{}) + assert.NoError(t, err) + assert.NotNil(t, r) + + const update = `--- +apiVersion: apps/v1 +kind: Deployment +metadata: + namespace: foobar + name: dep1 +spec: + metadata: + labels: {foo: modified} +` + + // Check that it's not been synced (i.e., still not included in synced resources) + test(t, kube, update, "", false) + + // Check that it still exists, as created + r, err = dc.Get(dep1res.GetName(), metav1.GetOptions{}) + assert.NoError(t, err) + assert.NotNil(t, r) + checkSame(t, []byte(existing), r) + }) } +// ---- + // TestApplyOrder checks that applyOrder works as expected. func TestApplyOrder(t *testing.T) { - objs := []*apiObject{ - { - Kind: "Deployment", - Metadata: metadata{ - Name: "deploy", - }, - }, - { - Kind: "Secret", - Metadata: metadata{ - Name: "secret", - }, - }, - { - Kind: "Namespace", - Metadata: metadata{ - Name: "namespace", - }, - }, + objs := []applyObject{ + {ResourceID: flux.MakeResourceID("test", "Deployment", "deploy")}, + {ResourceID: flux.MakeResourceID("test", "Secret", "secret")}, + {ResourceID: flux.MakeResourceID("", "Namespace", "namespace")}, } sort.Sort(applyOrder(objs)) for i, name := range []string{"namespace", "secret", "deploy"} { - if objs[i].Metadata.Name != name { - t.Errorf("Expected %q at position %d, got %q", name, i, objs[i].Metadata.Name) + _, _, objName := objs[i].ResourceID.Components() + if objName != name { + t.Errorf("Expected %q at position %d, got %q", name, i, objName) } } } diff --git a/cluster/manifests.go b/cluster/manifests.go index 8314dfe70..fe74a4863 100644 --- a/cluster/manifests.go +++ b/cluster/manifests.go @@ -31,8 +31,6 @@ type Manifests interface { // supplied as absolute paths to directories or files; at least // one path should be supplied, even if it is the same as `baseDir`. 
LoadManifests(baseDir string, paths []string) (map[string]resource.Resource, error) - // Parse the manifests given in an exported blob - ParseManifests([]byte) (map[string]resource.Resource, error) // UpdatePolicies modifies a manifest to apply the policy update specified UpdatePolicies([]byte, flux.ResourceID, policy.Update) ([]byte, error) } diff --git a/cluster/mock.go b/cluster/mock.go index 4943bce44..2035146ff 100644 --- a/cluster/mock.go +++ b/cluster/mock.go @@ -14,12 +14,10 @@ type Mock struct { SomeServicesFunc func([]flux.ResourceID) ([]Controller, error) PingFunc func() error ExportFunc func() ([]byte, error) - SyncFunc func(SyncDef) error + SyncFunc func(SyncSet) error PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error) UpdateImageFunc func(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error) LoadManifestsFunc func(base string, paths []string) (map[string]resource.Resource, error) - ParseManifestsFunc func([]byte) (map[string]resource.Resource, error) - UpdateManifestFunc func(path, resourceID string, f func(def []byte) ([]byte, error)) error UpdatePoliciesFunc func([]byte, flux.ResourceID, policy.Update) ([]byte, error) } @@ -39,7 +37,7 @@ func (m *Mock) Export() ([]byte, error) { return m.ExportFunc() } -func (m *Mock) Sync(c SyncDef) error { +func (m *Mock) Sync(c SyncSet) error { return m.SyncFunc(c) } @@ -55,14 +53,6 @@ func (m *Mock) LoadManifests(base string, paths []string) (map[string]resource.R return m.LoadManifestsFunc(base, paths) } -func (m *Mock) ParseManifests(def []byte) (map[string]resource.Resource, error) { - return m.ParseManifestsFunc(def) -} - -func (m *Mock) UpdateManifest(path string, resourceID string, f func(def []byte) ([]byte, error)) error { - return m.UpdateManifestFunc(path, resourceID, f) -} - func (m *Mock) UpdatePolicies(def []byte, id flux.ResourceID, p policy.Update) ([]byte, error) { return m.UpdatePoliciesFunc(def, id, p) } diff --git a/cluster/sync.go b/cluster/sync.go index 9859ee00f..ebe3aab9e 100644 --- a/cluster/sync.go +++ b/cluster/sync.go @@ -3,26 +3,28 @@ package cluster import ( "strings" + "github.com/weaveworks/flux" "github.com/weaveworks/flux/resource" ) // Definitions for use in synchronising a cluster with a git repo. -// SyncAction represents either the deletion or application (create or -// update) of a resource. -type SyncAction struct { - Delete resource.Resource // ) one of these - Apply resource.Resource // ) -} - -type SyncDef struct { - // The actions to undertake - Actions []SyncAction +// SyncSet groups the set of resources to be updated. Usually this is +// the set of resources found in a git repo; in any case, it must +// represent the complete set of resources, as garbage collection will +// assume missing resources should be deleted. The name is used to +// distinguish the resources from a set from other resources -- e.g., +// cluster resources not marked as belonging to a set will not be +// deleted by garbage collection. 
+type SyncSet struct { + Name string + Resources []resource.Resource } type ResourceError struct { - resource.Resource - Error error + ResourceID flux.ResourceID + Source string + Error error } type SyncError []ResourceError @@ -30,7 +32,7 @@ type SyncError []ResourceError func (err SyncError) Error() string { var errs []string for _, e := range err { - errs = append(errs, e.ResourceID().String()+": "+e.Error.Error()) + errs = append(errs, e.ResourceID.String()+": "+e.Error.Error()) } return strings.Join(errs, "; ") } diff --git a/cmd/fluxd/main.go b/cmd/fluxd/main.go index 3a3e75b78..d5b5346b7 100644 --- a/cmd/fluxd/main.go +++ b/cmd/fluxd/main.go @@ -17,7 +17,9 @@ import ( "github.com/go-kit/kit/log" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/pflag" - k8sifclient "github.com/weaveworks/flux/integrations/client/clientset/versioned" + integrations "github.com/weaveworks/flux/integrations/client/clientset/versioned" + crd "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + k8sclientdynamic "k8s.io/client-go/dynamic" k8sclient "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -94,8 +96,10 @@ func main() { gitPollInterval = fs.Duration("git-poll-interval", 5*time.Minute, "period at which to poll git repo for new commits") gitTimeout = fs.Duration("git-timeout", 20*time.Second, "duration after which git operations time out") + // syncing syncInterval = fs.Duration("sync-interval", 5*time.Minute, "apply config in git to cluster at least this often, even if there are no new commits") + syncGC = fs.Bool("sync-garbage-collection", false, "experimental; delete resources that were created by fluxd, but are no longer in the git repo") // registry memcachedHostname = fs.String("memcached-hostname", "memcached", "hostname for memcached service.") @@ -186,11 +190,36 @@ func main() { *sshKeygenDir = *k8sSecretVolumeMountPath } + // Mechanical components. + + // When we can receive from this channel, it indicates that we + // are ready to shut down. + errc := make(chan error) + // This signals other routines to shut down; + shutdown := make(chan struct{}) + // .. and this is to wait for other routines to shut down cleanly. + shutdownWg := &sync.WaitGroup{} + + go func() { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + errc <- fmt.Errorf("%s", <-c) + }() + + // This means we can return, and it will use the shutdown + // protocol. + defer func() { + // wait here until stopping. + logger.Log("exiting", <-errc) + close(shutdown) + shutdownWg.Wait() + }() + // Cluster component. 
var clusterVersion string var sshKeyRing ssh.KeyRing var k8s cluster.Cluster - var k8sManifests cluster.Manifests + var k8sManifests *kubernetes.Manifests var imageCreds func() registry.ImageCreds { restClientConfig, err := rest.InClusterConfig() @@ -207,13 +236,25 @@ func main() { logger.Log("err", err) os.Exit(1) } + dynamicClientset, err := k8sclientdynamic.NewForConfig(restClientConfig) + if err != nil { + logger.Log("err", err) + os.Exit(1) + } - ifclientset, err := k8sifclient.NewForConfig(restClientConfig) + integrationsClientset, err := integrations.NewForConfig(restClientConfig) if err != nil { logger.Log("error", fmt.Sprintf("Error building integrations clientset: %v", err)) os.Exit(1) } + crdClient, err := crd.NewForConfig(restClientConfig) + if err != nil { + logger.Log("error", fmt.Sprintf("Error building API extensions (CRD) clientset: %v", err)) + os.Exit(1) + } + discoClientset := kubernetes.MakeCachedDiscovery(clientset.Discovery(), crdClient, shutdown) + serverVersion, err := clientset.ServerVersion() if err != nil { logger.Log("err", err) @@ -261,7 +302,9 @@ func main() { logger.Log("kubectl", kubectl) kubectlApplier := kubernetes.NewKubectl(kubectl, restClientConfig) - k8sInst := kubernetes.NewCluster(clientset, ifclientset, kubectlApplier, sshKeyRing, logger, *k8sNamespaceWhitelist, *registryExcludeImage) + client := kubernetes.MakeClusterClientset(clientset, dynamicClientset, integrationsClientset, discoClientset) + k8sInst := kubernetes.NewCluster(client, kubectlApplier, sshKeyRing, logger, *k8sNamespaceWhitelist, *registryExcludeImage) + k8sInst.GC = *syncGC if err := k8sInst.Ping(); err != nil { logger.Log("ping", err) @@ -274,6 +317,12 @@ func main() { // There is only one way we currently interpret a repo of // files as manifests, and that's as Kubernetes yamels. k8sManifests = &kubernetes.Manifests{} + k8sManifests.Namespacer, err = kubernetes.NewNamespacer(discoClientset) + + if err != nil { + logger.Log("err", err) + os.Exit(1) + } } // Wrap the procedure for collecting images to scan @@ -354,31 +403,6 @@ func main() { } } - // Mechanical components. - - // When we can receive from this channel, it indicates that we - // are ready to shut down. - errc := make(chan error) - // This signals other routines to shut down; - shutdown := make(chan struct{}) - // .. and this is to wait for other routines to shut down cleanly. - shutdownWg := &sync.WaitGroup{} - - go func() { - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) - errc <- fmt.Errorf("%s", <-c) - }() - - // This means we can return, and it will use the shutdown - // protocol. - defer func() { - // wait here until stopping. 
- logger.Log("exiting", <-errc) - close(shutdown) - shutdownWg.Wait() - }() - // Checkpoint: we want to include the fact of whether the daemon // was given a Git repo it could clone; but the expected scenario // is that it will have been set up already, and we don't want to diff --git a/daemon/daemon_test.go b/daemon/daemon_test.go index 6cdaca4b6..26b2f4e5d 100644 --- a/daemon/daemon_test.go +++ b/daemon/daemon_test.go @@ -21,7 +21,6 @@ import ( "github.com/weaveworks/flux/cluster" "github.com/weaveworks/flux/cluster/kubernetes" kresource "github.com/weaveworks/flux/cluster/kubernetes/resource" - "github.com/weaveworks/flux/cluster/kubernetes/testfiles" "github.com/weaveworks/flux/event" "github.com/weaveworks/flux/git" "github.com/weaveworks/flux/git/gittest" @@ -357,9 +356,9 @@ func TestDaemon_NotifyChange(t *testing.T) { ctx := context.Background() var syncCalled int - var syncDef *cluster.SyncDef + var syncDef *cluster.SyncSet var syncMu sync.Mutex - mockK8s.SyncFunc = func(def cluster.SyncDef) error { + mockK8s.SyncFunc = func(def cluster.SyncSet) error { syncMu.Lock() syncCalled++ syncDef = &def @@ -384,8 +383,6 @@ func TestDaemon_NotifyChange(t *testing.T) { t.Errorf("Sync was not called once, was called %d times", syncCalled) } else if syncDef == nil { t.Errorf("Sync was called with a nil syncDef") - } else if len(syncDef.Actions) != len(testfiles.ResourceMap) { - t.Errorf("Expected Sync called with %d actions (resources), was called with %d", len(testfiles.ResourceMap), len(syncDef.Actions)) } // Check that history was written to @@ -598,6 +595,16 @@ func mustParseImageRef(ref string) image.Ref { return r } +type anonNamespacer func(kresource.KubeManifest) string + +func (fn anonNamespacer) EffectiveNamespace(m kresource.KubeManifest) (string, error) { + return fn(m), nil +} + +var alwaysDefault anonNamespacer = func(kresource.KubeManifest) string { + return "default" +} + func mockDaemon(t *testing.T) (*Daemon, func(), func(), *cluster.Mock, *mockEventWriter, func(func())) { logger := log.NewNopLogger() @@ -650,19 +657,13 @@ func mockDaemon(t *testing.T) (*Daemon, func(), func(), *cluster.Mock, *mockEven return []cluster.Controller{}, nil } k8s.ExportFunc = func() ([]byte, error) { return testBytes, nil } - k8s.LoadManifestsFunc = kresource.Load - k8s.ParseManifestsFunc = func(allDefs []byte) (map[string]resource.Resource, error) { - return kresource.ParseMultidoc(allDefs, "test") - } k8s.PingFunc = func() error { return nil } k8s.SomeServicesFunc = func([]flux.ResourceID) ([]cluster.Controller, error) { return []cluster.Controller{ singleService, }, nil } - k8s.SyncFunc = func(def cluster.SyncDef) error { return nil } - k8s.UpdatePoliciesFunc = (&kubernetes.Manifests{}).UpdatePolicies - k8s.UpdateImageFunc = (&kubernetes.Manifests{}).UpdateImage + k8s.SyncFunc = func(def cluster.SyncSet) error { return nil } } var imageRegistry registry.Registry @@ -697,7 +698,7 @@ func mockDaemon(t *testing.T) (*Daemon, func(), func(), *cluster.Mock, *mockEven Repo: repo, GitConfig: params, Cluster: k8s, - Manifests: &kubernetes.Manifests{}, + Manifests: &kubernetes.Manifests{Namespacer: alwaysDefault}, Registry: imageRegistry, V: testVersion, Jobs: jobs, @@ -826,10 +827,7 @@ func (w *wait) ForImageTag(t *testing.T, d *Daemon, service, container, tag stri defer co.Clean() dirs := co.ManifestDirs() - m, err := d.Manifests.LoadManifests(co.Dir(), dirs) - assert.NoError(t, err) - - resources, err := d.Manifests.ParseManifests(m[service].Bytes()) + resources, err := 
d.Manifests.LoadManifests(co.Dir(), dirs) assert.NoError(t, err) workload, ok := resources[service].(resource.Workload) diff --git a/daemon/loop.go b/daemon/loop.go index dd0bc8823..2ab75e2b9 100644 --- a/daemon/loop.go +++ b/daemon/loop.go @@ -2,6 +2,8 @@ package daemon import ( "context" + "crypto/sha256" + "encoding/base64" "fmt" "strings" "sync" @@ -161,6 +163,9 @@ func (d *Daemon) doSync(logger log.Logger, lastKnownSyncTagRev *string, warnedAb fluxmetrics.LabelSuccess, fmt.Sprint(retErr == nil), ).Observe(time.Since(started).Seconds()) }() + + syncSetName := makeSyncLabel(d.Repo.Origin(), d.GitConfig) + // We don't care how long this takes overall, only about not // getting bogged down in certain operations, so use an // undeadlined context in general. @@ -205,15 +210,14 @@ func (d *Daemon) doSync(logger log.Logger, lastKnownSyncTagRev *string, warnedAb } var resourceErrors []event.ResourceError - // TODO supply deletes argument from somewhere (command-line?) - if err := fluxsync.Sync(logger, d.Manifests, allResources, d.Cluster, false); err != nil { + if err := fluxsync.Sync(syncSetName, allResources, d.Cluster); err != nil { logger.Log("err", err) switch syncerr := err.(type) { case cluster.SyncError: for _, e := range syncerr { resourceErrors = append(resourceErrors, event.ResourceError{ - ID: e.ResourceID(), - Path: e.Source(), + ID: e.ResourceID, + Path: e.Source, Error: e.Error.Error(), }) } @@ -450,3 +454,16 @@ func isUnknownRevision(err error) bool { (strings.Contains(err.Error(), "unknown revision or path not in the working tree.") || strings.Contains(err.Error(), "bad revision")) } + +func makeSyncLabel(remote git.Remote, conf git.Config) string { + urlbit := remote.SafeURL() + pathshash := sha256.New() + pathshash.Write([]byte(urlbit)) + pathshash.Write([]byte(conf.Branch)) + for _, path := range conf.Paths { + pathshash.Write([]byte(path)) + } + // the prefix is in part to make sure it's a valid (Kubernetes) + // label value -- a modest abstraction leak + return "git-" + base64.RawURLEncoding.EncodeToString(pathshash.Sum(nil)) +} diff --git a/daemon/loop_test.go b/daemon/loop_test.go index 547928a43..2abff845f 100644 --- a/daemon/loop_test.go +++ b/daemon/loop_test.go @@ -15,14 +15,13 @@ import ( "github.com/weaveworks/flux" "github.com/weaveworks/flux/cluster" - kresource "github.com/weaveworks/flux/cluster/kubernetes/resource" + "github.com/weaveworks/flux/cluster/kubernetes" "github.com/weaveworks/flux/cluster/kubernetes/testfiles" "github.com/weaveworks/flux/event" "github.com/weaveworks/flux/git" "github.com/weaveworks/flux/git/gittest" "github.com/weaveworks/flux/job" registryMock "github.com/weaveworks/flux/registry/mock" - "github.com/weaveworks/flux/resource" ) const ( @@ -42,10 +41,6 @@ func daemon(t *testing.T) (*Daemon, func()) { repo, repoCleanup := gittest.Repo(t) k8s = &cluster.Mock{} - k8s.LoadManifestsFunc = kresource.Load - k8s.ParseManifestsFunc = func(allDefs []byte) (map[string]resource.Resource, error) { - return kresource.ParseMultidoc(allDefs, "exported") - } k8s.ExportFunc = func() ([]byte, error) { return nil, nil } events = &mockEventWriter{} @@ -68,7 +63,7 @@ func daemon(t *testing.T) (*Daemon, func()) { jobs := job.NewQueue(shutdown, wg) d := &Daemon{ Cluster: k8s, - Manifests: k8s, + Manifests: &kubernetes.Manifests{Namespacer: alwaysDefault}, Registry: ®istryMock.Registry{}, Repo: repo, GitConfig: gitConfig, @@ -94,13 +89,13 @@ func TestPullAndSync_InitialSync(t *testing.T) { defer cleanup() syncCalled := 0 - 
var syncDef *cluster.SyncDef + var syncDef *cluster.SyncSet expectedResourceIDs := flux.ResourceIDs{} for id, _ := range testfiles.ResourceMap { expectedResourceIDs = append(expectedResourceIDs, id) } expectedResourceIDs.Sort() - k8s.SyncFunc = func(def cluster.SyncDef) error { + k8s.SyncFunc = func(def cluster.SyncSet) error { syncCalled++ syncDef = &def return nil @@ -117,8 +112,6 @@ func TestPullAndSync_InitialSync(t *testing.T) { t.Errorf("Sync was not called once, was called %d times", syncCalled) } else if syncDef == nil { t.Errorf("Sync was called with a nil syncDef") - } else if len(syncDef.Actions) != len(expectedResourceIDs) { - t.Errorf("Sync was not called with %d actions (resources), was called with %d", len(expectedResourceIDs), len(syncDef.Actions)) } // The emitted event has all service ids @@ -167,13 +160,13 @@ func TestDoSync_NoNewCommits(t *testing.T) { } syncCalled := 0 - var syncDef *cluster.SyncDef + var syncDef *cluster.SyncSet expectedResourceIDs := flux.ResourceIDs{} for id, _ := range testfiles.ResourceMap { expectedResourceIDs = append(expectedResourceIDs, id) } expectedResourceIDs.Sort() - k8s.SyncFunc = func(def cluster.SyncDef) error { + k8s.SyncFunc = func(def cluster.SyncSet) error { syncCalled++ syncDef = &def return nil @@ -192,8 +185,6 @@ func TestDoSync_NoNewCommits(t *testing.T) { t.Errorf("Sync was not called once, was called %d times", syncCalled) } else if syncDef == nil { t.Errorf("Sync was called with a nil syncDef") - } else if len(syncDef.Actions) != len(expectedResourceIDs) { - t.Errorf("Sync was not called with %d actions, was called with: %d", len(expectedResourceIDs), len(syncDef.Actions)) } // The emitted event has no service ids @@ -239,7 +230,7 @@ func TestDoSync_WithNewCommit(t *testing.T) { } // Push some new changes dirs := checkout.ManifestDirs() - err = cluster.UpdateManifest(k8s, checkout.Dir(), dirs, flux.MustParseResourceID("default:deployment/helloworld"), func(def []byte) ([]byte, error) { + err = cluster.UpdateManifest(d.Manifests, checkout.Dir(), dirs, flux.MustParseResourceID("default:deployment/helloworld"), func(def []byte) ([]byte, error) { // A simple modification so we have changes to push return []byte(strings.Replace(string(def), "replicas: 5", "replicas: 4", -1)), nil }) @@ -265,13 +256,13 @@ func TestDoSync_WithNewCommit(t *testing.T) { } syncCalled := 0 - var syncDef *cluster.SyncDef + var syncDef *cluster.SyncSet expectedResourceIDs := flux.ResourceIDs{} for id, _ := range testfiles.ResourceMap { expectedResourceIDs = append(expectedResourceIDs, id) } expectedResourceIDs.Sort() - k8s.SyncFunc = func(def cluster.SyncDef) error { + k8s.SyncFunc = func(def cluster.SyncSet) error { syncCalled++ syncDef = &def return nil @@ -288,8 +279,6 @@ func TestDoSync_WithNewCommit(t *testing.T) { t.Errorf("Sync was not called once, was called %d times", syncCalled) } else if syncDef == nil { t.Errorf("Sync was called with a nil syncDef") - } else if len(syncDef.Actions) != len(expectedResourceIDs) { - t.Errorf("Sync was not called with %d actions, was called with %d", len(expectedResourceIDs), len(syncDef.Actions)) } // The emitted event has no service ids diff --git a/git/repo.go b/git/repo.go index f81fe47ba..5f104521d 100644 --- a/git/repo.go +++ b/git/repo.go @@ -44,11 +44,6 @@ const ( RepoReady GitRepoStatus = "ready" // has been written to, so ready to sync ) -// Remote points at a git repo somewhere. 
-type Remote struct { - URL string // clone from here -} - type Repo struct { // As supplied to constructor origin Remote diff --git a/git/url.go b/git/url.go new file mode 100644 index 000000000..d59cafd4f --- /dev/null +++ b/git/url.go @@ -0,0 +1,24 @@ +package git + +import ( + "fmt" + "net/url" + + "github.com/whilp/git-urls" +) + +// Remote points at a git repo somewhere. +type Remote struct { + URL string // clone from here +} + +func (r Remote) SafeURL() string { + u, err := giturls.Parse(r.URL) + if err != nil { + return fmt.Sprintf("<unparseable: %s>", r.URL) + } + if u.User != nil { + u.User = url.User(u.User.Username()) + } + return u.String() +} diff --git a/git/url_test.go b/git/url_test.go new file mode 100644 index 000000000..b119010f3 --- /dev/null +++ b/git/url_test.go @@ -0,0 +1,20 @@ +package git + +import ( + "strings" + "testing" +) + +func TestSafeURL(t *testing.T) { + const password = "abc123" + for _, url := range []string{ + "git@github.com:weaveworks/flux", + "https://user@example.com:5050/repo.git", + "https://user:" + password + "@example.com:5050/repo.git", + } { + u := Remote{url} + if strings.Contains(u.SafeURL(), password) { + t.Errorf("Safe URL for %s contains password %q", url, password) + } + } +} diff --git a/release/releaser_test.go b/release/releaser_test.go index faa9e7520..ac001f1a9 100644 --- a/release/releaser_test.go +++ b/release/releaser_test.go @@ -13,6 +13,7 @@ import ( "github.com/weaveworks/flux" "github.com/weaveworks/flux/cluster" "github.com/weaveworks/flux/cluster/kubernetes" + kresource "github.com/weaveworks/flux/cluster/kubernetes/resource" "github.com/weaveworks/flux/git" "github.com/weaveworks/flux/git/gittest" "github.com/weaveworks/flux/image" @@ -21,6 +22,12 @@ import ( "github.com/weaveworks/flux/update" ) +type constNamespacer string + +func (ns constNamespacer) EffectiveNamespace(kresource.KubeManifest) (string, error) { + return string(ns), nil +} + var ( // This must match the value in cluster/kubernetes/testfiles/data.go helloContainer = "greeter" @@ -135,7 +142,7 @@ var ( }, }, } - mockManifests = &kubernetes.Manifests{} + mockManifests = &kubernetes.Manifests{Namespacer: constNamespacer("default")} ) func mockCluster(running ...cluster.Controller) *cluster.Mock { @@ -1084,7 +1091,7 @@ func Test_BadRelease(t *testing.T) { ctx = &ReleaseContext{ cluster: cluster, - manifests: &badManifests{Manifests: kubernetes.Manifests{}}, + manifests: &badManifests{Manifests: kubernetes.Manifests{constNamespacer("default")}}, repo: checkout2, registry: mockRegistry, } diff --git a/site/daemon.md b/site/daemon.md index 948e84cae..49bbd490a 100644 --- a/site/daemon.md +++ b/site/daemon.md @@ -62,8 +62,9 @@ fluxd requires setup and offers customization though a multitude of flags. |--git-notes-ref | `flux` | ref to use for keeping commit annotations in git notes| |--git-poll-interval | `5m` | period at which to fetch any new commits from the git repo | |--git-timeout | `20s` | duration after which git operations time out | -|**syncing** | | control over how config is applied to the cluster | +|**syncing** | | control over how config is applied to the cluster | |--sync-interval | `5m` | apply the git config to the cluster at least this often.
New commits may provoke more frequent syncs | +|--sync-garbage-collection | `false` | experimental: when set, fluxd will delete resources that it created, but are no longer present in git (see [garbage collection](./garbagecollection.md)) | |**registry cache** | | (none of these need overriding, usually) | |--memcached-hostname | `memcached` | hostname for memcached service to use for caching image metadata| |--memcached-timeout | `1s` | maximum time to wait before giving up on memcached requests| diff --git a/site/faq.md b/site/faq.md index 5fa390b2f..422235ab0 100644 --- a/site/faq.md +++ b/site/faq.md @@ -16,6 +16,7 @@ menu_order: 60 * [Is there any special directory layout I need in my git repo?](#is-there-any-special-directory-layout-i-need-in-my-git-repo) * [Why does Flux need a git ssh key with write access?](#why-does-flux-need-a-git-ssh-key-with-write-access) * [Does Flux automatically sync changes back to git?](#does-flux-automatically-sync-changes-back-to-git) + * [Will Flux delete resources when I remove them from git?](#will-flux-delete-resources-when-i-remove-them-from-git) * [How do I give Flux access to an image registry?](#how-do-i-give-flux-access-to-an-image-registry) * [How often does Flux check for new images?](#how-often-does-flux-check-for-new-images) * [How often does Flux check for new git commits (and can I make it sync faster)?](#how-often-does-flux-check-for-new-git-commits-and-can-i-make-it-sync-faster) @@ -145,6 +146,18 @@ For more information about Flux commands see [the fluxctl docs](./fluxctl.md). No. It applies changes to git only when a Flux command or API call makes them. +### Will Flux delete resources when I remove them from git? + +Flux has an experimental (for now) garbage collection feature, +enabled by passing the command-line flag `--sync-garbage-collection` +to fluxd. + +The garbage collection is conservative: it is designed to not delete +resources that were not created by fluxd. This means it will sometimes +_not_ delete resources that _were_ created by fluxd, when +reconfigured. Read more about garbage collection +[here](./garbagecollection.md). + ### How do I give Flux access to an image registry? Flux transparently looks at the image pull secrets that you attach to diff --git a/site/garbagecollection.md b/site/garbagecollection.md new file mode 100644 index 000000000..d7de28cda --- /dev/null +++ b/site/garbagecollection.md @@ -0,0 +1,45 @@ +## Garbage collection + +Part of syncing a cluster with a git repository is getting rid of +resources in the cluster that have been removed in the repository. You +can tell fluxd to do this "garbage collection" using the command-line +flag `--sync-garbage-collection`. It's important to know how it +operates, and appreciate its limitations, before enabling it. + +### How garbage collection works + +When garbage collection is enabled, syncing is done in two phases: + + 1. Apply all the manifests in the git repo (as delimited by the + branch and path arguments), and give each resource a label marking + it as having been synced from this source. + + 2. Ask the cluster for all the resources marked as being from this + source, and delete those that were not applied in step 1. + +In the above, "source" refers to the particular combination of git +repo URL, branch, and paths that this fluxd has been configured to +use, which is taken as identifying the resources under _this_ fluxd's +control. 
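To make the two phases concrete, here is a small, self-contained model of the algorithm, with a plain map standing in for the cluster. Deriving the sync-set name by hashing the repo URL, branch and paths mirrors `makeSyncLabel` in `daemon/loop.go` above; everything else here (the helper names, the in-memory cluster) is purely illustrative.

```go
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
)

// syncSetName identifies a source -- the git URL, branch and paths --
// hashed so it can be used as a label value (cf. makeSyncLabel above).
func syncSetName(url, branch string, paths []string) string {
	h := sha256.New()
	h.Write([]byte(url))
	h.Write([]byte(branch))
	for _, p := range paths {
		h.Write([]byte(p))
	}
	return "git-" + base64.RawURLEncoding.EncodeToString(h.Sum(nil))
}

// cluster maps resource ID -> the sync-set label it carries ("" = unlabelled).
type cluster map[string]string

// sync applies everything in repo (labelling it with set), then, if gc
// is on, deletes anything labelled with set that was not just applied.
func sync(c cluster, set string, repo []string, gc bool) {
	applied := map[string]bool{}
	for _, id := range repo {
		c[id] = set // phase 1: apply + label
		applied[id] = true
	}
	if !gc {
		return
	}
	for id, label := range c { // phase 2: collect the garbage
		if label == set && !applied[id] {
			fmt.Println("deleting", id)
			delete(c, id)
		}
	}
}

func main() {
	set := syncSetName("git@example.com:org/repo", "master", []string{"deploy/"})
	c := cluster{"kube-system:deployment/not-ours": ""} // pre-existing, unlabelled
	sync(c, set, []string{"default:deployment/a", "default:deployment/b"}, true)
	sync(c, set, []string{"default:deployment/a"}, true) // b removed from git, so deleted
	fmt.Println(c)                                       // not-ours survives; b is gone
}
```

The pre-existing, unlabelled resource survives every sync, which is the conservative behaviour the rest of this page describes.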
+ +We need to be careful about identifying these accurately, since +getting it wrong could mean _not_ deleting resources that should be +deleted; or (much worse), deleting resources that are under another +fluxd's control. + +The definition of "source" affects how garbage collection behaves when +you reconfigure fluxd. It is intended to be conservative: it ensures +that fluxd will not delete resources that it did not create. + +### Limitations of this approach + +In general, if you change an element of the source (the git repo URL, +branch, and paths), there is a possibility that resources no longer +present in the new source will be missed (i.e., not deleted) by +garbage collection, and you will need to delete them by hand. + +| Config change | What happens | |-------------------|--------------| | git URL or branch | If the manifests at the new git repo are the same, they will all be relabelled, and things will proceed as usual. If they are different, the resources from the old repo will be missed by garbage collection and will need to be deleted by hand | | path added | Existing resources will be relabelled, and new resources (from manifests in the new path) will be created. Then things will proceed as usual. | | path removed | The resources from manifests in the removed path will be missed by garbage collection, and will need to be deleted by hand. Other resources will be treated as usual. | diff --git a/sync/sync.go b/sync/sync.go index e0df168a3..b617c407f 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -1,78 +1,30 @@ package sync import ( - "github.com/go-kit/kit/log" - "github.com/pkg/errors" "github.com/weaveworks/flux/cluster" - "github.com/weaveworks/flux/policy" "github.com/weaveworks/flux/resource" ) -// Sync synchronises the cluster to the files in a directory -func Sync(logger log.Logger, m cluster.Manifests, repoResources map[string]resource.Resource, clus cluster.Cluster, - deletes bool) error { - // Get a map of resources defined in the cluster - clusterBytes, err := clus.Export() - - if err != nil { - return errors.Wrap(err, "exporting resource defs from cluster") - } - clusterResources, err := m.ParseManifests(clusterBytes) - if err != nil { - return errors.Wrap(err, "parsing exported resources") - } - - // Everything that's in the cluster but not in the repo, delete; - // everything that's in the repo, apply. This is an approximation - // to figuring out what's changed, and applying that. We're - // relying on Kubernetes to decide for each application if it is a - // no-op. - sync := cluster.SyncDef{} - - // DANGER ZONE (tamara) This works and is dangerous. At the moment will delete Flux and - // other pods unless the relevant manifests are part of the user repo. Needs a lot of thought - // before this cleanup cluster feature can be unleashed on the world.
- if deletes { - for id, res := range clusterResources { - prepareSyncDelete(logger, repoResources, id, res, &sync) - } - } - - for id, res := range repoResources { - prepareSyncApply(logger, clusterResources, id, res, &sync) - } - - return clus.Sync(sync) +// Syncer has the methods we need to be able to compile and run a sync +type Syncer interface { + Sync(cluster.SyncSet) error } -func prepareSyncDelete(logger log.Logger, repoResources map[string]resource.Resource, id string, res resource.Resource, sync *cluster.SyncDef) { - if len(repoResources) == 0 { - return - } - if res.Policy().Has(policy.Ignore) { - logger.Log("resource", res.ResourceID(), "ignore", "delete") - return - } - if _, ok := repoResources[id]; !ok { - sync.Actions = append(sync.Actions, cluster.SyncAction{ - Delete: res, - }) +// Sync synchronises the cluster to the files under a directory. +func Sync(setName string, repoResources map[string]resource.Resource, clus Syncer) error { + set := makeSet(setName, repoResources) + if err := clus.Sync(set); err != nil { + return err } + return nil } -func prepareSyncApply(logger log.Logger, clusterResources map[string]resource.Resource, id string, res resource.Resource, sync *cluster.SyncDef) { - if res.Policy().Has(policy.Ignore) { - logger.Log("resource", res.ResourceID(), "ignore", "apply") - return - } - if cres, ok := clusterResources[id]; ok { - if cres.Policy().Has(policy.Ignore) { - logger.Log("resource", res.ResourceID(), "ignore", "apply") - return - } +func makeSet(name string, repoResources map[string]resource.Resource) cluster.SyncSet { + s := cluster.SyncSet{Name: name} + var resources []resource.Resource + for _, res := range repoResources { + resources = append(resources, res) } - sync.Actions = append(sync.Actions, cluster.SyncAction{ - Apply: res, - }) + s.Resources = resources + return s } diff --git a/sync/sync_test.go b/sync/sync_test.go index e9d62dcf9..011bbc3d5 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -1,38 +1,26 @@ package sync import ( - "bytes" - "fmt" - "os" - "os/exec" - "reflect" - "strings" "testing" - "github.com/go-kit/kit/log" - - // "github.com/weaveworks/flux" - "context" + "github.com/stretchr/testify/assert" "github.com/weaveworks/flux/cluster" "github.com/weaveworks/flux/cluster/kubernetes" - "github.com/weaveworks/flux/cluster/kubernetes/testfiles" "github.com/weaveworks/flux/git" "github.com/weaveworks/flux/git/gittest" "github.com/weaveworks/flux/resource" ) +// Test that cluster.Sync gets called with the appropriate things when +// run. func TestSync(t *testing.T) { checkout, cleanup := setup(t) defer cleanup() - // Let's test that cluster.Sync gets called with the appropriate - // things when we add and remove resources from the config. - // Start with nothing running. We should be told to apply all the things. 
- mockCluster := &cluster.Mock{} manifests := &kubernetes.Manifests{} - var clus cluster.Cluster = &syncCluster{mockCluster, map[string][]byte{}} + clus := &syncCluster{map[string]string{}} dirs := checkout.ManifestDirs() resources, err := manifests.LoadManifests(checkout.Dir(), dirs) @@ -40,134 +28,10 @@ func TestSync(t *testing.T) { t.Fatal(err) } - if err := Sync(log.NewNopLogger(), manifests, resources, clus, true); err != nil { - t.Fatal(err) - } - checkClusterMatchesFiles(t, manifests, clus, checkout.Dir(), dirs) - - for _, res := range testfiles.ServiceMap(checkout.Dir()) { - if err := execCommand("rm", res[0]); err != nil { - t.Fatal(err) - } - commitAction := git.CommitAction{Author: "", Message: "deleted " + res[0]} - if err := checkout.CommitAndPush(context.Background(), commitAction, nil); err != nil { - t.Fatal(err) - } - break - } - - resources, err = manifests.LoadManifests(checkout.Dir(), dirs) - if err != nil { - t.Fatal(err) - } - if err := Sync(log.NewNopLogger(), manifests, resources, clus, true); err != nil { + if err := Sync("synctest", resources, clus); err != nil { t.Fatal(err) } - checkClusterMatchesFiles(t, manifests, clus, checkout.Dir(), dirs) -} - -func TestPrepareSyncDelete(t *testing.T) { - var tests = []struct { - msg string - repoRes map[string]resource.Resource - res resource.Resource - expected *cluster.SyncDef - }{ - { - msg: "No repo resources provided during sync delete", - repoRes: map[string]resource.Resource{}, - res: mockResourceWithIgnorePolicy("service", "ns1", "s2"), - expected: &cluster.SyncDef{}, - }, - { - msg: "No policy to ignore in place during sync delete", - repoRes: map[string]resource.Resource{ - "res1": mockResourceWithoutIgnorePolicy("namespace", "ns1", "ns1"), - "res2": mockResourceWithoutIgnorePolicy("namespace", "ns2", "ns2"), - "res3": mockResourceWithoutIgnorePolicy("namespace", "ns3", "ns3"), - "res4": mockResourceWithoutIgnorePolicy("deployment", "ns1", "d1"), - "res5": mockResourceWithoutIgnorePolicy("deployment", "ns2", "d2"), - "res6": mockResourceWithoutIgnorePolicy("service", "ns3", "s1"), - }, - res: mockResourceWithIgnorePolicy("service", "ns1", "s2"), - expected: &cluster.SyncDef{}, - }, - { - msg: "No policy to ignore during sync delete", - repoRes: map[string]resource.Resource{ - "res1": mockResourceWithoutIgnorePolicy("namespace", "ns1", "ns1"), - "res2": mockResourceWithoutIgnorePolicy("namespace", "ns2", "ns2"), - "res3": mockResourceWithoutIgnorePolicy("namespace", "ns3", "ns3"), - "res4": mockResourceWithoutIgnorePolicy("deployment", "ns1", "d1"), - "res5": mockResourceWithoutIgnorePolicy("deployment", "ns2", "d2"), - "res6": mockResourceWithoutIgnorePolicy("service", "ns3", "s1"), - }, - res: mockResourceWithoutIgnorePolicy("service", "ns1", "s2"), - expected: &cluster.SyncDef{Actions: []cluster.SyncAction{cluster.SyncAction{Delete: mockResourceWithoutIgnorePolicy("service", "ns1", "s2")}}}, - }, - } - - logger := log.NewNopLogger() - for _, sc := range tests { - sync := &cluster.SyncDef{} - prepareSyncDelete(logger, sc.repoRes, sc.res.ResourceID().String(), sc.res, sync) - - if !reflect.DeepEqual(sc.expected, sync) { - t.Errorf("%s: expected %+v, got %+v\n", sc.msg, sc.expected, sync) - } - } -} - -func TestPrepareSyncApply(t *testing.T) { - var tests = []struct { - msg string - clusRes map[string]resource.Resource - res resource.Resource - expected *cluster.SyncDef - }{ - { - msg: "No repo resources provided during sync apply", - clusRes: map[string]resource.Resource{}, - res: 
mockResourceWithIgnorePolicy("service", "ns1", "s2"), - expected: &cluster.SyncDef{}, - }, - { - msg: "No policy to ignore in place during sync apply", - clusRes: map[string]resource.Resource{ - "res1": mockResourceWithoutIgnorePolicy("namespace", "ns1", "ns1"), - "res2": mockResourceWithoutIgnorePolicy("namespace", "ns2", "ns2"), - "res3": mockResourceWithoutIgnorePolicy("namespace", "ns3", "ns3"), - "res4": mockResourceWithoutIgnorePolicy("deployment", "ns1", "d1"), - "res5": mockResourceWithoutIgnorePolicy("deployment", "ns2", "d2"), - "res6": mockResourceWithoutIgnorePolicy("service", "ns3", "s1"), - }, - res: mockResourceWithIgnorePolicy("service", "ns1", "s2"), - expected: &cluster.SyncDef{}, - }, - { - msg: "No policy to ignore during sync apply", - clusRes: map[string]resource.Resource{ - "res1": mockResourceWithoutIgnorePolicy("namespace", "ns1", "ns1"), - "res2": mockResourceWithoutIgnorePolicy("namespace", "ns2", "ns2"), - "res3": mockResourceWithoutIgnorePolicy("namespace", "ns3", "ns3"), - "res4": mockResourceWithoutIgnorePolicy("deployment", "ns1", "d1"), - "res5": mockResourceWithoutIgnorePolicy("deployment", "ns2", "d2"), - "res6": mockResourceWithoutIgnorePolicy("service", "ns3", "s1"), - }, - res: mockResourceWithoutIgnorePolicy("service", "ns1", "s2"), - expected: &cluster.SyncDef{Actions: []cluster.SyncAction{cluster.SyncAction{Apply: mockResourceWithoutIgnorePolicy("service", "ns1", "s2")}}}, - }, - } - - logger := log.NewNopLogger() - for _, sc := range tests { - sync := &cluster.SyncDef{} - prepareSyncApply(logger, sc.clusRes, sc.res.ResourceID().String(), sc.res, sync) - - if !reflect.DeepEqual(sc.expected, sync) { - t.Errorf("%s: expected %+v, got %+v\n", sc.msg, sc.expected, sync) - } - } + checkClusterMatchesFiles(t, manifests, clus.resources, checkout.Dir(), dirs) } // --- @@ -183,49 +47,22 @@ func setup(t *testing.T) (*git.Checkout, func()) { return gittest.Checkout(t) } -func execCommand(cmd string, args ...string) error { - c := exec.Command(cmd, args...) - fmt.Printf("exec: %s %s\n", cmd, strings.Join(args, " ")) - c.Stderr = os.Stderr - c.Stdout = os.Stdout - return c.Run() -} - // A cluster that keeps track of exactly what it's been told to apply // or delete and parrots it back when asked to Export. This is as -// mechanically simple as possible! +// mechanically simple as possible. -type syncCluster struct { - *cluster.Mock - resources map[string][]byte -} +type syncCluster struct{ resources map[string]string } -func (p *syncCluster) Sync(def cluster.SyncDef) error { +func (p *syncCluster) Sync(def cluster.SyncSet) error { println("=== Syncing ===") - for _, action := range def.Actions { - if action.Delete != nil { - println("Deleting " + action.Delete.ResourceID().String()) - delete(p.resources, action.Delete.ResourceID().String()) - } - if action.Apply != nil { - println("Applying " + action.Apply.ResourceID().String()) - p.resources[action.Apply.ResourceID().String()] = action.Apply.Bytes() - } + for _, resource := range def.Resources { + println("Applying " + resource.ResourceID().String()) + p.resources[resource.ResourceID().String()] = string(resource.Bytes()) } println("=== Done syncing ===") return nil } -func (p *syncCluster) Export() ([]byte, error) { - // We need a response for Export, which is supposed to supply the - // entire configuration as a lump of bytes. 
- var configs [][]byte - for _, config := range p.resources { - configs = append(configs, config) - } - return bytes.Join(configs, []byte("\n---\n")), nil -} - func resourcesToStrings(resources map[string]resource.Resource) map[string]string { res := map[string]string{} for k, r := range resources { @@ -236,24 +73,11 @@ func resourcesToStrings(resources map[string]resource.Resource) map[string]strin // Our invariant is that the model we can export from the cluster // should always reflect what's in git. So, let's check that. -func checkClusterMatchesFiles(t *testing.T, m cluster.Manifests, c cluster.Cluster, base string, dirs []string) { - conf, err := c.Export() - if err != nil { - t.Fatal(err) - } - resources, err := m.ParseManifests(conf) - if err != nil { - t.Fatal(err) - } +func checkClusterMatchesFiles(t *testing.T, m cluster.Manifests, resources map[string]string, base string, dirs []string) { files, err := m.LoadManifests(base, dirs) if err != nil { t.Fatal(err) } - expected := resourcesToStrings(files) - got := resourcesToStrings(resources) - - if !reflect.DeepEqual(expected, got) { - t.Errorf("expected:\n%#v\ngot:\n%#v", expected, got) - } + assert.Equal(t, resources, resourcesToStrings(files)) }
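One closing observation on the reworked sync test above: because `Syncer` is now a single-method interface, a test double can be little more than a map that records what it was asked to apply. The sketch below uses simplified stand-in types (not the flux ones) to show the shape of that idea, and why comparing the recorded map against the files on disk can replace the old Export/ParseManifests round-trip.

```go
package main

import "fmt"

// Simplified stand-ins for the flux types, just to show the shape of
// the test double; none of these are the real definitions.
type Resource struct {
	ID, Def string
}

type SyncSet struct {
	Name      string
	Resources []Resource
}

type Syncer interface {
	Sync(SyncSet) error
}

// recordingSyncer remembers everything it was asked to apply, keyed
// by resource ID, so a test can compare it against what is in git.
type recordingSyncer map[string]string

var _ Syncer = recordingSyncer(nil)

func (r recordingSyncer) Sync(set SyncSet) error {
	for _, res := range set.Resources {
		r[res.ID] = res.Def
	}
	return nil
}

func main() {
	s := recordingSyncer{}
	_ = s.Sync(SyncSet{Name: "testset", Resources: []Resource{
		{ID: "default:deployment/helloworld", Def: "replicas: 5"},
	}})
	fmt.Println(s) // the real test asserts this map equals the manifests loaded from the repo
}
```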