- add unit tests for creating suspended MPIJobs, suspending, and resuming
- use a fake clock in unit tests (see the sketch after this list)
- do not return from the syncHandler after worker pod cleanup on suspend;
  this allows the MPIJob status update to proceed in the same sync
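
A minimal sketch, not part of this commit, of the fake-clock pattern the new unit tests rely on. It assumes FakeClock from k8s.io/utils/clock/testing, which satisfies the controller's new clock.WithTicker field and so can be handed to NewMPIJobControllerWithClock in place of clock.RealClock, making time-dependent values such as Status.StartTime and the "Finished syncing" durations deterministic:

    package main

    import (
    	"fmt"
    	"time"

    	clocktesting "k8s.io/utils/clock/testing"
    )

    func main() {
    	// Pin "now" to a fixed instant; Now() and Since() become deterministic.
    	fakeClock := clocktesting.NewFakeClock(time.Date(2023, 2, 2, 12, 0, 0, 0, time.UTC))

    	start := fakeClock.Now()
    	// Advance time without sleeping, as a test would between sync calls.
    	fakeClock.Step(30 * time.Second)
    	fmt.Println(fakeClock.Since(start)) // prints 30s
    }
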
mimowo committed Feb 2, 2023
1 parent 3b8eb54 commit 7357382
Showing 3 changed files with 275 additions and 28 deletions.
73 changes: 51 additions & 22 deletions pkg/controller/mpi_job_controller.go
@@ -51,6 +51,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"
podgroupv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
@@ -245,6 +246,9 @@ type MPIJobController struct {

// To allow injection of updateStatus for testing.
updateStatusHandler func(mpijob *kubeflow.MPIJob) error

+ // Clock for internal use, to allow injection of a fake clock in unit tests
+ clock clock.WithTicker
}

// NewMPIJobController returns a new MPIJob controller.
@@ -260,6 +264,26 @@ func NewMPIJobController(
podgroupsInformer podgroupsinformer.PodGroupInformer,
mpiJobInformer informers.MPIJobInformer,
gangSchedulerName string) *MPIJobController {
+ return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, volcanoClientSet,
+ configMapInformer, secretInformer, serviceInformer, jobInformer,
+ podInformer, podgroupsInformer, mpiJobInformer, gangSchedulerName,
+ &clock.RealClock{})
+ }
+
+ // NewMPIJobControllerWithClock returns a new MPIJob controller that uses the given clock.
+ func NewMPIJobControllerWithClock(
+ kubeClient kubernetes.Interface,
+ kubeflowClient clientset.Interface,
+ volcanoClientSet volcanoclient.Interface,
+ configMapInformer coreinformers.ConfigMapInformer,
+ secretInformer coreinformers.SecretInformer,
+ serviceInformer coreinformers.ServiceInformer,
+ jobInformer batchinformers.JobInformer,
+ podInformer coreinformers.PodInformer,
+ podgroupsInformer podgroupsinformer.PodGroupInformer,
+ mpiJobInformer informers.MPIJobInformer,
+ gangSchedulerName string,
+ clock clock.WithTicker) *MPIJobController {

// Create event broadcaster.
klog.V(4).Info("Creating event broadcaster")
@@ -296,6 +320,7 @@ func NewMPIJobController(
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MPIJobs"),
recorder: recorder,
gangSchedulerName: gangSchedulerName,
+ clock: clock,
}

controller.updateStatusHandler = controller.doUpdateJobStatus
@@ -451,9 +476,9 @@ func (c *MPIJobController) processNextWorkItem() bool {
// converge the two. It then updates the Status block of the MPIJob resource
// with the current status of the resource.
func (c *MPIJobController) syncHandler(key string) error {
- startTime := time.Now()
+ startTime := c.clock.Now()
defer func() {
- klog.Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
+ klog.Infof("Finished syncing job %q (%v)", key, c.clock.Since(startTime))
}()

// Convert the namespace/name string into a distinct namespace and name.
@@ -505,7 +530,10 @@ func (c *MPIJobController) syncHandler(key string) error {
// cleanup and stop retrying the MPIJob.
if isFinished(mpiJob.Status) && mpiJob.Status.CompletionTime != nil {
if isCleanUpPods(mpiJob.Spec.RunPolicy.CleanPodPolicy) {
- return cleanUpWorkerPods(mpiJob, c)
+ if err := cleanUpWorkerPods(mpiJob, c); err != nil {
+ return err
+ }
+ return c.updateStatusHandler(mpiJob)
}
return nil
}
@@ -570,13 +598,6 @@ func (c *MPIJobController) syncHandler(key string) error {
}
}

- // Finally, we update the status block of the MPIJob resource to reflect the
- // current state of the world.
- err = c.updateMPIJobStatus(mpiJob, launcher, worker)
- if err != nil {
- return err
- }

if launcher != nil {
if isMPIJobSuspended(mpiJob) != isJobSuspended(launcher) {
// align the suspension state of launcher with the MPIJob
@@ -593,6 +614,14 @@ func (c *MPIJobController) syncHandler(key string) error {
return err
}
}

+ // Finally, we update the status block of the MPIJob resource to reflect the
+ // current state of the world.
+ err = c.updateMPIJobStatus(mpiJob, launcher, worker)
+ if err != nil {
+ return err
+ }

return nil
}

Expand All @@ -608,7 +637,7 @@ func cleanUpWorkerPods(mpiJob *kubeflow.MPIJob, c *MPIJobController) error {
}
}
mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Active = 0
- return c.updateStatusHandler(mpiJob)
+ return nil
}

// getLauncherJob gets the launcher Job controlled by this MPIJob.
@@ -932,23 +961,23 @@ func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error {

func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher *batchv1.Job, worker []*corev1.Pod) error {
oldStatus := mpiJob.Status.DeepCopy()
- launcherPods, err := c.jobPods(launcher)
- if err != nil {
- return fmt.Errorf("checking launcher pods running: %w", err)
- }
if isMPIJobSuspended(mpiJob) {
// it is suspended now
- if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, "MPIJobSuspended", "MPIJob suspended") {
+ if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, mpiJobSuspendedReason, "MPIJob suspended") {
c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobSuspended", "MPIJob suspended")
}
} else if getCondition(mpiJob.Status, kubeflow.JobSuspended) != nil {
// it is not suspended now, consider resumed if the condition was set before
- if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionFalse, "MPIJobResumed", "MPIJob resumed") {
+ if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionFalse, mpiJobResumedReason, "MPIJob resumed") {
c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobResumed", "MPIJob resumed")
- now := metav1.NewTime(time.Now())
+ now := metav1.NewTime(c.clock.Now())
mpiJob.Status.StartTime = &now
}
}
+ launcherPods, err := c.jobPods(launcher)
+ if err != nil {
+ return fmt.Errorf("checking launcher pods running: %w", err)
+ }
// Job.status.Active accounts for Pending and Running pods. Count running pods
// from the lister instead.
launcherPodsCnt := countRunningPods(launcherPods)
@@ -1001,13 +1030,13 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
c.recorder.Event(mpiJob, corev1.EventTypeWarning, mpiJobEvict, msg)
}

- if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {
+ if isMPIJobSuspended(mpiJob) {
+ msg := fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name)
+ updateMPIJobConditions(mpiJob, common.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg)
+ } else if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {
msg := fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJob, common.JobRunning, v1.ConditionTrue, mpiJobRunningReason, msg)
c.recorder.Eventf(mpiJob, corev1.EventTypeNormal, "MPIJobRunning", "MPIJob %s/%s is running", mpiJob.Namespace, mpiJob.Name)
- } else if isMPIJobSuspended(mpiJob) {
- msg := fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name)
- updateMPIJobConditions(mpiJob, common.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg)
}

// no need to update the mpijob if the status hasn't changed since last time.
2 changes: 2 additions & 0 deletions pkg/controller/mpi_job_controller_status.go
@@ -31,6 +31,8 @@ const (
mpiJobRunningReason = "MPIJobRunning"
// mpiJobSuspendedReason is added in a mpijob when it is suspended.
mpiJobSuspendedReason = "MPIJobSuspended"
+ // mpiJobResumedReason is added in a mpijob when it is resumed.
+ mpiJobResumedReason = "MPIJobResumed"
// mpiJobFailedReason is added in a mpijob when it is failed.
mpiJobFailedReason = "MPIJobFailed"
// mpiJobEvict