diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go
index 3b96a310a..7a707df00 100644
--- a/internal/controller/sparkapplication/controller.go
+++ b/internal/controller/sparkapplication/controller.go
@@ -172,6 +172,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 		return ctrl.Result{Requeue: true}, err
 	}
 	logger.Info("Reconciling SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
+	defer logger.Info("Finished reconciling SparkApplication", "name", app.Name, "namespace", app.Namespace)
 
 	// Check if the spark application is being deleted
 	if !app.DeletionTimestamp.IsZero() {
@@ -259,17 +260,7 @@ func (r *Reconciler) reconcileNewSparkApplication(ctx context.Context, req ctrl.
 			}
 			app := old.DeepCopy()
-			if err := r.submitSparkApplication(app); err != nil {
-				logger.Error(err, "Failed to submit SparkApplication", "name", app.Name, "namespace", app.Namespace)
-				app.Status = v1beta2.SparkApplicationStatus{
-					AppState: v1beta2.ApplicationState{
-						State:        v1beta2.ApplicationStateFailedSubmission,
-						ErrorMessage: err.Error(),
-					},
-					SubmissionAttempts:        app.Status.SubmissionAttempts + 1,
-					LastSubmissionAttemptTime: metav1.Now(),
-				}
-			}
+			_ = r.submitSparkApplication(app)
 			if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
 				return err
 			}
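The dropped branch above is why callers can now write `_ = r.submitSparkApplication(app)`: the submit path records its own outcome on `app.Status` through a deferred handler on a named error return (see the `submitSparkApplication` hunk further down). A minimal sketch of that pattern, using hypothetical `Status` and `submit` stand-ins rather than the operator's real types:

```go
package main

import "fmt"

// Status is a hypothetical stand-in for v1beta2.SparkApplicationStatus.
type Status struct{ State, ErrorMessage string }

// submit records its own outcome via a defer on the named return value,
// so callers are free to ignore the returned error.
func submit(st *Status) (submitErr error) {
	defer func() {
		if submitErr != nil {
			st.State = "FAILED_SUBMISSION"
			st.ErrorMessage = submitErr.Error()
			return
		}
		st.State = "SUBMITTED"
	}()
	return fmt.Errorf("spark-submit exited non-zero") // simulated failure
}

func main() {
	var st Status
	_ = submit(&st) // the status, not the return value, carries the result
	fmt.Println(st.State, "-", st.ErrorMessage)
}
```

Because the defer runs on every exit path, the success and failure bookkeeping can no longer drift apart between call sites.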
@@ -315,6 +306,9 @@ func (r *Reconciler) reconcileSubmittedSparkApplication(ctx context.Context, req
 
 func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	key := req.NamespacedName
+
+	var result ctrl.Result
+
 	retryErr := retry.RetryOnConflict(
 		retry.DefaultRetry,
 		func() error {
@@ -328,15 +322,22 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte
 			app := old.DeepCopy()
 			if util.ShouldRetry(app) {
-				if isNextRetryDue(app) {
+				timeUntilNextRetryDue, err := util.TimeUntilNextRetryDue(app)
+				if err != nil {
+					return err
+				}
+				if timeUntilNextRetryDue <= 0 {
 					if r.validateSparkResourceDeletion(ctx, app) {
 						_ = r.submitSparkApplication(app)
 					} else {
 						if err := r.deleteSparkResources(ctx, app); err != nil {
 							logger.Error(err, "failed to delete resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace)
 						}
-						return err
+						return fmt.Errorf("resources associated with SparkApplication name: %s, namespace: %s need to be deleted", app.Name, app.Namespace)
 					}
+				} else {
+					// If we're waiting before retrying, then reconcile will not modify anything, so we need to requeue.
+					result.RequeueAfter = timeUntilNextRetryDue
 				}
 			} else {
 				app.Status.AppState.State = v1beta2.ApplicationStateFailed
@@ -352,9 +353,9 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte
 	)
 	if retryErr != nil {
 		logger.Error(retryErr, "Failed to reconcile SparkApplication", "name", key.Name, "namespace", key.Namespace)
-		return ctrl.Result{}, retryErr
+		return result, retryErr
 	}
-	return ctrl.Result{}, nil
+	return result, nil
 }
 
 func (r *Reconciler) reconcileRunningSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
@@ -408,9 +409,7 @@ func (r *Reconciler) reconcilePendingRerunSparkApplication(ctx context.Context,
 			logger.Info("Successfully deleted resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
 			r.recordSparkApplicationEvent(app)
 			r.resetSparkApplicationStatus(app)
-			if err = r.submitSparkApplication(app); err != nil {
-				logger.Error(err, "Failed to run spark-submit", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
-			}
+			_ = r.submitSparkApplication(app)
 		}
 		if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
 			return err
 		}
@@ -497,6 +496,9 @@ func (r *Reconciler) reconcileSucceedingSparkApplication(ctx context.Context, re
 
 func (r *Reconciler) reconcileFailingSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	key := req.NamespacedName
+
+	var result ctrl.Result
+
 	retryErr := retry.RetryOnConflict(
 		retry.DefaultRetry,
 		func() error {
@@ -510,12 +512,19 @@ func (r *Reconciler) reconcileFailingSparkApplication(ctx context.Context, req c
 			app := old.DeepCopy()
 			if util.ShouldRetry(app) {
-				if isNextRetryDue(app) {
+				timeUntilNextRetryDue, err := util.TimeUntilNextRetryDue(app)
+				if err != nil {
+					return err
+				}
+				if timeUntilNextRetryDue <= 0 {
 					if err := r.deleteSparkResources(ctx, app); err != nil {
 						logger.Error(err, "failed to delete spark resources", "name", app.Name, "namespace", app.Namespace)
 						return err
 					}
 					app.Status.AppState.State = v1beta2.ApplicationStatePendingRerun
+				} else {
+					// If we're waiting before retrying, then reconcile will not modify anything, so we need to requeue.
+					result.RequeueAfter = timeUntilNextRetryDue
 				}
 			} else {
 				app.Status.AppState.State = v1beta2.ApplicationStateFailed
@@ -528,9 +537,9 @@ func (r *Reconciler) reconcileFailingSparkApplication(ctx context.Context, req c
 	)
 	if retryErr != nil {
 		logger.Error(retryErr, "Failed to reconcile SparkApplication", "name", key.Name, "namespace", key.Namespace)
-		return ctrl.Result{}, retryErr
+		return result, retryErr
 	}
-	return ctrl.Result{}, nil
+	return result, nil
 }
 
 func (r *Reconciler) reconcileCompletedSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
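Both retry paths now report the remaining wait through `ctrl.Result`'s `RequeueAfter` field, which asks controller-runtime to schedule another reconcile once the duration elapses. A hedged sketch of the shape of that contract, with a local `result` type mirroring the relevant field so the example stays self-contained:

```go
package main

import (
	"fmt"
	"time"
)

// result mirrors the RequeueAfter field of controller-runtime's ctrl.Result.
type result struct {
	RequeueAfter time.Duration
}

// reconcileWithBackoff asks the manager to call Reconcile again once the
// retry window elapses, rather than treating "not due yet" as an error.
func reconcileWithBackoff(timeUntilDue time.Duration) (result, error) {
	if timeUntilDue > 0 {
		// Nothing to change yet; come back when the retry is due.
		return result{RequeueAfter: timeUntilDue}, nil
	}
	// Due now: a real reconciler would resubmit or clean up here.
	return result{}, nil
}

func main() {
	res, err := reconcileWithBackoff(3 * time.Second)
	fmt.Println(res.RequeueAfter, err) // 3s <nil>
}
```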
@@ -632,8 +641,28 @@ func (r *Reconciler) getSparkApplication(key types.NamespacedName) (*v1beta2.Spa
 }
 
 // submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit.
-func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) error {
+func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (submitErr error) {
 	logger.Info("Submitting SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
+	// SubmissionID must be set before creating any resources to ensure all the resources are labeled.
+	app.Status.SubmissionID = uuid.New().String()
+	app.Status.LastSubmissionAttemptTime = metav1.Now()
+	app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1
+
+	defer func() {
+		if submitErr == nil {
+			app.Status.AppState = v1beta2.ApplicationState{
+				State: v1beta2.ApplicationStateSubmitted,
+			}
+			app.Status.ExecutionAttempts = app.Status.ExecutionAttempts + 1
+		} else {
+			logger.Info("Failed to submit SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State, "error", submitErr)
+			app.Status.AppState = v1beta2.ApplicationState{
+				State:        v1beta2.ApplicationStateFailedSubmission,
+				ErrorMessage: submitErr.Error(),
+			}
+		}
+		r.recordSparkApplicationEvent(app)
+	}()
 
 	if util.PrometheusMonitoringEnabled(app) {
 		logger.Info("Configure Prometheus monitoring for SparkApplication", "name", app.Name, "namespace", app.Namespace)
@@ -709,7 +738,6 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) error
 	driverPodName := util.GetDriverPodName(app)
 	app.Status.DriverInfo.PodName = driverPodName
-	app.Status.SubmissionID = uuid.New().String()
 	sparkSubmitArgs, err := buildSparkSubmitArgs(app)
 	if err != nil {
 		return fmt.Errorf("failed to build spark-submit arguments: %v", err)
 	}
@@ -717,44 +745,12 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) error
 
 	// Try submitting the application by running spark-submit.
 	logger.Info("Running spark-submit for SparkApplication", "name", app.Name, "namespace", app.Namespace, "arguments", sparkSubmitArgs)
-	submitted, err := runSparkSubmit(newSubmission(sparkSubmitArgs, app))
-	if err != nil {
-		r.recordSparkApplicationEvent(app)
+	if err := runSparkSubmit(newSubmission(sparkSubmitArgs, app)); err != nil {
 		return fmt.Errorf("failed to run spark-submit: %v", err)
 	}
-	if !submitted {
-		// The application may not have been submitted even if err == nil, e.g., when some
-		// state update caused an attempt to re-submit the application, in which case no
-		// error gets returned from runSparkSubmit. If this is the case, we simply return.
-		return nil
-	}
-
-	app.Status.AppState = v1beta2.ApplicationState{
-		State: v1beta2.ApplicationStateSubmitted,
-	}
-	app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1
-	app.Status.ExecutionAttempts = app.Status.ExecutionAttempts + 1
-	app.Status.LastSubmissionAttemptTime = metav1.Now()
-	r.recordSparkApplicationEvent(app)
 	return nil
 }
 
-// Helper func to determine if the next retry the SparkApplication is due now.
-func isNextRetryDue(app *v1beta2.SparkApplication) bool {
-	retryInterval := app.Spec.RestartPolicy.OnFailureRetryInterval
-	attemptsDone := app.Status.SubmissionAttempts
-	lastEventTime := app.Status.LastSubmissionAttemptTime
-	if retryInterval == nil || lastEventTime.IsZero() || attemptsDone <= 0 {
-		return false
-	}
-
-	// Retry if we have waited at-least equal to attempts*RetryInterval since we do a linear back-off.
-	interval := time.Duration(*retryInterval) * time.Second * time.Duration(attemptsDone)
-	currentTime := time.Now()
-	logger.Info(fmt.Sprintf("currentTime is %v, interval is %v", currentTime, interval))
-	return currentTime.After(lastEventTime.Add(interval))
-}
-
 // updateDriverState finds the driver pod of the application
 // and updates the driver state based on the current phase of the pod.
 func (r *Reconciler) updateDriverState(_ context.Context, app *v1beta2.SparkApplication) error {
diff --git a/internal/controller/sparkapplication/submission.go b/internal/controller/sparkapplication/submission.go
index a51bd2e4e..ce7cb9733 100644
--- a/internal/controller/sparkapplication/submission.go
+++ b/internal/controller/sparkapplication/submission.go
@@ -43,10 +43,10 @@ func newSubmission(args []string, app *v1beta2.SparkApplication) *submission {
 	}
 }
 
-func runSparkSubmit(submission *submission) (bool, error) {
+func runSparkSubmit(submission *submission) error {
 	sparkHome, present := os.LookupEnv(common.EnvSparkHome)
 	if !present {
-		return false, fmt.Errorf("env %s is not specified", common.EnvSparkHome)
+		return fmt.Errorf("env %s is not specified", common.EnvSparkHome)
 	}
 	command := filepath.Join(sparkHome, "bin", "spark-submit")
 	cmd := exec.Command(command, submission.args...)
@@ -58,14 +58,14 @@ func runSparkSubmit(submission *submission) (bool, error) {
 		}
 		// The driver pod of the application already exists.
 		if strings.Contains(errorMsg, common.ErrorCodePodAlreadyExists) {
-			return false, fmt.Errorf("driver pod already exist")
+			return fmt.Errorf("driver pod already exists")
 		}
 		if errorMsg != "" {
-			return false, fmt.Errorf("failed to run spark-submit: %s", errorMsg)
+			return fmt.Errorf("failed to run spark-submit: %s", errorMsg)
 		}
-		return false, fmt.Errorf("failed to run spark-submit: %v", err)
+		return fmt.Errorf("failed to run spark-submit: %v", err)
 	}
-	return true, nil
+	return nil
 }
 
 // buildSparkSubmitArgs builds the arguments for spark-submit.
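With the `bool` return gone, `runSparkSubmit` has a single-error contract: nil means the application was submitted. As a rough illustration (not the operator's code), here is a self-contained sketch of running an external command and folding its stderr into the returned error, which is the general shape of the helper:

```go
package main

import (
	"errors"
	"fmt"
	"os/exec"
	"strings"
)

// run mimics the simplified contract: nil means the child process
// succeeded; otherwise its stderr is folded into the returned error.
func run(name string, args ...string) error {
	if _, err := exec.Command(name, args...).Output(); err != nil {
		var exitErr *exec.ExitError
		if errors.As(err, &exitErr) && len(exitErr.Stderr) > 0 {
			return fmt.Errorf("failed to run %s: %s", name, strings.TrimSpace(string(exitErr.Stderr)))
		}
		return fmt.Errorf("failed to run %s: %v", name, err)
	}
	return nil
}

func main() {
	fmt.Println(run("false")) // failed to run false: exit status 1
}
```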
diff --git a/pkg/util/sparkapplication.go b/pkg/util/sparkapplication.go
index 29b8dab81..a0aadd93f 100644
--- a/pkg/util/sparkapplication.go
+++ b/pkg/util/sparkapplication.go
@@ -104,6 +104,27 @@ func ShouldRetry(app *v1beta2.SparkApplication) bool {
 	return false
 }
 
+func TimeUntilNextRetryDue(app *v1beta2.SparkApplication) (time.Duration, error) {
+	var retryInterval *int64
+	switch app.Status.AppState.State {
+	case v1beta2.ApplicationStateFailedSubmission:
+		retryInterval = app.Spec.RestartPolicy.OnSubmissionFailureRetryInterval
+	case v1beta2.ApplicationStateFailing:
+		retryInterval = app.Spec.RestartPolicy.OnFailureRetryInterval
+	}
+
+	attemptsDone := app.Status.SubmissionAttempts
+	lastAttemptTime := app.Status.LastSubmissionAttemptTime
+	if retryInterval == nil || lastAttemptTime.IsZero() || attemptsDone <= 0 {
+		return -1, fmt.Errorf("invalid retry interval (%v), last attempt time (%v), or attempts done (%v)", retryInterval, lastAttemptTime, attemptsDone)
+	}
+
+	// Retry wait time is attempts*RetryInterval to do a linear backoff.
+	interval := time.Duration(*retryInterval) * time.Second * time.Duration(attemptsDone)
+	currentTime := time.Now()
+	return interval - currentTime.Sub(lastAttemptTime.Time), nil
+}
+
 func GetLocalVolumes(app *v1beta2.SparkApplication) map[string]corev1.Volume {
 	volumes := make(map[string]corev1.Volume)
 	for _, volume := range app.Spec.Volumes {
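The helper returns a signed duration: under linear backoff the retry after n attempts is due n*interval past the last attempt, so a non-positive value means the retry is already due, and a positive value is exactly the `RequeueAfter` used earlier. A small free-standing sketch of the same arithmetic:

```go
package main

import (
	"fmt"
	"time"
)

// timeUntilDue reproduces the linear-backoff arithmetic: after n attempts
// the next retry is due n*interval after the last attempt, so the result
// is zero or negative once the retry is already due.
func timeUntilDue(intervalSeconds int64, attempts int32, lastAttempt time.Time) time.Duration {
	wait := time.Duration(intervalSeconds) * time.Second * time.Duration(attempts)
	return wait - time.Since(lastAttempt)
}

func main() {
	lastAttempt := time.Now().Add(-5 * time.Second)
	// Third attempt with a 2-second interval: due 6s after the last
	// attempt, i.e. roughly 1s from now.
	fmt.Println(timeUntilDue(2, 3, lastAttempt).Round(time.Second))
}
```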
diff --git a/test/e2e/bad_examples/fail-application.yaml b/test/e2e/bad_examples/fail-application.yaml
new file mode 100644
index 000000000..05991fd72
--- /dev/null
+++ b/test/e2e/bad_examples/fail-application.yaml
@@ -0,0 +1,44 @@
+#
+# Copyright 2024 The Kubeflow authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: sparkoperator.k8s.io/v1beta2
+kind: SparkApplication
+metadata:
+  name: fail-application
+  namespace: default
+spec:
+  type: Scala
+  mode: cluster
+  image: spark:3.5.2
+  imagePullPolicy: IfNotPresent
+  mainClass: non-existent
+  mainApplicationFile: local:///non-existent.jar
+  sparkVersion: 3.5.2
+  restartPolicy:
+    type: OnFailure
+    onFailureRetries: 3
+    onFailureRetryInterval: 1
+  driver:
+    labels:
+      version: 3.5.2
+    cores: 1
+    memory: 512m
+    serviceAccount: spark-operator-spark
+  executor:
+    labels:
+      version: 3.5.2
+    instances: 1
+    cores: 1
+    memory: 512m
diff --git a/test/e2e/bad_examples/fail-submission.yaml b/test/e2e/bad_examples/fail-submission.yaml
new file mode 100644
index 000000000..827fca741
--- /dev/null
+++ b/test/e2e/bad_examples/fail-submission.yaml
@@ -0,0 +1,44 @@
+#
+# Copyright 2024 The Kubeflow authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: sparkoperator.k8s.io/v1beta2
+kind: SparkApplication
+metadata:
+  name: fail-submission
+  namespace: default
+spec:
+  type: Scala
+  mode: cluster
+  image: spark:3.5.2
+  imagePullPolicy: IfNotPresent
+  mainClass: dummy
+  mainApplicationFile: local:///dummy.jar
+  sparkVersion: 3.5.2
+  restartPolicy:
+    type: OnFailure
+    onSubmissionFailureRetries: 3
+    onSubmissionFailureRetryInterval: 1
+  driver:
+    labels:
+      version: 3.5.2
+    cores: 1
+    memory: 512m
+    serviceAccount: non-existent # This is the important part that causes submission to fail.
+  executor:
+    labels:
+      version: 3.5.2
+    instances: 1
+    cores: 1
+    memory: 512m
diff --git a/test/e2e/sparkapplication_test.go b/test/e2e/sparkapplication_test.go
index 113326cea..825129df0 100644
--- a/test/e2e/sparkapplication_test.go
+++ b/test/e2e/sparkapplication_test.go
@@ -27,6 +27,7 @@ import (
 
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
+	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -236,6 +237,122 @@ var _ = Describe("Example SparkApplication", func() {
 		})
 	})
 
+	Context("fail-submission", func() {
+		ctx := context.Background()
+		path := filepath.Join("bad_examples", "fail-submission.yaml")
+		app := &v1beta2.SparkApplication{}
+
+		BeforeEach(func() {
+			By("Parsing SparkApplication from file")
+			file, err := os.Open(path)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(file).NotTo(BeNil())
+
+			decoder := yaml.NewYAMLOrJSONDecoder(file, 100)
+			Expect(decoder).NotTo(BeNil())
+			Expect(decoder.Decode(app)).NotTo(HaveOccurred())
+
+			By("Creating SparkApplication")
+			Expect(k8sClient.Create(ctx, app)).To(Succeed())
+		})
+
+		AfterEach(func() {
+			key := types.NamespacedName{Namespace: app.Namespace, Name: app.Name}
+			Expect(k8sClient.Get(ctx, key, app)).To(Succeed())
+
+			By("Deleting SparkApplication")
+			Expect(k8sClient.Delete(ctx, app)).To(Succeed())
+		})
+
+		It("Fails submission and retries until retries are exhausted", func() {
+			By("Waiting for SparkApplication to terminate")
+			key := types.NamespacedName{Namespace: app.Namespace, Name: app.Name}
+			apps, pollingErr := collectSparkApplicationsUntilTermination(ctx, key)
+			Expect(pollingErr).To(HaveOccurred())
+
+			By("Should eventually fail")
+			finalApp := apps[len(apps)-1]
+			Expect(finalApp.Status.AppState.State).To(Equal(v1beta2.ApplicationStateFailed))
+			Expect(finalApp.Status.AppState.ErrorMessage).To(ContainSubstring("failed to run spark-submit"))
+			Expect(finalApp.Status.SubmissionAttempts).To(Equal(*app.Spec.RestartPolicy.OnSubmissionFailureRetries + 1))
+
+			By("Only valid statuses appear in other apps")
+			validStatuses := []v1beta2.ApplicationStateType{
+				v1beta2.ApplicationStateNew,
+				v1beta2.ApplicationStateFailedSubmission,
+			}
+			for _, app := range apps[:len(apps)-1] {
+				Expect(validStatuses).To(ContainElement(app.Status.AppState.State))
+			}
+
+			By("Checking driver does not exist")
+			driverPodName := util.GetDriverPodName(app)
+			driverPodKey := types.NamespacedName{Namespace: app.Namespace, Name: driverPodName}
+			err := k8sClient.Get(ctx, driverPodKey, &corev1.Pod{})
+			Expect(errors.IsNotFound(err)).To(BeTrue())
+		})
+	})
+
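The absence check above relies on apimachinery's `errors.IsNotFound` to treat a missing driver pod as the expected outcome. A hedged sketch of that check factored into a helper; the `driverAbsent` name and `e2eutil` package are illustrative, not part of the suite:

```go
package e2eutil

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// driverAbsent reports whether the pod named by key is gone, treating a
// NotFound error as the expected outcome rather than a failure.
func driverAbsent(ctx context.Context, c client.Client, key types.NamespacedName) (bool, error) {
	err := c.Get(ctx, key, &corev1.Pod{})
	if apierrors.IsNotFound(err) {
		return true, nil
	}
	return false, err
}
```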
By("Waiting for SparkApplication to terminate") + key := types.NamespacedName{Namespace: app.Namespace, Name: app.Name} + apps, polling_err := collectSparkApplicationsUntilTermination(ctx, key) + Expect(polling_err).To(HaveOccurred()) + + By("Should eventually fail") + final_app := apps[len(apps)-1] + Expect(final_app.Status.AppState.State).To(Equal(v1beta2.ApplicationStateFailed)) + Expect(final_app.Status.AppState.ErrorMessage).To(ContainSubstring("driver container failed")) + Expect(final_app.Status.ExecutionAttempts).To(Equal(*app.Spec.RestartPolicy.OnFailureRetries + 1)) + + By("Only valid statuses appear in other apps") + validStatuses := []v1beta2.ApplicationStateType{ + v1beta2.ApplicationStateNew, + v1beta2.ApplicationStateSubmitted, + v1beta2.ApplicationStateRunning, + v1beta2.ApplicationStateFailing, + v1beta2.ApplicationStatePendingRerun, + } + for _, app := range apps[:len(apps)-1] { + Expect(validStatuses).To(ContainElement(app.Status.AppState.State)) + } + + By("Checking out driver logs") + driverPodName := util.GetDriverPodName(app) + bytes, err := clientset.CoreV1().Pods(app.Namespace).GetLogs(driverPodName, &corev1.PodLogOptions{}).Do(ctx).Raw() + Expect(err).NotTo(HaveOccurred()) + Expect(bytes).NotTo(BeEmpty()) + Expect(strings.Contains(string(bytes), "NoSuchFileException")).To(BeTrue()) + }) + }) + Context("spark-pi-python", func() { ctx := context.Background() path := filepath.Join("..", "..", "examples", "spark-pi-python.yaml") diff --git a/test/e2e/suit_test.go b/test/e2e/suit_test.go index 92167082a..e9c534d94 100644 --- a/test/e2e/suit_test.go +++ b/test/e2e/suit_test.go @@ -271,3 +271,26 @@ func waitForSparkApplicationCompleted(ctx context.Context, key types.NamespacedN }) return err } + +func collectSparkApplicationsUntilTermination(ctx context.Context, key types.NamespacedName) ([]v1beta2.SparkApplication, error) { + cancelCtx, cancelFunc := context.WithTimeout(ctx, WaitTimeout) + defer cancelFunc() + + apps := []v1beta2.SparkApplication{} + + err := wait.PollUntilContextCancel(cancelCtx, PollInterval, true, func(ctx context.Context) (bool, error) { + app := v1beta2.SparkApplication{} + if err := k8sClient.Get(ctx, key, &app); err != nil { + return false, err + } + apps = append(apps, app) + switch app.Status.AppState.State { + case v1beta2.ApplicationStateFailed: + return true, errors.New(app.Status.AppState.ErrorMessage) + case v1beta2.ApplicationStateCompleted: + return true, nil + } + return false, nil + }) + return apps, err +}