From e4e871e0bf905cc45fde758b17fd751e7bc6881a Mon Sep 17 00:00:00 2001 From: "wangyuqing (C)" Date: Sat, 3 Aug 2019 10:33:04 +0800 Subject: [PATCH] optimize controllers --- cmd/controllers/app/server.go | 42 ++++++++++--------- .../garbagecollector/garbagecollector.go | 4 +- .../garbagecollector/garbagecollector_test.go | 2 +- pkg/controllers/job/job_controller.go | 32 ++++++-------- pkg/controllers/job/job_controller_actions.go | 4 +- .../job/job_controller_actions_test.go | 2 +- .../job/job_controller_handler_test.go | 27 +++++------- .../job/job_controller_plugins_test.go | 8 ++-- pkg/controllers/podgroup/pg_controller.go | 4 +- .../podgroup/pg_controller_test.go | 4 +- 10 files changed, 61 insertions(+), 68 deletions(-) diff --git a/cmd/controllers/app/server.go b/cmd/controllers/app/server.go index ceec83bdd4..565bebaeed 100644 --- a/cmd/controllers/app/server.go +++ b/cmd/controllers/app/server.go @@ -26,6 +26,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -38,8 +39,6 @@ import ( "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" - kbver "volcano.sh/volcano/pkg/client/clientset/versioned" - "volcano.sh/volcano/cmd/controllers/app/options" vkclient "volcano.sh/volcano/pkg/client/clientset/versioned" "volcano.sh/volcano/pkg/controllers/garbagecollector" @@ -81,23 +80,7 @@ func Run(opt *options.ServerOption) error { return err } - // TODO: add user agent for different controllers - kubeClient := clientset.NewForConfigOrDie(config) - kbClient := kbver.NewForConfigOrDie(config) - vkClient := vkclient.NewForConfigOrDie(config) - - jobController := job.NewJobController(kubeClient, kbClient, vkClient, opt.WorkerThreads) - queueController := queue.NewQueueController(kubeClient, kbClient) - garbageCollector := garbagecollector.New(vkClient) - pgController := podgroup.NewPodgroupController(kubeClient, kbClient, opt.SchedulerName) - - run := func(ctx context.Context) { - go jobController.Run(ctx.Done()) - go queueController.Run(ctx.Done()) - go garbageCollector.Run(ctx.Done()) - go pgController.Run(ctx.Done()) - <-ctx.Done() - } + run := startControllers(config, opt) if !opt.EnableLeaderElection { run(context.TODO()) @@ -147,3 +130,24 @@ func Run(opt *options.ServerOption) error { }) return fmt.Errorf("lost lease") } + +func startControllers(config *rest.Config, opt *options.ServerOption) func(ctx context.Context) { + // TODO: add user agent for different controllers + kubeClient := clientset.NewForConfigOrDie(config) + vkClient := vkclient.NewForConfigOrDie(config) + + sharedInformers := informers.NewSharedInformerFactory(kubeClient, 0) + + jobController := job.NewJobController(kubeClient, vkClient, sharedInformers, opt.WorkerThreads) + queueController := queue.NewQueueController(kubeClient, vkClient) + garbageCollector := garbagecollector.NewGarbageCollector(vkClient) + pgController := podgroup.NewPodgroupController(kubeClient, vkClient, sharedInformers, opt.SchedulerName) + + return func(ctx context.Context) { + go jobController.Run(ctx.Done()) + go queueController.Run(ctx.Done()) + go garbageCollector.Run(ctx.Done()) + go pgController.Run(ctx.Done()) + <-ctx.Done() + } +} diff --git a/pkg/controllers/garbagecollector/garbagecollector.go b/pkg/controllers/garbagecollector/garbagecollector.go index 0da9cbb763..95519be58e 100644 --- a/pkg/controllers/garbagecollector/garbagecollector.go +++ b/pkg/controllers/garbagecollector/garbagecollector.go @@ -58,8 +58,8 @@ type GarbageCollector struct { queue workqueue.RateLimitingInterface } -// New creates an instance of GarbageCollector -func New(vkClient vkver.Interface) *GarbageCollector { +// NewGarbageCollector creates an instance of GarbageCollector +func NewGarbageCollector(vkClient vkver.Interface) *GarbageCollector { jobInformer := vkinfoext.NewSharedInformerFactory(vkClient, 0).Batch().V1alpha1().Jobs() gb := &GarbageCollector{ diff --git a/pkg/controllers/garbagecollector/garbagecollector_test.go b/pkg/controllers/garbagecollector/garbagecollector_test.go index 698af4181f..2a1735a022 100644 --- a/pkg/controllers/garbagecollector/garbagecollector_test.go +++ b/pkg/controllers/garbagecollector/garbagecollector_test.go @@ -83,7 +83,7 @@ func TestGarbageCollector_ProcessTTL(t *testing.T) { }, } for i, testcase := range testcases { - gc := New(volcanoclient.NewSimpleClientset()) + gc := NewGarbageCollector(volcanoclient.NewSimpleClientset()) expired, err := gc.processTTL(testcase.Job) if err != nil { diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go index 1aa45a7be3..af8358f2a5 100644 --- a/pkg/controllers/job/job_controller.go +++ b/pkg/controllers/job/job_controller.go @@ -39,7 +39,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" - kbver "volcano.sh/volcano/pkg/client/clientset/versioned" kbinfoext "volcano.sh/volcano/pkg/client/informers/externalversions" kbinfo "volcano.sh/volcano/pkg/client/informers/externalversions/scheduling/v1alpha2" kblister "volcano.sh/volcano/pkg/client/listers/scheduling/v1alpha2" @@ -61,16 +60,14 @@ import ( type Controller struct { kubeClients kubernetes.Interface vkClients vkver.Interface - kbClients kbver.Interface - jobInformer vkbatchinfo.JobInformer - podInformer coreinformers.PodInformer - pvcInformer coreinformers.PersistentVolumeClaimInformer - pgInformer kbinfo.PodGroupInformer - svcInformer coreinformers.ServiceInformer - cmdInformer vkcoreinfo.CommandInformer - pcInformer schedv1.PriorityClassInformer - sharedInformers informers.SharedInformerFactory + jobInformer vkbatchinfo.JobInformer + podInformer coreinformers.PodInformer + pvcInformer coreinformers.PersistentVolumeClaimInformer + pgInformer kbinfo.PodGroupInformer + svcInformer coreinformers.ServiceInformer + cmdInformer vkcoreinfo.CommandInformer + pcInformer schedv1.PriorityClassInformer // A store of jobs jobLister vkbatchlister.JobLister @@ -113,8 +110,8 @@ type Controller struct { // NewJobController create new Job Controller func NewJobController( kubeClient kubernetes.Interface, - kbClient kbver.Interface, vkClient vkver.Interface, + sharedInformers informers.SharedInformerFactory, workers uint32, ) *Controller { @@ -127,7 +124,6 @@ func NewJobController( cc := &Controller{ kubeClients: kubeClient, vkClients: vkClient, - kbClients: kbClient, queueList: make([]workqueue.RateLimitingInterface, workers, workers), commandQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), cache: jobcache.New(), @@ -158,8 +154,7 @@ func NewJobController( cc.cmdLister = cc.cmdInformer.Lister() cc.cmdSynced = cc.cmdInformer.Informer().HasSynced - cc.sharedInformers = informers.NewSharedInformerFactory(cc.kubeClients, 0) - cc.podInformer = cc.sharedInformers.Core().V1().Pods() + cc.podInformer = sharedInformers.Core().V1().Pods() cc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: cc.addPod, UpdateFunc: cc.updatePod, @@ -169,22 +164,22 @@ func NewJobController( cc.podLister = cc.podInformer.Lister() cc.podSynced = cc.podInformer.Informer().HasSynced - cc.pvcInformer = cc.sharedInformers.Core().V1().PersistentVolumeClaims() + cc.pvcInformer = sharedInformers.Core().V1().PersistentVolumeClaims() cc.pvcLister = cc.pvcInformer.Lister() cc.pvcSynced = cc.pvcInformer.Informer().HasSynced - cc.svcInformer = cc.sharedInformers.Core().V1().Services() + cc.svcInformer = sharedInformers.Core().V1().Services() cc.svcLister = cc.svcInformer.Lister() cc.svcSynced = cc.svcInformer.Informer().HasSynced - cc.pgInformer = kbinfoext.NewSharedInformerFactory(cc.kbClients, 0).Scheduling().V1alpha2().PodGroups() + cc.pgInformer = kbinfoext.NewSharedInformerFactory(cc.vkClients, 0).Scheduling().V1alpha2().PodGroups() cc.pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: cc.updatePodGroup, }) cc.pgLister = cc.pgInformer.Lister() cc.pgSynced = cc.pgInformer.Informer().HasSynced - cc.pcInformer = cc.sharedInformers.Scheduling().V1beta1().PriorityClasses() + cc.pcInformer = sharedInformers.Scheduling().V1beta1().PriorityClasses() cc.pcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: cc.addPriorityClass, DeleteFunc: cc.deletePriorityClass, @@ -203,7 +198,6 @@ func NewJobController( // Run start JobController func (cc *Controller) Run(stopCh <-chan struct{}) { - go cc.sharedInformers.Start(stopCh) go cc.jobInformer.Informer().Run(stopCh) go cc.podInformer.Informer().Run(stopCh) go cc.pvcInformer.Informer().Run(stopCh) diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index 5d22c97e2a..6032740631 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -126,7 +126,7 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM } // Delete PodGroup - if err := cc.kbClients.SchedulingV1alpha2().PodGroups(job.Namespace).Delete(job.Name, nil); err != nil { + if err := cc.vkClients.SchedulingV1alpha2().PodGroups(job.Namespace).Delete(job.Name, nil); err != nil { if !apierrors.IsNotFound(err) { glog.Errorf("Failed to delete PodGroup of Job %v/%v: %v", job.Namespace, job.Name, err) @@ -468,7 +468,7 @@ func (cc *Controller) createPodGroupIfNotExist(job *vkv1.Job) error { }, } - if _, err = cc.kbClients.SchedulingV1alpha2().PodGroups(job.Namespace).Create(pg); err != nil { + if _, err = cc.vkClients.SchedulingV1alpha2().PodGroups(job.Namespace).Create(pg); err != nil { if !apierrors.IsAlreadyExists(err) { glog.V(3).Infof("Failed to create PodGroup for Job <%s/%s>: %v", job.Namespace, job.Name, err) diff --git a/pkg/controllers/job/job_controller_actions_test.go b/pkg/controllers/job/job_controller_actions_test.go index 83df17afdd..58184f007e 100644 --- a/pkg/controllers/job/job_controller_actions_test.go +++ b/pkg/controllers/job/job_controller_actions_test.go @@ -505,7 +505,7 @@ func TestCreatePodGroupIfNotExistFunc(t *testing.T) { t.Errorf("Expected return value to be equal to expected: %s, but got: %s", testcase.ExpextVal, err) } - _, err = fakeController.kbClients.SchedulingV1alpha2().PodGroups(namespace).Get(testcase.Job.Name, metav1.GetOptions{}) + _, err = fakeController.vkClients.SchedulingV1alpha2().PodGroups(namespace).Get(testcase.Job.Name, metav1.GetOptions{}) if err != nil { t.Error("Expected PodGroup to get created, but not created") } diff --git a/pkg/controllers/job/job_controller_handler_test.go b/pkg/controllers/job/job_controller_handler_test.go index 1df6e2a599..fa7f080f06 100644 --- a/pkg/controllers/job/job_controller_handler_test.go +++ b/pkg/controllers/job/job_controller_handler_test.go @@ -18,22 +18,22 @@ package job import ( "fmt" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/rest" "testing" - "volcano.sh/volcano/pkg/apis/helpers" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/client-go/informers" kubeclientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + vkbatchv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" vkbusv1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1" + "volcano.sh/volcano/pkg/apis/helpers" kbv1 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" - kubebatchclient "volcano.sh/volcano/pkg/client/clientset/versioned" vkclientset "volcano.sh/volcano/pkg/client/clientset/versioned" - //"volcano.sh/volcano/pkg/controllers/job" ) func newController() *Controller { @@ -45,23 +45,16 @@ func newController() *Controller { }, ) - kubeBatchClientSet := kubebatchclient.NewForConfigOrDie(&rest.Config{ - Host: "", - ContentConfig: rest.ContentConfig{ - GroupVersion: &kbv1.SchemeGroupVersion, - }, - }, - ) - - config := &rest.Config{ + vkclient := vkclientset.NewForConfigOrDie(&rest.Config{ Host: "", ContentConfig: rest.ContentConfig{ GroupVersion: &vkv1.SchemeGroupVersion, }, - } + }) + + sharedInformers := informers.NewSharedInformerFactory(kubeClientSet, 0) - vkclient := vkclientset.NewForConfigOrDie(config) - controller := NewJobController(kubeClientSet, kubeBatchClientSet, vkclient, 3) + controller := NewJobController(kubeClientSet, vkclient, sharedInformers, 3) return controller } diff --git a/pkg/controllers/job/job_controller_plugins_test.go b/pkg/controllers/job/job_controller_plugins_test.go index 460814f461..da3ed9246d 100644 --- a/pkg/controllers/job/job_controller_plugins_test.go +++ b/pkg/controllers/job/job_controller_plugins_test.go @@ -22,20 +22,20 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" kubeclient "k8s.io/client-go/kubernetes/fake" vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" volcanoclient "volcano.sh/volcano/pkg/client/clientset/versioned/fake" - - kubebatchclient "volcano.sh/volcano/pkg/client/clientset/versioned/fake" ) func newFakeController() *Controller { - KubeBatchClientSet := kubebatchclient.NewSimpleClientset() VolcanoClientSet := volcanoclient.NewSimpleClientset() KubeClientSet := kubeclient.NewSimpleClientset() - controller := NewJobController(KubeClientSet, KubeBatchClientSet, VolcanoClientSet, 3) + sharedInformers := informers.NewSharedInformerFactory(KubeClientSet, 0) + + controller := NewJobController(KubeClientSet, VolcanoClientSet, sharedInformers, 3) return controller } diff --git a/pkg/controllers/podgroup/pg_controller.go b/pkg/controllers/podgroup/pg_controller.go index 40d2b56e12..c8bee2c227 100644 --- a/pkg/controllers/podgroup/pg_controller.go +++ b/pkg/controllers/podgroup/pg_controller.go @@ -58,6 +58,7 @@ type Controller struct { func NewPodgroupController( kubeClient kubernetes.Interface, kbClient kbver.Interface, + sharedInformers informers.SharedInformerFactory, schedulerName string, ) *Controller { cc := &Controller{ @@ -67,8 +68,7 @@ func NewPodgroupController( queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), } - factory := informers.NewSharedInformerFactory(cc.kubeClients, 0) - cc.podInformer = factory.Core().V1().Pods() + cc.podInformer = sharedInformers.Core().V1().Pods() cc.podLister = cc.podInformer.Lister() cc.podSynced = cc.podInformer.Informer().HasSynced cc.podInformer.Informer().AddEventHandler( diff --git a/pkg/controllers/podgroup/pg_controller_test.go b/pkg/controllers/podgroup/pg_controller_test.go index 1e0f60412d..20c01255e4 100644 --- a/pkg/controllers/podgroup/pg_controller_test.go +++ b/pkg/controllers/podgroup/pg_controller_test.go @@ -22,6 +22,7 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" kubeclient "k8s.io/client-go/kubernetes/fake" scheduling "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" @@ -31,8 +32,9 @@ import ( func newFakeController() *Controller { KubeClientSet := kubeclient.NewSimpleClientset() KubeBatchClientSet := kubebatchclient.NewSimpleClientset() + sharedInformers := informers.NewSharedInformerFactory(KubeClientSet, 0) - controller := NewPodgroupController(KubeClientSet, KubeBatchClientSet, "volcano") + controller := NewPodgroupController(KubeClientSet, KubeBatchClientSet, sharedInformers, "volcano") return controller }