From b0375a8a98211dda325e884520ab837283351f30 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 9 Oct 2024 22:10:47 +0100 Subject: [PATCH 01/31] Attempt to requeue after correct period Signed-off-by: Thomas Newton --- .../controller/sparkapplication/controller.go | 87 +++++++++++++------ 1 file changed, 60 insertions(+), 27 deletions(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index 3b96a310a..8e22042fb 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -315,40 +315,51 @@ func (r *Reconciler) reconcileSubmittedSparkApplication(ctx context.Context, req func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { key := req.NamespacedName - retryErr := retry.RetryOnConflict( - retry.DefaultRetry, - func() error { - old, err := r.getSparkApplication(key) + + func() (ctrl.Result, error) { + old, err := r.getSparkApplication(key) + if err != nil { + return ctrl.Result{}, err + } + if old.Status.AppState.State != v1beta2.ApplicationStateFailedSubmission { + return ctrl.Result{}, nil + } + app := old.DeepCopy() + + if util.ShouldRetry(app) { + timeUntilNextRetryDue, err := timeUntilNextRetryDue(app) if err != nil { - return err - } - if old.Status.AppState.State != v1beta2.ApplicationStateFailedSubmission { - return nil + return ctrl.Result{}, err } - app := old.DeepCopy() - - if util.ShouldRetry(app) { - if isNextRetryDue(app) { - if r.validateSparkResourceDeletion(ctx, app) { - _ = r.submitSparkApplication(app) - } else { - if err := r.deleteSparkResources(ctx, app); err != nil { - logger.Error(err, "failed to delete resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace) - } - return err + if timeUntilNextRetryDue < 0 { + if r.validateSparkResourceDeletion(ctx, app) { + _ = r.submitSparkApplication(app) + } else { + if 
err := r.deleteSparkResources(ctx, app); err != nil { + logger.Error(err, "failed to delete resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace) } + return ctrl.Result{}, err } } else { - app.Status.AppState.State = v1beta2.ApplicationStateFailed - app.Status.TerminationTime = metav1.Now() - r.recordSparkApplicationEvent(app) + return ctrl.Result{RequeueAfter: timeUntilNextRetryDue}, nil } + } else { + app.Status.AppState.State = v1beta2.ApplicationStateFailed + app.Status.TerminationTime = metav1.Now() + r.recordSparkApplicationEvent(app) + } - if err := r.updateSparkApplicationStatus(ctx, app); err != nil { - return err - } - return nil - }, + if err := r.updateSparkApplicationStatus(ctx, app); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + + result, err = func() + + retryErr := retry.RetryOnConflict( + retry.DefaultRetry, + func() error { return err }, ) if retryErr != nil { logger.Error(retryErr, "Failed to reconcile SparkApplication", "name", key.Name, "namespace", key.Namespace) @@ -755,6 +766,28 @@ func isNextRetryDue(app *v1beta2.SparkApplication) bool { return currentTime.After(lastEventTime.Add(interval)) } +func timeUntilNextRetryDue(app *v1beta2.SparkApplication) (time.Duration, error) { + var retryInterval *int64 + switch app.Status.AppState.State { + case v1beta2.ApplicationStateFailedSubmission: + retryInterval = app.Spec.RestartPolicy.OnSubmissionFailureRetryInterval + case v1beta2.ApplicationStateFailing: + retryInterval = app.Spec.RestartPolicy.OnFailureRetryInterval + } + + attemptsDone := app.Status.SubmissionAttempts + lastAttemptTime := app.Status.LastSubmissionAttemptTime + if retryInterval == nil || lastAttemptTime.IsZero() || attemptsDone <= 0 { + return -1, fmt.Errorf("invalid retry interval (%v), last attempt time (%v) or attemptsDone (%v)", retryInterval, lastAttemptTime, attemptsDone) + } + + // Retry wait time is attempts*RetryInterval to do a linear backoff. 
+ interval := time.Duration(*retryInterval) * time.Second * time.Duration(attemptsDone) + currentTime := time.Now() + logger.Info(fmt.Sprintf("currentTime is %v, interval is %v", currentTime, interval)) + return interval - currentTime.Sub(lastAttemptTime.Time), nil +} + // updateDriverState finds the driver pod of the application // and updates the driver state based on the current phase of the pod. func (r *Reconciler) updateDriverState(_ context.Context, app *v1beta2.SparkApplication) error { From 6c530570857a89689c30073fcfed2e0087b43547 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 9 Oct 2024 22:12:37 +0100 Subject: [PATCH 02/31] Syntactically correct Signed-off-by: Thomas Newton --- internal/controller/sparkapplication/controller.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index 8e22042fb..0fa767f70 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -316,7 +316,7 @@ func (r *Reconciler) reconcileSubmittedSparkApplication(ctx context.Context, req func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { key := req.NamespacedName - func() (ctrl.Result, error) { + result, err := func() (ctrl.Result, error) { old, err := r.getSparkApplication(key) if err != nil { return ctrl.Result{}, err @@ -353,9 +353,7 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte return ctrl.Result{}, err } return ctrl.Result{}, nil - } - - result, err = func() + }() retryErr := retry.RetryOnConflict( retry.DefaultRetry, @@ -363,9 +361,9 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte ) if retryErr != nil { logger.Error(retryErr, "Failed to reconcile SparkApplication", "name", key.Name, "namespace", key.Namespace) - return ctrl.Result{}, 
retryErr + return result, retryErr } - return ctrl.Result{}, nil + return result, nil } func (r *Reconciler) reconcileRunningSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { From a468bbab2851bde8550513c2a5200681c1901a3b Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 9 Oct 2024 23:16:11 +0100 Subject: [PATCH 03/31] I think correct requeueing Signed-off-by: Thomas Newton --- .../controller/sparkapplication/controller.go | 69 ++++++++++--------- 1 file changed, 35 insertions(+), 34 deletions(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index 0fa767f70..69237e561 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -316,48 +316,49 @@ func (r *Reconciler) reconcileSubmittedSparkApplication(ctx context.Context, req func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { key := req.NamespacedName - result, err := func() (ctrl.Result, error) { - old, err := r.getSparkApplication(key) - if err != nil { - return ctrl.Result{}, err - } - if old.Status.AppState.State != v1beta2.ApplicationStateFailedSubmission { - return ctrl.Result{}, nil - } - app := old.DeepCopy() + var result ctrl.Result - if util.ShouldRetry(app) { - timeUntilNextRetryDue, err := timeUntilNextRetryDue(app) + retryErr := retry.RetryOnConflict( + retry.DefaultRetry, + func() error { + old, err := r.getSparkApplication(key) if err != nil { - return ctrl.Result{}, err + return err } - if timeUntilNextRetryDue < 0 { - if r.validateSparkResourceDeletion(ctx, app) { - _ = r.submitSparkApplication(app) - } else { - if err := r.deleteSparkResources(ctx, app); err != nil { - logger.Error(err, "failed to delete resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace) + if old.Status.AppState.State != 
v1beta2.ApplicationStateFailedSubmission { + return nil + } + app := old.DeepCopy() + + if util.ShouldRetry(app) { + timeUntilNextRetryDue, err := timeUntilNextRetryDue(app) + if err != nil { + return err + } + if timeUntilNextRetryDue < 0 { + if r.validateSparkResourceDeletion(ctx, app) { + _ = r.submitSparkApplication(app) + } else { + if err := r.deleteSparkResources(ctx, app); err != nil { + logger.Error(err, "failed to delete resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace) + } + return err } - return ctrl.Result{}, err + } else { + // Make sure its requeued to retry + result.RequeueAfter = timeUntilNextRetryDue } } else { - return ctrl.Result{RequeueAfter: timeUntilNextRetryDue}, nil + app.Status.AppState.State = v1beta2.ApplicationStateFailed + app.Status.TerminationTime = metav1.Now() + r.recordSparkApplicationEvent(app) } - } else { - app.Status.AppState.State = v1beta2.ApplicationStateFailed - app.Status.TerminationTime = metav1.Now() - r.recordSparkApplicationEvent(app) - } - if err := r.updateSparkApplicationStatus(ctx, app); err != nil { - return ctrl.Result{}, err - } - return ctrl.Result{}, nil - }() - - retryErr := retry.RetryOnConflict( - retry.DefaultRetry, - func() error { return err }, + if err := r.updateSparkApplicationStatus(ctx, app); err != nil { + return err + } + return nil + }, ) if retryErr != nil { logger.Error(retryErr, "Failed to reconcile SparkApplication", "name", key.Name, "namespace", key.Namespace) From 127f2613c88a6ab5a14fb685c3e966b0763a9459 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 9 Oct 2024 23:32:30 +0100 Subject: [PATCH 04/31] Same treatment for the other retries Signed-off-by: Thomas Newton --- .../controller/sparkapplication/controller.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index 69237e561..55cd5f032 100644 --- 
a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -345,7 +345,7 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte return err } } else { - // Make sure its requeued to retry + // If we're waiting before retrying then reconcile will not modify anything, so we need to requeue. result.RequeueAfter = timeUntilNextRetryDue } } else { @@ -507,6 +507,9 @@ func (r *Reconciler) reconcileSucceedingSparkApplication(ctx context.Context, re func (r *Reconciler) reconcileFailingSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { key := req.NamespacedName + + var result ctrl.Result + retryErr := retry.RetryOnConflict( retry.DefaultRetry, func() error { @@ -520,12 +523,19 @@ func (r *Reconciler) reconcileFailingSparkApplication(ctx context.Context, req c app := old.DeepCopy() if util.ShouldRetry(app) { - if isNextRetryDue(app) { + timeUntilNextRetryDue, err := timeUntilNextRetryDue(app) + if err != nil { + return err + } + if timeUntilNextRetryDue < 0 { if err := r.deleteSparkResources(ctx, app); err != nil { logger.Error(err, "failed to delete spark resources", "name", app.Name, "namespace", app.Namespace) return err } app.Status.AppState.State = v1beta2.ApplicationStatePendingRerun + } else { + // If we're waiting before retrying then reconcile will not modify anything, so we need to requeue. 
+ result.RequeueAfter = timeUntilNextRetryDue } } else { app.Status.AppState.State = v1beta2.ApplicationStateFailed @@ -538,9 +548,9 @@ func (r *Reconciler) reconcileFailingSparkApplication(ctx context.Context, req c ) if retryErr != nil { logger.Error(retryErr, "Failed to reconcile SparkApplication", "name", key.Name, "namespace", key.Namespace) - return ctrl.Result{}, retryErr + return result, retryErr } - return ctrl.Result{}, nil + return result, nil } func (r *Reconciler) reconcileCompletedSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { From 6bb6c744e13ad4e5c44cda4f1a59e9d7eeab2c32 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 9 Oct 2024 23:51:50 +0100 Subject: [PATCH 05/31] Tidy Signed-off-by: Thomas Newton --- .../controller/sparkapplication/controller.go | 20 ++----------------- 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index 55cd5f032..a819bd46d 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -335,7 +335,7 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte if err != nil { return err } - if timeUntilNextRetryDue < 0 { + if timeUntilNextRetryDue <= 0 { if r.validateSparkResourceDeletion(ctx, app) { _ = r.submitSparkApplication(app) } else { @@ -527,7 +527,7 @@ func (r *Reconciler) reconcileFailingSparkApplication(ctx context.Context, req c if err != nil { return err } - if timeUntilNextRetryDue < 0 { + if timeUntilNextRetryDue <= 0 { if err := r.deleteSparkResources(ctx, app); err != nil { logger.Error(err, "failed to delete spark resources", "name", app.Name, "namespace", app.Namespace) return err @@ -759,22 +759,6 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) error return nil } -// Helper func to determine if the next retry the SparkApplication is due now. 
-func isNextRetryDue(app *v1beta2.SparkApplication) bool { - retryInterval := app.Spec.RestartPolicy.OnFailureRetryInterval - attemptsDone := app.Status.SubmissionAttempts - lastEventTime := app.Status.LastSubmissionAttemptTime - if retryInterval == nil || lastEventTime.IsZero() || attemptsDone <= 0 { - return false - } - - // Retry if we have waited at-least equal to attempts*RetryInterval since we do a linear back-off. - interval := time.Duration(*retryInterval) * time.Second * time.Duration(attemptsDone) - currentTime := time.Now() - logger.Info(fmt.Sprintf("currentTime is %v, interval is %v", currentTime, interval)) - return currentTime.After(lastEventTime.Add(interval)) -} - func timeUntilNextRetryDue(app *v1beta2.SparkApplication) (time.Duration, error) { var retryInterval *int64 switch app.Status.AppState.State { From 0d8e9462957ce5b72b2533cfe8a2fd10f09041d9 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 9 Oct 2024 23:56:21 +0100 Subject: [PATCH 06/31] Requeue after deleting resources Signed-off-by: Thomas Newton --- internal/controller/sparkapplication/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index a819bd46d..97a6e705d 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -342,7 +342,7 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte if err := r.deleteSparkResources(ctx, app); err != nil { logger.Error(err, "failed to delete resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace) } - return err + return fmt.Errorf("resources associated with SparkApplication name: %s namespace: %s, needed to be deleted", app.Name, app.Namespace) } } else { // If we're waiting before retrying then reconcile will not modify anything, so we need to requeue. 
From d2739a946e4006dac0f224c44c28e06406c68f55 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Thu, 10 Oct 2024 00:36:02 +0100 Subject: [PATCH 07/31] Try to fix submission status updates Signed-off-by: Thomas Newton --- .../controller/sparkapplication/controller.go | 33 ++++++++++++------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index 97a6e705d..5c002a308 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -337,7 +337,7 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte } if timeUntilNextRetryDue <= 0 { if r.validateSparkResourceDeletion(ctx, app) { - _ = r.submitSparkApplication(app) + err = r.submitSparkApplication(app) } else { if err := r.deleteSparkResources(ctx, app); err != nil { logger.Error(err, "failed to delete resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace) @@ -652,9 +652,28 @@ func (r *Reconciler) getSparkApplication(key types.NamespacedName) (*v1beta2.Spa } // submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit. 
-func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) error { +func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (returned_error error) { logger.Info("Submitting SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) + + defer func() { + app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1 + app.Status.SubmissionID = uuid.New().String() + app.Status.LastSubmissionAttemptTime = metav1.Now() + + if returned_error == nil { + app.Status.AppState = v1beta2.ApplicationState{ + State: v1beta2.ApplicationStateSubmitted, + } + } else { + app.Status.AppState = v1beta2.ApplicationState{ + State: v1beta2.ApplicationStateFailedSubmission, + ErrorMessage: returned_error.Error(), + } + } + r.recordSparkApplicationEvent(app) + }() + if util.PrometheusMonitoringEnabled(app) { logger.Info("Configure Prometheus monitoring for SparkApplication", "name", app.Name, "namespace", app.Namespace) if err := configPrometheusMonitoring(app, r.client); err != nil { @@ -729,7 +748,6 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) error driverPodName := util.GetDriverPodName(app) app.Status.DriverInfo.PodName = driverPodName - app.Status.SubmissionID = uuid.New().String() sparkSubmitArgs, err := buildSparkSubmitArgs(app) if err != nil { return fmt.Errorf("failed to build spark-submit arguments: %v", err) @@ -739,7 +757,6 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) error logger.Info("Running spark-submit for SparkApplication", "name", app.Name, "namespace", app.Namespace, "arguments", sparkSubmitArgs) submitted, err := runSparkSubmit(newSubmission(sparkSubmitArgs, app)) if err != nil { - r.recordSparkApplicationEvent(app) return fmt.Errorf("failed to run spark-submit: %v", err) } if !submitted { @@ -748,14 +765,6 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) error // error gets returned from 
runSparkSubmit. If this is the case, we simply return. return nil } - - app.Status.AppState = v1beta2.ApplicationState{ - State: v1beta2.ApplicationStateSubmitted, - } - app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1 - app.Status.ExecutionAttempts = app.Status.ExecutionAttempts + 1 - app.Status.LastSubmissionAttemptTime = metav1.Now() - r.recordSparkApplicationEvent(app) return nil } From b2e169927c3858b40c204de7413e31de7108eb08 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Thu, 10 Oct 2024 00:40:47 +0100 Subject: [PATCH 08/31] Tidy Signed-off-by: Thomas Newton --- internal/controller/sparkapplication/controller.go | 10 ++-------- internal/controller/sparkapplication/submission.go | 12 ++++++------ 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index 5c002a308..a9b07b6b6 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -755,16 +755,10 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (retu // Try submitting the application by running spark-submit. logger.Info("Running spark-submit for SparkApplication", "name", app.Name, "namespace", app.Namespace, "arguments", sparkSubmitArgs) - submitted, err := runSparkSubmit(newSubmission(sparkSubmitArgs, app)) - if err != nil { + spark_submit_err := runSparkSubmit(newSubmission(sparkSubmitArgs, app)) + if spark_submit_err != nil { return fmt.Errorf("failed to run spark-submit: %v", err) } - if !submitted { - // The application may not have been submitted even if err == nil, e.g., when some - // state update caused an attempt to re-submit the application, in which case no - // error gets returned from runSparkSubmit. If this is the case, we simply return. 
- return nil - } return nil } diff --git a/internal/controller/sparkapplication/submission.go b/internal/controller/sparkapplication/submission.go index a51bd2e4e..ce7cb9733 100644 --- a/internal/controller/sparkapplication/submission.go +++ b/internal/controller/sparkapplication/submission.go @@ -43,10 +43,10 @@ func newSubmission(args []string, app *v1beta2.SparkApplication) *submission { } } -func runSparkSubmit(submission *submission) (bool, error) { +func runSparkSubmit(submission *submission) error { sparkHome, present := os.LookupEnv(common.EnvSparkHome) if !present { - return false, fmt.Errorf("env %s is not specified", common.EnvSparkHome) + return fmt.Errorf("env %s is not specified", common.EnvSparkHome) } command := filepath.Join(sparkHome, "bin", "spark-submit") cmd := exec.Command(command, submission.args...) @@ -58,14 +58,14 @@ func runSparkSubmit(submission *submission) (bool, error) { } // The driver pod of the application already exists. if strings.Contains(errorMsg, common.ErrorCodePodAlreadyExists) { - return false, fmt.Errorf("driver pod already exist") + return fmt.Errorf("driver pod already exist") } if errorMsg != "" { - return false, fmt.Errorf("failed to run spark-submit: %s", errorMsg) + return fmt.Errorf("failed to run spark-submit: %s", errorMsg) } - return false, fmt.Errorf("failed to run spark-submit: %v", err) + return fmt.Errorf("failed to run spark-submit: %v", err) } - return true, nil + return nil } // buildSparkSubmitArgs builds the arguments for spark-submit. 
From 9e18dcf78f78be784bc6fcdb86f4f2274e1a2ae1 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Thu, 10 Oct 2024 00:49:33 +0100 Subject: [PATCH 09/31] Correct usage of submitSparkApplication Signed-off-by: Thomas Newton --- .../controller/sparkapplication/controller.go | 20 ++++--------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index a9b07b6b6..cafae9f28 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -259,17 +259,7 @@ func (r *Reconciler) reconcileNewSparkApplication(ctx context.Context, req ctrl. } app := old.DeepCopy() - if err := r.submitSparkApplication(app); err != nil { - logger.Error(err, "Failed to submit SparkApplication", "name", app.Name, "namespace", app.Namespace) - app.Status = v1beta2.SparkApplicationStatus{ - AppState: v1beta2.ApplicationState{ - State: v1beta2.ApplicationStateFailedSubmission, - ErrorMessage: err.Error(), - }, - SubmissionAttempts: app.Status.SubmissionAttempts + 1, - LastSubmissionAttemptTime: metav1.Now(), - } - } + r.submitSparkApplication(app) if err := r.updateSparkApplicationStatus(ctx, app); err != nil { return err } @@ -337,7 +327,7 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte } if timeUntilNextRetryDue <= 0 { if r.validateSparkResourceDeletion(ctx, app) { - err = r.submitSparkApplication(app) + r.submitSparkApplication(app) } else { if err := r.deleteSparkResources(ctx, app); err != nil { logger.Error(err, "failed to delete resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace) @@ -418,9 +408,7 @@ func (r *Reconciler) reconcilePendingRerunSparkApplication(ctx context.Context, logger.Info("Successfully deleted resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) 
r.recordSparkApplicationEvent(app) r.resetSparkApplicationStatus(app) - if err = r.submitSparkApplication(app); err != nil { - logger.Error(err, "Failed to run spark-submit", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) - } + r.submitSparkApplication(app) } if err := r.updateSparkApplicationStatus(ctx, app); err != nil { return err @@ -655,7 +643,6 @@ func (r *Reconciler) getSparkApplication(key types.NamespacedName) (*v1beta2.Spa func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (returned_error error) { logger.Info("Submitting SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) - defer func() { app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1 app.Status.SubmissionID = uuid.New().String() @@ -665,6 +652,7 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (retu app.Status.AppState = v1beta2.ApplicationState{ State: v1beta2.ApplicationStateSubmitted, } + logger.Error(returned_error, "Failed to submit SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) } else { app.Status.AppState = v1beta2.ApplicationState{ State: v1beta2.ApplicationStateFailedSubmission, From b9ee75e285321c99b77aee9294f4675ee33f2ef8 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Thu, 10 Oct 2024 01:02:52 +0100 Subject: [PATCH 10/31] Fix error logging Signed-off-by: Thomas Newton --- internal/controller/sparkapplication/controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index cafae9f28..437f843c6 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -652,8 +652,8 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (retu app.Status.AppState = v1beta2.ApplicationState{ 
State: v1beta2.ApplicationStateSubmitted, } - logger.Error(returned_error, "Failed to submit SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) } else { + logger.Error(returned_error, "Failed to submit SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) app.Status.AppState = v1beta2.ApplicationState{ State: v1beta2.ApplicationStateFailedSubmission, ErrorMessage: returned_error.Error(), @@ -745,7 +745,7 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (retu logger.Info("Running spark-submit for SparkApplication", "name", app.Name, "namespace", app.Namespace, "arguments", sparkSubmitArgs) spark_submit_err := runSparkSubmit(newSubmission(sparkSubmitArgs, app)) if spark_submit_err != nil { - return fmt.Errorf("failed to run spark-submit: %v", err) + return fmt.Errorf("failed to run spark-submit: %v", spark_submit_err) } return nil } From 0f86884b72f359a1e4f340443ace726643a2c354 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Thu, 10 Oct 2024 01:59:40 +0100 Subject: [PATCH 11/31] Bring back ExecutionAttempts increment that I forgot about Signed-off-by: Thomas Newton --- internal/controller/sparkapplication/controller.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index 437f843c6..aced98652 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -652,6 +652,7 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (retu app.Status.AppState = v1beta2.ApplicationState{ State: v1beta2.ApplicationStateSubmitted, } + app.Status.ExecutionAttempts = app.Status.ExecutionAttempts + 1 } else { logger.Error(returned_error, "Failed to submit SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) 
app.Status.AppState = v1beta2.ApplicationState{ From 413bb53560b14ae544f3a6ab05eddca45c0689ac Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Thu, 10 Oct 2024 15:20:37 +0100 Subject: [PATCH 12/31] Log after reconcile complete Signed-off-by: Thomas Newton --- internal/controller/sparkapplication/controller.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index aced98652..74149d2d7 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -172,6 +172,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return ctrl.Result{Requeue: true}, err } logger.Info("Reconciling SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) + defer func() { + logger.Info("Finished reconciling SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) + }() // Check if the spark application is being deleted if !app.DeletionTimestamp.IsZero() { @@ -662,7 +665,7 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (retu } r.recordSparkApplicationEvent(app) }() - + if util.PrometheusMonitoringEnabled(app) { logger.Info("Configure Prometheus monitoring for SparkApplication", "name", app.Name, "namespace", app.Namespace) if err := configPrometheusMonitoring(app, r.client); err != nil { From ff431875e2f367cfcf91b8428611af713a97bcbb Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Thu, 10 Oct 2024 19:31:02 +0100 Subject: [PATCH 13/31] Fix setting submission ID Signed-off-by: Thomas Newton --- internal/controller/sparkapplication/controller.go | 5 +++-- internal/controller/sparkapplication/event_handler.go | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/controller/sparkapplication/controller.go 
b/internal/controller/sparkapplication/controller.go index 74149d2d7..88e647f16 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -173,6 +173,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu } logger.Info("Reconciling SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) defer func() { + // TODO(tomnewton): This doesn't seem to be showing the state at end of reconsile. logger.Info("Finished reconciling SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) }() @@ -645,11 +646,11 @@ func (r *Reconciler) getSparkApplication(key types.NamespacedName) (*v1beta2.Spa // submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit. func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (returned_error error) { logger.Info("Submitting SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) + app.Status.SubmissionID = uuid.New().String() // Must be set before calling spark-submit so it can be added to the driver pod labels. 
+ app.Status.LastSubmissionAttemptTime = metav1.Now() defer func() { app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1 - app.Status.SubmissionID = uuid.New().String() - app.Status.LastSubmissionAttemptTime = metav1.Now() if returned_error == nil { app.Status.AppState = v1beta2.ApplicationState{ diff --git a/internal/controller/sparkapplication/event_handler.go b/internal/controller/sparkapplication/event_handler.go index 0e2ee5896..ab99d1814 100644 --- a/internal/controller/sparkapplication/event_handler.go +++ b/internal/controller/sparkapplication/event_handler.go @@ -141,6 +141,7 @@ func (h *SparkPodEventHandler) enqueueSparkAppForUpdate(ctx context.Context, pod return } + logger.Info("Enqueueing SparkApplication for update", "key", key) queue.AddRateLimited(ctrl.Request{NamespacedName: key}) } From 6b5c4bff2586c3d8c163b5890116cac0350ce021 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Thu, 10 Oct 2024 21:46:12 +0100 Subject: [PATCH 14/31] Tidy logging Signed-off-by: Thomas Newton --- internal/controller/sparkapplication/controller.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index 88e647f16..b736c345c 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -173,8 +173,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu } logger.Info("Reconciling SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) defer func() { - // TODO(tomnewton): This doesn't seem to be showing the state at end of reconsile. 
- logger.Info("Finished reconciling SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) + logger.Info("Finished reconciling SparkApplication", "name", app.Name, "namespace", app.Namespace) }() // Check if the spark application is being deleted From 8a12c0c1b0d5df6541b719ce3d8b37be344a3200 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Thu, 10 Oct 2024 22:10:57 +0100 Subject: [PATCH 15/31] Tidy Signed-off-by: Thomas Newton --- internal/controller/sparkapplication/controller.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index b736c345c..b7647fc3d 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -172,9 +172,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return ctrl.Result{Requeue: true}, err } logger.Info("Reconciling SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) - defer func() { - logger.Info("Finished reconciling SparkApplication", "name", app.Name, "namespace", app.Namespace) - }() + defer logger.Info("Finished reconciling SparkApplication", "name", app.Name, "namespace", app.Namespace) // Check if the spark application is being deleted if !app.DeletionTimestamp.IsZero() { From af81e48215b87ef5ab5ecd72408bc6913ca63936 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Thu, 10 Oct 2024 22:52:28 +0100 Subject: [PATCH 16/31] Tidy Signed-off-by: Thomas Newton --- internal/controller/sparkapplication/event_handler.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/controller/sparkapplication/event_handler.go b/internal/controller/sparkapplication/event_handler.go index ab99d1814..0e2ee5896 100644 --- a/internal/controller/sparkapplication/event_handler.go +++ b/internal/controller/sparkapplication/event_handler.go 
@@ -141,7 +141,6 @@ func (h *SparkPodEventHandler) enqueueSparkAppForUpdate(ctx context.Context, pod return } - logger.Info("Enqueueing SparkApplication for update", "key", key) queue.AddRateLimited(ctrl.Request{NamespacedName: key}) } From c683e2a41a5856d3c8d74f844192a6ecc21512eb Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 13 Oct 2024 14:01:33 +0100 Subject: [PATCH 17/31] Update comment Signed-off-by: Thomas Newton --- internal/controller/sparkapplication/controller.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index b7647fc3d..1fcc263ce 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -643,7 +643,8 @@ func (r *Reconciler) getSparkApplication(key types.NamespacedName) (*v1beta2.Spa // submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit. func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (returned_error error) { logger.Info("Submitting SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) - app.Status.SubmissionID = uuid.New().String() // Must be set before calling spark-submit so it can be added to the driver pod labels. + // SubmissionID must be set before creating any resources to ensure all the resources are labeled. 
+ app.Status.SubmissionID = uuid.New().String() app.Status.LastSubmissionAttemptTime = metav1.Now() defer func() { From 8041a129b3d0bd5877db9b1d5b223aa3ccd81325 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 13 Oct 2024 14:02:47 +0100 Subject: [PATCH 18/31] Start a new test Signed-off-by: Thomas Newton --- examples/fail-submission.yaml | 44 ++++++++++++++++++ test/e2e/sparkapplication_test.go | 77 +++++++++++++++++++++++++++++++ test/e2e/suit_test.go | 5 +- 3 files changed, 124 insertions(+), 2 deletions(-) create mode 100644 examples/fail-submission.yaml diff --git a/examples/fail-submission.yaml b/examples/fail-submission.yaml new file mode 100644 index 000000000..b5eaf217a --- /dev/null +++ b/examples/fail-submission.yaml @@ -0,0 +1,44 @@ +# +# Copyright 2024 The Kubeflow authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +apiVersion: sparkoperator.k8s.io/v1beta2 +kind: SparkApplication +metadata: + name: fail-submission + namespace: default +spec: + type: Scala + mode: cluster + image: spark:3.5.2 + imagePullPolicy: IfNotPresent + mainClass: non-existent + mainApplicationFile: local:///non-existent.jar + sparkVersion: 3.5.2 + restartPolicy: + type: OnFailure + onSubmissionFailureRetries: 3 + onSubmissionFailureRetryInterval: 1 + driver: + serviceAccount: non-existent + labels: + version: 3.5.2 + cores: 1 + memory: 512m + executor: + labels: + version: 3.5.2 + instances: 1 + cores: 1 + memory: 512m diff --git a/test/e2e/sparkapplication_test.go b/test/e2e/sparkapplication_test.go index 113326cea..d0a33fb23 100644 --- a/test/e2e/sparkapplication_test.go +++ b/test/e2e/sparkapplication_test.go @@ -18,17 +18,22 @@ package e2e_test import ( "context" + "fmt" "os" "path/filepath" "strings" + // "time" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "sigs.k8s.io/controller-runtime/pkg/client" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/yaml" @@ -236,6 +241,78 @@ var _ = Describe("Example SparkApplication", func() { }) }) + Context("fail-submission", func() { + ctx := context.Background() + path := filepath.Join("..", "..", "examples", "fail-submission.yaml") + app := &v1beta2.SparkApplication{} + + BeforeEach(func() { + By("Parsing SparkApplication from file") + file, err := os.Open(path) + Expect(err).NotTo(HaveOccurred()) + Expect(file).NotTo(BeNil()) + + decoder := yaml.NewYAMLOrJSONDecoder(file, 100) + Expect(decoder).NotTo(BeNil()) + Expect(decoder.Decode(app)).NotTo(HaveOccurred()) + + By("Creating SparkApplication") + Expect(k8sClient.Create(ctx, app)).To(Succeed()) + }) + + AfterEach(func() { + key := types.NamespacedName{Namespace: app.Namespace, Name: app.Name} + 
Expect(k8sClient.Get(ctx, key, app)).To(Succeed()) + + By("Deleting SparkApplication") + Expect(k8sClient.Delete(ctx, app)).To(Succeed()) + }) + + It("Fails submission and retries until retries are exhausted", func() { + By("Waiting for SparkApplication to complete") + key := types.NamespacedName{Namespace: app.Namespace, Name: app.Name} + Expect(waitForSparkApplicationCompleted(ctx, key)).To(HaveOccurred()) + + app := &v1beta2.SparkApplication{} + fetch_app_err := k8sClient.Get(ctx, key, app) + Expect(fetch_app_err).NotTo(HaveOccurred()) + Expect(app.Status.AppState.State).To(Equal(v1beta2.ApplicationStateFailed)) + Expect(app.Status.AppState.ErrorMessage).To(ContainSubstring("failed to run spark-submit")) + Expect(app.Status.SubmissionAttempts).To(Equal(*app.Spec.RestartPolicy.OnSubmissionFailureRetries + 1)) + + By("Checking SparkApplication events") + eventList := &corev1.EventList{} + err := k8sClient.List(ctx, eventList, &client.ListOptions{ + Namespace: app.Namespace, + FieldSelector: fields.AndSelectors( + fields.OneTermEqualSelector("involvedObject.kind", "SparkApplication"), + fields.OneTermEqualSelector("involvedObject.name", app.Name), + // fields.OneTermEqualSelector("involvedObject.uid", string(app.ObjectMeta.UID)), + ), + }) + Expect(err).NotTo(HaveOccurred()) + + var sparkAppEvents []corev1.Event + for _, event := range eventList.Items { + if event.InvolvedObject.Kind == "SparkApplication" && event.InvolvedObject.Name == app.Name { + sparkAppEvents = append(sparkAppEvents, event) + } + } + // "spark-application-controller" + By("Printing SparkApplication events") + for _, event := range sparkAppEvents { + fmt.Printf("Event: %v, Reason: %v, Message: %v\n", event.LastTimestamp, event.Reason, event.Message) + } + + By("Checking driver does not exist") + driverPodName := util.GetDriverPodName(app) + _, get_driver_err := clientset.CoreV1().Pods(app.Namespace).Get(ctx, driverPodName, metav1.GetOptions{}) + Expect(get_driver_err).To(HaveOccurred()) + // 
TODO(tomnewton): Switch to proper not found error code + Expect(strings.Contains(get_driver_err.Error(), "not found")).To(BeTrue()) + }) + }) + Context("spark-pi-python", func() { ctx := context.Background() path := filepath.Join("..", "..", "examples", "spark-pi-python.yaml") diff --git a/test/e2e/suit_test.go b/test/e2e/suit_test.go index 92167082a..407d66e7a 100644 --- a/test/e2e/suit_test.go +++ b/test/e2e/suit_test.go @@ -262,8 +262,9 @@ func waitForSparkApplicationCompleted(ctx context.Context, key types.NamespacedN return false, err } switch app.Status.AppState.State { - case v1beta2.ApplicationStateFailedSubmission, v1beta2.ApplicationStateFailed: - return false, errors.New(app.Status.AppState.ErrorMessage) + case v1beta2.ApplicationStateFailed: + // TODO: Try combining this case with the one below. + return true, errors.New(app.Status.AppState.ErrorMessage) case v1beta2.ApplicationStateCompleted: return true, nil } From 2f32f3b1b52f2633116de3cf309696fcd0c838d6 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 13 Oct 2024 14:34:30 +0100 Subject: [PATCH 19/31] Working Fails submission and retries until retries are exhausted test Signed-off-by: Thomas Newton --- test/e2e/sparkapplication_test.go | 55 ++++++++++--------------------- test/e2e/suit_test.go | 24 ++++++++++++++ 2 files changed, 41 insertions(+), 38 deletions(-) diff --git a/test/e2e/sparkapplication_test.go b/test/e2e/sparkapplication_test.go index d0a33fb23..c785c16c1 100644 --- a/test/e2e/sparkapplication_test.go +++ b/test/e2e/sparkapplication_test.go @@ -18,22 +18,18 @@ package e2e_test import ( "context" - "fmt" "os" "path/filepath" "strings" - // "time" - . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" - "sigs.k8s.io/controller-runtime/pkg/client" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/yaml" @@ -271,45 +267,28 @@ var _ = Describe("Example SparkApplication", func() { It("Fails submission and retries until retries are exhausted", func() { By("Waiting for SparkApplication to complete") key := types.NamespacedName{Namespace: app.Namespace, Name: app.Name} - Expect(waitForSparkApplicationCompleted(ctx, key)).To(HaveOccurred()) - - app := &v1beta2.SparkApplication{} - fetch_app_err := k8sClient.Get(ctx, key, app) - Expect(fetch_app_err).NotTo(HaveOccurred()) - Expect(app.Status.AppState.State).To(Equal(v1beta2.ApplicationStateFailed)) - Expect(app.Status.AppState.ErrorMessage).To(ContainSubstring("failed to run spark-submit")) - Expect(app.Status.SubmissionAttempts).To(Equal(*app.Spec.RestartPolicy.OnSubmissionFailureRetries + 1)) - - By("Checking SparkApplication events") - eventList := &corev1.EventList{} - err := k8sClient.List(ctx, eventList, &client.ListOptions{ - Namespace: app.Namespace, - FieldSelector: fields.AndSelectors( - fields.OneTermEqualSelector("involvedObject.kind", "SparkApplication"), - fields.OneTermEqualSelector("involvedObject.name", app.Name), - // fields.OneTermEqualSelector("involvedObject.uid", string(app.ObjectMeta.UID)), - ), - }) - Expect(err).NotTo(HaveOccurred()) - - var sparkAppEvents []corev1.Event - for _, event := range eventList.Items { - if event.InvolvedObject.Kind == "SparkApplication" && event.InvolvedObject.Name == app.Name { - sparkAppEvents = append(sparkAppEvents, event) - } + apps, polling_err := collectStatusesUntilSparkApplicationTerminates(ctx, key) + Expect(polling_err).To(HaveOccurred()) + + By("Should eventually fail") + final_app := 
apps[len(apps)-1] + Expect(final_app.Status.AppState.State).To(Equal(v1beta2.ApplicationStateFailed)) + Expect(final_app.Status.AppState.ErrorMessage).To(ContainSubstring("failed to run spark-submit")) + Expect(final_app.Status.SubmissionAttempts).To(Equal(*app.Spec.RestartPolicy.OnSubmissionFailureRetries + 1)) + + By("Only valid statuses appear in other apps") + validStatuses := []v1beta2.ApplicationStateType{ + v1beta2.ApplicationStateNew, + v1beta2.ApplicationStateFailedSubmission, } - // "spark-application-controller" - By("Printing SparkApplication events") - for _, event := range sparkAppEvents { - fmt.Printf("Event: %v, Reason: %v, Message: %v\n", event.LastTimestamp, event.Reason, event.Message) + for _, app := range apps[:len(apps)-1] { + Expect(validStatuses).To(ContainElement(app.Status.AppState.State)) } By("Checking driver does not exist") driverPodName := util.GetDriverPodName(app) _, get_driver_err := clientset.CoreV1().Pods(app.Namespace).Get(ctx, driverPodName, metav1.GetOptions{}) - Expect(get_driver_err).To(HaveOccurred()) - // TODO(tomnewton): Switch to proper not found error code - Expect(strings.Contains(get_driver_err.Error(), "not found")).To(BeTrue()) + Expect(errors.IsNotFound(get_driver_err)).To(BeTrue()) }) }) diff --git a/test/e2e/suit_test.go b/test/e2e/suit_test.go index 407d66e7a..2a4c46bdd 100644 --- a/test/e2e/suit_test.go +++ b/test/e2e/suit_test.go @@ -272,3 +272,27 @@ func waitForSparkApplicationCompleted(ctx context.Context, key types.NamespacedN }) return err } + + +func collectStatusesUntilSparkApplicationTerminates(ctx context.Context, key types.NamespacedName) ([]v1beta2.SparkApplication, error) { + cancelCtx, cancelFunc := context.WithTimeout(ctx, WaitTimeout) + defer cancelFunc() + + apps := []v1beta2.SparkApplication{} + + err := wait.PollUntilContextCancel(cancelCtx, PollInterval, true, func(ctx context.Context) (bool, error) { + app := v1beta2.SparkApplication{} + if err := k8sClient.Get(ctx, key, &app); err != nil { 
+ return false, err + } + apps = append(apps, app) + switch app.Status.AppState.State { + case v1beta2.ApplicationStateFailed: + return true, errors.New(app.Status.AppState.ErrorMessage) + case v1beta2.ApplicationStateCompleted: + return true, nil + } + return false, nil + }) + return apps, err +} From 9df8df818f3da130e70d736174e6a4cb77f8d9af Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 13 Oct 2024 14:53:45 +0100 Subject: [PATCH 20/31] Add Application fails and retries until retries are exhausted Signed-off-by: Thomas Newton --- examples/application-fail.yaml | 44 +++++++++++++++++++++++ test/e2e/sparkapplication_test.go | 60 +++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+) create mode 100644 examples/application-fail.yaml diff --git a/examples/application-fail.yaml b/examples/application-fail.yaml new file mode 100644 index 000000000..05991fd72 --- /dev/null +++ b/examples/application-fail.yaml @@ -0,0 +1,44 @@ +# +# Copyright 2024 The Kubeflow authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +apiVersion: sparkoperator.k8s.io/v1beta2 +kind: SparkApplication +metadata: + name: fail-submission + namespace: default +spec: + type: Scala + mode: cluster + image: spark:3.5.2 + imagePullPolicy: IfNotPresent + mainClass: non-existent + mainApplicationFile: local:///non-existent.jar + sparkVersion: 3.5.2 + restartPolicy: + type: OnFailure + onFailureRetries: 3 + onFailureRetryInterval: 1 + driver: + labels: + version: 3.5.2 + cores: 1 + memory: 512m + serviceAccount: spark-operator-spark + executor: + labels: + version: 3.5.2 + instances: 1 + cores: 1 + memory: 512m diff --git a/test/e2e/sparkapplication_test.go b/test/e2e/sparkapplication_test.go index c785c16c1..ae5531f89 100644 --- a/test/e2e/sparkapplication_test.go +++ b/test/e2e/sparkapplication_test.go @@ -292,6 +292,66 @@ var _ = Describe("Example SparkApplication", func() { }) }) + Context("application-fails", func() { + ctx := context.Background() + path := filepath.Join("..", "..", "examples", "application-fail.yaml") + app := &v1beta2.SparkApplication{} + + BeforeEach(func() { + By("Parsing SparkApplication from file") + file, err := os.Open(path) + Expect(err).NotTo(HaveOccurred()) + Expect(file).NotTo(BeNil()) + + decoder := yaml.NewYAMLOrJSONDecoder(file, 100) + Expect(decoder).NotTo(BeNil()) + Expect(decoder.Decode(app)).NotTo(HaveOccurred()) + + By("Creating SparkApplication") + Expect(k8sClient.Create(ctx, app)).To(Succeed()) + }) + + AfterEach(func() { + key := types.NamespacedName{Namespace: app.Namespace, Name: app.Name} + Expect(k8sClient.Get(ctx, key, app)).To(Succeed()) + + By("Deleting SparkApplication") + Expect(k8sClient.Delete(ctx, app)).To(Succeed()) + }) + + It("Application fails and retries until retries are exhausted", func() { + By("Waiting for SparkApplication to complete") + key := types.NamespacedName{Namespace: app.Namespace, Name: app.Name} + apps, polling_err := collectStatusesUntilSparkApplicationTerminates(ctx, key) + Expect(polling_err).To(HaveOccurred()) + + 
By("Should eventually fail") + final_app := apps[len(apps)-1] + Expect(final_app.Status.AppState.State).To(Equal(v1beta2.ApplicationStateFailed)) + Expect(final_app.Status.AppState.ErrorMessage).To(ContainSubstring("driver container failed")) + Expect(final_app.Status.ExecutionAttempts).To(Equal(*app.Spec.RestartPolicy.OnFailureRetries + 1)) + + By("Only valid statuses appear in other apps") + validStatuses := []v1beta2.ApplicationStateType{ + v1beta2.ApplicationStateNew, + v1beta2.ApplicationStateSubmitted, + v1beta2.ApplicationStateRunning, + v1beta2.ApplicationStateFailing, + v1beta2.ApplicationStatePendingRerun, + } + for _, app := range apps[:len(apps)-1] { + Expect(validStatuses).To(ContainElement(app.Status.AppState.State)) + } + + By("Checking out driver logs") + driverPodName := util.GetDriverPodName(app) + bytes, err := clientset.CoreV1().Pods(app.Namespace).GetLogs(driverPodName, &corev1.PodLogOptions{}).Do(ctx).Raw() + Expect(err).NotTo(HaveOccurred()) + Expect(bytes).NotTo(BeEmpty()) + Expect(strings.Contains(string(bytes), "NoSuchFileException")).To(BeTrue()) + }) + }) + Context("spark-pi-python", func() { ctx := context.Background() path := filepath.Join("..", "..", "examples", "spark-pi-python.yaml") From 9ffa5a1b93d5acb8cae26532aa501501966d2937 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 13 Oct 2024 15:02:36 +0100 Subject: [PATCH 21/31] Tidy Signed-off-by: Thomas Newton --- test/e2e/sparkapplication_test.go | 8 ++++---- test/e2e/suit_test.go | 10 ++++------ 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/test/e2e/sparkapplication_test.go b/test/e2e/sparkapplication_test.go index ae5531f89..8d943c5e5 100644 --- a/test/e2e/sparkapplication_test.go +++ b/test/e2e/sparkapplication_test.go @@ -265,9 +265,9 @@ var _ = Describe("Example SparkApplication", func() { }) It("Fails submission and retries until retries are exhausted", func() { - By("Waiting for SparkApplication to complete") + By("Waiting for SparkApplication to 
terminate") key := types.NamespacedName{Namespace: app.Namespace, Name: app.Name} - apps, polling_err := collectStatusesUntilSparkApplicationTerminates(ctx, key) + apps, polling_err := collectSparkApplicationsUntilTermination(ctx, key) Expect(polling_err).To(HaveOccurred()) By("Should eventually fail") @@ -320,9 +320,9 @@ var _ = Describe("Example SparkApplication", func() { }) It("Application fails and retries until retries are exhausted", func() { - By("Waiting for SparkApplication to complete") + By("Waiting for SparkApplication to terminate") key := types.NamespacedName{Namespace: app.Namespace, Name: app.Name} - apps, polling_err := collectStatusesUntilSparkApplicationTerminates(ctx, key) + apps, polling_err := collectSparkApplicationsUntilTermination(ctx, key) Expect(polling_err).To(HaveOccurred()) By("Should eventually fail") diff --git a/test/e2e/suit_test.go b/test/e2e/suit_test.go index 2a4c46bdd..e9c534d94 100644 --- a/test/e2e/suit_test.go +++ b/test/e2e/suit_test.go @@ -262,9 +262,8 @@ func waitForSparkApplicationCompleted(ctx context.Context, key types.NamespacedN return false, err } switch app.Status.AppState.State { - case v1beta2.ApplicationStateFailed: - // TODO: Try combining this case with the one below. 
- return true, errors.New(app.Status.AppState.ErrorMessage) + case v1beta2.ApplicationStateFailedSubmission, v1beta2.ApplicationStateFailed: + return false, errors.New(app.Status.AppState.ErrorMessage) case v1beta2.ApplicationStateCompleted: return true, nil } @@ -273,13 +272,12 @@ func waitForSparkApplicationCompleted(ctx context.Context, key types.NamespacedN return err } - -func collectStatusesUntilSparkApplicationTerminates(ctx context.Context, key types.NamespacedName) ([]v1beta2.SparkApplication, error) { +func collectSparkApplicationsUntilTermination(ctx context.Context, key types.NamespacedName) ([]v1beta2.SparkApplication, error) { cancelCtx, cancelFunc := context.WithTimeout(ctx, WaitTimeout) defer cancelFunc() apps := []v1beta2.SparkApplication{} - + err := wait.PollUntilContextCancel(cancelCtx, PollInterval, true, func(ctx context.Context) (bool, error) { app := v1beta2.SparkApplication{} if err := k8sClient.Get(ctx, key, &app); err != nil { From 9b5e2c87e1d849edcc939d737df4fba3e2824b4a Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 13 Oct 2024 15:06:49 +0100 Subject: [PATCH 22/31] Comments Signed-off-by: Thomas Newton --- examples/fail-submission.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/fail-submission.yaml b/examples/fail-submission.yaml index b5eaf217a..5d0d4451f 100644 --- a/examples/fail-submission.yaml +++ b/examples/fail-submission.yaml @@ -23,15 +23,15 @@ spec: mode: cluster image: spark:3.5.2 imagePullPolicy: IfNotPresent - mainClass: non-existent - mainApplicationFile: local:///non-existent.jar + mainClass: dummy + mainApplicationFile: local:///dummy.jar sparkVersion: 3.5.2 restartPolicy: type: OnFailure onSubmissionFailureRetries: 3 onSubmissionFailureRetryInterval: 1 driver: - serviceAccount: non-existent + serviceAccount: non-existent # This is the important part that causes submission to fail. 
labels: version: 3.5.2 cores: 1 From 80f74fa8d23b5e7a6f700b8facbdfa9975893e7c Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 13 Oct 2024 15:16:13 +0100 Subject: [PATCH 23/31] Tidy Signed-off-by: Thomas Newton --- examples/fail-submission.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/fail-submission.yaml b/examples/fail-submission.yaml index 5d0d4451f..827fca741 100644 --- a/examples/fail-submission.yaml +++ b/examples/fail-submission.yaml @@ -31,11 +31,11 @@ spec: onSubmissionFailureRetries: 3 onSubmissionFailureRetryInterval: 1 driver: - serviceAccount: non-existent # This is the important part that causes submission to fail. labels: version: 3.5.2 cores: 1 memory: 512m + serviceAccount: non-existent # This is the important part that causes submission to fail. executor: labels: version: 3.5.2 From 0f890b36f60223a5cfca8bd11a21cfb90c059092 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 13 Oct 2024 15:31:27 +0100 Subject: [PATCH 24/31] Move fail configs out of the examples directory Signed-off-by: Thomas Newton --- .../e2e/bad_examples/fail-application.yaml | 0 {examples => test/e2e/bad_examples}/fail-submission.yaml | 0 test/e2e/sparkapplication_test.go | 4 ++-- 3 files changed, 2 insertions(+), 2 deletions(-) rename examples/application-fail.yaml => test/e2e/bad_examples/fail-application.yaml (100%) rename {examples => test/e2e/bad_examples}/fail-submission.yaml (100%) diff --git a/examples/application-fail.yaml b/test/e2e/bad_examples/fail-application.yaml similarity index 100% rename from examples/application-fail.yaml rename to test/e2e/bad_examples/fail-application.yaml diff --git a/examples/fail-submission.yaml b/test/e2e/bad_examples/fail-submission.yaml similarity index 100% rename from examples/fail-submission.yaml rename to test/e2e/bad_examples/fail-submission.yaml diff --git a/test/e2e/sparkapplication_test.go b/test/e2e/sparkapplication_test.go index 8d943c5e5..94e1009ab 100644 --- 
a/test/e2e/sparkapplication_test.go +++ b/test/e2e/sparkapplication_test.go @@ -239,7 +239,7 @@ var _ = Describe("Example SparkApplication", func() { Context("fail-submission", func() { ctx := context.Background() - path := filepath.Join("..", "..", "examples", "fail-submission.yaml") + path := filepath.Join("bad_examples", "fail-submission.yaml") app := &v1beta2.SparkApplication{} BeforeEach(func() { @@ -294,7 +294,7 @@ var _ = Describe("Example SparkApplication", func() { Context("application-fails", func() { ctx := context.Background() - path := filepath.Join("..", "..", "examples", "application-fail.yaml") + path := filepath.Join("bad_examples", "fail-application.yaml") app := &v1beta2.SparkApplication{} BeforeEach(func() { From 2c64edbdaca9465636468c7227366444934dfff3 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 13 Oct 2024 23:04:07 +0100 Subject: [PATCH 25/31] Fix lint Signed-off-by: Thomas Newton --- internal/controller/sparkapplication/controller.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index 1fcc263ce..b144c920e 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -260,7 +260,7 @@ func (r *Reconciler) reconcileNewSparkApplication(ctx context.Context, req ctrl. 
} app := old.DeepCopy() - r.submitSparkApplication(app) + _ = r.submitSparkApplication(app) if err := r.updateSparkApplicationStatus(ctx, app); err != nil { return err } @@ -328,7 +328,7 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte } if timeUntilNextRetryDue <= 0 { if r.validateSparkResourceDeletion(ctx, app) { - r.submitSparkApplication(app) + _ = r.submitSparkApplication(app) } else { if err := r.deleteSparkResources(ctx, app); err != nil { logger.Error(err, "failed to delete resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace) @@ -409,7 +409,7 @@ func (r *Reconciler) reconcilePendingRerunSparkApplication(ctx context.Context, logger.Info("Successfully deleted resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) r.recordSparkApplicationEvent(app) r.resetSparkApplicationStatus(app) - r.submitSparkApplication(app) + _ = r.submitSparkApplication(app) } if err := r.updateSparkApplicationStatus(ctx, app); err != nil { return err From c62cd54cd4e8d2df039f6c3f43e9c1d2c8e55ea4 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Fri, 18 Oct 2024 14:13:47 +0100 Subject: [PATCH 26/31] Move TimeUntilNextRetryDue to `pkg/util/sparkapplication.go` Signed-off-by: Thomas Newton --- .../controller/sparkapplication/controller.go | 26 ++---------------- pkg/util/sparkapplication.go | 27 +++++++++++++++++++ 2 files changed, 29 insertions(+), 24 deletions(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index b144c920e..bdd32dfad 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -322,7 +322,7 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte app := old.DeepCopy() if util.ShouldRetry(app) { - timeUntilNextRetryDue, err := timeUntilNextRetryDue(app) + 
timeUntilNextRetryDue, err := util.TimeUntilNextRetryDue(app) if err != nil { return err } @@ -512,7 +512,7 @@ func (r *Reconciler) reconcileFailingSparkApplication(ctx context.Context, req c app := old.DeepCopy() if util.ShouldRetry(app) { - timeUntilNextRetryDue, err := timeUntilNextRetryDue(app) + timeUntilNextRetryDue, err := util.TimeUntilNextRetryDue(app) if err != nil { return err } @@ -753,28 +753,6 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (retu return nil } -func timeUntilNextRetryDue(app *v1beta2.SparkApplication) (time.Duration, error) { - var retryInterval *int64 - switch app.Status.AppState.State { - case v1beta2.ApplicationStateFailedSubmission: - retryInterval = app.Spec.RestartPolicy.OnSubmissionFailureRetryInterval - case v1beta2.ApplicationStateFailing: - retryInterval = app.Spec.RestartPolicy.OnFailureRetryInterval - } - - attemptsDone := app.Status.SubmissionAttempts - lastAttemptTime := app.Status.LastSubmissionAttemptTime - if retryInterval == nil || lastAttemptTime.IsZero() || attemptsDone <= 0 { - return -1, fmt.Errorf("invalid retry interval (%v), last attempt time (%v) or attemptsDone (%v)", retryInterval, lastAttemptTime, attemptsDone) - } - - // Retry wait time is attempts*RetryInterval to do a linear backoff. - interval := time.Duration(*retryInterval) * time.Second * time.Duration(attemptsDone) - currentTime := time.Now() - logger.Info(fmt.Sprintf("currentTime is %v, interval is %v", currentTime, interval)) - return interval - currentTime.Sub(lastAttemptTime.Time), nil -} - // updateDriverState finds the driver pod of the application // and updates the driver state based on the current phase of the pod. 
func (r *Reconciler) updateDriverState(_ context.Context, app *v1beta2.SparkApplication) error { diff --git a/pkg/util/sparkapplication.go b/pkg/util/sparkapplication.go index 29b8dab81..b74a07819 100644 --- a/pkg/util/sparkapplication.go +++ b/pkg/util/sparkapplication.go @@ -29,6 +29,11 @@ import ( "github.com/kubeflow/spark-operator/api/v1beta2" "github.com/kubeflow/spark-operator/pkg/common" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +var ( + logger = log.Log.WithName("") ) // GetDriverPodName returns name of the driver pod of the given spark application. @@ -104,6 +109,28 @@ func ShouldRetry(app *v1beta2.SparkApplication) bool { return false } +func TimeUntilNextRetryDue(app *v1beta2.SparkApplication) (time.Duration, error) { + var retryInterval *int64 + switch app.Status.AppState.State { + case v1beta2.ApplicationStateFailedSubmission: + retryInterval = app.Spec.RestartPolicy.OnSubmissionFailureRetryInterval + case v1beta2.ApplicationStateFailing: + retryInterval = app.Spec.RestartPolicy.OnFailureRetryInterval + } + + attemptsDone := app.Status.SubmissionAttempts + lastAttemptTime := app.Status.LastSubmissionAttemptTime + if retryInterval == nil || lastAttemptTime.IsZero() || attemptsDone <= 0 { + return -1, fmt.Errorf("invalid retry interval (%v), last attempt time (%v) or attemptsDone (%v)", retryInterval, lastAttemptTime, attemptsDone) + } + + // Retry wait time is attempts*RetryInterval to do a linear backoff. 
+ interval := time.Duration(*retryInterval) * time.Second * time.Duration(attemptsDone) + currentTime := time.Now() + logger.Info(fmt.Sprintf("currentTime is %v, interval is %v", currentTime, interval)) + return interval - currentTime.Sub(lastAttemptTime.Time), nil +} + func GetLocalVolumes(app *v1beta2.SparkApplication) map[string]corev1.Volume { volumes := make(map[string]corev1.Volume) for _, volume := range app.Spec.Volumes { From b95d38f5396aeb44e0c4e06dfa52aa740255d7a6 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Fri, 18 Oct 2024 14:15:28 +0100 Subject: [PATCH 27/31] Update internal/controller/sparkapplication/controller.go Co-authored-by: Yi Chen Signed-off-by: Thomas Newton --- internal/controller/sparkapplication/controller.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index bdd32dfad..47a44ab18 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -746,9 +746,8 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (retu // Try submitting the application by running spark-submit. 
logger.Info("Running spark-submit for SparkApplication", "name", app.Name, "namespace", app.Namespace, "arguments", sparkSubmitArgs) - spark_submit_err := runSparkSubmit(newSubmission(sparkSubmitArgs, app)) - if spark_submit_err != nil { - return fmt.Errorf("failed to run spark-submit: %v", spark_submit_err) + if err := runSparkSubmit(newSubmission(sparkSubmitArgs, app)); err != nil { + return fmt.Errorf("failed to run spark-submit: %v", err) } return nil } From ce3bb6e7722c7069e515f80e8a84bd835f71cd3c Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Fri, 18 Oct 2024 14:17:38 +0100 Subject: [PATCH 28/31] Update test/e2e/sparkapplication_test.go Co-authored-by: Yi Chen Signed-off-by: Thomas Newton --- test/e2e/sparkapplication_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/e2e/sparkapplication_test.go b/test/e2e/sparkapplication_test.go index 94e1009ab..bb3887583 100644 --- a/test/e2e/sparkapplication_test.go +++ b/test/e2e/sparkapplication_test.go @@ -287,8 +287,9 @@ var _ = Describe("Example SparkApplication", func() { By("Checking driver does not exist") driverPodName := util.GetDriverPodName(app) - _, get_driver_err := clientset.CoreV1().Pods(app.Namespace).Get(ctx, driverPodName, metav1.GetOptions{}) - Expect(errors.IsNotFound(get_driver_err)).To(BeTrue()) + driverPodKey := types.NamespacedName{Namespace: app.Namespace, Name: driverPodName} + err := k8sClient.Get(ctx, driverPodKey, &corev1.Pod{}) + Expect(errors.IsNotFound(err)).To(BeTrue()) }) }) From 223687fd441237badbc9a1ae081ee53aac375b92 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Fri, 18 Oct 2024 14:31:46 +0100 Subject: [PATCH 29/31] camelCase Signed-off-by: Thomas Newton --- test/e2e/sparkapplication_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/e2e/sparkapplication_test.go b/test/e2e/sparkapplication_test.go index bb3887583..cc3895399 100644 --- a/test/e2e/sparkapplication_test.go +++ 
b/test/e2e/sparkapplication_test.go @@ -271,10 +271,10 @@ var _ = Describe("Example SparkApplication", func() { Expect(polling_err).To(HaveOccurred()) By("Should eventually fail") - final_app := apps[len(apps)-1] - Expect(final_app.Status.AppState.State).To(Equal(v1beta2.ApplicationStateFailed)) - Expect(final_app.Status.AppState.ErrorMessage).To(ContainSubstring("failed to run spark-submit")) - Expect(final_app.Status.SubmissionAttempts).To(Equal(*app.Spec.RestartPolicy.OnSubmissionFailureRetries + 1)) + finalApp := apps[len(apps)-1] + Expect(finalApp.Status.AppState.State).To(Equal(v1beta2.ApplicationStateFailed)) + Expect(finalApp.Status.AppState.ErrorMessage).To(ContainSubstring("failed to run spark-submit")) + Expect(finalApp.Status.SubmissionAttempts).To(Equal(*app.Spec.RestartPolicy.OnSubmissionFailureRetries + 1)) By("Only valid statuses appear in other apps") validStatuses := []v1beta2.ApplicationStateType{ From f0dc727da507da279178ae83d3dac29d6cf49734 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Fri, 18 Oct 2024 14:36:47 +0100 Subject: [PATCH 30/31] make go-fmt Signed-off-by: Thomas Newton --- test/e2e/sparkapplication_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/e2e/sparkapplication_test.go b/test/e2e/sparkapplication_test.go index cc3895399..825129df0 100644 --- a/test/e2e/sparkapplication_test.go +++ b/test/e2e/sparkapplication_test.go @@ -287,8 +287,8 @@ var _ = Describe("Example SparkApplication", func() { By("Checking driver does not exist") driverPodName := util.GetDriverPodName(app) - driverPodKey := types.NamespacedName{Namespace: app.Namespace, Name: driverPodName} - err := k8sClient.Get(ctx, driverPodKey, &corev1.Pod{}) + driverPodKey := types.NamespacedName{Namespace: app.Namespace, Name: driverPodName} + err := k8sClient.Get(ctx, driverPodKey, &corev1.Pod{}) Expect(errors.IsNotFound(err)).To(BeTrue()) }) }) From 550780071539fe47b055822d407cbb68dfdb61e1 Mon Sep 17 00:00:00 2001 From: Thomas Newton 
Date: Fri, 18 Oct 2024 17:44:39 +0100 Subject: [PATCH 31/31] PR comments Signed-off-by: Thomas Newton --- internal/controller/sparkapplication/controller.go | 11 +++++------ pkg/util/sparkapplication.go | 6 ------ 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index 47a44ab18..7a707df00 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -641,25 +641,24 @@ func (r *Reconciler) getSparkApplication(key types.NamespacedName) (*v1beta2.Spa } // submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit. -func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (returned_error error) { +func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (submitErr error) { logger.Info("Submitting SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) // SubmissionID must be set before creating any resources to ensure all the resources are labeled. 
app.Status.SubmissionID = uuid.New().String() app.Status.LastSubmissionAttemptTime = metav1.Now() + app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1 defer func() { - app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1 - - if returned_error == nil { + if submitErr == nil { app.Status.AppState = v1beta2.ApplicationState{ State: v1beta2.ApplicationStateSubmitted, } app.Status.ExecutionAttempts = app.Status.ExecutionAttempts + 1 } else { - logger.Error(returned_error, "Failed to submit SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) + logger.Info("Failed to submit SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State, "error", submitErr) app.Status.AppState = v1beta2.ApplicationState{ State: v1beta2.ApplicationStateFailedSubmission, - ErrorMessage: returned_error.Error(), + ErrorMessage: submitErr.Error(), } } r.recordSparkApplicationEvent(app) diff --git a/pkg/util/sparkapplication.go b/pkg/util/sparkapplication.go index b74a07819..a0aadd93f 100644 --- a/pkg/util/sparkapplication.go +++ b/pkg/util/sparkapplication.go @@ -29,11 +29,6 @@ import ( "github.com/kubeflow/spark-operator/api/v1beta2" "github.com/kubeflow/spark-operator/pkg/common" - "sigs.k8s.io/controller-runtime/pkg/log" -) - -var ( - logger = log.Log.WithName("") ) // GetDriverPodName returns name of the driver pod of the given spark application. @@ -127,7 +122,6 @@ func TimeUntilNextRetryDue(app *v1beta2.SparkApplication) (time.Duration, error) // Retry wait time is attempts*RetryInterval to do a linear backoff. interval := time.Duration(*retryInterval) * time.Second * time.Duration(attemptsDone) currentTime := time.Now() - logger.Info(fmt.Sprintf("currentTime is %v, interval is %v", currentTime, interval)) return interval - currentTime.Sub(lastAttemptTime.Time), nil }