From 38e5068279440c0bf97009c235dcc6b4606184f7 Mon Sep 17 00:00:00 2001 From: "wangyuqing (C)" Date: Sun, 5 May 2019 16:01:19 +0800 Subject: [PATCH] fix state convert --- pkg/admission/admit_job.go | 8 +- pkg/controllers/job/job_controller.go | 2 +- pkg/controllers/job/job_controller_actions.go | 73 ++++++++++++++----- pkg/controllers/job/job_controller_handler.go | 12 +-- pkg/controllers/job/state/aborted.go | 2 +- pkg/controllers/job/state/aborting.go | 2 +- pkg/controllers/job/state/factory.go | 4 +- pkg/controllers/job/state/failed.go | 30 -------- pkg/controllers/job/state/restarting.go | 15 ++-- test/e2e/job_error_handling.go | 22 +++--- 10 files changed, 91 insertions(+), 79 deletions(-) delete mode 100644 pkg/controllers/job/state/failed.go diff --git a/pkg/admission/admit_job.go b/pkg/admission/admit_job.go index 93eea205962..76988954843 100644 --- a/pkg/admission/admit_job.go +++ b/pkg/admission/admit_job.go @@ -54,11 +54,10 @@ func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { msg = validateJob(job, &reviewResponse) break case v1beta1.Update: - oldJob, err := DecodeJob(ar.Request.OldObject, ar.Request.Resource) + _, err := DecodeJob(ar.Request.OldObject, ar.Request.Resource) if err != nil { return ToAdmissionResponse(err) } - msg = specDeepEqual(job, oldJob, &reviewResponse) break default: err := fmt.Errorf("expect operation to be 'CREATE' or 'UPDATE'") @@ -82,6 +81,11 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st return fmt.Sprintf("'minAvailable' cannot be less than zero.") } + if job.Spec.MaxRetry < 0 { + reviewResponse.Allowed = false + return fmt.Sprintf("'maxRetry' cannot be less than zero.") + } + if len(job.Spec.Tasks) == 0 { reviewResponse.Allowed = false return fmt.Sprintf("No task specified in job spec") diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go index b38518f2f05..7ecdd999f28 100644 --- a/pkg/controllers/job/job_controller.go +++ b/pkg/controllers/job/job_controller.go @@ -128,7 +128,7 @@ func NewJobController(config *rest.Config) *Controller { cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: cc.addJob, // TODO: enable this until we find an appropriate way. - // UpdateFunc: cc.updateJob, + UpdateFunc: cc.updateJob, DeleteFunc: cc.deleteJob, }) cc.jobLister = cc.jobInformer.Lister() diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index ca9c7e5132b..25fb33ef5c2 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -41,8 +41,6 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt defer glog.V(3).Infof("Finished Job <%s/%s> killing", jobInfo.Job.Namespace, jobInfo.Job.Name) job := jobInfo.Job - // Job version is bumped only when job is killed - job.Status.Version = job.Status.Version + 1 glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name) if job.DeletionTimestamp != nil { glog.Infof("Job <%s/%s> is terminating, skip management process.", @@ -88,6 +86,10 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt return fmt.Errorf("failed to kill %d pods of %d", len(errs), total) } + job = job.DeepCopy() + //Job version is bumped only when job is killed + job.Status.Version = job.Status.Version + 1 + job.Status = vkv1.JobStatus{ State: job.Status.State, @@ -112,6 +114,8 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt return err } else { if e := cc.cache.Update(job); e != nil { + glog.Errorf("KillJob - Failed to update Job %v/%v in cache: %v", + job.Namespace, job.Name, e) return e } } @@ -138,21 +142,12 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta glog.V(3).Infof("Starting to create Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name) defer glog.V(3).Infof("Finished Job <%s/%s> create", jobInfo.Job.Namespace, jobInfo.Job.Name) - job := jobInfo.Job + job := jobInfo.Job.DeepCopy() glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name) - newJob, err := cc.needUpdateForVolumeClaim(job) - if err != nil { + if update, err := cc.filljob(job); err != nil || update { return err } - if newJob != nil { - if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil { - glog.Errorf("Failed to update Job %v/%v: %v", - job.Namespace, job.Name, err) - return err - } - return nil - } if err := cc.pluginOnJobAdd(job); err != nil { cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PluginError), @@ -168,6 +163,18 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta return err } + if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil { + glog.Errorf("Failed to update status of Job %v/%v: %v", + job.Namespace, job.Name, err) + return err + } else { + if e := cc.cache.Update(job); e != nil { + glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v", + job.Namespace, job.Name, e) + return e + } + } + return nil } @@ -175,7 +182,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt glog.V(3).Infof("Starting to sync up Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name) defer glog.V(3).Infof("Finished Job <%s/%s> sync up", jobInfo.Job.Namespace, jobInfo.Job.Name) - job := jobInfo.Job + job := jobInfo.Job.DeepCopy() glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name) if job.DeletionTimestamp != nil { @@ -313,6 +320,8 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt return err } else { if e := cc.cache.Update(job); e != nil { + glog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v", + job.Namespace, job.Name, e) return e } } @@ -356,10 +365,11 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error { return nil } -func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error) { +func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (bool, *vkv1.Job, error) { // If VolumeClaimName does not exist, generate them for Job. var newJob *vkv1.Job volumes := job.Spec.Volumes + update := false for index, volume := range volumes { vcName := volume.VolumeClaimName if len(vcName) == 0 { @@ -368,7 +378,7 @@ func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error) vcName = fmt.Sprintf("%s-volume-%s", job.Name, randomStr) exist, err := cc.checkPVCExist(job, vcName) if err != nil { - return nil, err + return false, nil, err } if exist { continue @@ -377,11 +387,12 @@ func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error) newJob = job.DeepCopy() } newJob.Spec.Volumes[index].VolumeClaimName = vcName + update = true break } } } - return newJob, nil + return update, newJob, nil } func (cc *Controller) checkPVCExist(job *vkv1.Job, vcName string) (bool, error) { @@ -494,3 +505,31 @@ func (cc *Controller) calcPGMinResources(job *vkv1.Job) *v1.ResourceList { return minAvailableTasksRes.Convert2K8sResource() } + +func (cc *Controller) filljob(job *vkv1.Job) (bool, error) { + update, newJob, err := cc.needUpdateForVolumeClaim(job) + if err != nil { + return false, err + } + if update { + if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil { + glog.Errorf("Failed to update Job %v/%v: %v", + job.Namespace, job.Name, err) + return false, err + } + return true, nil + } else if job.Status.State.Phase == "" { + job.Status.State.Phase = vkv1.Pending + if j, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil { + glog.Errorf("Failed to update status of Job %v/%v: %v", + job.Namespace, job.Name, err) + } else { + if e := cc.cache.Update(j); e != nil { + glog.Error("Failed to update cache status of Job %v/%v: %v", job.Namespace, job.Name, e) + } + } + return true, nil + } + + return false, nil +} diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index 98b414f59b5..8df166507ac 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -81,18 +81,18 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) { return } - if err := cc.cache.Update(newJob); err != nil { - glog.Errorf("Failed to update job <%s/%s>: %v in cache", - newJob.Namespace, newJob.Name, err) - } - // NOTE: Since we only reconcile job based on Spec, we will ignore other attributes // For Job status, it's used internally and always been updated via our controller. - if reflect.DeepEqual(newJob.Spec, oldJob.Spec) { + if reflect.DeepEqual(newJob.Spec, oldJob.Spec) && newJob.Status.State.Phase == oldJob.Status.State.Phase { glog.Infof("Job update event is ignored since no update in 'Spec'.") return } + if err := cc.cache.Update(newJob); err != nil { + glog.Errorf("UpdateJob - Failed to update job <%s/%s>: %v in cache", + newJob.Namespace, newJob.Name, err) + } + req := apis.Request{ Namespace: newJob.Namespace, JobName: newJob.Name, diff --git a/pkg/controllers/job/state/aborted.go b/pkg/controllers/job/state/aborted.go index f7439c1bbca..104170615b4 100644 --- a/pkg/controllers/job/state/aborted.go +++ b/pkg/controllers/job/state/aborted.go @@ -28,7 +28,7 @@ type abortedState struct { func (as *abortedState) Execute(action vkv1.Action) error { switch action { case vkv1.ResumeJobAction: - return SyncJob(as.job, func(status *vkv1.JobStatus) { + return KillJob(as.job, func(status *vkv1.JobStatus) { status.State.Phase = vkv1.Restarting status.RetryCount++ }) diff --git a/pkg/controllers/job/state/aborting.go b/pkg/controllers/job/state/aborting.go index f688daf9090..cae21cb466d 100644 --- a/pkg/controllers/job/state/aborting.go +++ b/pkg/controllers/job/state/aborting.go @@ -29,7 +29,7 @@ func (ps *abortingState) Execute(action vkv1.Action) error { switch action { case vkv1.ResumeJobAction: // Already in Restarting phase, just sync it - return SyncJob(ps.job, func(status *vkv1.JobStatus) { + return KillJob(ps.job, func(status *vkv1.JobStatus) { status.State.Phase = vkv1.Restarting status.RetryCount++ }) diff --git a/pkg/controllers/job/state/factory.go b/pkg/controllers/job/state/factory.go index f9a6be522a0..a24f6055437 100644 --- a/pkg/controllers/job/state/factory.go +++ b/pkg/controllers/job/state/factory.go @@ -47,7 +47,7 @@ func NewState(jobInfo *apis.JobInfo) State { return &runningState{job: jobInfo} case vkv1.Restarting: return &restartingState{job: jobInfo} - case vkv1.Terminated, vkv1.Completed: + case vkv1.Terminated, vkv1.Completed, vkv1.Failed: return &finishedState{job: jobInfo} case vkv1.Terminating: return &terminatingState{job: jobInfo} @@ -57,8 +57,6 @@ func NewState(jobInfo *apis.JobInfo) State { return &abortedState{job: jobInfo} case vkv1.Completing: return &completingState{job: jobInfo} - case vkv1.Failed: - return &failedState{job: jobInfo} case vkv1.Inqueue: return &inqueueState{job: jobInfo} } diff --git a/pkg/controllers/job/state/failed.go b/pkg/controllers/job/state/failed.go deleted file mode 100644 index d969612a1f5..00000000000 --- a/pkg/controllers/job/state/failed.go +++ /dev/null @@ -1,30 +0,0 @@ -/* -Copyright 2017 The Volcano Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package state - -import ( - vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" - "volcano.sh/volcano/pkg/controllers/apis" -) - -type failedState struct { - job *apis.JobInfo -} - -func (ps *failedState) Execute(action vkv1.Action) error { - return KillJob(ps.job, nil) -} diff --git a/pkg/controllers/job/state/restarting.go b/pkg/controllers/job/state/restarting.go index c59395e5e4e..a58dbd78111 100644 --- a/pkg/controllers/job/state/restarting.go +++ b/pkg/controllers/job/state/restarting.go @@ -26,7 +26,7 @@ type restartingState struct { } func (ps *restartingState) Execute(action vkv1.Action) error { - return SyncJob(ps.job, func(status *vkv1.JobStatus) { + return KillJob(ps.job, func(status *vkv1.JobStatus) { phase := vkv1.Restarting // Get the maximum number of retries. @@ -39,12 +39,13 @@ func (ps *restartingState) Execute(action vkv1.Action) error { // Failed is the phase that the job is restarted failed reached the maximum number of retries. phase = vkv1.Failed } else { - if status.Terminating == 0 { - if status.Running >= ps.job.Job.Spec.MinAvailable { - phase = vkv1.Running - } else { - phase = vkv1.Pending - } + total := int32(0) + for _, task := range ps.job.Job.Spec.Tasks { + total += task.Replicas + } + + if total-status.Terminating >= status.MinAvailable { + phase = vkv1.Pending } } diff --git a/test/e2e/job_error_handling.go b/test/e2e/job_error_handling.go index 500aa1adb32..1a875cb851a 100644 --- a/test/e2e/job_error_handling.go +++ b/test/e2e/job_error_handling.go @@ -61,7 +61,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running -> restarting - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running, vkv1.Restarting}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Restarting}) Expect(err).NotTo(HaveOccurred()) }) @@ -98,7 +98,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running -> Terminating -> Terminated - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running, vkv1.Terminating, vkv1.Terminated}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Terminating, vkv1.Terminated}) Expect(err).NotTo(HaveOccurred()) }) @@ -135,7 +135,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running -> Aborting -> Aborted - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running, vkv1.Aborting, vkv1.Aborted}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Aborting, vkv1.Aborted}) Expect(err).NotTo(HaveOccurred()) }) @@ -170,7 +170,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) By("delete one pod of job") @@ -179,7 +179,7 @@ var _ = Describe("Job Error Handling", func() { Expect(err).NotTo(HaveOccurred()) // job phase: Restarting -> Running - err = waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Restarting, vkv1.Running}) + err = waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Restarting, vkv1.Pending, vkv1.Inqueue, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) }) @@ -214,7 +214,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) By("delete one pod of job") @@ -258,7 +258,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) By("delete one pod of job") @@ -302,7 +302,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) By("delete one pod of job") @@ -311,7 +311,7 @@ var _ = Describe("Job Error Handling", func() { Expect(err).NotTo(HaveOccurred()) // job phase: Restarting -> Running - err = waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Restarting, vkv1.Running}) + err = waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Restarting, vkv1.Pending, vkv1.Inqueue, vkv1.Running}) Expect(err).NotTo(HaveOccurred()) }) @@ -464,7 +464,7 @@ var _ = Describe("Job Error Handling", func() { By("job scheduled, then task 'completed_task' finished and job finally complete") // job phase: pending -> running -> completing -> completed err := waitJobStates(context, job, []vkv1.JobPhase{ - vkv1.Pending, vkv1.Running, vkv1.Completing, vkv1.Completed}) + vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Completing, vkv1.Completed}) Expect(err).NotTo(HaveOccurred()) }) @@ -503,7 +503,7 @@ var _ = Describe("Job Error Handling", func() { }) // job phase: pending -> running -> restarting - err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Running, vkv1.Restarting}) + err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Restarting}) Expect(err).NotTo(HaveOccurred()) })