Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clean some redundant job/pg/gc controllers code #405

Merged
merged 1 commit into from
Aug 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 23 additions & 19 deletions cmd/controllers/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand All @@ -38,8 +39,6 @@ import (
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"

kbver "volcano.sh/volcano/pkg/client/clientset/versioned"

"volcano.sh/volcano/cmd/controllers/app/options"
vkclient "volcano.sh/volcano/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/controllers/garbagecollector"
Expand Down Expand Up @@ -81,23 +80,7 @@ func Run(opt *options.ServerOption) error {
return err
}

// TODO: add user agent for different controllers
kubeClient := clientset.NewForConfigOrDie(config)
kbClient := kbver.NewForConfigOrDie(config)
vkClient := vkclient.NewForConfigOrDie(config)

jobController := job.NewJobController(kubeClient, kbClient, vkClient, opt.WorkerThreads)
queueController := queue.NewQueueController(kubeClient, kbClient)
garbageCollector := garbagecollector.New(vkClient)
pgController := podgroup.NewPodgroupController(kubeClient, kbClient, opt.SchedulerName)

run := func(ctx context.Context) {
go jobController.Run(ctx.Done())
go queueController.Run(ctx.Done())
go garbageCollector.Run(ctx.Done())
go pgController.Run(ctx.Done())
<-ctx.Done()
}
run := startControllers(config, opt)

if !opt.EnableLeaderElection {
run(context.TODO())
Expand Down Expand Up @@ -147,3 +130,24 @@ func Run(opt *options.ServerOption) error {
})
return fmt.Errorf("lost lease")
}

func startControllers(config *rest.Config, opt *options.ServerOption) func(ctx context.Context) {
// TODO: add user agent for different controllers
kubeClient := clientset.NewForConfigOrDie(config)
vkClient := vkclient.NewForConfigOrDie(config)

sharedInformers := informers.NewSharedInformerFactory(kubeClient, 0)

jobController := job.NewJobController(kubeClient, vkClient, sharedInformers, opt.WorkerThreads)
queueController := queue.NewQueueController(kubeClient, vkClient)
garbageCollector := garbagecollector.NewGarbageCollector(vkClient)
pgController := podgroup.NewPodgroupController(kubeClient, vkClient, sharedInformers, opt.SchedulerName)

return func(ctx context.Context) {
go jobController.Run(ctx.Done())
go queueController.Run(ctx.Done())
go garbageCollector.Run(ctx.Done())
go pgController.Run(ctx.Done())
<-ctx.Done()
}
}
4 changes: 2 additions & 2 deletions pkg/controllers/garbagecollector/garbagecollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ type GarbageCollector struct {
queue workqueue.RateLimitingInterface
}

// New creates an instance of GarbageCollector
func New(vkClient vkver.Interface) *GarbageCollector {
// NewGarbageCollector creates an instance of GarbageCollector
func NewGarbageCollector(vkClient vkver.Interface) *GarbageCollector {
jobInformer := vkinfoext.NewSharedInformerFactory(vkClient, 0).Batch().V1alpha1().Jobs()

gb := &GarbageCollector{
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/garbagecollector/garbagecollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestGarbageCollector_ProcessTTL(t *testing.T) {
},
}
for i, testcase := range testcases {
gc := New(volcanoclient.NewSimpleClientset())
gc := NewGarbageCollector(volcanoclient.NewSimpleClientset())

expired, err := gc.processTTL(testcase.Job)
if err != nil {
Expand Down
32 changes: 13 additions & 19 deletions pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

kbver "volcano.sh/volcano/pkg/client/clientset/versioned"
kbinfoext "volcano.sh/volcano/pkg/client/informers/externalversions"
kbinfo "volcano.sh/volcano/pkg/client/informers/externalversions/scheduling/v1alpha2"
kblister "volcano.sh/volcano/pkg/client/listers/scheduling/v1alpha2"
Expand All @@ -61,16 +60,14 @@ import (
type Controller struct {
kubeClients kubernetes.Interface
vkClients vkver.Interface
kbClients kbver.Interface

jobInformer vkbatchinfo.JobInformer
podInformer coreinformers.PodInformer
pvcInformer coreinformers.PersistentVolumeClaimInformer
pgInformer kbinfo.PodGroupInformer
svcInformer coreinformers.ServiceInformer
cmdInformer vkcoreinfo.CommandInformer
pcInformer schedv1.PriorityClassInformer
sharedInformers informers.SharedInformerFactory
jobInformer vkbatchinfo.JobInformer
podInformer coreinformers.PodInformer
pvcInformer coreinformers.PersistentVolumeClaimInformer
pgInformer kbinfo.PodGroupInformer
svcInformer coreinformers.ServiceInformer
cmdInformer vkcoreinfo.CommandInformer
pcInformer schedv1.PriorityClassInformer

// A store of jobs
jobLister vkbatchlister.JobLister
Expand Down Expand Up @@ -113,8 +110,8 @@ type Controller struct {
// NewJobController create new Job Controller
func NewJobController(
kubeClient kubernetes.Interface,
kbClient kbver.Interface,
vkClient vkver.Interface,
sharedInformers informers.SharedInformerFactory,
workers uint32,
) *Controller {

Expand All @@ -127,7 +124,6 @@ func NewJobController(
cc := &Controller{
kubeClients: kubeClient,
vkClients: vkClient,
kbClients: kbClient,
queueList: make([]workqueue.RateLimitingInterface, workers, workers),
commandQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
cache: jobcache.New(),
Expand Down Expand Up @@ -158,8 +154,7 @@ func NewJobController(
cc.cmdLister = cc.cmdInformer.Lister()
cc.cmdSynced = cc.cmdInformer.Informer().HasSynced

cc.sharedInformers = informers.NewSharedInformerFactory(cc.kubeClients, 0)
cc.podInformer = cc.sharedInformers.Core().V1().Pods()
cc.podInformer = sharedInformers.Core().V1().Pods()
cc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addPod,
UpdateFunc: cc.updatePod,
Expand All @@ -169,22 +164,22 @@ func NewJobController(
cc.podLister = cc.podInformer.Lister()
cc.podSynced = cc.podInformer.Informer().HasSynced

cc.pvcInformer = cc.sharedInformers.Core().V1().PersistentVolumeClaims()
cc.pvcInformer = sharedInformers.Core().V1().PersistentVolumeClaims()
cc.pvcLister = cc.pvcInformer.Lister()
cc.pvcSynced = cc.pvcInformer.Informer().HasSynced

cc.svcInformer = cc.sharedInformers.Core().V1().Services()
cc.svcInformer = sharedInformers.Core().V1().Services()
cc.svcLister = cc.svcInformer.Lister()
cc.svcSynced = cc.svcInformer.Informer().HasSynced

cc.pgInformer = kbinfoext.NewSharedInformerFactory(cc.kbClients, 0).Scheduling().V1alpha2().PodGroups()
cc.pgInformer = kbinfoext.NewSharedInformerFactory(cc.vkClients, 0).Scheduling().V1alpha2().PodGroups()
cc.pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: cc.updatePodGroup,
})
cc.pgLister = cc.pgInformer.Lister()
cc.pgSynced = cc.pgInformer.Informer().HasSynced

cc.pcInformer = cc.sharedInformers.Scheduling().V1beta1().PriorityClasses()
cc.pcInformer = sharedInformers.Scheduling().V1beta1().PriorityClasses()
cc.pcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addPriorityClass,
DeleteFunc: cc.deletePriorityClass,
Expand All @@ -203,7 +198,6 @@ func NewJobController(
// Run start JobController
func (cc *Controller) Run(stopCh <-chan struct{}) {

go cc.sharedInformers.Start(stopCh)
go cc.jobInformer.Informer().Run(stopCh)
go cc.podInformer.Informer().Run(stopCh)
go cc.pvcInformer.Informer().Run(stopCh)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM
}

// Delete PodGroup
if err := cc.kbClients.SchedulingV1alpha2().PodGroups(job.Namespace).Delete(job.Name, nil); err != nil {
if err := cc.vkClients.SchedulingV1alpha2().PodGroups(job.Namespace).Delete(job.Name, nil); err != nil {
if !apierrors.IsNotFound(err) {
glog.Errorf("Failed to delete PodGroup of Job %v/%v: %v",
job.Namespace, job.Name, err)
Expand Down Expand Up @@ -468,7 +468,7 @@ func (cc *Controller) createPodGroupIfNotExist(job *vkv1.Job) error {
},
}

if _, err = cc.kbClients.SchedulingV1alpha2().PodGroups(job.Namespace).Create(pg); err != nil {
if _, err = cc.vkClients.SchedulingV1alpha2().PodGroups(job.Namespace).Create(pg); err != nil {
if !apierrors.IsAlreadyExists(err) {
glog.V(3).Infof("Failed to create PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/job_controller_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func TestCreatePodGroupIfNotExistFunc(t *testing.T) {
t.Errorf("Expected return value to be equal to expected: %s, but got: %s", testcase.ExpextVal, err)
}

_, err = fakeController.kbClients.SchedulingV1alpha2().PodGroups(namespace).Get(testcase.Job.Name, metav1.GetOptions{})
_, err = fakeController.vkClients.SchedulingV1alpha2().PodGroups(namespace).Get(testcase.Job.Name, metav1.GetOptions{})
if err != nil {
t.Error("Expected PodGroup to get created, but not created")
}
Expand Down
27 changes: 10 additions & 17 deletions pkg/controllers/job/job_controller_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ package job

import (
"fmt"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"testing"
"volcano.sh/volcano/pkg/apis/helpers"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

vkbatchv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
vkbusv1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1"
"volcano.sh/volcano/pkg/apis/helpers"
kbv1 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
kubebatchclient "volcano.sh/volcano/pkg/client/clientset/versioned"
vkclientset "volcano.sh/volcano/pkg/client/clientset/versioned"
//"volcano.sh/volcano/pkg/controllers/job"
)

func newController() *Controller {
Expand All @@ -45,23 +45,16 @@ func newController() *Controller {
},
)

kubeBatchClientSet := kubebatchclient.NewForConfigOrDie(&rest.Config{
Host: "",
ContentConfig: rest.ContentConfig{
GroupVersion: &kbv1.SchemeGroupVersion,
},
},
)

config := &rest.Config{
vkclient := vkclientset.NewForConfigOrDie(&rest.Config{
Host: "",
ContentConfig: rest.ContentConfig{
GroupVersion: &vkv1.SchemeGroupVersion,
},
}
})

sharedInformers := informers.NewSharedInformerFactory(kubeClientSet, 0)

vkclient := vkclientset.NewForConfigOrDie(config)
controller := NewJobController(kubeClientSet, kubeBatchClientSet, vkclient, 3)
controller := NewJobController(kubeClientSet, vkclient, sharedInformers, 3)

return controller
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/controllers/job/job_controller_plugins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ import (

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
kubeclient "k8s.io/client-go/kubernetes/fake"

vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
volcanoclient "volcano.sh/volcano/pkg/client/clientset/versioned/fake"

kubebatchclient "volcano.sh/volcano/pkg/client/clientset/versioned/fake"
)

func newFakeController() *Controller {
KubeBatchClientSet := kubebatchclient.NewSimpleClientset()
VolcanoClientSet := volcanoclient.NewSimpleClientset()
KubeClientSet := kubeclient.NewSimpleClientset()

controller := NewJobController(KubeClientSet, KubeBatchClientSet, VolcanoClientSet, 3)
sharedInformers := informers.NewSharedInformerFactory(KubeClientSet, 0)

controller := NewJobController(KubeClientSet, VolcanoClientSet, sharedInformers, 3)
return controller
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/podgroup/pg_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Controller struct {
func NewPodgroupController(
kubeClient kubernetes.Interface,
kbClient kbver.Interface,
sharedInformers informers.SharedInformerFactory,
schedulerName string,
) *Controller {
cc := &Controller{
Expand All @@ -67,8 +68,7 @@ func NewPodgroupController(
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
}

factory := informers.NewSharedInformerFactory(cc.kubeClients, 0)
cc.podInformer = factory.Core().V1().Pods()
cc.podInformer = sharedInformers.Core().V1().Pods()
cc.podLister = cc.podInformer.Lister()
cc.podSynced = cc.podInformer.Informer().HasSynced
cc.podInformer.Informer().AddEventHandler(
Expand Down
4 changes: 3 additions & 1 deletion pkg/controllers/podgroup/pg_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
kubeclient "k8s.io/client-go/kubernetes/fake"

scheduling "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
Expand All @@ -31,8 +32,9 @@ import (
func newFakeController() *Controller {
KubeClientSet := kubeclient.NewSimpleClientset()
KubeBatchClientSet := kubebatchclient.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(KubeClientSet, 0)

controller := NewPodgroupController(KubeClientSet, KubeBatchClientSet, "volcano")
controller := NewPodgroupController(KubeClientSet, KubeBatchClientSet, sharedInformers, "volcano")
return controller
}

Expand Down