Clean code #92

Changes from 3 commits
@@ -17,12 +17,9 @@ limitations under the License.
 package job

 import (
-	"fmt"
-
 	"github.com/golang/glog"

 	"k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"

@@ -38,8 +35,6 @@ import (
 	kbinfo "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1"
 	kblister "github.com/kubernetes-sigs/kube-batch/pkg/client/listers/scheduling/v1alpha1"
-
-	v1corev1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1"
 	"volcano.sh/volcano/pkg/apis/helpers"
 	vkver "volcano.sh/volcano/pkg/client/clientset/versioned"
 	vkscheme "volcano.sh/volcano/pkg/client/clientset/versioned/scheme"
 	vkinfoext "volcano.sh/volcano/pkg/client/informers/externalversions"
@@ -127,53 +122,18 @@ func NewJobController(config *rest.Config) *Controller {
 	cc.jobSynced = cc.jobInformer.Informer().HasSynced

 	cc.cmdInformer = vkinfoext.NewSharedInformerFactory(cc.vkClients, 0).Bus().V1alpha1().Commands()
-	cc.cmdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
-		FilterFunc: func(obj interface{}) bool {
-			switch t := obj.(type) {
-			case *v1corev1.Command:
-				return helpers.ControlledBy(t, helpers.JobKind)
-			case cache.DeletedFinalStateUnknown:
-				if cmd, ok := t.Obj.(*v1corev1.Command); ok {
-					return helpers.ControlledBy(cmd, helpers.JobKind)
-				}
-				runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Command", obj))
-				return false
-			default:
-				runtime.HandleError(fmt.Errorf("unable to handle object %T", obj))
-				return false
-			}
-		},
-		Handler: cache.ResourceEventHandlerFuncs{
-			AddFunc: cc.addCommand,
-		},
+	cc.cmdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: cc.addCommand,
 	})
 	cc.cmdLister = cc.cmdInformer.Lister()
 	cc.cmdSynced = cc.cmdInformer.Informer().HasSynced

 	cc.sharedInformers = informers.NewSharedInformerFactory(cc.kubeClients, 0)
 	podInformer := cc.sharedInformers.Core().V1().Pods()

-	podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
-		FilterFunc: func(obj interface{}) bool {
-			switch t := obj.(type) {
-			case *v1.Pod:
-				return helpers.ControlledBy(t, helpers.JobKind)
-			case cache.DeletedFinalStateUnknown:
-				if pod, ok := t.Obj.(*v1.Pod); ok {
-					return helpers.ControlledBy(pod, helpers.JobKind)
-				}
-				runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod", obj))
-				return false
-			default:
-				runtime.HandleError(fmt.Errorf("unable to handle object %T", obj))
-				return false
-			}
-		},
-		Handler: cache.ResourceEventHandlerFuncs{
-			AddFunc:    cc.addPod,
-			UpdateFunc: cc.updatePod,
-			DeleteFunc: cc.deletePod,
-		},
+	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc:    cc.addPod,
+		UpdateFunc: cc.updatePod,
+		DeleteFunc: cc.deletePod,
 	})

 	cc.podLister = podInformer.Lister()

Review thread on `AddFunc: cc.addPod`:

- Ditto.
- This is not needed; we already have many other checks in the event handlers, like this one.
- Which is faster? Just asking.
- The check above is unavoidable anyway, so it is enough on its own; checking twice simply doubles the time spent.
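To make that exchange concrete: the in-handler ownership check the author refers to is not part of this diff, but it would look roughly like the sketch below (hypothetical code, reusing the helpers package and imports already shown above; the real cc.addPod lives elsewhere in the controller).

```go
// Sketch only: with the ownership check inside the handler itself, a
// separate FilterFunc on the informer would repeat the same work.
func (cc *Controller) addPod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		glog.Errorf("obj is not a v1.Pod: %v", obj)
		return
	}

	// Ignore pods that are not controlled by a volcano Job.
	if !helpers.ControlledBy(pod, helpers.JobKind) {
		return
	}

	// ... resolve the owning Job and enqueue it for sync ...
}
```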
@@ -66,38 +66,20 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, nextState state.NextStateFn
 			continue
 		}

-		switch pod.Status.Phase {
-		case v1.PodRunning:
-			err := cc.deleteJobPod(job.Name, pod)
-			if err != nil {
-				running++
-				errs = append(errs, err)
-				continue
-			}
-			terminating++
-		case v1.PodPending:
-			err := cc.deleteJobPod(job.Name, pod)
-			if err != nil {
-				pending++
-				errs = append(errs, err)
-				continue
-			}
-			terminating++
-		case v1.PodSucceeded:
-			err := cc.deleteJobPod(job.Name, pod)
-			if err != nil {
-				succeeded++
-				errs = append(errs, err)
-				continue
-			}
-		case v1.PodFailed:
-			err := cc.deleteJobPod(job.Name, pod)
-			if err != nil {
-				failed++
-				errs = append(errs, err)
-				continue
-			}
-			terminating++
-		}
+		if err := cc.deleteJobPod(job.Name, pod); err != nil {
+			terminating++
+		} else {
+			errs = append(errs, err)
+			switch pod.Status.Phase {
+			case v1.PodRunning:
+				running++
+			case v1.PodPending:
+				pending++
+			case v1.PodSucceeded:
+				succeeded++
+			case v1.PodFailed:
+				failed++
+			}
+		}
 	}
 }

Review thread on `if err := cc.deleteJobPod(job.Name, pod); err != nil {`:

- err == nil
- Good catch.
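The fix the thread points at: a pod counts as terminating only when the delete call succeeds, and only a real error is recorded. A sketch of the corrected branch (not a commit in this PR):

```go
// terminating++ belongs to the err == nil case; the committed code above
// has the two branches inverted.
if err := cc.deleteJobPod(job.Name, pod); err == nil {
	terminating++
} else {
	errs = append(errs, err)
	switch pod.Status.Phase {
	case v1.PodRunning:
		running++
	case v1.PodPending:
		pending++
	case v1.PodSucceeded:
		succeeded++
	case v1.PodFailed:
		failed++
	}
}
```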
@@ -220,32 +202,23 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, nextState state.NextStateFn
 			podToCreate = append(podToCreate, newPod)
 		} else {
+			delete(pods, podName)
 			if pod.DeletionTimestamp != nil {
 				glog.Infof("Pod <%s/%s> is terminating", pod.Namespace, pod.Name)
 				terminating++
-				delete(pods, podName)
 				continue
 			}

 			switch pod.Status.Phase {
 			case v1.PodPending:
-				if pod.DeletionTimestamp != nil {
-					terminating++
-				} else {
-					pending++
-				}
+				pending++
 			case v1.PodRunning:
-				if pod.DeletionTimestamp != nil {
-					terminating++
-				} else {
-					running++
-				}
+				running++
 			case v1.PodSucceeded:
 				succeeded++
 			case v1.PodFailed:
 				failed++
 			}
-			delete(pods, podName)
 		}
 	}
@@ -260,7 +233,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, nextState state.NextStateFn
 		go func(pod *v1.Pod) {
 			defer waitCreationGroup.Done()
 			_, err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Create(pod)
-			if err != nil {
+			if err != nil && !apierrors.IsAlreadyExists(err) {
 				// Failed to create Pod, wait a moment and then create it again
 				// This is to ensure all pods under the same Job are created
 				// So gang-scheduling could schedule the Job successfully
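The widened condition treats an AlreadyExists error as success, which makes pod creation idempotent across controller resyncs: a pod created by an earlier sync no longer triggers the retry path. In isolation the pattern looks like this sketch; createPodIgnoreExists is a hypothetical name, not a function in this codebase:

```go
// Hypothetical helper showing the idempotent-create pattern this hunk adopts.
func createPodIgnoreExists(client kubernetes.Interface, pod *v1.Pod) error {
	_, err := client.CoreV1().Pods(pod.Namespace).Create(pod)
	if err != nil && !apierrors.IsAlreadyExists(err) {
		return err // a real failure; AlreadyExists means an earlier sync won the race
	}
	return nil
}
```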
@@ -280,6 +253,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, nextState state.NextStateFn
 		return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate))
 	}

+	// TODO: Can hardly imagine when this is necessary.
 	// Delete unnecessary pods.
 	waitDeletionGroup := sync.WaitGroup{}
 	waitDeletionGroup.Add(len(podToDelete))
@@ -337,16 +311,6 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, nextState state.NextStateFn
 	return nil
 }

-func (cc *Controller) calculateVersion(current int32, bumpVersion bool) int32 {
-	if current == 0 {
-		current += 1
-	}
-	if bumpVersion {
-		current += 1
-	}
-	return current
-}
-
 func (cc *Controller) createServiceIfNotExist(job *vkv1.Job) error {
 	// If Service does not exist, create one for Job.
 	if _, err := cc.svcLister.Services(job.Namespace).Get(job.Name); err != nil {
@@ -397,68 +361,66 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error {
 	// If input/output PVC does not exist, create them for Job.
 	inputPVC := job.Annotations[admissioncontroller.PVCInputName]
 	outputPVC := job.Annotations[admissioncontroller.PVCOutputName]
-	if job.Spec.Input != nil {
-		if job.Spec.Input.VolumeClaim != nil {
-			if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(inputPVC); err != nil {
-				if !apierrors.IsNotFound(err) {
-					glog.V(3).Infof("Failed to get input PVC for Job <%s/%s>: %v",
-						job.Namespace, job.Name, err)
-					return err
-				}
-
-				pvc := &v1.PersistentVolumeClaim{
-					ObjectMeta: metav1.ObjectMeta{
-						Namespace: job.Namespace,
-						Name:      inputPVC,
-						OwnerReferences: []metav1.OwnerReference{
-							*metav1.NewControllerRef(job, helpers.JobKind),
-						},
-					},
-					Spec: *job.Spec.Input.VolumeClaim,
-				}
-
-				glog.V(3).Infof("Try to create input PVC: %v", pvc)
-
-				if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil {
-					glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
-						job.Namespace, job.Name, err)
-					return err
-				}
-			}
-		}
-	}
+	if job.Spec.Input != nil && job.Spec.Input.VolumeClaim != nil {
+		if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(inputPVC); err != nil {
+			if !apierrors.IsNotFound(err) {
+				glog.V(3).Infof("Failed to get input PVC for Job <%s/%s>: %v",
+					job.Namespace, job.Name, err)
+				return err
+			}
+
+			pvc := &v1.PersistentVolumeClaim{
+				ObjectMeta: metav1.ObjectMeta{
+					Namespace: job.Namespace,
+					Name:      inputPVC,
+					OwnerReferences: []metav1.OwnerReference{
+						*metav1.NewControllerRef(job, helpers.JobKind),
+					},
+				},
+				Spec: *job.Spec.Input.VolumeClaim,
+			}
+
+			glog.V(3).Infof("Try to create input PVC: %v", pvc)
+
+			if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil {
+				glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
+					job.Namespace, job.Name, err)
+				return err
+			}
+		}
+	}
-	if job.Spec.Output != nil {
-		if job.Spec.Output.VolumeClaim != nil {
-			if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(outputPVC); err != nil {
-				if !apierrors.IsNotFound(err) {
-					glog.V(3).Infof("Failed to get output PVC for Job <%s/%s>: %v",
-						job.Namespace, job.Name, err)
-					return err
-				}
-
-				pvc := &v1.PersistentVolumeClaim{
-					ObjectMeta: metav1.ObjectMeta{
-						Namespace: job.Namespace,
-						Name:      outputPVC,
-						OwnerReferences: []metav1.OwnerReference{
-							*metav1.NewControllerRef(job, helpers.JobKind),
-						},
-					},
-					Spec: *job.Spec.Output.VolumeClaim,
-				}
-
-				glog.V(3).Infof("Try to create output PVC: %v", pvc)
-
-				if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil {
-					if !apierrors.IsAlreadyExists(err) {
-						glog.V(3).Infof("Failed to create output PVC for Job <%s/%s>: %v",
-							job.Namespace, job.Name, err)
-						return err
-					}
-				}
-			}
-		}
-	}
+	if job.Spec.Output != nil && job.Spec.Output.VolumeClaim != nil {
+		if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(outputPVC); err != nil {
+			if !apierrors.IsNotFound(err) {
+				glog.V(3).Infof("Failed to get output PVC for Job <%s/%s>: %v",
+					job.Namespace, job.Name, err)
+				return err
+			}
+
+			pvc := &v1.PersistentVolumeClaim{
+				ObjectMeta: metav1.ObjectMeta{
+					Namespace: job.Namespace,
+					Name:      outputPVC,
+					OwnerReferences: []metav1.OwnerReference{
+						*metav1.NewControllerRef(job, helpers.JobKind),
+					},
+				},
+				Spec: *job.Spec.Output.VolumeClaim,
+			}
+
+			glog.V(3).Infof("Try to create output PVC: %v", pvc)
+
+			if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil {
+				if !apierrors.IsAlreadyExists(err) {
+					glog.V(3).Infof("Failed to create output PVC for Job <%s/%s>: %v",
+						job.Namespace, job.Name, err)
+					return err
+				}
+			}
+		}
+	}

 	return nil
 }
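After this change the input and output branches are identical apart from the claim name and the claim spec. A hypothetical follow-up cleanup, not proposed in this PR, could factor them into one helper; the sketch below reuses names from the surrounding controller code:

```go
// Hypothetical helper, not part of this PR: one create-if-absent path shared
// by the input and output PersistentVolumeClaims.
func (cc *Controller) createPVCIfNotExist(job *vkv1.Job, name string, spec *v1.PersistentVolumeClaimSpec) error {
	if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(name); err != nil {
		if !apierrors.IsNotFound(err) {
			return err
		}

		pvc := &v1.PersistentVolumeClaim{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: job.Namespace,
				Name:      name,
				OwnerReferences: []metav1.OwnerReference{
					*metav1.NewControllerRef(job, helpers.JobKind),
				},
			},
			Spec: *spec,
		}

		glog.V(3).Infof("Try to create PVC: %v", pvc)

		if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil && !apierrors.IsAlreadyExists(err) {
			glog.V(3).Infof("Failed to create PVC <%s> for Job <%s/%s>: %v",
				name, job.Namespace, job.Name, err)
			return err
		}
	}
	return nil
}
```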
Review thread on the removed Command FilterFunc:

- Can we keep the ControlledBy filter logic here?
- Is there any Command resource not managed by Volcano?
- No for now.
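For readers following the thread: helpers.ControlledBy itself is outside this diff. A helper like it typically checks the controlling OwnerReference, roughly as in this assumed sketch (schema is k8s.io/apimachinery/pkg/runtime/schema):

```go
// Assumed sketch of a ControlledBy-style check; the actual volcano
// implementation is not shown in this diff. An object is "controlled by"
// a kind when its controller OwnerReference matches that kind.
func controlledBy(obj metav1.Object, gvk schema.GroupVersionKind) bool {
	ref := metav1.GetControllerOf(obj)
	if ref == nil {
		return false
	}
	return ref.APIVersion == gvk.GroupVersion().String() && ref.Kind == gvk.Kind
}
```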