Skip to content

Commit

Permalink
add podgroup controller
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyuqing4 committed Aug 1, 2019
1 parent e3fa672 commit d7b8df6
Show file tree
Hide file tree
Showing 9 changed files with 564 additions and 5 deletions.
11 changes: 7 additions & 4 deletions cmd/controllers/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (
)

const (
defaultQPS = 50.0
defaultBurst = 100
defaultWorkers = 3
defaultQPS = 50.0
defaultBurst = 100
defaultWorkers = 3
defaultSchedulerName = "volcano"
)

// ServerOption is the main context object for the controller manager.
Expand All @@ -38,8 +39,9 @@ type ServerOption struct {
KubeAPIQPS float32
PrintVersion bool
// WorkerThreads is the number of threads syncing job operations
// concurrently. Larger number = faster job updating,but more CPU load.
// concurrently. Larger number = faster job updating, but more CPU load.
WorkerThreads uint32
SchedulerName string
}

// NewServerOption creates a new CMServer with a default config.
Expand All @@ -60,6 +62,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
fs.Uint32Var(&s.WorkerThreads, "worker-threads", defaultWorkers, "The number of threads syncing job operations concurrently. "+
"Larger number = faster job updating, but more CPU load")
fs.StringVar(&s.SchedulerName, "scheduler-name", defaultSchedulerName, "Volcano will handle pods whose .spec.SchedulerName is same as scheduler-name")
}

// CheckOptionOrDie checks the LockObjectNamespace
Expand Down
4 changes: 3 additions & 1 deletion cmd/controllers/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ limitations under the License.
package options

import (
"github.com/spf13/pflag"
"reflect"
"testing"

"github.com/spf13/pflag"
)

func TestAddFlags(t *testing.T) {
Expand All @@ -40,6 +41,7 @@ func TestAddFlags(t *testing.T) {
KubeAPIBurst: 200,
PrintVersion: false,
WorkerThreads: defaultWorkers,
SchedulerName: defaultSchedulerName,
}

if !reflect.DeepEqual(expected, s) {
Expand Down
3 changes: 3 additions & 0 deletions cmd/controllers/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
vkclient "volcano.sh/volcano/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/controllers/garbagecollector"
"volcano.sh/volcano/pkg/controllers/job"
"volcano.sh/volcano/pkg/controllers/podgroup"
"volcano.sh/volcano/pkg/controllers/queue"
)

Expand Down Expand Up @@ -88,11 +89,13 @@ func Run(opt *options.ServerOption) error {
jobController := job.NewJobController(kubeClient, kbClient, vkClient, opt.WorkerThreads)
queueController := queue.NewQueueController(kubeClient, kbClient)
garbageCollector := garbagecollector.New(vkClient)
pgController := podgroup.NewPodgroupController(kubeClient, kbClient, opt.SchedulerName)

run := func(ctx context.Context) {
go jobController.Run(ctx.Done())
go queueController.Run(ctx.Done())
go garbageCollector.Run(ctx.Done())
go pgController.Run(ctx.Done())
<-ctx.Done()
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/batch/v1alpha1/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ const (
JobVersion = "volcano.sh/job-version"
// JobTypeKey job type key used in labels
JobTypeKey = "volcano.sh/job-type"
// PodgroupNamePrefix podgroup name prefix
PodgroupNamePrefix = "podgroup-"
)
145 changes: 145 additions & 0 deletions pkg/controllers/podgroup/pg_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
Copyright 2019 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podgroup

import (
"github.com/golang/glog"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

scheduling "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
kbver "volcano.sh/volcano/pkg/client/clientset/versioned"
kbinfoext "volcano.sh/volcano/pkg/client/informers/externalversions"
kbinfo "volcano.sh/volcano/pkg/client/informers/externalversions/scheduling/v1alpha2"
kblister "volcano.sh/volcano/pkg/client/listers/scheduling/v1alpha2"
)

// Controller is the PodGroup controller. It watches pods through an informer
// and, for each queued pod, makes sure a PodGroup exists and that the pod is
// annotated with that PodGroup's name.
type Controller struct {
	// kubeClients talks to the core Kubernetes API (used to update pods).
	kubeClients kubernetes.Interface
	// kbClients talks to the scheduling API (used to create PodGroups).
	kbClients kbver.Interface

	// Informers started by Run.
	podInformer coreinformers.PodInformer
	pgInformer  kbinfo.PodGroupInformer

	// A store of pods
	podLister corelisters.PodLister
	// podSynced reports whether the pod informer has synced.
	podSynced func() bool

	// A store of podgroups
	pgLister kblister.PodGroupLister
	// pgSynced reports whether the PodGroup informer has synced.
	pgSynced func() bool

	// queue holds podRequest items waiting to be processed by the worker.
	queue workqueue.RateLimitingInterface
}

// NewPodgroupController builds a Controller wired to a pod informer and a
// PodGroup informer. Only pods whose .spec.schedulerName equals schedulerName
// and that carry no group-name annotation are passed to the add handler.
func NewPodgroupController(
	kubeClient kubernetes.Interface,
	kbClient kbver.Interface,
	schedulerName string,
) *Controller {
	cc := &Controller{
		kubeClients: kubeClient,
		kbClients:   kbClient,

		queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
	}

	// needsPodGroup selects pods handled by this scheduler that do not
	// reference a PodGroup yet; anything that is not a *v1.Pod is rejected.
	needsPodGroup := func(obj interface{}) bool {
		pod, isPod := obj.(*v1.Pod)
		if !isPod {
			return false
		}
		if pod.Spec.SchedulerName != schedulerName {
			return false
		}
		// Reading from a nil map yields "", so a nil Annotations map is
		// covered by the same comparison.
		return pod.Annotations[scheduling.GroupNameAnnotationKey] == ""
	}

	kubeFactory := informers.NewSharedInformerFactory(cc.kubeClients, 0)
	cc.podInformer = kubeFactory.Core().V1().Pods()
	cc.podLister = cc.podInformer.Lister()
	podSharedInformer := cc.podInformer.Informer()
	cc.podSynced = podSharedInformer.HasSynced
	podSharedInformer.AddEventHandler(cache.FilteringResourceEventHandler{
		FilterFunc: needsPodGroup,
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc: cc.addPod,
		},
	})

	kbFactory := kbinfoext.NewSharedInformerFactory(cc.kbClients, 0)
	cc.pgInformer = kbFactory.Scheduling().V1alpha2().PodGroups()
	cc.pgLister = cc.pgInformer.Lister()
	cc.pgSynced = cc.pgInformer.Informer().HasSynced

	return cc
}

// Run starts the pod and PodGroup informers, waits for their caches to sync
// and then starts one worker that drains the request queue. It returns as soon
// as the worker has been started; closing stopCh stops the informers, shuts
// the queue down and lets the worker exit.
func (cc *Controller) Run(stopCh <-chan struct{}) {
	go cc.podInformer.Informer().Run(stopCh)
	go cc.pgInformer.Informer().Run(stopCh)

	// WaitForCacheSync reports false when stopCh is closed before the caches
	// are populated; starting a worker on unsynced listers would be pointless.
	if !cache.WaitForCacheSync(stopCh, cc.podSynced, cc.pgSynced) {
		glog.Errorf("PodgroupController failed to sync informer caches")
		cc.queue.ShutDown()
		return
	}

	// The worker blocks inside queue.Get, which only returns on shutdown once
	// the queue is empty. Without this the worker goroutine would never
	// observe stopCh and would leak.
	go func() {
		<-stopCh
		cc.queue.ShutDown()
	}()

	go wait.Until(cc.worker, 0, stopCh)

	glog.Infof("PodgroupController is running ...... ")
}

// worker processes queued requests one at a time until processNextReq
// reports that it should stop.
func (cc *Controller) worker() {
	for {
		if keepGoing := cc.processNextReq(); !keepGoing {
			return
		}
	}
}

// processNextReq takes one podRequest off the queue and makes sure the pod it
// names has a PodGroup. It returns false only when the queue reports shutdown;
// every other outcome returns true so the worker keeps looping.
func (cc *Controller) processNextReq() bool {
	item, quit := cc.queue.Get()
	if quit {
		glog.Errorf("Fail to pop item from queue")
		return false
	}

	req := item.(podRequest)
	// Whatever happens below, tell the queue this item is no longer in flight.
	defer cc.queue.Done(req)

	pod, lookupErr := cc.podLister.Pods(req.podNamespace).Get(req.podName)
	if lookupErr != nil {
		// The request is dropped here; it is not re-queued.
		glog.Errorf("Failed to get pod by <%v> from cache: %v", req, lookupErr)
		return true
	}

	// normal pod use volcano
	switch syncErr := cc.createNormalPodPGIfNotExist(pod); {
	case syncErr != nil:
		glog.Errorf("Failed to handle Pod <%s/%s>: %v", pod.Namespace, pod.Name, syncErr)
		cc.queue.AddRateLimited(req)
	default:
		// Success: clear the rate limiter's retry history for this request.
		cc.queue.Forget(req)
	}

	return true
}
132 changes: 132 additions & 0 deletions pkg/controllers/podgroup/pg_controller_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
Copyright 2019 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podgroup

import (
"github.com/golang/glog"

"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

vkbatchv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/apis/helpers"
scheduling "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
)

// podRequest is the work-queue item enqueued by addPod; it identifies a pod
// by name and namespace so the worker can look it up in the pod lister.
type podRequest struct {
	// podName is the pod's metadata name.
	podName string
	// podNamespace is the pod's metadata namespace.
	podNamespace string
}

// addPod is the informer add handler: it enqueues a podRequest for the given
// pod. Objects that are not a *v1.Pod are logged and ignored.
func (cc *Controller) addPod(obj interface{}) {
	if pod, isPod := obj.(*v1.Pod); isPod {
		cc.queue.Add(podRequest{
			podName:      pod.Name,
			podNamespace: pod.Namespace,
		})
		return
	}

	glog.Errorf("Failed to convert %v to v1.Pod", obj)
}

// updatePodAnnotations records pgName in the pod's group-name annotation and
// persists the change through the API server.
//
// If the annotation is already set, nothing is written; a value different
// from pgName is only logged. The pod passed in is never modified: it comes
// from the informer cache, and mutating it in place would make a retry after
// a failed Update believe the annotation was already stored.
//
// It returns the error from the pod Update call, or nil.
func (cc *Controller) updatePodAnnotations(pod *v1.Pod, pgName string) error {
	// Reading from a nil map yields "", so no nil check is needed here.
	if current := pod.Annotations[scheduling.GroupNameAnnotationKey]; current != "" {
		if current != pgName {
			glog.Errorf("normal pod %s/%s annotations %s value is not %s, but %s", pod.Namespace, pod.Name,
				scheduling.GroupNameAnnotationKey, pgName, current)
		}
		return nil
	}

	// Work on a private copy so the shared cache object stays untouched.
	updated := pod.DeepCopy()
	if updated.Annotations == nil {
		updated.Annotations = make(map[string]string)
	}
	updated.Annotations[scheduling.GroupNameAnnotationKey] = pgName

	if _, err := cc.kubeClients.CoreV1().Pods(updated.Namespace).Update(updated); err != nil {
		glog.Errorf("Failed to update pod <%s/%s>: %v", pod.Namespace, pod.Name, err)
		return err
	}

	return nil
}

// createNormalPodPGIfNotExist makes sure a PodGroup exists for the pod and
// then annotates the pod with the PodGroup's name.
//
// The PodGroup is looked up in the lister first and created with MinMember 1
// when it is not found. A Create that fails with AlreadyExists is treated as
// success: the lister is backed by a cache that may not yet contain a
// PodGroup created moments ago, for example for another pod that maps to the
// same PodGroup name.
//
// It returns any other lookup or create error, or the result of annotating
// the pod.
func (cc *Controller) createNormalPodPGIfNotExist(pod *v1.Pod) error {
	pgName := generatePodgroupName(pod)

	if _, err := cc.pgLister.PodGroups(pod.Namespace).Get(pgName); err != nil {
		if !apierrors.IsNotFound(err) {
			glog.Errorf("Failed to get normal PodGroup for Pod <%s/%s>: %v",
				pod.Namespace, pod.Name, err)
			return err
		}

		pg := &scheduling.PodGroup{
			ObjectMeta: metav1.ObjectMeta{
				Namespace:       pod.Namespace,
				Name:            pgName,
				OwnerReferences: newPGOwnerReferences(pod),
			},
			Spec: scheduling.PodGroupSpec{
				MinMember: 1,
			},
		}

		_, err := cc.kbClients.SchedulingV1alpha2().PodGroups(pod.Namespace).Create(pg)
		if err != nil && !apierrors.IsAlreadyExists(err) {
			glog.Errorf("Failed to create normal PodGroup for Pod <%s/%s>: %v",
				pod.Namespace, pod.Name, err)
			return err
		}
	}

	return cc.updatePodAnnotations(pod, pgName)
}

// generatePodgroupName returns the PodGroup name for a pod: the podgroup name
// prefix followed by the UID of the first owner reference marked as
// controller, or by the pod's own UID when there is no such owner.
func generatePodgroupName(pod *v1.Pod) string {
	suffix := string(pod.UID)

	// Ranging over a nil or empty slice does nothing, so pods without owners
	// keep their own UID as the suffix.
	for i := range pod.OwnerReferences {
		ref := &pod.OwnerReferences[i]
		if ref.Controller != nil && *ref.Controller {
			suffix = string(ref.UID)
			break
		}
	}

	return vkbatchv1.PodgroupNamePrefix + suffix
}

// newPGOwnerReferences returns the owner references to put on the PodGroup
// created for pod. A pod that has owner references passes them through
// unchanged; otherwise a single controller reference carrying the pod's UID
// is returned.
//
// NOTE(review): the fallback reference takes its APIVersion from
// helpers.JobKind and leaves Kind and Name unset even though the UID is the
// pod's. Confirm the API server accepts such a reference and that garbage
// collection resolves it as intended.
func newPGOwnerReferences(pod *v1.Pod) []metav1.OwnerReference {
	if owners := pod.OwnerReferences; len(owners) > 0 {
		return owners
	}

	controller := true
	selfRef := metav1.OwnerReference{
		APIVersion: helpers.JobKind.GroupVersion().String(),
		Controller: &controller,
		UID:        pod.UID,
	}

	return []metav1.OwnerReference{selfRef}
}
Loading

0 comments on commit d7b8df6

Please sign in to comment.