Skip to content

Commit

Permalink
Merge pull request kubernetes#2 from grantr/remove-deployments
Browse files Browse the repository at this point in the history
Remove deployment objects
  • Loading branch information
grantr authored Jan 5, 2018
2 parents 4e4f336 + 26b29af commit 5ac1ef7
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 208 deletions.
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,14 @@ This is an example of how to build a kube-like controller with a single type.

```sh
# assumes you have a working kubeconfig, not required if operating in-cluster
$ go run *.go -kubeconfig=$HOME/.kube/config
$ go run *.go -kubeconfig=$HOME/.kube/config -logtostderr=true -stderrthreshold=INFO

# create a CustomResourceDefinition
$ kubectl create -f artifacts/examples/crd.yaml

# create a custom resource of type Foo
$ kubectl create -f artifacts/examples/example-foo.yaml

# check deployments created through the custom resource
$ kubectl get deployments
```

## Use Cases
Expand Down
229 changes: 24 additions & 205 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,18 @@ import (
"time"

"github.com/golang/glog"
appsv1beta2 "k8s.io/api/apps/v1beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
appslisters "k8s.io/client-go/listers/apps/v1beta2"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

samplev1alpha1 "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1"
clientset "k8s.io/sample-controller/pkg/client/clientset/versioned"
samplescheme "k8s.io/sample-controller/pkg/client/clientset/versioned/scheme"
informers "k8s.io/sample-controller/pkg/client/informers/externalversions"
Expand All @@ -53,9 +48,6 @@ const (
// to sync due to a Deployment of the same name already existing.
ErrResourceExists = "ErrResourceExists"

// MessageResourceExists is the message used for Events when a resource
// fails to sync due to a Deployment already existing
MessageResourceExists = "Resource %q already exists and is not managed by Foo"
// MessageResourceSynced is the message used for an Event fired when a Foo
// is synced successfully
MessageResourceSynced = "Foo synced successfully"
Expand All @@ -68,10 +60,8 @@ type Controller struct {
// sampleclientset is a clientset for our own API group
sampleclientset clientset.Interface

deploymentsLister appslisters.DeploymentLister
deploymentsSynced cache.InformerSynced
foosLister listers.FooLister
foosSynced cache.InformerSynced
foosLister listers.FooLister
foosSynced cache.InformerSynced

// workqueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
Expand All @@ -91,9 +81,7 @@ func NewController(
kubeInformerFactory kubeinformers.SharedInformerFactory,
sampleInformerFactory informers.SharedInformerFactory) *Controller {

// obtain references to shared index informers for the Deployment and Foo
// types.
deploymentInformer := kubeInformerFactory.Apps().V1beta2().Deployments()
// obtain a reference to a shared index informer for the Foo type.
fooInformer := sampleInformerFactory.Samplecontroller().V1alpha1().Foos()

// Create event broadcaster
Expand All @@ -107,14 +95,12 @@ func NewController(
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

controller := &Controller{
kubeclientset: kubeclientset,
sampleclientset: sampleclientset,
deploymentsLister: deploymentInformer.Lister(),
deploymentsSynced: deploymentInformer.Informer().HasSynced,
foosLister: fooInformer.Lister(),
foosSynced: fooInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
recorder: recorder,
kubeclientset: kubeclientset,
sampleclientset: sampleclientset,
foosLister: fooInformer.Lister(),
foosSynced: fooInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
recorder: recorder,
}

glog.Info("Setting up event handlers")
Expand All @@ -125,26 +111,6 @@ func NewController(
controller.enqueueFoo(new)
},
})
// Set up an event handler for when Deployment resources change. This
// handler will lookup the owner of the given Deployment, and if it is
// owned by a Foo resource will enqueue that Foo resource for
// processing. This way, we don't need to implement custom logic for
// handling Deployment resources. More info on this pattern:
// https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newDepl := new.(*appsv1beta2.Deployment)
oldDepl := old.(*appsv1beta2.Deployment)
if newDepl.ResourceVersion == oldDepl.ResourceVersion {
// Periodic resync will send update events for all known Deployments.
// Two different versions of the same Deployment will always have different RVs.
return
}
controller.handleObject(new)
},
DeleteFunc: controller.handleObject,
})

return controller
}
Expand All @@ -162,7 +128,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {

// Wait for the caches to be synced before starting workers
glog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
if ok := cache.WaitForCacheSync(stopCh, c.foosSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

Expand Down Expand Up @@ -240,6 +206,19 @@ func (c *Controller) processNextWorkItem() bool {
return true
}

// enqueueFoo takes a Foo resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than Foo.
func (c *Controller) enqueueFoo(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
runtime.HandleError(err)
return
}
c.workqueue.AddRateLimited(key)
}

// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
Expand All @@ -264,168 +243,8 @@ func (c *Controller) syncHandler(key string) error {
return err
}

deploymentName := foo.Spec.DeploymentName
if deploymentName == "" {
// We choose to absorb the error here as the worker would requeue the
// resource otherwise. Instead, the next time the resource is updated
// the resource will be queued again.
runtime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
return nil
}

// Get the deployment with the name specified in Foo.spec
deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
// If the resource doesn't exist, we'll create it
if errors.IsNotFound(err) {
deployment, err = c.kubeclientset.AppsV1beta2().Deployments(foo.Namespace).Create(newDeployment(foo))
}

// If an error occurs during Get/Create, we'll requeue the item so we can
// attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
if err != nil {
return err
}

// If the Deployment is not controlled by this Foo resource, we should log
// a warning to the event recorder and ret
if !metav1.IsControlledBy(deployment, foo) {
msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
return fmt.Errorf(msg)
}

// If this number of the replicas on the Foo resource is specified, and the
// number does not equal the current desired replicas on the Deployment, we
// should update the Deployment resource.
if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
glog.V(4).Infof("Foor: %d, deplR: %d", *foo.Spec.Replicas, *deployment.Spec.Replicas)
deployment, err = c.kubeclientset.AppsV1beta2().Deployments(foo.Namespace).Update(newDeployment(foo))
}

// If an error occurs during Update, we'll requeue the item so we can
// attempt processing again later. THis could have been caused by a
// temporary network failure, or any other transient reason.
if err != nil {
return err
}

// Finally, we update the status block of the Foo resource to reflect the
// current state of the world
err = c.updateFooStatus(foo, deployment)
if err != nil {
return err
}
//TODO Do stuff with Foo here!

c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
return nil
}

func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1beta2.Deployment) error {
// NEVER modify objects from the store. It's a read-only, local cache.
// You can use DeepCopy() to make a deep copy of original object and modify this copy
// Or create a copy manually for better performance
fooCopy := foo.DeepCopy()
fooCopy.Status.AvailableReplicas = deployment.Status.AvailableReplicas
// Until #38113 is merged, we must use Update instead of UpdateStatus to
// update the Status block of the Foo resource. UpdateStatus will not
// allow changes to the Spec of the resource, which is ideal for ensuring
// nothing other than resource status has been updated.
_, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).Update(fooCopy)
return err
}

// enqueueFoo takes a Foo resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than Foo.
func (c *Controller) enqueueFoo(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
runtime.HandleError(err)
return
}
c.workqueue.AddRateLimited(key)
}

// handleObject will take any resource implementing metav1.Object and attempt
// to find the Foo resource that 'owns' it. It does this by looking at the
// objects metadata.ownerReferences field for an appropriate OwnerReference.
// It then enqueues that Foo resource to be processed. If the object does not
// have an appropriate OwnerReference, it will simply be skipped.
func (c *Controller) handleObject(obj interface{}) {
var object metav1.Object
var ok bool
if object, ok = obj.(metav1.Object); !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
runtime.HandleError(fmt.Errorf("error decoding object, invalid type"))
return
}
object, ok = tombstone.Obj.(metav1.Object)
if !ok {
runtime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
return
}
glog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
}
glog.V(4).Infof("Processing object: %s", object.GetName())
if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
// If this object is not owned by a Foo, we should not do anything more
// with it.
if ownerRef.Kind != "Foo" {
return
}

foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
if err != nil {
glog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
return
}

c.enqueueFoo(foo)
return
}
}

// newDeployment creates a new Deployment for a Foo resource. It also sets
// the appropriate OwnerReferences on the resource so handleObject can discover
// the Foo resource that 'owns' it.
func newDeployment(foo *samplev1alpha1.Foo) *appsv1beta2.Deployment {
labels := map[string]string{
"app": "nginx",
"controller": foo.Name,
}
return &appsv1beta2.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: foo.Spec.DeploymentName,
Namespace: foo.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(foo, schema.GroupVersionKind{
Group: samplev1alpha1.SchemeGroupVersion.Group,
Version: samplev1alpha1.SchemeGroupVersion.Version,
Kind: "Foo",
}),
},
},
Spec: appsv1beta2.DeploymentSpec{
Replicas: foo.Spec.Replicas,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx:latest",
},
},
},
},
},
}
}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func main() {
if err = controller.Run(2, stopCh); err != nil {
glog.Fatalf("Error running controller: %s", err.Error())
}
glog.Flush()
}

func init() {
Expand Down

0 comments on commit 5ac1ef7

Please sign in to comment.