From 83f0616a8aa3eb1debeeabed4fb923260d3ef921 Mon Sep 17 00:00:00 2001 From: Hang Yan Date: Fri, 20 Sep 2019 10:39:16 +0800 Subject: [PATCH] Use workqueue for chartrepo --- pkg/controller/chartrepo.go | 5 +- pkg/controller/controller.go | 128 +++++++++++++++++++++++++++++++-- pkg/controller/eventhandler.go | 6 +- 3 files changed, 129 insertions(+), 10 deletions(-) diff --git a/pkg/controller/chartrepo.go b/pkg/controller/chartrepo.go index e2dd0be9..e9c5630c 100644 --- a/pkg/controller/chartrepo.go +++ b/pkg/controller/chartrepo.go @@ -9,6 +9,7 @@ import ( "github.com/alauda/captain/pkg/util" "github.com/alauda/helm-crds/pkg/apis/app/v1alpha1" "helm.sh/helm/pkg/repo" + apierrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -131,7 +132,9 @@ func (c *Controller) createCharts(cr *v1alpha1.ChartRepo) error { klog.Infof("chart %s/%s not found, create", cr.GetName(), name) _, err = c.appClientSet.AppV1alpha1().Charts(cr.GetNamespace()).Create(chart) if err != nil { - return err + if !apierrors.IsAlreadyExists(err) { + return err + } } continue } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 94e52e4e..d76b9124 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -84,6 +84,7 @@ type Controller struct { helmRequestSynced cache.InformerSynced chartRepoSynced cache.InformerSynced + chartRepoLister listers.ChartRepoLister // ClusterCache is used to store Cluster resource ClusterCache *commoncache.Cache @@ -93,7 +94,8 @@ type Controller struct { // means we can ensure we only process a fixed amount of resources at a // time, and makes it easy to ensure we are never processing the same item // simultaneously in two different workers. - workQueue workqueue.RateLimitingInterface + workQueue workqueue.RateLimitingInterface + chartRepoWorkQueue workqueue.RateLimitingInterface // recorder is an event recorder for recording Event resources to the // Kubernetes API. recorder record.EventRecorder @@ -132,12 +134,14 @@ func NewController(mgr manager.Manager, opt *config.Options, stopCh <-chan struc clusterClient: clusterClient, globalClusterName: opt.GlobalClusterName, }, - restConfig: cfg, - recorder: mgr.GetEventRecorderFor(util.ComponentName), - helmRequestLister: informer.Lister(), - helmRequestSynced: informer.Informer().HasSynced, - chartRepoSynced: repoInformer.Informer().HasSynced, - workQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "HelmRequests"), + restConfig: cfg, + recorder: mgr.GetEventRecorderFor(util.ComponentName), + helmRequestLister: informer.Lister(), + chartRepoLister: repoInformer.Lister(), + helmRequestSynced: informer.Informer().HasSynced, + chartRepoSynced: repoInformer.Informer().HasSynced, + workQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "HelmRequests"), + chartRepoWorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ChartRepos"), // refresh frequently ClusterCache: commoncache.New(1*time.Minute, 5*time.Minute), } @@ -173,6 +177,7 @@ func (c *Controller) GetClusterClient() clusterclientset.Interface { func (c *Controller) Start(stopCh <-chan struct{}) error { defer utilruntime.HandleCrash() defer c.workQueue.ShutDown() + defer c.chartRepoWorkQueue.ShutDown() // Start the informer factories to begin populating the informer caches klog.Info("Starting HelmRequest controller") @@ -187,6 +192,7 @@ func (c *Controller) Start(stopCh <-chan struct{}) error { // Launch two workers to process HelmRequest resources for i := 0; i < 2; i++ { go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(c.runChartRepoWorker, time.Second, stopCh) } klog.Info("Started workers") @@ -204,6 +210,114 @@ func (c *Controller) runWorker() { } } +func (c *Controller) runChartRepoWorker() { + for c.processNextChartRepo() { + } +} + +// processNextWorkItem will read a single work item off the chartRepoWorkQueue and +// attempt to process it, by calling the syncHandler. +func (c *Controller) processNextChartRepo() bool { + obj, shutdown := c.chartRepoWorkQueue.Get() + + if shutdown { + return false + } + + // We wrap this block in a func so we can defer c.chartRepoWorkQueue.Done. + err := func(obj interface{}) error { + // We call Done here so the chartRepoWorkQueue knows we have finished + // processing this item. We also must remember to call Forget if we + // do not want this work item being re-queued. For example, we do + // not call Forget if a transient error occurs, instead the item is + // put back on the chartRepoWorkQueue and attempted again after a back-off + // period. + defer c.chartRepoWorkQueue.Done(obj) + var key string + var ok bool + // We expect strings to come off the chartRepoWorkQueue. These are of the + // form namespace/name. We do this as the delayed nature of the + // chartRepoWorkQueue means the items in the informer cache may actually be + // more up to date that when the item was initially put onto the + // chartRepoWorkQueue. + if key, ok = obj.(string); !ok { + // As the item in the chartRepoWorkQueue is actually invalid, we call + // Forget here else we'd go into a loop of attempting to + // process a work item that is invalid. + c.chartRepoWorkQueue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected string in chartRepoWorkQueue but got %#v", obj)) + return nil + } + // Start the syncHandler, passing it the namespace/name string of the + // HelmRequest resource to be synced. + if err := c.syncChartRepoHandler(key); err != nil { + // Put the item back on the chartRepoWorkQueue to handle any transient errors. + c.chartRepoWorkQueue.AddRateLimited(key) + return fmt.Errorf("error syncing chartrepo '%s': %s, requeuing", key, err.Error()) + } + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + c.chartRepoWorkQueue.Forget(obj) + klog.Infof("Successfully synced '%s'", key) + return nil + }(obj) + + if err != nil { + utilruntime.HandleError(err) + return true + } + + return true +} + +// syncHandler compares the actual state with the desired, and attempts to +// converge the two. It then updates the Status block of the HelmRequest resource +// with the current status of the resource. +func (c *Controller) syncChartRepoHandler(key string) error { + // Convert the namespace/name string into a distinct namespace and name + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + return nil + } + klog.V(9).Infof("") + + // Get the HelmRequest resource with this namespace/name + chartRepo, err := c.chartRepoLister.ChartRepos(namespace).Get(name) + if err != nil { + // The HelmRequest resource may no longer exist, in which case we stop + // processing. + if errors.IsNotFound(err) { + utilruntime.HandleError(fmt.Errorf("repo '%s' in work queue no longer exists", key)) + return nil + } + + return err + } + + if !chartRepo.DeletionTimestamp.IsZero() { + klog.Infof("ChartRepo has not nil DeletionTimestamp, starting to delete it: %s", chartRepo.Name) + return nil + } + + c.syncChartRepo(chartRepo) + + return nil +} + +// enqueueHelmRequest takes a HelmRequest resource and converts it into a namespace/name +// string which is then put onto the work queue. This method should *not* be +// passed resources of any type other than HelmRequest. +func (c *Controller) enqueueChartRepo(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + utilruntime.HandleError(err) + return + } + c.chartRepoWorkQueue.Add(key) +} + // processNextWorkItem will read a single work item off the workQueue and // attempt to process it, by calling the syncHandler. func (c *Controller) processNextWorkItem() bool { diff --git a/pkg/controller/eventhandler.go b/pkg/controller/eventhandler.go index 6ade4014..f80ca896 100644 --- a/pkg/controller/eventhandler.go +++ b/pkg/controller/eventhandler.go @@ -20,7 +20,9 @@ func (c *Controller) newChartRepoHandler() cache.ResourceEventHandler { klog.V(2).Info("receive new chartrepo") // TODO: use a queue - go c.syncChartRepo(new) + //go c.syncChartRepo(new) + + c.enqueueChartRepo(new) //if oldChartRepo.Spec.Secret != nil && newChartRepo.Spec.Secret != nil { // if oldChartRepo.Spec.Secret.Name != newChartRepo.Spec.Secret.Name { @@ -45,7 +47,7 @@ func (c *Controller) newChartRepoHandler() cache.ResourceEventHandler { } funcs := cache.ResourceEventHandlerFuncs{ - AddFunc: c.syncChartRepo, + AddFunc: c.enqueueChartRepo, UpdateFunc: updateFunc, DeleteFunc: deleteFunc, }