diff --git a/README.md b/README.md index d2aaf2b34..4c4aca3e1 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ This is an example of how to build a kube-like controller with a single type. ```sh # assumes you have a working kubeconfig, not required if operating in-cluster -$ go run *.go -kubeconfig=$HOME/.kube/config +$ go run *.go -kubeconfig=$HOME/.kube/config -logtostderr=true -stderrthreshold=INFO # create a CustomResourceDefinition $ kubectl create -f artifacts/examples/crd.yaml @@ -39,8 +39,6 @@ $ kubectl create -f artifacts/examples/crd.yaml # create a custom resource of type Foo $ kubectl create -f artifacts/examples/example-foo.yaml -# check deployments created through the custom resource -$ kubectl get deployments ``` ## Use Cases diff --git a/controller.go b/controller.go index 9ca22f6f8..9db0ff2ce 100644 --- a/controller.go +++ b/controller.go @@ -21,23 +21,18 @@ import ( "time" "github.com/golang/glog" - appsv1beta2 "k8s.io/api/apps/v1beta2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" - appslisters "k8s.io/client-go/listers/apps/v1beta2" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" - samplev1alpha1 "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1" clientset "k8s.io/sample-controller/pkg/client/clientset/versioned" samplescheme "k8s.io/sample-controller/pkg/client/clientset/versioned/scheme" informers "k8s.io/sample-controller/pkg/client/informers/externalversions" @@ -53,9 +48,6 @@ const ( // to sync due to a Deployment of the same name already existing. ErrResourceExists = "ErrResourceExists" - // MessageResourceExists is the message used for Events when a resource - // fails to sync due to a Deployment already existing - MessageResourceExists = "Resource %q already exists and is not managed by Foo" // MessageResourceSynced is the message used for an Event fired when a Foo // is synced successfully MessageResourceSynced = "Foo synced successfully" @@ -68,10 +60,8 @@ type Controller struct { // sampleclientset is a clientset for our own API group sampleclientset clientset.Interface - deploymentsLister appslisters.DeploymentLister - deploymentsSynced cache.InformerSynced - foosLister listers.FooLister - foosSynced cache.InformerSynced + foosLister listers.FooLister + foosSynced cache.InformerSynced // workqueue is a rate limited work queue. This is used to queue work to be // processed instead of performing it as soon as a change happens. This @@ -91,9 +81,7 @@ func NewController( kubeInformerFactory kubeinformers.SharedInformerFactory, sampleInformerFactory informers.SharedInformerFactory) *Controller { - // obtain references to shared index informers for the Deployment and Foo - // types. - deploymentInformer := kubeInformerFactory.Apps().V1beta2().Deployments() + // obtain a reference to a shared index informer for the Foo type. fooInformer := sampleInformerFactory.Samplecontroller().V1alpha1().Foos() // Create event broadcaster @@ -107,14 +95,12 @@ func NewController( recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) controller := &Controller{ - kubeclientset: kubeclientset, - sampleclientset: sampleclientset, - deploymentsLister: deploymentInformer.Lister(), - deploymentsSynced: deploymentInformer.Informer().HasSynced, - foosLister: fooInformer.Lister(), - foosSynced: fooInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"), - recorder: recorder, + kubeclientset: kubeclientset, + sampleclientset: sampleclientset, + foosLister: fooInformer.Lister(), + foosSynced: fooInformer.Informer().HasSynced, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"), + recorder: recorder, } glog.Info("Setting up event handlers") @@ -125,26 +111,6 @@ func NewController( controller.enqueueFoo(new) }, }) - // Set up an event handler for when Deployment resources change. This - // handler will lookup the owner of the given Deployment, and if it is - // owned by a Foo resource will enqueue that Foo resource for - // processing. This way, we don't need to implement custom logic for - // handling Deployment resources. More info on this pattern: - // https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md - deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.handleObject, - UpdateFunc: func(old, new interface{}) { - newDepl := new.(*appsv1beta2.Deployment) - oldDepl := old.(*appsv1beta2.Deployment) - if newDepl.ResourceVersion == oldDepl.ResourceVersion { - // Periodic resync will send update events for all known Deployments. - // Two different versions of the same Deployment will always have different RVs. - return - } - controller.handleObject(new) - }, - DeleteFunc: controller.handleObject, - }) return controller } @@ -162,7 +128,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { // Wait for the caches to be synced before starting workers glog.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok { + if ok := cache.WaitForCacheSync(stopCh, c.foosSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") } @@ -240,6 +206,19 @@ func (c *Controller) processNextWorkItem() bool { return true } +// enqueueFoo takes a Foo resource and converts it into a namespace/name +// string which is then put onto the work queue. This method should *not* be +// passed resources of any type other than Foo. +func (c *Controller) enqueueFoo(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + runtime.HandleError(err) + return + } + c.workqueue.AddRateLimited(key) +} + // syncHandler compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the Foo resource // with the current status of the resource. @@ -264,168 +243,8 @@ func (c *Controller) syncHandler(key string) error { return err } - deploymentName := foo.Spec.DeploymentName - if deploymentName == "" { - // We choose to absorb the error here as the worker would requeue the - // resource otherwise. Instead, the next time the resource is updated - // the resource will be queued again. - runtime.HandleError(fmt.Errorf("%s: deployment name must be specified", key)) - return nil - } - - // Get the deployment with the name specified in Foo.spec - deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName) - // If the resource doesn't exist, we'll create it - if errors.IsNotFound(err) { - deployment, err = c.kubeclientset.AppsV1beta2().Deployments(foo.Namespace).Create(newDeployment(foo)) - } - - // If an error occurs during Get/Create, we'll requeue the item so we can - // attempt processing again later. This could have been caused by a - // temporary network failure, or any other transient reason. - if err != nil { - return err - } - - // If the Deployment is not controlled by this Foo resource, we should log - // a warning to the event recorder and ret - if !metav1.IsControlledBy(deployment, foo) { - msg := fmt.Sprintf(MessageResourceExists, deployment.Name) - c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg) - return fmt.Errorf(msg) - } - - // If this number of the replicas on the Foo resource is specified, and the - // number does not equal the current desired replicas on the Deployment, we - // should update the Deployment resource. - if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas { - glog.V(4).Infof("Foor: %d, deplR: %d", *foo.Spec.Replicas, *deployment.Spec.Replicas) - deployment, err = c.kubeclientset.AppsV1beta2().Deployments(foo.Namespace).Update(newDeployment(foo)) - } - - // If an error occurs during Update, we'll requeue the item so we can - // attempt processing again later. THis could have been caused by a - // temporary network failure, or any other transient reason. - if err != nil { - return err - } - - // Finally, we update the status block of the Foo resource to reflect the - // current state of the world - err = c.updateFooStatus(foo, deployment) - if err != nil { - return err - } + //TODO Do stuff with Foo here! c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced) return nil } - -func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1beta2.Deployment) error { - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use DeepCopy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - fooCopy := foo.DeepCopy() - fooCopy.Status.AvailableReplicas = deployment.Status.AvailableReplicas - // Until #38113 is merged, we must use Update instead of UpdateStatus to - // update the Status block of the Foo resource. UpdateStatus will not - // allow changes to the Spec of the resource, which is ideal for ensuring - // nothing other than resource status has been updated. - _, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).Update(fooCopy) - return err -} - -// enqueueFoo takes a Foo resource and converts it into a namespace/name -// string which is then put onto the work queue. This method should *not* be -// passed resources of any type other than Foo. -func (c *Controller) enqueueFoo(obj interface{}) { - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - runtime.HandleError(err) - return - } - c.workqueue.AddRateLimited(key) -} - -// handleObject will take any resource implementing metav1.Object and attempt -// to find the Foo resource that 'owns' it. It does this by looking at the -// objects metadata.ownerReferences field for an appropriate OwnerReference. -// It then enqueues that Foo resource to be processed. If the object does not -// have an appropriate OwnerReference, it will simply be skipped. -func (c *Controller) handleObject(obj interface{}) { - var object metav1.Object - var ok bool - if object, ok = obj.(metav1.Object); !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - runtime.HandleError(fmt.Errorf("error decoding object, invalid type")) - return - } - object, ok = tombstone.Obj.(metav1.Object) - if !ok { - runtime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type")) - return - } - glog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName()) - } - glog.V(4).Infof("Processing object: %s", object.GetName()) - if ownerRef := metav1.GetControllerOf(object); ownerRef != nil { - // If this object is not owned by a Foo, we should not do anything more - // with it. - if ownerRef.Kind != "Foo" { - return - } - - foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name) - if err != nil { - glog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name) - return - } - - c.enqueueFoo(foo) - return - } -} - -// newDeployment creates a new Deployment for a Foo resource. It also sets -// the appropriate OwnerReferences on the resource so handleObject can discover -// the Foo resource that 'owns' it. -func newDeployment(foo *samplev1alpha1.Foo) *appsv1beta2.Deployment { - labels := map[string]string{ - "app": "nginx", - "controller": foo.Name, - } - return &appsv1beta2.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: foo.Spec.DeploymentName, - Namespace: foo.Namespace, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(foo, schema.GroupVersionKind{ - Group: samplev1alpha1.SchemeGroupVersion.Group, - Version: samplev1alpha1.SchemeGroupVersion.Version, - Kind: "Foo", - }), - }, - }, - Spec: appsv1beta2.DeploymentSpec{ - Replicas: foo.Spec.Replicas, - Selector: &metav1.LabelSelector{ - MatchLabels: labels, - }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: labels, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "nginx", - Image: "nginx:latest", - }, - }, - }, - }, - }, - } -} diff --git a/main.go b/main.go index d3514fc3a..046b536b8 100644 --- a/main.go +++ b/main.go @@ -69,6 +69,7 @@ func main() { if err = controller.Run(2, stopCh); err != nil { glog.Fatalf("Error running controller: %s", err.Error()) } + glog.Flush() } func init() {