Skip to content

Commit

Permalink
Implement support for suspend semantics for MPIJob
Browse files Browse the repository at this point in the history
  • Loading branch information
mimowo committed Jan 30, 2023
1 parent 382da78 commit 3cdd581
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 22 deletions.
12 changes: 12 additions & 0 deletions crd/kubeflow.org_mpijobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7884,6 +7884,18 @@ spec:
description: SSHAuthMountPath is the directory where SSH keys are
mounted. Defaults to "/root/.ssh".
type: string
suspend:
default: false
description: "suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to true,
no Pods are created by the Job controller. If a Job is suspended
after creation (i.e. the flag goes from false to true), the Job
controller will delete all active Pods associated with this Job.
Users must design their workload to gracefully handle this. Suspending
a Job will reset the StartTime field of the Job, effectively resetting
the ActiveDeadlineSeconds timer too. Defaults to false. \n Defaults
to false."
type: boolean
required:
- mpiReplicaSpecs
type: object
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
k8s.io/klog v1.0.0
k8s.io/kube-openapi v0.0.0-20230109183929-3758b55a6596
k8s.io/sample-controller v0.25.6
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed
sigs.k8s.io/controller-runtime v0.13.1
volcano.sh/apis v1.7.0
)
Expand Down Expand Up @@ -73,7 +74,6 @@ require (
k8s.io/component-base v0.25.6 // indirect
k8s.io/gengo v0.0.0-20211129171323-c02415ce4185 // indirect
k8s.io/klog/v2 v2.70.1 // indirect
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions manifests/base/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ spec:
spec:
type: object
properties:
suspend:
type: boolean
slotsPerWorker:
type: integer
minimum: 1
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/kubeflow/v2beta1/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@
"sshAuthMountPath": {
"description": "SSHAuthMountPath is the directory where SSH keys are mounted. Defaults to \"/root/.ssh\".",
"type": "string"
},
"suspend": {
"description": "suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job, effectively resetting the ActiveDeadlineSeconds timer too. Defaults to false.\n\nDefaults to false.",
"type": "boolean"
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/apis/kubeflow/v2beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ type MPIJobSpec struct {
// +kubebuilder:validation:Enum:=OpenMPI;Intel
// +kubebuilder:default:=OpenMPI
MPIImplementation MPIImplementation `json:"mpiImplementation,omitempty"`

// suspend specifies whether the Job controller should create Pods or not. If
// a Job is created with suspend set to true, no Pods are created by the Job
// controller. If a Job is suspended after creation (i.e. the flag goes from
// false to true), the Job controller will delete all active Pods associated
// with this Job. Users must design their workload to gracefully handle this.
// Suspending a Job will reset the StartTime field of the Job, effectively
// resetting the ActiveDeadlineSeconds timer too. Defaults to false.
//
// Defaults to false.
// +kubebuilder:default:=false
Suspend *bool `json:"suspend,omitempty"`
}

// MPIReplicaType is the type for MPIReplica.
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 61 additions & 18 deletions pkg/controller/mpi_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/utils/pointer"
podgroupv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
podgroupsinformer "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"
Expand Down Expand Up @@ -449,6 +450,7 @@ func (c *MPIJobController) processNextWorkItem() bool {
// converge the two. It then updates the Status block of the MPIJob resource
// with the current status of the resource.
func (c *MPIJobController) syncHandler(key string) error {
klog.Infof("___ MYDEBUG starting for %s", key)
startTime := time.Now()
defer func() {
klog.Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
Expand Down Expand Up @@ -503,24 +505,13 @@ func (c *MPIJobController) syncHandler(key string) error {
// cleanup and stop retrying the MPIJob.
if isFinished(mpiJob.Status) && mpiJob.Status.CompletionTime != nil {
if isCleanUpPods(mpiJob.Spec.RunPolicy.CleanPodPolicy) {
// set worker StatefulSet Replicas to 0.
if err := c.deleteWorkerPods(mpiJob); err != nil {
return err
}
initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker)
if c.gangSchedulerName != "" {
if err := c.deletePodGroups(mpiJob); err != nil {
return err
}
}
mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Active = 0
return c.updateStatusHandler(mpiJob)
return cleanUpPods(mpiJob, c)
}
return nil
}

// first set StartTime.
if mpiJob.Status.StartTime == nil {
if mpiJob.Status.StartTime == nil && !isMPIJobSuspended(mpiJob) {
now := metav1.Now()
mpiJob.Status.StartTime = &now
}
Expand Down Expand Up @@ -555,10 +546,11 @@ func (c *MPIJobController) syncHandler(key string) error {
return err
}
}

worker, err = c.getOrCreateWorker(mpiJob)
if err != nil {
return err
if !isMPIJobSuspended(mpiJob) {
worker, err = c.getOrCreateWorker(mpiJob)
if err != nil {
return err
}
}
if mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel {
// The Intel implementation requires workers to communicate with the
Expand All @@ -585,9 +577,48 @@ func (c *MPIJobController) syncHandler(key string) error {
return err
}

if launcher != nil {
launcherSuspendUpdate := false
if isMPIJobSuspended(mpiJob) && !isJobSuspended(launcher) {
// suspend the launcher first if the MPI job is suspended
launcherSuspendUpdate = true
launcher.Spec.Suspend = pointer.Bool(true)
} else if !isMPIJobSuspended(mpiJob) && isJobSuspended(launcher) {
launcherSuspendUpdate = true
// unsuspend the launcher first if the MPI job is unsuspended
launcher.Spec.Suspend = pointer.Bool(false)
}
if launcherSuspendUpdate {
if _, err := c.kubeClient.BatchV1().Jobs(namespace).Update(context.TODO(), launcher, metav1.UpdateOptions{}); err != nil {
return err
}
}
}

// cleanup the running worker pods if the MPI job is suspended
if isMPIJobSuspended(mpiJob) {
if err := cleanUpPods(mpiJob, c); err != nil {
return err
}
}
return nil
}

func cleanUpPods(mpiJob *kubeflow.MPIJob, c *MPIJobController) error {
// set worker StatefulSet Replicas to 0.
if err := c.deleteWorkerPods(mpiJob); err != nil {
return err
}
initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker)
if c.gangSchedulerName != "" {
if err := c.deletePodGroups(mpiJob); err != nil {
return err
}
}
mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Active = 0
return c.updateStatusHandler(mpiJob)
}

// getLauncherJob gets the launcher Job controlled by this MPIJob.
func (c *MPIJobController) getLauncherJob(mpiJob *kubeflow.MPIJob) (*batchv1.Job, error) {
launcher, err := c.jobLister.Jobs(mpiJob.Namespace).Get(mpiJob.Name + launcherSuffix)
Expand Down Expand Up @@ -857,6 +888,14 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1
return workerPods, nil
}

func isMPIJobSuspended(mpiJob *kubeflow.MPIJob) bool {
return pointer.BoolDeref(mpiJob.Spec.Suspend, false)
}

func isJobSuspended(job *batchv1.Job) bool {
return pointer.BoolDeref(job.Spec.Suspend, false)
}

func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error {
var (
workerPrefix = mpiJob.Name + workerSuffix
Expand Down Expand Up @@ -1304,7 +1343,7 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1
}

func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job {
return &batchv1.Job{
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: mpiJob.Name + launcherSuffix,
Namespace: mpiJob.Namespace,
Expand All @@ -1322,6 +1361,10 @@ func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job
Template: c.newLauncherPodTemplate(mpiJob),
},
}
if isMPIJobSuspended(mpiJob) {
job.Spec.Suspend = pointer.Bool(true)
}
return job
}

// newLauncherPodTemplate creates a new launcher Job for an MPIJob resource. It also sets
Expand Down
1 change: 1 addition & 0 deletions sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 31 additions & 3 deletions sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 3cdd581

Please sign in to comment.