From 735738269e67d1e514bb5f00c4c7564e7b271467 Mon Sep 17 00:00:00 2001
From: Michal Wozniak
Date: Thu, 2 Feb 2023 10:17:47 +0100
Subject: [PATCH] Changes

- add unit tests for creating suspended, suspending and resuming MPIJobs
- use a fake clock in the unit tests
- do not return from the syncHandler right after cleaning up worker pods on
  suspend; this allows the MPIJob status update to continue in the same sync
---
 pkg/controller/mpi_job_controller.go        |  73 +++++--
 pkg/controller/mpi_job_controller_status.go |   2 +
 pkg/controller/mpi_job_controller_test.go   | 228 +++++++++++++++++++-
 3 files changed, 275 insertions(+), 28 deletions(-)

diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go
index 225f03dd..44ed126c 100644
--- a/pkg/controller/mpi_job_controller.go
+++ b/pkg/controller/mpi_job_controller.go
@@ -51,6 +51,7 @@ import (
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
+	"k8s.io/utils/clock"
 	"k8s.io/utils/pointer"
 	podgroupv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
 	volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
@@ -245,6 +246,9 @@ type MPIJobController struct {

 	// To allow injection of updateStatus for testing.
 	updateStatusHandler func(mpijob *kubeflow.MPIJob) error
+
+	// Clock used by the controller; allows injecting a fake clock in unit tests.
+	clock clock.WithTicker
 }

 // NewMPIJobController returns a new MPIJob controller.
@@ -260,6 +264,26 @@ func NewMPIJobController(
 	podgroupsInformer podgroupsinformer.PodGroupInformer,
 	mpiJobInformer informers.MPIJobInformer,
 	gangSchedulerName string) *MPIJobController {
+	return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, volcanoClientSet,
+		configMapInformer, secretInformer, serviceInformer, jobInformer,
+		podInformer, podgroupsInformer, mpiJobInformer, gangSchedulerName,
+		&clock.RealClock{})
+}
+
+// NewMPIJobControllerWithClock returns a new MPIJob controller that uses the provided clock.
+func NewMPIJobControllerWithClock(
+	kubeClient kubernetes.Interface,
+	kubeflowClient clientset.Interface,
+	volcanoClientSet volcanoclient.Interface,
+	configMapInformer coreinformers.ConfigMapInformer,
+	secretInformer coreinformers.SecretInformer,
+	serviceInformer coreinformers.ServiceInformer,
+	jobInformer batchinformers.JobInformer,
+	podInformer coreinformers.PodInformer,
+	podgroupsInformer podgroupsinformer.PodGroupInformer,
+	mpiJobInformer informers.MPIJobInformer,
+	gangSchedulerName string,
+	clock clock.WithTicker) *MPIJobController {

 	// Create event broadcaster.
 	klog.V(4).Info("Creating event broadcaster")
@@ -296,6 +320,7 @@ func NewMPIJobController(
 		queue:             workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MPIJobs"),
 		recorder:          recorder,
 		gangSchedulerName: gangSchedulerName,
+		clock:             clock,
 	}

 	controller.updateStatusHandler = controller.doUpdateJobStatus
@@ -451,9 +476,9 @@ func (c *MPIJobController) processNextWorkItem() bool {
 // converge the two. It then updates the Status block of the MPIJob resource
 // with the current status of the resource.
 func (c *MPIJobController) syncHandler(key string) error {
-	startTime := time.Now()
+	startTime := c.clock.Now()
 	defer func() {
-		klog.Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
+		klog.Infof("Finished syncing job %q (%v)", key, c.clock.Since(startTime))
 	}()

 	// Convert the namespace/name string into a distinct namespace and name.
@@ -505,7 +530,10 @@ func (c *MPIJobController) syncHandler(key string) error {
 	// cleanup and stop retrying the MPIJob.
 	if isFinished(mpiJob.Status) && mpiJob.Status.CompletionTime != nil {
 		if isCleanUpPods(mpiJob.Spec.RunPolicy.CleanPodPolicy) {
-			return cleanUpWorkerPods(mpiJob, c)
+			if err := cleanUpWorkerPods(mpiJob, c); err != nil {
+				return err
+			}
+			return c.updateStatusHandler(mpiJob)
 		}
 		return nil
 	}
@@ -570,13 +598,6 @@ func (c *MPIJobController) syncHandler(key string) error {
 		}
 	}

-	// Finally, we update the status block of the MPIJob resource to reflect the
-	// current state of the world.
-	err = c.updateMPIJobStatus(mpiJob, launcher, worker)
-	if err != nil {
-		return err
-	}
-
 	if launcher != nil {
 		if isMPIJobSuspended(mpiJob) != isJobSuspended(launcher) {
 			// align the suspension state of launcher with the MPIJob
@@ -593,6 +614,14 @@ func (c *MPIJobController) syncHandler(key string) error {
 				return err
 			}
 		}
 	}
+
+	// Finally, we update the status block of the MPIJob resource to reflect the
+	// current state of the world.
+	err = c.updateMPIJobStatus(mpiJob, launcher, worker)
+	if err != nil {
+		return err
+	}
+
 	return nil
 }
@@ -608,7 +637,7 @@ func cleanUpWorkerPods(mpiJob *kubeflow.MPIJob, c *MPIJobController) error {
 		}
 	}
 	mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Active = 0
-	return c.updateStatusHandler(mpiJob)
+	return nil
 }

 // getLauncherJob gets the launcher Job controlled by this MPIJob.
@@ -932,23 +961,23 @@ func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error {

 func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher *batchv1.Job, worker []*corev1.Pod) error {
 	oldStatus := mpiJob.Status.DeepCopy()
-	launcherPods, err := c.jobPods(launcher)
-	if err != nil {
-		return fmt.Errorf("checking launcher pods running: %w", err)
-	}
 	if isMPIJobSuspended(mpiJob) {
 		// it is suspended now
-		if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, "MPIJobSuspended", "MPIJob suspended") {
+		if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, mpiJobSuspendedReason, "MPIJob suspended") {
 			c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobSuspended", "MPIJob suspended")
 		}
 	} else if getCondition(mpiJob.Status, kubeflow.JobSuspended) != nil {
 		// it is not suspended now, consider resumed if the condition was set before
-		if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionFalse, "MPIJobResumed", "MPIJob resumed") {
+		if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionFalse, mpiJobResumedReason, "MPIJob resumed") {
 			c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobResumed", "MPIJob resumed")
-			now := metav1.NewTime(time.Now())
+			now := metav1.NewTime(c.clock.Now())
 			mpiJob.Status.StartTime = &now
 		}
 	}
+	launcherPods, err := c.jobPods(launcher)
+	if err != nil {
+		return fmt.Errorf("checking launcher pods running: %w", err)
+	}
 	// Job.status.Active accounts for Pending and Running pods. Count running pods
 	// from the lister instead.
 	launcherPodsCnt := countRunningPods(launcherPods)
@@ -1001,13 +1030,13 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
 		c.recorder.Event(mpiJob, corev1.EventTypeWarning, mpiJobEvict, msg)
 	}

-	if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {
+	if isMPIJobSuspended(mpiJob) {
+		msg := fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name)
+		updateMPIJobConditions(mpiJob, common.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg)
+	} else if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {
 		msg := fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name)
 		updateMPIJobConditions(mpiJob, common.JobRunning, v1.ConditionTrue, mpiJobRunningReason, msg)
 		c.recorder.Eventf(mpiJob, corev1.EventTypeNormal, "MPIJobRunning", "MPIJob %s/%s is running", mpiJob.Namespace, mpiJob.Name)
-	} else if isMPIJobSuspended(mpiJob) {
-		msg := fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name)
-		updateMPIJobConditions(mpiJob, common.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg)
 	}

 	// no need to update the mpijob if the status hasn't changed since last time.
diff --git a/pkg/controller/mpi_job_controller_status.go b/pkg/controller/mpi_job_controller_status.go
index 12edcf8c..31a8c0f2 100644
--- a/pkg/controller/mpi_job_controller_status.go
+++ b/pkg/controller/mpi_job_controller_status.go
@@ -31,6 +31,8 @@ const (
 	mpiJobRunningReason = "MPIJobRunning"
 	// mpiJobSuspendedReason is added in a mpijob when it is suspended.
 	mpiJobSuspendedReason = "MPIJobSuspended"
+	// mpiJobResumedReason is added in a mpijob when it is resumed.
+	mpiJobResumedReason = "MPIJobResumed"
 	// mpiJobFailedReason is added in a mpijob when it is failed.
 	mpiJobFailedReason = "MPIJobFailed"
 	// mpiJobEvict
diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go
index d62d1030..dfb847c2 100644
--- a/pkg/controller/mpi_job_controller_test.go
+++ b/pkg/controller/mpi_job_controller_test.go
@@ -35,6 +35,9 @@ import (
 	core "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/utils/clock"
+	clocktesting "k8s.io/utils/clock/testing"
+	"k8s.io/utils/pointer"
 	podgroupv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
 	volcanofake "volcano.sh/apis/pkg/client/clientset/versioned/fake"

@@ -146,7 +149,7 @@ func newMPIJob(name string, replicas *int32, startTime, completionTime *metav1.T

 	return mpiJob
 }

-func (f *fixture) newController(gangSchedulerName string) (*MPIJobController, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) {
+func (f *fixture) newController(gangSchedulerName string, clock clock.WithTicker) (*MPIJobController, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) {
 	f.client = fake.NewSimpleClientset(f.objects...)
 	f.kubeClient = k8sfake.NewSimpleClientset(f.kubeObjects...)
@@ -156,7 +159,7 @@ func (f *fixture) newController(gangSchedulerName string) (*MPIJobController, in
 	volcanoInformerFactory := volcanoinformers.NewSharedInformerFactory(f.volcanoClient, 0)
 	podgroupsInformer := volcanoInformerFactory.Scheduling().V1beta1().PodGroups()

-	c := NewMPIJobController(
+	c := NewMPIJobControllerWithClock(
 		f.kubeClient,
 		f.client,
 		f.volcanoClient,
@@ -168,6 +171,7 @@ func (f *fixture) newController(gangSchedulerName string) (*MPIJobController, in
 		podgroupsInformer,
 		i.Kubeflow().V2beta1().MPIJobs(),
 		gangSchedulerName,
+		clock,
 	)

 	c.configMapSynced = alwaysReady
@@ -231,15 +235,19 @@ func (f *fixture) newController(gangSchedulerName string) (*MPIJobController, in
 }

 func (f *fixture) run(mpiJobName string) {
-	f.runController(mpiJobName, true, false, "")
+	f.runWithClock(mpiJobName, clock.RealClock{})
+}
+
+func (f *fixture) runWithClock(mpiJobName string, clock clock.WithTicker) {
+	f.runController(mpiJobName, true, false, "", clock)
 }

 func (f *fixture) runExpectError(mpiJobName string) {
-	f.runController(mpiJobName, true, true, "")
+	f.runController(mpiJobName, true, true, "", clock.RealClock{})
 }

-func (f *fixture) runController(mpiJobName string, startInformers, expectError bool, gangSchedulerName string) {
-	c, i, k8sI := f.newController(gangSchedulerName)
+func (f *fixture) runController(mpiJobName string, startInformers, expectError bool, gangSchedulerName string, clock clock.WithTicker) {
+	c, i, k8sI := f.newController(gangSchedulerName, clock)
 	if startInformers {
 		stopCh := make(chan struct{})
 		defer close(stopCh)
@@ -360,6 +368,11 @@ func (f *fixture) expectCreateJobAction(d *batchv1.Job) {
 	f.kubeActions = append(f.kubeActions, core.NewCreateAction(schema.GroupVersionResource{Resource: "jobs", Group: "batch"}, d.Namespace, d))
 }

+func (f *fixture) expectUpdateJobAction(job *batchv1.Job) {
+	action := core.NewUpdateAction(schema.GroupVersionResource{Resource: "jobs", Group: "batch", Version: "v1"}, job.Namespace, job)
+	f.kubeActions = append(f.kubeActions, action)
+}
+
 func (f *fixture) expectCreatePodAction(d *corev1.Pod) {
 	f.kubeActions = append(f.kubeActions, core.NewCreateAction(schema.GroupVersionResource{Resource: "pods"}, d.Namespace, d))
 }
@@ -752,6 +765,209 @@ func TestShutdownWorker(t *testing.T) {
 	f.run(getKey(mpiJob, t))
 }

+func TestCreateSuspendedMPIJob(t *testing.T) {
+	impls := []kubeflow.MPIImplementation{kubeflow.MPIImplementationOpenMPI, kubeflow.MPIImplementationIntel}
+	for _, implementation := range impls {
+		t.Run(string(implementation), func(t *testing.T) {
+			f := newFixture(t)
+
+			// create a suspended job
+			var replicas int32 = 8
+			mpiJob := newMPIJob("test", &replicas, nil, nil)
+			mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(true)
+			mpiJob.Spec.MPIImplementation = implementation
+			f.setUpMPIJob(mpiJob)
+
+			// expect creation of objects
+			scheme.Scheme.Default(mpiJob)
+			f.expectCreateServiceAction(newWorkersService(mpiJob))
+			cfgMap := newConfigMap(mpiJob, replicas)
+			updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil)
+			f.expectCreateConfigMapAction(cfgMap)
+			secret, err := newSSHAuthSecret(mpiJob)
+			if err != nil {
+				t.Fatalf("Failed creating secret")
+			}
+			f.expectCreateSecretAction(secret)
+			if implementation == kubeflow.MPIImplementationIntel {
+				f.expectCreateServiceAction(newLauncherService(mpiJob))
+			}
+
+			// expect creation of the launcher
+			fmjc := f.newFakeMPIJobController()
+			launcher := fmjc.newLauncherJob(mpiJob)
+			launcher.Spec.Suspend = pointer.Bool(true)
+			f.expectCreateJobAction(launcher)
+
+			// expect an update to add the conditions
+			mpiJobCopy := mpiJob.DeepCopy()
+			mpiJobCopy.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{
+				common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): {},
+				common.ReplicaType(kubeflow.MPIReplicaTypeWorker):   {},
+			}
+			msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name)
+			updateMPIJobConditions(mpiJobCopy, common.JobCreated, v1.ConditionTrue, mpiJobCreatedReason, msg)
+			updateMPIJobConditions(mpiJobCopy, kubeflow.JobSuspended, v1.ConditionTrue, mpiJobSuspendedReason, "MPIJob suspended")
+			msg = fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name)
+			updateMPIJobConditions(mpiJobCopy, common.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg)
+			f.expectUpdateMPIJobStatusAction(mpiJobCopy)
+
+			f.run(getKey(mpiJob, t))
+		})
+	}
+}
+
+func TestSuspendedRunningMPIJob(t *testing.T) {
+	f := newFixture(t)
+
+	// set up a running MPIJob with a launcher
+	var replicas int32 = 8
+	startTime := metav1.Now()
+	mpiJob := newMPIJob("test", &replicas, &startTime, nil)
+	mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(false)
+	msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name)
+	updateMPIJobConditions(mpiJob, common.JobCreated, v1.ConditionTrue, mpiJobCreatedReason, msg)
+	msg = fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name)
+	updateMPIJobConditions(mpiJob, common.JobRunning, v1.ConditionTrue, mpiJobRunningReason, msg)
+
+	mpiJob.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{
+		common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): {
+			Active: 1,
+		},
+		common.ReplicaType(kubeflow.MPIReplicaTypeWorker): {
+			Active: replicas,
+		},
+	}
+
+	f.setUpMPIJob(mpiJob)
+
+	// set up the workers
+	fmjc := f.newFakeMPIJobController()
+	var runningPodList []*corev1.Pod
+	for i := 0; i < int(replicas); i++ {
+		worker := fmjc.newWorker(mpiJob, i)
+		worker.Status.Phase = corev1.PodRunning
+		runningPodList = append(runningPodList, worker)
+		f.setUpPod(worker)
+	}
+
+	// set up objects
+	scheme.Scheme.Default(mpiJob)
+	f.setUpService(newWorkersService(mpiJob))
+
+	cfgMap := newConfigMap(mpiJob, replicas)
+	updateDiscoverHostsInConfigMap(cfgMap, mpiJob, runningPodList)
+	f.setUpConfigMap(cfgMap)
+	secret, err := newSSHAuthSecret(mpiJob)
+	if err != nil {
+		t.Fatalf("Failed creating secret")
+	}
+	f.setUpSecret(secret)
+
+	// set up the launcher and its pod
+	launcher := fmjc.newLauncherJob(mpiJob)
+	launcher.Spec.Suspend = pointer.Bool(false)
+	launcherPod := mockJobPod(launcher)
+	launcherPod.Status.Phase = corev1.PodRunning
+	f.setUpLauncher(launcher)
+	f.setUpPod(launcherPod)
+
+	// transition the MPIJob into the suspended state
+	mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(true)
+
+	// expect the launcher Job to be suspended
+	launcherCopy := launcher.DeepCopy()
+	launcherCopy.Spec.Suspend = pointer.Bool(true)
+	f.expectUpdateJobAction(launcherCopy)
+
+	// expect removal of the pods
+	for i := 0; i < int(replicas); i++ {
+		name := fmt.Sprintf("%s-%d", mpiJob.Name+workerSuffix, i)
+		f.kubeActions = append(f.kubeActions, core.NewDeleteAction(schema.GroupVersionResource{Resource: "pods"}, mpiJob.Namespace, name))
+	}
+
+	// expect an MPIJob status update to add the suspended condition
+	mpiJobCopy := mpiJob.DeepCopy()
+	updateMPIJobConditions(mpiJobCopy, kubeflow.JobSuspended, v1.ConditionTrue, mpiJobSuspendedReason, "MPIJob suspended")
+	msg = fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJobCopy.Namespace, mpiJobCopy.Name)
+	updateMPIJobConditions(mpiJobCopy, common.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg)
+	mpiJobCopy.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{
+		// the launcher pod remains active. In a live system it gets deleted by
+		// the launcher's Job controller.
+		common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): {
+			Active: 1,
+		},
+		common.ReplicaType(kubeflow.MPIReplicaTypeWorker): {},
+	}
+	f.expectUpdateMPIJobStatusAction(mpiJobCopy)
+
+	f.run(getKey(mpiJob, t))
+}
+
+func TestResumeMPIJob(t *testing.T) {
+	fakeClock := clocktesting.NewFakeClock(time.Now().Truncate(time.Second))
+	f := newFixture(t)
+
+	// create a suspended job
+	var replicas int32 = 8
+	startTime := metav1.Now()
+	mpiJob := newMPIJob("test", &replicas, &startTime, nil)
+	mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(true)
+	msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name)
+	updateMPIJobConditions(mpiJob, common.JobCreated, v1.ConditionTrue, mpiJobCreatedReason, msg)
+	updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, mpiJobSuspendedReason, "MPIJob suspended")
+	msg = fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name)
+	updateMPIJobConditions(mpiJob, common.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg)
+	mpiJob.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{
+		common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): {},
+		common.ReplicaType(kubeflow.MPIReplicaTypeWorker):   {},
+	}
+	f.setUpMPIJob(mpiJob)
+
+	// set up existing objects and expect creation of the workers Service
+	scheme.Scheme.Default(mpiJob)
+	f.expectCreateServiceAction(newWorkersService(mpiJob))
+	cfgMap := newConfigMap(mpiJob, replicas)
+	updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil)
+	f.setUpConfigMap(cfgMap)
+	secret, err := newSSHAuthSecret(mpiJob)
+	if err != nil {
+		t.Fatalf("Failed creating secret")
+	}
+	f.setUpSecret(secret)
+
+	// set up the suspended launcher
+	fmjc := f.newFakeMPIJobController()
+	launcher := fmjc.newLauncherJob(mpiJob)
+	launcher.Spec.Suspend = pointer.Bool(true)
+	f.setUpLauncher(launcher)
+
+	// advance the clock by a second so that the StartTime is updated on resume
+	fakeClock.Sleep(time.Second)
+
+	// resume the MPIJob
+	mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(false)
+
+	// expect creation of the pods
+	for i := 0; i < int(replicas); i++ {
+		worker := fmjc.newWorker(mpiJob, i)
+		f.kubeActions = append(f.kubeActions, core.NewCreateAction(schema.GroupVersionResource{Resource: "pods"}, mpiJob.Namespace, worker))
+	}
+
+	// expect the launcher update to resume it
+	launcherCopy := launcher.DeepCopy()
+	launcherCopy.Spec.Suspend = pointer.Bool(false)
+	f.expectUpdateJobAction(launcherCopy)
+
+	// expect an update to add the conditions
+	mpiJobCopy := mpiJob.DeepCopy()
+	mpiJobCopy.Status.StartTime = &metav1.Time{Time: fakeClock.Now()}
+	updateMPIJobConditions(mpiJobCopy, kubeflow.JobSuspended, v1.ConditionFalse, "MPIJobResumed", "MPIJob resumed")
+	f.expectUpdateMPIJobStatusAction(mpiJobCopy)
+
+	f.runWithClock(getKey(mpiJob, t), fakeClock)
+}
+
 func TestWorkerNotControlledByUs(t *testing.T) {
 	f := newFixture(t)
 	startTime := metav1.Now()