From 3cdd58198f6e9693235df13f472528f48a9e632c Mon Sep 17 00:00:00 2001 From: Michal Wozniak Date: Fri, 27 Jan 2023 13:32:30 +0100 Subject: [PATCH] Implement support for suspend semantics for MPIJob --- crd/kubeflow.org_mpijobs.yaml | 12 +++ go.mod | 2 +- manifests/base/crd.yaml | 2 + pkg/apis/kubeflow/v2beta1/swagger.json | 4 + pkg/apis/kubeflow/v2beta1/types.go | 12 +++ .../kubeflow/v2beta1/zz_generated.deepcopy.go | 5 ++ pkg/controller/mpi_job_controller.go | 79 ++++++++++++++----- sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md | 1 + .../mpijob/models/v2beta1_mpi_job_spec.py | 34 +++++++- 9 files changed, 129 insertions(+), 22 deletions(-) diff --git a/crd/kubeflow.org_mpijobs.yaml b/crd/kubeflow.org_mpijobs.yaml index 21b0b469b..eb44e1276 100644 --- a/crd/kubeflow.org_mpijobs.yaml +++ b/crd/kubeflow.org_mpijobs.yaml @@ -7884,6 +7884,18 @@ spec: description: SSHAuthMountPath is the directory where SSH keys are mounted. Defaults to "/root/.ssh". type: string + suspend: + default: false + description: "suspend specifies whether the Job controller should + create Pods or not. If a Job is created with suspend set to true, + no Pods are created by the Job controller. If a Job is suspended + after creation (i.e. the flag goes from false to true), the Job + controller will delete all active Pods associated with this Job. + Users must design their workload to gracefully handle this. Suspending + a Job will reset the StartTime field of the Job, effectively resetting + the ActiveDeadlineSeconds timer too. Defaults to false. \n Defaults + to false." 
+ type: boolean required: - mpiReplicaSpecs type: object diff --git a/go.mod b/go.mod index ad2680579..44752c776 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( k8s.io/klog v1.0.0 k8s.io/kube-openapi v0.0.0-20230109183929-3758b55a6596 k8s.io/sample-controller v0.25.6 + k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed sigs.k8s.io/controller-runtime v0.13.1 volcano.sh/apis v1.7.0 ) @@ -73,7 +74,6 @@ require ( k8s.io/component-base v0.25.6 // indirect k8s.io/gengo v0.0.0-20211129171323-c02415ce4185 // indirect k8s.io/klog/v2 v2.70.1 // indirect - k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect sigs.k8s.io/yaml v1.3.0 // indirect diff --git a/manifests/base/crd.yaml b/manifests/base/crd.yaml index e0a4a2fd9..4f36ee106 100644 --- a/manifests/base/crd.yaml +++ b/manifests/base/crd.yaml @@ -23,6 +23,8 @@ spec: spec: type: object properties: + suspend: + type: boolean slotsPerWorker: type: integer minimum: 1 diff --git a/pkg/apis/kubeflow/v2beta1/swagger.json b/pkg/apis/kubeflow/v2beta1/swagger.json index 4aaeb81e8..e5f2fb1f3 100644 --- a/pkg/apis/kubeflow/v2beta1/swagger.json +++ b/pkg/apis/kubeflow/v2beta1/swagger.json @@ -270,6 +270,10 @@ "sshAuthMountPath": { "description": "SSHAuthMountPath is the directory where SSH keys are mounted. Defaults to \"/root/.ssh\".", "type": "string" + }, + "suspend": { + "description": "suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job, effectively resetting the ActiveDeadlineSeconds timer too. 
Defaults to false.\n\nDefaults to false.", + "type": "boolean" } } } diff --git a/pkg/apis/kubeflow/v2beta1/types.go b/pkg/apis/kubeflow/v2beta1/types.go index c1d37528c..893709057 100644 --- a/pkg/apis/kubeflow/v2beta1/types.go +++ b/pkg/apis/kubeflow/v2beta1/types.go @@ -62,6 +62,18 @@ type MPIJobSpec struct { // +kubebuilder:validation:Enum:=OpenMPI;Intel // +kubebuilder:default:=OpenMPI MPIImplementation MPIImplementation `json:"mpiImplementation,omitempty"` + + // suspend specifies whether the Job controller should create Pods or not. If + // a Job is created with suspend set to true, no Pods are created by the Job + // controller. If a Job is suspended after creation (i.e. the flag goes from + // false to true), the Job controller will delete all active Pods associated + // with this Job. Users must design their workload to gracefully handle this. + // Suspending a Job will reset the StartTime field of the Job, effectively + // resetting the ActiveDeadlineSeconds timer too. + // + // Defaults to false. + // +kubebuilder:default:=false + Suspend *bool `json:"suspend,omitempty"` } // MPIReplicaType is the type for MPIReplica. 
diff --git a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go index b1f8d21be..569102ba5 100644 --- a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go @@ -109,6 +109,11 @@ func (in *MPIJobSpec) DeepCopyInto(out *MPIJobSpec) { (*out)[key] = outVal } } + if in.Suspend != nil { + in, out := &in.Suspend, &out.Suspend + *out = new(bool) + **out = **in + } return } diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go index 055dfe473..2f52e01e6 100644 --- a/pkg/controller/mpi_job_controller.go +++ b/pkg/controller/mpi_job_controller.go @@ -50,6 +50,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog" + "k8s.io/utils/pointer" podgroupv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned" podgroupsinformer "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1" @@ -449,6 +450,7 @@ func (c *MPIJobController) processNextWorkItem() bool { // converge the two. It then updates the Status block of the MPIJob resource // with the current status of the resource. func (c *MPIJobController) syncHandler(key string) error { + klog.Infof("___ MYDEBUG starting for %s", key) startTime := time.Now() defer func() { klog.Infof("Finished syncing job %q (%v)", key, time.Since(startTime)) @@ -503,24 +505,13 @@ func (c *MPIJobController) syncHandler(key string) error { // cleanup and stop retrying the MPIJob. if isFinished(mpiJob.Status) && mpiJob.Status.CompletionTime != nil { if isCleanUpPods(mpiJob.Spec.RunPolicy.CleanPodPolicy) { - // set worker StatefulSet Replicas to 0. 
- if err := c.deleteWorkerPods(mpiJob); err != nil { - return err - } - initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker) - if c.gangSchedulerName != "" { - if err := c.deletePodGroups(mpiJob); err != nil { - return err - } - } - mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Active = 0 - return c.updateStatusHandler(mpiJob) + return cleanUpPods(mpiJob, c) } return nil } // first set StartTime. - if mpiJob.Status.StartTime == nil { + if mpiJob.Status.StartTime == nil && !isMPIJobSuspended(mpiJob) { now := metav1.Now() mpiJob.Status.StartTime = &now } @@ -555,10 +546,11 @@ func (c *MPIJobController) syncHandler(key string) error { return err } } - - worker, err = c.getOrCreateWorker(mpiJob) - if err != nil { - return err + if !isMPIJobSuspended(mpiJob) { + worker, err = c.getOrCreateWorker(mpiJob) + if err != nil { + return err + } } if mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel { // The Intel implementation requires workers to communicate with the @@ -585,9 +577,48 @@ func (c *MPIJobController) syncHandler(key string) error { return err } + if launcher != nil { + launcherSuspendUpdate := false + if isMPIJobSuspended(mpiJob) && !isJobSuspended(launcher) { + // suspend the launcher first if the MPI job is suspended + launcherSuspendUpdate = true + launcher.Spec.Suspend = pointer.Bool(true) + } else if !isMPIJobSuspended(mpiJob) && isJobSuspended(launcher) { + launcherSuspendUpdate = true + // unsuspend the launcher first if the MPI job is unsuspended + launcher.Spec.Suspend = pointer.Bool(false) + } + if launcherSuspendUpdate { + if _, err := c.kubeClient.BatchV1().Jobs(namespace).Update(context.TODO(), launcher, metav1.UpdateOptions{}); err != nil { + return err + } + } + } + + // cleanup the running worker pods if the MPI job is suspended + if isMPIJobSuspended(mpiJob) { + if err := cleanUpPods(mpiJob, c); err != nil { + return err + } + } return nil } +func cleanUpPods(mpiJob *kubeflow.MPIJob, c 
*MPIJobController) error { + // set worker StatefulSet Replicas to 0. + if err := c.deleteWorkerPods(mpiJob); err != nil { + return err + } + initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker) + if c.gangSchedulerName != "" { + if err := c.deletePodGroups(mpiJob); err != nil { + return err + } + } + mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Active = 0 + return c.updateStatusHandler(mpiJob) +} + // getLauncherJob gets the launcher Job controlled by this MPIJob. func (c *MPIJobController) getLauncherJob(mpiJob *kubeflow.MPIJob) (*batchv1.Job, error) { launcher, err := c.jobLister.Jobs(mpiJob.Namespace).Get(mpiJob.Name + launcherSuffix) @@ -857,6 +888,14 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1 return workerPods, nil } +func isMPIJobSuspended(mpiJob *kubeflow.MPIJob) bool { + return pointer.BoolDeref(mpiJob.Spec.Suspend, false) +} + +func isJobSuspended(job *batchv1.Job) bool { + return pointer.BoolDeref(job.Spec.Suspend, false) +} + func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error { var ( workerPrefix = mpiJob.Name + workerSuffix @@ -1304,7 +1343,7 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1 } func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job { - return &batchv1.Job{ + job := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: mpiJob.Name + launcherSuffix, Namespace: mpiJob.Namespace, @@ -1322,6 +1361,10 @@ func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job Template: c.newLauncherPodTemplate(mpiJob), }, } + if isMPIJobSuspended(mpiJob) { + job.Spec.Suspend = pointer.Bool(true) + } + return job } // newLauncherPodTemplate creates a new launcher Job for an MPIJob resource. 
It also sets diff --git a/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md b/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md index fc32dade9..64474a3a9 100644 --- a/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md +++ b/sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md @@ -9,6 +9,7 @@ Name | Type | Description | Notes **run_policy** | [**V1RunPolicy**](V1RunPolicy.md) | | [optional] **slots_per_worker** | **int** | Specifies the number of slots per worker used in hostfile. Defaults to 1. | [optional] **ssh_auth_mount_path** | **str** | SSHAuthMountPath is the directory where SSH keys are mounted. Defaults to \"/root/.ssh\". | [optional] +**suspend** | **bool** | suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job, effectively resetting the ActiveDeadlineSeconds timer too. Defaults to false. 
| [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py b/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py index 7a0527a5f..a5f52181b 100644 --- a/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py @@ -37,7 +37,8 @@ class V2beta1MPIJobSpec(object): 'mpi_replica_specs': 'dict(str, V1ReplicaSpec)', 'run_policy': 'V1RunPolicy', 'slots_per_worker': 'int', - 'ssh_auth_mount_path': 'str' + 'ssh_auth_mount_path': 'str', + 'suspend': 'bool' } attribute_map = { @@ -45,10 +46,11 @@ class V2beta1MPIJobSpec(object): 'mpi_replica_specs': 'mpiReplicaSpecs', 'run_policy': 'runPolicy', 'slots_per_worker': 'slotsPerWorker', - 'ssh_auth_mount_path': 'sshAuthMountPath' + 'ssh_auth_mount_path': 'sshAuthMountPath', + 'suspend': 'suspend' } - def __init__(self, mpi_implementation=None, mpi_replica_specs=None, run_policy=None, slots_per_worker=None, ssh_auth_mount_path=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, mpi_implementation=None, mpi_replica_specs=None, run_policy=None, slots_per_worker=None, ssh_auth_mount_path=None, suspend=None, local_vars_configuration=None): # noqa: E501 """V2beta1MPIJobSpec - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration.get_default_copy() @@ -59,6 +61,7 @@ def __init__(self, mpi_implementation=None, mpi_replica_specs=None, run_policy=N self._run_policy = None self._slots_per_worker = None self._ssh_auth_mount_path = None + self._suspend = None self.discriminator = None if mpi_implementation is not None: @@ -70,6 +73,8 @@ def __init__(self, mpi_implementation=None, mpi_replica_specs=None, run_policy=N self.slots_per_worker = slots_per_worker if ssh_auth_mount_path is not None: 
self.ssh_auth_mount_path = ssh_auth_mount_path + if suspend is not None: + self.suspend = suspend @property def mpi_implementation(self): @@ -186,6 +191,29 @@ def ssh_auth_mount_path(self, ssh_auth_mount_path): self._ssh_auth_mount_path = ssh_auth_mount_path + @property + def suspend(self): + """Gets the suspend of this V2beta1MPIJobSpec. # noqa: E501 + + suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job, effectively resetting the ActiveDeadlineSeconds timer too. Defaults to false. # noqa: E501 + + :return: The suspend of this V2beta1MPIJobSpec. # noqa: E501 + :rtype: bool + """ + return self._suspend + + @suspend.setter + def suspend(self, suspend): + """Sets the suspend of this V2beta1MPIJobSpec. + + suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job, effectively resetting the ActiveDeadlineSeconds timer too. Defaults to false. # noqa: E501 + + :param suspend: The suspend of this V2beta1MPIJobSpec. # noqa: E501 + :type suspend: bool + """ + + self._suspend = suspend + def to_dict(self, serialize=False): """Returns the model properties as a dict""" result = {}